diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/Makefile.am | 4 | ||||
-rw-r--r-- | src/couchdb/couch_batch_save.erl | 267 | ||||
-rw-r--r-- | src/couchdb/couch_batch_save_sup.erl | 37 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 82 |
4 files changed, 365 insertions, 25 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 8b782221..7105300b 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -41,6 +41,8 @@ CLEANFILES = $(compiled_files) $(doc_base) source_files = \ couch_btree.erl \ + couch_batch_save.erl \ + couch_batch_save_sup.erl \ couch_config.erl \ couch_config_writer.erl \ couch_db.erl \ @@ -83,6 +85,8 @@ EXTRA_DIST = $(source_files) couch_db.hrl couch_stats.hrl compiled_files = \ couch.app \ couch_btree.beam \ + couch_batch_save.beam \ + couch_batch_save_sup.beam \ couch_config.beam \ couch_config_writer.beam \ couch_db.beam \ diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl new file mode 100644 index 00000000..43a6f2dd --- /dev/null +++ b/src/couchdb/couch_batch_save.erl @@ -0,0 +1,267 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_batch_save). + +-behaviour(gen_server). + +%% API +-export([start_link/2, eventually_save_doc/3, commit_now/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("couch_db.hrl"). + +-record(batch_state, { + batch_size=1000, + batch_interval=1000 + }). 
%%====================================================================
%% API
%%====================================================================

%%--------------------------------------------------------------------
%% Function: start_link(BatchSize, BatchInterval) ->
%%               {ok,Pid} | ignore | {error,Error}
%% Description: Starts the batch-save server, registered locally as
%% couch_batch_save. Both arguments are handed to init/1 unchanged.
%%--------------------------------------------------------------------
start_link(BatchSize, BatchInterval) ->
    InitArgs = [BatchSize, BatchInterval],
    gen_server:start_link({local, couch_batch_save}, ?MODULE, InitArgs, []).

%%--------------------------------------------------------------------
%% Function: eventually_save_doc(DbName, Doc, UserCtx) -> ok
%% Description: Hands Doc to the collector process of the
%% {DbName, UserCtx} pair, creating that collector when none is
%% registered yet. Returns as soon as the collector has acknowledged
%% the doc; the doc is written to the database by a later commit.
%%--------------------------------------------------------------------
eventually_save_doc(DbName, Doc, UserCtx) ->
    % find or create the collector of this {DbName, UserCtx} pair
    {ok, Collector} = batch_pid_for_db_and_user(DbName, UserCtx),
    ?LOG_DEBUG("sending doc to batch ~p",[Collector]),
    % give it the document and wait for the acknowledgement
    ok = send_doc_to_batch(Collector, Doc).

%%--------------------------------------------------------------------
%% Function: commit_now(DbName, UserCtx) -> committed
%% Description: Asks the collector registered for the {DbName, UserCtx}
%% pair to commit its docs and waits for its answer. When no collector
%% is registered there is nothing to commit and committed is returned
%% right away.
%%--------------------------------------------------------------------
commit_now(DbName, UserCtx) ->
    % look the collector up without creating one
    {ok, Collector} = batch_pid_for_db_and_user(DbName, UserCtx, false),
    flush_collector(Collector).

% commit through an existing collector; 'none' means no collector exists
flush_collector(none) ->
    committed;
flush_collector(Collector) ->
    ok = send_commit(Collector),
    committed.

%%--------------------------------------------------------------------
%% Not implemented: a commit_all/0 that commits all docs for all DBs.
%% The sketch left in place for it was:
%%   commit_all() ->
%%       committed = gen_server:call(couch_batch_save, commit_now, infinity).
%%--------------------------------------------------------------------
%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init([BatchSize, BatchInterval]) -> {ok, State}
%% Description: Creates the public named ets table
%% couch_batch_save_by_db, which maps {DbName, UserCtx} to a collector
%% pid, and stores the two batch settings in the server state.
%%--------------------------------------------------------------------
init([BatchSize, BatchInterval]) ->
    ets:new(couch_batch_save_by_db, [set, public, named_table]),
    InitialState = #batch_state{
        batch_size=BatchSize,
        batch_interval=BatchInterval
    },
    {ok, InitialState}.

%%--------------------------------------------------------------------
%% Function: handle_call({make_pid, DbName, UserCtx}, From, State) ->
%%               {reply, {ok, Pid}, State}
%% Description: Returns the collector pid of the {DbName, UserCtx}
%% pair, starting and registering a collector when the table has none.
%% Callers look the table up themselves first, outside this server, so
%% that lookups run in parallel; the lookup is repeated here, inside
%% the serialized server, so that two racing callers cannot create two
%% collectors for one pair.
%%--------------------------------------------------------------------
handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{}=State) ->
    Key = {DbName, UserCtx},
    Reply = case ets:lookup(couch_batch_save_by_db, Key) of
    [] ->
        % nothing registered: start one
        {ok, start_collector(Key, State)};
    [{_, Collector}] ->
        % somebody registered a collector in the meantime
        {ok, Collector}
    end,
    {reply, Reply, State}.

