summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-01-05 05:42:52 +0000
committerDamien F. Katz <damien@apache.org>2009-01-05 05:42:52 +0000
commit10eac24f530b5de1ea30c83518d3971f99890db6 (patch)
treef5565b0002ec028538c78b65febaddb675297596 /src/couchdb
parent4228d249b4216d390aa1f8f063d09527c4dcbe6a (diff)
Delayed commit patch. Allows documents to be saved immediately but fully committed asynchronously. On by default, to fully commit documents synchronously, use X-Couch-Full-Commit=true in the http header. We still needs to prevent committing view indexes ahead of data and to detect when a server has crashed causing potentially lost updates during remote replication (fixed by retrying the incremental replication).
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@731452 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_db.erl7
-rw-r--r--src/couchdb/couch_db.hrl4
-rw-r--r--src/couchdb/couch_db_updater.erl45
-rw-r--r--src/couchdb/couch_httpd_db.erl31
4 files changed, 59 insertions, 28 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 0e652b68..7c86eeae 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -21,7 +21,7 @@
-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,make_doc/2,set_admins/2,get_admins/1]).
+-export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
@@ -64,6 +64,9 @@ create(DbName, Options) ->
open(DbName, Options) ->
couch_server:open(DbName, Options).
+ensure_full_commit(#db{update_pid=UpdatePid}) ->
+ gen_server:call(UpdatePid, full_commit, infinity).
+
close(#db{fd=Fd}) ->
couch_file:drop_ref(Fd).
@@ -462,7 +465,7 @@ init({DbName, Filepath, Fd, Options}) ->
ok = couch_file:add_ref(Fd),
gen_server:call(UpdaterPid, get_db).
-terminate(_Reason, Db) ->
+terminate(_Reason, _Db) ->
ok.
handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) ->
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 22fad2ef..5dbbcee8 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -132,7 +132,8 @@
validate_doc_funs=[],
admins=[],
admins_ptr=nil,
- user_ctx=#user_ctx{}
+ user_ctx=#user_ctx{},
+ waiting_delayed_commit=nil
}).
@@ -161,6 +162,7 @@
views,
id_btree=nil,
current_seq=0,
+ committed_seq=0,
purge_seq=0,
query_server=nil,
commit_fun
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index b4faefee..d02464ca 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -54,6 +54,10 @@ handle_call({update_docs, DocActions, Options}, _From, Db) ->
throw: conflict ->
{reply, conflict, Db}
end;
+handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
+ {reply, ok, Db}; % no data waiting, return ok immediately
+handle_call(full_commit, _From, Db) ->
+ {reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -185,9 +189,8 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{noreply, Db2}
end.
-handle_info(Msg, Db) ->
- ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
- exit({error, Msg}).
+handle_info(delayed_commit, Db) ->
+ {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -456,13 +459,8 @@ update_docs_int(Db, DocsList, Options) ->
_ ->
Db4 = refresh_validate_doc_funs(Db3)
end,
-
- case lists:member(delay_commit, Options) of
- true ->
- {ok, Db4};
- false ->
- {ok, commit_data(Db4)}
- end.
+
+ {ok, commit_data(Db4, not lists:member(full_commit, Options))}.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
Ids = [Id || #doc{id=Id} <- Docs],
@@ -500,21 +498,34 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
{ok, Db#db{local_docs_btree = Btree2}}.
+commit_data(Db) ->
+ commit_data(Db, false).
-commit_data(#db{fd=Fd, header=Header} = Db) ->
- Header2 = Header#db_header{
+
+commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
+ Db2 = Db#db{header = Header#db_header{
update_seq = Db#db.update_seq,
summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr
- },
- if Header == Header2 ->
- Db; % unchanged. nothing to do
+ }},
+ if Delay and (Db#db.waiting_delayed_commit == nil) ->
+ Db2#db{waiting_delayed_commit=
+ erlang:send_after(1000, self(), delayed_commit)};
+ Delay ->
+ Db2;
true ->
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{header = Header2}
+ if Db#db.waiting_delayed_commit /= nil ->
+ case erlang:cancel_timer(Db#db.waiting_delayed_commit) of
+ false -> receive delayed_commit -> ok after 0 -> ok end;
+ _ -> ok
+ end;
+ true -> ok
+ end,
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Db2#db.header),
+ Db2#db{waiting_delayed_commit=nil}
end.
copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 84a0a812..df8eba80 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -89,10 +89,24 @@ db_req(#httpd{method='POST',path_parts=[_DbName]}=Req, Db) ->
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
+db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) ->
+ ok = couch_db:ensure_full_commit(Db),
+ send_json(Req, 201, {[
+ {ok, true}
+ ]});
+
+db_req(#httpd{path_parts=[_,<<"_ensure_full_commit">>]}=Req, _Db) ->
+ send_method_not_allowed(Req, "POST");
+
db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
{JsonProps} = couch_httpd:json_body(Req),
DocsArray = proplists:get_value(<<"docs">>, JsonProps),
- % convert all the doc elements to native docs
+ case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
+ "true" ->
+ Options = [full_commit];
+ _ ->
+ Options = []
+ end,
case proplists:get_value(<<"new_edits">>, JsonProps, true) of
true ->
Docs = lists:map(
@@ -109,7 +123,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
Doc#doc{id=Id,revs=Revs}
end,
DocsArray),
- {ok, ResultRevs} = couch_db:update_docs(Db, Docs, []),
+ {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options),
% output the results
DocResults = lists:zipwith(
@@ -123,11 +137,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
]});
false ->
- Options =
- case proplists:get_value(<<"new_edits">>, JsonProps, true) of
- true -> [new_edits];
- _ -> []
- end,
Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- DocsArray],
ok = couch_db:update_docs(Db, Docs, Options, false),
send_json(Req, 201, {[
@@ -418,13 +427,19 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
[Rev0|_] -> Rev0;
[] -> undefined
end,
+ case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
+ "true" ->
+ Options = [full_commit];
+ _ ->
+ Options = []
+ end,
case extract_header_rev(Req, ExplicitRev) of
missing_rev ->
Revs = [];
Rev ->
Revs = [Rev]
end,
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, []),
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
send_json(Req, 201, [{"Etag", <<"\"", NewRev/binary, "\"">>}], {[
{ok, true},
{id, DocId},