% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_query_servers).
-behaviour(gen_server).

-export([start_link/0]).

-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
-export([start_doc_map/3, map_docs/2, stop_doc_map/1]).
-export([reduce/3, rereduce/3,validate_doc_update/5]).
-export([filter_docs/5]).

-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).

% -export([test/0]).

-include("couch_db.hrl").

% Handle for a single query server process as tracked by this module and
% handed out to clients.
-record(proc, {
    pid, % pid of the query server process
    lang, % language name the process was started for
    ddoc_keys = [], % {DDocId, Rev} keys of the design docs taught to this process
    prompt_fun, % {Mod, Func}, invoked by proc_prompt/2 with [Pid, Args]
    set_timeout_fun, % {Mod, Func}, invoked by proc_set_timeout/2 with [Pid, Timeout]
    stop_fun % {Mod, Func}, invoked by proc_stop/1 with [Pid]
}).

% gen_server state. The first four fields are ets table ids created in init/1.
-record(qserver, {
    langs, % Keyed by language name, rows are {Lang, Mod, Func, Arg}
    pid_procs, % Keyed by PID, value is a #proc record.
    lang_procs, % Keyed by language name, value is a list of #proc records
    lang_limits, % Keyed by language name, rows are {Lang, Limit, Current}
    waitlist = [], % {Info, From} entries of callers waiting for a process, newest first
    config % query server config object sent to processes with the "reset" command
}).

% Start the query server manager as a gen_server registered locally under
% the name couch_query_servers.
start_link() ->
    ServerName = {local, couch_query_servers},
    gen_server:start_link(ServerName, couch_query_servers, [], []).

% Check out a query server process for Lang and load it with the view
% library (unless Lib is the empty object) and every map function source.
% Returns {ok, Proc}; the caller hands the process back with stop_doc_map/1.
start_doc_map(Lang, Functions, Lib) ->
    Proc = get_os_process(Lang),
    if
    Lib =:= {[]} ->
        % empty library object, nothing to send
        ok;
    true ->
        true = proc_prompt(Proc, [<<"add_lib">>, Lib])
    end,
    AddFun = fun(Source) ->
        true = proc_prompt(Proc, [<<"add_fun">>, Source])
    end,
    lists:foreach(AddFun, Functions),
    {ok, Proc}.

% Run every document in Docs through the map functions loaded in Proc.
% Returns {ok, Results} with one entry per document; each entry holds, per
% map function, the list of emitted rows converted to tuples.
map_docs(Proc, Docs) ->
    % The query server answers with a json array per map function:
    %   [[[Key1, Value1], [Key2, Value2]], ...]
    % and every [Key, Value] row is turned into a {Key, Value} tuple.
    RowsToTuples = fun(FunRows) ->
        [list_to_tuple(Row) || Row <- FunRows]
    end,
    MapOne = fun(Doc) ->
        Json = couch_doc:to_json_obj(Doc, []),
        PerFunRows = proc_prompt(Proc, [<<"map_doc">>, Json]),
        lists:map(RowsToTuples, PerFunRows)
    end,
    {ok, lists:map(MapOne, Docs)}.


% Hand a map process back to the pool; nil means no process was started.
stop_doc_map(MaybeProc) ->
    case MaybeProc of
    nil ->
        ok;
    Proc ->
        ok = ret_os_process(Proc)
    end.

% Transpose a list of equally long rows into a list of columns, so that all
% values belonging to the same reduce function end up in one list.
% Each level is accumulated in reverse, so the element order inside the
% resulting columns alternates between reversed and original row order.
group_reductions_results([]) ->
    [];
group_reductions_results(Rows) ->
    {Firsts, Rests} = lists:unzip(
        lists:map(fun([First|Rest]) -> {First, Rest} end, Rows)),
    Column = lists:reverse(Firsts),
    Remaining = lists:reverse(Rests),
    case Remaining of
    [[]|_] ->
        % every row is exhausted, this was the last column
        [Column];
    _ ->
        [Column | group_reductions_results(Remaining)]
    end.

