summaryrefslogtreecommitdiff
path: root/1.1.x/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2011-05-17 11:15:14 +0000
committerRobert Newson <rnewson@apache.org>2011-05-17 11:15:14 +0000
commite8e4b0d293021fe90326a85828f3cfb087bf18b7 (patch)
tree986f544eac623ec23b769b36828894f93a173aa3 /1.1.x/src/couchdb/couch_rep.erl
parentda6a5322b0b8084f434752060caa8be214c6f4fa (diff)
tagging 1.1.0
git-svn-id: https://svn.apache.org/repos/asf/couchdb/tags/1.1.0@1104149 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to '1.1.x/src/couchdb/couch_rep.erl')
-rw-r--r--1.1.x/src/couchdb/couch_rep.erl972
1 files changed, 972 insertions, 0 deletions
diff --git a/1.1.x/src/couchdb/couch_rep.erl b/1.1.x/src/couchdb/couch_rep.erl
new file mode 100644
index 00000000..5c9fbce6
--- /dev/null
+++ b/1.1.x/src/couchdb/couch_rep.erl
@@ -0,0 +1,972 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([replicate/2, checkpoint/1]).
+-export([ensure_rep_db_exists/0, make_replication_id/2]).
+-export([start_replication/3, end_replication/1, get_result/4]).
+-export([update_rep_doc/2]).
+
+-include("couch_db.hrl").
+-include("couch_js_functions.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-define(REP_ID_VERSION, 2).
+
+-record(state, {
+ changes_feed,
+ missing_revs,
+ reader,
+ writer,
+
+ source,
+ target,
+ continuous,
+ create_target,
+ init_args,
+ checkpoint_scheduled = nil,
+
+ start_seq,
+ history,
+ session_id,
+ source_log,
+ target_log,
+ rep_starttime,
+ src_starttime,
+ tgt_starttime,
+ checkpoint_history = nil,
+
+ listeners = [],
+ complete = false,
+ committed_seq = 0,
+
+ stats = nil,
+ rep_doc = nil,
+ source_db_update_notifier = nil,
+ target_db_update_notifier = nil
+}).
+
+%% convenience function to do a simple replication from the shell
+replicate(Source, Target) when is_list(Source) ->
+ replicate(?l2b(Source), Target);
+replicate(Source, Target) when is_binary(Source), is_list(Target) ->
+ replicate(Source, ?l2b(Target));
+replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
+ replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});
+
+%% function handling POST to _replicate
+replicate({Props}=PostBody, UserCtx) ->
+ RepId = make_replication_id(PostBody, UserCtx),
+ case couch_util:get_value(<<"cancel">>, Props, false) of
+ true ->
+ end_replication(RepId);
+ false ->
+ Server = start_replication(PostBody, RepId, UserCtx),
+ get_result(Server, RepId, PostBody, UserCtx)
+ end.
+
+end_replication({BaseId, Extension}) ->
+ RepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, RepId) of
+ {error, not_found} = R ->
+ R;
+ ok ->
+ ok = supervisor:delete_child(couch_rep_sup, RepId),
+ {ok, {cancelled, ?l2b(BaseId)}}
+ end.
+
+start_replication(RepDoc, {BaseId, Extension}, UserCtx) ->
+ Replicator = {
+ BaseId ++ Extension,
+ {gen_server, start_link,
+ [?MODULE, [BaseId, RepDoc, UserCtx], []]},
+ temporary,
+ 1,
+ worker,
+ [?MODULE]
+ },
+ start_replication_server(Replicator).
+
+checkpoint(Server) ->
+ gen_server:cast(Server, do_checkpoint).
+
+get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
+ case couch_util:get_value(<<"continuous">>, Props, false) of
+ true ->
+ {ok, {continuous, ?l2b(BaseId)}};
+ false ->
+ try gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(PostBody, UserCtx);
+ Else -> Else
+ catch
+ exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% oops, this replication just finished -- restart it.
+ replicate(PostBody, UserCtx);
+ exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
+ %% we made the call during terminate
+ replicate(PostBody, UserCtx)
+ end
+ end.
+
+init(InitArgs) ->
+ try
+ do_init(InitArgs)
+ catch
+ throw:Error ->
+ {stop, Error}
+ end.
+
+do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
+ process_flag(trap_exit, true),
+
+ SourceProps = couch_util:get_value(<<"source">>, PostProps),
+ TargetProps = couch_util:get_value(<<"target">>, PostProps),
+
+ Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
+ CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),
+
+ ProxyParams = parse_proxy_params(
+ couch_util:get_value(<<"proxy">>, PostProps, [])),
+ Source = open_db(SourceProps, UserCtx, ProxyParams),
+ Target = open_db(TargetProps, UserCtx, ProxyParams, CreateTarget),
+
+ SourceInfo = dbinfo(Source),
+ TargetInfo = dbinfo(Target),
+
+ maybe_set_triggered(RepDoc, RepId),
+
+ [SourceLog, TargetLog] = find_replication_logs(
+ [Source, Target], RepId, {PostProps}, UserCtx),
+ {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
+
+ {ok, ChangesFeed} =
+ couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
+ {ok, MissingRevs} =
+ couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
+ {ok, Reader} =
+ couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
+ {ok, Writer} =
+ couch_rep_writer:start_link(self(), Target, Reader, PostProps),
+
+ Stats = ets:new(replication_stats, [set, private]),
+ ets:insert(Stats, {total_revs,0}),
+ ets:insert(Stats, {missing_revs, 0}),
+ ets:insert(Stats, {docs_read, 0}),
+ ets:insert(Stats, {docs_written, 0}),
+ ets:insert(Stats, {doc_write_failures, 0}),
+
+ {ShortId, _} = lists:split(6, RepId),
+ couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
+ [ShortId, dbname(Source), dbname(Target)]), "Starting"),
+
+ State = #state{
+ changes_feed = ChangesFeed,
+ missing_revs = MissingRevs,
+ reader = Reader,
+ writer = Writer,
+
+ source = Source,
+ target = Target,
+ continuous = Continuous,
+ create_target = CreateTarget,
+ init_args = InitArgs,
+ stats = Stats,
+ checkpoint_scheduled = nil,
+
+ start_seq = StartSeq,
+ history = History,
+ session_id = couch_uuids:random(),
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = httpd_util:rfc1123_date(),
+ src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
+ tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
+ rep_doc = RepDoc,
+ source_db_update_notifier = source_db_update_notifier(Source),
+ target_db_update_notifier = target_db_update_notifier(Target)
+ },
+ {ok, State}.
+
+handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
+ {stop, normal, State#state{listeners=[From]}};
+handle_call(get_result, From, State) ->
+ Listeners = State#state.listeners,
+ {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+ {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+ {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
+
+handle_cast(do_checkpoint, State) ->
+ {noreply, do_checkpoint(State)};
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
+ couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+ {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
+
+handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
+ when SourceSeq > N ->
+ MissingRevs = State#state.missing_revs,
+ ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
+ couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+ {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
+handle_info({writer_checkpoint, _}, State) ->
+ {noreply, State};
+
+handle_info({update_stats, Key, N}, State) ->
+ ets:update_counter(State#state.stats, Key, N),
+ {noreply, State};
+
+handle_info({'DOWN', _, _, _, _}, State) ->
+ ?LOG_INFO("replication terminating because local DB is shutting down", []),
+ timer:cancel(State#state.checkpoint_scheduled),
+ {stop, shutdown, State};
+
+handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
+ case State#state.listeners of
+ [] ->
+ {noreply, State#state{complete = true}};
+ _Else ->
+ {stop, normal, State}
+ end;
+
+handle_info({'EXIT', _, normal}, State) ->
+ {noreply, State};
+handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
+ Err == target_error ->
+ ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
+ timer:cancel(State#state.checkpoint_scheduled),
+ {stop, shutdown, State};
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State}.
+
+terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
+ do_terminate(State),
+ update_rep_doc(
+ State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+
+terminate(normal, State) ->
+ timer:cancel(State#state.checkpoint_scheduled),
+ do_terminate(do_checkpoint(State)),
+ update_rep_doc(
+ State#state.rep_doc, [{<<"_replication_state">>, <<"completed">>}]);
+
+terminate(shutdown, #state{listeners = Listeners} = State) ->
+ % continuous replication stopped
+ [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
+ terminate_cleanup(State);
+
+terminate(Reason, #state{listeners = Listeners} = State) ->
+ [gen_server:reply(L, {error, Reason}) || L <- Listeners],
+ terminate_cleanup(State),
+ update_rep_doc(
+ State#state.rep_doc, [{<<"_replication_state">>, <<"error">>}]).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% internal funs
+
+start_replication_server(Replicator) ->
+ RepId = element(1, Replicator),
+ case supervisor:start_child(couch_rep_sup, Replicator) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, RepId) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid;
+ {error, {db_not_found, DbUrl}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {unauthorized, DbUrl}} ->
+ throw({unauthorized,
+ <<"unauthorized to access database ", DbUrl/binary>>});
+ {error, {'EXIT', {badarg,
+ [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
+ % Clause to deal with a change in the supervisor module introduced
+ % in R14B02. For more details consult the thread at:
+ % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
+ _ = supervisor:delete_child(couch_rep_sup, RepId),
+ start_replication_server(Replicator)
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
+ Pid;
+ {error, {{db_not_found, DbUrl}, _}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {{unauthorized, DbUrl}, _}} ->
+ throw({unauthorized,
+ <<"unauthorized to access database ", DbUrl/binary>>})
+ end.
+
+compare_replication_logs(SrcDoc, TgtDoc) ->
+ #doc{body={RepRecProps}} = SrcDoc,
+ #doc{body={RepRecPropsTgt}} = TgtDoc,
+ case couch_util:get_value(<<"session_id">>, RepRecProps) ==
+ couch_util:get_value(<<"session_id">>, RepRecPropsTgt) of
+ true ->
+ % if the records have the same session id,
+ % then we have a valid replication history
+ OldSeqNum = couch_util:get_value(<<"source_last_seq">>, RepRecProps, 0),
+ OldHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
+ {OldSeqNum, OldHistory};
+ false ->
+ SourceHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
+ TargetHistory = couch_util:get_value(<<"history">>, RepRecPropsTgt, []),
+ ?LOG_INFO("Replication records differ. "
+ "Scanning histories to find a common ancestor.", []),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [RepRecProps, RepRecPropsTgt]),
+ compare_rep_history(SourceHistory, TargetHistory)
+ end.
+
+compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
+ ?LOG_INFO("no common ancestry -- performing full replication", []),
+ {0, []};
+compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
+ SourceId = couch_util:get_value(<<"session_id">>, S),
+ case has_session_id(SourceId, Target) of
+ true ->
+ RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, S, 0),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, SourceRest};
+ false ->
+ TargetId = couch_util:get_value(<<"session_id">>, T),
+ case has_session_id(TargetId, SourceRest) of
+ true ->
+ RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, T, 0),
+ ?LOG_INFO("found a common replication record with source_seq ~p",
+ [RecordSeqNum]),
+ {RecordSeqNum, TargetRest};
+ false ->
+ compare_rep_history(SourceRest, TargetRest)
+ end
+ end.
+
+close_db(#http_db{}) ->
+ ok;
+close_db(Db) ->
+ couch_db:close(Db).
+
+dbname(#http_db{url = Url}) ->
+ couch_util:url_strip_password(Url);
+dbname(#db{name = Name}) ->
+ Name.
+
+dbinfo(#http_db{} = Db) ->
+ {DbProps} = couch_rep_httpc:request(Db),
+ [{couch_util:to_existing_atom(K), V} || {K,V} <- DbProps];
+dbinfo(Db) ->
+ {ok, Info} = couch_db:get_db_info(Db),
+ Info.
+
+do_terminate(State) ->
+ #state{
+ checkpoint_history = CheckpointHistory,
+ committed_seq = NewSeq,
+ listeners = Listeners,
+ source = Source,
+ continuous = Continuous,
+ source_log = #doc{body={OldHistory}}
+ } = State,
+
+ NewRepHistory = case CheckpointHistory of
+ nil ->
+ {[{<<"no_changes">>, true} | OldHistory]};
+ _Else ->
+ CheckpointHistory
+ end,
+
+ %% reply to original requester
+ OtherListeners = case Continuous of
+ true ->
+ []; % continuous replications have no listeners
+ _ ->
+ [Original|Rest] = lists:reverse(Listeners),
+ gen_server:reply(Original, {ok, NewRepHistory}),
+ Rest
+ end,
+
+ %% maybe trigger another replication. If this replicator uses a local
+ %% source Db, changes to that Db since we started will not be included in
+ %% this pass.
+ case up_to_date(Source, NewSeq) of
+ true ->
+ [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
+ false ->
+ [gen_server:reply(R, retry) || R <- OtherListeners]
+ end,
+ couch_task_status:update("Finishing"),
+ terminate_cleanup(State).
+
+terminate_cleanup(State) ->
+ close_db(State#state.source),
+ close_db(State#state.target),
+ stop_db_update_notifier(State#state.source_db_update_notifier),
+ stop_db_update_notifier(State#state.target_db_update_notifier),
+ ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+ ok;
+stop_db_update_notifier(Notifier) ->
+ couch_db_update_notifier:stop(Notifier).
+
+has_session_id(_SessionId, []) ->
+ false;
+has_session_id(SessionId, [{Props} | Rest]) ->
+ case couch_util:get_value(<<"session_id">>, Props, nil) of
+ SessionId ->
+ true;
+ _Else ->
+ has_session_id(SessionId, Rest)
+ end.
+
+maybe_append_options(Options, {Props}) ->
+ lists:foldl(fun(Option, Acc) ->
+ Acc ++
+ case couch_util:get_value(Option, Props, false) of
+ true ->
+ "+" ++ ?b2l(Option);
+ false ->
+ ""
+ end
+ end, [], Options).
+
+make_replication_id(RepProps, UserCtx) ->
+ BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
+ Extension = maybe_append_options(
+ [<<"continuous">>, <<"create_target">>], RepProps),
+ {BaseId, Extension}.
+
+% Versioned clauses for generating replication ids
+% If a change is made to how replications are identified
+% add a new clause and increase ?REP_ID_VERSION at the top
+make_replication_id({Props}, UserCtx, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = mochiweb_socket_server:get(couch_httpd, port),
+ Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
+ maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
+make_replication_id({Props}, UserCtx, 1) ->
+ {ok, HostName} = inet:gethostname(),
+ Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
+ Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
+ maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx).
+
+maybe_append_filters({Props}, Base, UserCtx) ->
+ Base2 = Base ++
+ case couch_util:get_value(<<"filter">>, Props) of
+ undefined ->
+ case couch_util:get_value(<<"doc_ids">>, Props) of
+ undefined ->
+ [];
+ DocIds ->
+ [DocIds]
+ end;
+ Filter ->
+ [filter_code(Filter, Props, UserCtx),
+ couch_util:get_value(<<"query_params">>, Props, {[]})]
+ end,
+ couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+
+filter_code(Filter, Props, UserCtx) ->
+ {match, [DDocName, FilterName]} =
+ re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]),
+ ProxyParams = parse_proxy_params(
+ couch_util:get_value(<<"proxy">>, Props, [])),
+ Source = open_db(
+ couch_util:get_value(<<"source">>, Props), UserCtx, ProxyParams),
+ try
+ {ok, DDoc} = open_doc(Source, <<"_design/", DDocName/binary>>),
+ Code = couch_util:get_nested_json_value(
+ DDoc#doc.body, [<<"filters">>, FilterName]),
+ re:replace(Code, "^\s*(.*?)\s*$", "\\1", [{return, binary}])
+ after
+ close_db(Source)
+ end.
+
+maybe_add_trailing_slash(Url) ->
+ re:replace(Url, "[^/]$", "&/", [{return, list}]).
+
+get_rep_endpoint(_UserCtx, {Props}) ->
+ Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
+ {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
+ case couch_util:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ end;
+get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
+ {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
+ {remote, maybe_add_trailing_slash(Url), []};
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+ {local, DbName, UserCtx}.
+
+find_replication_logs(DbList, RepId, RepProps, UserCtx) ->
+ LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ fold_replication_logs(DbList, ?REP_ID_VERSION,
+ LogId, LogId, RepProps, UserCtx, []).
+
+% Accumulate the replication logs
+% Falls back to older log document ids and migrates them
+fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) ->
+ lists:reverse(Acc);
+fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
+ RepProps, UserCtx, Acc) ->
+ case open_replication_log(Db, LogId) of
+ {error, not_found} when Vsn > 1 ->
+ OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc);
+ {error, not_found} ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ RepProps, UserCtx, [#doc{id=NewId}|Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ RepProps, UserCtx, [Doc|Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id=NewId,body=Doc#doc.body},
+ fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
+ RepProps, UserCtx, [MigratedLog|Acc])
+ end.
+
+open_replication_log(Db, DocId) ->
+ case open_doc(Db, DocId) of
+ {ok, Doc} ->
+ ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]),
+ {ok, Doc};
+ _ ->
+ ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]),
+ {error, not_found}
+ end.
+
+open_doc(#http_db{} = Db, DocId) ->
+ Req = Db#http_db{resource = couch_util:encode_doc_id(DocId)},
+ case couch_rep_httpc:request(Req) of
+ {[{<<"error">>, _}, {<<"reason">>, _}]} ->
+ {error, not_found};
+ Doc ->
+ {ok, couch_doc:from_json_obj(Doc)}
+ end;
+open_doc(Db, DocId) ->
+ couch_db:open_doc(Db, DocId).
+
+open_db(Props, UserCtx, ProxyParams) ->
+ open_db(Props, UserCtx, ProxyParams, false).
+
+open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
+ Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
+ {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+ Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
+ DefaultHeaders = (#http_db{})#http_db.headers,
+ Db1 = #http_db{
+ url = Url,
+ auth = AuthProps,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders)
+ },
+ Db = Db1#http_db{
+ options = Db1#http_db.options ++ ProxyParams ++
+ couch_rep_httpc:ssl_options(Db1)
+ },
+ couch_rep_httpc:db_exists(Db, CreateTarget);
+open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
+ open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
+open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
+ open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
+open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
+ try
+ case CreateTarget of
+ true ->
+ ok = couch_httpd:verify_is_server_admin(UserCtx),
+ couch_server:create(DbName, [{user_ctx, UserCtx}]);
+ false ->
+ ok
+ end,
+
+ case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+ {ok, Db} ->
+ couch_db:monitor(Db),
+ Db;
+ {not_found, no_db_file} ->
+ throw({db_not_found, DbName})
+ end
+ catch throw:{unauthorized, _} ->
+ throw({unauthorized, DbName})
+ end.
+
+schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
+ Server = self(),
+ case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of
+ {ok, TRef} ->
+ State#state{checkpoint_scheduled = TRef};
+ Error ->
+ ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
+ State
+ end;
+schedule_checkpoint(State) ->
+ State.
+
+do_checkpoint(State) ->
+ #state{
+ source = Source,
+ target = Target,
+ committed_seq = NewSeqNum,
+ start_seq = StartSeqNum,
+ history = OldHistory,
+ session_id = SessionId,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = ReplicationStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats,
+ rep_doc = {RepDoc}
+ } = State,
+ case commit_to_both(Source, Target, NewSeqNum) of
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
+ [dbname(Source), dbname(Target), NewSeqNum]),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ StartTime = ?l2b(ReplicationStartTime),
+ DocsRead = ets:lookup_element(Stats, docs_read, 2),
+ DocsWritten = ets:lookup_element(Stats, docs_written, 2),
+ DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"start_last_seq">>, StartSeqNum},
+ {<<"end_last_seq">>, NewSeqNum},
+ {<<"recorded_seq">>, NewSeqNum},
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
+ ]},
+ BaseHistory = [
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeqNum},
+ {<<"replication_id_version">>, ?REP_ID_VERSION}
+ ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
+ undefined ->
+ [];
+ DocIds when is_list(DocIds) ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, DocsRead},
+ {<<"docs_written">>, DocsWritten},
+ {<<"doc_write_failures">>, DocWriteFailures}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
+
+ try
+ {SrcRevPos,SrcRevId} =
+ update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
+ {TgtRevPos,TgtRevId} =
+ update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
+ State#state{
+ checkpoint_scheduled = nil,
+ checkpoint_history = NewRepHistory,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ }
+ catch throw:conflict ->
+ ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
+ "yourself?)", []),
+ State
+ end;
+ _Else ->
+ ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
+ [dbname(Source), dbname(Target)]),
+ #state{
+ changes_feed = CF,
+ missing_revs = MR,
+ reader = Reader,
+ writer = Writer
+ } = State,
+ Pids = [Writer, Reader, MR, CF],
+ [unlink(Pid) || Pid <- Pids],
+ [exit(Pid, shutdown) || Pid <- Pids],
+ close_db(Target),
+ close_db(Source),
+ {ok, NewState} = init(State#state.init_args),
+ NewState#state{listeners=State#state.listeners}
+ end.
+
+commit_to_both(Source, Target, RequiredSeq) ->
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(fun() ->
+ ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end),
+
+ % commit tgt sync
+ TargetStartTime = ensure_full_commit(Target),
+
+ SourceStartTime =
+ receive
+ {SrcCommitPid, Timestamp} ->
+ Timestamp;
+ {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
+ exit(replication_link_failure)
+ end,
+ {SourceStartTime, TargetStartTime}.
+
+ensure_full_commit(#http_db{headers = Headers} = Target) ->
+ Headers1 = [
+ {"Content-Length", 0} |
+ couch_util:proplist_apply_field(
+ {"Content-Type", "application/json"}, Headers)
+ ],
+ Req = Target#http_db{
+ resource = "_ensure_full_commit",
+ method = post,
+ headers = Headers1
+ },
+ {ResultProps} = couch_rep_httpc:request(Req),
+ true = couch_util:get_value(<<"ok">>, ResultProps),
+ couch_util:get_value(<<"instance_start_time">>, ResultProps);
+ensure_full_commit(Target) ->
+ {ok, NewDb} = couch_db:open_int(Target#db.name, []),
+ UpdateSeq = couch_db:get_update_seq(Target),
+ CommitSeq = couch_db:get_committed_update_seq(NewDb),
+ InstanceStartTime = NewDb#db.instance_start_time,
+ couch_db:close(NewDb),
+ if UpdateSeq > CommitSeq ->
+ ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
+ [UpdateSeq, CommitSeq]),
+ {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
+ DbStartTime;
+ true ->
+ ?LOG_DEBUG("target doesn't need a full commit", []),
+ InstanceStartTime
+ end.
+
+ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
+ Headers1 = [
+ {"Content-Length", 0} |
+ couch_util:proplist_apply_field(
+ {"Content-Type", "application/json"}, Headers)
+ ],
+ Req = Source#http_db{
+ resource = "_ensure_full_commit",
+ method = post,
+ qs = [{seq, RequiredSeq}],
+ headers = Headers1
+ },
+ {ResultProps} = couch_rep_httpc:request(Req),
+ case couch_util:get_value(<<"ok">>, ResultProps) of
+ true ->
+ couch_util:get_value(<<"instance_start_time">>, ResultProps);
+ undefined -> nil end;
+ensure_full_commit(Source, RequiredSeq) ->
+ {ok, NewDb} = couch_db:open_int(Source#db.name, []),
+ CommitSeq = couch_db:get_committed_update_seq(NewDb),
+ InstanceStartTime = NewDb#db.instance_start_time,
+ couch_db:close(NewDb),
+ if RequiredSeq > CommitSeq ->
+ ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
+ [RequiredSeq, CommitSeq]),
+ {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
+ DbStartTime;
+ true ->
+ ?LOG_DEBUG("source doesn't need a full commit", []),
+ InstanceStartTime
+ end.
+
+update_local_doc(#http_db{} = Db, Doc) ->
+ Req = Db#http_db{
+ resource = couch_util:encode_doc_id(Doc),
+ method = put,
+ body = couch_doc:to_json_obj(Doc, [attachments]),
+ headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
+ },
+ {ResponseMembers} = couch_rep_httpc:request(Req),
+ Rev = couch_util:get_value(<<"rev">>, ResponseMembers),
+ couch_doc:parse_rev(Rev);
+update_local_doc(Db, Doc) ->
+ {ok, Result} = couch_db:update_doc(Db, Doc, [delay_commit]),
+ Result.
+
+up_to_date(#http_db{}, _Seq) ->
+ true;
+up_to_date(Source, Seq) ->
+ {ok, NewDb} = couch_db:open_int(Source#db.name, []),
+ T = NewDb#db.update_seq == Seq,
+ couch_db:close(NewDb),
+ T.
+
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+ parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+ [];
+parse_proxy_params(ProxyUrl) ->
+ #url{
+ host = Host,
+ port = Port,
+ username = User,
+ password = Passwd
+ } = ibrowse_lib:parse_url(ProxyUrl),
+ [{proxy_host, Host}, {proxy_port, Port}] ++
+ case is_list(User) andalso is_list(Passwd) of
+ false ->
+ [];
+ true ->
+ [{proxy_user, User}, {proxy_password, Passwd}]
+ end.
+
+update_rep_doc({Props} = _RepDoc, KVs) ->
+ case couch_util:get_value(<<"_id">>, Props) of
+ undefined ->
+ % replication triggered by POSTing to _replicate/
+ ok;
+ RepDocId ->
+ % replication triggered by adding a Rep Doc to the replicator DB
+ {ok, RepDb} = ensure_rep_db_exists(),
+ case couch_db:open_doc(RepDb, RepDocId, []) of
+ {ok, LatestRepDoc} ->
+ update_rep_doc(RepDb, LatestRepDoc, KVs);
+ _ ->
+ ok
+ end,
+ couch_db:close(RepDb)
+ end.
+
+update_rep_doc(RepDb, #doc{body = {RepDocBody}} = RepDoc, KVs) ->
+ NewRepDocBody = lists:foldl(
+ fun({<<"_replication_state">> = K, _V} = KV, Body) ->
+ Body1 = lists:keystore(K, 1, Body, KV),
+ {Mega, Secs, _} = erlang:now(),
+ UnixTime = Mega * 1000000 + Secs,
+ lists:keystore(
+ <<"_replication_state_time">>, 1,
+ Body1, {<<"_replication_state_time">>, UnixTime});
+ ({K, _V} = KV, Body) ->
+ lists:keystore(K, 1, Body, KV)
+ end,
+ RepDocBody,
+ KVs
+ ),
+ % might not succeed - when the replication doc is deleted right
+ % before this update (not an error)
+ couch_db:update_doc(
+ RepDb,
+ RepDoc#doc{body = {NewRepDocBody}},
+ []
+ ).
+
+maybe_set_triggered({RepProps} = RepDoc, RepId) ->
+ case couch_util:get_value(<<"_replication_state">>, RepProps) of
+ <<"triggered">> ->
+ ok;
+ _ ->
+ update_rep_doc(
+ RepDoc,
+ [
+ {<<"_replication_state">>, <<"triggered">>},
+ {<<"_replication_id">>, ?l2b(RepId)}
+ ]
+ )
+ end.
+
+ensure_rep_db_exists() ->
+ DbName = ?l2b(couch_config:get("replicator", "db", "_replicator")),
+ Opts = [
+ {user_ctx, #user_ctx{roles=[<<"_admin">>, <<"_replicator">>]}},
+ sys_db
+ ],
+ case couch_db:open(DbName, Opts) of
+ {ok, Db} ->
+ Db;
+ _Error ->
+ {ok, Db} = couch_db:create(DbName, Opts)
+ end,
+ ok = ensure_rep_ddoc_exists(Db, <<"_design/_replicator">>),
+ {ok, Db}.
+
+ensure_rep_ddoc_exists(RepDb, DDocID) ->
+ case couch_db:open_doc(RepDb, DDocID, []) of
+ {ok, _Doc} ->
+ ok;
+ _ ->
+ DDoc = couch_doc:from_json_obj({[
+ {<<"_id">>, DDocID},
+ {<<"language">>, <<"javascript">>},
+ {<<"validate_doc_update">>, ?REP_DB_DOC_VALIDATE_FUN}
+ ]}),
+ {ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
+ end,
+ ok.
+
+source_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_source_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+source_db_update_notifier(_) ->
+ nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_target_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+target_db_update_notifier(_) ->
+ nil.