% start a doc collector linked to the calling process and record it in
% the ets table
start_collector({DbName, UserCtx}=Key, #batch_state{
        batch_size=Size,
        batch_interval=Interval
    }) ->
    ?LOG_DEBUG("making a batch pid ~p",[Key]),
    Collector = spawn_link(fun() ->
        doc_collector(DbName, UserCtx, {Size, Interval}, new)
    end),
    true = ets:insert_new(couch_batch_save_by_db, {Key, Collector}),
    Collector.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State}
%% Description: The server defines no casts; any cast is dropped and
%% the state is kept as it is.
%%--------------------------------------------------------------------
handle_cast(_Cast, ServerState) ->
    {noreply, ServerState}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State}
%% Description: Drops every message that is neither a call nor a cast.
%% A dedicated clause for {'EXIT', Pid, Reason} messages was sketched
%% but is not enabled.
%%--------------------------------------------------------------------
handle_info(_Message, ServerState) ->
    {noreply, ServerState}.

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> ok
%% Description: Called by gen_server when the server is about to stop.
%% Currently performs no cleanup of its own; the return value is
%% ignored by gen_server.
%%--------------------------------------------------------------------
terminate(_Why, _ServerState) ->
    % todo shutdown the interval loop
    % todo kill all the Pids and drop the ets table
    ok.

%%--------------------------------------------------------------------
%% Function: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Code upgrade hook; the state needs no conversion and is
%% returned unchanged.
%%--------------------------------------------------------------------
code_change(_FromVsn, ServerState, _Extra) ->
    {ok, ServerState}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

% Writes Docs to the database DbName, opened with UserCtx as user context.
% Returns {ok, Revs} as given by couch_db:update_docs/2, or {ok, []} without
% touching the database when there are no docs. Throws whatever
% couch_db:open/2 returned when the database cannot be opened. The database
% handle is closed even when the update fails.
commit_user_docs(_DbName, _UserCtx, []) ->
    {ok, []};

commit_user_docs(DbName, UserCtx, Docs) ->
    ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
    {ok, Db} ->
        try
            {ok, Revs} = couch_db:update_docs(Db, Docs),
            ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
            {ok, Revs}
        after
            couch_db:close(Db)
        end;
    Error ->
        throw(Error)
    end.

% Spawned to trigger commits on an interval: every BatchInterval
% milliseconds Pid is asked to commit. The loop ends, returning ok, as soon
% as a commit request fails because Pid is gone, so the ticker does not
% outlive its collector blocked in a receive.
commit_every_ms(Pid, BatchInterval) ->
    receive
    after BatchInterval ->
        case send_commit(Pid) of
        ok ->
            commit_every_ms(Pid, BatchInterval);
        {error, _CollectorGone} ->
            ok
        end
    end.

% Asks collector Pid to commit and waits for the outcome.
% Returns ok when the collector answers, or when it exits with reason
% 'normal' before answering (the collector exits normally only once its
% commit has returned). Returns {error, {collector_down, Reason}} when it
% exits for any other reason or was already dead. The monitor is what keeps
% this call from blocking forever on a collector that will never answer.
send_commit(Pid) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), commit},
    receive
    {Pid, committed} ->
        % drop the monitor and any 'DOWN' message already queued for it
        erlang:demonitor(MRef, [flush]),
        ok;
    {'DOWN', MRef, process, Pid, normal} ->
        ok;
    {'DOWN', MRef, process, Pid, Reason} ->
        {error, {collector_down, Reason}}
    end.

% Same as batch_pid_for_db_and_user/3 with Create = true.
batch_pid_for_db_and_user(DbName, UserCtx) ->
    batch_pid_for_db_and_user(DbName, UserCtx, true).

