% Licensed under the Apache License, Version 2.0 (the "License");
% you may not use this file except in compliance with the License.
%
% You may obtain a copy of the License at
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
% either express or implied.
%
% See the License for the specific language governing permissions
% and limitations under the License.
%
% This file drew much inspiration from erlview, which was written by and
% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
%
%
% This module provides the smallest possible native view-server.
% With this module in-place, you can add the following to your couch INI files:
%  [native_query_servers]
%  erlang={couch_native_process, start_link, []}
%
% Which will then allow following example map function to be used:
%
%  fun({Doc}) ->
%    % Below, we emit a single record - the _id as key, null as value
%    DocId = couch_util:get_value(Doc, <<"_id">>, null),
%    Emit(DocId, null)
%  end.
%
% which should be roughly the same as the javascript:
%    emit(doc._id, null);
%
% This module exposes enough functions such that a native erlang server can
% act as a fully-fleged view server, but no 'helper' functions specifically
% for simplifying your erlang view code.  It is expected other third-party
% extensions will evolve which offer useful layers on top of this view server
% to help simplify your view code.
-module(couch_native_process).
-behaviour(gen_server).

-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
         handle_info/2]).
-export([set_timeout/2, prompt/2]).

-define(STATE, native_proc_state).
-record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).

-include("couch_db.hrl").

start_link() ->
    gen_server:start_link(?MODULE, [], []).

% this is a bit messy, see also couch_query_servers handle_info
% stop(_Pid) ->
%     ok.

set_timeout(Pid, TimeOut) ->
    gen_server:call(Pid, {set_timeout, TimeOut}).

prompt(Pid, Data) when is_list(Data) ->
    gen_server:call(Pid, {prompt, Data}).

