% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_rep).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
    code_change/3]).

-export([replicate/2, checkpoint/1]).
-export([make_replication_id/2]).
-export([start_replication/3, end_replication/1, get_result/4]).

-include("couch_db.hrl").
-include("../ibrowse/ibrowse.hrl").

-define(REP_ID_VERSION, 2).

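% Replication state held by this gen_server. The first four fields are the
% worker pids that form the copy pipeline (see do_init/1); source_log and
% target_log cache the _local checkpoint documents on each side.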
-record(state, {
    changes_feed,
    missing_revs,
    reader,
    writer,

    source,
    target,
    continuous,
    create_target,
    init_args,
    checkpoint_scheduled = nil,

    start_seq,
    history,
    session_id,
    source_log,
    target_log,
    rep_starttime,
    src_starttime,
    tgt_starttime,
    checkpoint_history = nil,

    listeners = [],
    complete = false,
    committed_seq = 0,

    stats = nil,
    source_db_update_notifier = nil,
    target_db_update_notifier = nil
}).

%% convenience function to do a simple replication from the shell
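%% Example (database name and URL illustrative):
%%   couch_rep:replicate("source_db", <<"http://127.0.0.1:5984/target_db">>).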
replicate(Source, Target) when is_list(Source) ->
    replicate(?l2b(Source), Target);
replicate(Source, Target) when is_binary(Source), is_list(Target) ->
    replicate(Source, ?l2b(Target));
replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
    replicate({[{<<"source">>, Source}, {<<"target">>, Target}]}, #user_ctx{});

%% handles the parsed body of a POST to _replicate; a "cancel":true
%% property stops the matching replication instead of starting one
replicate({Props}=PostBody, UserCtx) ->
    RepId = make_replication_id(PostBody, UserCtx),
    case couch_util:get_value(<<"cancel">>, Props, false) of
    true ->
        end_replication(RepId);
    false ->
        Server = start_replication(PostBody, RepId, UserCtx),
        get_result(Server, RepId, PostBody, UserCtx)
    end.

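%% terminates and removes the supervised replication identified by the
%% {BaseId, Extension} pair from make_replication_id/2, e.g.
%%   couch_rep:end_replication(make_replication_id(PostBody, UserCtx))
%% with PostBody and UserCtx as passed to replicate/2.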
end_replication({BaseId, Extension}) ->
    RepId = BaseId ++ Extension,
    case supervisor:terminate_child(couch_rep_sup, RepId) of
    {error, not_found} = R ->
        R;
    ok ->
        case supervisor:delete_child(couch_rep_sup, RepId) of
            ok ->
                {ok, {cancelled, ?l2b(BaseId)}};
            {error, not_found} ->
                {ok, {cancelled, ?l2b(BaseId)}};
            {error, _} = Error ->
                Error
        end
    end.

start_replication(RepDoc, {BaseId, Extension} = RepId, UserCtx) ->
    Replicator = {
        BaseId ++ Extension,
        {gen_server, start_link,
            [?MODULE, [RepId, RepDoc, UserCtx], []]},
        temporary,
        1,
        worker,
        [?MODULE]
    },
    start_replication_server(Replicator).

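%% called via timer:apply_after/4 from schedule_checkpoint/1 to ask the
%% replication server to record a checkpoint asynchronously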
checkpoint(Server) ->
    gen_server:cast(Server, do_checkpoint).

get_result(Server, {BaseId, _Extension}, {Props} = PostBody, UserCtx) ->
    case couch_util:get_value(<<"continuous">>, Props, false) of
    true ->
        {ok, {continuous, ?l2b(BaseId)}};
    false ->
        try gen_server:call(Server, get_result, infinity) of
        retry -> replicate(PostBody, UserCtx);
        Else -> Else
        catch
        exit:{noproc, {gen_server, call, [Server, get_result, infinity]}} ->
            %% oops, this replication just finished -- restart it.
            replicate(PostBody, UserCtx);
        exit:{normal, {gen_server, call, [Server, get_result, infinity]}} ->
            %% we made the call during terminate
            replicate(PostBody, UserCtx)
        end
    end.

init(InitArgs) ->
    try
        do_init(InitArgs)
    catch
    throw:Error ->
        {stop, Error}
    end.

do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
    process_flag(trap_exit, true),

    SourceProps = couch_util:get_value(<<"source">>, PostProps),
    TargetProps = couch_util:get_value(<<"target">>, PostProps),

    Continuous = couch_util:get_value(<<"continuous">>, PostProps, false),
    CreateTarget = couch_util:get_value(<<"create_target">>, PostProps, false),

    ProxyParams = parse_proxy_params(
        couch_util:get_value(<<"proxy">>, PostProps, [])),
    Source = open_db(SourceProps, UserCtx, ProxyParams),
    Target = open_db(TargetProps, UserCtx, ProxyParams, CreateTarget),

    SourceInfo = dbinfo(Source),
    TargetInfo = dbinfo(Target),

    [SourceLog, TargetLog] = find_replication_logs(
        [Source, Target], BaseId, {PostProps}, UserCtx),
    {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),

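    % Worker pipeline: the changes feed drives missing_revs, which drives the
    % reader, which drives the writer. Each worker is linked to this process,
    % so failures arrive as 'EXIT' messages in handle_info/2.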
    {ok, ChangesFeed} =
        couch_rep_changes_feed:start_link(self(), Source, StartSeq, PostProps),
    {ok, MissingRevs} =
        couch_rep_missing_revs:start_link(self(), Target, ChangesFeed, PostProps),
    {ok, Reader} =
        couch_rep_reader:start_link(self(), Source, MissingRevs, PostProps),
    {ok, Writer} =
        couch_rep_writer:start_link(self(), Target, Reader, PostProps),

    Stats = ets:new(replication_stats, [set, private]),
    ets:insert(Stats, {total_revs, 0}),
    ets:insert(Stats, {missing_revs, 0}),
    ets:insert(Stats, {docs_read, 0}),
    ets:insert(Stats, {docs_written, 0}),
    ets:insert(Stats, {doc_write_failures, 0}),

    {ShortId, _} = lists:split(6, BaseId),
    couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
        [ShortId, dbname(Source), dbname(Target)]), "Starting"),

    couch_replication_manager:replication_started(RepId),

    State = #state{
        changes_feed = ChangesFeed,
        missing_revs = MissingRevs,
        reader = Reader,
        writer = Writer,

        source = Source,
        target = Target,
        continuous = Continuous,
        create_target = CreateTarget,
        init_args = InitArgs,
        stats = Stats,
        checkpoint_scheduled = nil,

        start_seq = StartSeq,
        history = History,
        session_id = couch_uuids:random(),
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = httpd_util:rfc1123_date(),
        src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
        tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
        source_db_update_notifier = source_db_update_notifier(Source),
        target_db_update_notifier = target_db_update_notifier(Target)
    },
    {ok, State}.

handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
    {stop, normal, State#state{listeners=[From]}};
handle_call(get_result, From, State) ->
    Listeners = State#state.listeners,
    {noreply, State#state{listeners=[From|Listeners]}};

handle_call(get_source_db, _From, #state{source = Source} = State) ->
    {reply, {ok, Source}, State};

handle_call(get_target_db, _From, #state{target = Target} = State) ->
    {reply, {ok, Target}, State}.

handle_cast(reopen_source_db, #state{source = Source} = State) ->
    {ok, NewSource} = couch_db:reopen(Source),
    {noreply, State#state{source = NewSource}};

handle_cast(reopen_target_db, #state{target = Target} = State) ->
    {ok, NewTarget} = couch_db:reopen(Target),
    {noreply, State#state{target = NewTarget}};

handle_cast(do_checkpoint, State) ->
    {noreply, do_checkpoint(State)};

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
    couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};

handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
        when SourceSeq > N ->
    MissingRevs = State#state.missing_revs,
    ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
    couch_task_status:update("W Processed source update #~p", [SourceSeq]),
    {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
    {noreply, State};

handle_info({update_stats, Key, N}, State) ->
    ets:update_counter(State#state.stats, Key, N),
    {noreply, State};

handle_info({'DOWN', _, _, _, _}, State) ->
    ?LOG_INFO("replication terminating because local DB is shutting down", []),
    timer:cancel(State#state.checkpoint_scheduled),
    {stop, shutdown, State};

handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
    case State#state.listeners of
    [] ->
        {noreply, State#state{complete = true}};
    _Else ->
        {stop, normal, State}
    end;

handle_info({'EXIT', _, normal}, State) ->
    {noreply, State};
handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
        Err == target_error ->
    ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
    timer:cancel(State#state.checkpoint_scheduled),
    {stop, shutdown, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
    {stop, Reason, State}.

terminate(normal, #state{checkpoint_scheduled=nil, init_args=[RepId | _]} = State) ->
    do_terminate(State),
    couch_replication_manager:replication_completed(RepId);

terminate(normal, #state{init_args=[RepId | _]} = State) ->
    timer:cancel(State#state.checkpoint_scheduled),
    do_terminate(do_checkpoint(State)),
    couch_replication_manager:replication_completed(RepId);

terminate(shutdown, #state{listeners = Listeners} = State) ->
    % continuous replication stopped
    [gen_server:reply(L, {ok, stopped}) || L <- Listeners],
    terminate_cleanup(State);

terminate(Reason, #state{listeners = Listeners, init_args=[RepId | _]} = State) ->
    [gen_server:reply(L, {error, Reason}) || L <- Listeners],
    terminate_cleanup(State),
    couch_replication_manager:replication_error(RepId, Reason).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% internal funs

start_replication_server(Replicator) ->
    RepId = element(1, Replicator),
    case supervisor:start_child(couch_rep_sup, Replicator) of
    {ok, Pid} ->
        ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
        Pid;
    {error, already_present} ->
        case supervisor:restart_child(couch_rep_sup, RepId) of
        {ok, Pid} ->
            ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
            Pid;
        {error, running} ->
            %% this error occurs if multiple replicators are racing
            %% each other to start and somebody else won.  Just grab
            %% the Pid by calling start_child again.
            {error, {already_started, Pid}} =
                supervisor:start_child(couch_rep_sup, Replicator),
            ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
            Pid;
        {error, {db_not_found, DbUrl}} ->
            throw({db_not_found, <<"could not open ", DbUrl/binary>>});
        {error, {unauthorized, DbUrl}} ->
            throw({unauthorized,
                <<"unauthorized to access or create database ", DbUrl/binary>>});
        {error, {'EXIT', {badarg,
            [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
            % Clause to deal with a change in the supervisor module introduced
            % in R14B02. For more details consult the thread at:
            %     http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
            _ = supervisor:delete_child(couch_rep_sup, RepId),
            start_replication_server(Replicator)
        end;
    {error, {already_started, Pid}} ->
        ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
        Pid;
    {error, {{db_not_found, DbUrl}, _}} ->
        throw({db_not_found, <<"could not open ", DbUrl/binary>>});
    {error, {{unauthorized, DbUrl}, _}} ->
        throw({unauthorized,
            <<"unauthorized to access or create database ", DbUrl/binary>>})
    end.

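%% Compares the checkpoint documents from both databases. A log body looks
%% roughly like this (sketch; the full field set is written by do_checkpoint/1):
%%   {[{<<"session_id">>, Id}, {<<"source_last_seq">>, Seq},
%%     {<<"history">>, [{HistoryEntry} | ...]}]}
%% Matching session ids mean both sides agree on the last checkpoint.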
compare_replication_logs(SrcDoc, TgtDoc) ->
    #doc{body={RepRecProps}} = SrcDoc,
    #doc{body={RepRecPropsTgt}} = TgtDoc,
    case couch_util:get_value(<<"session_id">>, RepRecProps) ==
            couch_util:get_value(<<"session_id">>, RepRecPropsTgt) of
    true ->
        % if the records have the same session id,
        % then we have a valid replication history
        OldSeqNum = couch_util:get_value(<<"source_last_seq">>, RepRecProps, 0),
        OldHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
        {OldSeqNum, OldHistory};
    false ->
        SourceHistory = couch_util:get_value(<<"history">>, RepRecProps, []),
        TargetHistory = couch_util:get_value(<<"history">>, RepRecPropsTgt, []),
        ?LOG_INFO("Replication records differ. "
                "Scanning histories to find a common ancestor.", []),
        ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
                [RepRecProps, RepRecPropsTgt]),
        compare_rep_history(SourceHistory, TargetHistory)
    end.

compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
    ?LOG_INFO("no common ancestry -- performing full replication", []),
    {0, []};
compare_rep_history([{S}|SourceRest], [{T}|TargetRest]=Target) ->
    SourceId = couch_util:get_value(<<"session_id">>, S),
    case has_session_id(SourceId, Target) of
    true ->
        RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, S, 0),
        ?LOG_INFO("found a common replication record with source_seq ~p",
            [RecordSeqNum]),
        {RecordSeqNum, SourceRest};
    false ->
        TargetId = couch_util:get_value(<<"session_id">>, T),
        case has_session_id(TargetId, SourceRest) of
        true ->
            RecordSeqNum = couch_util:get_value(<<"recorded_seq">>, T, 0),
            ?LOG_INFO("found a common replication record with source_seq ~p",
                [RecordSeqNum]),
            {RecordSeqNum, TargetRest};
        false ->
            compare_rep_history(SourceRest, TargetRest)
        end
    end.

close_db(#http_db{}) ->
    ok;
close_db(Db) ->
    couch_db:close(Db).

dbname(#http_db{url = Url}) ->
    couch_util:url_strip_password(Url);
dbname(#db{name = Name}) ->
    Name.

dbinfo(#http_db{} = Db) ->
    {DbProps} = couch_rep_httpc:request(Db),
    [{couch_util:to_existing_atom(K), V} || {K,V} <- DbProps];
dbinfo(Db) ->
    {ok, Info} = couch_db:get_db_info(Db),
    Info.

do_terminate(State) ->
    #state{
        checkpoint_history = CheckpointHistory,
        committed_seq = NewSeq,
        listeners = Listeners,
        source = Source,
        continuous = Continuous,
        source_log = #doc{body={OldHistory}}
    } = State,

    NewRepHistory = case CheckpointHistory of
    nil ->
        {[{<<"no_changes">>, true} | OldHistory]};
    _Else ->
        CheckpointHistory
    end,

    %% reply to original requester
    OtherListeners = case Continuous of
    true ->
        []; % continuous replications have no listeners
    _ ->
        [Original|Rest] = lists:reverse(Listeners),
        gen_server:reply(Original, {ok, NewRepHistory}),
        Rest
    end,

    %% maybe trigger another replication. If this replicator uses a local
    %% source Db, changes to that Db since we started will not be included in
    %% this pass.
    case up_to_date(Source, NewSeq) of
        true ->
            [gen_server:reply(R, {ok, NewRepHistory}) || R <- OtherListeners];
        false ->
            [gen_server:reply(R, retry) || R <- OtherListeners]
    end,
    couch_task_status:update("Finishing"),
    terminate_cleanup(State).

terminate_cleanup(State) ->
    close_db(State#state.source),
    close_db(State#state.target),
    stop_db_update_notifier(State#state.source_db_update_notifier),
    stop_db_update_notifier(State#state.target_db_update_notifier),
    ets:delete(State#state.stats).

stop_db_update_notifier(nil) ->
    ok;
stop_db_update_notifier(Notifier) ->
    couch_db_update_notifier:stop(Notifier).

has_session_id(_SessionId, []) ->
    false;
has_session_id(SessionId, [{Props} | Rest]) ->
    case couch_util:get_value(<<"session_id">>, Props, nil) of
    SessionId ->
        true;
    _Else ->
        has_session_id(SessionId, Rest)
    end.

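%% e.g. maybe_append_options([<<"continuous">>, <<"create_target">>],
%%   {[{<<"continuous">>, true}]}) returns "+continuous"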
maybe_append_options(Options, {Props}) ->
    lists:foldl(fun(Option, Acc) ->
        Acc ++
        case couch_util:get_value(Option, Props, false) of
        true ->
            "+" ++ ?b2l(Option);
        false ->
            ""
        end
    end, [], Options).

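%% Returns {BaseId, Extension}. BaseId is a hex md5 over the endpoints (plus
%% host, port, and any filter; see maybe_append_filters/3); Extension is
%% e.g. "+continuous" or "+continuous+create_target".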
make_replication_id(RepProps, UserCtx) ->
    BaseId = make_replication_id(RepProps, UserCtx, ?REP_ID_VERSION),
    Extension = maybe_append_options(
                  [<<"continuous">>, <<"create_target">>], RepProps),
    {BaseId, Extension}.

% Versioned clauses for generating replication ids.
% If a change is made to how replications are identified,
% add a new clause and increase ?REP_ID_VERSION at the top.
make_replication_id({Props}, UserCtx, 2) ->
    {ok, HostName} = inet:gethostname(),
    Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
    P when is_number(P) ->
        P;
    _ ->
        % On restart we might be called before the couch_httpd process is
        % started.
        % TODO: we might be under an SSL socket server only, or both under
        % SSL and a non-SSL socket.
        % ... mochiweb_socket_server:get(https, port)
        list_to_integer(couch_config:get("httpd", "port", "5984"))
    end,
    Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
    Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
    maybe_append_filters({Props}, [HostName, Port, Src, Tgt], UserCtx);
make_replication_id({Props}, UserCtx, 1) ->
    {ok, HostName} = inet:gethostname(),
    Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
    Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
    maybe_append_filters({Props}, [HostName, Src, Tgt], UserCtx).

maybe_append_filters({Props}, Base, UserCtx) ->
    Base2 = Base ++
        case couch_util:get_value(<<"filter">>, Props) of
        undefined ->
            case couch_util:get_value(<<"doc_ids">>, Props) of
            undefined ->
                [];
            DocIds ->
                [DocIds]
            end;
        Filter ->
            [filter_code(Filter, Props, UserCtx),
                couch_util:get_value(<<"query_params">>, Props, {[]})]
        end,
    couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).

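%% Fetches the filter function source for a `ddocname/filtername` spec, e.g.
%% <<"app/by_type">> (names illustrative) reads the `by_type` member of the
%% `filters` object in `_design/app` on the source database.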
filter_code(Filter, Props, UserCtx) ->
    {DDocName, FilterName} =
    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
    {match, [DDocName0, FilterName0]} ->
        {DDocName0, FilterName0};
    _ ->
        throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
    end,
    ProxyParams = parse_proxy_params(
        couch_util:get_value(<<"proxy">>, Props, [])),
    DbName = couch_util:get_value(<<"source">>, Props),
    Source = try
        open_db(DbName, UserCtx, ProxyParams)
    catch
    _Tag:DbError ->
        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
           [couch_util:url_strip_password(DbName), couch_util:to_binary(DbError)]),
        throw({error, iolist_to_binary(DbErrorMsg)})
    end,
    try
        Body = case (catch open_doc(Source, <<"_design/", DDocName/binary>>)) of
        {ok, #doc{body = Body0}} ->
            Body0;
        DocError ->
            DocErrorMsg = io_lib:format(
                "Couldn't open document `_design/~s` from source "
                "database `~s`: ~s",
                [DDocName, dbname(Source), couch_util:to_binary(DocError)]),
            throw({error, iolist_to_binary(DocErrorMsg)})
        end,
        Code = couch_util:get_nested_json_value(
            Body, [<<"filters">>, FilterName]),
        % strip leading/trailing whitespace; note \\s so the regex engine (not
        % the Erlang string scanner) sees the \s character class
        re:replace(Code, "^\\s*(.*?)\\s*$", "\\1", [{return, binary}])
    after
        close_db(Source)
    end.

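%% e.g. "http://localhost:5984/db" becomes "http://localhost:5984/db/";
%% a URL that already ends in "/" is returned unchanged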
maybe_add_trailing_slash(Url) ->
    re:replace(Url, "[^/]$", "&/", [{return, list}]).

get_rep_endpoint(_UserCtx, {Props}) ->
    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
    {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
    case couch_util:get_value(<<"oauth">>, Auth) of
    undefined ->
        {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
    {OAuth} ->
        {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
    end;
get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
    {remote, maybe_add_trailing_slash(Url), []};
get_rep_endpoint(_UserCtx, <<"https://",_/binary>>=Url) ->
    {remote, maybe_add_trailing_slash(Url), []};
get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
    {local, DbName, UserCtx}.

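%% The log document id is ?LOCAL_DOC_PREFIX ("_local/") followed by the
%% replication id, e.g. <<"_local/0a81b645497e6f7fcd394d3952c8b136">>
%% (hash illustrative).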
find_replication_logs(DbList, RepId, RepProps, UserCtx) ->
    LogId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
    fold_replication_logs(DbList, ?REP_ID_VERSION,
        LogId, LogId, RepProps, UserCtx, []).

% Accumulates the replication logs.
% Falls back to older log document ids and migrates them to the current id.
fold_replication_logs([], _Vsn, _LogId, _NewId, _RepProps, _UserCtx, Acc) ->
    lists:reverse(Acc);
fold_replication_logs([Db|Rest]=Dbs, Vsn, LogId, NewId,
        RepProps, UserCtx, Acc) ->
    case open_replication_log(Db, LogId) of
    {error, not_found} when Vsn > 1 ->
        OldRepId = make_replication_id(RepProps, UserCtx, Vsn - 1),
        fold_replication_logs(Dbs, Vsn - 1,
            ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, RepProps, UserCtx, Acc);
    {error, not_found} ->
        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
            RepProps, UserCtx, [#doc{id=NewId}|Acc]);
    {ok, Doc} when LogId =:= NewId ->
        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
            RepProps, UserCtx, [Doc|Acc]);
    {ok, Doc} ->
        MigratedLog = #doc{id=NewId,body=Doc#doc.body},
        fold_replication_logs(Rest, ?REP_ID_VERSION, NewId, NewId,
            RepProps, UserCtx, [MigratedLog|Acc])
    end.

open_replication_log(Db, DocId) ->
    case open_doc(Db, DocId) of
    {ok, Doc} ->
        ?LOG_DEBUG("found a replication log for ~s", [dbname(Db)]),
        {ok, Doc};
    _ ->
        ?LOG_DEBUG("didn't find a replication log for ~s", [dbname(Db)]),
        {error, not_found}
    end.

open_doc(#http_db{} = Db, DocId) ->
    Req = Db#http_db{resource = couch_util:encode_doc_id(DocId)},
    case couch_rep_httpc:request(Req) of
    {[{<<"error">>, _}, {<<"reason">>, _}]} ->
        {error, not_found};
    Doc ->
        {ok, couch_doc:from_json_obj(Doc)}
    end;
open_doc(Db, DocId) ->
    couch_db:open_doc(Db, DocId).

open_db(Props, UserCtx, ProxyParams) ->
    open_db(Props, UserCtx, ProxyParams, false).

open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
    Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
    {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
    {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
    Headers = [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders],
    DefaultHeaders = (#http_db{})#http_db.headers,
    Db1 = #http_db{
        url = Url,
        auth = AuthProps,
        % ukeymerge/3 requires both lists to be sorted on the key
        headers = lists:ukeymerge(1, lists:keysort(1, Headers), DefaultHeaders)
    },
    Db = Db1#http_db{
        options = Db1#http_db.options ++ ProxyParams ++
            couch_rep_httpc:ssl_options(Db1)
    },
    couch_rep_httpc:db_exists(Db, CreateTarget);
open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
    open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
    open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
    try
        case CreateTarget of
        true ->
            ok = couch_httpd:verify_is_server_admin(UserCtx),
            couch_server:create(DbName, [{user_ctx, UserCtx}]);
        false ->
            ok
        end,

        case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
        {ok, Db} ->
            couch_db:monitor(Db),
            Db;
        {not_found, no_db_file} ->
            throw({db_not_found, DbName})
        end
    catch throw:{unauthorized, _} ->
        throw({unauthorized, DbName})
    end.

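%% Coalesces checkpoint requests: at most one checkpoint is pending at any
%% time, and it fires 5 seconds after the first event that requested it.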
schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
    Server = self(),
    case timer:apply_after(5000, couch_rep, checkpoint, [Server]) of
    {ok, TRef} ->
        State#state{checkpoint_scheduled = TRef};
    Error ->
        ?LOG_ERROR("tried to schedule a checkpoint but got ~p", [Error]),
        State
    end;
schedule_checkpoint(State) ->
    State.

do_checkpoint(State) ->
    #state{
        source = Source,
        target = Target,
        committed_seq = NewSeqNum,
        start_seq = StartSeqNum,
        history = OldHistory,
        session_id = SessionId,
        source_log = SourceLog,
        target_log = TargetLog,
        rep_starttime = ReplicationStartTime,
        src_starttime = SrcInstanceStartTime,
        tgt_starttime = TgtInstanceStartTime,
        stats = Stats,
        init_args = [_RepId, {RepDoc} | _]
    } = State,
    case commit_to_both(Source, Target, NewSeqNum) of
    {SrcInstanceStartTime, TgtInstanceStartTime} ->
        ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
            [dbname(Source), dbname(Target), NewSeqNum]),
        EndTime = ?l2b(httpd_util:rfc1123_date()),
        StartTime = ?l2b(ReplicationStartTime),
        DocsRead = ets:lookup_element(Stats, docs_read, 2),
        DocsWritten = ets:lookup_element(Stats, docs_written, 2),
        DocWriteFailures = ets:lookup_element(Stats, doc_write_failures, 2),
        NewHistoryEntry = {[
            {<<"session_id">>, SessionId},
            {<<"start_time">>, StartTime},
            {<<"end_time">>, EndTime},
            {<<"start_last_seq">>, StartSeqNum},
            {<<"end_last_seq">>, NewSeqNum},
            {<<"recorded_seq">>, NewSeqNum},
            {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
            {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
            {<<"docs_read">>, DocsRead},
            {<<"docs_written">>, DocsWritten},
            {<<"doc_write_failures">>, DocWriteFailures}
        ]},
        BaseHistory = [
            {<<"session_id">>, SessionId},
            {<<"source_last_seq">>, NewSeqNum},
            {<<"replication_id_version">>, ?REP_ID_VERSION}
        ] ++ case couch_util:get_value(<<"doc_ids">>, RepDoc) of
        undefined ->
            [];
        DocIds when is_list(DocIds) ->
            % backwards compatibility with the result of a replication by
            % doc IDs in versions 0.11.x and 1.0.x
            [
                {<<"start_time">>, StartTime},
                {<<"end_time">>, EndTime},
                {<<"docs_read">>, DocsRead},
                {<<"docs_written">>, DocsWritten},
                {<<"doc_write_failures">>, DocWriteFailures}
            ]
        end,
        % limit history to 50 entries
        NewRepHistory = {
            BaseHistory ++
            [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
        },

        try
            {SrcRevPos, SrcRevId} =
                update_local_doc(Source, SourceLog#doc{body=NewRepHistory}),
            {TgtRevPos, TgtRevId} =
                update_local_doc(Target, TargetLog#doc{body=NewRepHistory}),
            State#state{
                checkpoint_scheduled = nil,
                checkpoint_history = NewRepHistory,
                source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
            }
        catch throw:conflict ->
            ?LOG_ERROR("checkpoint failure: conflict (are you replicating to "
                "yourself?)", []),
            State
        end;
    _Else ->
        ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
            [dbname(Source), dbname(Target)]),
        #state{
            changes_feed = CF,
            missing_revs = MR,
            reader = Reader,
            writer = Writer
        } = State,
        Pids = [Writer, Reader, MR, CF],
        [unlink(Pid) || Pid <- Pids],
        [exit(Pid, shutdown) || Pid <- Pids],
        close_db(Target),
        close_db(Source),
        {ok, NewState} = init(State#state.init_args),
        NewState#state{listeners=State#state.listeners}
    end.

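%% Commits source and target in parallel and returns their instance start
%% times. do_checkpoint/1 compares these with the values cached at init time;
%% a mismatch means a database restarted and the checkpoint is not recorded.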
commit_to_both(Source, Target, RequiredSeq) ->
    % commit the source asynchronously
    ParentPid = self(),
    SrcCommitPid = spawn_link(fun() ->
            ParentPid ! {self(), ensure_full_commit(Source, RequiredSeq)} end),

    % commit the target synchronously
    TargetStartTime = ensure_full_commit(Target),

    SourceStartTime =
    receive
    {SrcCommitPid, Timestamp} ->
        Timestamp;
    {'EXIT', SrcCommitPid, {http_request_failed, _}} ->
        exit(replication_link_failure)
    end,
    {SourceStartTime, TargetStartTime}.

ensure_full_commit(#http_db{headers = Headers} = Target) ->
    Headers1 = [
        {"Content-Length", 0} |
        couch_util:proplist_apply_field(
            {"Content-Type", "application/json"}, Headers)
    ],
    Req = Target#http_db{
        resource = "_ensure_full_commit",
        method = post,
        headers = Headers1
    },
    {ResultProps} = couch_rep_httpc:request(Req),
    true = couch_util:get_value(<<"ok">>, ResultProps),
    couch_util:get_value(<<"instance_start_time">>, ResultProps);
ensure_full_commit(Target) ->
    {ok, NewDb} = couch_db:open_int(Target#db.name, []),
    UpdateSeq = couch_db:get_update_seq(Target),
    CommitSeq = couch_db:get_committed_update_seq(NewDb),
    InstanceStartTime = NewDb#db.instance_start_time,
    couch_db:close(NewDb),
    if UpdateSeq > CommitSeq ->
        ?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
            [UpdateSeq, CommitSeq]),
        {ok, DbStartTime} = couch_db:ensure_full_commit(Target),
        DbStartTime;
    true ->
        ?LOG_DEBUG("target doesn't need a full commit", []),
        InstanceStartTime
    end.

ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) ->
    Headers1 = [
        {"Content-Length", 0} |
        couch_util:proplist_apply_field(
            {"Content-Type", "application/json"}, Headers)
    ],
    Req = Source#http_db{
        resource = "_ensure_full_commit",
        method = post,
        qs = [{seq, RequiredSeq}],
        headers = Headers1
    },
    {ResultProps} = couch_rep_httpc:request(Req),
    case couch_util:get_value(<<"ok">>, ResultProps) of
    true ->
        couch_util:get_value(<<"instance_start_time">>, ResultProps);
    undefined ->
        nil
    end;
ensure_full_commit(Source, RequiredSeq) ->
    {ok, NewDb} = couch_db:open_int(Source#db.name, []),
    CommitSeq = couch_db:get_committed_update_seq(NewDb),
    InstanceStartTime = NewDb#db.instance_start_time,
    couch_db:close(NewDb),
    if RequiredSeq > CommitSeq ->
        ?LOG_DEBUG("source needs a full commit: required ~p committed ~p",
            [RequiredSeq, CommitSeq]),
        {ok, DbStartTime} = couch_db:ensure_full_commit(Source),
        DbStartTime;
    true ->
        ?LOG_DEBUG("source doesn't need a full commit", []),
        InstanceStartTime
    end.

update_local_doc(#http_db{} = Db, Doc) ->
    Req = Db#http_db{
        resource = couch_util:encode_doc_id(Doc),
        method = put,
        body = couch_doc:to_json_obj(Doc, [attachments]),
        headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
    },
    {ResponseMembers} = couch_rep_httpc:request(Req),
    Rev = couch_util:get_value(<<"rev">>, ResponseMembers),
    couch_doc:parse_rev(Rev);
update_local_doc(Db, Doc) ->
    {ok, Result} = couch_db:update_doc(Db, Doc, [delay_commit]),
    Result.

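%% Remote sources are always considered up to date. For a local source,
%% checks whether its update_seq moved past the sequence we replicated
%% through; if not, do_terminate/1 tells listeners to retry.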
up_to_date(#http_db{}, _Seq) ->
    true;
up_to_date(Source, Seq) ->
    {ok, NewDb} = couch_db:open_int(Source#db.name, []),
    T = NewDb#db.update_seq == Seq,
    couch_db:close(NewDb),
    T.

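%% e.g. parse_proxy_params("http://fred:secret@proxy.local:8080") yields
%% [{proxy_host, "proxy.local"}, {proxy_port, 8080},
%%  {proxy_user, "fred"}, {proxy_password, "secret"}] (values illustrative)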
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
    parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
    [];
parse_proxy_params(ProxyUrl) ->
    #url{
        host = Host,
        port = Port,
        username = User,
        password = Passwd
    } = ibrowse_lib:parse_url(ProxyUrl),
    [{proxy_host, Host}, {proxy_port, Port}] ++
        case is_list(User) andalso is_list(Passwd) of
        false ->
            [];
        true ->
            [{proxy_user, User}, {proxy_password, Passwd}]
        end.

source_db_update_notifier(#db{name = DbName}) ->
    Server = self(),
    {ok, Notifier} = couch_db_update_notifier:start_link(
        fun({compacted, DbName1}) when DbName1 =:= DbName ->
                ok = gen_server:cast(Server, reopen_source_db);
            (_) ->
                ok
        end),
    Notifier;
source_db_update_notifier(_) ->
    nil.

target_db_update_notifier(#db{name = DbName}) ->
    Server = self(),
    {ok, Notifier} = couch_db_update_notifier:start_link(
        fun({compacted, DbName1}) when DbName1 =:= DbName ->
                ok = gen_server:cast(Server, reopen_target_db);
            (_) ->
                ok
        end),
    Notifier;
target_db_update_notifier(_) ->
    nil.