From 7c05a60479bacc7acbf6f704285a4ab2981ba02b Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Mon, 4 May 2009 19:59:39 +0000 Subject: Use batch=ok query param for document PUT and POST to defer index updates until a threshold of documents (or amount of time) has been passed. This option returns a 202 Accepted response instead of a 201 Created, so do not use it for applications which require all data to be saved safely to disk. It is ideal for applications like logging where losing some events in a crash will be ok. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@771418 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/Makefile.am | 4 + src/couchdb/couch_batch_save.erl | 267 +++++++++++++++++++++++++++++++++++ src/couchdb/couch_batch_save_sup.erl | 37 +++++ src/couchdb/couch_httpd_db.erl | 82 +++++++---- 4 files changed, 365 insertions(+), 25 deletions(-) create mode 100644 src/couchdb/couch_batch_save.erl create mode 100644 src/couchdb/couch_batch_save_sup.erl (limited to 'src/couchdb') diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 8b782221..7105300b 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -41,6 +41,8 @@ CLEANFILES = $(compiled_files) $(doc_base) source_files = \ couch_btree.erl \ + couch_batch_save.erl \ + couch_batch_save_sup.erl \ couch_config.erl \ couch_config_writer.erl \ couch_db.erl \ @@ -83,6 +85,8 @@ EXTRA_DIST = $(source_files) couch_db.hrl couch_stats.hrl compiled_files = \ couch.app \ couch_btree.beam \ + couch_batch_save.beam \ + couch_batch_save_sup.beam \ couch_config.beam \ couch_config_writer.beam \ couch_db.beam \ diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl new file mode 100644 index 00000000..43a6f2dd --- /dev/null +++ b/src/couchdb/couch_batch_save.erl @@ -0,0 +1,267 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
% You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_batch_save).

-behaviour(gen_server).

%% API
-export([start_link/2, eventually_save_doc/3, commit_now/2]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
    terminate/2, code_change/3]).

-include("couch_db.hrl").

% Server state: the limits handed to every document collector it spawns.
%   batch_size     - a collector commits once it holds this many docs
%   batch_interval - milliseconds between timer-triggered commits
-record(batch_state, {
    batch_size=1000,
    batch_interval=1000
    }).

%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(BatchSize, BatchInterval) ->
%%               {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server, registered locally as couch_batch_save.
%%--------------------------------------------------------------------
start_link(BatchSize, BatchInterval) ->
    ServerName = {local, couch_batch_save},
    InitArgs = [BatchSize, BatchInterval],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).

%%--------------------------------------------------------------------
%% Function: eventually_save_doc(DbName, Doc, UserCtx) -> ok
%% Description: Hands Doc to the collector process for the
%% {DbName, UserCtx} pair, creating that collector when none exists.
%% Returns as soon as the collector acknowledges the doc; the doc is
%% written to the database later, when the collector commits.
%%--------------------------------------------------------------------
eventually_save_doc(DbName, Doc, UserCtx) ->
    {ok, CollectorPid} = batch_pid_for_db_and_user(DbName, UserCtx),
    ?LOG_DEBUG("sending doc to batch ~p",[CollectorPid]),
    ok = send_doc_to_batch(CollectorPid, Doc).

%%--------------------------------------------------------------------
%% Function: commit_now(DbName, UserCtx) -> committed
%% Description: Commits the docs batched for the {DbName, UserCtx} pair.
%% Never creates a collector: when none is registered there is nothing
%% to commit. Does not reply until the commit is complete.
%%--------------------------------------------------------------------
commit_now(DbName, UserCtx) ->
    case batch_pid_for_db_and_user(DbName, UserCtx, false) of
        {ok, none} ->
            committed;
        {ok, CollectorPid} ->
            ok = send_commit(CollectorPid),
            committed
    end.

%%--------------------------------------------------------------------
%% Function: commit_all() -> committed
%% Description: Commits all docs for all DBs. Does not reply until
%% the commit is complete. Not implemented yet; sketch kept below.
%%--------------------------------------------------------------------
% commit_all() ->
%     committed = gen_server:call(couch_batch_save, commit_now, infinity).
%

%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init([BatchSize, BatchInterval]) -> {ok, State}
%% Description: Creates the public named ets table that maps
%% {DbName, UserCtx} to a collector pid, and stores the batch limits
%% in the server state.
%%--------------------------------------------------------------------
init([BatchSize, BatchInterval]) ->
    ets:new(couch_batch_save_by_db, [set, public, named_table]),
    InitialState = #batch_state{
        batch_size=BatchSize,
        batch_interval=BatchInterval
    },
    {ok, InitialState}.