% Looks up the collector of the {DbName, UserCtx} pair in the ets table.
% Returns {ok, Pid} when one is registered. Otherwise asks the
% couch_batch_save server to make one when Create is true, and returns
% {ok, none} when it is not.
batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
    % look in the ets table
    case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
    [{_, Pid}] ->
        % we have a pid
        {ok, Pid};
    [] ->
        % no match
        if Create ->
            {ok, Pid} = gen_server:call(couch_batch_save,
                {make_pid, DbName, UserCtx}, infinity),
            {ok, Pid};
        true ->
            {ok, none}
        end
    end.

% Hands Doc to collector Pid and waits for the acknowledgement.
% Returns ok once the collector has taken the doc, or
% {error, {collector_down, Reason}} when the collector exits (or is already
% dead) without acknowledging it; in that case the doc was not queued.
send_doc_to_batch(Pid, Doc) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {self(), add_doc, Doc},
    receive
    {Pid, doc_added} ->
        erlang:demonitor(MRef, [flush]),
        ok;
    {'DOWN', MRef, process, Pid, Reason} ->
        {error, {collector_down, Reason}}
    end.
% The loop that holds documents between commits.
% The last argument is the atom 'new' on start-up and the list of collected
% docs (newest first) afterwards. A collector performs exactly one commit
% and then exits with reason normal.
doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
    % Start a process that triggers a commit every BatchInterval
    % milliseconds. self() must be taken here, in the collector: inside the
    % fun it would be evaluated by the spawned process and yield the
    % ticker's own pid.
    Collector = self(),
    _IntervalPid = spawn_link(fun() ->
        commit_every_ms(Collector, BatchInterval)
    end),
    doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);

doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs)
        when length(Docs) >= BatchSize ->
    % the batch is full: commit without anybody asking
    collector_commit(DbName, UserCtx, BatchInterval, Docs, []),
    exit(normal);

doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
    receive
    {From, add_doc, Doc} ->
        From ! {self(), doc_added},
        doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
    {From, commit} ->
        % From is answered by collector_commit/5 once the commit is done
        collector_commit(DbName, UserCtx, BatchInterval, Docs, [From]),
        exit(normal)
    end.

% Unregisters the collector, drains late messages, commits all docs and then
% sends {self(), committed} to every process in Waiters as well as to every
% process whose commit request arrived during the drain.
collector_commit(DbName, UserCtx, BatchInterval, Docs, Waiters) ->
    Self = self(),
    % unregister
    unregister_collector(DbName, UserCtx, Self),
    % wait and collect
    {Docs2, Waiters2} =
        shutdown_collector(DbName, UserCtx, BatchInterval, Docs, Waiters),
    {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2),
    lists:foreach(fun(Waiter) -> Waiter ! {Self, committed} end, Waiters2).

% Removes the {{DbName, UserCtx}, Pid} entry from the ets table; an entry
% for the same key holding a different pid is left alone.
unregister_collector(DbName, UserCtx, Pid) ->
    % remove from ets
    ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).

% Drain phase after unregistering: keeps accepting docs from senders that
% looked the collector up before it was unregistered, and records commit
% requests so that they are answered after the commit instead of being left
% unanswered in the mailbox. Returns {Docs, Waiters} once no message has
% arrived for BatchInterval milliseconds.
shutdown_collector(DbName, UserCtx, BatchInterval, Docs, Waiters) ->
    receive
    {From, add_doc, Doc} ->
        From ! {self(), doc_added},
        shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs], Waiters);
    {From, commit} ->
        shutdown_collector(DbName, UserCtx, BatchInterval, Docs, [From|Waiters])
    % this interval will be waited for each time ensure-full-commit is called
    after BatchInterval ->
        {Docs, Waiters}
    end.
% src/couchdb/couch_batch_save_sup.erl (new file in this change)
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% Supervisor of the couch_batch_save server.

-module(couch_batch_save_sup).

-behaviour(supervisor).

-export([start_link/0,init/1]).

% Starts the supervisor, registered locally as couch_batch_save_sup.
start_link() ->
    supervisor:start_link({local, couch_batch_save_sup},
        couch_batch_save_sup, []).

