summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-11-03 20:51:04 +0000
committerDamien F. Katz <damien@apache.org>2009-11-03 20:51:04 +0000
commit4387dc1a5b10c63a540cefcb2bb7c6e5d9b9fd8b (patch)
treed14597954d2065ef880c97998631d0842f19224f /src/couchdb/couch_db.erl
parentf2689f944e1c0f573afe4393ff26bbc988db8baf (diff)
Added batching of multiple updating requests, to improve throughput with many writers. Also removed the couch_batch_save module, now batch requests are simply saved async as immediately, batching with outhr updates if possible.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@832550 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r--src/couchdb/couch_db.erl45
1 files changed, 33 insertions, 12 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 736b80aa..2dbb88a3 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -586,25 +586,46 @@ set_commit_option(Options) ->
[full_commit|Options]
end.
+collect_results(UpdatePid, MRef, ResultsAcc) ->
+ receive
+ {result, UpdatePid, Result} ->
+ collect_results(UpdatePid, MRef, [Result | ResultsAcc]);
+ {done, UpdatePid} ->
+ {ok, ResultsAcc};
+ {retry, UpdatePid} ->
+ retry;
+ {'DOWN', MRef, _, _, Reason} ->
+ exit(Reason)
+ end.
+
write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
- case gen_server:call(UpdatePid,
- {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of
- {ok, Results} -> {ok, Results};
- retry ->
- % This can happen if the db file we wrote to was swapped out by
- % compaction. Retry by reopening the db and writing to the current file
- {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- % We only retry once
- close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
+ MergeConflicts = lists:member(merge_conflicts, Options),
+ FullCommit = lists:member(full_commit, Options),
+ MRef = erlang:monitor(process, UpdatePid),
+ try
+ UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(UpdatePid, MRef, []) of
{ok, Results} -> {ok, Results};
- retry -> throw({update_error, compaction_retry})
+ retry ->
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry by reopening the db and writing to the current file
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
+ DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ % We only retry once
+ close(Db2),
+ UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(UpdatePid, MRef, []) of
+ {ok, Results} -> {ok, Results};
+ retry -> throw({update_error, compaction_retry})
+ end
end
+ after
+ erlang:demonitor(MRef, [flush])
end.
+
set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
% already commited to disk, do not set new rev