% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

%% gen_server that runs one replication between a source and a target DB.
-module(couch_rep).
-behaviour(gen_server).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
    code_change/3]).

%% API
-export([replicate/2, checkpoint/1, start_link/3]).

-include("couch_db.hrl").

%% Server state. Fields without a default start out as 'undefined' until
%% do_init/1 fills them in.
-record(state, {
    changes_feed,                  % changes feed pid, nil for doc_ids runs
    missing_revs,                  % missing-revs pid, nil for doc_ids runs
    reader,                        % couch_rep_reader pid
    writer,                        % couch_rep_writer pid
    source,                        % #db{} or #http_db{} handle
    target,                        % #db{} or #http_db{} handle
    continuous,
    create_target,
    init_args,                     % original init/1 args, used to reboot
    checkpoint_scheduled = nil,    % timer ref of a pending checkpoint, or nil
    start_seq,
    history,
    source_log,                    % replication log #doc{} on the source
    target_log,                    % replication log #doc{} on the target
    rep_starttime,
    src_starttime,
    tgt_starttime,
    checkpoint_history = nil,      % body of the last recorded checkpoint
    listeners = [],                % callers blocked in get_result
    complete = false,              % true once the writer exited normally
    committed_seq = 0,
    stats = nil,                   % private ets table of counters
    doc_ids = nil                  % list of doc ids, or nil
}).

%% Starts a replication server; the argument triple is passed to init/1 as
%% [Id, PostBody, UserCtx].
start_link(Id, PostBody, UserCtx) ->
    InitArgs = [Id, PostBody, UserCtx],
    gen_server:start_link(?MODULE, InitArgs, []).

%% Convenience clauses for kicking off a simple replication from the shell:
%% string arguments are converted to binaries and then wrapped into a
%% request body that is handed to the POST-handling clause below.
replicate(Source, Target) when is_list(Source) ->
    replicate(?l2b(Source), Target);
replicate(Source, Target) when is_binary(Source), is_list(Target) ->
    replicate(Source, ?l2b(Target));
replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
    Body = {[{<<"source">>, Source}, {<<"target">>, Target}]},
    replicate(Body, #user_ctx{});

%% Handles a POST to _replicate.
%% With "cancel" set, the matching child of couch_rep_sup is terminated and
%% deleted. Otherwise a replication server is started (or looked up) and the
%% result depends on the "continuous" / "async" flags of the request.
replicate({Props} = PostBody, UserCtx) ->
    {BaseId, Extension} = make_replication_id(PostBody, UserCtx),
    ChildId = BaseId ++ Extension,
    ChildSpec = {
        ChildId,
        {?MODULE, start_link, [BaseId, PostBody, UserCtx]},
        temporary,
        1,
        worker,
        [?MODULE]
    },
    case couch_util:get_value(<<"cancel">>, Props, false) of
        true ->
            case supervisor:terminate_child(couch_rep_sup, ChildId) of
                {error, not_found} ->
                    {error, not_found};
                ok ->
                    ok = supervisor:delete_child(couch_rep_sup, ChildId),
                    {ok, {cancelled, ?l2b(BaseId)}}
            end;
        false ->
            Server = start_replication_server(ChildSpec),
            IsContinuous = couch_util:get_value(<<"continuous">>, Props, false),
            IsAsync = couch_util:get_value(<<"async">>, Props, false),
            case {IsContinuous, IsAsync} of
                {true, _} ->
                    %% continuous replications return immediately
                    {ok, {continuous, ?l2b(BaseId)}};
                {_, true} ->
                    %% somebody still has to collect the result, so do it
                    %% from a throw-away process and hand back the server pid
                    spawn(fun() -> get_result(Server, PostBody, UserCtx) end),
                    Server;
                _ ->
                    get_result(Server, PostBody, UserCtx)
            end
    end.

%% Asks the replication server to record a checkpoint (asynchronous).
checkpoint(Server) ->
    gen_server:cast(Server, do_checkpoint).

%% Blocks until the replication server answers get_result. The replication
%% is started again when the server asks for a retry, or when the call
%% failed because the server was gone (noproc) or was terminating (normal).
get_result(Server, PostBody, UserCtx) ->
    try gen_server:call(Server, get_result, infinity) of
        retry ->
            replicate(PostBody, UserCtx);
        Result ->
            Result
    catch
        exit:{Why, {gen_server, call, [Server, get_result, infinity]}}
                when Why =:= noproc; Why =:= normal ->
            %% noproc: this replication just finished -- restart it.
            %% normal: we made the call during terminate.
            replicate(PostBody, UserCtx)
    end.

%% gen_server callback. Any exception raised by do_init/1 is turned into a
%% {stop, Reason} result.
init(InitArgs) ->
    try
        do_init(InitArgs)
    catch
        _:Error ->
            {stop, Error}
    end.

%% Opens both databases, starts the worker pipeline and builds the initial
%% server state.
%%
%% When the request carries a list of "doc_ids" only a reader and a writer
%% are started; otherwise the replication logs are compared to find the
%% start sequence and a changes feed -> missing revs -> reader -> writer
%% pipeline is started.
do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
    process_flag(trap_exit, true),

    SourceProps = couch_util:get_value(<<"source">>, PostProps),
    TargetProps = couch_util:get_value(<<"target">>, PostProps),

    DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil),
    Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
    CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),

    ProxyParams = parse_proxy_params(
        couch_util:get_value(<<"proxy">>, PostProps, [])),
    Source = open_db(SourceProps, UserCtx, ProxyParams),
    Target = open_db(TargetProps, UserCtx, ProxyParams, CreateTarget),

    SourceInfo = dbinfo(Source),
    TargetInfo = dbinfo(Target),

    {SourceLog, TargetLog, StartSeq, History, ChangesFeed, MissingRevs, Reader} =
        case is_list(DocIds) of
            true ->
                % Fast replication using only a list of doc IDs to replicate.
                % Replication sessions, checkpoints and logs are not created
                % since the update sequence number of the source DB is not
                % used for determining which documents are copied into the
                % target DB.
                {ok, DocReader} = couch_rep_reader:start_link(
                    self(), Source, DocIds, PostProps),
                {nil, nil, nil, nil, nil, nil, DocReader};
            false ->
                % Replication using the _changes API (DB sequence update
                % numbers).
                SrcLog = open_replication_log(Source, RepId),
                TgtLog = open_replication_log(Target, RepId),
                {Seq, Hist} = compare_replication_logs(SrcLog, TgtLog),
                {ok, Feed} = couch_rep_changes_feed:start_link(
                    self(), Source, Seq, PostProps),
                {ok, Missing} = couch_rep_missing_revs:start_link(
                    self(), Target, Feed, PostProps),
                {ok, SeqReader} = couch_rep_reader:start_link(
                    self(), Source, Missing, PostProps),
                {SrcLog, TgtLog, Seq, Hist, Feed, Missing, SeqReader}
        end,

    {ok, Writer} = couch_rep_writer:start_link(self(), Target, Reader, PostProps),

    % all counters start at zero
    Stats = ets:new(replication_stats, [set, private]),
    ets:insert(Stats, [
        {total_revs, 0},
        {missing_revs, 0},
        {docs_read, 0},
        {docs_written, 0},
        {doc_write_failures, 0}
    ]),

    % the task label shows only the first six characters of the id
    {ShortId, _} = lists:split(6, RepId),
    TaskLabel = io_lib:format("~s: ~s -> ~s",
        [ShortId, dbname(Source), dbname(Target)]),
    couch_task_status:add_task("Replication", TaskLabel, "Starting"),

    State = #state{
        changes_feed = ChangesFeed,
        missing_revs = MissingRevs,
        reader = Reader,
        writer = Writer,

        source = Source,
        target = Target,
        continuous = Continuous,
        create_target = CreateTarget,
        init_args = InitArgs,
        stats = Stats,
        checkpoint_scheduled = nil,

        start_seq = StartSeq,
        history = History,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = httpd_util:rfc1123_date(),
        src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
        tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
        doc_ids = DocIds
    },
    {ok, State}.