% Supervisor callback. Reads the batch size and the batch interval from the
% "couchdb" config section (both default to "1000") and returns a
% one_for_one spec with couch_batch_save as the only, permanent, worker.
init([]) ->
    Self = self(),
    % A change in the "couchdb" config section sends this supervisor an exit
    % signal with reason reload_config; the settings below are read only
    % here, in init/1.
    ok = couch_config:register(
        fun("couchdb", _) ->
            exit(Self, reload_config)
        end),

    BatchSize = list_to_integer(couch_config:get("couchdb",
        "batch_save_size","1000")),
    BatchInterval = list_to_integer(couch_config:get("couchdb",
        "batch_save_interval","1000")),

    % The last element of a child spec names the callback module(s) of the
    % child. The worker is implemented by couch_batch_save; no module called
    % batch_save exists.
    Batch = {batch, {couch_batch_save, start_link, [BatchSize, BatchInterval]},
        permanent, 1000, worker, [couch_batch_save]},
    {ok, {{one_for_one, 10, 3600}, [Batch]}}.
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 4fa5a9b6..29430cb6 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -102,19 +102,34 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) -> db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) -> Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)), DocId = couch_util:new_uuid(), - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []), - DocUrl = absolute_uri(Req, - binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)), - send_json(Req, 201, [{"Location", DocUrl}], {[ - {ok, true}, - {id, DocId}, - {rev, couch_doc:rev_to_str(NewRev)} - ]}); + case couch_httpd:qs_value(Req, "batch") of + "ok" -> + % batch + ok = couch_batch_save:eventually_save_doc(Db#db.name, + Doc#doc{id=DocId}, Db#db.user_ctx), + send_json(Req, 202, [], {[ + {ok, true}, + {id, DocId} + ]}); + _Normal -> + % normal + {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []), + DocUrl = absolute_uri(Req, + binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)), + send_json(Req, 201, [{"Location", DocUrl}], {[ + {ok, true}, + {id, DocId}, + {rev, couch_doc:rev_to_str(NewRev)} + ]}) + end; + db_req(#httpd{path_parts=[_DbName]}=Req, _Db) -> send_method_not_allowed(Req, "DELETE,GET,HEAD,POST"); db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) -> + % make the batch save + committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx), {ok, DbStartTime} = couch_db:ensure_full_commit(Db), send_json(Req, 201, {[ {ok, true}, @@ -527,9 +542,21 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> couch_doc:validate_docid(DocId), - Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)), - update_doc(Req, Db, DocId, couch_httpd:json_body(Req), - [{"Location", Location}]); + Json = couch_httpd:json_body(Req), + case couch_httpd:qs_value(Req, "batch") of + 
"ok" -> + % batch + Doc = couch_doc_from_req(Req, DocId, Json), + ok = couch_batch_save:eventually_save_doc(Db#db.name, Doc, Db#db.user_ctx), + send_json(Req, 202, [], {[ + {ok, true}, + {id, DocId} + ]}); + _Normal -> + % normal + Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)), + update_doc(Req, Db, DocId, Json, [{"Location", Location}]) + end; db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) -> SourceRev = @@ -565,19 +592,7 @@ update_doc(Req, Db, DocId, Json) -> update_doc(Req, Db, DocId, Json, []). update_doc(Req, Db, DocId, Json, Headers) -> - #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json), - validate_attachment_names(Doc), - ExplicitDocRev = - case Doc#doc.revs of - {Start,[RevId|_]} -> {Start, RevId}; - _ -> undefined - end, - case extract_header_rev(Req, ExplicitDocRev) of - missing_rev -> - Revs = {0, []}; - {Pos, Rev} -> - Revs = {Pos, [Rev]} - end, + #doc{deleted=Deleted} = Doc = couch_doc_from_req(Req, DocId, Json), case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of "true" -> @@ -585,7 +600,7 @@ update_doc(Req, Db, DocId, Json, Headers) -> _ -> Options = [] end, - {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options), + {ok, NewRev} = couch_db:update_doc(Db, Doc, Options), NewRevStr = couch_doc:rev_to_str(NewRev), ResponseHeaders = [{"Etag", <<"\"", NewRevStr/binary, "\"">>}] ++ Headers, send_json(Req, if Deleted -> 200; true -> 201 end, @@ -594,6 +609,23 @@ update_doc(Req, Db, DocId, Json, Headers) -> {id, DocId}, {rev, NewRevStr}]}). +couch_doc_from_req(Req, DocId, Json) -> + Doc = couch_doc:from_json_obj(Json), + validate_attachment_names(Doc), + ExplicitDocRev = + case Doc#doc.revs of + {Start,[RevId|_]} -> {Start, RevId}; + _ -> undefined + end, + case extract_header_rev(Req, ExplicitDocRev) of + missing_rev -> + Revs = {0, []}; + {Pos, Rev} -> + Revs = {Pos, [Rev]} + end, + Doc#doc{id=DocId, revs=Revs}. 
+ + % Useful for debugging % couch_doc_open(Db, DocId) -> % couch_doc_open(Db, DocId, [], []). |