% Re-reduce previously reduced values. ReducedValues holds one row per
% earlier reduction with one value per reduce function; the values are
% regrouped per function and each group is rereduced either by the builtin
% implementation (sources starting with "_") or by the query server.
rereduce(_Lang, [], _ReducedValues) ->
    {ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
    RereduceOne = fun
        (<<"_", _/binary>> = BuiltinSrc, Values) ->
            % builtins expect [Key, Value] rows, so use a dummy key
            Rows = [[[], V] || V <- Values],
            {ok, [Result]} = builtin_reduce(rereduce, [BuiltinSrc], Rows, []),
            Result;
        (OsSrc, Values) ->
            os_rereduce(Lang, [OsSrc], Values)
    end,
    PerFunValues = group_reductions_results(ReducedValues),
    {ok, lists:zipwith(RereduceOne, RedSrcs, PerFunValues)}.

% Reduce KVs with every function in RedSrcs. Sources starting with "_" are
% handled by the builtin reducers, all others are sent to the query server
% in one batch. Results are returned in the order of RedSrcs.
reduce(_Lang, [], _KVs) ->
    {ok, []};
reduce(Lang, RedSrcs, KVs) ->
    IsOsFun = fun
        (<<"_", _/binary>>) -> false;
        (_Src) -> true
    end,
    {OsSrcs, BuiltinSrcs} = lists:partition(IsOsFun, RedSrcs),
    {ok, OsResults} = os_reduce(Lang, OsSrcs, KVs),
    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinSrcs, KVs, []),
    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).

% Merge the separately computed query server and builtin results back into
% the order of the original reduce sources. Walks the sources and, for each
% one, takes the next result from the matching result list.
recombine_reduce_results([], [], [], Merged) ->
    {ok, lists:reverse(Merged)};
recombine_reduce_results([<<"_", _/binary>>|Srcs], OsRs, [Builtin|BuiltinRs], Merged) ->
    % builtin source: consume the next builtin result
    recombine_reduce_results(Srcs, OsRs, BuiltinRs, [Builtin|Merged]);
recombine_reduce_results([_OsSrc|Srcs], [Os|OsRs], BuiltinRs, Merged) ->
    % query server source: consume the next query server result
    recombine_reduce_results(Srcs, OsRs, BuiltinRs, [Os|Merged]).

% Reduce KVs with the given function sources on a query server process.
% The process is always handed back to the pool, also when the prompt fails.
os_reduce(_Lang, [], _KVs) ->
    {ok, []};
os_reduce(Lang, OsRedSrcs, KVs) ->
    Proc = get_os_process(Lang),
    Request = [<<"reduce">>, OsRedSrcs, KVs],
    try proc_prompt(Proc, Request) of
        [true, Reductions] -> {ok, Reductions}
    after
        ok = ret_os_process(Proc)
    end.

% Rereduce values with the given function sources on a query server process
% and return the single resulting reduction. The process is always handed
% back to the pool, also when the prompt fails.
os_rereduce(Lang, OsRedSrcs, KVs) ->
    Proc = get_os_process(Lang),
    Request = [<<"rereduce">>, OsRedSrcs, KVs],
    try proc_prompt(Proc, Request) of
        [true, [Reduction]] -> Reduction
    after
        ok = ret_os_process(Proc)
    end.


% Evaluate the builtin reduce functions named in the source list against
% KVs. Re is reduce or rereduce. Returns {ok, Results} in source order.
builtin_reduce(_Re, [], _KVs, Acc) ->
    {ok, lists:reverse(Acc)};
builtin_reduce(Re, [BuiltinSrc|BuiltinReds], KVs, Acc) ->
    Result = builtin_reduce_one(Re, BuiltinSrc, KVs),
    builtin_reduce(Re, BuiltinReds, KVs, [Result|Acc]).

% Evaluate a single builtin, selected by the prefix of its source.
builtin_reduce_one(_Re, <<"_sum",_/binary>>, KVs) ->
    builtin_sum_rows(KVs);
