diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 78 |
1 files changed, 42 insertions, 36 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 9107918c..41b3cea2 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -171,6 +171,7 @@ handle_info({missing_revs_checkpoint, SourceSeq}, State) -> handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> + ?LOG_DEBUG("SourceSeq ~p N ~p", [SourceSeq, N]), MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), couch_task_status:update("W Processed source update #~p", [SourceSeq]), @@ -196,43 +197,12 @@ handle_info({'EXIT', Pid, Reason}, State) -> ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]), {stop, Reason, State}. -terminate(normal, State) -> - #state{ - checkpoint_scheduled = TRef, - checkpoint_history = CheckpointHistory, - committed_seq = NewSeq, - listeners = Listeners, - source = Source, - target = Target, - stats = Stats, - source_log = #doc{body={OldHistory}} - } = do_checkpoint(State), - timer:cancel(TRef), - couch_task_status:update("Finishing"), - ets:delete(Stats), - close_db(Target), +terminate(normal, #state{checkpoint_scheduled=nil} = State) -> + do_terminate(State); - NewRepHistory = case CheckpointHistory of - nil -> - {[{<<"no_changes">>, true} | OldHistory]}; - _Else -> - CheckpointHistory - end, - - %% reply to original requester - [Original|OtherListeners] = lists:reverse(Listeners), - gen_server:reply(Original, {ok, NewRepHistory}), - - %% maybe trigger another replication. If this replicator uses a local - %% source Db, changes to that Db since we started will not be included in - %% this pass. - case up_to_date(Source, NewSeq) of - true -> - [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners]; - false -> - [gen_server:reply(R, retry) || R <- OtherListeners] - end, - close_db(Source); +terminate(normal, State) -> + timer:cancel(State#state.checkpoint_scheduled), + do_terminate(do_checkpoint(State)); terminate(Reason, State) -> #state{ @@ -340,6 +310,42 @@ dbinfo(Db) -> {ok, Info} = couch_db:get_db_info(Db), Info. +do_terminate(State) -> + #state{ + checkpoint_history = CheckpointHistory, + committed_seq = NewSeq, + listeners = Listeners, + source = Source, + target = Target, + stats = Stats, + source_log = #doc{body={OldHistory}} + } = State, + couch_task_status:update("Finishing"), + ets:delete(Stats), + close_db(Target), + + NewRepHistory = case CheckpointHistory of + nil -> + {[{<<"no_changes">>, true} | OldHistory]}; + _Else -> + CheckpointHistory + end, + + %% reply to original requester + [Original|OtherListeners] = lists:reverse(Listeners), + gen_server:reply(Original, {ok, NewRepHistory}), + + %% maybe trigger another replication. If this replicator uses a local + %% source Db, changes to that Db since we started will not be included in + %% this pass. + case up_to_date(Source, NewSeq) of + true -> + [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners]; + false -> + [gen_server:reply(R, retry) || R <- OtherListeners] + end, + close_db(Source). + has_session_id(_SessionId, []) -> false; has_session_id(SessionId, [{Props} | Rest]) -> |