summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-10-28 19:27:31 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-10-28 19:27:31 +0000
commit83ae3d22716bd27cac9aba134a32bfbe05f4301c (patch)
tree565d012f56f835db934316844821569102f3959a
parentcbd893d573e2b37baa50528f2d4a639f885ebefa (diff)
reboot replication from last checkpoint if DB is compacted or server restarts
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@830737 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep.erl106
-rw-r--r--src/couchdb/couch_rep_reader.erl1
2 files changed, 60 insertions, 47 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index df36cca7..23a44069 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -502,53 +502,62 @@ do_checkpoint(State) ->
tgt_starttime = TgtInstanceStartTime,
stats = Stats
} = State,
- ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
- RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of
+ case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- NewSeqNum;
+ ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
+ [dbname(Source), dbname(Target), NewSeqNum]),
+ SessionId = couch_uuids:random(),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
+ {<<"recorded_seq">>, NewSeqNum},
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
+ {<<"doc_write_failures">>,
+ ets:lookup_element(Stats, doc_write_failures, 2)}
+ ]},
+ % limit history to 50 entries
+ NewRepHistory = {[
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeqNum},
+ {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
+ ]},
+
+ try
+ {SrcRevPos,SrcRevId} =
+ update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
+ {TgtRevPos,TgtRevId} =
+ update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
+ State#state{
+ checkpoint_scheduled = nil,
+ checkpoint_history = NewRepHistory,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ }
+ catch throw:conflict ->
+ ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+ "yourself?)", []),
+ State
+ end;
_Else ->
- ?LOG_INFO("A server has restarted since replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- StartSeqNum
- end,
- SessionId = couch_uuids:random(),
- NewHistoryEntry = {[
- {<<"session_id">>, SessionId},
- {<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, StartSeqNum},
- {<<"end_last_seq">>, NewSeqNum},
- {<<"recorded_seq">>, RecordSeqNum},
- {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
- {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
- {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
- {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
- {<<"doc_write_failures">>,
- ets:lookup_element(Stats, doc_write_failures, 2)}
- ]},
- % limit history to 50 entries
- NewRepHistory = {[
- {<<"session_id">>, SessionId},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
- ]},
-
- try
- {SrcRevPos,SrcRevId} =
- update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
- {TgtRevPos,TgtRevId} =
- update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
- State#state{
- checkpoint_scheduled = nil,
- checkpoint_history = NewRepHistory,
- source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
- target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
- }
- catch throw:conflict ->
- ?LOG_ERROR("checkpoint failure: conflict (are you replicating to yourself?)",
- []),
- State
+ ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
+ [dbname(Source), dbname(Target)]),
+ #state{
+ changes_feed = CF,
+ missing_revs = MR,
+ reader = Reader,
+ writer = Writer
+ } = State,
+ Pids = [CF, MR, Reader, Writer],
+ [unlink(Pid) || Pid <- Pids],
+ [exit(Pid, shutdown) || Pid <- Pids],
+ {ok, NewState} = init(State#state.init_args),
+ NewState
end.
commit_to_both(Source, Target, RequiredSeq) ->
@@ -585,12 +594,12 @@ ensure_full_commit(Target) ->
InstanceStartTime = NewDb#db.instance_start_time,
couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
- ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p",
+ ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
{ok, DbStartTime} = couch_db:ensure_full_commit(Target),
DbStartTime;
true ->
- ?LOG_DEBUG("replication doesn't need a full commit", []),
+ ?LOG_DEBUG("target doesn't need a full commit", []),
InstanceStartTime
end.
@@ -612,9 +621,12 @@ ensure_full_commit(Source, RequiredSeq) ->
InstanceStartTime = NewDb#db.instance_start_time,
couch_db:close(NewDb),
if RequiredSeq > CommitSeq ->
+ ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
+ [RequiredSeq, CommitSeq]),
{ok, DbStartTime} = couch_db:ensure_full_commit(Source),
DbStartTime;
true ->
+ ?LOG_DEBUG("source doesn't need a full commit", []),
InstanceStartTime
end.
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index e0ebc2f0..7f061500 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -262,6 +262,7 @@ reader_loop(ReaderServer, Source, MissingRevsServer) ->
maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
{ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
+ couch_db:close(Db),
NewDb;
maybe_reopen_db(Db, _HighSeq) ->
Db.