diff options
| author | Adam Kocoloski <kocolosk@apache.org> | 2009-10-28 19:27:31 +0000 | 
|---|---|---|
| committer | Adam Kocoloski <kocolosk@apache.org> | 2009-10-28 19:27:31 +0000 | 
| commit | 83ae3d22716bd27cac9aba134a32bfbe05f4301c (patch) | |
| tree | 565d012f56f835db934316844821569102f3959a /src | |
| parent | cbd893d573e2b37baa50528f2d4a639f885ebefa (diff) | |
reboot replication from last checkpoint if DB is compacted or server restarts
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@830737 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
| -rw-r--r-- | src/couchdb/couch_rep.erl | 106 | ||||
| -rw-r--r-- | src/couchdb/couch_rep_reader.erl | 1 | 
2 files changed, 60 insertions, 47 deletions
| diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index df36cca7..23a44069 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -502,53 +502,62 @@ do_checkpoint(State) ->          tgt_starttime = TgtInstanceStartTime,          stats = Stats      } = State, -    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), -    RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of +    case commit_to_both(Source, Target, NewSeqNum) of      {SrcInstanceStartTime, TgtInstanceStartTime} -> -        NewSeqNum; +        ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", +            [dbname(Source), dbname(Target), NewSeqNum]), +        SessionId = couch_uuids:random(), +        NewHistoryEntry = {[ +            {<<"session_id">>, SessionId}, +            {<<"start_time">>, list_to_binary(ReplicationStartTime)}, +            {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, +            {<<"start_last_seq">>, StartSeqNum}, +            {<<"end_last_seq">>, NewSeqNum}, +            {<<"recorded_seq">>, NewSeqNum}, +            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, +            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, +            {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, +            {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, +            {<<"doc_write_failures">>,  +                ets:lookup_element(Stats, doc_write_failures, 2)} +        ]}, +        % limit history to 50 entries +        NewRepHistory = {[ +            {<<"session_id">>, SessionId}, +            {<<"source_last_seq">>, NewSeqNum}, +            {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} +        ]}, + +        try +        {SrcRevPos,SrcRevId} = +            update_local_doc(Source, SourceLog#doc{body=NewRepHistory}), +        {TgtRevPos,TgtRevId} = +            update_local_doc(Target, TargetLog#doc{body=NewRepHistory}), +        State#state{ +            checkpoint_scheduled = nil, +            checkpoint_history = NewRepHistory, +            source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, +            target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} +        } +        catch throw:conflict -> +        ?LOG_ERROR("checkpoint failure: conflict (are you replicating to " +            "yourself?)", []), +        State +        end;      _Else -> -        ?LOG_INFO("A server has restarted since replication start. " -            "Not recording the new sequence number to ensure the " -            "replication is redone and documents reexamined.", []), -        StartSeqNum -    end, -    SessionId = couch_uuids:random(), -    NewHistoryEntry = {[ -        {<<"session_id">>, SessionId}, -        {<<"start_time">>, list_to_binary(ReplicationStartTime)}, -        {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, -        {<<"start_last_seq">>, StartSeqNum}, -        {<<"end_last_seq">>, NewSeqNum}, -        {<<"recorded_seq">>, RecordSeqNum}, -        {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, -        {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, -        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, -        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, -        {<<"doc_write_failures">>,  -            ets:lookup_element(Stats, doc_write_failures, 2)} -    ]}, -    % limit history to 50 entries -    NewRepHistory = {[ -        {<<"session_id">>, SessionId}, -        {<<"source_last_seq">>, RecordSeqNum}, -        {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} -    ]}, - -    try -    {SrcRevPos,SrcRevId} = -        update_local_doc(Source, SourceLog#doc{body=NewRepHistory}), -    {TgtRevPos,TgtRevId} = -        update_local_doc(Target, TargetLog#doc{body=NewRepHistory}), -    State#state{ -        checkpoint_scheduled = nil, -        checkpoint_history = NewRepHistory, -        source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, -        target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} -    } -    catch throw:conflict -> -    ?LOG_ERROR("checkpoint failure: conflict (are you replicating to yourself?)", -        []), -    State +        ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint", +            [dbname(Source), dbname(Target)]), +        #state{ +            changes_feed = CF, +            missing_revs = MR, +            reader = Reader, +            writer = Writer +        } = State, +        Pids = [CF, MR, Reader, Writer], +        [unlink(Pid) || Pid <- Pids], +        [exit(Pid, shutdown) || Pid <- Pids], +        {ok, NewState} = init(State#state.init_args), +        NewState      end.  commit_to_both(Source, Target, RequiredSeq) -> @@ -585,12 +594,12 @@ ensure_full_commit(Target) ->      InstanceStartTime = NewDb#db.instance_start_time,      couch_db:close(NewDb),      if UpdateSeq > CommitSeq -> -        ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p", +        ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",              [UpdateSeq, CommitSeq]),          {ok, DbStartTime} = couch_db:ensure_full_commit(Target),          DbStartTime;      true -> -        ?LOG_DEBUG("replication doesn't need a full commit", []), +        ?LOG_DEBUG("target doesn't need a full commit", []),          InstanceStartTime      end. @@ -612,9 +621,12 @@ ensure_full_commit(Source, RequiredSeq) ->      InstanceStartTime = NewDb#db.instance_start_time,      couch_db:close(NewDb),      if RequiredSeq > CommitSeq -> +        ?LOG_DEBUG("source needs a full commit: required ~p committed ~p", +            [RequiredSeq, CommitSeq]),          {ok, DbStartTime} = couch_db:ensure_full_commit(Source),          DbStartTime;      true -> +        ?LOG_DEBUG("source doesn't need a full commit", []),          InstanceStartTime      end. diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index e0ebc2f0..7f061500 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -262,6 +262,7 @@ reader_loop(ReaderServer, Source, MissingRevsServer) ->  maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->      {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), +    couch_db:close(Db),      NewDb;  maybe_reopen_db(Db, _HighSeq) ->      Db. | 
