summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl46
1 files changed, 35 insertions, 11 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 41b3cea2..8805577e 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -171,7 +171,6 @@ handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
- ?LOG_DEBUG("SourceSeq ~p N ~p", [SourceSeq, N]),
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
couch_task_status:update("W Processed source update #~p", [SourceSeq]),
@@ -359,6 +358,7 @@ has_session_id(SessionId, [{Props} | Rest]) ->
make_replication_id({Props}, UserCtx) ->
%% funky algorithm to preserve backwards compatibility
{ok, HostName} = inet:gethostname(),
+ % Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)),
couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))).
@@ -457,7 +457,7 @@ do_checkpoint(State) ->
stats = Stats
} = State,
?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
- RecordSeqNum = case commit_to_both(Source, Target) of
+ RecordSeqNum = case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
NewSeqNum;
_Else ->
@@ -505,11 +505,11 @@ do_checkpoint(State) ->
State
end.
-commit_to_both(Source, Target) ->
+commit_to_both(Source, Target, RequiredSeq) ->
% commit the src async
ParentPid = self(),
SrcCommitPid = spawn_link(fun() ->
- ParentPid ! {self(), ensure_full_commit(Source)} end),
+ ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end),
% commit tgt sync
TargetStartTime = ensure_full_commit(Target),
@@ -523,8 +523,8 @@ commit_to_both(Source, Target) ->
end,
{SourceStartTime, TargetStartTime}.
-ensure_full_commit(#http_db{} = Db) ->
- Req = Db#http_db{
+ensure_full_commit(#http_db{} = Target) ->
+ Req = Target#http_db{
resource = "_ensure_full_commit",
method = post,
body = true
@@ -532,22 +532,46 @@ ensure_full_commit(#http_db{} = Db) ->
{ResultProps} = couch_rep_httpc:request(Req),
true = proplists:get_value(<<"ok">>, ResultProps),
proplists:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(Db) ->
- {ok, NewDb} = couch_db:open(Db#db.name, []),
- UpdateSeq = couch_db:get_update_seq(Db),
+ensure_full_commit(Target) ->
+ {ok, NewDb} = couch_db:open(Target#db.name, []),
+ UpdateSeq = couch_db:get_update_seq(Target),
CommitSeq = couch_db:get_committed_update_seq(NewDb),
InstanceStartTime = NewDb#db.instance_start_time,
couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
?LOG_DEBUG("replication needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
- {ok, NewTime} = couch_db:ensure_full_commit(Db),
- NewTime;
+ {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
+ DbStartTime;
true ->
?LOG_DEBUG("replication doesn't need a full commit", []),
InstanceStartTime
end.
+ensure_full_commit(#http_db{} = Source, RequiredSeq) ->
+ Req = Source#http_db{
+ resource = "_ensure_full_commit",
+ method = post,
+ body = true,
+ qs = [{seq, RequiredSeq}]
+ },
+ {ResultProps} = couch_rep_httpc:request(Req),
+ case proplists:get_value(<<"ok">>, ResultProps) of
+ true ->
+ proplists:get_value(<<"instance_start_time">>, ResultProps);
+ undefined -> nil end;
+ensure_full_commit(Source, RequiredSeq) ->
+ {ok, NewDb} = couch_db:open(Source#db.name, []),
+ CommitSeq = couch_db:get_committed_update_seq(NewDb),
+ InstanceStartTime = NewDb#db.instance_start_time,
+ couch_db:close(NewDb),
+ if RequiredSeq > CommitSeq ->
+ {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
+ DbStartTime;
+ true ->
+ InstanceStartTime
+ end.
+
update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) ->
Req = Db#http_db{
resource = couch_util:url_encode(DocId),