diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_db.erl | 7 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 4 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 45 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 31 |
4 files changed, 59 insertions, 28 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 0e652b68..7c86eeae 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -21,7 +21,7 @@ -export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,make_doc/2,set_admins/2,get_admins/1]). +-export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). @@ -64,6 +64,9 @@ create(DbName, Options) -> open(DbName, Options) -> couch_server:open(DbName, Options). +ensure_full_commit(#db{update_pid=UpdatePid}) -> + gen_server:call(UpdatePid, full_commit, infinity). + close(#db{fd=Fd}) -> couch_file:drop_ref(Fd). @@ -462,7 +465,7 @@ init({DbName, Filepath, Fd, Options}) -> ok = couch_file:add_ref(Fd), gen_server:call(UpdaterPid, get_db). -terminate(_Reason, Db) -> +terminate(_Reason, _Db) -> ok. handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) -> diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 22fad2ef..5dbbcee8 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -132,7 +132,8 @@ validate_doc_funs=[], admins=[], admins_ptr=nil, - user_ctx=#user_ctx{} + user_ctx=#user_ctx{}, + waiting_delayed_commit=nil }). @@ -161,6 +162,7 @@ views, id_btree=nil, current_seq=0, + committed_seq=0, purge_seq=0, query_server=nil, commit_fun diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index b4faefee..d02464ca 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -54,6 +54,10 @@ handle_call({update_docs, DocActions, Options}, _From, Db) -> throw: conflict -> {reply, conflict, Db} end; +handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> + {reply, ok, Db}; % no data waiting, return ok immediately +handle_call(full_commit, _From, Db) -> + {reply, ok, commit_data(Db)}; % commit the data and return ok handle_call(increment_update_seq, _From, Db) -> Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), @@ -185,9 +189,8 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> {noreply, Db2} end. -handle_info(Msg, Db) -> - ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). +handle_info(delayed_commit, Db) -> + {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -456,13 +459,8 @@ update_docs_int(Db, DocsList, Options) -> _ -> Db4 = refresh_validate_doc_funs(Db3) end, - - case lists:member(delay_commit, Options) of - true -> - {ok, Db4}; - false -> - {ok, commit_data(Db4)} - end. + + {ok, commit_data(Db4, not lists:member(full_commit, Options))}. update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> Ids = [Id || #doc{id=Id} <- Docs], @@ -500,21 +498,34 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> {ok, Db#db{local_docs_btree = Btree2}}. +commit_data(Db) -> + commit_data(Db, false). -commit_data(#db{fd=Fd, header=Header} = Db) -> - Header2 = Header#db_header{ + +commit_data(#db{fd=Fd, header=Header} = Db, Delay) -> + Db2 = Db#db{header = Header#db_header{ update_seq = Db#db.update_seq, summary_stream_state = couch_stream:get_state(Db#db.summary_stream), docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), admins_ptr = Db#db.admins_ptr - }, - if Header == Header2 -> - Db; % unchanged. nothing to do + }}, + if Delay and (Db#db.waiting_delayed_commit == nil) -> + Db2#db{waiting_delayed_commit= + erlang:send_after(1000, self(), delayed_commit)}; + Delay -> + Db2; true -> - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), - Db#db{header = Header2} + if Db#db.waiting_delayed_commit /= nil -> + case erlang:cancel_timer(Db#db.waiting_delayed_commit) of + false -> receive delayed_commit -> ok after 0 -> ok end; + _ -> ok + end; + true -> ok + end, + ok = couch_file:write_header(Fd, ?HEADER_SIG, Db2#db.header), + Db2#db{waiting_delayed_commit=nil} end. copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 84a0a812..df8eba80 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -89,10 +89,24 @@ db_req(#httpd{method='POST',path_parts=[_DbName]}=Req, Db) -> db_req(#httpd{path_parts=[_DbName]}=Req, _Db) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST"); +db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -> + ok = couch_db:ensure_full_commit(Db), + send_json(Req, 201, {[ + {ok, true} + ]}); + +db_req(#httpd{path_parts=[_,<<"_ensure_full_commit">>]}=Req, _Db) -> + send_method_not_allowed(Req, "POST"); + db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> {JsonProps} = couch_httpd:json_body(Req), DocsArray = proplists:get_value(<<"docs">>, JsonProps), - % convert all the doc elements to native docs + case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of + "true" -> + Options = [full_commit]; + _ -> + Options = [] + end, case proplists:get_value(<<"new_edits">>, JsonProps, true) of true -> Docs = lists:map( @@ -109,7 +123,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> Doc#doc{id=Id,revs=Revs} end, DocsArray), - {ok, ResultRevs} = couch_db:update_docs(Db, Docs, []), + {ok, ResultRevs} = couch_db:update_docs(Db, Docs, Options), % output the results DocResults = lists:zipwith( @@ -123,11 +137,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> ]}); false -> - Options = - case proplists:get_value(<<"new_edits">>, JsonProps, true) of - true -> [new_edits]; - _ -> [] - end, Docs = [couch_doc:from_json_obj(JsonObj) || JsonObj <- DocsArray], ok = couch_db:update_docs(Db, Docs, Options, false), send_json(Req, 201, {[ @@ -418,13 +427,19 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> [Rev0|_] -> Rev0; [] -> undefined end, + case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of + "true" -> + Options = [full_commit]; + _ -> + Options = [] + end, case extract_header_rev(Req, ExplicitRev) of missing_rev -> Revs = []; Rev -> Revs = [Rev] end, - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, []), + {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options), send_json(Req, 201, [{"Etag", <<"\"", NewRev/binary, "\"">>}], {[ {ok, true}, {id, DocId}, |