summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/Makefile.am4
-rw-r--r--src/couchdb/couch_batch_save.erl267
-rw-r--r--src/couchdb/couch_batch_save_sup.erl37
-rw-r--r--src/couchdb/couch_httpd_db.erl82
4 files changed, 365 insertions, 25 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 8b782221..7105300b 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -41,6 +41,8 @@ CLEANFILES = $(compiled_files) $(doc_base)
source_files = \
couch_btree.erl \
+ couch_batch_save.erl \
+ couch_batch_save_sup.erl \
couch_config.erl \
couch_config_writer.erl \
couch_db.erl \
@@ -83,6 +85,8 @@ EXTRA_DIST = $(source_files) couch_db.hrl couch_stats.hrl
compiled_files = \
couch.app \
couch_btree.beam \
+ couch_batch_save.beam \
+ couch_batch_save_sup.beam \
couch_config.beam \
couch_config_writer.beam \
couch_db.beam \
diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl
new file mode 100644
index 00000000..43a6f2dd
--- /dev/null
+++ b/src/couchdb/couch_batch_save.erl
@@ -0,0 +1,267 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_batch_save).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, eventually_save_doc/3, commit_now/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-record(batch_state, {
+ batch_size=1000,
+ batch_interval=1000
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link(BatchSize, BatchInterval) ->
+ gen_server:start_link({local, couch_batch_save}, ?MODULE, [BatchSize, BatchInterval], []).
+
+%%--------------------------------------------------------------------
+%% Function: commit_doc(Doc) -> committed
+%% Description: Puts the doc into the set to commit. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+eventually_save_doc(DbName, Doc, UserCtx) ->
+ % find or create a process for the {DbName, UserCtx} pair
+ {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx),
+ % hand it the document
+ ?LOG_DEBUG("sending doc to batch ~p",[Pid]),
+ ok = send_doc_to_batch(Pid, Doc).
+
+%%--------------------------------------------------------------------
+%% Function: commit_now(DbName) -> committed
+%% Description: Commits all docs for the DB. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+commit_now(DbName, UserCtx) ->
+ % find the process for the {DbName, UserCtx} pair
+ {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx, false),
+ case Pid of
+ none -> committed;
+ _Else ->
+ ok = send_commit(Pid),
+ committed
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: commit_now() -> committed
+%% Description: Commits all docs for all DBs. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+% commit_all() ->
+% committed = gen_server:call(couch_batch_save, commit_now, infinity).
+%
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([BatchSize, BatchInterval]) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server with the meanings
+%%--------------------------------------------------------------------
+init([BatchSize, BatchInterval]) ->
+ ets:new(couch_batch_save_by_db, [set, public, named_table]),
+ {ok, #batch_state{batch_size=BatchSize, batch_interval=BatchInterval}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
+ batch_size=BatchSize,
+ batch_interval=BatchInterval
+ }=State) ->
+ % Create the pid in a serialized process.
+ % We checked before to see that we need the Pid, but the check is outside
+ % the gen_server for parellelism. We check again here to ensure we don't
+ % make a duplicate.
+ Resp = case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
+ [{_, Pid}] ->
+ % we have a pid
+ {ok, Pid};
+ [] ->
+ % no match
+ % start and record the doc collector process
+ ?LOG_DEBUG("making a batch pid ~p",[{DbName, UserCtx}]),
+ Pid = spawn_link(fun() ->
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new)
+ end),
+ true = ets:insert_new(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}),
+ {ok, Pid}
+ end,
+ {reply, Resp, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+% handle_info({'EXIT', Pid, Reason}, State) ->
+% {noreply, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ % todo shutdown the interval loop
+ % todo kill all the Pids and drop the ets table
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+commit_user_docs(_DbName, _UserCtx, []) ->
+ {ok, []};
+
+commit_user_docs(DbName, UserCtx, Docs) ->
+ ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ try
+ {ok, Revs} = couch_db:update_docs(Db, Docs),
+ ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
+ {ok, Revs}
+ after
+ couch_db:close(Db)
+ end;
+ Error ->
+ throw(Error)
+ end.
+
+% spawned to trigger commits on an interval
+commit_every_ms(Pid, BatchInterval) ->
+ receive
+ after BatchInterval ->
+ ok = send_commit(Pid),
+ commit_every_ms(Pid, BatchInterval)
+ end.
+
+send_commit(Pid) ->
+ Pid ! {self(), commit},
+ receive
+ {Pid, committed} ->
+ ok
+ end.
+
+batch_pid_for_db_and_user(DbName, UserCtx) ->
+ batch_pid_for_db_and_user(DbName, UserCtx, true).
+
+batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
+ % look in the ets table
+ case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
+ [{_, Pid}] ->
+ % we have a pid
+ {ok, Pid};
+ [] ->
+ % no match
+ if Create ->
+ {ok, Pid} = gen_server:call(couch_batch_save, {make_pid, DbName, UserCtx}, infinity),
+ {ok, Pid};
+ true ->
+ {ok, none}
+ end
+ end.
+
+send_doc_to_batch(Pid, Doc) ->
+ Pid ! {self(), add_doc, Doc},
+ receive
+ {Pid, doc_added} -> ok
+ end.
+
+% the loop that holds documents between commits
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
+ % start a process that triggers commit every BatchInterval milliseconds
+ _IntervalPid = spawn_link(fun() -> commit_every_ms(self(), BatchInterval) end),
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);
+
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize->
+ collector_commit(DbName, UserCtx, BatchInterval, Docs),
+ exit(normal);
+
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
+ receive
+ {From, add_doc, Doc} ->
+ From ! {self(), doc_added},
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
+ {From, commit} ->
+ collector_commit(DbName, UserCtx, BatchInterval, Docs),
+ From ! {self(), committed},
+ exit(normal)
+ end.
+
+collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
+ % unregister
+ unregister_collector(DbName, UserCtx, self()),
+ % wait and collect
+ Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
+ {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).
+
+unregister_collector(DbName, UserCtx, Pid) ->
+ % remove from ets
+ ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).
+
+shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
+ receive
+ {From, add_doc, Doc} ->
+ From ! {self(), doc_added},
+ shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
+ % this interval will be waited for each time ensure-full-commit is called
+ after BatchInterval ->
+ Docs
+ end. \ No newline at end of file
diff --git a/src/couchdb/couch_batch_save_sup.erl b/src/couchdb/couch_batch_save_sup.erl
new file mode 100644
index 00000000..42cf1aba
--- /dev/null
+++ b/src/couchdb/couch_batch_save_sup.erl
@@ -0,0 +1,37 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_batch_save_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0,init/1]).
+
+start_link() ->
+ supervisor:start_link({local, couch_batch_save_sup},
+ couch_batch_save_sup, []).
+
+init([]) ->
+ Self = self(),
+ ok = couch_config:register(
+ fun("couchdb", _) ->
+ exit(Self, reload_config)
+ end),
+
+ BatchSize = list_to_integer(couch_config:get("couchdb",
+ "batch_save_size","1000")),
+ BatchInterval = list_to_integer(couch_config:get("couchdb",
+ "batch_save_interval","1000")),
+
+ Batch = {batch, {couch_batch_save, start_link, [BatchSize, BatchInterval]},
+ permanent, 1000, worker, [batch_save]},
+ {ok, {{one_for_one, 10, 3600}, [Batch]}}.
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 4fa5a9b6..29430cb6 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -102,19 +102,34 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) ->
db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) ->
Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)),
DocId = couch_util:new_uuid(),
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
- DocUrl = absolute_uri(Req,
- binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
- send_json(Req, 201, [{"Location", DocUrl}], {[
- {ok, true},
- {id, DocId},
- {rev, couch_doc:rev_to_str(NewRev)}
- ]});
+ case couch_httpd:qs_value(Req, "batch") of
+ "ok" ->
+ % batch
+ ok = couch_batch_save:eventually_save_doc(Db#db.name,
+ Doc#doc{id=DocId}, Db#db.user_ctx),
+ send_json(Req, 202, [], {[
+ {ok, true},
+ {id, DocId}
+ ]});
+ _Normal ->
+ % normal
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
+ DocUrl = absolute_uri(Req,
+ binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
+ send_json(Req, 201, [{"Location", DocUrl}], {[
+ {ok, true},
+ {id, DocId},
+ {rev, couch_doc:rev_to_str(NewRev)}
+ ]})
+ end;
+
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) ->
+ % make the batch save
+ committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx),
{ok, DbStartTime} = couch_db:ensure_full_commit(Db),
send_json(Req, 201, {[
{ok, true},
@@ -527,9 +542,21 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
couch_doc:validate_docid(DocId),
- Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
- update_doc(Req, Db, DocId, couch_httpd:json_body(Req),
- [{"Location", Location}]);
+ Json = couch_httpd:json_body(Req),
+ case couch_httpd:qs_value(Req, "batch") of
+ "ok" ->
+ % batch
+ Doc = couch_doc_from_req(Req, DocId, Json),
+ ok = couch_batch_save:eventually_save_doc(Db#db.name, Doc, Db#db.user_ctx),
+ send_json(Req, 202, [], {[
+ {ok, true},
+ {id, DocId}
+ ]});
+ _Normal ->
+ % normal
+ Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
+ update_doc(Req, Db, DocId, Json, [{"Location", Location}])
+ end;
db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
SourceRev =
@@ -565,19 +592,7 @@ update_doc(Req, Db, DocId, Json) ->
update_doc(Req, Db, DocId, Json, []).
update_doc(Req, Db, DocId, Json, Headers) ->
- #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json),
- validate_attachment_names(Doc),
- ExplicitDocRev =
- case Doc#doc.revs of
- {Start,[RevId|_]} -> {Start, RevId};
- _ -> undefined
- end,
- case extract_header_rev(Req, ExplicitDocRev) of
- missing_rev ->
- Revs = {0, []};
- {Pos, Rev} ->
- Revs = {Pos, [Rev]}
- end,
+ #doc{deleted=Deleted} = Doc = couch_doc_from_req(Req, DocId, Json),
case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
"true" ->
@@ -585,7 +600,7 @@ update_doc(Req, Db, DocId, Json, Headers) ->
_ ->
Options = []
end,
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
+ {ok, NewRev} = couch_db:update_doc(Db, Doc, Options),
NewRevStr = couch_doc:rev_to_str(NewRev),
ResponseHeaders = [{"Etag", <<"\"", NewRevStr/binary, "\"">>}] ++ Headers,
send_json(Req, if Deleted -> 200; true -> 201 end,
@@ -594,6 +609,23 @@ update_doc(Req, Db, DocId, Json, Headers) ->
{id, DocId},
{rev, NewRevStr}]}).
+couch_doc_from_req(Req, DocId, Json) ->
+ Doc = couch_doc:from_json_obj(Json),
+ validate_attachment_names(Doc),
+ ExplicitDocRev =
+ case Doc#doc.revs of
+ {Start,[RevId|_]} -> {Start, RevId};
+ _ -> undefined
+ end,
+ case extract_header_rev(Req, ExplicitDocRev) of
+ missing_rev ->
+ Revs = {0, []};
+ {Pos, Rev} ->
+ Revs = {Pos, [Rev]}
+ end,
+ Doc#doc{id=DocId, revs=Revs}.
+
+
% Useful for debugging
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, [], []).