diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 106 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 1 |
2 files changed, 60 insertions, 47 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index df36cca7..23a44069 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -502,53 +502,62 @@ do_checkpoint(State) -> tgt_starttime = TgtInstanceStartTime, stats = Stats } = State, - ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), - RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of + case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> - NewSeqNum; + ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", + [dbname(Source), dbname(Target), NewSeqNum]), + SessionId = couch_uuids:random(), + NewHistoryEntry = {[ + {<<"session_id">>, SessionId}, + {<<"start_time">>, list_to_binary(ReplicationStartTime)}, + {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_last_seq">>, StartSeqNum}, + {<<"end_last_seq">>, NewSeqNum}, + {<<"recorded_seq">>, NewSeqNum}, + {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, + {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, + {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, + ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + % limit history to 50 entries + NewRepHistory = {[ + {<<"session_id">>, SessionId}, + {<<"source_last_seq">>, NewSeqNum}, + {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} + ]}, + + try + {SrcRevPos,SrcRevId} = + update_local_doc(Source, SourceLog#doc{body=NewRepHistory}), + {TgtRevPos,TgtRevId} = + update_local_doc(Target, TargetLog#doc{body=NewRepHistory}), + State#state{ + checkpoint_scheduled = nil, + checkpoint_history = NewRepHistory, + source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, + target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} + } + catch throw:conflict -> + ?LOG_ERROR("checkpoint failure: conflict (are you replicating to " + "yourself?)", []), + State + end; _Else -> - ?LOG_INFO("A server has restarted since replication start. " - "Not recording the new sequence number to ensure the " - "replication is redone and documents reexamined.", []), - StartSeqNum - end, - SessionId = couch_uuids:random(), - NewHistoryEntry = {[ - {<<"session_id">>, SessionId}, - {<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, - {<<"start_last_seq">>, StartSeqNum}, - {<<"end_last_seq">>, NewSeqNum}, - {<<"recorded_seq">>, RecordSeqNum}, - {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, - {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, - ets:lookup_element(Stats, doc_write_failures, 2)} - ]}, - % limit history to 50 entries - NewRepHistory = {[ - {<<"session_id">>, SessionId}, - {<<"source_last_seq">>, RecordSeqNum}, - {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} - ]}, - - try - {SrcRevPos,SrcRevId} = - update_local_doc(Source, SourceLog#doc{body=NewRepHistory}), - {TgtRevPos,TgtRevId} = - update_local_doc(Target, TargetLog#doc{body=NewRepHistory}), - State#state{ - checkpoint_scheduled = nil, - checkpoint_history = NewRepHistory, - source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, - target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} - } - catch throw:conflict -> - ?LOG_ERROR("checkpoint failure: conflict (are you replicating to yourself?)", - []), - State + ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint", + [dbname(Source), dbname(Target)]), + #state{ + changes_feed = CF, + missing_revs = MR, + reader = Reader, + writer = Writer + } = State, + Pids = [CF, MR, Reader, Writer], + [unlink(Pid) || Pid <- Pids], + [exit(Pid, shutdown) || Pid <- Pids], + {ok, NewState} = init(State#state.init_args), + NewState end. commit_to_both(Source, Target, RequiredSeq) -> @@ -585,12 +594,12 @@ ensure_full_commit(Target) -> InstanceStartTime = NewDb#db.instance_start_time, couch_db:close(NewDb), if UpdateSeq > CommitSeq -> - ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p", + ?LOG_DEBUG("target needs a full commit: update ~p commit ~p", [UpdateSeq, CommitSeq]), {ok, DbStartTime} = couch_db:ensure_full_commit(Target), DbStartTime; true -> - ?LOG_DEBUG("replication doesn't need a full commit", []), + ?LOG_DEBUG("target doesn't need a full commit", []), InstanceStartTime end. @@ -612,9 +621,12 @@ ensure_full_commit(Source, RequiredSeq) -> InstanceStartTime = NewDb#db.instance_start_time, couch_db:close(NewDb), if RequiredSeq > CommitSeq -> + ?LOG_DEBUG("source needs a full commit: required ~p committed ~p", + [RequiredSeq, CommitSeq]), {ok, DbStartTime} = couch_db:ensure_full_commit(Source), DbStartTime; true -> + ?LOG_DEBUG("source doesn't need a full commit", []), InstanceStartTime end. diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index e0ebc2f0..7f061500 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -262,6 +262,7 @@ reader_loop(ReaderServer, Source, MissingRevsServer) -> maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), + couch_db:close(Db), NewDb; maybe_reopen_db(Db, _HighSeq) -> Db. |