diff options
author | Damien F. Katz <damien@apache.org> | 2009-01-09 22:20:48 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-01-09 22:20:48 +0000 |
commit | 87f45e73df3e37fbb631bcb14871c621ee77489b (patch) | |
tree | 0e33d44479bc94dd8a28387d771ba5feb037441d /src/couchdb/couch_rep.erl | |
parent | f6664de58f489627fee6e4283a1d17e0c6a99433 (diff) |
Added support so clients can detect if a server has potentially lost commits after multiple updates, like during bulk imports and so the replicator can detect lost commits on remote replications.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@733174 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 881525f0..29f1fc80 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -69,7 +69,10 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary, ":", Source/binary, ":", Target/binary>>, - StartTime = httpd_util:rfc1123_date(), + ReplicationStartTime = httpd_util:rfc1123_date(), + + {ok, SrcInstanceStartTime} = get_db_info(DbSrc), + {ok, TgtInstanceStartTime} = get_db_info(DbTgt), case proplists:get_value(full, Options, false) orelse proplists:get_value("full", Options, false) of @@ -115,9 +118,28 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> % nothing changed, don't record results {ok, {OldRepHistoryProps}}; false -> + % commit changes to both src and tgt. The src because if changes + % we replicated are lost, we'll record the a seq number of ahead + % of what was committed and therefore lose future changes with the + % same seq nums. + + {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc), + {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt), + + RecordSeqNum = + if SrcInstanceStartTime2 == SrcInstanceStartTime andalso + TgtInstanceStartTime2 == TgtInstanceStartTime -> + NewSeqNum; + true -> + ?LOG_INFO("A server has restarted sinced replication start. " + "Not recording the new sequence number to ensure the " + "replication is redone and documents reexamined.", []), + SeqNum + end, + HistEntries =[ { - [{<<"start_time">>, list_to_binary(StartTime)}, + [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, {<<"start_last_seq">>, SeqNum}, {<<"end_last_seq">>, NewSeqNum} | Stats]} @@ -126,7 +148,7 @@ replicate2(Source, DbSrc, Target, DbTgt, Options) -> NewRepHistory = { [{<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, NewSeqNum}, + {<<"source_last_seq">>, RecordSeqNum}, {<<"history">>, lists:sublist(HistEntries, 50)}]}, {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []), @@ -276,6 +298,21 @@ close_db(#http_db{})-> close_db(Db)-> couch_db:close(Db). +get_db_info(#http_db{uri=DbUrl, headers=Headers}) -> + {DbProps} = do_http_request(DbUrl, get, Headers), + {ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]}; +get_db_info(Db) -> + couch_db:get_db_info(Db). + + +ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) -> + {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true), + true = proplists:get_value(<<"ok">>, ResultProps), + {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)}; +ensure_full_commit(Db) -> + couch_db:ensure_full_commit(Db). + + get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) -> Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey=" ++ integer_to_list(StartSeq), |