diff options
author | Damien F. Katz <damien@apache.org> | 2009-11-03 20:51:04 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-11-03 20:51:04 +0000 |
commit | 4387dc1a5b10c63a540cefcb2bb7c6e5d9b9fd8b (patch) | |
tree | d14597954d2065ef880c97998631d0842f19224f | |
parent | f2689f944e1c0f573afe4393ff26bbc988db8baf (diff) |
Added batching of multiple updating requests, to improve throughput with many writers. Also removed the couch_batch_save module, now batch requests are simply saved async as immediately, batching with outhr updates if possible.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@832550 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | README | 2 | ||||
-rw-r--r-- | etc/couchdb/default.ini.tpl.in | 1 | ||||
-rw-r--r-- | share/www/script/test/batch_save.js | 61 | ||||
-rw-r--r-- | src/couchdb/Makefile.am | 4 | ||||
-rw-r--r-- | src/couchdb/couch.app.tpl.in | 2 | ||||
-rw-r--r-- | src/couchdb/couch_batch_save.erl | 273 | ||||
-rw-r--r-- | src/couchdb/couch_batch_save_sup.erl | 37 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 45 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 153 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 22 | ||||
-rwxr-xr-x | test/etap/001-load.t | 2 |
11 files changed, 175 insertions, 427 deletions
@@ -452,7 +452,7 @@ Tests are also available to be run individually like such: # Current time local 2009-09-26 23:47:44 # Using etap version "0.3.4" 1..39 - ok 1 - Loaded: couch_batch_save + ok 1 - Loaded: couch_btree ... Cryptographic Software Notice diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in index bc234366..33385207 100644 --- a/etc/couchdb/default.ini.tpl.in +++ b/etc/couchdb/default.ini.tpl.in @@ -48,7 +48,6 @@ reduce_limit = true view_manager={couch_view, start_link, []} external_manager={couch_external_manager, start_link, []} db_update_notifier={couch_db_update_notifier_sup, start_link, []} -batch_save={couch_batch_save_sup, start_link, []} query_servers={couch_query_servers, start_link, []} httpd={couch_httpd, start_link, []} stats_aggregator={couch_stats_aggregator, start, []} diff --git a/share/www/script/test/batch_save.js b/share/www/script/test/batch_save.js index e321b108..1c8a2be9 100644 --- a/share/www/script/test/batch_save.js +++ b/share/www/script/test/batch_save.js @@ -16,45 +16,30 @@ couchTests.batch_save = function(debug) { db.createDb(); if (debug) debugger; - // commit should work fine with no batches - T(db.ensureFullCommit().ok); - - // PUT a doc with ?batch=ok - T(db.save({_id:"0",a:1,b:1}, {batch : "ok"}).ok); - - // test that response is 202 Accepted - T(db.last_req.status == 202); - - T(db.allDocs().total_rows == 0); - - restartServer(); - - // lost the updates - T(db.allDocs().total_rows == 0); - - T(db.save({_id:"0",a:1,b:1}, {batch : "ok"}).ok); - T(db.save({_id:"1",a:1,b:1}, {batch : "ok"}).ok); - T(db.save({_id:"2",a:1,b:1}, {batch : "ok"}).ok); - - T(db.ensureFullCommit().ok); - T(db.allDocs().total_rows == 3); + var i + for(i=0; i < 100; i++) { + T(db.save({_id:i.toString(),a:i,b:i}, {batch : "ok"}).ok); + + // test that response is 202 Accepted + T(db.last_req.status == 202); + } + + for(i=0; i < 100; i++) { + // attempt to save the same document a bunch of times + T(db.save({_id:"foo",a:i,b:i}, {batch : "ok"}).ok); + + // test that response is 202 Accepted + T(db.last_req.status == 202); + } + + while(db.allDocs().total_rows != 101){}; // repeat the tests for POST - var resp = db.request("POST", db.uri + "?batch=ok", {body: JSON.stringify({a:1})}); - T(JSON.parse(resp.responseText).ok); - - // test that response is 202 Accepted - T(resp.status == 202); - - T(db.allDocs().total_rows == 3); - // restartServer(); - // // lost the POSTed doc - // T(db.allDocs().total_rows == 3); - - var resp = db.request("POST", db.uri + "?batch=ok", {body: JSON.stringify({a:1})}); - T(JSON.parse(resp.responseText).ok); - - T(db.ensureFullCommit().ok); - T(db.allDocs().total_rows == 5); + for(i=0; i < 100; i++) { + var resp = db.request("POST", db.uri + "?batch=ok", {body: JSON.stringify({a:1})}); + T(JSON.parse(resp.responseText).ok); + } + + while(db.allDocs().total_rows != 201){}; }; diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 2459ca8f..5842521b 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -56,8 +56,6 @@ source_files = \ couch.erl \ couch_app.erl \ couch_btree.erl \ - couch_batch_save.erl \ - couch_batch_save_sup.erl \ couch_config.erl \ couch_config_writer.erl \ couch_db.erl \ @@ -113,8 +111,6 @@ compiled_files = \ couch.beam \ couch_app.beam \ couch_btree.beam \ - couch_batch_save.beam \ - couch_batch_save_sup.beam \ couch_config.beam \ couch_config_writer.beam \ couch_db.beam \ diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in index 84ac36ee..fa86d2ec 100644 --- a/src/couchdb/couch.app.tpl.in +++ b/src/couchdb/couch.app.tpl.in @@ -3,8 +3,6 @@ {vsn, "@version@"}, {modules, [@modules@]}, {registered, [ - couch_batch_save, - couch_batch_save_sup, couch_config, couch_db_update, couch_db_update_notifier_sup, diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl deleted file mode 100644 index 600dd00e..00000000 --- a/src/couchdb/couch_batch_save.erl +++ /dev/null @@ -1,273 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_batch_save). - --behaviour(gen_server). - -%% API --export([start_link/2, eventually_save_doc/3, commit_now/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --include("couch_db.hrl"). - --record(batch_state, { - batch_size=1000, - batch_interval=1000 - }). - -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- -start_link(BatchSize, BatchInterval) -> - gen_server:start_link({local, couch_batch_save}, ?MODULE, [BatchSize, BatchInterval], []). - -%%-------------------------------------------------------------------- -%% Function: commit_doc(Doc) -> committed -%% Description: Puts the doc into the set to commit. Does not reply until -%% the commit is complete. -%%-------------------------------------------------------------------- -eventually_save_doc(DbName, Doc, UserCtx) -> - % find or create a process for the {DbName, UserCtx} pair - {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx), - % hand it the document - ?LOG_DEBUG("sending doc to batch ~p",[Pid]), - ok = send_doc_to_batch(Pid, Doc). - -%%-------------------------------------------------------------------- -%% Function: commit_now(DbName) -> committed -%% Description: Commits all docs for the DB. Does not reply until -%% the commit is complete. -%%-------------------------------------------------------------------- -commit_now(DbName, UserCtx) -> - % find the process for the {DbName, UserCtx} pair - {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx, false), - case Pid of - none -> committed; - _Else -> - ok = send_commit(Pid), - committed - end. - -%%-------------------------------------------------------------------- -%% Function: commit_now() -> committed -%% Description: Commits all docs for all DBs. Does not reply until -%% the commit is complete. -%%-------------------------------------------------------------------- -% commit_all() -> -% committed = gen_server:call(couch_batch_save, commit_now, infinity). -% - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init([BatchSize, BatchInterval]) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server with the meanings -%%-------------------------------------------------------------------- -init([BatchSize, BatchInterval]) -> - ets:new(couch_batch_save_by_db, [set, public, named_table]), - {ok, #batch_state{batch_size=BatchSize, batch_interval=BatchInterval}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{ - batch_size=BatchSize, - batch_interval=BatchInterval - }=State) -> - % Create the pid in a serialized process. - % We checked before to see that we need the Pid, but the check is outside - % the gen_server for parellelism. We check again here to ensure we don't - % make a duplicate. - Resp = case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of - [{_, Pid}] -> - % we have a pid - {ok, Pid}; - [] -> - % no match - % start and record the doc collector process - ?LOG_DEBUG("making a batch pid ~p",[{DbName, UserCtx}]), - Pid = spawn_link(fun() -> - doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) - end), - true = ets:insert_new(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}), - {ok, Pid} - end, - {reply, Resp, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -% handle_info({'EXIT', Pid, Reason}, State) -> -% {noreply, State}; - -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - % todo shutdown the interval loop - % todo kill all the Pids and drop the ets table - ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - -commit_user_docs(_DbName, _UserCtx, []) -> - {ok, []}; - -commit_user_docs(DbName, UserCtx, Docs) -> - ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]), - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> - try - {ok, Revs} = couch_db:update_docs(Db, Docs), - ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]), - {ok, Revs} - after - couch_db:close(Db) - end; - Error -> - throw(Error) - end. - -% spawned to trigger commits on an interval -commit_every_ms(Pid, BatchInterval) -> - receive - after BatchInterval -> - ok = send_commit(Pid), - commit_every_ms(Pid, BatchInterval) - end. - -send_commit(Pid) -> - Pid ! {self(), commit}, - receive - {Pid, committed} -> - ok; - {'DOWN', _, _, Pid, _} -> - exit(normal) - end. - -batch_pid_for_db_and_user(DbName, UserCtx) -> - batch_pid_for_db_and_user(DbName, UserCtx, true). - -batch_pid_for_db_and_user(DbName, UserCtx, Create) -> - % look in the ets table - case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of - [{_, Pid}] -> - % we have a pid - {ok, Pid}; - [] -> - % no match - if Create -> - {ok, Pid} = gen_server:call(couch_batch_save, {make_pid, DbName, UserCtx}, infinity), - {ok, Pid}; - true -> - {ok, none} - end - end. - -send_doc_to_batch(Pid, Doc) -> - Pid ! {self(), add_doc, Doc}, - receive - {Pid, doc_added} -> ok - end. - -% the loop that holds documents between commits -doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) -> - % start a process that triggers commit every BatchInterval milliseconds - Me = self(), - spawn_link(fun() -> - erlang:monitor(process, Me), - commit_every_ms(Me, BatchInterval) - end), - doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []); - -doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize-> - collector_commit(DbName, UserCtx, BatchInterval, Docs), - exit(normal); - -doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) -> - receive - {From, add_doc, Doc} -> - From ! {self(), doc_added}, - doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]); - {From, commit} -> - collector_commit(DbName, UserCtx, BatchInterval, Docs), - From ! {self(), committed}, - exit(normal) - end. - -collector_commit(DbName, UserCtx, BatchInterval, Docs) -> - % unregister - unregister_collector(DbName, UserCtx, self()), - % wait and collect - Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs), - {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2). - -unregister_collector(DbName, UserCtx, Pid) -> - % remove from ets - ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}). - -shutdown_collector(DbName, UserCtx, BatchInterval, Docs) -> - receive - {From, add_doc, Doc} -> - From ! {self(), doc_added}, - shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs]) - % this interval will be waited for each time ensure-full-commit is called - after BatchInterval -> - Docs - end. diff --git a/src/couchdb/couch_batch_save_sup.erl b/src/couchdb/couch_batch_save_sup.erl deleted file mode 100644 index c18e2c1c..00000000 --- a/src/couchdb/couch_batch_save_sup.erl +++ /dev/null @@ -1,37 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_batch_save_sup). - --behaviour(supervisor). - --export([start_link/0,init/1]). - -start_link() -> - supervisor:start_link({local, couch_batch_save_sup}, - couch_batch_save_sup, []). - -init([]) -> - Self = self(), - ok = couch_config:register( - fun("couchdb", _) -> - exit(Self, reload_config) - end), - - BatchSize = list_to_integer(couch_config:get("couchdb", - "batch_save_size","1000")), - BatchInterval = list_to_integer(couch_config:get("couchdb", - "batch_save_interval","1000")), - - Batch = {batch, {couch_batch_save, start_link, [BatchSize, BatchInterval]}, - permanent, 1000, worker, [couch_batch_save]}, - {ok, {{one_for_one, 10, 3600}, [Batch]}}. diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 736b80aa..2dbb88a3 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -586,25 +586,46 @@ set_commit_option(Options) -> [full_commit|Options] end. +collect_results(UpdatePid, MRef, ResultsAcc) -> + receive + {result, UpdatePid, Result} -> + collect_results(UpdatePid, MRef, [Result | ResultsAcc]); + {done, UpdatePid} -> + {ok, ResultsAcc}; + {retry, UpdatePid} -> + retry; + {'DOWN', MRef, _, _, Reason} -> + exit(Reason) + end. + write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, NonRepDocs, Options0) -> Options = set_commit_option(Options0), - case gen_server:call(UpdatePid, - {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of - {ok, Results} -> {ok, Results}; - retry -> - % This can happen if the db file we wrote to was swapped out by - % compaction. Retry by reopening the db and writing to the current file - {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), - DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], - % We only retry once - close(Db2), - case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of + MergeConflicts = lists:member(merge_conflicts, Options), + FullCommit = lists:member(full_commit, Options), + MRef = erlang:monitor(process, UpdatePid), + try + UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(UpdatePid, MRef, []) of {ok, Results} -> {ok, Results}; - retry -> throw({update_error, compaction_retry}) + retry -> + % This can happen if the db file we wrote to was swapped out by + % compaction. Retry by reopening the db and writing to the current file + {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), + DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], + % We only retry once + close(Db2), + UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(UpdatePid, MRef, []) of + {ok, Results} -> {ok, Results}; + retry -> throw({update_error, compaction_retry}) + end end + after + erlang:demonitor(MRef, [flush]) end. + set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) -> Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) -> % already commited to disk, do not set new rev diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index a13f9955..96a59944 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -43,19 +43,6 @@ terminate(Reason, _Srv) -> handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; -handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) -> - try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of - {ok, Failures, Db2} -> - ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), - if Db2#db.update_seq /= Db#db.update_seq -> - couch_db_update_notifier:notify({updated, Db2#db.name}); - true -> ok - end, - {reply, {ok, Failures}, Db2} - catch - throw: retry -> - {reply, retry, Db} - end; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> {reply, ok, Db}; % no data waiting, return ok immediately handle_call(full_commit, _From, Db) -> @@ -192,6 +179,63 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> {noreply, Db2} end. + +merge_updates([], RestB, AccOutGroups) -> + lists:reverse(AccOutGroups, RestB); +merge_updates(RestA, [], AccOutGroups) -> + lists:reverse(AccOutGroups, RestA); +merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA], + [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) -> + if IdA == IdB -> + merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]); + IdA < IdB -> + merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]); + true -> + merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups]) + end. + +collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> + receive + % only collect updates with the same MergeConflicts flag and without + % local docs. Makes it easier to avoid multiple local doc updaters. + {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} -> + GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup] + || DocGroup <- GroupedDocs], + GroupedDocsAcc2 = + merge_updates(GroupedDocsAcc, GroupedDocs2, []), + collect_updates(GroupedDocsAcc2, [Client | ClientsAcc], + MergeConflicts, (FullCommit or FullCommit2)) + after 0 -> + {GroupedDocsAcc, ClientsAcc, FullCommit} + end. + +handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, + FullCommit}, Db) -> + GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs], + if NonRepDocs == [] -> + {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2, + [Client], MergeConflicts, FullCommit); + true -> + GroupedDocs3 = GroupedDocs2, + FullCommit2 = FullCommit, + Clients = [Client] + end, + NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs], + try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts, + FullCommit2) of + {ok, Db2} -> + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + if Db2#db.update_seq /= Db#db.update_seq -> + couch_db_update_notifier:notify({updated, Db2#db.name}); + true -> ok + end, + [catch(ClientPid ! {done, self()}) || ClientPid <- Clients], + {noreply, Db2} + catch + throw: retry -> + [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients], + {noreply, Db} + end; handle_info(delayed_commit, Db) -> {noreply, commit_data(Db)}. @@ -399,18 +443,24 @@ flush_trees(#db{fd=Fd,header=Header}=Db, end, Unflushed), flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). -merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) -> - {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq}; + +send_result(Client, Id, OriginalRevs, NewResult) -> + % used to send a result to the client + catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). + +merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> + {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], - [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) -> + [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) -> #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, - {NewRevTree, NewConflicts} = lists:foldl( - fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) -> + NewRevTree = lists:foldl( + fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) -> if not MergeConflicts -> case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of {_NewTree, conflicts} when (not OldDeleted) -> - {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}; + send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + AccTree; {NewTree, no_conflicts} when AccTree == NewTree -> % the tree didn't change at all % meaning we are saving a rev that's already @@ -426,26 +476,28 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}}, {NewTree2, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc2)]), - % we changed the rev id, this tells the caller we did. - {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}} - | AccConflicts2]}; + % we changed the rev id, this tells the caller we did + send_result(Client, Id, {Pos-1,PrevRevs}, + {ok, {OldPos + 1, NewRevId}}), + NewTree2; true -> - {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]} + send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + AccTree end; {NewTree, _} -> - {NewTree, AccConflicts2} + NewTree end; true -> {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]), - {NewTree, AccConflicts2} + NewTree end end, - {OldTree, AccConflicts}, NewDocs), + OldTree, NewDocs), if NewRevTree == OldTree -> % nothing changed merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, - AccRemoveSeqs, NewConflicts, AccSeq); + AccRemoveSeqs, AccSeq); true -> % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, @@ -454,7 +506,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], _ -> [OldSeq | AccRemoveSeqs] end, merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, - [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, AccSeq+1) + [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) end. @@ -473,13 +525,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. -update_docs_int(Db, DocsList, NonRepDocs, Options) -> +update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, update_seq = LastSeq } = Db, - Ids = [Id || [#doc{id=Id}|_] <- DocsList], + Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), OldDocInfos = lists:zipwith( @@ -489,17 +541,15 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) -> #full_doc_info{id=Id} end, Ids, OldDocLookups), - % Merge the new docs into the revision trees. - {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees( - lists:member(merge_conflicts, Options), - DocsList, OldDocInfos, [], [], [], LastSeq), + {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees( + MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), % All documents are now ready to write. - {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs), + {ok, Db2} = update_local_docs(Db, NonRepDocs), % Write out the document summaries (the bodies are stored in the nodes of % the trees, the attachments are already written to disk) @@ -526,15 +576,14 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) -> Db4 = refresh_validate_doc_funs(Db3) end, - {ok, LocalConflicts ++ Conflicts, - commit_data(Db4, not lists:member(full_commit, Options))}. + {ok, commit_data(Db4, not FullCommit)}. update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> - Ids = [Id || #doc{id=Id} <- Docs], + Ids = [Id || {_Client, #doc{id=Id}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) -> + fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) -> case PrevRevs of [RevStr|_] -> PrevRev = list_to_integer(?b2l(RevStr)); @@ -549,28 +598,28 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> case OldRev == PrevRev of true -> case Delete of - false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}}; - true -> {remove, Id, PrevRevs} + false -> + send_result(Client, Id, {0, PrevRevs}, {ok, + {0, ?l2b(integer_to_list(PrevRev + 1))}}), + {update, {Id, {PrevRev + 1, Body}}}; + true -> + send_result(Client, Id, {0, PrevRevs}, + {ok, {0, <<"0">>}}), + {remove, Id} end; false -> - {conflict, {Id, {0, PrevRevs}}} + send_result(Client, Id, {0, PrevRevs}, conflict), + ignore end end, Docs, OldDocLookups), - BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries], - BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries], - Results = - [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}} - || {remove, Id, PrevRevs} <- BtreeEntries] ++ - [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}} - || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++ - [{IdRevs, conflict} - || {conflict, IdRevs} <- BtreeEntries], + BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], + BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries], {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Results, Db#db{local_docs_btree = Btree2}}. + {ok, Db#db{local_docs_btree = Btree2}}. commit_data(Db) -> diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 3945498d..f7fd24a8 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -349,9 +349,15 @@ db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) -> DocId = Doc2#doc.id, case couch_httpd:qs_value(Req, "batch") of "ok" -> - % batch - ok = couch_batch_save:eventually_save_doc( - Db#db.name, Doc2, Db#db.user_ctx), + % async_batching + spawn(fun() -> + case catch(couch_db:update_doc(Db, Doc2, [])) of + {ok, _} -> ok; + Error -> + ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error]) + end + end), + send_json(Req, 202, [], {[ {ok, true}, {id, DocId} @@ -378,7 +384,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) - {ok, StartTime} = case couch_httpd:qs_value(Req, "seq") of undefined -> - committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx), couch_db:ensure_full_commit(Db); RequiredStr -> RequiredSeq = list_to_integer(RequiredStr), @@ -749,7 +754,14 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> "ok" -> % batch Doc = couch_doc_from_req(Req, DocId, Json), - ok = couch_batch_save:eventually_save_doc(Db#db.name, Doc, Db#db.user_ctx), + + spawn(fun() -> + case catch(couch_db:update_doc(Db, Doc, [])) of + {ok, _} -> ok; + Error -> + ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error]) + end + end), send_json(Req, 202, [], {[ {ok, true}, {id, DocId} diff --git a/test/etap/001-load.t b/test/etap/001-load.t index 619b289b..66136ed8 100755 --- a/test/etap/001-load.t +++ b/test/etap/001-load.t @@ -21,8 +21,6 @@ main(_) -> code:add_pathz("src/couchdb"), etap:plan(39), Modules = [ - couch_batch_save, - couch_batch_save_sup, couch_btree, couch_config, couch_config_writer, |