summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2009-05-04 19:59:39 +0000
committerJohn Christopher Anderson <jchris@apache.org>2009-05-04 19:59:39 +0000
commit7c05a60479bacc7acbf6f704285a4ab2981ba02b (patch)
treefe781f60e81fdabf751c48eaa91230ef0396a189 /src/couchdb
parent0b878888d10638ec2f2d6691c54c3aad0b4faf9e (diff)
Use batch=ok query param for document PUT and POST to defer index updates until a threshold of documents (or amount of time) has been passed.
This option returns a 202 Accepted response instead of a 201 Created, so do not use it for applications which require all data to be saved safely to disk. It is ideal for applications like logging where losing some events in a crash will be ok. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@771418 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/Makefile.am4
-rw-r--r--src/couchdb/couch_batch_save.erl267
-rw-r--r--src/couchdb/couch_batch_save_sup.erl37
-rw-r--r--src/couchdb/couch_httpd_db.erl82
4 files changed, 365 insertions, 25 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 8b782221..7105300b 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -41,6 +41,8 @@ CLEANFILES = $(compiled_files) $(doc_base)
source_files = \
couch_btree.erl \
+ couch_batch_save.erl \
+ couch_batch_save_sup.erl \
couch_config.erl \
couch_config_writer.erl \
couch_db.erl \
@@ -83,6 +85,8 @@ EXTRA_DIST = $(source_files) couch_db.hrl couch_stats.hrl
compiled_files = \
couch.app \
couch_btree.beam \
+ couch_batch_save.beam \
+ couch_batch_save_sup.beam \
couch_config.beam \
couch_config_writer.beam \
couch_db.beam \
diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl
new file mode 100644
index 00000000..43a6f2dd
--- /dev/null
+++ b/src/couchdb/couch_batch_save.erl
@@ -0,0 +1,267 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_batch_save).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, eventually_save_doc/3, commit_now/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-record(batch_state, {
+ batch_size=1000,
+ batch_interval=1000
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link(BatchSize, BatchInterval) ->
+ gen_server:start_link({local, couch_batch_save}, ?MODULE, [BatchSize, BatchInterval], []).
+
+%%--------------------------------------------------------------------
+%% Function: commit_doc(Doc) -> committed
+%% Description: Puts the doc into the set to commit. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+eventually_save_doc(DbName, Doc, UserCtx) ->
+ % find or create a process for the {DbName, UserCtx} pair
+ {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx),
+ % hand it the document
+ ?LOG_DEBUG("sending doc to batch ~p",[Pid]),
+ ok = send_doc_to_batch(Pid, Doc).
+
+%%--------------------------------------------------------------------
+%% Function: commit_now(DbName) -> committed
+%% Description: Commits all docs for the DB. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+commit_now(DbName, UserCtx) ->
+ % find the process for the {DbName, UserCtx} pair
+ {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx, false),
+ case Pid of
+ none -> committed;
+ _Else ->
+ ok = send_commit(Pid),
+ committed
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: commit_now() -> committed
+%% Description: Commits all docs for all DBs. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+% commit_all() ->
+% committed = gen_server:call(couch_batch_save, commit_now, infinity).
+%
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([BatchSize, BatchInterval]) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server with the meanings
+%%--------------------------------------------------------------------
+init([BatchSize, BatchInterval]) ->
+ ets:new(couch_batch_save_by_db, [set, public, named_table]),
+ {ok, #batch_state{batch_size=BatchSize, batch_interval=BatchInterval}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
+ batch_size=BatchSize,
+ batch_interval=BatchInterval
+ }=State) ->
+ % Create the pid in a serialized process.
+ % We checked before to see that we need the Pid, but the check is outside
+ % the gen_server for parellelism. We check again here to ensure we don't
+ % make a duplicate.
+ Resp = case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
+ [{_, Pid}] ->
+ % we have a pid
+ {ok, Pid};
+ [] ->
+ % no match
+ % start and record the doc collector process
+ ?LOG_DEBUG("making a batch pid ~p",[{DbName, UserCtx}]),
+ Pid = spawn_link(fun() ->
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new)
+ end),
+ true = ets:insert_new(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}),
+ {ok, Pid}
+ end,
+ {reply, Resp, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+% handle_info({'EXIT', Pid, Reason}, State) ->
+% {noreply, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ % todo shutdown the interval loop
+ % todo kill all the Pids and drop the ets table
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+commit_user_docs(_DbName, _UserCtx, []) ->
+ {ok, []};
+
+commit_user_docs(DbName, UserCtx, Docs) ->
+ ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ try
+ {ok, Revs} = couch_db:update_docs(Db, Docs),
+ ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
+ {ok, Revs}
+ after
+ couch_db:close(Db)
+ end;
+ Error ->
+ throw(Error)
+ end.
+
+% spawned to trigger commits on an interval
+commit_every_ms(Pid, BatchInterval) ->
+ receive
+ after BatchInterval ->
+ ok = send_commit(Pid),
+ commit_every_ms(Pid, BatchInterval)
+ end.
+
+send_commit(Pid) ->
+ Pid ! {self(), commit},
+ receive
+ {Pid, committed} ->
+ ok
+ end.
+
+batch_pid_for_db_and_user(DbName, UserCtx) ->
+ batch_pid_for_db_and_user(DbName, UserCtx, true).
+
+batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
+ % look in the ets table
+ case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
+ [{_, Pid}] ->
+ % we have a pid
+ {ok, Pid};
+ [] ->
+ % no match
+ if Create ->
+ {ok, Pid} = gen_server:call(couch_batch_save, {make_pid, DbName, UserCtx}, infinity),
+ {ok, Pid};
+ true ->
+ {ok, none}
+ end
+ end.
+
+send_doc_to_batch(Pid, Doc) ->
+ Pid ! {self(), add_doc, Doc},
+ receive
+ {Pid, doc_added} -> ok
+ end.
+
+% the loop that holds documents between commits
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
+ % start a process that triggers commit every BatchInterval milliseconds
+ _IntervalPid = spawn_link(fun() -> commit_every_ms(self(), BatchInterval) end),
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);
+
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize->
+ collector_commit(DbName, UserCtx, BatchInterval, Docs),
+ exit(normal);
+
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
+ receive
+ {From, add_doc, Doc} ->
+ From ! {self(), doc_added},
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
+ {From, commit} ->
+ collector_commit(DbName, UserCtx, BatchInterval, Docs),
+ From ! {self(), committed},
+ exit(normal)
+ end.
+
+collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
+ % unregister
+ unregister_collector(DbName, UserCtx, self()),
+ % wait and collect
+ Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
+ {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).
+
+unregister_collector(DbName, UserCtx, Pid) ->
+ % remove from ets
+ ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).
+
+shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
+ receive
+ {From, add_doc, Doc} ->
+ From ! {self(), doc_added},
+ shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
+ % this interval will be waited for each time ensure-full-commit is called
+ after BatchInterval ->
+ Docs
+ end. \ No newline at end of file
diff --git a/src/couchdb/couch_batch_save_sup.erl b/src/couchdb/couch_batch_save_sup.erl
new file mode 100644
index 00000000..42cf1aba
--- /dev/null
+++ b/src/couchdb/couch_batch_save_sup.erl
@@ -0,0 +1,37 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_batch_save_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/0,init/1]).
+
+start_link() ->
+ supervisor:start_link({local, couch_batch_save_sup},
+ couch_batch_save_sup, []).
+
+init([]) ->
+ Self = self(),
+ ok = couch_config:register(
+ fun("couchdb", _) ->
+ exit(Self, reload_config)
+ end),
+
+ BatchSize = list_to_integer(couch_config:get("couchdb",
+ "batch_save_size","1000")),
+ BatchInterval = list_to_integer(couch_config:get("couchdb",
+ "batch_save_interval","1000")),
+
+ Batch = {batch, {couch_batch_save, start_link, [BatchSize, BatchInterval]},
+ permanent, 1000, worker, [batch_save]},
+ {ok, {{one_for_one, 10, 3600}, [Batch]}}.
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 4fa5a9b6..29430cb6 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -102,19 +102,34 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) ->
db_req(#httpd{method='POST',path_parts=[DbName]}=Req, Db) ->
Doc = couch_doc:from_json_obj(couch_httpd:json_body(Req)),
DocId = couch_util:new_uuid(),
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
- DocUrl = absolute_uri(Req,
- binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
- send_json(Req, 201, [{"Location", DocUrl}], {[
- {ok, true},
- {id, DocId},
- {rev, couch_doc:rev_to_str(NewRev)}
- ]});
+ case couch_httpd:qs_value(Req, "batch") of
+ "ok" ->
+ % batch
+ ok = couch_batch_save:eventually_save_doc(Db#db.name,
+ Doc#doc{id=DocId}, Db#db.user_ctx),
+ send_json(Req, 202, [], {[
+ {ok, true},
+ {id, DocId}
+ ]});
+ _Normal ->
+ % normal
+ {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId}, []),
+ DocUrl = absolute_uri(Req,
+ binary_to_list(<<"/",DbName/binary,"/",DocId/binary>>)),
+ send_json(Req, 201, [{"Location", DocUrl}], {[
+ {ok, true},
+ {id, DocId},
+ {rev, couch_doc:rev_to_str(NewRev)}
+ ]})
+ end;
+
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
db_req(#httpd{method='POST',path_parts=[_,<<"_ensure_full_commit">>]}=Req, Db) ->
+ % make the batch save
+ committed = couch_batch_save:commit_now(Db#db.name, Db#db.user_ctx),
{ok, DbStartTime} = couch_db:ensure_full_commit(Db),
send_json(Req, 201, {[
{ok, true},
@@ -527,9 +542,21 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
couch_doc:validate_docid(DocId),
- Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
- update_doc(Req, Db, DocId, couch_httpd:json_body(Req),
- [{"Location", Location}]);
+ Json = couch_httpd:json_body(Req),
+ case couch_httpd:qs_value(Req, "batch") of
+ "ok" ->
+ % batch
+ Doc = couch_doc_from_req(Req, DocId, Json),
+ ok = couch_batch_save:eventually_save_doc(Db#db.name, Doc, Db#db.user_ctx),
+ send_json(Req, 202, [], {[
+ {ok, true},
+ {id, DocId}
+ ]});
+ _Normal ->
+ % normal
+ Location = absolute_uri(Req, "/" ++ ?b2l(Db#db.name) ++ "/" ++ ?b2l(DocId)),
+ update_doc(Req, Db, DocId, Json, [{"Location", Location}])
+ end;
db_doc_req(#httpd{method='COPY'}=Req, Db, SourceDocId) ->
SourceRev =
@@ -565,19 +592,7 @@ update_doc(Req, Db, DocId, Json) ->
update_doc(Req, Db, DocId, Json, []).
update_doc(Req, Db, DocId, Json, Headers) ->
- #doc{deleted=Deleted} = Doc = couch_doc:from_json_obj(Json),
- validate_attachment_names(Doc),
- ExplicitDocRev =
- case Doc#doc.revs of
- {Start,[RevId|_]} -> {Start, RevId};
- _ -> undefined
- end,
- case extract_header_rev(Req, ExplicitDocRev) of
- missing_rev ->
- Revs = {0, []};
- {Pos, Rev} ->
- Revs = {Pos, [Rev]}
- end,
+ #doc{deleted=Deleted} = Doc = couch_doc_from_req(Req, DocId, Json),
case couch_httpd:header_value(Req, "X-Couch-Full-Commit", "false") of
"true" ->
@@ -585,7 +600,7 @@ update_doc(Req, Db, DocId, Json, Headers) ->
_ ->
Options = []
end,
- {ok, NewRev} = couch_db:update_doc(Db, Doc#doc{id=DocId, revs=Revs}, Options),
+ {ok, NewRev} = couch_db:update_doc(Db, Doc, Options),
NewRevStr = couch_doc:rev_to_str(NewRev),
ResponseHeaders = [{"Etag", <<"\"", NewRevStr/binary, "\"">>}] ++ Headers,
send_json(Req, if Deleted -> 200; true -> 201 end,
@@ -594,6 +609,23 @@ update_doc(Req, Db, DocId, Json, Headers) ->
{id, DocId},
{rev, NewRevStr}]}).
+couch_doc_from_req(Req, DocId, Json) ->
+ Doc = couch_doc:from_json_obj(Json),
+ validate_attachment_names(Doc),
+ ExplicitDocRev =
+ case Doc#doc.revs of
+ {Start,[RevId|_]} -> {Start, RevId};
+ _ -> undefined
+ end,
+ case extract_header_rev(Req, ExplicitDocRev) of
+ missing_rev ->
+ Revs = {0, []};
+ {Pos, Rev} ->
+ Revs = {Pos, [Rev]}
+ end,
+ Doc#doc{id=DocId, revs=Revs}.
+
+
% Useful for debugging
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, [], []).