summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl43
1 files changed, 40 insertions, 3 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 881525f0..29f1fc80 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -69,7 +69,10 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary,
":", Source/binary, ":", Target/binary>>,
- StartTime = httpd_util:rfc1123_date(),
+ ReplicationStartTime = httpd_util:rfc1123_date(),
+
+ {ok, SrcInstanceStartTime} = get_db_info(DbSrc),
+ {ok, TgtInstanceStartTime} = get_db_info(DbTgt),
case proplists:get_value(full, Options, false)
orelse proplists:get_value("full", Options, false) of
@@ -115,9 +118,28 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
% nothing changed, don't record results
{ok, {OldRepHistoryProps}};
false ->
+ % commit changes to both src and tgt. The src because if changes
+ % we replicated are lost, we'll record the a seq number of ahead
+ % of what was committed and therefore lose future changes with the
+ % same seq nums.
+
+ {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc),
+ {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt),
+
+ RecordSeqNum =
+ if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+ TgtInstanceStartTime2 == TgtInstanceStartTime ->
+ NewSeqNum;
+ true ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ SeqNum
+ end,
+
HistEntries =[
{
- [{<<"start_time">>, list_to_binary(StartTime)},
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
{<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
{<<"start_last_seq">>, SeqNum},
{<<"end_last_seq">>, NewSeqNum} | Stats]}
@@ -126,7 +148,7 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) ->
NewRepHistory =
{
[{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, NewSeqNum},
+ {<<"source_last_seq">>, RecordSeqNum},
{<<"history">>, lists:sublist(HistEntries, 50)}]},
{ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
@@ -276,6 +298,21 @@ close_db(#http_db{})->
close_db(Db)->
couch_db:close(Db).
+get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
+ {DbProps} = do_http_request(DbUrl, get, Headers),
+ {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
+get_db_info(Db) ->
+ couch_db:get_db_info(Db).
+
+
+ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
+ {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true),
+ true = proplists:get_value(<<"ok">>, ResultProps),
+ {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
+ensure_full_commit(Db) ->
+ couch_db:ensure_full_commit(Db).
+
+
get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
++ integer_to_list(StartSeq),