% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_changes).
-include("couch_db.hrl").

-export([handle_changes/3]).

%% @type Req -> #httpd{} | {json_req, JsonObj()}
handle_changes(#changes_args{}=Args1, Req, Db) ->
    Args = Args1#changes_args{filter=make_filter_fun(Args1, Req, Db)},
    StartSeq = case Args#changes_args.dir of
    rev ->
        couch_db:get_update_seq(Db);
    fwd ->
        Args#changes_args.since
    end,
    if Args#changes_args.feed == "continuous" orelse
        Args#changes_args.feed == "longpoll" ->
        fun(Callback) ->
            start_sending_changes(Callback, Args#changes_args.feed),
            Self = self(),
            {ok, Notify} = couch_db_update_notifier:start_link(
                fun({_, DbName}) when DbName == Db#db.name ->
                    Self ! db_updated;
                (_) ->
                    ok
                end
            ),
            {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
            couch_stats_collector:track_process_count(
                Self,
                {httpd, clients_requesting_changes}
            ),
            try
                keep_sending_changes(
                    Args,
                    Callback,
                    Db,
                    StartSeq,
                    <<"">>,
                    Timeout,
                    TimeoutFun
                )
            after
                couch_db_update_notifier:stop(Notify),
                get_rest_db_updated() % clean out any remaining update messages
            end
        end;
    true ->
        fun(Callback) ->
            start_sending_changes(Callback, Args#changes_args.feed),
            {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
                send_changes(
                    Args#changes_args{feed="normal"},
                    Callback,
                    Db,
                    StartSeq,
                    <<"">>
                ),
            end_sending_changes(Callback, LastSeq, Args#changes_args.feed)
        end
    end.

%% @type Req -> #httpd{} | {json_req, JsonObj()}
make_filter_fun(#changes_args{filter=FilterName}, Req, Db) ->
    case [list_to_binary(couch_httpd:unquote(Part))
            || Part <- string:tokens(FilterName, "/")] of
    [] ->
        fun(DocInfos) ->
        % doing this as a batch is more efficient for external filters
            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} ||
                #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
        end;
    [DName, FName] ->
        DesignId = <<"_design/", DName/binary>>,
        DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
        % validate that the ddoc has the filter fun
        #doc{body={Props}} = DDoc,
        couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
        fun(DocInfos) ->
            Docs = [Doc || {ok, Doc} <- [
                {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts])
                || DInfo <- DocInfos]],
            {ok, Passes} = couch_query_servers:filter_docs(
                Req, Db, DDoc, FName, Docs
            ),
            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
                || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos,
                Pass <- Passes, Pass == true]
        end;
    _Else ->
        throw({bad_request,
            "filter parameter must be of the form `designname/filtername`"})
    end.

get_changes_timeout(Args, Callback) ->
    #changes_args{
        heartbeat = Heartbeat,
        timeout = Timeout,
        feed = ResponseType
    } = Args,
    DefaultTimeout = list_to_integer(
        couch_config:get("httpd", "changes_timeout", "60000")
    ),
    case Heartbeat of
    undefined ->
        case Timeout of
        undefined ->
            {DefaultTimeout, fun() -> stop end};
        infinity ->
            {infinity, fun() -> stop end};
        _ ->
            {lists:min([DefaultTimeout, Timeout]), fun() -> stop end}
        end;
    true ->
        {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end};
    _ ->
        {lists:min([DefaultTimeout, Heartbeat]),
            fun() -> Callback(timeout, ResponseType), ok end}
    end.

start_sending_changes(_Callback, "continuous") ->
    ok;
start_sending_changes(Callback, ResponseType) ->
    Callback(start, ResponseType).

send_changes(Args, Callback, Db, StartSeq, Prepend) ->
    #changes_args{
        style = Style,
        include_docs = IncludeDocs,
        limit = Limit,
        feed = ResponseType,
        dir = Dir,
        filter = FilterFun
    } = Args,
    couch_db:changes_since(
        Db,
        Style,
        StartSeq,
        fun changes_enumerator/2,
        [{dir, Dir}],
        {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit,
            IncludeDocs}
    ).

keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
    TimeoutFun) ->

    #changes_args{
        feed = ResponseType,
        limit = Limit
    } = Args,
    {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
        Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend
    ),
    couch_db:close(Db),
    if Limit > NewLimit, ResponseType == "longpoll" ->
        end_sending_changes(Callback, EndSeq, ResponseType);
    true ->
        case wait_db_updated(Timeout, TimeoutFun) of
        updated ->
            case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of
            {ok, Db2} ->
                keep_sending_changes(
                    Args#changes_args{limit=NewLimit},
                    Callback,
                    Db2,
                    EndSeq,
                    Prepend2,
                    Timeout,
                    TimeoutFun
                );
            _Else ->
                end_sending_changes(Callback, EndSeq, ResponseType)
            end;
        stop ->
            end_sending_changes(Callback, EndSeq, ResponseType)
        end
    end.

end_sending_changes(Callback, EndSeq, ResponseType) ->
    Callback({stop, EndSeq}, ResponseType).

changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous",
    Limit, IncludeDocs}) ->

    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
        = DocInfos,
    Results0 = FilterFun(DocInfos),
    Results = [Result || Result <- Results0, Result /= null],
    Go = if Limit =< 1 -> stop; true -> ok end,
    case Results of
    [] ->
        {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit,
                IncludeDocs}
        };
    _ ->
        ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
        Callback({change, ChangesRow, <<"">>}, "continuous"),
        {Go, {Db, Seq, nil, FilterFun, Callback, "continuous",  Limit - 1,
                IncludeDocs}
        }
    end;
changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Callback, ResponseType,
    Limit, IncludeDocs}) ->

    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
        = DocInfos,
    Results0 = FilterFun(DocInfos),
    Results = [Result || Result <- Results0, Result /= null],
    Go = if Limit =< 1 -> stop; true -> ok end,
    case Results of
    [] ->
        {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit,
                IncludeDocs}
        };
    _ ->
        ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
        Callback({change, ChangesRow, Prepend}, ResponseType),
        {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1,
                IncludeDocs}
        }
    end.


changes_row(Db, Seq, Id, Del, Results, Rev, true) ->
    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
        deleted_item(Del) ++ couch_httpd_view:doc_member(Db, {Id, Rev})};
changes_row(_, Seq, Id, Del, Results, _, false) ->
    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
        deleted_item(Del)}.

deleted_item(true) -> [{deleted, true}];
deleted_item(_) -> [].

% waits for a db_updated msg, if there are multiple msgs, collects them.
wait_db_updated(Timeout, TimeoutFun) ->
    receive db_updated -> get_rest_db_updated()
    after Timeout ->
        case TimeoutFun() of
        ok -> wait_db_updated(Timeout, TimeoutFun);
        stop -> stop
        end
    end.

get_rest_db_updated() ->
    receive db_updated -> get_rest_db_updated()
    after 0 -> updated
    end.