builtin_reduce_one(reduce, <<"_count",_/binary>>, KVs) ->
    length(KVs);
builtin_reduce_one(rereduce, <<"_count",_/binary>>, KVs) ->
    % rereducing counts means adding up the partial counts
    builtin_sum_rows(KVs);
builtin_reduce_one(Re, <<"_stats",_/binary>>, KVs) ->
    builtin_stats(Re, KVs).

% Sum the values of [Key, Value] rows. Values may be numbers or lists of
% numbers; a number mixed with lists is treated as a one element list.
% Throws {invalid_value, Msg} for any other row or value.
builtin_sum_rows(KVs) ->
    AddRow = fun
        ([_Key, Num], Total) when is_number(Num), is_number(Total) ->
            Total + Num;
        ([_Key, Num], Totals) when is_number(Num), is_list(Totals) ->
            sum_terms(Totals, [Num]);
        ([_Key, Nums], Total) when is_list(Nums), is_number(Total) ->
            sum_terms([Total], Nums);
        ([_Key, Nums], Totals) when is_list(Nums), is_list(Totals) ->
            sum_terms(Totals, Nums);
        (_Else, _Acc) ->
            throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>})
    end,
    lists:foldl(AddRow, 0, KVs).

% Add two lists of numbers element by element. When one list is shorter,
% the remaining tail of the longer list is returned unchanged.
% Throws {invalid_value, Msg} when paired elements are not both numbers or
% an argument is not a list.
sum_terms(Left, []) when is_list(Left) ->
    Left;
sum_terms([], Right) when is_list(Right) ->
    Right;
sum_terms([L|Ls], [R|Rs]) when is_number(L), is_number(R) ->
    [L+R | sum_terms(Ls, Rs)];
sum_terms(_, _) ->
    throw({invalid_value, <<"builtin _sum function requires map values to be numbers or lists of numbers">>}).

% Builtin _stats reducer. Returns a json object with sum, count, min, max
% and sumsqr of the values.
%
% In reduce mode the rows are [Key, Number]; in rereduce mode each value is
% a stats object produced by an earlier call. An empty row list yields
% all-zero statistics instead of crashing. Throws {invalid_value, Msg} when
% a reduce value is not a number, including the first one.
builtin_stats(_Re, []) ->
    {[{sum,0}, {count,0}, {min,0}, {max,0}, {sumsqr,0}]};
builtin_stats(reduce, [[_,First]|Rest]) when is_number(First) ->
    Stats = lists:foldl(fun([_K,V], {S,C,Mi,Ma,Sq}) when is_number(V) ->
        {S+V, C+1, erlang:min(Mi, V), erlang:max(Ma, V), Sq+(V*V)};
    (_, _) ->
        throw({invalid_value,
            <<"builtin _stats function requires map values to be numbers">>})
    end, {First,1,First,First,First*First}, Rest),
    {Sum, Cnt, Min, Max, Sqr} = Stats,
    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]};
builtin_stats(reduce, _InvalidKVs) ->
    % the first row is malformed or its value is not a number; report it the
    % same way as an invalid value further down the list
    throw({invalid_value,
        <<"builtin _stats function requires map values to be numbers">>});

builtin_stats(rereduce, [[_,First]|Rest]) ->
    {[{sum,Sum0}, {count,Cnt0}, {min,Min0}, {max,Max0}, {sumsqr,Sqr0}]} = First,
    Stats = lists:foldl(fun([_K,Red], {S,C,Mi,Ma,Sq}) ->
        {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]} = Red,
        {Sum+S, Cnt+C, erlang:min(Min, Mi), erlang:max(Max, Ma), Sqr+Sq}
    end, {Sum0,Cnt0,Min0,Max0,Sqr0}, Rest),
    {Sum, Cnt, Min, Max, Sqr} = Stats,
    {[{sum,Sum}, {count,Cnt}, {min,Min}, {max,Max}, {sumsqr,Sqr}]}.

