From 4387dc1a5b10c63a540cefcb2bb7c6e5d9b9fd8b Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Tue, 3 Nov 2009 20:51:04 +0000 Subject: Added batching of multiple updating requests, to improve throughput with many writers. Also removed the couch_batch_save module, now batch requests are simply saved async as immediately, batching with outhr updates if possible. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@832550 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) (limited to 'src/couchdb/couch_db.erl') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 736b80aa..2dbb88a3 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -586,25 +586,46 @@ set_commit_option(Options) -> [full_commit|Options] end. +collect_results(UpdatePid, MRef, ResultsAcc) -> + receive + {result, UpdatePid, Result} -> + collect_results(UpdatePid, MRef, [Result | ResultsAcc]); + {done, UpdatePid} -> + {ok, ResultsAcc}; + {retry, UpdatePid} -> + retry; + {'DOWN', MRef, _, _, Reason} -> + exit(Reason) + end. + write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, NonRepDocs, Options0) -> Options = set_commit_option(Options0), - case gen_server:call(UpdatePid, - {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of - {ok, Results} -> {ok, Results}; - retry -> - % This can happen if the db file we wrote to was swapped out by - % compaction. Retry by reopening the db and writing to the current file - {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), - DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], - % We only retry once - close(Db2), - case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of + MergeConflicts = lists:member(merge_conflicts, Options), + FullCommit = lists:member(full_commit, Options), + MRef = erlang:monitor(process, UpdatePid), + try + UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(UpdatePid, MRef, []) of {ok, Results} -> {ok, Results}; - retry -> throw({update_error, compaction_retry}) + retry -> + % This can happen if the db file we wrote to was swapped out by + % compaction. Retry by reopening the db and writing to the current file + {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), + DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], + % We only retry once + close(Db2), + UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(UpdatePid, MRef, []) of + {ok, Results} -> {ok, Results}; + retry -> throw({update_error, compaction_retry}) + end end + after + erlang:demonitor(MRef, [flush]) end. + set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) -> Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) -> % already commited to disk, do not set new rev -- cgit v1.2.3