summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_batch_save.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_batch_save.erl')
-rw-r--r--src/couchdb/couch_batch_save.erl267
1 files changed, 267 insertions, 0 deletions
diff --git a/src/couchdb/couch_batch_save.erl b/src/couchdb/couch_batch_save.erl
new file mode 100644
index 00000000..43a6f2dd
--- /dev/null
+++ b/src/couchdb/couch_batch_save.erl
@@ -0,0 +1,267 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_batch_save).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, eventually_save_doc/3, commit_now/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-record(batch_state, {
+ batch_size=1000,
+ batch_interval=1000
+ }).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link(BatchSize, BatchInterval) ->
+ gen_server:start_link({local, couch_batch_save}, ?MODULE, [BatchSize, BatchInterval], []).
+
+%%--------------------------------------------------------------------
+%% Function: commit_doc(Doc) -> committed
+%% Description: Puts the doc into the set to commit. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+eventually_save_doc(DbName, Doc, UserCtx) ->
+ % find or create a process for the {DbName, UserCtx} pair
+ {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx),
+ % hand it the document
+ ?LOG_DEBUG("sending doc to batch ~p",[Pid]),
+ ok = send_doc_to_batch(Pid, Doc).
+
+%%--------------------------------------------------------------------
+%% Function: commit_now(DbName) -> committed
+%% Description: Commits all docs for the DB. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+commit_now(DbName, UserCtx) ->
+ % find the process for the {DbName, UserCtx} pair
+ {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx, false),
+ case Pid of
+ none -> committed;
+ _Else ->
+ ok = send_commit(Pid),
+ committed
+ end.
+
+%%--------------------------------------------------------------------
+%% Function: commit_now() -> committed
+%% Description: Commits all docs for all DBs. Does not reply until
+%% the commit is complete.
+%%--------------------------------------------------------------------
+% commit_all() ->
+% committed = gen_server:call(couch_batch_save, commit_now, infinity).
+%
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([BatchSize, BatchInterval]) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server with the meanings
+%%--------------------------------------------------------------------
+init([BatchSize, BatchInterval]) ->
+ ets:new(couch_batch_save_by_db, [set, public, named_table]),
+ {ok, #batch_state{batch_size=BatchSize, batch_interval=BatchInterval}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
+ batch_size=BatchSize,
+ batch_interval=BatchInterval
+ }=State) ->
+ % Create the pid in a serialized process.
+ % We checked before to see that we need the Pid, but the check is outside
+ % the gen_server for parellelism. We check again here to ensure we don't
+ % make a duplicate.
+ Resp = case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
+ [{_, Pid}] ->
+ % we have a pid
+ {ok, Pid};
+ [] ->
+ % no match
+ % start and record the doc collector process
+ ?LOG_DEBUG("making a batch pid ~p",[{DbName, UserCtx}]),
+ Pid = spawn_link(fun() ->
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new)
+ end),
+ true = ets:insert_new(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}),
+ {ok, Pid}
+ end,
+ {reply, Resp, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+% handle_info({'EXIT', Pid, Reason}, State) ->
+% {noreply, State};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ % todo shutdown the interval loop
+ % todo kill all the Pids and drop the ets table
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+commit_user_docs(_DbName, _UserCtx, []) ->
+ {ok, []};
+
+commit_user_docs(DbName, UserCtx, Docs) ->
+ ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ try
+ {ok, Revs} = couch_db:update_docs(Db, Docs),
+ ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
+ {ok, Revs}
+ after
+ couch_db:close(Db)
+ end;
+ Error ->
+ throw(Error)
+ end.
+
+% spawned to trigger commits on an interval
+commit_every_ms(Pid, BatchInterval) ->
+ receive
+ after BatchInterval ->
+ ok = send_commit(Pid),
+ commit_every_ms(Pid, BatchInterval)
+ end.
+
+send_commit(Pid) ->
+ Pid ! {self(), commit},
+ receive
+ {Pid, committed} ->
+ ok
+ end.
+
+batch_pid_for_db_and_user(DbName, UserCtx) ->
+ batch_pid_for_db_and_user(DbName, UserCtx, true).
+
+batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
+ % look in the ets table
+ case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
+ [{_, Pid}] ->
+ % we have a pid
+ {ok, Pid};
+ [] ->
+ % no match
+ if Create ->
+ {ok, Pid} = gen_server:call(couch_batch_save, {make_pid, DbName, UserCtx}, infinity),
+ {ok, Pid};
+ true ->
+ {ok, none}
+ end
+ end.
+
+send_doc_to_batch(Pid, Doc) ->
+ Pid ! {self(), add_doc, Doc},
+ receive
+ {Pid, doc_added} -> ok
+ end.
+
+% the loop that holds documents between commits
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
+ % start a process that triggers commit every BatchInterval milliseconds
+ _IntervalPid = spawn_link(fun() -> commit_every_ms(self(), BatchInterval) end),
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);
+
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize->
+ collector_commit(DbName, UserCtx, BatchInterval, Docs),
+ exit(normal);
+
+doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
+ receive
+ {From, add_doc, Doc} ->
+ From ! {self(), doc_added},
+ doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
+ {From, commit} ->
+ collector_commit(DbName, UserCtx, BatchInterval, Docs),
+ From ! {self(), committed},
+ exit(normal)
+ end.
+
+collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
+ % unregister
+ unregister_collector(DbName, UserCtx, self()),
+ % wait and collect
+ Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
+ {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).
+
+unregister_collector(DbName, UserCtx, Pid) ->
+ % remove from ets
+ ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).
+
+shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
+ receive
+ {From, add_doc, Doc} ->
+ From ! {self(), doc_added},
+ shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
+ % this interval will be waited for each time ensure-full-commit is called
+ after BatchInterval ->
+ Docs
+ end. \ No newline at end of file