summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-11-03 20:51:04 +0000
committerDamien F. Katz <damien@apache.org>2009-11-03 20:51:04 +0000
commit4387dc1a5b10c63a540cefcb2bb7c6e5d9b9fd8b (patch)
treed14597954d2065ef880c97998631d0842f19224f
parentf2689f944e1c0f573afe4393ff26bbc988db8baf (diff)
Added batching of multiple update requests to improve throughput with many writers. Also removed the couch_batch_save module; batch requests are now simply saved asynchronously and immediately, batching with other updates if possible.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@832550 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--README2
-rw-r--r--etc/couchdb/default.ini.tpl.in1
-rw-r--r--share/www/script/test/batch_save.js61
-rw-r--r--src/couchdb/Makefile.am4
-rw-r--r--src/couchdb/couch.app.tpl.in2
-rw-r--r--src/couchdb/couch_batch_save.erl273
-rw-r--r--src/couchdb/couch_batch_save_sup.erl37
-rw-r--r--src/couchdb/couch_db.erl45
-rw-r--r--src/couchdb/couch_db_updater.erl153
-rw-r--r--src/couchdb/couch_httpd_db.erl22
-rwxr-xr-xtest/etap/001-load.t2
11 files changed, 175 insertions, 427 deletions
diff --git a/README b/README
index cfa2fe89..3366c8ec 100644
--- a/README
+++ b/README
@@ -452,7 +452,7 @@ Tests are also available to be run individually like such:
# Current time local 2009-09-26 23:47:44
# Using etap version "0.3.4"
1..39
- ok 1 - Loaded: couch_batch_save
+ ok 1 - Loaded: couch_btree
...
Cryptographic Software Notice
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in
index bc234366..33385207 100644
--- a/etc/couchdb/default.ini.tpl.in
+++ b/etc/couchdb/default.ini.tpl.in
@@ -48,7 +48,6 @@ reduce_limit = true
view_manager={couch_view, start_link, []}
external_manager={couch_external_manager, start_link, []}
db_update_notifier={couch_db_update_notifier_sup, start_link, []}
-batch_save={couch_batch_save_sup, start_link, []}
query_servers={couch_query_servers, start_link, []}
httpd={couch_httpd, start_link, []}
stats_aggregator={couch_stats_aggregator, start, []}
diff --git a/share/www/script/test/batch_save.js b/share/www/script/test/batch_save.js
index e321b108..1c8a2be9 100644
--- a/share/www/script/test/batch_save.js
+++ b/share/www/script/test/batch_save.js
@@ -16,45 +16,30 @@ couchTests.batch_save = function(debug) {
db.createDb();
if (debug) debugger;
- // commit should work fine with no batches
- T(db.ensureFullCommit().ok);
-
- // PUT a doc with ?batch=ok
- T(db.save({_id:"0",a:1,b:1}, {batch : "ok"}).ok);
-
- // test that response is 202 Accepted
- T(db.last_req.status == 202);
-
- T(db.allDocs().total_rows == 0);
-
- restartServer();
-
- // lost the updates
- T(db.allDocs().total_rows == 0);
-
- T(db.save({_id:"0",a:1,b:1}, {batch : "ok"}).ok);
- T(db.save({_id:"1",a:1,b:1}, {batch : "ok"}).ok);
- T(db.save({_id:"2",a:1,b:1}, {batch : "ok"}).ok);
-
- T(db.ensureFullCommit().ok);
- T(db.allDocs().total_rows == 3);
+ var i
+ for(i=0; i < 100; i++) {
+ T(db.save({_id:i.toString(),a:i,b:i}, {batch : "ok"}).ok);
+
+ // test that response is 202 Accepted
+ T(db.last_req.status == 202);
+ }
+
+ for(i=0; i < 100; i++) {
+ // attempt to save the same document a bunch of times
+ T(db.save({_id:"foo",a:i,b:i}, {batch : "ok"}).ok);
+
+ // test that response is 202 Accepted
+ T(db.last_req.status == 202);
+ }
+
+ while(db.allDocs().total_rows != 101){};
// repeat the tests for POST
- var resp = db.request("POST", db.uri + "?batch=ok", {body: JSON.stringify({a:1})});
- T(JSON.parse(resp.responseText).ok);
-
- // test that response is 202 Accepted
- T(resp.status == 202);
-
- T(db.allDocs().total_rows == 3);
- // restartServer();
- // // lost the POSTed doc
- // T(db.allDocs().total_rows == 3);
-
- var resp = db.request("POST", db.uri + "?batch=ok", {body: JSON.stringify({a:1})});
- T(JSON.parse(resp.responseText).ok);
-
- T(db.ensureFullCommit().ok);
- T(db.allDocs().total_rows == 5);
+ for(i=0; i < 100; i++) {
+ var resp = db.request("POST", db.uri + "?batch=ok", {body: JSON.stringify({a:1})});
+ T(JSON.parse(resp.responseText).ok);
+ }
+
+ while(db.allDocs().total_rows != 201){};
};
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 2459ca8f..5842521b 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -56,8 +56,6 @@ source_files = \
couch.erl \
couch_app.erl \
couch_btree.erl \
- couch_batch_save.erl \
- couch_batch_save_sup.erl \
couch_config.erl \
couch_config_writer.erl \
couch_db.erl \
@@ -113,8 +111,6 @@ compiled_files = \
couch.beam \
couch_app.beam \
couch_btree.beam \
- couch_batch_save.beam \
- couch_batch_save_sup.beam \
couch_config.beam \
couch_config_writer.beam \
couch_db.beam \
diff --git a/src/couchdb/couch.app.tpl.in b/src/couchdb/couch.app.tpl.in
index 84ac36ee..fa86d2ec 100644
--- a/src/couchdb/couch.app.tpl.in
+++ b/src/couchdb/couch.app.tpl.in
@@ -3,8 +3,6 @@
{vsn, "@version@"},
{modules, [@modules@]},
{registered, [
- couch_batch_save,
- couch_batch_save_sup,
couch_config,
couch_db_update,
couch_db_update_notifier_sup,
diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl
deleted file mode 100644
index 600dd00e..00000000
--- a/src/couchdb/couch_batch_save.erl
+++ /dev/null
@@ -1,273 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_batch_save).
-
--behaviour(gen_server).
-
-%% API
--export([start_link/2, eventually_save_doc/3, commit_now/2]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--include("couch_db.hrl").
-
--record(batch_state, {
- batch_size=1000,
- batch_interval=1000
- }).
-
-%%====================================================================
-%% API
-%%====================================================================
-%%--------------------------------------------------------------------
-%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
-%%--------------------------------------------------------------------
-start_link(BatchSize, BatchInterval) ->
- gen_server:start_link({local, couch_batch_save}, ?MODULE, [BatchSize, BatchInterval], []).
-
-%%--------------------------------------------------------------------
-%% Function: commit_doc(Doc) -> committed
-%% Description: Puts the doc into the set to commit. Does not reply until
-%% the commit is complete.
-%%--------------------------------------------------------------------
-eventually_save_doc(DbName, Doc, UserCtx) ->
- % find or create a process for the {DbName, UserCtx} pair
- {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx),
- % hand it the document
- ?LOG_DEBUG("sending doc to batch ~p",[Pid]),
- ok = send_doc_to_batch(Pid, Doc).
-
-%%--------------------------------------------------------------------
-%% Function: commit_now(DbName) -> committed
-%% Description: Commits all docs for the DB. Does not reply until
-%% the commit is complete.
-%%--------------------------------------------------------------------
-commit_now(DbName, UserCtx) ->
- % find the process for the {DbName, UserCtx} pair
- {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx, false),
- case Pid of
- none -> committed;
- _Else ->
- ok = send_commit(Pid),
- committed
- end.
-
-%%--------------------------------------------------------------------
-%% Function: commit_now() -> committed
-%% Description: Commits all docs for all DBs. Does not reply until
-%% the commit is complete.
-%%--------------------------------------------------------------------
-% commit_all() ->
-% committed = gen_server:call(couch_batch_save, commit_now, infinity).
-%
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init([BatchSize, BatchInterval]) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server with the meanings
-%%--------------------------------------------------------------------
-init([BatchSize, BatchInterval]) ->
- ets:new(couch_batch_save_by_db, [set, public, named_table]),
- {ok, #batch_state{batch_size=BatchSize, batch_interval=BatchInterval}}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%%--------------------------------------------------------------------
-handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
- batch_size=BatchSize,
- batch_interval=BatchInterval
- }=State) ->
- % Create the pid in a serialized process.
- % We checked before to see that we need the Pid, but the check is outside
- % the gen_server for parellelism. We check again here to ensure we don't
- % make a duplicate.
- Resp = case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
- [{_, Pid}] ->
- % we have a pid
- {ok, Pid};
- [] ->
- % no match
- % start and record the doc collector process
- ?LOG_DEBUG("making a batch pid ~p",[{DbName, UserCtx}]),
- Pid = spawn_link(fun() ->
- doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new)
- end),
- true = ets:insert_new(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}),
- {ok, Pid}
- end,
- {reply, Resp, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%%--------------------------------------------------------------------
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%%--------------------------------------------------------------------
-% handle_info({'EXIT', Pid, Reason}, State) ->
-% {noreply, State};
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) ->
- % todo shutdown the interval loop
- % todo kill all the Pids and drop the ets table
- ok.
-
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-commit_user_docs(_DbName, _UserCtx, []) ->
- {ok, []};
-
-commit_user_docs(DbName, UserCtx, Docs) ->
- ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} ->
- try
- {ok, Revs} = couch_db:update_docs(Db, Docs),
- ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
- {ok, Revs}
- after
- couch_db:close(Db)
- end;
- Error ->
- throw(Error)
- end.
-
-% spawned to trigger commits on an interval
-commit_every_ms(Pid, BatchInterval) ->
- receive
- after BatchInterval ->
- ok = send_commit(Pid),
- commit_every_ms(Pid, BatchInterval)
- end.
-
-send_commit(Pid) ->
- Pid ! {self(), commit},
- receive
- {Pid, committed} ->
- ok;
- {'DOWN', _, _, Pid, _} ->
- exit(normal)
- end.
-
-batch_pid_for_db_and_user(DbName, UserCtx) ->
- batch_pid_for_db_and_user(DbName, UserCtx, true).
-
-batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
- % look in the ets table
- case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
- [{_, Pid}] ->
- % we have a pid
- {ok, Pid};
- [] ->
- % no match
- if Create ->
- {ok, Pid} = gen_server:call(couch_batch_save, {make_pid, DbName, UserCtx}, infinity),
- {ok, Pid};
- true ->
- {ok, none}
- end
- end.
-
-send_doc_to_batch(Pid, Doc) ->
- Pid ! {self(), add_doc, Doc},
- receive
- {Pid, doc_added} -> ok
- end.
-
-% the loop that holds documents between commits
-doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
- % start a process that triggers commit every BatchInterval milliseconds
- Me = self(),
- spawn_link(fun() ->
- erlang:monitor(process, Me),
- commit_every_ms(Me, BatchInterval)
- end),
- doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);
-
-doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize->
- collector_commit(DbName, UserCtx, BatchInterval, Docs),
- exit(normal);
-
-doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
- receive
- {From, add_doc, Doc} ->
- From ! {self(), doc_added},
- doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
- {From, commit} ->
- collector_commit(DbName, UserCtx, BatchInterval, Docs),
- From ! {self(), committed},
- exit(normal)
- end.
-
-collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
- % unregister
- unregister_collector(DbName, UserCtx, self()),
- % wait and collect
- Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
- {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).
-
-unregister_collector(DbName, UserCtx, Pid) ->
- % remove from ets
- ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).
-
-shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
- receive
- {From, add_doc, Doc} ->
- From ! {self(), doc_added},
- shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
- % this interval will be waited for each time ensure-full-commit is called
- after BatchInterval ->
- Docs
- end.
diff --git a/src/couchdb/couch_batch_save_sup.erl b/src/couchdb/couch_batch_save_sup.erl
deleted file mode 100644
index c18e2c1c..00000000
--- a/src/couchdb/couch_batch_save_sup.erl
+++ /dev/null
@@ -1,37 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_batch_save_sup).
-
--behaviour(supervisor).
-
--export([start_link/0,init/1]).
-
-start_link() ->
- supervisor:start_link({local, couch_batch_save_sup},
- couch_batch_save_sup, []).
-
-init([]) ->
- Self = self(),
- ok = couch_config:register(
- fun("couchdb", _) ->
- exit(Self, reload_config)
- end),
-
- BatchSize = list_to_integer(couch_config:get("couchdb",
- "batch_save_size","1000")),
- BatchInterval = list_to_integer(couch_config:get("couchdb",
- "batch_save_interval","1000")),
-
- Batch = {batch, {couch_batch_save, start_link, [BatchSize, BatchInterval]},
- permanent, 1000, worker, [couch_batch_save]},
- {ok, {{one_for_one, 10, 3600}, [Batch]}}.
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 736b80aa..2dbb88a3 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -586,25 +586,46 @@ set_commit_option(Options) ->
[full_commit|Options]
end.
+collect_results(UpdatePid, MRef, ResultsAcc) ->
+ receive
+ {result, UpdatePid, Result} ->
+ collect_results(UpdatePid, MRef, [Result | ResultsAcc]);
+ {done, UpdatePid} ->
+ {ok, ResultsAcc};
+ {retry, UpdatePid} ->
+ retry;
+ {'DOWN', MRef, _, _, Reason} ->
+ exit(Reason)
+ end.
+
write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
- case gen_server:call(UpdatePid,
- {update_docs, DocBuckets, NonRepDocs, Options}, infinity) of
- {ok, Results} -> {ok, Results};
- retry ->
- % This can happen if the db file we wrote to was swapped out by
- % compaction. Retry by reopening the db and writing to the current file
- {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- % We only retry once
- close(Db2),
- case gen_server:call(UpdatePid, {update_docs, DocBuckets2, NonRepDocs, Options}, infinity) of
+ MergeConflicts = lists:member(merge_conflicts, Options),
+ FullCommit = lists:member(full_commit, Options),
+ MRef = erlang:monitor(process, UpdatePid),
+ try
+ UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(UpdatePid, MRef, []) of
{ok, Results} -> {ok, Results};
- retry -> throw({update_error, compaction_retry})
+ retry ->
+ % This can happen if the db file we wrote to was swapped out by
+ % compaction. Retry by reopening the db and writing to the current file
+ {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
+ DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ % We only retry once
+ close(Db2),
+ UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(UpdatePid, MRef, []) of
+ {ok, Results} -> {ok, Results};
+ retry -> throw({update_error, compaction_retry})
+ end
end
+ after
+ erlang:demonitor(MRef, [flush])
end.
+
set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
Doc#doc{atts= lists:map(fun(#att{data={_Fd,_Sp}}=Att) ->
% already commited to disk, do not set new rev
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index a13f9955..96a59944 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -43,19 +43,6 @@ terminate(Reason, _Srv) ->
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
-handle_call({update_docs, GroupedDocs, NonRepDocs, Options}, _From, Db) ->
- try update_docs_int(Db, GroupedDocs, NonRepDocs, Options) of
- {ok, Failures, Db2} ->
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- if Db2#db.update_seq /= Db#db.update_seq ->
- couch_db_update_notifier:notify({updated, Db2#db.name});
- true -> ok
- end,
- {reply, {ok, Failures}, Db2}
- catch
- throw: retry ->
- {reply, retry, Db}
- end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
handle_call(full_commit, _From, Db) ->
@@ -192,6 +179,63 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{noreply, Db2}
end.
+
+merge_updates([], RestB, AccOutGroups) ->
+ lists:reverse(AccOutGroups, RestB);
+merge_updates(RestA, [], AccOutGroups) ->
+ lists:reverse(AccOutGroups, RestA);
+merge_updates([[{_, #doc{id=IdA}}|_]=GroupA | RestA],
+ [[{_, #doc{id=IdB}}|_]=GroupB | RestB], AccOutGroups) ->
+ if IdA == IdB ->
+ merge_updates(RestA, RestB, [GroupA ++ GroupB | AccOutGroups]);
+ IdA < IdB ->
+ merge_updates(RestA, [GroupB | RestB], [GroupA | AccOutGroups]);
+ true ->
+ merge_updates([GroupA | RestA], RestB, [GroupB | AccOutGroups])
+ end.
+
+collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
+ receive
+ % only collect updates with the same MergeConflicts flag and without
+ % local docs. Makes it easier to avoid multiple local doc updaters.
+ {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
+ GroupedDocs2 = [[{Client, Doc} || Doc <- DocGroup]
+ || DocGroup <- GroupedDocs],
+ GroupedDocsAcc2 =
+ merge_updates(GroupedDocsAcc, GroupedDocs2, []),
+ collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
+ MergeConflicts, (FullCommit or FullCommit2))
+ after 0 ->
+ {GroupedDocsAcc, ClientsAcc, FullCommit}
+ end.
+
+handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
+ FullCommit}, Db) ->
+ GroupedDocs2 = [[{Client, D} || D <- DocGroup] || DocGroup <- GroupedDocs],
+ if NonRepDocs == [] ->
+ {GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
+ [Client], MergeConflicts, FullCommit);
+ true ->
+ GroupedDocs3 = GroupedDocs2,
+ FullCommit2 = FullCommit,
+ Clients = [Client]
+ end,
+ NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
+ try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
+ FullCommit2) of
+ {ok, Db2} ->
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ if Db2#db.update_seq /= Db#db.update_seq ->
+ couch_db_update_notifier:notify({updated, Db2#db.name});
+ true -> ok
+ end,
+ [catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
+ {noreply, Db2}
+ catch
+ throw: retry ->
+ [catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
+ {noreply, Db}
+ end;
handle_info(delayed_commit, Db) ->
{noreply, commit_data(Db)}.
@@ -399,18 +443,24 @@ flush_trees(#db{fd=Fd,header=Header}=Db,
end, Unflushed),
flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccConflicts, AccSeq};
+
+send_result(Client, Id, OriginalRevs, NewResult) ->
+ % used to send a result to the client
+ catch(Client ! {result, self(), {{Id, OriginalRevs}, NewResult}}).
+
+merge_rev_trees(_MergeConflicts, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccConflicts, AccSeq) ->
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
#full_doc_info{id=Id,rev_tree=OldTree,deleted=OldDeleted,update_seq=OldSeq}
= OldDocInfo,
- {NewRevTree, NewConflicts} = lists:foldl(
- fun(#doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc, {AccTree, AccConflicts2}) ->
+ NewRevTree = lists:foldl(
+ fun({Client, #doc{revs={Pos,[_Rev|PrevRevs]}}=NewDoc}, AccTree) ->
if not MergeConflicts ->
case couch_key_tree:merge(AccTree, [couch_db:doc_to_tree(NewDoc)]) of
{_NewTree, conflicts} when (not OldDeleted) ->
- {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]};
+ send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ AccTree;
{NewTree, no_conflicts} when AccTree == NewTree ->
% the tree didn't change at all
% meaning we are saving a rev that's already
@@ -426,26 +476,28 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
{NewTree2, _} = couch_key_tree:merge(AccTree,
[couch_db:doc_to_tree(NewDoc2)]),
- % we changed the rev id, this tells the caller we did.
- {NewTree2, [{{Id, {Pos-1,PrevRevs}}, {ok, {OldPos + 1, NewRevId}}}
- | AccConflicts2]};
+ % we changed the rev id, this tells the caller we did
+ send_result(Client, Id, {Pos-1,PrevRevs},
+ {ok, {OldPos + 1, NewRevId}}),
+ NewTree2;
true ->
- {AccTree, [{{Id, {Pos-1,PrevRevs}}, conflict} | AccConflicts2]}
+ send_result(Client, Id, {Pos-1,PrevRevs}, conflict),
+ AccTree
end;
{NewTree, _} ->
- {NewTree, AccConflicts2}
+ NewTree
end;
true ->
{NewTree, _} = couch_key_tree:merge(AccTree,
[couch_db:doc_to_tree(NewDoc)]),
- {NewTree, AccConflicts2}
+ NewTree
end
end,
- {OldTree, AccConflicts}, NewDocs),
+ OldTree, NewDocs),
if NewRevTree == OldTree ->
% nothing changed
merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo, AccNewInfos,
- AccRemoveSeqs, NewConflicts, AccSeq);
+ AccRemoveSeqs, AccSeq);
true ->
% we have updated the document, give it a new seq #
NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
@@ -454,7 +506,7 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
_ -> [OldSeq | AccRemoveSeqs]
end,
merge_rev_trees(MergeConflicts, RestDocsList, RestOldInfo,
- [NewInfo|AccNewInfos], RemoveSeqs, NewConflicts, AccSeq+1)
+ [NewInfo|AccNewInfos], RemoveSeqs, AccSeq+1)
end.
@@ -473,13 +525,13 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
[Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
#full_doc_info{rev_tree=Tree}=Info <- DocInfos].
-update_docs_int(Db, DocsList, NonRepDocs, Options) ->
+update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
fulldocinfo_by_id_btree = DocInfoByIdBTree,
docinfo_by_seq_btree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
- Ids = [Id || [#doc{id=Id}|_] <- DocsList],
+ Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
@@ -489,17 +541,15 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) ->
#full_doc_info{id=Id}
end,
Ids, OldDocLookups),
-
% Merge the new docs into the revision trees.
- {ok, NewDocInfos0, RemoveSeqs, Conflicts, NewSeq} = merge_rev_trees(
- lists:member(merge_conflicts, Options),
- DocsList, OldDocInfos, [], [], [], LastSeq),
+ {ok, NewDocInfos0, RemoveSeqs, NewSeq} = merge_rev_trees(
+ MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
NewFullDocInfos = stem_full_doc_infos(Db, NewDocInfos0),
% All documents are now ready to write.
- {ok, LocalConflicts, Db2} = update_local_docs(Db, NonRepDocs),
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
@@ -526,15 +576,14 @@ update_docs_int(Db, DocsList, NonRepDocs, Options) ->
Db4 = refresh_validate_doc_funs(Db3)
end,
- {ok, LocalConflicts ++ Conflicts,
- commit_data(Db4, not lists:member(full_commit, Options))}.
+ {ok, commit_data(Db4, not FullCommit)}.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || #doc{id=Id} <- Docs],
+ Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}, OldDocLookup) ->
+ fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
PrevRev = list_to_integer(?b2l(RevStr));
@@ -549,28 +598,28 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
case OldRev == PrevRev of
true ->
case Delete of
- false -> {update, {Id, {PrevRev + 1, PrevRevs, Body}}};
- true -> {remove, Id, PrevRevs}
+ false ->
+ send_result(Client, Id, {0, PrevRevs}, {ok,
+ {0, ?l2b(integer_to_list(PrevRev + 1))}}),
+ {update, {Id, {PrevRev + 1, Body}}};
+ true ->
+ send_result(Client, Id, {0, PrevRevs},
+ {ok, {0, <<"0">>}}),
+ {remove, Id}
end;
false ->
- {conflict, {Id, {0, PrevRevs}}}
+ send_result(Client, Id, {0, PrevRevs}, conflict),
+ ignore
end
end, Docs, OldDocLookups),
- BtreeIdsRemove = [Id || {remove, Id, _PrevRevs} <- BtreeEntries],
- BtreeIdsUpdate = [{Id, {NewRev, Body}} || {update, {Id, {NewRev, _OldRevs, Body}}} <- BtreeEntries],
- Results =
- [{{Id, {0, PrevRevs}}, {ok, {0, <<"0">>}}}
- || {remove, Id, PrevRevs} <- BtreeEntries] ++
- [{{Id, {0, PrevRevs}}, {ok, {0, ?l2b(integer_to_list(NewRev))}}}
- || {update, {Id, {NewRev, PrevRevs, _Body}}} <- BtreeEntries] ++
- [{IdRevs, conflict}
- || {conflict, IdRevs} <- BtreeEntries],
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Results, Db#db{local_docs_btree = Btree2}}.
+ {ok, Db#db{local_docs_btree = Btree2}}.
commit_data(Db) ->
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 3945498d..f7fd24a8 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -349,9 +349,15 @@ db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) ->
DocId = Doc2#doc.id,
case couch_httpd:qs_value(Req, "batch") of
"ok" ->
- % batch
- ok = couch_batch_save:eventually_save_doc(
- Db#db.name, Doc2, Db#db.user_ctx),
+ % async_batching
+ spawn(fun() ->
+ case catch(couch_db:update_doc(Db, Doc2, [])) of
+ {ok, _} -> ok;
+ Error ->
+ ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error])
+ end
+ end),
+
send_json(Req, 202, [], {[
{ok, true},
{id, DocId}
@@ -378,7 +384,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -
{ok, StartTime} =
case couch_httpd:qs_value(Req, "seq") of
undefined ->
- committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx),
couch_db:ensure_full_commit(Db);
RequiredStr ->
RequiredSeq = list_to_integer(RequiredStr),
@@ -749,7 +754,14 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
"ok" ->
% batch
Doc = couch_doc_from_req(Req, DocId, Json),
- ok = couch_batch_save:eventually_save_doc(Db#db.name, Doc, Db#db.user_ctx),
+
+ spawn(fun() ->
+ case catch(couch_db:update_doc(Db, Doc, [])) of
+ {ok, _} -> ok;
+ Error ->
+ ?LOG_INFO("Batch doc error (~s): ~p",[DocId, Error])
+ end
+ end),
send_json(Req, 202, [], {[
{ok, true},
{id, DocId}
diff --git a/test/etap/001-load.t b/test/etap/001-load.t
index 619b289b..66136ed8 100755
--- a/test/etap/001-load.t
+++ b/test/etap/001-load.t
@@ -21,8 +21,6 @@ main(_) ->
code:add_pathz("src/couchdb"),
etap:plan(39),
Modules = [
- couch_batch_save,
- couch_batch_save_sup,
couch_btree,
couch_config,
couch_config_writer,