%% get_result: if the replication already completed and nobody is waiting,
%% stop right away with the caller as the only listener (the reply is sent
%% from terminate/2); otherwise queue the caller.
handle_call(get_result, From, #state{complete = true, listeners = []} = State) ->
    {stop, normal, State#state{listeners = [From]}};
handle_call(get_result, From, State) ->
    Waiting = State#state.listeners,
    {noreply, State#state{listeners = [From | Waiting]}}.

%% do_checkpoint records a checkpoint; every other cast is ignored.
handle_cast(do_checkpoint, State) ->
    NewState = do_checkpoint(State),
    {noreply, NewState};
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Progress report from the missing-revs process: remember the sequence and
%% make sure a checkpoint is scheduled.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
    couch_task_status:update("MR Processed source update #~p of ~p",
        [SourceSeq, seqnum(State#state.source)]),
    Updated = State#state{committed_seq = SourceSeq},
    {noreply, schedule_checkpoint(Updated)};

%% Progress report from the writer. Only sequences above the one already
%% recorded are acted upon; they are also forwarded to the missing-revs
%% process.
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq = Known} = State)
        when SourceSeq > Known ->
    ok = gen_server:cast(State#state.missing_revs,
        {update_committed_seq, SourceSeq}),
    couch_task_status:update("W Processed source update #~p of ~p",
        [SourceSeq, seqnum(State#state.source)]),
    Updated = State#state{committed_seq = SourceSeq},
    {noreply, schedule_checkpoint(Updated)};
handle_info({writer_checkpoint, _}, State) ->
    {noreply, State};

%% Bump one of the counters in the stats table.
handle_info({update_stats, Key, N}, State) ->
    ets:update_counter(State#state.stats, Key, N),
    {noreply, State};

%% A monitored process went down: log where it lived and shut down.
handle_info({'DOWN', _, _, Pid, _}, State) ->
    DownNode = erlang:node(Pid),
    case DownNode =:= node() of
        true ->
            ?LOG_INFO("replication terminating - local DB is shutting down", []);
        false ->
            ?LOG_INFO("replication terminating - DB on ~p is shutting down", [DownNode])
    end,
    timer:cancel(State#state.checkpoint_scheduled),
    {stop, shutdown, State};

%% The writer finished. Without listeners the server stays alive, marked as
%% complete, until somebody asks for the result.
handle_info({'EXIT', Writer, normal}, #state{writer = Writer} = State) ->
    case State#state.listeners of
        [] ->
            {noreply, State#state{complete = true}};
        _Waiting ->
            {stop, normal, State}
    end;

%% Normal exits of any other linked process are of no interest.
handle_info({'EXIT', _, normal}, State) ->
    {noreply, State};

%% Errors attributed to the source or target end the replication with
%% reason shutdown.
handle_info({'EXIT', _Pid, {Err, Reason}}, State)
        when Err =:= source_error; Err =:= target_error ->
    ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
    timer:cancel(State#state.checkpoint_scheduled),
    {stop, shutdown, State};

%% Any other abnormal exit becomes the stop reason of this server.
handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State}.

%% gen_server callback.
%% normal, no checkpoint pending: finish up and answer the listeners.
%% normal, checkpoint pending: cancel the timer, checkpoint once more, then
%%   finish up.
%% anything else: tell every listener about the error and release resources.
terminate(normal, #state{checkpoint_scheduled = nil} = State) ->
    do_terminate(State);
terminate(normal, State) ->
    timer:cancel(State#state.checkpoint_scheduled),
    do_terminate(do_checkpoint(State));
terminate(Reason, State) ->
    #state{
        listeners = Listeners,
        source = Source,
        target = Target,
        stats = Stats
    } = State,
    lists:foreach(fun(L) -> gen_server:reply(L, {error, Reason}) end,
        Listeners),
    ets:delete(Stats),
    close_db(Target),
    close_db(Source).

%% gen_server callback; the state needs no conversion.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% internal funs

%% Starts the replication described by the child spec Replicator under
%% couch_rep_sup and returns the pid of the server.
start_replication_server(Replicator) ->
    start_replication_server(Replicator, fun start_child/1).

%% Same as start_replication_server/1, with StartFun being the function used
%% to (re)start the child. Returns the server pid, or throws
%% {db_not_found, Msg} / {node_not_connected, Node} when the child failed to
%% start for one of those reasons.
start_replication_server(Replicator, StartFun) ->
    case StartFun(Replicator) of
        {ok, Pid} ->
            Pid;
        {error, already_present} ->
            %% the spec is known but the child is not running: restart it
            start_replication_server(Replicator, fun restart_child/1);
        {error, {already_started, Pid}} ->
            Pid;
        {error, running} ->
            %% which_children/1 lists {Id, Child, Type, Modules} tuples, so
            %% the lookup key has to be the child id (first element of the
            %% spec), not the whole spec.
            ChildId = element(1, Replicator),
            Children = supervisor:which_children(couch_rep_sup),
            {value, {_, Pid, _, _}} = lists:keysearch(ChildId, 1, Children),
            Pid;
        % sadly both seem to be needed. I don't know why.
        {error, {{db_not_found, DbUrl}, _}} ->
            throw({db_not_found, <<"could not open ", DbUrl/binary>>});
        {error, {db_not_found, DbUrl}} ->
            throw({db_not_found, <<"could not open ", DbUrl/binary>>});
        {error, {node_not_connected, Node}} ->
            throw({node_not_connected, Node})
    end.

%% Adds the child spec to couch_rep_sup and starts it.
start_child(Replicator) ->
    supervisor:start_child(couch_rep_sup, Replicator).

%% Restarts an already known child of couch_rep_sup, identified by the id
%% stored in the first element of its spec.
restart_child(Replicator) ->
    supervisor:restart_child(couch_rep_sup, element(1, Replicator)).

%% Determines where to resume from, given the replication log documents
%% found on the source and on the target. Returns {StartSeq, History}.
compare_replication_logs(SrcDoc, TgtDoc) ->
    #doc{body = {SrcProps}} = SrcDoc,
    #doc{body = {TgtProps}} = TgtDoc,
    SrcSession = couch_util:get_value(<<"session_id">>, SrcProps),
    TgtSession = couch_util:get_value(<<"session_id">>, TgtProps),
    case SrcSession == TgtSession of
        true ->
            % if the records have the same session id,
            % then we have a valid replication history
            LastSeq = couch_util:get_value(<<"source_last_seq">>, SrcProps, 0),
            LastHistory = couch_util:get_value(<<"history">>, SrcProps, []),
            {LastSeq, LastHistory};
        false ->
            SrcHistory = couch_util:get_value(<<"history">>, SrcProps, []),
            TgtHistory = couch_util:get_value(<<"history">>, TgtProps, []),
            ?LOG_INFO("Replication records differ. "
                    "Scanning histories to find a common ancestor.", []),
            ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
                    [SrcProps, TgtProps]),
            compare_rep_history(SrcHistory, TgtHistory)
    end.

%% Walks both histories in lock step looking for a session id that appears
%% in both. Returns the recorded sequence of the first common entry together
%% with the remainder of the history it was found through, or {0, []} when
%% either history runs out first.
compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
    ?LOG_INFO("no common ancestry -- performing full replication", []),
    {0, []};
compare_rep_history([{SrcEntry} | SrcTail], [{TgtEntry} | TgtTail] = TgtAll) ->
    SrcId = couch_util:get_value(<<"session_id">>, SrcEntry),
    case has_session_id(SrcId, TgtAll) of
        true ->
            SrcSeq = couch_util:get_value(<<"recorded_seq">>, SrcEntry, 0),
            ?LOG_INFO("found a common replication record with source_seq ~p",
                [SrcSeq]),
            {SrcSeq, SrcTail};
        false ->
            TgtId = couch_util:get_value(<<"session_id">>, TgtEntry),
            case has_session_id(TgtId, SrcTail) of
                true ->
                    TgtSeq = couch_util:get_value(<<"recorded_seq">>, TgtEntry, 0),
                    ?LOG_INFO("found a common replication record with source_seq ~p",
                        [TgtSeq]),
                    {TgtSeq, TgtTail};
                false ->
                    compare_rep_history(SrcTail, TgtTail)
            end
    end.

%% Releases a database handle; HTTP handles need no closing.
close_db(#http_db{}) ->
    ok;
close_db(LocalDb) ->
    couch_db:close(LocalDb).

%% Printable name of a database: the URL with its password masked for HTTP
%% handles, "Name (MainPid)" as a binary for local ones.
dbname(#http_db{url = Url}) ->
    strip_password(Url);
dbname(#db{name = Name, main_pid = MainPid}) ->
    PidStr = pid_to_list(MainPid),
    ?l2b([Name, " (", PidStr, ")"]).

%% Replaces the password part of "http(s)://user:password@rest" with five
%% asterisks; URLs that do not match are returned as a list unchanged.
strip_password(Url) ->
    Pattern = "http(s)?://([^:]+):[^@]+@(.*)$",
    Masked = "http\\1://\\2:*****@\\3",
    re:replace(Url, Pattern, Masked, [{return, list}]).

%% Fetches the database info as a proplist with atom keys.
%% For HTTP handles the keys of the response are converted with
%% list_to_existing_atom/1, so no new atoms are ever created here.
dbinfo(#http_db{} = Db) ->
    {DbProps} = couch_rep_httpc:request(Db),
    [{list_to_existing_atom(?b2l(Key)), Value} || {Key, Value} <- DbProps];
dbinfo(Db) ->
    {ok, Info} = couch_db:get_db_info(Db),
    Info.

%% Final step of a normal termination: answer the waiting callers and
%% release all resources.
%%
%% First clause: replication by doc ids. Every listener receives a summary
%% built from the stats table.
do_terminate(#state{doc_ids = DocIds} = State) when is_list(DocIds) ->
    #state{
        listeners = Listeners,
        rep_starttime = ReplicationStartTime,
        stats = Stats
    } = State,

    Counter = fun(Key) -> ets:lookup_element(Stats, Key, 2) end,
    RepByDocsJson = {[
        {<<"start_time">>, ?l2b(ReplicationStartTime)},
        {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())},
        {<<"docs_read">>, Counter(docs_read)},
        {<<"docs_written">>, Counter(docs_written)},
        {<<"doc_write_failures">>, Counter(doc_write_failures)}
    ]},

    % the stats table is deleted by the cleanup, hence the summary is built
    % before it and the replies are sent after it
    terminate_cleanup(State),
    [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)];

%% Second clause: sequence based replication.
do_terminate(State) ->
    #state{
        checkpoint_history = CheckpointHistory,
        committed_seq = NewSeq,
        listeners = Listeners,
        source = Source,
        continuous = Continuous,
        source_log = #doc{body = {OldLogProps}}
    } = State,

    % without a checkpoint in this run, report the old log flagged with
    % no_changes
    NewRepHistory =
        case CheckpointHistory of
            nil -> {[{<<"no_changes">>, true} | OldLogProps]};
            _Recorded -> CheckpointHistory
        end,

    %% reply to original requester
    OtherListeners =
        case Continuous of
            true ->
                []; % continuous replications have no listeners
            _ ->
                [Original | Rest] = lists:reverse(Listeners),
                gen_server:reply(Original, {ok, NewRepHistory}),
                Rest
        end,

    %% maybe trigger another replication. If this replicator uses a local
    %% source Db, changes to that Db since we started will not be included in
    %% this pass.
    LateReply =
        case up_to_date(Source, NewSeq) of
            true -> {ok, NewRepHistory};
            false -> retry
        end,
    lists:foreach(fun(R) -> gen_server:reply(R, LateReply) end, OtherListeners),

    terminate_cleanup(State).

%% Marks the task as finishing, closes both databases and deletes the stats
%% table.
terminate_cleanup(#state{source = Source, target = Target, stats = Stats}) ->
    couch_task_status:update("Finishing"),
    close_db(Target),
    close_db(Source),
    ets:delete(Stats).

%% true if any entry of the history list carries the given session id.
has_session_id(_SessionId, []) ->
    false;
has_session_id(SessionId, [{Props} | Rest]) ->
    couch_util:get_value(<<"session_id">>, Props, nil) =:= SessionId
        orelse has_session_id(SessionId, Rest).

%% Builds the id extension: "+" followed by the option name for every listed
%% option that is set to true in Props, in the order of Options.
maybe_append_options(Options, Props) ->
    lists:append([
        case couch_util:get_value(Option, Props, false) of
            true -> "+" ++ ?b2l(Option);
            false -> ""
        end || Option <- Options
    ]).

%% Computes the replication id as {BaseId, Extension}.
%% BaseId is the hex encoded MD5 of the host name, both endpoints and, if
%% present, either the filter with its query params or the list of doc ids.
%% Extension encodes the "continuous" and "create_target" options.
make_replication_id({Props}, UserCtx) ->
    %% funky algorithm to preserve backwards compatibility
    HostName =
        case couch_util:get_value(<<"use_hostname">>, Props, false) of
            true ->
                {ok, Name} = inet:gethostname(),
                Name;
            false ->
                couch_config:get("replication", "hostname", "cloudant.com")
        end,
    % Port = mochiweb_socket_server:get(couch_httpd, port),
    Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
    Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
    Selector =
        case couch_util:get_value(<<"filter">>, Props) of
            undefined ->
                case couch_util:get_value(<<"doc_ids">>, Props) of
                    undefined -> [];
                    DocIds -> [DocIds]
                end;
            Filter ->
                [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})]
        end,
    Base = [HostName, Src, Tgt] ++ Selector,
    Extension = maybe_append_options(
        [<<"continuous">>, <<"create_target">>], Props),
    BaseId = couch_util:to_hex(couch_util:md5(term_to_binary(Base))),
    {BaseId, Extension}.

%% Returns Url as a list, with "/" appended unless it already ends in one.
maybe_add_trailing_slash(Url) ->
    re:replace(Url, "[^/]$", "&/", [{return, list}]).

%% Normalises one endpoint of the request into the term that is hashed into
%% the replication id:
%%   {Props} without "url"  -> {Node, Name, UserCtx}
%%   {Props} with "url"     -> {remote, Url, Headers} or, when the "auth"
%%                             object has an "oauth" member,
%%                             {remote, Url, Headers, OAuth}
%%   http(s) URL binary     -> {remote, Url, []}
%%   any other binary       -> {local, DbName, UserCtx}
get_rep_endpoint(UserCtx, {Props}) ->
    case couch_util:get_value(<<"url">>, Props) of
        undefined ->
            Node = couch_util:get_value(<<"node">>, Props),
            Name = couch_util:get_value(<<"name">>, Props),
            {Node, Name, UserCtx};
        RawUrl ->
            Url = maybe_add_trailing_slash(RawUrl),
            {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
            {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
            Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders],
            case couch_util:get_value(<<"oauth">>, Auth) of
                undefined ->
                    {remote, Url, Headers};
                {OAuth} ->
                    {remote, Url, Headers, OAuth}
            end
    end;
get_rep_endpoint(_UserCtx, <<"http://", _/binary>> = Url) ->
    {remote, maybe_add_trailing_slash(Url), []};
get_rep_endpoint(_UserCtx, <<"https://", _/binary>> = Url) ->
    {remote, maybe_add_trailing_slash(Url), []};
%% The head of this clause had been reduced to `<>`, which does not compile
%% and leaves DbName unbound; it has to match any remaining binary.
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
    {local, DbName, UserCtx}.

%% Loads the replication log (a _local document named after the replication
%% id) from the given database. A fresh #doc{} carrying only the id is
%% returned when no log is found.
open_replication_log(#http_db{} = Db, RepId) ->
    DocId = ?LOCAL_DOC_PREFIX ++ RepId,
    Req = Db#http_db{resource = couch_util:url_encode(DocId)},
    case couch_rep_httpc:request(Req) of
        {[{<<"error">>, _}, {<<"reason">>, _}]} ->
            % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
            #doc{id = ?l2b(DocId)};
        Doc ->
            % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
            couch_doc:from_json_obj(Doc)
    end;
open_replication_log(Db, RepId) ->
    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
    case couch_db:open_doc(Db, DocId, []) of
        {ok, Doc} ->
            % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
            Doc;
        _ ->
            % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
            #doc{id = DocId}
    end.

%% Opens a database without creating it when it is missing.
open_db(Props, UserCtx, ProxyParams) ->
    open_db(Props, UserCtx, ProxyParams, false).

%% Opens the database described by the first argument:
%%   http(s) URL binary    -> remote database at that URL
%%   {Props} with "url"    -> remote database described by Props
%%   {Props} without "url" -> local database "name" on "node" (default: this
%%                            node)
%%   any other binary      -> local database of that name on this node
%% Create is passed on to the function doing the actual open.
open_db(<<"http://", _/binary>> = Url, _, ProxyParams, Create) ->
    open_remote_db({[{<<"url">>, Url}]}, ProxyParams, Create);
open_db(<<"https://", _/binary>> = Url, _, ProxyParams, Create) ->
    open_remote_db({[{<<"url">>, Url}]}, ProxyParams, Create);
open_db({Props}, UserCtx, ProxyParams, Create) ->
    case couch_util:get_value(<<"url">>, Props) of
        undefined ->
            Node = couch_util:get_value(<<"node">>, Props, node()),
            DbName = couch_util:get_value(<<"name">>, Props),
            open_local_db(Node, DbName, UserCtx, Create);
        _Url ->
            open_remote_db({Props}, ProxyParams, Create)
    end;
%% The head of this clause had been reduced to `<>`, which does not compile
%% and leaves DbName unbound; it has to match any remaining binary.
open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) ->
    open_local_db(node(), DbName, UserCtx, Create).

%% Builds an #http_db{} from the "url", "auth" and "headers" members of
%% Props, appends the proxy parameters to its options and hands it to
%% couch_rep_httpc:db_exists/2 together with the create flag.
open_remote_db({Props}, ProxyParams, CreateTarget) ->
    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
    {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
    Headers = [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders],
    DefaultHeaders = (#http_db{})#http_db.headers,
    % NOTE(review): lists:ukeymerge/3 is documented for key-sorted inputs;
    % neither list is sorted here -- confirm the resulting header list is
    % acceptable when a request overrides a default header.
    Db1 = #http_db{
        url = Url,
        auth = AuthProps,
        headers = lists:ukeymerge(1, Headers, DefaultHeaders)
    },
    Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams},
    couch_rep_httpc:db_exists(Db, CreateTarget).

%% Opens database DbName through the couch_server process on Node.
%%
%% A node given as a binary is converted with list_to_existing_atom/1; an
%% unknown name is logged and reported as {node_not_connected, Node}.
%% Throws {db_not_found, DbName} when the database is missing and Create is
%% false, {node_not_connected, Node} when the node is down and
%% {noproc, couch_server, Node} when couch_server is not running there.
open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) ->
    try
        NodeAtom = list_to_existing_atom(?b2l(Node)),
        open_local_db(NodeAtom, DbName, UserCtx, Create)
    catch
        error:badarg ->
            ?LOG_ERROR("unknown replication node ~s", [Node]),
            throw({node_not_connected, Node})
    end;
open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) ->
    OpenResult = (catch gen_server:call({couch_server, Node},
        {open, DbName, []}, infinity)),
    case OpenResult of
        {ok, #db{} = Db} ->
            couch_db:monitor(Db),
            FdMonitor = erlang:monitor(process, Db#db.fd),
            Db#db{fd_monitor = FdMonitor};
        {ok, MainPid} when is_pid(MainPid) ->
            {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx),
            couch_db:monitor(Db),
            Db;
        {not_found, no_db_file} when Create =:= false ->
            throw({db_not_found, DbName});
        {not_found, no_db_file} ->
            % creating the target requires server admin rights
            ok = couch_httpd:verify_is_server_admin(UserCtx),
            % NOTE(review): the result of couch_server:create/2 is returned
            % as is, whereas the other success branches return a bare handle
            % -- confirm the callers can deal with that.
            couch_server:create(DbName, [{user_ctx, UserCtx}]);
        {'EXIT', {{nodedown, Node}, _Stack}} ->
            throw({node_not_connected, couch_util:to_binary(Node)});
        {'EXIT', {noproc, {gen_server, call, _}}} ->
            timer:sleep(1000),
            throw({noproc, couch_server, Node})
    end.

%% Arranges for couch_rep:checkpoint/1 to be called with this process in
%% 5000 ms, unless a checkpoint is already pending. If the timer cannot be
%% set the failure is logged and the state is returned unchanged.
schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
    Self = self(),
    case timer:apply_after(5000, couch_rep, checkpoint, [Self]) of
        {ok, TimerRef} ->
            State#state{checkpoint_scheduled = TimerRef};
        Failure ->
            ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Failure]),
            State
    end;
schedule_checkpoint(State) ->
    State.

%% Records a checkpoint and returns the updated state.
%%
%% Both databases are committed first. If the instance start times they
%% report still equal the ones stored in the state, a new history entry is
%% written to the replication log on both sides. A conflict while writing
%% is logged and leaves the state untouched.
%%
%% If the start times differ, the worker processes are shut down, the
%% databases are closed and the replication is re-initialised from
%% init_args; only the listeners are carried over to the new state.
do_checkpoint(State) ->
    #state{
        source = Source,
        target = Target,
        committed_seq = NewSeqNum,
        start_seq = StartSeqNum,
        history = OldHistory,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = ReplicationStartTime,
        src_starttime = SrcInstanceStartTime,
        tgt_starttime = TgtInstanceStartTime,
        stats = Stats
    } = State,
    case commit_to_both(Source, Target, NewSeqNum) of
        {SrcInstanceStartTime, TgtInstanceStartTime} ->
            ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p"
                " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]),
            SessionId = couch_uuids:new(),
            TargetNode =
                case Target of
                    #db{main_pid = MainPid} -> erlang:node(MainPid);
                    _ -> http
                end,
            NewHistoryEntry = {[
                {<<"session_id">>, SessionId},
                {<<"start_time">>, list_to_binary(ReplicationStartTime)},
                {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
                {<<"start_last_seq">>, StartSeqNum},
                {<<"end_last_seq">>, NewSeqNum},
                {<<"recorded_seq">>, NewSeqNum},
                {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
                {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
                {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
                {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)},
                {<<"doc_write_failures">>,
                    ets:lookup_element(Stats, doc_write_failures, 2)}
            ]},
            % limit history to 50 entries
            NewRepHistory = {[
                {<<"session_id">>, SessionId},
                {<<"source_last_seq">>, NewSeqNum},
                {<<"target_node">>, TargetNode},
                {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
            ]},

            try
                {SrcRevPos, SrcRevId} =
                    update_local_doc(Source, SourceLog#doc{body = NewRepHistory}),
                {TgtRevPos, TgtRevId} =
                    update_local_doc(Target, TargetLog#doc{body = NewRepHistory}),
                State#state{
                    checkpoint_scheduled = nil,
                    checkpoint_history = NewRepHistory,
                    source_log = SourceLog#doc{revs = {SrcRevPos, [SrcRevId]}},
                    target_log = TargetLog#doc{revs = {TgtRevPos, [TgtRevId]}}
                }
            catch
                throw:conflict ->
                    ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
                        "yourself?)", []),
                    State
            end;
        Else ->
            ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [
                {SrcInstanceStartTime, TgtInstanceStartTime}, Else]),
            ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
                [dbname(Source), dbname(Target)]),
            #state{
                changes_feed = CF,
                missing_revs = MR,
                reader = Reader,
                writer = Writer
            } = State,
            Pids = [Writer, Reader, MR, CF],
            % unlink first so that the shutdowns below are not reported back
            % to this process as 'EXIT' messages
            [unlink(Pid) || Pid <- Pids],
            [exit(Pid, shutdown) || Pid <- Pids],
            close_db(Target),
            close_db(Source),
            {ok, NewState} = init(State#state.init_args),
            NewState#state{listeners = State#state.listeners}
    end.

%% Commits source and target and returns {SourceStartTime, TargetStartTime}.
%% The source is committed in a linked helper process while the target is
%% committed in the calling process. SourceStartTime is nil when the helper
%% died instead of delivering a result.
commit_to_both(Source, Target, RequiredSeq) ->
    % commit the src async
    ParentPid = self(),
    SrcCommitPid = spawn_link(fun() ->
        ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end),

    % commit tgt sync
    TargetStartTime = ensure_full_commit(Target),

    SourceStartTime =
        receive
            {SrcCommitPid, Timestamp} ->
                Timestamp;
            {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
                nil;
            % The argument list of a failed gen_server:call holds the server
            % plus the request (and timeout), never a single element, so the
            % former `[_]` pattern could not match and this case ended up in
            % the "new error code" clause below.
            {'EXIT', SrcCommitPid, {noproc, {gen_server, call, _}}} ->
                nil; % DB crashed, this should trigger a reboot
            {'EXIT', SrcCommitPid, Else} ->
                ?LOG_ERROR("new error code for crashed replication commit ~p",
                    [Else]),
                nil
        end,
    {SourceStartTime, TargetStartTime}.

%% Commits the target database and returns its instance start time.
%%
%% HTTP target: POSTs to _ensure_full_commit and requires "ok" to be true in
%% the response.
%% Local target: opens the database again on the node of its main pid to
%% read the committed sequence, and only asks for a full commit when the
%% handle's update sequence is ahead of it.
ensure_full_commit(#http_db{headers = Headers} = Target) ->
    JsonHeaders = couch_util:proplist_apply_field(
        {"Content-Type", "application/json"}, Headers),
    Req = Target#http_db{
        resource = "_ensure_full_commit",
        method = post,
        headers = JsonHeaders
    },
    {ResultProps} = couch_rep_httpc:request(Req),
    true = couch_util:get_value(<<"ok">>, ResultProps),
    couch_util:get_value(<<"instance_start_time">>, ResultProps);
ensure_full_commit(#db{name = DbName, main_pid = Pid} = Target) ->
    {ok, FreshDb} = rpc:call(erlang:node(Pid), couch_db, open_int, [DbName, []]),
    UpdateSeq = couch_db:get_update_seq(Target),
    CommitSeq = couch_db:get_committed_update_seq(FreshDb),
    InstanceStartTime = FreshDb#db.instance_start_time,
    catch couch_db:close(FreshDb),
    case UpdateSeq > CommitSeq of
        true ->
            ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
                [UpdateSeq, CommitSeq]),
            {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
            DbStartTime;
        false ->
            ?LOG_DEBUG("target doesn't need a full commit", []),
            InstanceStartTime
    end.

%% Commits the source database up to RequiredSeq and returns its instance
%% start time.
%%
%% HTTP source: POSTs to _ensure_full_commit with the sequence as query
%% parameter; nil is returned when the response has no "ok" member.
%% Local source: a full commit is only requested when RequiredSeq is ahead
%% of the committed sequence of a freshly opened handle.
ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
    JsonHeaders = couch_util:proplist_apply_field(
        {"Content-Type", "application/json"}, Headers),
    Req = Source#http_db{
        resource = "_ensure_full_commit",
        method = post,
        qs = [{seq, RequiredSeq}],
        headers = JsonHeaders
    },
    {ResultProps} = couch_rep_httpc:request(Req),
    case couch_util:get_value(<<"ok">>, ResultProps) of
        true ->
            couch_util:get_value(<<"instance_start_time">>, ResultProps);
        undefined ->
            nil
    end;
ensure_full_commit(Source, RequiredSeq) ->
    {ok, FreshDb} = couch_db:open_int(Source#db.name, []),
    CommitSeq = couch_db:get_committed_update_seq(FreshDb),
    InstanceStartTime = FreshDb#db.instance_start_time,
    couch_db:close(FreshDb),
    case RequiredSeq > CommitSeq of
        true ->
            ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
                [RequiredSeq, CommitSeq]),
            {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
            DbStartTime;
        false ->
            ?LOG_DEBUG("source doesn't need a full commit", []),
            InstanceStartTime
    end.

%% Writes a replication log document and returns its new revision as
%% produced by couch_doc:parse_rev/1 (HTTP) or couch_db:update_doc/3
%% (local). The write does not force a full commit: HTTP requests carry
%% "x-couch-full-commit: false", local updates use delay_commit.
update_local_doc(#http_db{} = Db, #doc{id = DocId} = Doc) ->
    Req = Db#http_db{
        resource = couch_util:url_encode(DocId),
        method = put,
        body = couch_doc:to_json_obj(Doc, [attachments]),
        headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
    },
    {ResponseMembers} = couch_rep_httpc:request(Req),
    couch_doc:parse_rev(couch_util:get_value(<<"rev">>, ResponseMembers));
update_local_doc(Db, Doc) ->
    {ok, NewRev} = couch_db:update_doc(Db, Doc, [delay_commit]),
    NewRev.

%% HTTP sources always count as up to date. A local source is up to date
%% when the update sequence of a freshly opened handle equals Seq.
up_to_date(#http_db{}, _Seq) ->
    true;
up_to_date(Source, Seq) ->
    {ok, FreshDb} = couch_db:open_int(Source#db.name, []),
    IsCurrent = FreshDb#db.update_seq == Seq,
    couch_db:close(FreshDb),
    IsCurrent.

%% Update sequence stored in a local handle; -1 for HTTP handles.
seqnum(#http_db{}) ->
    -1;
seqnum(Db) ->
    Db#db.update_seq.

%% Turns a proxy URL into a list of proxy_host / proxy_port options, plus
%% proxy_user / proxy_password when the URL carries both. The empty list
%% yields no options.
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
    parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
    [];
parse_proxy_params(ProxyUrl) ->
    {url, _, Host, Port, User, Passwd, _Path, _Proto} =
        ibrowse_lib:parse_url(ProxyUrl),
    Credentials =
        case is_list(User) andalso is_list(Passwd) of
            true ->
                [{proxy_user, User}, {proxy_password, Passwd}];
            false ->
                []
        end,
    [{proxy_host, Host}, {proxy_port, Port}] ++ Credentials.