% Test an update against the validate_doc_update function stored in DDoc.
% Returns ok when the query server answers 1; throws {forbidden, Message}
% or {unauthorized, Message} when it answers with the matching error object.
validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
    JsonDiskDoc = json_doc(DiskDoc),
    FunPath = [<<"validate_doc_update">>],
    FunArgs = [JsonEditDoc, JsonDiskDoc, Ctx, SecObj],
    Verdict = ddoc_prompt(DDoc, FunPath, FunArgs),
    case Verdict of
        1 ->
            ok;
        {[{<<"forbidden">>, Reason}]} ->
            throw({forbidden, Reason});
        {[{<<"unauthorized">>, Reason}]} ->
            throw({unauthorized, Reason})
    end.

% Convert a document to its json object including revs; nil (no document)
% becomes the json value null.
json_doc(MaybeDoc) ->
    case MaybeDoc of
    nil ->
        null;
    Doc ->
        couch_doc:to_json_obj(Doc, [revs])
    end.

% Run Docs through the filter function FName of DDoc. Req is either
% {json_req, JsonObj} with a ready made request object, or an #httpd record
% from which the request object is built. Returns {ok, Passes} as answered
% by the query server.
filter_docs(Req, Db, DDoc, FName, Docs) ->
    JsonReq =
        case Req of
        {json_req, ReadyObj} ->
            ReadyObj;
        #httpd{} = HttpReq ->
            couch_httpd_external:json_req_obj(HttpReq, Db)
        end,
    JsonDocs = [couch_doc:to_json_obj(D, [revs]) || D <- Docs],
    FunPath = [<<"filters">>, FName],
    [true, Passes] = ddoc_prompt(DDoc, FunPath, [JsonDocs, JsonReq]),
    {ok, Passes}.

% Call the design doc function at FunPath with Args on a process that has
% already been taught the design doc DDocId.
ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
    Command = [<<"ddoc">>, DDocId, FunPath, Args],
    proc_prompt(Proc, Command).

% Check out a process that knows DDoc, call the design doc function at
% FunPath with Args on it, and hand the process back afterwards.
ddoc_prompt(DDoc, FunPath, Args) ->
    Prompt = fun(ProcAndId) ->
        ddoc_proc_prompt(ProcAndId, FunPath, Args)
    end,
    with_ddoc_proc(DDoc, Prompt).

% Check out a process that knows the given design doc (identified by its id
% and current revision), apply Fun to {Proc, DDocId} and return the result.
% The process is handed back to the pool even when Fun fails.
with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
    RevStr = couch_doc:rev_to_str({Start, DiskRev}),
    Proc = get_ddoc_process(DDoc, {DDocId, RevStr}),
    try
        Fun({Proc, DDocId})
    after
        ok = ret_os_process(Proc)
    end.

