summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl31
-rw-r--r--src/couchdb/couch_rep_reader.erl1
2 files changed, 27 insertions, 5 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 95a23869..9107918c 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2]).
+-export([replicate/2, checkpoint/1]).
-include("couch_db.hrl").
@@ -28,6 +28,7 @@
source,
target,
init_args,
+ checkpoint_scheduled = nil,
start_seq,
history,
@@ -73,6 +74,9 @@ replicate({Props}=PostBody, UserCtx) ->
get_result(Server, PostBody, UserCtx)
end.
+checkpoint(Server) ->
+ gen_server:cast(Server, do_checkpoint).
+
get_result(Server, PostBody, UserCtx) ->
try gen_server:call(Server, get_result, infinity) of
retry -> replicate(PostBody, UserCtx);
@@ -137,6 +141,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
target = Target,
init_args = InitArgs,
stats = Stats,
+ checkpoint_scheduled = nil,
start_seq = StartSeq,
history = History,
@@ -154,19 +159,22 @@ handle_call(get_result, From, State) ->
Listeners = State#state.listeners,
{noreply, State#state{listeners=[From|Listeners]}}.
+handle_cast(do_checkpoint, State) ->
+ {noreply, do_checkpoint(State)};
+
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
- {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+ {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
couch_task_status:update("W Processed source update #~p", [SourceSeq]),
- {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})};
+ {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
{noreply, State};
@@ -190,6 +198,7 @@ handle_info({'EXIT', Pid, Reason}, State) ->
terminate(normal, State) ->
#state{
+ checkpoint_scheduled = TRef,
checkpoint_history = CheckpointHistory,
committed_seq = NewSeq,
listeners = Listeners,
@@ -197,7 +206,8 @@ terminate(normal, State) ->
target = Target,
stats = Stats,
source_log = #doc{body={OldHistory}}
- } = State,
+ } = do_checkpoint(State),
+ timer:cancel(TRef),
couch_task_status:update("Finishing"),
ets:delete(Stats),
close_db(Target),
@@ -414,6 +424,18 @@ open_db(<<DbName/binary>>, UserCtx) ->
{not_found, no_db_file} -> throw({db_not_found, DbName})
end.
+schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
+ Server = self(),
+ case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of
+ {ok, TRef} ->
+ State#state{checkpoint_scheduled = TRef};
+ Error ->
+ ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
+ State
+ end;
+schedule_checkpoint(State) ->
+ State.
+
do_checkpoint(State) ->
#state{
source = Source,
@@ -466,6 +488,7 @@ do_checkpoint(State) ->
{TgtRevPos,TgtRevId} =
update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
State#state{
+ checkpoint_scheduled = nil,
checkpoint_history = NewRepHistory,
source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index 4feb3804..735ab742 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -130,7 +130,6 @@ handle_next_docs(_From, State) ->
gen_server:reply(ReaderFrom, ok);
true -> ok end,
NewState = State#state{count=0, reader_from=nil, docs=queue:new()},
- ?LOG_INFO("replying to next_docs with HighSeq ~p", [calculate_new_high_seq(State)]),
{reply, {calculate_new_high_seq(State), queue:to_list(Docs)}, NewState}.
handle_open_remote_doc(Id, Seq, Revs, From, #state{monitor_count=N} = State)