summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db_updater.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-11-03 20:51:04 +0000
committerDamien F. Katz <damien@apache.org>2009-11-03 20:51:04 +0000
commit4387dc1a5b10c63a540cefcb2bb7c6e5d9b9fd8b (patch)
treed14597954d2065ef880c97998631d0842f19224f /src/couchdb/couch_db_updater.erl
parentf2689f944e1c0f573afe4393ff26bbc988db8baf (diff)
Added batching of multiple updating requests, to improve throughput with many writers. Also removed the couch_batch_save module, now batch requests are simply saved async as immediately, batching with outhr updates if possible.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@832550 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--src/couchdb/couch_db_updater.erl153
1 files changed, 101 insertions, 52 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index a13f9955..96a59944 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -43,19 +43,6 @@ terminate(Reason, _Srv) ->
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
-handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
- try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
- {ok, Failures, Db2} ->
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_db_update_notifier:notify({updated, Db2#db.name});
- true -> ok
- end,
- {reply, {ok, Failures}, Db2}
- catch
- throw: retry ->
- {reply, retry, Db}
- end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
handle_call(full_commit, _From, Db) ->
@@ -192,6 +179,63 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{noreply, Db2}
end.
+
+merge_updates([], RestB, AccOutGroups) ->
+ lists:reverse(AccOutGroups, RestB);
+merge_updates(RestA, [], AccOutGroups) ->
+ lists:reverse(AccOutGroups, RestA);
+merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
+ [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
+ if IdA == IdB ->
+ merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
+ IdA < IdB ->
+ merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]);
+ true ->
+ merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
+ end.
+
+collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
+ receive
+ % only collect updates with the same MergeConflicts flag and without
+ % local docs. Makes it easier to avoid multiple local doc updaters.
+ {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
+ GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
+ || DocGroup <- GroupedDocs],
+ GroupedDocsAcc2 =
+ merge_updates(GroupedDocsAcc, GroupedDocs2, []),
+ collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
+ MergeConflicts, (FullCommit or FullCommit2))
+ after 0 ->
+ {GroupedDocsAcc, ClientsAcc, FullCommit}
+ end.
+
+handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
+ FullCommit}, Db) ->
+ GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
+ if NonRepDocs == [] ->
+ {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
+ [Client], MergeConflicts, FullCommit);
+ true ->
+ GroupedDocs3 = GroupedDocs2,
+ FullCommit2 = FullCommit,
+ Clients = [Client]
+ end,
+ NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
+ try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
+ FullCommit2) of
+ {ok, Db2} ->
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ if Db2#db.update_seq /= Db#db.update_seq ->
+ couch_db_update_notifier:notify({updated, Db2#db.name});
+ true -> ok
+ end,
+ [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
+ {noreply, Db2}
+ catch
+ throw: retry ->
+ [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
+ {noreply, Db}
+ end;
handle_info(delayed_commit, Db) ->
{noreply, commit_data(Db)}.
@@ -399,18 +443,24 @@ flush_trees(#db{fd=Fd,header=Header}=Db,
end, Unflushed),
flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq};
+
+send_result(Client, Id, OriginalRevs, NewResult) ->
+ % used to send a result to the client
+ catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+
+merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
- {NewRevTree, NewConflicts} = lists:foldl(
- fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
+ NewRevTree = lists:foldl(
+ fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
if not MergeConflicts ->
case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
{_NewTree, conflicts} when (not OldDeleted) ->
- {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
+ send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ AccTree;
{NewTree, no_conflicts} when AccTree == NewTree ->
% the tree didn't change at all
% meaning we are saving a rev that's already
@@ -426,26 +476,28 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
{NewTree2, _} = couch_key_tree:merge(AccTree,
[couch_db:doc_to_tree(NewDoc2)]),
- % we changed the rev id, this tells the caller we did.
- {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
- | AccConflicts2]};
+ % we changed the rev id, this tells the caller we did
+ send_result(Client, Id, {Pos-1,PrevRevs},
+ {ok, {OldPos + 1, NewRevId}}),
+ NewTree2;
true ->
- {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
+ send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ AccTree
end;
{NewTree, _} ->
- {NewTree, AccConflicts2}
+ NewTree
end;
true ->
{NewTree, _} = couch_key_tree:merge(AccTree,
[couch_db:doc_to_tree(NewDoc)]),
- {NewTree, AccConflicts2}
+ NewTree
end
end,
- {OldTree, AccConflicts}, NewDocs),
+ OldTree, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
- AccRemoveSeqs, NewConflicts, AccSeq);
+ AccRemoveSeqs, AccSeq);
true ->
% we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
@@ -454,7 +506,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
_ -> [OldSeq | AccRemoveSeqs]
end,
merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, AccSeq+1)
+ [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
end.
@@ -473,13 +525,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
[Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
#full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-update_docs_int(Db, DocsList, NonRepDocs, Options) ->
+update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
- Ids = [Id || [#doc{id=Id}|_] <- DocsList],
+ Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -489,17 +541,15 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) ->
#full_doc_info{id=Id}
end,
Ids, OldDocLookups),
-
% Merge the new docs into the revision trees.
- {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
- lists:member(merge_conflicts, Options),
- DocsList, OldDocInfos, [], [], [], LastSeq),
+ {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees(
+ MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
% All documents are now ready to write.
- {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
@@ -526,15 +576,14 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) ->
Db4 = refresh_validate_doc_funs(Db3)
end,
- {ok, LocalConflicts ++ Conflicts,
- commit_data(Db4, not lists:member(full_commit, Options))}.
+ {ok, commit_data(Db4, not FullCommit)}.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || #doc{id=Id} <- Docs],
+ Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
+ fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
PrevRev = list_to_integer(?b2l(RevStr));
@@ -549,28 +598,28 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
case OldRev == PrevRev of
true ->
case Delete of
- false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
- true -> {remove, Id, PrevRevs}
+ false ->
+ send_result(Client, Id, {0, PrevRevs}, {ok,
+ {0, ?l2b(integer_to_list(PrevRev + 1))}}),
+ {update, {Id, {PrevRev + 1, Body}}};
+ true ->
+ send_result(Client, Id, {0, PrevRevs},
+ {ok, {0, <<"0">>}}),
+ {remove, Id}
end;
false ->
- {conflict, {Id, {0, PrevRevs}}}
+ send_result(Client, Id, {0, PrevRevs}, conflict),
+ ignore
end
end, Docs, OldDocLookups),
- BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
- BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],
- Results =
- [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
- || {remove, Id, PrevRevs} <- BtreeEntries] ++
- [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
- || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++
- [{IdRevs, conflict}
- || {conflict, IdRevs} <- BtreeEntries],
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Results, Db#db{local_docs_btree = Btree2}}.
+ {ok, Db#db{local_docs_btree = Btree2}}.
commit_data(Db) ->