% gen_server init: register for config changes, create the bookkeeping ets
% tables and load the configured query servers. Returns {ok, #qserver{}}.
init([]) ->
    % Just restart if one of the relevant config sections changes; after the
    % restart we pick up the new settings. All three sections share a single
    % callback shape, and changes to unrelated sections are ignored with ok
    % instead of failing with function_clause.
    RestartSelf = fun() ->
        supervisor:terminate_child(couch_secondary_services, query_servers),
        supervisor:restart_child(couch_secondary_services, query_servers)
    end,
    lists:foreach(fun(SectionPrefix) ->
        ok = couch_config:register(
            fun(Section, _Key) ->
                case lists:prefix(SectionPrefix, Section) of
                true -> RestartSelf();
                false -> ok
                end
            end)
    end, ["query_servers", "native_query_servers", "query_server_config"]),

    Langs = ets:new(couch_query_server_langs, [set, private]),
    LangLimits = ets:new(couch_query_server_lang_limits, [set, private]),
    PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
    LangProcs = ets:new(couch_query_server_procs, [set, private]),

    ProcTimeout = list_to_integer(couch_config:get(
                        "couchdb", "os_process_timeout", "5000")),
    % NOTE(review): list_to_atom/1 on a config value creates a new atom for
    % every distinct setting; acceptable only while config is admin controlled.
    ReduceLimit = list_to_atom(
        couch_config:get("query_server_config","reduce_limit","true")),
    OsProcLimit = list_to_integer(
        couch_config:get("query_server_config","os_process_limit","10")),

    % 'query_servers' specifies an OS command-line to execute.
    lists:foreach(fun({Lang, Command}) ->
        true = ets:insert(LangLimits, {?l2b(Lang), OsProcLimit, 0}),
        true = ets:insert(Langs, {?l2b(Lang),
                          couch_os_process, start_link, [Command]})
    end, couch_config:get("query_servers")),
    % 'native_query_servers' specifies a {Module, Func, Arg} tuple.
    lists:foreach(fun({Lang, SpecStr}) ->
        {ok, {Mod, Fun, SpecArg}} = couch_util:parse_term(SpecStr),
        true = ets:insert(LangLimits, {?l2b(Lang), 0, 0}), % 0 means no limit
        true = ets:insert(Langs, {?l2b(Lang),
                          Mod, Fun, SpecArg})
    end, couch_config:get("native_query_servers")),

    % trap exits so that dying query server processes arrive in handle_info/2
    process_flag(trap_exit, true),
    {ok, #qserver{
        langs = Langs, % Keyed by language name, rows are {Lang, Mod, Func, Arg}
        pid_procs = PidProcs, % Keyed by PID, value is a #proc record.
        lang_procs = LangProcs, % Keyed by language name, value is a list of #proc records
        lang_limits = LangLimits, % Keyed by language name, rows are {Lang, Limit, Current}
        config = {[{<<"reduce_limit">>, ReduceLimit},{<<"timeout">>, ProcTimeout}]}
    }}.

% gen_server terminate: synchronously shut down every process that is still
% tracked in the pid table.
terminate(_Reason, #qserver{pid_procs=PidProcs}) ->
    Pids = [Pid || {Pid, _Proc} <- ets:tab2list(PidProcs)],
    lists:foreach(fun(Pid) -> couch_util:shutdown_sync(Pid) end, Pids),
    ok.

% gen_server calls:
%   {get_proc, DDoc, DDocKey} - check out a process that knows the ddoc
%   {get_proc, Lang}          - check out any process for the language
%   {unlink_proc, Pid}        - stop tracking Pid, the caller owns it now
%   {ret_proc, Proc}          - hand a process back to the pool
% get_proc callers that cannot be served right away are put on the waitlist
% and answered later from service_waitlist/1.
handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, Server) ->
    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    % find a proc in the set that has the DDoc
    PickFun = fun(Procs) -> proc_with_ddoc(DDoc, DDocKey, Procs) end,
    Outcome = lang_proc(Lang, Server, PickFun),
    get_proc_reply(Outcome, {DDoc, DDocKey}, From, Server);
handle_call({get_proc, Lang}, From, Server) ->
    PickFun = fun([P|_Procs]) -> {ok, P} end,
    Outcome = lang_proc(Lang, Server, PickFun),
    get_proc_reply(Outcome, {Lang}, From, Server);