%%--------------------------------------------------------------------
%% Function: handle_call({make_pid, DbName, UserCtx}, From, State) ->
%%               {reply, {ok, Pid}, State}
%% Description: Returns the collector pid for the {DbName, UserCtx}
%% pair, spawning and registering one if the table has no entry.
%%--------------------------------------------------------------------
handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
        batch_size=Size,
        batch_interval=Interval
    }=State) ->
    % Callers look the pid up themselves first, outside this process, so
    % lookups run in parallel. Creation is serialized here, and the table is
    % checked again so two racing callers cannot create duplicate collectors.
    Key = {DbName, UserCtx},
    Reply = case ets:lookup(couch_batch_save_by_db, Key) of
        [] ->
            % nothing registered: start and record a doc collector process
            ?LOG_DEBUG("making a batch pid ~p",[Key]),
            NewPid = spawn_link(fun() ->
                doc_collector(DbName, UserCtx, {Size, Interval}, new)
            end),
            true = ets:insert_new(couch_batch_save_by_db, {Key, NewPid}),
            {ok, NewPid};
        [{_, ExistingPid}] ->
            % another caller got here first
            {ok, ExistingPid}
    end,
    {reply, Reply, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: No casts are defined; every cast is ignored.
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Ignores every non call/cast message.
%%--------------------------------------------------------------------
% handle_info({'EXIT', Pid, Reason}, State) ->
%     {noreply, State};

handle_info(_Info, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by the gen_server when it is about to terminate.
%% Currently performs no cleanup; the return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    % todo shutdown the interval loop
    % todo kill all the Pids and drop the ets table
    ok.

%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: State needs no conversion on code change.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

% Writes Docs to the database DbName, opened with UserCtx as the user
% context. Returns {ok, Revs}; an empty batch returns {ok, []} without
% opening the database. Throws whatever couch_db:open/2 returned when
% the open fails.
commit_user_docs(_DbName, _UserCtx, []) ->
    {ok, []};

commit_user_docs(DbName, UserCtx, Docs) ->
    DocCount = length(Docs),
    ?LOG_INFO("Committing ~p batch docs to ~p",[DocCount, DbName]),
    OpenResult = couch_db:open(DbName, [{user_ctx, UserCtx}]),
    save_opened_batch(OpenResult, DbName, Docs, DocCount).

% Second half of commit_user_docs/3: acts on the result of couch_db:open/2.
% The database handle is closed whether or not the update succeeds.
save_opened_batch({ok, Db}, DbName, Docs, DocCount) ->
    try
        {ok, Revs} = couch_db:update_docs(Db, Docs),
        ?LOG_INFO("Committed ~p batch docs to ~p",[DocCount, DbName]),
        {ok, Revs}
    after
        couch_db:close(Db)
    end;
save_opened_batch(OpenError, _DbName, _Docs, _DocCount) ->
    throw(OpenError).

% Spawned (linked) by doc_collector/4 to ask the collector Pid for a commit
% every BatchInterval milliseconds.
% A collector exits with reason `normal` after a single commit, and a normal
% exit does not propagate over the link. The loop therefore checks that Pid
% is still alive before each request and stops once it is gone, instead of
% lingering forever.
commit_every_ms(Pid, BatchInterval) ->
    receive
    after BatchInterval ->
        case is_process_alive(Pid) of
            true ->
                ok = send_commit(Pid),
                commit_every_ms(Pid, BatchInterval);
            false ->
                ok
        end
    end.

% Asks the collector Pid to commit and blocks until it is done. Returns ok.
% The collector is monitored while waiting because it may exit without
% answering: it ignores commit requests while it drains its mailbox during
% shutdown, and it exits straight after a size-triggered commit. In both
% cases it has committed before exiting, so a `normal` (or already gone,
% `noproc`) exit counts as success. Any other exit reason means the commit
% failed, and the caller exits with {batch_commit_failed, Reason}.
send_commit(Pid) ->
    MonitorRef = erlang:monitor(process, Pid),
    Pid ! {self(), commit},
    receive
        {Pid, committed} ->
            % drop the monitor and any 'DOWN' message already queued
            erlang:demonitor(MonitorRef, [flush]),
            ok;
        {'DOWN', MonitorRef, process, Pid, Reason}
                when Reason =:= normal; Reason =:= noproc ->
            ok;
        {'DOWN', MonitorRef, process, Pid, Reason} ->
            exit({batch_commit_failed, Reason})
    end.

% Returns {ok, Pid} for the collector of the {DbName, UserCtx} pair,
% creating the collector when none is registered.
batch_pid_for_db_and_user(DbName, UserCtx) ->
    batch_pid_for_db_and_user(DbName, UserCtx, true).

% As batch_pid_for_db_and_user/2, but only creates a missing collector when
% Create is true; otherwise a missing collector yields {ok, none}.
batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
    % look in the ets table first; this runs in the caller's process
    case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
        [{_, Pid}] ->
            % we have a pid
            {ok, Pid};
        [] ->
            % no match: creation is serialized through the gen_server
            case Create of
                true ->
                    {ok, Pid} = gen_server:call(couch_batch_save,
                        {make_pid, DbName, UserCtx}, infinity),
                    {ok, Pid};
                _ ->
                    {ok, none}
            end
    end.

% Sends Doc to the collector Pid and blocks until the collector
% acknowledges that it holds the doc. Returns ok.
send_doc_to_batch(Pid, Doc) ->
    Pid ! {self(), add_doc, Doc},
    receive
        {Pid, doc_added} -> ok
    end.

% The loop that holds documents between commits.
% The last argument is the atom `new` on the first call and afterwards the
% list of docs collected so far, most recent first. The collector commits
% and exits once it holds BatchSize docs or receives a commit request.
doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
    % start a process that triggers commit every BatchInterval milliseconds.
    % self() has to be read here, in the collector: inside the fun it would
    % be evaluated by the new timer process, which would then send its commit
    % requests to itself and block forever.
    Collector = self(),
    _IntervalPid = spawn_link(fun() ->
        commit_every_ms(Collector, BatchInterval)
    end),
    doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);

doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize->
    collector_commit(DbName, UserCtx, BatchInterval, Docs),
    exit(normal);

doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
    receive
        {From, add_doc, Doc} ->
            % acknowledge immediately; the doc is only held in memory
            From ! {self(), doc_added},
            doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
        {From, commit} ->
            collector_commit(DbName, UserCtx, BatchInterval, Docs),
            From ! {self(), committed},
            exit(normal)
    end.

% Commits everything this collector holds. The collector first removes
% itself from the lookup table so new docs go to a fresh collector, then
% keeps accepting docs already in flight before writing the batch.
collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
    % unregister
    unregister_collector(DbName, UserCtx, self()),
    % wait and collect
    Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
    {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).

% Removes the collector Pid's entry for the {DbName, UserCtx} pair from the
% lookup table. ets:delete_object/2 only deletes an exact match, so an
% entry that maps the pair to a different pid is left alone.
unregister_collector(DbName, UserCtx, Pid) ->
    % remove from ets
    ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).

