Added batching of multiple updating requests, to improve throughput with many writers. Also removed the couch_batch_save module, now batch requests are simply saved async as immediately, batching with outhr updates if possible.
-%% API
--export([start_link/2, eventually_save_doc/3, commit_now/2]).
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
--record(batch_state, {
- batch_size=1000,
- batch_interval=1000
- }).
-%% API
-%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
-start_link(BatchSize, BatchInterval) ->
- gen_server:start_link({local, couch_batch_save}, ?MODULE, [BatchSize, BatchInterval], []).
-%% Function: commit_doc(Doc) -> committed
-%% Description: Puts the doc into the set to commit. Does not reply until
-%% the commit is complete.
-eventually_save_doc(DbName, Doc, UserCtx) ->
- % find or create a process for the {DbName, UserCtx} pair
- {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx),
- % hand it the document
- ?LOG_DEBUG("sending doc to batch ~p",[Pid]),
- ok = send_doc_to_batch(Pid, Doc).
-%% Function: commit_now(DbName) -> committed
-%% Description: Commits all docs for the DB. Does not reply until
-%% the commit is complete.
-commit_now(DbName, UserCtx) ->
- % find the process for the {DbName, UserCtx} pair
- {ok, Pid} = batch_pid_for_db_and_user(DbName, UserCtx, false),
- case Pid of
- none -> committed;
- _Else ->
- ok = send_commit(Pid),
- committed
- end.
-%% Function: commit_now() -> committed
-%% Description: Commits all docs for all DBs. Does not reply until
-%% the commit is complete.
-% commit_all() ->
-% committed = gen_server:call(couch_batch_save, commit_now, infinity).
-%% gen_server callbacks
-%% Function: init([BatchSize, BatchInterval]) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server with the meanings
-init([BatchSize, BatchInterval]) ->
- ets:new(couch_batch_save_by_db, [set, public, named_table]),
- {ok, #batch_state{batch_size=BatchSize, batch_interval=BatchInterval}}.
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-handle_call({make_pid, DbName, UserCtx}, _From, #batch_state{
- batch_size=BatchSize,
- batch_interval=BatchInterval
- }=State) ->
- % Create the pid in a serialized process.
- % We checked before to see that we need the Pid, but the check is outside
- % the gen_server for parellelism. We check again here to ensure we don't
- % make a duplicate.
- Resp = case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
- [{_, Pid}] ->
- % we have a pid
- {ok, Pid};
- [] ->
- % no match
- % start and record the doc collector process
- ?LOG_DEBUG("making a batch pid ~p",[{DbName, UserCtx}]),
- Pid = spawn_link(fun() ->
- doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new)
- end),
- true = ets:insert_new(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}),
- {ok, Pid}
- end,
- {reply, Resp, State}.
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-handle_cast(_Msg, State) ->
- {noreply, State}.
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-% handle_info({'EXIT', Pid, Reason}, State) ->
-% {noreply, State};
-handle_info(_Info, State) ->
- {noreply, State}.
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-terminate(_Reason, _State) ->
- % todo shutdown the interval loop
- % todo kill all the Pids and drop the ets table
- ok.
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-%%% Internal functions
-commit_user_docs(_DbName, _UserCtx, []) ->
- {ok, []};
-commit_user_docs(DbName, UserCtx, Docs) ->
- ?LOG_INFO("Committing ~p batch docs to ~p",[length(Docs), DbName]),
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} ->
- try
- {ok, Revs} = couch_db:update_docs(Db, Docs),
- ?LOG_INFO("Committed ~p batch docs to ~p",[length(Docs), DbName]),
- {ok, Revs}
- after
- couch_db:close(Db)
- end;
- Error ->
- throw(Error)
- end.
-% spawned to trigger commits on an interval
-commit_every_ms(Pid, BatchInterval) ->
- receive
- after BatchInterval ->
- ok = send_commit(Pid),
- commit_every_ms(Pid, BatchInterval)
- end.
-send_commit(Pid) ->
- Pid ! {self(), commit},
- receive
- {Pid, committed} ->
- ok;
- {'DOWN', _, _, Pid, _} ->
- exit(normal)
- end.
-batch_pid_for_db_and_user(DbName, UserCtx) ->
- batch_pid_for_db_and_user(DbName, UserCtx, true).
-batch_pid_for_db_and_user(DbName, UserCtx, Create) ->
- % look in the ets table
- case ets:lookup(couch_batch_save_by_db, {DbName,UserCtx}) of
- [{_, Pid}] ->
- % we have a pid
- {ok, Pid};
- [] ->
- % no match
- if Create ->
- {ok, Pid} = gen_server:call(couch_batch_save, {make_pid, DbName, UserCtx}, infinity),
- {ok, Pid};
- true ->
- {ok, none}
- end
- end.
-send_doc_to_batch(Pid, Doc) ->
- Pid ! {self(), add_doc, Doc},
- receive
- {Pid, doc_added} -> ok
- end.
-% the loop that holds documents between commits
-doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, new) ->
- % start a process that triggers commit every BatchInterval milliseconds
- Me = self(),
- spawn_link(fun() ->
- erlang:monitor(process, Me),
- commit_every_ms(Me, BatchInterval)
- end),
- doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, []);
-doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) when length(Docs) >= BatchSize->
- collector_commit(DbName, UserCtx, BatchInterval, Docs),
- exit(normal);
-doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, Docs) ->
- receive
- {From, add_doc, Doc} ->
- From ! {self(), doc_added},
- doc_collector(DbName, UserCtx, {BatchSize, BatchInterval}, [Doc|Docs]);
- {From, commit} ->
- collector_commit(DbName, UserCtx, BatchInterval, Docs),
- From ! {self(), committed},
- exit(normal)
- end.
-collector_commit(DbName, UserCtx, BatchInterval, Docs) ->
- % unregister
- unregister_collector(DbName, UserCtx, self()),
- % wait and collect
- Docs2 = shutdown_collector(DbName, UserCtx, BatchInterval, Docs),
- {ok, _Revs} = commit_user_docs(DbName, UserCtx, Docs2).
-unregister_collector(DbName, UserCtx, Pid) ->
- % remove from ets
- ets:delete_object(couch_batch_save_by_db, {{DbName, UserCtx}, Pid}).
-shutdown_collector(DbName, UserCtx, BatchInterval, Docs) ->
- receive
- {From, add_doc, Doc} ->
- From ! {self(), doc_added},
- shutdown_collector(DbName, UserCtx, BatchInterval, [Doc|Docs])
- % this interval will be waited for each time ensure-full-commit is called
- after BatchInterval ->
- Docs
- end.