summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2008-12-16 20:31:51 +0000
committerJohn Christopher Anderson <jchris@apache.org>2008-12-16 20:31:51 +0000
commit54ddcb1768b915e90d315e1f5ceba4f322b8e28b (patch)
tree368ddd1d421f88cb9f418d3c16d2c8bd28a5f707 /src/couchdb/couch_query_servers.erl
parent17b04c04c6004725f6115f7b4592d19eba4cda89 (diff)
couch_os_process to manage the JSON line protocol. thanks davisp.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@727132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl248
1 files changed, 102 insertions, 146 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 8465a632..b694d5e3 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -28,71 +28,20 @@ start_link() ->
stop() ->
exit(whereis(couch_query_servers), close).
-readline(Port) ->
- readline(Port, []).
-
-readline(Port, Acc) ->
- case get(query_server_timeout) of
- undefined ->
- Timeout = list_to_integer(couch_config:get(
- "couchdb", "view_timeout", "5000")),
- put(query_server_timeout, Timeout);
- Timeout -> ok
- end,
- receive
- {Port, {data, {noeol, Data}}} ->
- readline(Port, [Data|Acc]);
- {Port, {data, {eol, Data}}} ->
- lists:reverse(Acc, Data);
- {Port, Err} ->
- catch port_close(Port),
- throw({external_process_error, Err})
- after Timeout ->
- catch port_close(Port),
- throw({external_process_error, "External process timed out"})
- end.
-
-read_json(Port) ->
- Line = readline(Port),
- case ?JSON_DECODE(Line) of
- {[{<<"log">>,Msg}]} when is_binary(Msg) ->
- % we got a message to log. Log it and continue
- ?LOG_INFO("Query Server Log Message: ~s", [Msg]),
- read_json(Port);
- Else ->
- Else
- end.
-
-% send command and get a response.
-prompt(Port, Json) ->
- Bin = iolist_to_binary([?JSON_ENCODE(Json) , "\n"]),
- true = port_command(Port, Bin),
- case read_json(Port) of
- {[{<<"error">>, Id}, {<<"reason">>, Reason}]} ->
- throw({Id,Reason});
- {[{<<"reason">>, Reason}, {<<"error">>, Id}]} ->
- throw({Id,Reason});
- Result ->
- Result
- end.
-
-
start_doc_map(Lang, Functions) ->
- Port = get_linked_port(Lang),
- % send the functions as json strings
+ Pid = get_os_process(Lang),
lists:foreach(fun(FunctionSource) ->
- true = prompt(Port, [<<"add_fun">>, FunctionSource])
- end,
- Functions),
- {ok, {Lang, Port}}.
+ true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
+ end, Functions),
+ {ok, {Lang, Pid}}.
-map_docs({_Lang, Port}, Docs) ->
+map_docs({_Lang, Pid}, Docs) ->
% send the documents
Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
- FunsResults = prompt(Port, [<<"map_doc">>, Json]),
+ FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
% the results are a json array of function map yields like this:
% [FunResults1, FunResults2 ...]
% where funresults is are json arrays of key value pairs:
@@ -111,30 +60,8 @@ map_docs({_Lang, Port}, Docs) ->
stop_doc_map(nil) ->
ok;
-stop_doc_map({Lang, Port}) ->
- return_linked_port(Lang, Port).
-
-get_linked_port(Lang) ->
- case gen_server:call(couch_query_servers, {get_port, Lang}) of
- {ok, Port0} ->
- link(Port0),
- true = prompt(Port0, [<<"reset">>]),
- Port0;
- {empty, Cmd} ->
- ?LOG_INFO("Spawning new ~s instance.", [Lang]),
- open_port({spawn, Cmd}, [stream,
- {line, 1000},
- binary,
- exit_status,
- hide]);
- Error ->
- throw(Error)
- end.
-
-return_linked_port(Lang, Port) ->
- ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
- true = unlink(Port),
- ok.
+stop_doc_map({Lang, Pid}) ->
+ ok = ret_os_process(Lang, Pid).
group_reductions_results([]) ->
[];
@@ -153,29 +80,29 @@ group_reductions_results(List) ->
rereduce(_Lang, [], _ReducedValues) ->
{ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
- Port = get_linked_port(Lang),
+ Pid = get_os_process(Lang),
Grouped = group_reductions_results(ReducedValues),
Results = lists:zipwith(
fun(FunSrc, Values) ->
[true, [Result]] =
- prompt(Port, [<<"rereduce">>, [FunSrc], Values]),
+ couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]),
Result
end, RedSrcs, Grouped),
- return_linked_port(Lang, Port),
+ ok = ret_os_process(Lang, Pid),
{ok, Results}.
reduce(_Lang, [], _KVs) ->
{ok, []};
reduce(Lang, RedSrcs, KVs) ->
- Port = get_linked_port(Lang),
- [true, Results] = prompt(Port,
+ Pid = get_os_process(Lang),
+ [true, Results] = couch_os_process:prompt(Pid,
[<<"reduce">>, RedSrcs, KVs]),
- return_linked_port(Lang, Port),
+ ok = ret_os_process(Lang, Pid),
{ok, Results}.
validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
- Port = get_linked_port(Lang),
+ Pid = get_os_process(Lang),
JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
JsonDiskDoc =
if DiskDoc == nil ->
@@ -183,7 +110,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
true ->
couch_doc:to_json_obj(DiskDoc, [revs])
end,
- try prompt(Port,
+ try couch_os_process:prompt(Pid,
[<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
1 ->
ok;
@@ -192,7 +119,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
{[{<<"unauthorized">>, Message}]} ->
throw({unauthorized, Message})
after
- return_linked_port(Lang, Port)
+ ok = ret_os_process(Lang, Pid)
end.
init([]) ->
@@ -206,72 +133,101 @@ init([]) ->
fun("query_servers" ++ _, _) ->
?MODULE:stop()
end),
-
- QueryServers = couch_config:get("query_servers"),
- QueryServers2 =
- [{list_to_binary(Lang), Path} || {Lang, Path} <- QueryServers],
-
- {ok, {QueryServers2, []}}.
+
+ Langs = ets:new(couch_query_server_langs, [set, private]),
+ PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
+ Pids = ets:new(couch_query_server_procs, [set, private]),
+ InUse = ets:new(couch_query_server_used, [set, private]),
+ lists:foreach(fun({Lang, Command}) ->
+ true = ets:insert(Langs, {?l2b(Lang), Command})
+ end, couch_config:get("query_servers")),
+ {ok, {Langs, PidLangs, Pids, InUse}}.
terminate(_Reason, _Server) ->
ok.
-handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
- case proplists:get_value(Lang, LangPorts) of
- undefined ->
- case proplists:get_value(Lang, QueryServerList) of
- undefined -> % not a supported language
- {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}};
- ServerCmd ->
- {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}}
- end;
- Port ->
- Result =
- case catch port_connect(Port, FromPid) of
- true ->
- true = unlink(Port),
- {ok, Port};
- Error ->
- catch port_close(Port),
- Error
- end,
- {reply, Result, {QueryServerList, LangPorts -- [{Lang,Port}]}}
- end;
-handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
- case catch port_connect(Port, self()) of
- true ->
- {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}};
+handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
+ % Note to future self. Add max process limit.
+ case ets:lookup(Pids, Lang) of
+ [{Lang, [Pid|_]}] ->
+ add_value(PidLangs, Pid, Lang),
+ rem_from_list(Pids, Lang, Pid),
+ add_to_list(InUse, Lang, Pid),
+ true = couch_os_process:prompt(Pid, [<<"reset">>]),
+ {reply, Pid, Server};
_ ->
- catch port_close(Port),
- {reply, ok, {QueryServerList, LangPorts}}
- end.
-
-handle_cast(_Whatever, {Cmd, Ports}) ->
- {noreply, {Cmd, Ports}}.
-
-handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) ->
- case lists:keysearch(Port, 2, LangPorts) of
- {value, {Lang, _}} ->
+ {ok, Pid} = new_process(Langs, Lang),
+ add_to_list(InUse, Lang, Pid),
+ {reply, Pid, Server}
+ end;
+handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
+ % Along with max process limit, here we should check
+ % if we're over the limit and discard when we are.
+ add_to_list(Pids, Lang, Pid),
+ rem_from_list(InUse, Lang, Pid),
+ {reply, true, Server}.
+
+handle_cast(_Whatever, Server) ->
+ {noreply, Server}.
+
+handle_info({'EXIT', Pid, Status}, {Langs, PidLangs, Pids, InUse}) ->
+ case ets:lookup(PidLangs, Pid) of
+ [{Pid, Lang}] ->
case Status of
- 0 -> ok;
- _ -> ?LOG_ERROR("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status])
+ normal -> ok;
+ _ -> ?LOG_DEBUG("Linked process died abnromally: ~p (reason: ~p)", [Pid, Status])
end,
- {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}};
- _ ->
- ?LOG_ERROR("Unknown linked port/process crash: ~p", [Port])
+ {ok, {
+ Langs,
+ rem_value(PidLangs, Pid),
+ rem_from_list(Pids, Lang, Pid),
+ rem_from_list(InUse, Lang, Pid)
+ }};
+ [] ->
+ ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
+ {ok, {Langs, PidLangs, Pids, InUse}}
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-% test() ->
-% test("../js/js -f main.js").
-%
-% test(Cmd) ->
-% start_link(Cmd),
-% {ok, DocMap} = start_doc_map(<<"javascript">>, [<<"function(doc) {if (doc[0] == 'a') return doc[1];}">>]),
-% {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
-% io:format("Results: ~w~n", [Results]),
-% stop_doc_map(DocMap),
-% ok.
+% Private API
+
+new_process(Langs, Lang) ->
+ Proc =
+ case ets:lookup(Langs, Lang) of
+ [{Lang, Command}] ->
+ couch_os_process:start_link(Command);
+ _ ->
+ throw({unknown_query_language, Lang})
+ end,
+ Proc.
+
+get_os_process(Lang) ->
+ gen_server:call(couch_query_servers, {get_proc, Lang}).
+
+ret_os_process(Lang, Pid) ->
+ true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
+ ok.
+
+add_value(Tid, Key, Value) ->
+ true = ets:insert(Tid, {Key, Value}).
+
+rem_value(Tid, Key) ->
+ true = ets:insert(Tid, Key).
+
+add_to_list(Tid, Key, Value) ->
+ case ets:lookup(Tid, Key) of
+ [{Key, Vals}] ->
+ true = ets:insert(Tid, {Key, [Value|Vals]});
+ [] ->
+ true = ets:insert(Tid, {Key, [Value]})
+ end.
+
+rem_from_list(Tid, Key, Value) ->
+ case ets:lookup(Tid, Key) of
+ [{Key, Vals}] ->
+ ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]});
+ [] -> ok
+ end.