diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-08-11 15:22:33 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-11 17:39:37 -0400 |
commit | 81bdbed444df2cbcf3cdb32f7d4a74019de06454 (patch) | |
tree | eade7d0d9bb4cac01b55fd8642adfe0f7da35161 /apps/couch/src/couch_rep.erl | |
parent | cc1910f73fbd20c5ffc94bd61e7701d7f5e4c92a (diff) |
reorganize couch .erl and driver code into rebar layout
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r-- | apps/couch/src/couch_rep.erl | 748 |
1 files changed, 748 insertions, 0 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl new file mode 100644 index 00000000..65573e8c --- /dev/null +++ b/apps/couch/src/couch_rep.erl @@ -0,0 +1,748 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([replicate/2, checkpoint/1]). + +-include("couch_db.hrl"). + +-record(state, { + changes_feed, + missing_revs, + reader, + writer, + + source, + target, + continuous, + create_target, + init_args, + checkpoint_scheduled = nil, + + start_seq, + history, + source_log, + target_log, + rep_starttime, + src_starttime, + tgt_starttime, + checkpoint_history = nil, + + listeners = [], + complete = false, + committed_seq = 0, + + stats = nil, + doc_ids = nil +}). + +%% convenience function to do a simple replication from the shell +replicate(Source, Target) when is_list(Source) -> + replicate(?l2b(Source), Target); +replicate(Source, Target) when is_binary(Source), is_list(Target) -> + replicate(Source, ?l2b(Target)); +replicate(Source, Target) when is_binary(Source), is_binary(Target) -> + replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{}); + +%% function handling POST to _replicate +replicate({Props}=PostBody, UserCtx) -> + {BaseId, Extension} = make_replication_id(PostBody, UserCtx), + Replicator = {BaseId ++ Extension, + {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, + temporary, + 1, + worker, + [?MODULE] + }, + + case couch_util:get_value(<<"cancel">>, Props, false) of + true -> + case supervisor:terminate_child(couch_rep_sup, BaseId ++ Extension) of + {error, not_found} -> + {error, not_found}; + ok -> + ok = supervisor:delete_child(couch_rep_sup, BaseId ++ Extension), + {ok, {cancelled, ?l2b(BaseId)}} + end; + false -> + Server = start_replication_server(Replicator), + + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> + {ok, {continuous, ?l2b(BaseId)}}; + false -> + get_result(Server, PostBody, UserCtx) + end + end. + +checkpoint(Server) -> + gen_server:cast(Server, do_checkpoint). + +get_result(Server, PostBody, UserCtx) -> + try gen_server:call(Server, get_result, infinity) of + retry -> replicate(PostBody, UserCtx); + Else -> Else + catch + exit:{noproc, {gen_server, call, [Server, get_result , infinity]}} -> + %% oops, this replication just finished -- restart it. + replicate(PostBody, UserCtx); + exit:{normal, {gen_server, call, [Server, get_result , infinity]}} -> + %% we made the call during terminate + replicate(PostBody, UserCtx) + end. + +init(InitArgs) -> + try do_init(InitArgs) + catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end. + +do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> + process_flag(trap_exit, true), + + SourceProps = couch_util:get_value(<<"source">>, PostProps), + TargetProps = couch_util:get_value(<<"target">>, PostProps), + + DocIds = couch_util:get_value(<<"doc_ids">>, PostProps, nil), + Continuous = couch_util:get_value(<<"continuous">>, PostProps, false), + CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false), + + ProxyParams = parse_proxy_params( + couch_util:get_value(<<"proxy">>, PostProps, [])), + Source = open_db(SourceProps, UserCtx, ProxyParams), + Target = open_db(TargetProps, UserCtx, ProxyParams, CreateTarget), + + SourceInfo = dbinfo(Source), + TargetInfo = dbinfo(Target), + + case DocIds of + List when is_list(List) -> + % Fast replication using only a list of doc IDs to replicate. + % Replication sessions, checkpoints and logs are not created + % since the update sequence number of the source DB is not used + % for determining which documents are copied into the target DB. + SourceLog = nil, + TargetLog = nil, + + StartSeq = nil, + History = nil, + + ChangesFeed = nil, + MissingRevs = nil, + + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, DocIds, PostProps); + + _ -> + % Replication using the _changes API (DB sequence update numbers). + SourceLog = open_replication_log(Source, RepId), + TargetLog = open_replication_log(Target, RepId), + + {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog), + + {ok, ChangesFeed} = + couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps), + {ok, MissingRevs} = + couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps), + {ok, Reader} = + couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps) + end, + + {ok, Writer} = + couch_rep_writer:start_link(self(), Target, Reader, PostProps), + + Stats = ets:new(replication_stats, [set, private]), + ets:insert(Stats, {total_revs,0}), + ets:insert(Stats, {missing_revs, 0}), + ets:insert(Stats, {docs_read, 0}), + ets:insert(Stats, {docs_written, 0}), + ets:insert(Stats, {doc_write_failures, 0}), + + {ShortId, _} = lists:split(6, RepId), + couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", + [ShortId, dbname(Source), dbname(Target)]), "Starting"), + + State = #state{ + changes_feed = ChangesFeed, + missing_revs = MissingRevs, + reader = Reader, + writer = Writer, + + source = Source, + target = Target, + continuous = Continuous, + create_target = CreateTarget, + init_args = InitArgs, + stats = Stats, + checkpoint_scheduled = nil, + + start_seq = StartSeq, + history = History, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = httpd_util:rfc1123_date(), + src_starttime = couch_util:get_value(instance_start_time, SourceInfo), + tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), + doc_ids = DocIds + }, + {ok, State}. + +handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> + {stop, normal, State#state{listeners=[From]}}; +handle_call(get_result, From, State) -> + Listeners = State#state.listeners, + {noreply, State#state{listeners=[From|Listeners]}}. + +handle_cast(do_checkpoint, State) -> + {noreply, do_checkpoint(State)}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({missing_revs_checkpoint, SourceSeq}, State) -> + couch_task_status:update("MR Processed source update #~p", [SourceSeq]), + {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; + +handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) + when SourceSeq > N -> + MissingRevs = State#state.missing_revs, + ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), + couch_task_status:update("W Processed source update #~p", [SourceSeq]), + {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; +handle_info({writer_checkpoint, _}, State) -> + {noreply, State}; + +handle_info({update_stats, Key, N}, State) -> + ets:update_counter(State#state.stats, Key, N), + {noreply, State}; + +handle_info({'DOWN', _, _, _, _}, State) -> + ?LOG_INFO("replication terminating because local DB is shutting down", []), + timer:cancel(State#state.checkpoint_scheduled), + {stop, shutdown, State}; + +handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) -> + case State#state.listeners of + [] -> + {noreply, State#state{complete = true}}; + _Else -> + {stop, normal, State} + end; + +handle_info({'EXIT', _, normal}, State) -> + {noreply, State}; +handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; + Err == target_error -> + ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]), + timer:cancel(State#state.checkpoint_scheduled), + {stop, shutdown, State}; +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}. + +terminate(normal, #state{checkpoint_scheduled=nil} = State) -> + do_terminate(State); + +terminate(normal, State) -> + timer:cancel(State#state.checkpoint_scheduled), + do_terminate(do_checkpoint(State)); + +terminate(Reason, State) -> + #state{ + listeners = Listeners, + source = Source, + target = Target, + stats = Stats + } = State, + [gen_server:reply(L, {error, Reason}) || L <- Listeners], + ets:delete(Stats), + close_db(Target), + close_db(Source). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +% internal funs + +start_replication_server(Replicator) -> + RepId = element(1, Replicator), + case supervisor:start_child(couch_rep_sup, Replicator) of + {ok, Pid} -> + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), + Pid; + {error, already_present} -> + case supervisor:restart_child(couch_rep_sup, RepId) of + {ok, Pid} -> + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), + Pid; + {error, running} -> + %% this error occurs if multiple replicators are racing + %% each other to start and somebody else won. Just grab + %% the Pid by calling start_child again. + {error, {already_started, Pid}} = + supervisor:start_child(couch_rep_sup, Replicator), + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid; + {error, {db_not_found, DbUrl}} -> + throw({db_not_found, <<"could not open ", DbUrl/binary>>}) + end; + {error, {already_started, Pid}} -> + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid; + {error, {{db_not_found, DbUrl}, _}} -> + throw({db_not_found, <<"could not open ", DbUrl/binary>>}) + end. + +compare_replication_logs(SrcDoc, TgtDoc) -> + #doc{body={RepRecProps}} = SrcDoc, + #doc{body={RepRecPropsTgt}} = TgtDoc, + case couch_util:get_value(<<"session_id">>, RepRecProps) == + couch_util:get_value(<<"session_id">>, RepRecPropsTgt) of + true -> + % if the records have the same session id, + % then we have a valid replication history + OldSeqNum = couch_util:get_value(<<"source_last_seq">>, RepRecProps, 0), + OldHistory = couch_util:get_value(<<"history">>, RepRecProps, []), + {OldSeqNum, OldHistory}; + false -> + SourceHistory = couch_util:get_value(<<"history">>, RepRecProps, []), + TargetHistory = couch_util:get_value(<<"history">>, RepRecPropsTgt, []), + ?LOG_INFO("Replication records differ. " + "Scanning histories to find a common ancestor.", []), + ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", + [RepRecProps, RepRecPropsTgt]), + compare_rep_history(SourceHistory, TargetHistory) + end. + +compare_rep_history(S, T) when S =:= [] orelse T =:= [] -> + ?LOG_INFO("no common ancestry -- performing full replication", []), + {0, []}; +compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) -> + SourceId = couch_util:get_value(<<"session_id">>, S), + case has_session_id(SourceId, Target) of + true -> + RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, S, 0), + ?LOG_INFO("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, SourceRest}; + false -> + TargetId = couch_util:get_value(<<"session_id">>, T), + case has_session_id(TargetId, SourceRest) of + true -> + RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, T, 0), + ?LOG_INFO("found a common replication record with source_seq ~p", + [RecordSeqNum]), + {RecordSeqNum, TargetRest}; + false -> + compare_rep_history(SourceRest, TargetRest) + end + end. + +close_db(#http_db{}) -> + ok; +close_db(Db) -> + couch_db:close(Db). + +dbname(#http_db{url = Url}) -> + strip_password(Url); +dbname(#db{name = Name}) -> + Name. + +strip_password(Url) -> + re:replace(Url, + "http(s)?://([^:]+):[^@]+@(.*)$", + "http\\1://\\2:*****@\\3", + [{return, list}]). + +dbinfo(#http_db{} = Db) -> + {DbProps} = couch_rep_httpc:request(Db), + [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]; +dbinfo(Db) -> + {ok, Info} = couch_db:get_db_info(Db), + Info. + +do_terminate(#state{doc_ids=DocIds} = State) when is_list(DocIds) -> + #state{ + listeners = Listeners, + rep_starttime = ReplicationStartTime, + stats = Stats + } = State, + + RepByDocsJson = {[ + {<<"start_time">>, ?l2b(ReplicationStartTime)}, + {<<"end_time">>, ?l2b(httpd_util:rfc1123_date())}, + {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, + ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + + terminate_cleanup(State), + [gen_server:reply(L, {ok, RepByDocsJson}) || L <- lists:reverse(Listeners)]; + +do_terminate(State) -> + #state{ + checkpoint_history = CheckpointHistory, + committed_seq = NewSeq, + listeners = Listeners, + source = Source, + continuous = Continuous, + source_log = #doc{body={OldHistory}} + } = State, + + NewRepHistory = case CheckpointHistory of + nil -> + {[{<<"no_changes">>, true} | OldHistory]}; + _Else -> + CheckpointHistory + end, + + %% reply to original requester + OtherListeners = case Continuous of + true -> + []; % continuous replications have no listeners + _ -> + [Original|Rest] = lists:reverse(Listeners), + gen_server:reply(Original, {ok, NewRepHistory}), + Rest + end, + + %% maybe trigger another replication. If this replicator uses a local + %% source Db, changes to that Db since we started will not be included in + %% this pass. + case up_to_date(Source, NewSeq) of + true -> + [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners]; + false -> + [gen_server:reply(R, retry) || R <- OtherListeners] + end, + terminate_cleanup(State). + +terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) -> + couch_task_status:update("Finishing"), + close_db(Target), + close_db(Source), + ets:delete(Stats). + +has_session_id(_SessionId, []) -> + false; +has_session_id(SessionId, [{Props} | Rest]) -> + case couch_util:get_value(<<"session_id">>, Props, nil) of + SessionId -> + true; + _Else -> + has_session_id(SessionId, Rest) + end. + +maybe_append_options(Options, Props) -> + lists:foldl(fun(Option, Acc) -> + Acc ++ + case couch_util:get_value(Option, Props, false) of + true -> + "+" ++ ?b2l(Option); + false -> + "" + end + end, [], Options). + +make_replication_id({Props}, UserCtx) -> + %% funky algorithm to preserve backwards compatibility + {ok, HostName} = inet:gethostname(), + % Port = mochiweb_socket_server:get(couch_httpd, port), + Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), + Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), + Base = [HostName, Src, Tgt] ++ + case couch_util:get_value(<<"filter">>, Props) of + undefined -> + case couch_util:get_value(<<"doc_ids">>, Props) of + undefined -> + []; + DocIds -> + [DocIds] + end; + Filter -> + [Filter, couch_util:get_value(<<"query_params">>, Props, {[]})] + end, + Extension = maybe_append_options( + [<<"continuous">>, <<"create_target">>], Props), + {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}. + +maybe_add_trailing_slash(Url) -> + re:replace(Url, "[^/]$", "&/", [{return, list}]). + +get_rep_endpoint(_UserCtx, {Props}) -> + Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), + {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), + {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), + case couch_util:get_value(<<"oauth">>, Auth) of + undefined -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; + {OAuth} -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} + end; +get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) -> + {remote, maybe_add_trailing_slash(Url), []}; +get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) -> + {remote, maybe_add_trailing_slash(Url), []}; +get_rep_endpoint(UserCtx, <<DbName/binary>>) -> + {local, DbName, UserCtx}. + +open_replication_log(#http_db{}=Db, RepId) -> + DocId = ?LOCAL_DOC_PREFIX ++ RepId, + Req = Db#http_db{resource=couch_util:url_encode(DocId)}, + case couch_rep_httpc:request(Req) of + {[{<<"error">>, _}, {<<"reason">>, _}]} -> + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), + #doc{id=?l2b(DocId)}; + Doc -> + ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), + couch_doc:from_json_obj(Doc) + end; +open_replication_log(Db, RepId) -> + DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), + case couch_db:open_doc(Db, DocId, []) of + {ok, Doc} -> + ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), + Doc; + _ -> + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), + #doc{id=DocId} + end. + +open_db(Props, UserCtx, ProxyParams) -> + open_db(Props, UserCtx, ProxyParams, false). + +open_db({Props}, _UserCtx, ProxyParams, CreateTarget) -> + Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), + {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}), + {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), + Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], + DefaultHeaders = (#http_db{})#http_db.headers, + Db1 = #http_db{ + url = Url, + auth = AuthProps, + headers = lists:ukeymerge(1, Headers, DefaultHeaders) + }, + Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams}, + couch_rep_httpc:db_exists(Db, CreateTarget); +open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> + open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); +open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> + open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); +open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); + false -> ok + end, + + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; + {not_found, no_db_file} -> throw({db_not_found, DbName}) + end. + +schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) -> + Server = self(), + case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of + {ok, TRef} -> + State#state{checkpoint_scheduled = TRef}; + Error -> + ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]), + State + end; +schedule_checkpoint(State) -> + State. + +do_checkpoint(State) -> + #state{ + source = Source, + target = Target, + committed_seq = NewSeqNum, + start_seq = StartSeqNum, + history = OldHistory, + source_log = SourceLog, + target_log = TargetLog, + rep_starttime = ReplicationStartTime, + src_starttime = SrcInstanceStartTime, + tgt_starttime = TgtInstanceStartTime, + stats = Stats + } = State, + case commit_to_both(Source, Target, NewSeqNum) of + {SrcInstanceStartTime, TgtInstanceStartTime} -> + ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", + [dbname(Source), dbname(Target), NewSeqNum]), + SessionId = couch_uuids:random(), + NewHistoryEntry = {[ + {<<"session_id">>, SessionId}, + {<<"start_time">>, list_to_binary(ReplicationStartTime)}, + {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())}, + {<<"start_last_seq">>, StartSeqNum}, + {<<"end_last_seq">>, NewSeqNum}, + {<<"recorded_seq">>, NewSeqNum}, + {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)}, + {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)}, + {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)}, + {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}, + {<<"doc_write_failures">>, + ets:lookup_element(Stats, doc_write_failures, 2)} + ]}, + % limit history to 50 entries + NewRepHistory = {[ + {<<"session_id">>, SessionId}, + {<<"source_last_seq">>, NewSeqNum}, + {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} + ]}, + + try + {SrcRevPos,SrcRevId} = + update_local_doc(Source, SourceLog#doc{body=NewRepHistory}), + {TgtRevPos,TgtRevId} = + update_local_doc(Target, TargetLog#doc{body=NewRepHistory}), + State#state{ + checkpoint_scheduled = nil, + checkpoint_history = NewRepHistory, + source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}}, + target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}} + } + catch throw:conflict -> + ?LOG_ERROR("checkpoint failure: conflict (are you replicating to " + "yourself?)", []), + State + end; + _Else -> + ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint", + [dbname(Source), dbname(Target)]), + #state{ + changes_feed = CF, + missing_revs = MR, + reader = Reader, + writer = Writer + } = State, + Pids = [Writer, Reader, MR, CF], + [unlink(Pid) || Pid <- Pids], + [exit(Pid, shutdown) || Pid <- Pids], + close_db(Target), + close_db(Source), + {ok, NewState} = init(State#state.init_args), + NewState#state{listeners=State#state.listeners} + end. + +commit_to_both(Source, Target, RequiredSeq) -> + % commit the src async + ParentPid = self(), + SrcCommitPid = spawn_link(fun() -> + ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end), + + % commit tgt sync + TargetStartTime = ensure_full_commit(Target), + + SourceStartTime = + receive + {SrcCommitPid, Timestamp} -> + Timestamp; + {'EXIT', SrcCommitPid, {http_request_failed, _}} -> + exit(replication_link_failure) + end, + {SourceStartTime, TargetStartTime}. + +ensure_full_commit(#http_db{headers = Headers} = Target) -> + Req = Target#http_db{ + resource = "_ensure_full_commit", + method = post, + headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers) + }, + {ResultProps} = couch_rep_httpc:request(Req), + true = couch_util:get_value(<<"ok">>, ResultProps), + couch_util:get_value(<<"instance_start_time">>, ResultProps); +ensure_full_commit(Target) -> + {ok, NewDb} = couch_db:open_int(Target#db.name, []), + UpdateSeq = couch_db:get_update_seq(Target), + CommitSeq = couch_db:get_committed_update_seq(NewDb), + InstanceStartTime = NewDb#db.instance_start_time, + couch_db:close(NewDb), + if UpdateSeq > CommitSeq -> + ?LOG_DEBUG("target needs a full commit: update ~p commit ~p", + [UpdateSeq, CommitSeq]), + {ok, DbStartTime} = couch_db:ensure_full_commit(Target), + DbStartTime; + true -> + ?LOG_DEBUG("target doesn't need a full commit", []), + InstanceStartTime + end. + +ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) -> + Req = Source#http_db{ + resource = "_ensure_full_commit", + method = post, + qs = [{seq, RequiredSeq}], + headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers) + }, + {ResultProps} = couch_rep_httpc:request(Req), + case couch_util:get_value(<<"ok">>, ResultProps) of + true -> + couch_util:get_value(<<"instance_start_time">>, ResultProps); + undefined -> nil end; +ensure_full_commit(Source, RequiredSeq) -> + {ok, NewDb} = couch_db:open_int(Source#db.name, []), + CommitSeq = couch_db:get_committed_update_seq(NewDb), + InstanceStartTime = NewDb#db.instance_start_time, + couch_db:close(NewDb), + if RequiredSeq > CommitSeq -> + ?LOG_DEBUG("source needs a full commit: required ~p committed ~p", + [RequiredSeq, CommitSeq]), + {ok, DbStartTime} = couch_db:ensure_full_commit(Source), + DbStartTime; + true -> + ?LOG_DEBUG("source doesn't need a full commit", []), + InstanceStartTime + end. + +update_local_doc(#http_db{} = Db, #doc{id=DocId} = Doc) -> + Req = Db#http_db{ + resource = couch_util:url_encode(DocId), + method = put, + body = couch_doc:to_json_obj(Doc, [attachments]), + headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers] + }, + {ResponseMembers} = couch_rep_httpc:request(Req), + Rev = couch_util:get_value(<<"rev">>, ResponseMembers), + couch_doc:parse_rev(Rev); +update_local_doc(Db, Doc) -> + {ok, Result} = couch_db:update_doc(Db, Doc, [delay_commit]), + Result. + +up_to_date(#http_db{}, _Seq) -> + true; +up_to_date(Source, Seq) -> + {ok, NewDb} = couch_db:open_int(Source#db.name, []), + T = NewDb#db.update_seq == Seq, + couch_db:close(NewDb), + T. + +parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> + parse_proxy_params(?b2l(ProxyUrl)); +parse_proxy_params([]) -> + []; +parse_proxy_params(ProxyUrl) -> + {url, _, Base, Port, User, Passwd, _Path, _Proto} = + ibrowse_lib:parse_url(ProxyUrl), + [{proxy_host, Base}, {proxy_port, Port}] ++ + case is_list(User) andalso is_list(Passwd) of + false -> + []; + true -> + [{proxy_user, User}, {proxy_password, Passwd}] + end. |