From 4387dc1a5b10c63a540cefcb2bb7c6e5d9b9fd8b Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Tue, 3 Nov 2009 20:51:04 +0000 Subject: Added batching of multiple updating requests, to improve throughput with many writers. Also removed the couch_batch_save module, now batch requests are simply saved async as immediately, batching with outhr updates if possible. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@832550 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db_updater.erl | 153 ++++++++++++++++++++++++++------------- 1 file changed, 101 insertions(+), 52 deletions(-) (limited to 'src/couchdb/couch_db_updater.erl') diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index a13f9955..96a59944 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -43,19 +43,6 @@ terminate(Reason, _Srv) -> handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; -handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) -> - try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of - {ok, Failures, Db2} -> - ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), - if Db2#db.update_seq /= Db#db.update_seq -> - couch_db_update_notifier:notify({updated, Db2#db.name}); - true -> ok - end, - {reply, {ok, Failures}, Db2} - catch - throw: retry -> - {reply, retry, Db} - end; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> {reply, ok, Db}; % no data waiting, return ok immediately handle_call(full_commit, _From, Db) -> @@ -192,6 +179,63 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> {noreply, Db2} end. + +merge_updates([], RestB, AccOutGroups) -> + lists:reverse(AccOutGroups, RestB); +merge_updates(RestA, [], AccOutGroups) -> + lists:reverse(AccOutGroups, RestA); +merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA], + [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) -> + if IdA == IdB -> + merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]); + IdA < IdB -> + merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]); + true -> + merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups]) + end. + +collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> + receive + % only collect updates with the same MergeConflicts flag and without + % local docs. Makes it easier to avoid multiple local doc updaters. + {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} -> + GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup] + || DocGroup <- GroupedDocs], + GroupedDocsAcc2 = + merge_updates(GroupedDocsAcc, GroupedDocs2, []), + collect_updates(GroupedDocsAcc2, [Client | ClientsAcc], + MergeConflicts, (FullCommit or FullCommit2)) + after 0 -> + {GroupedDocsAcc, ClientsAcc, FullCommit} + end. + +handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, + FullCommit}, Db) -> + GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs], + if NonRepDocs == [] -> + {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2, + [Client], MergeConflicts, FullCommit); + true -> + GroupedDocs3 = GroupedDocs2, + FullCommit2 = FullCommit, + Clients = [Client] + end, + NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs], + try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts, + FullCommit2) of + {ok, Db2} -> + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + if Db2#db.update_seq /= Db#db.update_seq -> + couch_db_update_notifier:notify({updated, Db2#db.name}); + true -> ok + end, + [catch(ClientPid ! {done, self()}) || ClientPid <- Clients], + {noreply, Db2} + catch + throw: retry -> + [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients], + {noreply, Db} + end; handle_info(delayed_commit, Db) -> {noreply, commit_data(Db)}. @@ -399,18 +443,24 @@ flush_trees(#db{fd=Fd,header=Header}=Db, end, Unflushed), flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). -merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) -> - {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq}; + +send_result(Client, Id, OriginalRevs, NewResult) -> + % used to send a result to the client + catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}). + +merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) -> + {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq}; merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], - [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) -> + [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) -> #full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq} = OldDocInfo, - {NewRevTree, NewConflicts} = lists:foldl( - fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) -> + NewRevTree = lists:foldl( + fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) -> if not MergeConflicts -> case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of {_NewTree, conflicts} when (not OldDeleted) -> - {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}; + send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + AccTree; {NewTree, no_conflicts} when AccTree == NewTree -> % the tree didn't change at all % meaning we are saving a rev that's already @@ -426,26 +476,28 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}}, {NewTree2, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc2)]), - % we changed the rev id, this tells the caller we did. - {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}} - | AccConflicts2]}; + % we changed the rev id, this tells the caller we did + send_result(Client, Id, {Pos-1,PrevRevs}, + {ok, {OldPos + 1, NewRevId}}), + NewTree2; true -> - {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]} + send_result(Client, Id, {Pos-1,PrevRevs}, conflict), + AccTree end; {NewTree, _} -> - {NewTree, AccConflicts2} + NewTree end; true -> {NewTree, _} = couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]), - {NewTree, AccConflicts2} + NewTree end end, - {OldTree, AccConflicts}, NewDocs), + OldTree, NewDocs), if NewRevTree == OldTree -> % nothing changed merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos, - AccRemoveSeqs, NewConflicts, AccSeq); + AccRemoveSeqs, AccSeq); true -> % we have updated the document, give it a new seq # NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, @@ -454,7 +506,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], _ -> [OldSeq | AccRemoveSeqs] end, merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, - [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, AccSeq+1) + [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1) end. @@ -473,13 +525,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. -update_docs_int(Db, DocsList, NonRepDocs, Options) -> +update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> #db{ fulldocinfo_by_id_btree = DocInfoByIdBTree, docinfo_by_seq_btree = DocInfoBySeqBTree, update_seq = LastSeq } = Db, - Ids = [Id || [#doc{id=Id}|_] <- DocsList], + Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), OldDocInfos = lists:zipwith( @@ -489,17 +541,15 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) -> #full_doc_info{id=Id} end, Ids, OldDocLookups), - % Merge the new docs into the revision trees. - {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees( - lists:member(merge_conflicts, Options), - DocsList, OldDocInfos, [], [], [], LastSeq), + {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees( + MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0), % All documents are now ready to write. - {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs), + {ok, Db2} = update_local_docs(Db, NonRepDocs), % Write out the document summaries (the bodies are stored in the nodes of % the trees, the attachments are already written to disk) @@ -526,15 +576,14 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) -> Db4 = refresh_validate_doc_funs(Db3) end, - {ok, LocalConflicts ++ Conflicts, - commit_data(Db4, not lists:member(full_commit, Options))}. + {ok, commit_data(Db4, not FullCommit)}. update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> - Ids = [Id || #doc{id=Id} <- Docs], + Ids = [Id || {_Client, #doc{id=Id}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) -> + fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) -> case PrevRevs of [RevStr|_] -> PrevRev = list_to_integer(?b2l(RevStr)); @@ -549,28 +598,28 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> case OldRev == PrevRev of true -> case Delete of - false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}}; - true -> {remove, Id, PrevRevs} + false -> + send_result(Client, Id, {0, PrevRevs}, {ok, + {0, ?l2b(integer_to_list(PrevRev + 1))}}), + {update, {Id, {PrevRev + 1, Body}}}; + true -> + send_result(Client, Id, {0, PrevRevs}, + {ok, {0, <<"0">>}}), + {remove, Id} end; false -> - {conflict, {Id, {0, PrevRevs}}} + send_result(Client, Id, {0, PrevRevs}, conflict), + ignore end end, Docs, OldDocLookups), - BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries], - BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries], - Results = - [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}} - || {remove, Id, PrevRevs} <- BtreeEntries] ++ - [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}} - || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++ - [{IdRevs, conflict} - || {conflict, IdRevs} <- BtreeEntries], + BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], + BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries], {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Results, Db#db{local_docs_btree = Btree2}}. + {ok, Db#db{local_docs_btree = Btree2}}. commit_data(Db) -> -- cgit v1.2.3