From 5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 10 Aug 2009 18:37:43 +0000 Subject: new replicator using _changes feed for continuous replication git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802888 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep.erl | 1156 +++++++++++++++------------------------------ 1 file changed, 390 insertions(+), 766 deletions(-) (limited to 'src/couchdb/couch_rep.erl') diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 1692943a..c5a07685 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -12,155 +12,97 @@ -module(couch_rep). -behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([replicate/2]). --define(BUFFER_NDOCS, 1000). --define(BUFFER_NATTACHMENTS, 50). --define(BUFFER_MEMORY, 10000000). %% bytes - -include("couch_db.hrl"). --include("../ibrowse/ibrowse.hrl"). - -%% @spec replicate(Source::binary(), Target::binary()) -> -%% {ok, Stats} | {error, Reason} -%% @doc Triggers a replication. Stats is a JSON Object with the following -%% keys: session_id (UUID), source_last_seq (integer), and history (array). -%% Each element of the history is an Object with keys start_time, end_time, -%% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read, -%% and docs_written. -%% -%% The supervisor will try to restart the replication in case of any error -%% other than shutdown. Just call this function again to listen for the -%% result of the retry. 
-replicate(Source, Target) -> - {ok, HostName} = inet:gethostname(), - RepId = couch_util:to_hex( - erlang:md5(term_to_binary([HostName, Source, Target]))), - Args = [?MODULE, [RepId, Source,Target], []], +-record(state, { + changes_feed, + missing_revs, + reader, + writer, + + source, + target, + init_args, + + start_seq, + history, + source_log, + target_log, + rep_starttime, + src_starttime, + tgt_starttime, + checkpoint_history = nil, + + listeners = [], + complete = false, + committed_seq = 0, + + stats = nil +}). +%% convenience function to do a simple replication from the shell +replicate(Source, Target) when is_list(Source) -> + replicate(?l2b(Source), Target); +replicate(Source, Target) when is_binary(Source), is_list(Target) -> + replicate(Source, ?l2b(Target)); +replicate(Source, Target) when is_binary(Source), is_binary(Target) -> + replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{}); + +%% function handling POST to _replicate +replicate(PostBody, UserCtx) -> + RepId = make_replication_id(PostBody, UserCtx), Replicator = {RepId, - {gen_server, start_link, Args}, + {gen_server, start_link, [?MODULE, [RepId, PostBody, UserCtx], []]}, transient, 1, worker, [?MODULE] }, - Server = case supervisor:start_child(couch_rep_sup, Replicator) of - {ok, Pid} -> - ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), - Pid; - {error, already_present} -> - case supervisor:restart_child(couch_rep_sup, RepId) of - {ok, Pid} -> - ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), - Pid; - {error, running} -> - %% this error occurs if multiple replicators are racing - %% each other to start and somebody else won. Just grab - %% the Pid by calling start_child again. 
- {error, {already_started, Pid}} = - supervisor:start_child(couch_rep_sup, Replicator), - ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), - Pid - end; - {error, {already_started, Pid}} -> - ?LOG_INFO("replication ~p already running at ~p", [RepId, Pid]), - Pid - end, + Server = start_replication_server(Replicator), - case gen_server:call(Server, get_result, infinity) of - retry -> replicate(Source, Target); - Else -> Else + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) end. -%%============================================================================= -%% gen_server callbacks -%%============================================================================= - --record(old_http_db, { - uri, - headers, - oauth -}). - - --record(state, { - context, - current_seq, - source, - target, - stats, - enum_pid, - docs_buffer = [], - listeners = [], - done = false -}). 
- - -init([RepId, Source, Target]) -> +init([RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), - {ok, DbSrc, SrcName} = open_db(Source), - {ok, DbTgt, TgtName} = open_db(Target), + SourceProps = proplists:get_value(<<"source">>, PostProps), + TargetProps = proplists:get_value(<<"target">>, PostProps), - DocKey = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + Source = open_db(SourceProps, UserCtx), + Target = open_db(TargetProps, UserCtx), - {ok, InfoSrc} = get_db_info(DbSrc), - {ok, InfoTgt} = get_db_info(DbTgt), + SourceLog = open_replication_log(Source, RepId), + TargetLog = open_replication_log(Target, RepId), - ReplicationStartTime = httpd_util:rfc1123_date(), - SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc), - TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt), + SourceInfo = dbinfo(Source), + TargetInfo = dbinfo(Target), + + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), - RepRecDocSrc = - case open_doc(DbSrc, DocKey, []) of - {ok, SrcDoc} -> - ?LOG_DEBUG("Found existing replication record on source", []), - SrcDoc; - _ -> #doc{id=DocKey} - end, - - RepRecDocTgt = - case open_doc(DbTgt, DocKey, []) of - {ok, TgtDoc} -> - ?LOG_DEBUG("Found existing replication record on target", []), - TgtDoc; - _ -> #doc{id=DocKey} - end, - - #doc{body={RepRecProps}} = RepRecDocSrc, - #doc{body={RepRecPropsTgt}} = RepRecDocTgt, - - case proplists:get_value(<<"session_id">>, RepRecProps) == - proplists:get_value(<<"session_id">>, RepRecPropsTgt) of - true -> - % if the records have the same session id, - % then we have a valid replication history - OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0), - OldHistory = proplists:get_value(<<"history">>, RepRecProps, []); - false -> - ?LOG_INFO("Replication records differ. 
" - "Performing full replication instead of incremental.", []), - ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", - [RepRecProps, RepRecPropsTgt]), - OldSeqNum = 0, - OldHistory = [] - end, - - Context = [ - {start_seq, OldSeqNum}, - {history, OldHistory}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecDocSrc}, - {tgt_record, RepRecDocTgt} - ], + {ok, ChangesFeed} = + couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), + {ok, MissingRevs} = + couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps), + {ok, Writer} = + couch_rep_writer:start_link(self(), Target, Reader, PostProps), Stats = ets:new(replication_stats, [set, private]), ets:insert(Stats, {total_revs,0}), @@ -169,158 +111,116 @@ init([RepId, Source, Target]) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - couch_task_status:add_task("Replication", < ", - TgtName/binary>>, "Starting"), - - Parent = self(), - Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{OldSeqNum,0}) end), + {ShortId, _} = lists:split(6, RepId), + couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", + [ShortId, dbname(Source), dbname(Target)]), "Starting"), State = #state{ - context = Context, - current_seq = OldSeqNum, - enum_pid = Pid, - source = DbSrc, - target = DbTgt, - stats = Stats - }, + changes_feed = ChangesFeed, + missing_revs = MissingRevs, + reader = Reader, + writer = Writer, - {ok, State}. 
-handle_call(get_result, From, #state{listeners=L,done=true} = State) -> - {stop, normal, State#state{listeners=[From|L]}}; -handle_call(get_result, From, #state{listeners=L} = State) -> - {noreply, State#state{listeners=[From|L]}}; - -handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) -> - #state{ - context = Context, - current_seq = Seq, - docs_buffer = Buffer, source = Source, target = Target, - stats = Stats - } = State, + init_args = InitArgs, + stats = Stats, + + start_seq = StartSeq, + history = History, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = httpd_util:rfc1123_date(), + src_starttime = proplists:get_value(instance_start_time, SourceInfo), + tgt_starttime = proplists:get_value(instance_start_time, TargetInfo) + }, + {ok, State}. - ets:update_counter(Stats, missing_revs, length(Revs)), +handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> + {stop, normal, State#state{listeners=[From]}}; +handle_call(get_result, From, State) -> + Listeners = State#state.listeners, + {noreply, State#state{listeners=[From|Listeners]}}. - %% get document(s) - {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]), - Docs = [RevDoc || {ok, RevDoc} <- DocResults], - ets:update_counter(Stats, docs_read, length(Docs)), +handle_cast(_Msg, State) -> + {noreply, State}. 
- %% save them (maybe in a buffer) - {NewBuffer, NewContext} = - case should_flush(lists:flatlength([Docs|Buffer])) of - true -> - Docs2 = lists:flatten([Docs|Buffer]), - try update_docs(Target, Docs2, [], replicated_changes) of - {ok, Errors} -> - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, length(Docs2) - - length(Errors)), - {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), - {[], Ctxt} - catch - throw:attachment_write_failed -> - ?LOG_ERROR("attachment request failed during write to disk", []), - exit({internal_server_error, replication_link_failure}) - end; - false -> - {[Docs | Buffer], Context} - end, +handle_info({missing_revs_checkpoint, SourceSeq}, State) -> + couch_task_status:update("MR Processed source update #~p", [SourceSeq]), + {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})}; + +handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) + when SourceSeq > N -> + MissingRevs = State#state.missing_revs, + ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), + couch_task_status:update("W Processed source update #~p", [SourceSeq]), + {noreply, do_checkpoint(State#state{committed_seq = SourceSeq})}; +handle_info({writer_checkpoint, _}, State) -> + {noreply, State}; - {reply, ok, State#state{context=NewContext, docs_buffer=NewBuffer}}; +handle_info({update_stats, Key, N}, State) -> + ets:update_counter(State#state.stats, Key, N), + {noreply, State}; -handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) -> - ets:update_counter(State#state.stats, total_revs, RevsCount), +handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) -> case State#state.listeners of [] -> - % still waiting for the first listener to send a request - {noreply, State#state{current_seq=LastSeq,done=true}}; - _ -> - {stop, normal, ok, State#state{current_seq=LastSeq}} - end. 
- -handle_cast({increment_update_seq, Seq}, State) -> - couch_task_status:update("Processed source update #~p", [Seq]), - {noreply, State#state{current_seq=Seq}}. - -handle_info({'EXIT', Pid, Reason}, #state{enum_pid=Pid} = State) -> - ?LOG_ERROR("replication enumerator exited with ~p .. respawning", [Reason]), - #state{ - current_seq = Seq, - source = Src, - target = Tgt, - enum_pid = Pid - } = State, - Parent = self(), - NewPid = spawn_link(fun() -> enum_docs_since(Parent,Src,Tgt,{Seq,0}) end), - {noreply, State#state{enum_pid=NewPid}}; + {noreply, State#state{complete = true}}; + _Else -> + {stop, normal, State} + end; -%% if any linked process dies, respawn the enumerator to get things going again -handle_info({'EXIT', _From, normal}, State) -> - {noreply, State}; -handle_info({'EXIT', From, Reason}, #state{enum_pid=EnumPid} = State) -> - ?LOG_ERROR("replicator-linked pid ~p exited with ~p", [From, Reason]), - exit(EnumPid, pls_restart_kthxbye), +handle_info({'EXIT', _, normal}, State) -> {noreply, State}; - -handle_info(_Msg, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, Reason}, State) -> + ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]), + {stop, Reason, State}. 
terminate(normal, State) -> + % ?LOG_DEBUG("replication terminating normally", []), #state{ - context = Context, - current_seq = Seq, - docs_buffer = Buffer, + checkpoint_history = CheckpointHistory, + committed_seq = NewSeq, listeners = Listeners, source = Source, target = Target, - stats = Stats + stats = Stats, + source_log = #doc{body={OldHistory}} } = State, - - try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of - {ok, Errors} -> - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - - length(Errors)) - catch - throw:attachment_write_failed -> - ?LOG_ERROR("attachment request failed during final write", []), - exit({internal_server_error, replication_link_failure}) - end, - couch_task_status:update("Finishing"), - - {ok, NewRepHistory, _} = do_checkpoint(Source, Target, Context, Seq, Stats), ets:delete(Stats), close_db(Target), + + NewRepHistory = case CheckpointHistory of + nil -> + {[{<<"no_changes">>, true} | OldHistory]}; + _Else -> + CheckpointHistory + end, - [Original|Rest] = Listeners, + %% reply to original requester + [Original|OtherListeners] = lists:reverse(Listeners), gen_server:reply(Original, {ok, NewRepHistory}), %% maybe trigger another replication. If this replicator uses a local %% source Db, changes to that Db since we started will not be included in %% this pass. 
- case up_to_date(Source, Seq) of + case up_to_date(Source, NewSeq) of true -> - [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest]; + [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners]; false -> - [gen_server:reply(R, retry) || R <- Rest] + [gen_server:reply(R, retry) || R <- OtherListeners] end, close_db(Source); + terminate(Reason, State) -> - ?LOG_ERROR("replicator terminating with reason ~p", [Reason]), #state{ listeners = Listeners, source = Source, target = Target, stats = Stats } = State, - [gen_server:reply(L, {error, Reason}) || L <- Listeners], - ets:delete(Stats), close_db(Target), close_db(Source). @@ -328,560 +228,284 @@ terminate(Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%============================================================================= -%% internal functions -%%============================================================================= - +% internal funs -% we should probably write these to a special replication log -% or have a callback where the caller decides what to do with replication -% errors. -dump_update_errors([]) -> ok; -dump_update_errors([{{Id, Rev}, Error}|Rest]) -> - ?LOG_INFO("error replicating document \"~s\" rev \"~s\":~p", - [Id, couch_doc:rev_to_str(Rev), Error]), - dump_update_errors(Rest). - -attachment_loop(ReqId, Conn) -> - couch_util:should_flush(), - receive - {From, {set_req_id, NewId}} -> - %% we learn the ReqId to listen for - From ! {self(), {ok, NewId}}, - attachment_loop(NewId, Conn); - {ibrowse_async_headers, ReqId, Status, Headers} -> - %% we got header, give the controlling process a chance to react - receive - {From, gimme_status} -> - %% send status/headers to controller - From ! 
{self(), {status, Status, Headers}}, - receive - {From, continue} -> - %% normal case - attachment_loop(ReqId, Conn); - {From, fail} -> - %% error, failure code - ?LOG_ERROR( - "streaming attachment failed with status ~p", - [Status]), - catch ibrowse:stop_worker_process(Conn), - exit(attachment_request_failed); - {From, stop_ok} -> - %% stop looping, controller will start a new loop - catch ibrowse:stop_worker_process(Conn), - stop_ok - end - end, - attachment_loop(ReqId, Conn); - {ibrowse_async_response, ReqId, {chunk_start,_}} -> - attachment_loop(ReqId, Conn); - {ibrowse_async_response, ReqId, chunk_end} -> - attachment_loop(ReqId, Conn); - {ibrowse_async_response, ReqId, {error, Err}} -> - ?LOG_ERROR("streaming attachment failed with ~p", [Err]), - catch ibrowse:stop_worker_process(Conn), - exit(attachment_request_failed); - {ibrowse_async_response, ReqId, Data} -> - receive {From, gimme_data} -> From ! {self(), Data} end, - attachment_loop(ReqId, Conn); - {ibrowse_async_response_end, ReqId} -> - catch ibrowse:stop_worker_process(Conn), - exit(normal) +start_replication_server(Replicator) -> + RepId = element(1, Replicator), + case supervisor:start_child(couch_rep_sup, Replicator) of + {ok, Pid} -> + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), + Pid; + {error, already_present} -> + case supervisor:restart_child(couch_rep_sup, RepId) of + {ok, Pid} -> + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), + Pid; + {error, running} -> + %% this error occurs if multiple replicators are racing + %% each other to start and somebody else won. Just grab + %% the Pid by calling start_child again. + {error, {already_started, Pid}} = + supervisor:start_child(couch_rep_sup, Replicator), + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid + end; + {error, {already_started, Pid}} -> + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid end. 
-att_stub_converter(DbS, Id, Rev, - #att{name=Name,data=stub,type=Type,len=Length}=Att) -> - #old_http_db{uri=DbUrl, headers=Headers} = DbS, - {Pos, [RevId|_]} = Rev, - Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)), - "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]), - ?LOG_DEBUG("Attachment URL ~s", [Url]), - {ok, RcvFun} = make_att_stub_receiver(Url, Headers, Name, - Type, Length), - Att#att{name=Name,type=Type,data=RcvFun,len=Length}. - -make_att_stub_receiver(Url, Headers, Name, Type, Length) -> - make_att_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000). - -make_att_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) -> - ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s", - [Url]), - exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])}); - -make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) -> - %% start the process that receives attachment data from ibrowse - #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), - {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port), - Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end), - - %% make the async request - Opts = [{stream_to, Pid}, {response_format, binary}], - ReqId = - case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of - {ibrowse_req_id, X} -> - X; - {error, Reason} -> - ?LOG_INFO("retrying couch_rep attachment request in ~p " ++ - "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]), - catch ibrowse:stop_worker_process(Conn), - timer:sleep(Pause), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries-1, 2*Pause) - end, - - %% tell our receiver about the ReqId it needs to look for - Pid ! 
{self(), {set_req_id, ReqId}}, - receive - {Pid, {ok, ReqId}} -> - ok; - {'EXIT', Pid, _Reason} -> - catch ibrowse:stop_worker_process(Conn), - timer:sleep(Pause), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries-1, 2*Pause) - end, - - %% wait for headers to ensure that we have a 200 status code - %% this is where we follow redirects etc - Pid ! {self(), gimme_status}, - receive - {'EXIT', Pid, attachment_request_failed} -> - catch ibrowse:stop_worker_process(Conn), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries-1, Pause); - {Pid, {status, StreamStatus, StreamHeaders}} -> - ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", - [StreamStatus, StreamHeaders]), - - ResponseCode = list_to_integer(StreamStatus), - if - ResponseCode >= 200, ResponseCode < 300 -> - % the normal case - Pid ! {self(), continue}, - %% this function goes into the streaming attachment code. - %% It gets executed by the replication gen_server, so it can't - %% be the one to actually receive the ibrowse data. - {ok, fun() -> - Pid ! {self(), gimme_data}, - receive - {Pid, Data} -> - Data; - {'EXIT', Pid, attachment_request_failed} -> - throw(attachment_write_failed) - end - end}; - ResponseCode >= 300, ResponseCode < 400 -> - % follow the redirect - Pid ! {self(), stop_ok}, - RedirectUrl = mochiweb_headers:get_value("Location", - mochiweb_headers:make(StreamHeaders)), - catch ibrowse:stop_worker_process(Conn), - make_att_stub_receiver(RedirectUrl, Headers, Name, Type, - Length, Retries - 1, Pause); - ResponseCode >= 400, ResponseCode < 500 -> - % an error... log and fail - ?LOG_ERROR("streaming attachment failed with code ~p: ~s", - [ResponseCode, Url]), - Pid ! {self(), fail}, - exit(attachment_request_failed); - ResponseCode == 500 -> - % an error... log and retry - ?LOG_INFO("retrying couch_rep attachment request in ~p " ++ - "seconds due to 500 response: ~s", [Pause/1000, Url]), - Pid ! 
{self(), fail}, - catch ibrowse:stop_worker_process(Conn), - timer:sleep(Pause), - make_att_stub_receiver(Url, Headers, Name, Type, Length, - Retries - 1, 2*Pause) - end +compare_replication_logs(SrcDoc, TgtDoc) -> + #doc{body={RepRecProps}} = SrcDoc, + #doc{body={RepRecPropsTgt}} = TgtDoc, + case proplists:get_value(<<"session_id">>, RepRecProps) == + proplists:get_value(<<"session_id">>, RepRecPropsTgt) of + true -> + % if the records have the same session id, + % then we have a valid replication history + OldSeqNum = proplists:get_value(<<"source_last_seq">>, RepRecProps, 0), + OldHistory = proplists:get_value(<<"history">>, RepRecProps, []), + {OldSeqNum, OldHistory}; + false -> + SourceHistory = proplists:get_value(<<"history">>, RepRecProps, []), + TargetHistory = proplists:get_value(<<"history">>, RepRecPropsTgt, []), + ?LOG_INFO("Replication records differ. " + "Scanning histories to find a common ancestor.", []), + ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", + [RepRecProps, RepRecPropsTgt]), + compare_rep_history(SourceHistory, TargetHistory) end. 
- -open_db({remote, Url, Headers, Auth})-> - {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url}; -open_db({local, DbName, UserCtx})-> - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> {ok, Db, DbName}; - Error -> Error +compare_rep_history(S, T) when length(S) =:= 0 orelse length(T) =:= 0 -> + ?LOG_INFO("no common ancestry -- performing full replication", []), + {0, []}; +compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) -> + SourceId = proplists:get_value(<<"session_id">>, S), + case has_session_id(SourceId, Target) of + true -> + RecordSeqNum = proplists:get_value(<<"recorded_seq">>, S, 0), + ?LOG_INFO("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, SourceRest}; + false -> + TargetId = proplists:get_value(<<"session_id">>, T), + case has_session_id(TargetId, SourceRest) of + true -> + RecordSeqNum = proplists:get_value(<<"recorded_seq">>, T, 0), + ?LOG_INFO("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, TargetRest}; + false -> + compare_rep_history(SourceRest, TargetRest) + end end. - -close_db(#old_http_db{})-> +close_db(#http_db{})-> ok; close_db(Db)-> couch_db:close(Db). -do_checkpoint(Source, Target, Context, NewSeqNum, Stats) -> - ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]), - [ - {start_seq, StartSeqNum}, - {history, OldHistory}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, #doc{body={LastRepRecord}}=RepRecDocSrc}, - {tgt_record, RepRecDocTgt} - ] = Context, - - case NewSeqNum == StartSeqNum andalso OldHistory /= [] of - true -> - % nothing changed, don't record results - {ok, {[{<<"no_changes">>, true} | LastRepRecord]}, Context}; - false -> - % something changed, record results for incremental replication, - - % commit changes to both src and tgt. 
The src because if changes - % we replicated are lost, we'll record the a seq number ahead - % of what was committed. If those changes are lost and the seq number - % reverts to a previous committed value, we will skip future changes - % when new doc updates are given our already replicated seq nums. - - % commit the src async - ParentPid = self(), - SrcCommitPid = spawn_link(fun() -> - ParentPid ! {self(), ensure_full_commit(Source)} end), - - % commit tgt sync - {ok, TgtInstanceStartTime2} = ensure_full_commit(Target), - - SrcInstanceStartTime2 = - receive - {SrcCommitPid, {ok, Timestamp}} -> - Timestamp; - {'EXIT', SrcCommitPid, {http_request_failed, _}} -> - exit(replication_link_failure) - end, - - RecordSeqNum = - if SrcInstanceStartTime2 == SrcInstanceStartTime andalso - TgtInstanceStartTime2 == TgtInstanceStartTime -> - NewSeqNum; - true -> - ?LOG_INFO("A server has restarted sinced replication start. " - "Not recording the new sequence number to ensure the " - "replication is redone and documents reexamined.", []), - StartSeqNum - end, - - NewHistoryEntry = { - [{<<"start_time">>, list_to_binary(ReplicationStartTime)}, - {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, - {<<"start_last_seq">>, StartSeqNum}, - {<<"end_last_seq">>, NewSeqNum}, - {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, - {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, - {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, - {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, - {<<"doc_write_failures">>, ets:lookup_element(Stats, doc_write_failures, 2)} - ]}, - % limit history to 50 entries - HistEntries =lists:sublist([NewHistoryEntry | OldHistory], 50), - - NewRepHistory = - {[{<<"session_id">>, couch_util:new_uuid()}, - {<<"source_last_seq">>, RecordSeqNum}, - {<<"history">>, HistEntries}]}, - - {ok, {SrcRevPos,SrcRevId}} = update_doc(Source, - RepRecDocSrc#doc{body=NewRepHistory}, []), - {ok, 
{TgtRevPos,TgtRevId}} = update_doc(Target, - RepRecDocTgt#doc{body=NewRepHistory}, []), - - NewContext = [ - {start_seq, StartSeqNum}, - {history, OldHistory}, - {rep_starttime, ReplicationStartTime}, - {src_starttime, SrcInstanceStartTime}, - {tgt_starttime, TgtInstanceStartTime}, - {src_record, RepRecDocSrc#doc{revs={SrcRevPos,[SrcRevId]}}}, - {tgt_record, RepRecDocTgt#doc{revs={TgtRevPos,[TgtRevId]}}} - ], - - {ok, NewRepHistory, NewContext} - +dbname(#http_db{} = Db) -> + Db#http_db.url; +dbname(Db) -> + Db#db.name. + +dbinfo(#http_db{} = Db) -> + {DbProps} = couch_rep_httpc:request(Db), + [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]; +dbinfo(Db) -> + {ok, Info} = couch_db:get_db_info(Db), + Info. + +has_session_id(_SessionId, []) -> + false; +has_session_id(SessionId, [{Props} | Rest]) -> + case proplists:get_value(<<"session_id">>, Props, nil) of + SessionId -> + true; + _Else -> + has_session_id(SessionId, Rest) end. -do_http_request(Url, Action, Headers, Auth) -> - do_http_request(Url, Action, Headers, Auth, []). - -do_http_request(Url, Action, Headers, Auth, JsonBody) -> - Headers0 = case Auth of - {Props} -> - % Add OAuth header - {OAuth} = proplists:get_value(<<"oauth">>, Props), - ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, OAuth)), - Token = ?b2l(proplists:get_value(<<"token">>, OAuth)), - TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, OAuth)), - ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, OAuth)), - Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1}, - Method = case Action of - get -> "GET"; - post -> "POST"; - put -> "PUT" - end, - Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret), - [{"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)} | Headers]; - _Else -> - Headers - end, - do_http_request0(Url, Action, Headers0, JsonBody, 10, 1000). 
- -do_http_request0(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) -> - do_http_request0(?b2l(Url), Action, Headers, Body, Retries, Pause); -do_http_request0(Url, Action, _Headers, _JsonBody, 0, _Pause) -> - ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", - [Action, Url]), - exit({http_request_failed, ?l2b(["failed to replicate ", Url])}); -do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) -> - ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]), - Body = - case JsonBody of - [] -> - <<>>; +make_replication_id({Props}, UserCtx) -> + %% funky algorithm to preserve backwards compatibility + {ok, HostName} = inet:gethostname(), + Src = get_rep_endpoint(UserCtx, proplists:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, proplists:get_value(<<"target">>, Props)), + couch_util:to_hex(erlang:md5(term_to_binary([HostName, Src, Tgt]))). + +maybe_add_trailing_slash(Url) -> + re:replace(Url, "[^/]$", "&/", [{return, list}]). + +get_rep_endpoint(_UserCtx, {Props}) -> + Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)), + {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}), + {Auth} = proplists:get_value(<<"auth">>, Props, {[]}), + case proplists:get_value(<<"oauth">>, Auth) of + undefined -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; + {OAuth} -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} + end; +get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) -> + {remote, maybe_add_trailing_slash(Url), []}; +get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> + {remote, maybe_add_trailing_slash(Url), []}; +get_rep_endpoint(UserCtx, <>) -> + {local, DbName, UserCtx}. 
%% Load the replication checkpoint log for RepId from Db.
%%
%% The log lives in a document whose id is ?LOCAL_DOC_PREFIX followed by
%% RepId. When no such document can be read, a fresh #doc{} carrying only
%% that id is returned so the caller always gets a document to update.
%%
%% Clause 1: Db is an #http_db{}; the document is requested through
%%           couch_rep_httpc and an {error, reason} JSON object is taken to
%%           mean "no log yet".
%% Clause 2: any other Db handle; the document is read with
%%           couch_db:open_doc/3 and every non-{ok, _} result is taken to
%%           mean "no log yet".
open_replication_log(#http_db{url = Url} = Db, RepId) ->
    LogId = ?LOCAL_DOC_PREFIX ++ RepId,
    Response = couch_rep_httpc:request(
        Db#http_db{resource = couch_util:url_encode(LogId)}),
    case Response of
        {[{<<"error">>, _}, {<<"reason">>, _}]} ->
            ?LOG_DEBUG("didn't find a replication log for ~s", [Url]),
            #doc{id = ?l2b(LogId)};
        JsonDoc ->
            ?LOG_DEBUG("found a replication log for ~s", [Url]),
            couch_doc:from_json_obj(JsonDoc)
    end;
open_replication_log(Db, RepId) ->
    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
    Lookup = couch_db:open_doc(Db, LogId, []),
    case Lookup of
        {ok, StoredLog} ->
            ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
            StoredLog;
        _NotFound ->
            ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
            #doc{id = LogId}
    end.
%% Open one replication endpoint and return a handle for it.
%%
%% Accepted endpoint forms:
%%   * {Props}  - a JSON object with a <<"url">> member and optional
%%                <<"auth">> and <<"headers">> objects; yields an #http_db{}
%%   * a binary starting with "http://" or "https://"; handled as {Props}
%%     with only the url set (the UserCtx argument is not used)
%%   * any other binary; opened as a local database name with UserCtx
%%
%% Throws {db_not_found, UrlOrName} when the database does not exist.
open_db({Props}, _UserCtx) ->
    Url = maybe_add_trailing_slash(proplists:get_value(<<"url">>, Props)),
    {AuthProps} = proplists:get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = proplists:get_value(<<"headers">>, Props, {[]}),
    Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders],
    DefaultHeaders = (#http_db{})#http_db.headers,
    % lists:ukeymerge/3 requires both inputs to be key-sorted and free of
    % duplicate keys. Neither list is guaranteed to be sorted here, so sort
    % them first; on equal keys the tuple from the first list wins, which
    % lets caller-supplied headers replace defaults of the same name.
    MergedHeaders = lists:ukeymerge(1,
        lists:ukeysort(1, Headers),
        lists:ukeysort(1, DefaultHeaders)),
    Db = #http_db{
        url = Url,
        auth = AuthProps,
        headers = MergedHeaders
    },
    case couch_rep_httpc:db_exists(Db) of
        true -> Db;
        false -> throw({db_not_found, Url})
    end;
open_db(<<"http://", _/binary>> = Url, _) ->
    open_db({[{<<"url">>, Url}]}, []);
open_db(<<"https://", _/binary>> = Url, _) ->
    open_db({[{<<"url">>, Url}]}, []);
open_db(<<DbName/binary>>, UserCtx) ->
    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
        {ok, Db} -> Db;
        {not_found, no_db_file} -> throw({db_not_found, DbName})
    end.

%% Record a replication checkpoint and return the updated #state{}.
%%
%% Both databases are asked to commit. When the instance start times they
%% report still equal the ones held in State, committed_seq is recorded as
%% the checkpoint; otherwise start_seq is recorded instead. A new history
%% entry (counters are read from the Stats ets table) is prepended to the
%% old history, the history is capped at 50 entries, and the resulting log
%% body is written to the log document on both source and target. The
%% revisions returned by those writes are stored back into source_log and
%% target_log.
do_checkpoint(State) ->
    #state{
        source = Source,
        target = Target,
        committed_seq = NewSeqNum,
        start_seq = StartSeqNum,
        history = OldHistory,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = ReplicationStartTime,
        src_starttime = SrcInstanceStartTime,
        tgt_starttime = TgtInstanceStartTime,
        stats = Stats
    } = State,
    ?LOG_INFO("recording a checkpoint at source update_seq ~p", [NewSeqNum]),
    % The pattern below uses already-bound variables, so it only matches when
    % both reported start times are unchanged.
    RecordSeqNum = case commit_to_both(Source, Target) of
        {SrcInstanceStartTime, TgtInstanceStartTime} ->
            NewSeqNum;
        _Else ->
            ?LOG_INFO("A server has restarted sinced replication start. "
                "Not recording the new sequence number to ensure the "
                "replication is redone and documents reexamined.", []),
            StartSeqNum
    end,
    SessionId = couch_util:new_uuid(),
    NewHistoryEntry = {[
        {<<"session_id">>, SessionId},
        {<<"start_time">>, list_to_binary(ReplicationStartTime)},
        {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
        {<<"start_last_seq">>, StartSeqNum},
        {<<"end_last_seq">>, NewSeqNum},
        {<<"recorded_seq">>, RecordSeqNum},
        {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
        {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
        {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
        {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
        {<<"doc_write_failures">>,
            ets:lookup_element(Stats, doc_write_failures, 2)}
    ]},
    % limit history to 50 entries
    NewRepHistory = {[
        {<<"session_id">>, SessionId},
        {<<"source_last_seq">>, RecordSeqNum},
        {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
    ]},
    {SrcRevPos, SrcRevId} =
        update_doc(Source, SourceLog#doc{body = NewRepHistory}, []),
    {TgtRevPos, TgtRevId} =
        update_doc(Target, TargetLog#doc{body = NewRepHistory}, []),
    State#state{
        checkpoint_history = NewRepHistory,
        source_log = SourceLog#doc{revs = {SrcRevPos, [SrcRevId]}},
        target_log = TargetLog#doc{revs = {TgtRevPos, [TgtRevId]}}
    }.

%% Ask both databases to commit and return
%% {SourceStartTime, TargetStartTime} as reported by ensure_full_commit/1.
%%
%% The source commit runs in a linked helper process while the target commit
%% runs in the calling process. Exits with replication_link_failure when the
%% helper dies with {http_request_failed, _}, and with the helper's own exit
%% reason for any other abnormal death.
commit_to_both(Source, Target) ->
    % commit the src async
    ParentPid = self(),
    SrcCommitPid = spawn_link(fun() ->
        ParentPid ! {self(), ensure_full_commit(Source)}
    end),

    % commit tgt sync
    TargetStartTime = ensure_full_commit(Target),

    SourceStartTime =
        receive
            {SrcCommitPid, Timestamp} ->
                Timestamp;
            {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
                exit(replication_link_failure);
            {'EXIT', SrcCommitPid, Reason} when Reason =/= normal ->
                % NOTE(review): the 'EXIT' clauses only fire if the calling
                % process traps exits - confirm in init/1. Without this
                % clause any other helper crash (e.g. a badmatch inside
                % ensure_full_commit/1) would leave this receive blocked
                % forever, because the result message never arrives.
                exit(Reason)
        end,
    {SourceStartTime, TargetStartTime}.
%% Ask a database to commit and return its instance start time.
%%
%% For an #http_db{} this POSTs to the "_ensure_full_commit" resource,
%% asserts that the reply carries <<"ok">> = true and returns the reply's
%% <<"instance_start_time">> member. For any other handle it delegates to
%% couch_db:ensure_full_commit/1 and unwraps the {ok, _} result.
ensure_full_commit(#http_db{} = RemoteDb) ->
    {Reply} = couch_rep_httpc:request(RemoteDb#http_db{
        resource = "_ensure_full_commit",
        method = post,
        body = true
    }),
    true = proplists:get_value(<<"ok">>, Reply),
    proplists:get_value(<<"instance_start_time">>, Reply);
ensure_full_commit(LocalDb) ->
    {ok, InstanceStartTime} = couch_db:ensure_full_commit(LocalDb),
    InstanceStartTime.

%% Write Doc to a database and return the revision of the stored document.
%%
%% For an #http_db{} (only with empty Options) the document is PUT as JSON,
%% attachments included, and the <<"rev">> member of the reply is parsed
%% with couch_doc:parse_rev/1. Otherwise couch_db:update_doc/3 is called and
%% its {ok, _} result is unwrapped.
update_doc(#http_db{} = RemoteDb, #doc{id = Id} = Doc, []) ->
    EncodedId = couch_util:url_encode(Id),
    JsonBody = couch_doc:to_json_obj(Doc, [attachments]),
    {Reply} = couch_rep_httpc:request(RemoteDb#http_db{
        resource = EncodedId,
        method = put,
        body = JsonBody
    }),
    couch_doc:parse_rev(proplists:get_value(<<"rev">>, Reply));
update_doc(LocalDb, Doc, Options) ->
    {ok, NewRev} = couch_db:update_doc(LocalDb, Doc, Options),
    NewRev.

%% Tell whether Seq is still the current update sequence of Source.
%%
%% An #http_db{} source is always reported as up to date. For a local source
%% the database is reopened by name, its update_seq is compared with Seq,
%% and the freshly opened handle is closed again before returning.
up_to_date(#http_db{}, _Seq) ->
    true;
up_to_date(Source, Seq) ->
    {ok, Reopened} = couch_db:open(Source#db.name, []),
    IsCurrent = Reopened#db.update_seq == Seq,
    couch_db:close(Reopened),
    IsCurrent.