% Keeps accepting (and acknowledging) docs until none has arrived for
% BatchInterval milliseconds, then returns the accumulated list.
shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
    receive
        {From, add_doc, Doc} ->
            From ! {self(), doc_added},
            shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
    % this interval will be waited for each time ensure-full-commit is called
    after BatchInterval ->
        Docs
    end.
\ No newline at end of file
diff --git a/src/couchdb/couch_batch_save_sup.erl b/src/couchdb/couch_batch_save_sup.erl
new file mode 100644
index 00000000..42cf1aba
--- /dev/null
+++ b/src/couchdb/couch_batch_save_sup.erl
@@ -0,0 +1,37 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_batch_save_sup).

-behaviour(supervisor).

-export([start_link/0,init/1]).

% Starts the supervisor, registered locally as couch_batch_save_sup.
start_link() ->
    supervisor:start_link({local, couch_batch_save_sup},
        couch_batch_save_sup, []).

% Supervisor callback: reads the batch limits from the "couchdb" config
% section (both default to 1000) and supervises one couch_batch_save worker
% started with them.
init([]) ->
    Self = self(),
    % Kill this supervisor when the registered callback fires for "couchdb".
    % NOTE(review): presumably the arguments are the changed section and key,
    % and the restart is what picks up the new values - confirm against
    % couch_config:register/1.
    ok = couch_config:register(
        fun("couchdb", _) ->
            exit(Self, reload_config)
        end),

    BatchSize = list_to_integer(couch_config:get("couchdb",
        "batch_save_size","1000")),
    BatchInterval = list_to_integer(couch_config:get("couchdb",
        "batch_save_interval","1000")),

    % The modules field must name the worker's callback module; it used to
    % list the non-existent module batch_save.
    Batch = {batch, {couch_batch_save, start_link, [BatchSize, BatchInterval]},
        permanent, 1000, worker, [couch_batch_save]},
    {ok, {{one_for_one, 10, 3600}, [Batch]}}.
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 4fa5a9b6..29430cb6 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -102,19 +102,34 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) -> db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) -> Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)), DocId = couch_util:new_uuid(), - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []), - DocUrl = absolute_uri(Req, - binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)), - send_json(Req, 201, [{"Location", DocUrl}], {[ - {ok, true}, - {id, DocId}, - {rev, couch_doc:rev_to_str(NewRev)} - ]}); + case couch_httpd:qs_value(Req, "batch") of + "ok" -> + % batch + ok = couch_batch_save:eventually_save_doc(Db#db.name, + Doc#doc{id=DocId}, Db#db.user_ctx), + send_json(Req, 202, [], {[ + {ok, true}, + {id, DocId} + ]}); + _Normal -> + % normal + {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []), + DocUrl = absolute_uri(Req, + binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)), + send_json(Req, 201, [{"Location", DocUrl}], {[ + {ok, true}, + {id, DocId}, + {rev, couch_doc:rev_to_str(NewRev)} + ]}) + end; + db_req(#httpd{path_parts=[_DbName]}=Req, _Db) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST"); db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -> + % make the batch save + committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx), {ok, DbStartTime} = couch_db:ensure_full_commit(Db), send_json(Req, 201, {[ {ok, true}, @@ -527,9 +542,21 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> couch_doc:validate_docid(DocId), - Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)), - update_doc(Req, Db, DocId, couch_httpd:json_body(Req), - [{"Location", Location}]); + Json = couch_httpd:json_body(Req), + case couch_httpd:qs_value(Req, "batch") of + 
"ok" -> + % batch + Doc = couch_doc_from_req(Req, DocId, Json), + ok = couch_batch_save:eventually_save_doc(Db#db.name, Doc, Db#db.user_ctx), + send_json(Req, 202, [], {[ + {ok, true}, + {id, DocId} + ]}); + _Normal -> + % normal + Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)), + update_doc(Req, Db, DocId, Json, [{"Location", Location}]) + end; db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) -> SourceRev = @@ -565,19 +592,7 @@ update_doc(Req, Db, DocId, Json) -> update_doc(Req, Db, DocId, Json, []). update_doc(Req, Db, DocId, Json, Headers) -> - #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json), - validate_attachment_names(Doc), - ExplicitDocRev = - case Doc#doc.revs of - {Start,[RevId|_]} -> {Start, RevId}; - _ -> undefined - end, - case extract_header_rev(Req, ExplicitDocRev) of - missing_rev -> - Revs = {0, []}; - {Pos, Rev} -> - Revs = {Pos, [Rev]} - end, + #doc{deleted=Deleted} = Doc = couch_doc_from_req(Req, DocId, Json), case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of "true" -> @@ -585,7 +600,7 @@ update_doc(Req, Db, DocId, Json, Headers) -> _ -> Options = [] end, - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options), + {ok, NewRev} = couch_db:update_doc(Db, Doc, Options), NewRevStr = couch_doc:rev_to_str(NewRev), ResponseHeaders = [{"Etag", <<"\"", NewRevStr/binary, "\"">>}] ++ Headers, send_json(Req, if Deleted -> 200; true -> 201 end, @@ -594,6 +609,23 @@ update_doc(Req, Db, DocId, Json, Headers) -> {id, DocId}, {rev, NewRevStr}]}). +couch_doc_from_req(Req, DocId, Json) -> + Doc = couch_doc:from_json_obj(Json), + validate_attachment_names(Doc), + ExplicitDocRev = + case Doc#doc.revs of + {Start,[RevId|_]} -> {Start, RevId}; + _ -> undefined + end, + case extract_header_rev(Req, ExplicitDocRev) of + missing_rev -> + Revs = {0, []}; + {Pos, Rev} -> + Revs = {Pos, [Rev]} + end, + Doc#doc{id=DocId, revs=Revs}. 
+ + % Useful for debugging % couch_doc_open(Db, DocId) -> % couch_doc_open(Db, DocId, [], []). -- cgit v1.2.3