summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJohn Christopher Anderson <jchris@apache.org>2008-12-16 20:31:51 +0000
committerJohn Christopher Anderson <jchris@apache.org>2008-12-16 20:31:51 +0000
commit54ddcb1768b915e90d315e1f5ceba4f322b8e28b (patch)
tree368ddd1d421f88cb9f418d3c16d2c8bd28a5f707 /src
parent17b04c04c6004725f6115f7b4592d19eba4cda89 (diff)
couch_os_process to manage the JSON line protocol. thanks davisp.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@727132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_db_update_notifier.erl21
-rw-r--r--src/couchdb/couch_os_process.erl161
-rw-r--r--src/couchdb/couch_query_servers.erl248
4 files changed, 277 insertions, 155 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 9db1539b..76d3f13e 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -55,6 +55,7 @@ source_files = \
couch_httpd_misc_handlers.erl \
couch_key_tree.erl \
couch_log.erl \
+ couch_os_process.erl \
couch_query_servers.erl \
couch_rep.erl \
couch_server.erl \
@@ -85,6 +86,7 @@ compiled_files = \
couch_httpd_misc_handlers.beam \
couch_key_tree.beam \
couch_log.beam \
+ couch_os_process.beam \
couch_query_servers.beam \
couch_rep.beam \
couch_server.beam \
diff --git a/src/couchdb/couch_db_update_notifier.erl b/src/couchdb/couch_db_update_notifier.erl
index f2f0342c..68948afb 100644
--- a/src/couchdb/couch_db_update_notifier.erl
+++ b/src/couchdb/couch_db_update_notifier.erl
@@ -37,12 +37,14 @@ stop(Pid) ->
couch_event_sup:stop(Pid).
init(Exec) when is_list(Exec) -> % an exe
- Port = open_port({spawn, Exec}, [stream, exit_status, hide]),
- {ok, Port};
+ {ok, couch_os_process:start_link(Exec, [], [stream, exit_status, hide])};
init(Else) ->
{ok, Else}.
-terminate(_Reason, _Port) ->
+terminate(_Reason, Pid) when is_pid(Pid) ->
+ couch_os_process:stop(Pid),
+ ok;
+terminate(_Reason, _State) ->
ok.
handle_event(Event, Fun) when is_function(Fun, 1) ->
@@ -51,16 +53,17 @@ handle_event(Event, Fun) when is_function(Fun, 1) ->
handle_event(Event, {Fun, FunAcc}) ->
FunAcc2 = Fun(Event, FunAcc),
{ok, {Fun, FunAcc2}};
-handle_event({EventAtom, DbName}, Port) ->
+handle_event({EventAtom, DbName}, Pid) ->
Obj = {[{type, list_to_binary(atom_to_list(EventAtom))}, {db, DbName}]},
- true = port_command(Port, ?JSON_ENCODE(Obj) ++ "\n"),
- {ok, Port}.
+ true = couch_os_process:write(Pid, Obj),
+ {ok, Pid}.
handle_call(_Request, State) ->
- {ok, ok, State}.
+ {reply, ok, State}.
-handle_info({'EXIT', _, _Reason}, _Port) ->
- remove_handler.
+handle_info({'EXIT', Pid, Reason}, Pid) ->
+ ?LOG_ERROR("Update notification process ~p died: ~p", [Pid, Reason]),
+ {stop, nil}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl
new file mode 100644
index 00000000..c0b98946
--- /dev/null
+++ b/src/couchdb/couch_os_process.erl
@@ -0,0 +1,161 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_os_process).
+-behaviour(gen_server).
+
+-export([start_link/1, start_link/2, start_link/3, stop/1]).
+-export([set_timeout/2, write/2, read/1, prompt/2, async/3]).
+-export([writeline/2, readline/1, writejson/2, readjson/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include("couch_db.hrl").
+
+-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]).
+
+-record(os_proc,
+ {command,
+ port,
+ writer,
+ reader,
+ timeout
+ }).
+
+start_link(Command) ->
+ start_link(Command, []).
+start_link(Command, Options) ->
+ start_link(Command, Options, ?PORT_OPTIONS).
+start_link(Command, Options, PortOptions) ->
+ gen_server:start_link(couch_os_process, [Command, Options, PortOptions], []).
+
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+% Read/Write API
+set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
+ ok = gen_server:call(Pid, {set_timeout, TimeOut}).
+
+write(Pid, Data) ->
+ gen_server:call(Pid, {write, Data}).
+
+read(Pid) ->
+ gen_server:call(Pid, read).
+
+prompt(Pid, Data) ->
+ gen_server:call(Pid, {prompt, Data}).
+
+async(Pid, Data, CallBack) ->
+ gen_server:cast(Pid, {async, Data, CallBack}).
+
+% Utility functions for reading and writing
+% in custom functions
+writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
+ port_command(OsProc#os_proc.port, Data ++ "\n").
+
+readline(OsProc) when is_record(OsProc, os_proc) ->
+ readline(OsProc, []).
+readline(OsProc, Acc) when is_record(OsProc, os_proc) ->
+ #os_proc{port=Port} = OsProc,
+ receive
+ {Port, {data, {noeol, Data}}} ->
+ readline(OsProc, [Data|Acc]);
+ {Port, {data, {eol, Data}}} ->
+ lists:reverse(Acc, Data);
+ {Port, Err} ->
+ catch port_close(Port),
+ throw({os_process_error, Err})
+ after OsProc#os_proc.timeout ->
+ catch port_close(Port),
+ throw({os_process_error, "OS process timed out."})
+ end.
+
+% Standard JSON functions
+writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
+ true = writeline(OsProc, ?JSON_ENCODE(Data)).
+
+readjson(OsProc) when is_record(OsProc, os_proc) ->
+ Line = readline(OsProc),
+ case ?JSON_DECODE(Line) of
+ {[{<<"log">>,Msg}]} when is_binary(Msg) ->
+ % we got a message to log. Log it and continue
+ ?LOG_INFO("OS Process Log Message: ~s", [Msg]),
+ readjson(OsProc);
+ {[{<<"error">>, Id}, {<<"reason">>, Reason}]} ->
+ throw({list_to_atom(binary_to_list(Id)),Reason});
+ {[{<<"reason">>, Reason}, {<<"error">>, Id}]} ->
+ throw({list_to_atom(binary_to_list(Id)),Reason});
+ Result ->
+ Result
+ end.
+
+% gen_server API
+init([Command, Options, PortOptions]) ->
+ BaseProc = #os_proc{
+ command=Command,
+ port=open_port({spawn, Command}, PortOptions),
+ writer=fun writejson/2,
+ reader=fun readjson/1,
+ timeout=5000
+ },
+ OsProc =
+ lists:foldl(fun(Opt, Proc) ->
+ case Opt of
+ {writer, Writer} when is_function(Writer) ->
+ Proc#os_proc{writer=Writer};
+ {reader, Reader} when is_function(Reader) ->
+ Proc#os_proc{reader=Reader};
+ {timeout, TimeOut} when is_integer(TimeOut) ->
+ Proc#os_proc{timeout=TimeOut}
+ end
+ end, BaseProc, Options),
+ {ok, OsProc}.
+
+terminate(_Reason, #os_proc{port=nil}) ->
+ ok;
+terminate(_Reason, #os_proc{port=Port}) ->
+ catch port_close(Port),
+ ok.
+
+handle_call({set_timeout, TimeOut}, _From, OsProc) ->
+ {reply, ok, OsProc#os_proc{timeout=TimeOut}};
+handle_call({write, Data}, _From, OsProc) ->
+ Writer = OsProc#os_proc.writer,
+ {reply, Writer(OsProc, Data), OsProc};
+handle_call(read, _From, OsProc) ->
+ Reader = OsProc#os_proc.reader,
+ {reply, Reader(OsProc), OsProc};
+handle_call({prompt, Data}, _From, OsProc) ->
+ #os_proc{writer=Writer, reader=Reader} = OsProc,
+ Writer(OsProc, Data),
+ {reply, Reader(OsProc), OsProc}.
+
+handle_cast({async, Data, CallBack}, OsProc) ->
+ #os_proc{writer=Writer, reader=Reader} = OsProc,
+ Writer(OsProc, Data),
+ CallBack(Reader(OsProc)),
+ {noreply, OsProc};
+handle_cast(stop, OsProc) ->
+ {stop, normal, OsProc};
+handle_cast(Msg, OsProc) ->
+ ?LOG_DEBUG("OS Proc: Unknown cast: ~p", [Msg]),
+ {noreply, OsProc}.
+
+handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
+ ?LOG_ERROR("OS Process died with status: ~p", [Status]),
+ {stop, error, OsProc};
+handle_info(Msg, OsProc) ->
+ ?LOG_DEBUG("OS Proc: Unknown info: ~p", [Msg]),
+ {noreply, OsProc}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index 8465a632..b694d5e3 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -28,71 +28,20 @@ start_link() ->
stop() ->
exit(whereis(couch_query_servers), close).
-readline(Port) ->
- readline(Port, []).
-
-readline(Port, Acc) ->
- case get(query_server_timeout) of
- undefined ->
- Timeout = list_to_integer(couch_config:get(
- "couchdb", "view_timeout", "5000")),
- put(query_server_timeout, Timeout);
- Timeout -> ok
- end,
- receive
- {Port, {data, {noeol, Data}}} ->
- readline(Port, [Data|Acc]);
- {Port, {data, {eol, Data}}} ->
- lists:reverse(Acc, Data);
- {Port, Err} ->
- catch port_close(Port),
- throw({external_process_error, Err})
- after Timeout ->
- catch port_close(Port),
- throw({external_process_error, "External process timed out"})
- end.
-
-read_json(Port) ->
- Line = readline(Port),
- case ?JSON_DECODE(Line) of
- {[{<<"log">>,Msg}]} when is_binary(Msg) ->
- % we got a message to log. Log it and continue
- ?LOG_INFO("Query Server Log Message: ~s", [Msg]),
- read_json(Port);
- Else ->
- Else
- end.
-
-% send command and get a response.
-prompt(Port, Json) ->
- Bin = iolist_to_binary([?JSON_ENCODE(Json) , "\n"]),
- true = port_command(Port, Bin),
- case read_json(Port) of
- {[{<<"error">>, Id}, {<<"reason">>, Reason}]} ->
- throw({Id,Reason});
- {[{<<"reason">>, Reason}, {<<"error">>, Id}]} ->
- throw({Id,Reason});
- Result ->
- Result
- end.
-
-
start_doc_map(Lang, Functions) ->
- Port = get_linked_port(Lang),
- % send the functions as json strings
+ Pid = get_os_process(Lang),
lists:foreach(fun(FunctionSource) ->
- true = prompt(Port, [<<"add_fun">>, FunctionSource])
- end,
- Functions),
- {ok, {Lang, Port}}.
+ true = couch_os_process:prompt(Pid, [<<"add_fun">>, FunctionSource])
+ end, Functions),
+ {ok, {Lang, Pid}}.
-map_docs({_Lang, Port}, Docs) ->
+map_docs({_Lang, Pid}, Docs) ->
% send the documents
Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
- FunsResults = prompt(Port, [<<"map_doc">>, Json]),
+ FunsResults = couch_os_process:prompt(Pid, [<<"map_doc">>, Json]),
% the results are a json array of function map yields like this:
% [FunResults1, FunResults2 ...]
% where funresults is are json arrays of key value pairs:
@@ -111,30 +60,8 @@ map_docs({_Lang, Port}, Docs) ->
stop_doc_map(nil) ->
ok;
-stop_doc_map({Lang, Port}) ->
- return_linked_port(Lang, Port).
-
-get_linked_port(Lang) ->
- case gen_server:call(couch_query_servers, {get_port, Lang}) of
- {ok, Port0} ->
- link(Port0),
- true = prompt(Port0, [<<"reset">>]),
- Port0;
- {empty, Cmd} ->
- ?LOG_INFO("Spawning new ~s instance.", [Lang]),
- open_port({spawn, Cmd}, [stream,
- {line, 1000},
- binary,
- exit_status,
- hide]);
- Error ->
- throw(Error)
- end.
-
-return_linked_port(Lang, Port) ->
- ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
- true = unlink(Port),
- ok.
+stop_doc_map({Lang, Pid}) ->
+ ok = ret_os_process(Lang, Pid).
group_reductions_results([]) ->
[];
@@ -153,29 +80,29 @@ group_reductions_results(List) ->
rereduce(_Lang, [], _ReducedValues) ->
{ok, []};
rereduce(Lang, RedSrcs, ReducedValues) ->
- Port = get_linked_port(Lang),
+ Pid = get_os_process(Lang),
Grouped = group_reductions_results(ReducedValues),
Results = lists:zipwith(
fun(FunSrc, Values) ->
[true, [Result]] =
- prompt(Port, [<<"rereduce">>, [FunSrc], Values]),
+ couch_os_process:prompt(Pid, [<<"rereduce">>, [FunSrc], Values]),
Result
end, RedSrcs, Grouped),
- return_linked_port(Lang, Port),
+ ok = ret_os_process(Lang, Pid),
{ok, Results}.
reduce(_Lang, [], _KVs) ->
{ok, []};
reduce(Lang, RedSrcs, KVs) ->
- Port = get_linked_port(Lang),
- [true, Results] = prompt(Port,
+ Pid = get_os_process(Lang),
+ [true, Results] = couch_os_process:prompt(Pid,
[<<"reduce">>, RedSrcs, KVs]),
- return_linked_port(Lang, Port),
+ ok = ret_os_process(Lang, Pid),
{ok, Results}.
validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
- Port = get_linked_port(Lang),
+ Pid = get_os_process(Lang),
JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
JsonDiskDoc =
if DiskDoc == nil ->
@@ -183,7 +110,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
true ->
couch_doc:to_json_obj(DiskDoc, [revs])
end,
- try prompt(Port,
+ try couch_os_process:prompt(Pid,
[<<"validate">>, FunSrc, JsonEditDoc, JsonDiskDoc, Ctx]) of
1 ->
ok;
@@ -192,7 +119,7 @@ validate_doc_update(Lang, FunSrc, EditDoc, DiskDoc, Ctx) ->
{[{<<"unauthorized">>, Message}]} ->
throw({unauthorized, Message})
after
- return_linked_port(Lang, Port)
+ ok = ret_os_process(Lang, Pid)
end.
init([]) ->
@@ -206,72 +133,101 @@ init([]) ->
fun("query_servers" ++ _, _) ->
?MODULE:stop()
end),
-
- QueryServers = couch_config:get("query_servers"),
- QueryServers2 =
- [{list_to_binary(Lang), Path} || {Lang, Path} <- QueryServers],
-
- {ok, {QueryServers2, []}}.
+
+ Langs = ets:new(couch_query_server_langs, [set, private]),
+ PidLangs = ets:new(couch_query_server_pid_langs, [set, private]),
+ Pids = ets:new(couch_query_server_procs, [set, private]),
+ InUse = ets:new(couch_query_server_used, [set, private]),
+ lists:foreach(fun({Lang, Command}) ->
+ true = ets:insert(Langs, {?l2b(Lang), Command})
+ end, couch_config:get("query_servers")),
+ {ok, {Langs, PidLangs, Pids, InUse}}.
terminate(_Reason, _Server) ->
ok.
-handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
- case proplists:get_value(Lang, LangPorts) of
- undefined ->
- case proplists:get_value(Lang, QueryServerList) of
- undefined -> % not a supported language
- {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}};
- ServerCmd ->
- {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}}
- end;
- Port ->
- Result =
- case catch port_connect(Port, FromPid) of
- true ->
- true = unlink(Port),
- {ok, Port};
- Error ->
- catch port_close(Port),
- Error
- end,
- {reply, Result, {QueryServerList, LangPorts -- [{Lang,Port}]}}
- end;
-handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
- case catch port_connect(Port, self()) of
- true ->
- {reply, ok, {QueryServerList, [{Lang, Port} | LangPorts]}};
+handle_call({get_proc, Lang}, _From, {Langs, PidLangs, Pids, InUse}=Server) ->
+ % Note to future self. Add max process limit.
+ case ets:lookup(Pids, Lang) of
+ [{Lang, [Pid|_]}] ->
+ add_value(PidLangs, Pid, Lang),
+ rem_from_list(Pids, Lang, Pid),
+ add_to_list(InUse, Lang, Pid),
+ true = couch_os_process:prompt(Pid, [<<"reset">>]),
+ {reply, Pid, Server};
_ ->
- catch port_close(Port),
- {reply, ok, {QueryServerList, LangPorts}}
- end.
-
-handle_cast(_Whatever, {Cmd, Ports}) ->
- {noreply, {Cmd, Ports}}.
-
-handle_info({Port, {exit_status, Status}}, {QueryServerList, LangPorts}) ->
- case lists:keysearch(Port, 2, LangPorts) of
- {value, {Lang, _}} ->
+ {ok, Pid} = new_process(Langs, Lang),
+ add_to_list(InUse, Lang, Pid),
+ {reply, Pid, Server}
+ end;
+handle_call({ret_proc, Lang, Pid}, _From, {_, _, Pids, InUse}=Server) ->
+ % Along with max process limit, here we should check
+ % if we're over the limit and discard when we are.
+ add_to_list(Pids, Lang, Pid),
+ rem_from_list(InUse, Lang, Pid),
+ {reply, true, Server}.
+
+handle_cast(_Whatever, Server) ->
+ {noreply, Server}.
+
+handle_info({'EXIT', Pid, Status}, {Langs, PidLangs, Pids, InUse}) ->
+ case ets:lookup(PidLangs, Pid) of
+ [{Pid, Lang}] ->
case Status of
- 0 -> ok;
- _ -> ?LOG_ERROR("Abnormal shutdown of ~s query server process (exit_status: ~w).", [Lang, Status])
+ normal -> ok;
+ _ -> ?LOG_DEBUG("Linked process died abnromally: ~p (reason: ~p)", [Pid, Status])
end,
- {noreply, {QueryServerList, lists:keydelete(Port, 2, LangPorts)}};
- _ ->
- ?LOG_ERROR("Unknown linked port/process crash: ~p", [Port])
+ {ok, {
+ Langs,
+ rem_value(PidLangs, Pid),
+ rem_from_list(Pids, Lang, Pid),
+ rem_from_list(InUse, Lang, Pid)
+ }};
+ [] ->
+ ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]),
+ {ok, {Langs, PidLangs, Pids, InUse}}
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-% test() ->
-% test("../js/js -f main.js").
-%
-% test(Cmd) ->
-% start_link(Cmd),
-% {ok, DocMap} = start_doc_map(<<"javascript">>, [<<"function(doc) {if (doc[0] == 'a') return doc[1];}">>]),
-% {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
-% io:format("Results: ~w~n", [Results]),
-% stop_doc_map(DocMap),
-% ok.
+% Private API
+
+new_process(Langs, Lang) ->
+ Proc =
+ case ets:lookup(Langs, Lang) of
+ [{Lang, Command}] ->
+ couch_os_process:start_link(Command);
+ _ ->
+ throw({unknown_query_language, Lang})
+ end,
+ Proc.
+
+get_os_process(Lang) ->
+ gen_server:call(couch_query_servers, {get_proc, Lang}).
+
+ret_os_process(Lang, Pid) ->
+ true = gen_server:call(couch_query_servers, {ret_proc, Lang, Pid}),
+ ok.
+
+add_value(Tid, Key, Value) ->
+ true = ets:insert(Tid, {Key, Value}).
+
+rem_value(Tid, Key) ->
+ true = ets:insert(Tid, Key).
+
+add_to_list(Tid, Key, Value) ->
+ case ets:lookup(Tid, Key) of
+ [{Key, Vals}] ->
+ true = ets:insert(Tid, {Key, [Value|Vals]});
+ [] ->
+ true = ets:insert(Tid, {Key, [Value]})
+ end.
+
+rem_from_list(Tid, Key, Value) ->
+ case ets:lookup(Tid, Key) of
+ [{Key, Vals}] ->
+ ets:insert(Tid, {Key, [Val || Val <- Vals, Val /= Value]});
+ [] -> ok
+ end.