% gen_server callbacks
init([]) ->
    {ok, #evstate{ddocs=dict:new()}}.

handle_call({set_timeout, TimeOut}, _From, State) ->
    {reply, ok, State#evstate{timeout=TimeOut}};

handle_call({prompt, Data}, _From, State) ->
    ?LOG_DEBUG("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
    {NewState, Resp} = try run(State, to_binary(Data)) of
        {S, R} -> {S, R}
        catch
            throw:{error, Why} ->
                {State, [<<"error">>, Why, Why]}
        end,

    case Resp of
        {error, Reason} ->
            Msg = io_lib:format("couch native server error: ~p", [Reason]),
            {reply, [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)], NewState};
        [<<"error">> | Rest] ->
            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
            % TODO: markh? (jan)
            {reply, [<<"error">> | Rest], NewState};
        [<<"fatal">> | Rest] ->
            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
            % TODO: markh? (jan)
            {stop, fatal, [<<"error">> | Rest], NewState};
        Resp ->
            {reply, Resp, NewState}
    end.

handle_cast(foo, State) -> {noreply, State}.
handle_info({'EXIT',_,normal}, State) -> {noreply, State};
handle_info({'EXIT',_,Reason}, State) ->
    {stop, Reason, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVersion, State, _Extra) -> {ok, State}.

run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
    Pid ! {self(), list_row, Row},
    receive
        {Pid, chunks, Data} ->
            {State, [<<"chunks">>, Data]};
        {Pid, list_end, Data} ->
            receive
                {'EXIT', Pid, normal} -> ok
            after State#evstate.timeout ->
                throw({timeout, list_cleanup})
            end,
            process_flag(trap_exit, erlang:get(do_trap)),
            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
    after State#evstate.timeout ->
        throw({timeout, list_row})
    end;
run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
    Pid ! {self(), list_end},
    Resp =
    receive
        {Pid, list_end, Data} ->
            receive
                {'EXIT', Pid, normal} -> ok
            after State#evstate.timeout ->
                throw({timeout, list_cleanup})
            end,
            [<<"end">>, Data]
    after State#evstate.timeout ->
        throw({timeout, list_end})
    end,
    process_flag(trap_exit, erlang:get(do_trap)),
    {State#evstate{list_pid=nil}, Resp};
run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
    {State, [<<"error">>, list_error, list_error]};
run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
    {#evstate{ddocs=DDocs}, true};
run(#evstate{ddocs=DDocs}, [<<"reset">>, QueryConfig]) ->
    {#evstate{ddocs=DDocs, query_config=QueryConfig}, true};
run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
    FunInfo = makefun(State, BinFunc),
    {State#evstate{funs=Funs ++ [FunInfo]}, true};
run(State, [<<"map_doc">> , Doc]) ->
    Resp = lists:map(fun({Sig, Fun}) ->
        erlang:put(Sig, []),
        Fun(Doc),
        lists:reverse(erlang:get(Sig))
    end, State#evstate.funs),
    {State, Resp};
run(State, [<<"reduce">>, Funs, KVs]) ->
    {Keys, Vals} =
    lists:foldl(fun([K, V], {KAcc, VAcc}) ->
        {[K | KAcc], [V | VAcc]}
    end, {[], []}, KVs),
    Keys2 = lists:reverse(Keys),
    Vals2 = lists:reverse(Vals),
    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
run(State, [<<"rereduce">>, Funs, Vals]) ->
    {State, catch reduce(State, Funs, null, Vals, true)};
run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
    {State#evstate{ddocs=DDocs2}, true};
run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
    DDoc = load_ddoc(DDocs, DDocId),
    ddoc(State, DDoc, Rest);
run(_, Unknown) ->
    ?LOG_ERROR("Native Process: Unknown command: ~p~n", [Unknown]),
    throw({error, unknown_command}).
    
ddoc(State, {DDoc}, [FunPath, Args]) ->
    % load fun from the FunPath
    BFun = lists:foldl(fun
        (Key, {Props}) when is_list(Props) ->
            couch_util:get_value(Key, Props, nil);
        (_Key, Fun) when is_binary(Fun) ->
            Fun;
        (_Key, nil) ->
            throw({error, not_found});
        (_Key, _Fun) ->
            throw({error, malformed_ddoc})
        end, {DDoc}, FunPath),
    ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).

ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
    {State, (catch apply(Fun, Args))};
ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
    Resp = lists:map(fun(Doc) -> (catch Fun(Doc, Req)) =:= true end, Docs),
    {State, [true, Resp]};
ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
    Resp = case (catch apply(Fun, Args)) of
        FunResp when is_list(FunResp) ->
            FunResp;
        {FunResp} ->
            [<<"resp">>, {FunResp}];
        FunResp ->
            FunResp
    end,
    {State, Resp};
ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
    Resp = case (catch apply(Fun, Args)) of
        [JsonDoc, JsonResp]  ->
            [<<"up">>, JsonDoc, JsonResp]
    end,
    {State, Resp};
ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
    Self = self(),
    SpawnFun = fun() ->
        LastChunk = (catch apply(Fun, Args)),
        case start_list_resp(Self, Sig) of
            started ->
                receive
                    {Self, list_row, _Row} -> ignore;
                    {Self, list_end} -> ignore
                after State#evstate.timeout ->
                    throw({timeout, list_cleanup_pid})
                end;
            _ ->
                ok
        end,
        LastChunks =
        case erlang:get(Sig) of
            undefined -> [LastChunk];
            OtherChunks -> [LastChunk | OtherChunks]
        end,
        Self ! {self(), list_end, lists:reverse(LastChunks)}
    end,
    erlang:put(do_trap, process_flag(trap_exit, true)),
    Pid = spawn_link(SpawnFun),
    Resp =
    receive
        {Pid, start, Chunks, JsonResp} ->
            [<<"start">>, Chunks, JsonResp]
    after State#evstate.timeout ->
        throw({timeout, list_start})
    end,
    {State#evstate{list_pid=Pid}, Resp}.

store_ddoc(DDocs, DDocId, DDoc) ->
    dict:store(DDocId, DDoc, DDocs).
load_ddoc(DDocs, DDocId) ->
    try dict:fetch(DDocId, DDocs) of
        {DDoc} -> {DDoc}
    catch
        _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
    end.

bindings(State, Sig) ->
    bindings(State, Sig, nil).
bindings(State, Sig, DDoc) ->
    Self = self(),

    Log = fun(Msg) ->
        ?LOG_INFO(Msg, [])
    end,

    Emit = fun(Id, Value) ->
        Curr = erlang:get(Sig),
        erlang:put(Sig, [[Id, Value] | Curr])
    end,

    Start = fun(Headers) ->
        erlang:put(list_headers, Headers)
    end,

    Send = fun(Chunk) ->
        Curr =
        case erlang:get(Sig) of
            undefined -> [];
            Else -> Else
        end,
        erlang:put(Sig, [Chunk | Curr])
    end,

    GetRow = fun() ->
        case start_list_resp(Self, Sig) of
            started ->
                ok;
            _ ->
                Chunks =
                case erlang:get(Sig) of
                    undefined -> [];
                    CurrChunks -> CurrChunks
                end,
                Self ! {self(), chunks, lists:reverse(Chunks)}
        end,
        erlang:put(Sig, []),
        receive
            {Self, list_row, Row} -> Row;
            {Self, list_end} -> nil
        after State#evstate.timeout ->
            throw({timeout, list_pid_getrow})
        end
    end,
   
    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,

    Bindings = [
        {'Log', Log},
        {'Emit', Emit},
        {'Start', Start},
        {'Send', Send},
        {'GetRow', GetRow},
        {'FoldRows', FoldRows}
    ],
    case DDoc of
        {_Props} ->
            Bindings ++ [{'DDoc', DDoc}];
        _Else -> Bindings
    end.

% thanks to erlview, via:
% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
makefun(State, Source) ->
    Sig = couch_util:md5(Source),
    BindFuns = bindings(State, Sig),
    {Sig, makefun(State, Source, BindFuns)}.
makefun(State, Source, {DDoc}) ->
    Sig = couch_util:md5(lists:flatten([Source, term_to_binary(DDoc)])),
    BindFuns = bindings(State, Sig, {DDoc}),
    {Sig, makefun(State, Source, BindFuns)};
makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
    FunStr = binary_to_list(Source),
    {ok, Tokens, _} = erl_scan:string(FunStr),
    Form = case (catch erl_parse:parse_exprs(Tokens)) of
        {ok, [ParsedForm]} ->
            ParsedForm;
        {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
            io:format(standard_error, "Syntax error on line: ~p~n", [LineNum]),
            io:format(standard_error, "~s~p~n", [Mesg, Params]),
            throw(Error)
    end,
    Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
        erl_eval:add_binding(Name, Fun, Acc)
    end, erl_eval:new_bindings(), BindFuns),
    {value, Fun, _} = erl_eval:expr(Form, Bindings),
    Fun.

reduce(State, BinFuns, Keys, Vals, ReReduce) ->
    Funs = case is_list(BinFuns) of
        true ->
            lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
        _ ->
            [makefun(State, BinFuns)]
    end,
    Reds = lists:map(fun({_Sig, Fun}) ->
        Fun(Keys, Vals, ReReduce)
    end, Funs),
    [true, Reds].

foldrows(GetRow, ProcRow, Acc) ->
    case GetRow() of
        nil ->
            {ok, Acc};
        Row ->
            case (catch ProcRow(Row, Acc)) of
                {ok, Acc2} ->
                    foldrows(GetRow, ProcRow, Acc2);
                {stop, Acc2} ->
                    {ok, Acc2}
            end
    end.

start_list_resp(Self, Sig) ->
    case erlang:get(list_started) of
        undefined ->
            Headers =
            case erlang:get(list_headers) of
                undefined -> {[{<<"headers">>, {[]}}]};
                CurrHdrs -> CurrHdrs
            end,
            Chunks =
            case erlang:get(Sig) of
                undefined -> [];
                CurrChunks -> CurrChunks
            end,
            Self ! {self(), start, lists:reverse(Chunks), Headers},
            erlang:put(list_started, true),
            erlang:put(Sig, []),
            started;
        _ ->
            ok
    end.

to_binary({Data}) ->
    Pred = fun({Key, Value}) ->
        {to_binary(Key), to_binary(Value)}
    end,
    {lists:map(Pred, Data)};
to_binary(Data) when is_list(Data) ->
    [to_binary(D) || D <- Data];
to_binary(null) ->
    null;
to_binary(true) ->
    true;
to_binary(false) ->
    false;
to_binary(Data) when is_atom(Data) ->
    list_to_binary(atom_to_list(Data));
to_binary(Data) ->
    Data.