handle_call({unlink_proc, Pid}, _From, #qserver{pid_procs=PidProcs}=Server) ->
    rem_value(PidProcs, Pid),
    unlink(Pid),
    {reply, ok, Server};
handle_call({ret_proc, Proc}, _From, #qserver{
        pid_procs=PidProcs,
        lang_procs=LangProcs}=Server) ->
    % Along with max process limit, here we should check
    % if we're over the limit and discard when we are.
    Pid = Proc#proc.pid,
    add_value(PidProcs, Pid, Proc),
    add_to_list(LangProcs, Proc#proc.lang, Proc),
    link(Pid),
    {reply, true, service_waitlist(Server)}.

% Turn the outcome of lang_proc/3 into the gen_server return for a get_proc
% call. WaitInfo is what gets queued when the caller has to wait.
get_proc_reply({ok, Proc}, _WaitInfo, _From, Server) ->
    {reply, {ok, Proc, Server#qserver.config}, Server};
get_proc_reply(wait, WaitInfo, From, Server) ->
    {noreply, add_to_waitlist(WaitInfo, From, Server)};
get_proc_reply(Error, _WaitInfo, _From, Server) ->
    {reply, Error, Server}.

% No casts are part of the protocol; every cast is ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.

% Exit signal of a linked process. A tracked query server process is removed
% from the bookkeeping tables, its language counter is decremented and the
% waitlist gets a chance to run. An abnormal exit of any other linked
% process stops this server with the same reason.
handle_info({'EXIT', Pid, Status}, #qserver{
        pid_procs=PidProcs,
        lang_procs=LangProcs,
        lang_limits=LangLimits}=Server) ->
    case ets:lookup(PidProcs, Pid) of
    [{Pid, Proc}] ->
        if
        Status =/= normal ->
            ?LOG_DEBUG("Linked process died abnormally: ~p (reason: ~p)", [Pid, Status]);
        true ->
            ok
        end,
        rem_value(PidProcs, Pid),
        catch rem_from_list(LangProcs, Proc#proc.lang, Proc),
        % one process less counts against the limit of its language
        [{Lang, Limit, InUse}] = ets:lookup(LangLimits, Proc#proc.lang),
        true = ets:insert(LangLimits, {Lang, Limit, InUse-1}),
        {noreply, service_waitlist(Server)};
    [] when Status =:= normal ->
        {noreply, Server};
    [] ->
        {stop, Status, Server}
    end.

% Hot code upgrade: the state is carried over unchanged.
code_change(_FromVsn, Server, _Extra) ->
    {ok, Server}.

% Private API

% Queue a caller that has to wait for a process. New entries go to the
% front, so the oldest waiter sits at the end of the list.
add_to_waitlist(Info, From, #qserver{waitlist=Waiting}=Server) ->
    Entry = {Info, From},
    Server#qserver{waitlist=[Entry|Waiting]}.

% Try to serve the oldest waiting caller. At most one waiter is handled per
% call; it is removed from the waitlist once it got an answer.
service_waitlist(#qserver{waitlist=[]}=Server) ->
    Server;
service_waitlist(#qserver{waitlist=Waitlist}=Server) ->
    % the waitlist is newest first, so the oldest entry is the last one
    {Newer, [Oldest]} = lists:split(length(Waitlist) - 1, Waitlist),
    case service_waiting(Oldest, Server) of
    ok ->
        Server#qserver{waitlist=Newer};
    wait ->
        Server
    end.

% Try to hand a process to one waiting caller. The waitlist entry carries
% either {DDoc, DDocKey} or {Lang}, mirroring the two get_proc calls.
% Returns ok when the caller got a reply (a process or an error) and wait
% when still no process is available.
% todo get rid of duplication with handle_call
service_waiting({{#doc{body={Props}}=DDoc, DDocKey}, From}, Server) ->
    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
    % find a proc in the set that has the DDoc
    PickFun = fun(Procs) -> proc_with_ddoc(DDoc, DDocKey, Procs) end,
    reply_to_waiter(lang_proc(Lang, Server, PickFun), From, Server);
service_waiting({{Lang}, From}, Server) ->
    PickFun = fun([P|_Procs]) -> {ok, P} end,
    reply_to_waiter(lang_proc(Lang, Server, PickFun), From, Server).

% Deliver the outcome of lang_proc/3 to a waiting caller.
reply_to_waiter({ok, Proc}, From, Server) ->
    gen_server:reply(From, {ok, Proc, Server#qserver.config}),
    ok;
reply_to_waiter(wait, _From, _Server) ->
    % this should never happen
    wait;
reply_to_waiter(Error, From, _Server) ->
    gen_server:reply(From, Error),
    ok.

% Get a process for Lang. When idle processes exist, PickFun chooses one of
% them and it is taken off the idle list. Otherwise a new process is
% started (subject to the per language limit), tracked by pid and passed to
% PickFun. Returns {ok, Proc}, wait, or an error term.
lang_proc(Lang, #qserver{
        langs=Langs,
        pid_procs=PidProcs,
        lang_procs=LangProcs,
        lang_limits=LangLimits}, PickFun) ->
    case ets:lookup(LangProcs, Lang) of
    [{Lang, [_|_]=IdleProcs}] ->
        {ok, Picked} = PickFun(IdleProcs),
        rem_from_list(LangProcs, Lang, Picked),
        {ok, Picked};
    _ ->
        % nothing idle; a crash while starting a process is returned to the
        % caller as an error term instead of taking this server down
        Started = (catch new_process(Langs, LangLimits, Lang)),
        case Started of
        {ok, Fresh} ->
            add_value(PidProcs, Fresh#proc.pid, Fresh),
            PickFun([Fresh]);
        ErrorOrWait ->
            ErrorOrWait
        end
    end.

% Start a new query server process for Lang if the language limit allows.
% Returns {ok, #proc{}} for a started process, wait when the limit is
% reached, and {unknown_query_language, Lang} when the language is not
% configured. The limit table is checked with a case rather than an
% asserting match, so an unknown language gives that error term instead of
% a badmatch crash.
new_process(Langs, LangLimits, Lang) ->
    case ets:lookup(LangLimits, Lang) of
    [{Lang, Lim, Current}] when Lim == 0; Current < Lim ->
        % Lim == 0 means no limit; otherwise we are below the limit for
        % our language, make a new one
        case ets:lookup(Langs, Lang) of
        [{Lang, Mod, Func, Arg}] ->
            {ok, Pid} = apply(Mod, Func, Arg),
            true = ets:insert(LangLimits, {Lang, Lim, Current+1}),
            {ok, #proc{lang=Lang,
                       pid=Pid,
                       % Called via proc_prompt, proc_set_timeout, and proc_stop
                       prompt_fun={Mod, prompt},
                       set_timeout_fun={Mod, set_timeout},
                       stop_fun={Mod, stop}}};
        _ ->
            {unknown_query_language, Lang}
        end;
    [{Lang, _Lim, _Current}] ->
        wait;
    [] ->
        {unknown_query_language, Lang}
    end.

% Pick a process for the design doc: the first process in LangProcs that
% already knows DDocKey, or else the first process of the list after
% teaching it the design doc. Returns {ok, Proc}.
proc_with_ddoc(DDoc, DDocKey, LangProcs) ->
    LacksDDoc = fun(#proc{ddoc_keys=Keys}) ->
        not lists:any(fun(Key) -> Key == DDocKey end, Keys)
    end,
    % skip processes until one that knows the ddoc is at the front
    case lists:dropwhile(LacksDDoc, LangProcs) of
        [KnowingProc|_] ->
            ?LOG_DEBUG("DDocProc found for DDocKey: ~p",[DDocKey]),
            {ok, KnowingProc};
        [] ->
            [TeachProc|_] = LangProcs,
            ?LOG_DEBUG("Teach ddoc to new proc ~p with DDocKey: ~p",[TeachProc, DDocKey]),
            {ok, _SmartProc} = teach_ddoc(DDoc, DDocKey, TeachProc)
    end.

% Send Args to the process through its prompt function and return the
% answer of that function.
proc_prompt(Proc, Args) ->
    {Mod, Func} = Proc#proc.prompt_fun,
    Mod:Func(Proc#proc.pid, Args).

% Stop the process through its stop function.
proc_stop(Proc) ->
    {Mod, Func} = Proc#proc.stop_fun,
    Mod:Func(Proc#proc.pid).

% Set the timeout of the process through its set_timeout function.
proc_set_timeout(Proc, Timeout) ->
    {Mod, Func} = Proc#proc.set_timeout_fun,
    Mod:Func(Proc#proc.pid, Timeout).

% Send the design doc to the process and record its key on the returned
% #proc. Returns {ok, Proc}.
teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=KnownKeys}=Proc) ->
    % send ddoc over the wire
    % we only share the rev with the client we know to update code
    % but it only keeps the latest copy, per each ddoc, around.
    JsonDDoc = couch_doc:to_json_obj(DDoc, []),
    true = proc_prompt(Proc, [<<"ddoc">>, <<"new">>, DDocId, JsonDDoc]),
    % drop any other key for this docid, because the query server
    % overwrites without the rev
    IsOtherDDoc = fun
        ({Id, _}) -> Id /= DDocId;
        (_) -> false
    end,
    OtherKeys = lists:filter(IsOtherDDoc, KnownKeys),
    % add ddoc to the proc
    {ok, Proc#proc{ddoc_keys=[DDocKey|OtherKeys]}}.

% Check out a process that knows the design doc and take over ownership of
% it: the process is reset, gets its timeout, is linked to the caller and
% unlinked from the server. A process that fails the reset is stopped and
% another one is requested. Throws the server's answer when no process
% could be obtained.
get_ddoc_process(#doc{} = DDoc, DDocKey) ->
    case gen_server:call(couch_query_servers, {get_proc, DDoc, DDocKey}) of
    {ok, Proc, {QueryConfig}} ->
        % process knows the ddoc
        ResetReply = (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])),
        if
        ResetReply =:= true ->
            Timeout = couch_util:get_value(<<"timeout">>, QueryConfig),
            proc_set_timeout(Proc, Timeout),
            link(Proc#proc.pid),
            gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
            Proc;
        true ->
            % unusable process: stop it and ask for another one
            catch proc_stop(Proc),
            get_ddoc_process(DDoc, DDocKey)
        end;
    Error ->
        throw(Error)
    end.

% Check out a process for Lang and take over ownership of it: the process
% is reset, gets its timeout, is linked to the caller and unlinked from the
% server. A process that fails the reset is stopped and another one is
% requested. Throws the server's answer when no process could be obtained.
get_os_process(Lang) ->
    case gen_server:call(couch_query_servers, {get_proc, Lang}) of
    {ok, Proc, {QueryConfig}} ->
        ResetReply = (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])),
        if
        ResetReply =:= true ->
            Timeout = couch_util:get_value(<<"timeout">>, QueryConfig),
            proc_set_timeout(Proc, Timeout),
            link(Proc#proc.pid),
            gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}),
            Proc;
        true ->
            % unusable process: stop it and ask for another one
            catch proc_stop(Proc),
            get_os_process(Lang)
        end;
    Error ->
        throw(Error)
    end.

% Hand a process back to the server and drop the caller's link to it.
ret_os_process(Proc) ->
    Returned = gen_server:call(couch_query_servers, {ret_proc, Proc}),
    true = Returned,
    % the server is linked to the process again, give up our own link
    catch unlink(Proc#proc.pid),
    ok.

% Store Value under Key in the ets table Tid.
add_value(Tid, Key, Value) ->
    Row = {Key, Value},
    true = ets:insert(Tid, Row).

% Delete the entry stored under Key from the ets table Tid.
rem_value(Tid, Key) ->
    Deleted = ets:delete(Tid, Key),
    true = Deleted.

% Prepend Value to the list stored under Key in the ets table Tid,
% creating the list when the key does not exist yet.
add_to_list(Tid, Key, Value) ->
    Existing =
        case ets:lookup(Tid, Key) of
        [{Key, Vals}] -> Vals;
        [] -> []
        end,
    true = ets:insert(Tid, {Key, [Value|Existing]}).

% Remove Value from the list stored under Key in the ets table Tid.
% A #proc record is matched by pid only, so any version of the record for
% the same process is removed; any other value is matched by equality.
% Returns ok when the key does not exist.
rem_from_list(Tid, Key, Value) ->
    Keep =
        case is_record(Value, proc) of
        true ->
            Pid = Value#proc.pid,
            % entries that are not #proc records are dropped as well
            fun(#proc{pid=P}) -> P /= Pid;
               (_) -> false
            end;
        false ->
            fun(Val) -> Val /= Value end
        end,
    case ets:lookup(Tid, Key) of
    [{Key, Vals}] ->
        % store the list without the removed entries
        ets:insert(Tid, {Key, lists:filter(Keep, Vals)});
    [] ->
        ok
    end.