diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_rep.erl | 46 |
1 files changed, 35 insertions, 11 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 41b3cea2..8805577e 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -171,7 +171,6 @@ handle_info({missing_revs_checkpoint, SourceSeq}, State) -> handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> - ?LOG_DEBUG("SourceSeq ~p N ~p", [SourceSeq, N]), MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), couch_task_status:update("W Processed source update #~p", [SourceSeq]), @@ -359,6 +358,7 @@ has_session_id(SessionId, [{Props} | Rest]) -> make_replication_id({Props}, UserCtx) -> %% funky algorithm to preserve backwards compatibility {ok, HostName} = inet:gethostname(), + % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)), couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))). @@ -457,7 +457,7 @@ do_checkpoint(State) -> stats = Stats } = State, ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), - RecordSeqNum = case commit_to_both(Source, Target) of + RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> NewSeqNum; _Else -> @@ -505,11 +505,11 @@ do_checkpoint(State) -> State end. -commit_to_both(Source, Target) -> +commit_to_both(Source, Target, RequiredSeq) -> % commit the src async ParentPid = self(), SrcCommitPid = spawn_link(fun() -> - ParentPid ! {self(), ensure_full_commit(Source)} end), + ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end), % commit tgt sync TargetStartTime = ensure_full_commit(Target), @@ -523,8 +523,8 @@ commit_to_both(Source, Target) -> end, {SourceStartTime, TargetStartTime}. -ensure_full_commit(#http_db{} = Db) -> - Req = Db#http_db{ +ensure_full_commit(#http_db{} = Target) -> + Req = Target#http_db{ resource = "_ensure_full_commit", method = post, body = true @@ -532,22 +532,46 @@ ensure_full_commit(#http_db{} = Db) -> {ResultProps} = couch_rep_httpc:request(Req), true = proplists:get_value(<<"ok">>, ResultProps), proplists:get_value(<<"instance_start_time">>, ResultProps); -ensure_full_commit(Db) -> - {ok, NewDb} = couch_db:open(Db#db.name, []), - UpdateSeq = couch_db:get_update_seq(Db), +ensure_full_commit(Target) -> + {ok, NewDb} = couch_db:open(Target#db.name, []), + UpdateSeq = couch_db:get_update_seq(Target), CommitSeq = couch_db:get_committed_update_seq(NewDb), InstanceStartTime = NewDb#db.instance_start_time, couch_db:close(NewDb), if UpdateSeq > CommitSeq -> ?LOG_DEBUG("replication needs a full commit: update ~p commit ~p", [UpdateSeq, CommitSeq]), - {ok, NewTime} = couch_db:ensure_full_commit(Db), - NewTime; + {ok, DbStartTime} = couch_db:ensure_full_commit(Target), + DbStartTime; true -> ?LOG_DEBUG("replication doesn't need a full commit", []), InstanceStartTime end. +ensure_full_commit(#http_db{} = Source, RequiredSeq) -> + Req = Source#http_db{ + resource = "_ensure_full_commit", + method = post, + body = true, + qs = [{seq, RequiredSeq}] + }, + {ResultProps} = couch_rep_httpc:request(Req), + case proplists:get_value(<<"ok">>, ResultProps) of + true -> + proplists:get_value(<<"instance_start_time">>, ResultProps); + undefined -> nil end; +ensure_full_commit(Source, RequiredSeq) -> + {ok, NewDb} = couch_db:open(Source#db.name, []), + CommitSeq = couch_db:get_committed_update_seq(NewDb), + InstanceStartTime = NewDb#db.instance_start_time, + couch_db:close(NewDb), + if RequiredSeq > CommitSeq -> + {ok, DbStartTime} = couch_db:ensure_full_commit(Source), + DbStartTime; + true -> + InstanceStartTime + end. + update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) -> Req = Db#http_db{ resource = couch_util:url_encode(DocId), |