summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl78
1 files changed, 42 insertions, 36 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 9107918c..41b3cea2 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -171,6 +171,7 @@ handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
+ ?LOG_DEBUG("SourceSeq ~p N ~p", [SourceSeq, N]),
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
couch_task_status:update("W Processed source update #~p", [SourceSeq]),
@@ -196,43 +197,12 @@ handle_info({'EXIT', Pid, Reason}, State) ->
?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
{stop, Reason, State}.
-terminate(normal, State) ->
- #state{
- checkpoint_scheduled = TRef,
- checkpoint_history = CheckpointHistory,
- committed_seq = NewSeq,
- listeners = Listeners,
- source = Source,
- target = Target,
- stats = Stats,
- source_log = #doc{body={OldHistory}}
- } = do_checkpoint(State),
- timer:cancel(TRef),
- couch_task_status:update("Finishing"),
- ets:delete(Stats),
- close_db(Target),
+terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
+ do_terminate(State);
- NewRepHistory = case CheckpointHistory of
- nil ->
- {[{<<"no_changes">>, true} | OldHistory]};
- _Else ->
- CheckpointHistory
- end,
-
- %% reply to original requester
- [Original|OtherListeners] = lists:reverse(Listeners),
- gen_server:reply(Original, {ok, NewRepHistory}),
-
- %% maybe trigger another replication. If this replicator uses a local
- %% source Db, changes to that Db since we started will not be included in
- %% this pass.
- case up_to_date(Source, NewSeq) of
- true ->
- [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
- false ->
- [gen_server:reply(R, retry) || R <- OtherListeners]
- end,
- close_db(Source);
+terminate(normal, State) ->
+ timer:cancel(State#state.checkpoint_scheduled),
+ do_terminate(do_checkpoint(State));
terminate(Reason, State) ->
#state{
@@ -340,6 +310,42 @@ dbinfo(Db) ->
{ok, Info} = couch_db:get_db_info(Db),
Info.
+do_terminate(State) ->
+ #state{
+ checkpoint_history = CheckpointHistory,
+ committed_seq = NewSeq,
+ listeners = Listeners,
+ source = Source,
+ target = Target,
+ stats = Stats,
+ source_log = #doc{body={OldHistory}}
+ } = State,
+ couch_task_status:update("Finishing"),
+ ets:delete(Stats),
+ close_db(Target),
+
+ NewRepHistory = case CheckpointHistory of
+ nil ->
+ {[{<<"no_changes">>, true} | OldHistory]};
+ _Else ->
+ CheckpointHistory
+ end,
+
+ %% reply to original requester
+ [Original|OtherListeners] = lists:reverse(Listeners),
+ gen_server:reply(Original, {ok, NewRepHistory}),
+
+ %% maybe trigger another replication. If this replicator uses a local
+ %% source Db, changes to that Db since we started will not be included in
+ %% this pass.
+ case up_to_date(Source, NewSeq) of
+ true ->
+ [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
+ false ->
+ [gen_server:reply(R, retry) || R <- OtherListeners]
+ end,
+ close_db(Source).
+
has_session_id(_SessionId, []) ->
false;
has_session_id(SessionId, [{Props} | Rest]) ->