summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
authorChristopher Lenz <cmlenz@apache.org>2008-08-31 09:43:41 +0000
committerChristopher Lenz <cmlenz@apache.org>2008-08-31 09:43:41 +0000
commit15a175144d83d6177e9bbb923a7f7157e5ea8917 (patch)
tree92b7becc9610c46f87ddf7ab4c313642b007c4aa /src/couchdb/couch_query_servers.erl
parentac4075a7987dc43aadeb18a94e07f090d1b77546 (diff)
Merged json_term_changes branch back into trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@690668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl97
1 files changed, 52 insertions, 45 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index ad207a63..9b1324eb 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -18,7 +18,7 @@
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-export([reduce/3, rereduce/3]).
--export([test/0]).
+% -export([test/0]).
-include("couch_db.hrl").
@@ -32,7 +32,6 @@ readline(Port) ->
readline(Port, []).
readline(Port, Acc) ->
-
case get(query_server_timeout) of
undefined ->
Timeout = list_to_integer(couch_config:get(
@@ -44,7 +43,7 @@ readline(Port, Acc) ->
{Port, {data, {noeol, Data}}} ->
readline(Port, [Data|Acc]);
{Port, {data, {eol, Data}}} ->
- lists:flatten(lists:reverse(Acc, Data));
+ lists:reverse(Acc, Data);
{Port, Err} ->
catch port_close(Port),
throw({map_process_error, Err})
@@ -54,8 +53,9 @@ readline(Port, Acc) ->
end.
read_json(Port) ->
- case cjson:decode(readline(Port)) of
- {obj, [{"log", Msg}]} when is_list(Msg) ->
+ Line = readline(Port),
+ case ?JSON_DECODE(Line) of
+ {[{<<"log">>,Msg}]} when is_binary(Msg) ->
% we got a message to log. Log it and continue
?LOG_INFO("Query Server Log Message: ~s", [Msg]),
read_json(Port);
@@ -63,17 +63,15 @@ read_json(Port) ->
Else
end.
-writeline(Port, String) ->
- true = port_command(Port, String ++ "\n").
-
% send command and get a response.
prompt(Port, Json) ->
- writeline(Port, cjson:encode(Json)),
+ Bin = iolist_to_binary([?JSON_ENCODE(Json) , "\n"]),
+ true = port_command(Port, Bin),
case read_json(Port) of
- {obj, [{"error", Id}, {"reason", Reason}]} ->
- throw({list_to_atom(Id),Reason});
- {obj, [{"reason", Reason}, {"error", Id}]} ->
- throw({list_to_atom(Id),Reason});
+ {[{<<"error">>, Id}, {<<"reason">>, Reason}]} ->
+ throw({list_to_atom(binary_to_list(Id)),Reason});
+ {[{<<"reason">>, Reason}, {<<"error">>, Id}]} ->
+ throw({list_to_atom(binary_to_list(Id)),Reason});
Result ->
Result
end.
@@ -83,7 +81,7 @@ start_doc_map(Lang, Functions) ->
Port = get_linked_port(Lang),
% send the functions as json strings
lists:foreach(fun(FunctionSource) ->
- true = prompt(Port, {"add_fun", FunctionSource})
+ true = prompt(Port, [<<"add_fun">>, FunctionSource])
end,
Functions),
{ok, {Lang, Port}}.
@@ -93,13 +91,19 @@ map_docs({_Lang, Port}, Docs) ->
Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
- Results = prompt(Port, {"map_doc", Json}),
+
+ FunsResults = prompt(Port, [<<"map_doc">>, Json]),
% the results are a json array of function map yields like this:
- % {FunResults1, FunResults2 ...}
+ % [FunResults1, FunResults2 ...]
% where funresults is are json arrays of key value pairs:
- % {{Key1, Value1}, {Key2, Value2}}
- % Convert to real lists, execept the key, value pairs
- [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
+ % [[Key1, Value1], [Key2, Value2]]
+ % Convert the key, value pairs to tuples like
+ % [{Key1, Value1}, {Key2, Value2}]
+ lists:map(
+ fun(FunRs) ->
+ [list_to_tuple(FunResult) || FunResult <- FunRs]
+ end,
+ FunsResults)
end,
Docs),
{ok, Results}.
@@ -114,12 +118,13 @@ get_linked_port(Lang) ->
case gen_server:call(couch_query_servers, {get_port, Lang}) of
{ok, Port0} ->
link(Port0),
- true = prompt(Port0, {"reset"}),
+ true = prompt(Port0, [<<"reset">>]),
Port0;
{empty, Cmd} ->
?LOG_INFO("Spawning new ~s instance.", [Lang]),
open_port({spawn, Cmd}, [stream,
{line, 1000},
+ binary,
exit_status,
hide]);
Error ->
@@ -152,8 +157,8 @@ rereduce(Lang, RedSrcs, ReducedValues) ->
Grouped = group_reductions_results(ReducedValues),
Results = lists:zipwith(
fun(FunSrc, Values) ->
- {true, {Result}} =
- prompt(Port, {"rereduce", {FunSrc}, list_to_tuple(Values)}),
+ [true, [Result]] =
+ prompt(Port, [<<"rereduce">>, [FunSrc], Values]),
Result
end, RedSrcs, Grouped),
@@ -164,10 +169,10 @@ reduce(_Lang, [], _KVs) ->
{ok, []};
reduce(Lang, RedSrcs, KVs) ->
Port = get_linked_port(Lang),
- {true, Results} = prompt(Port,
- {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}),
+ [true, Results] = prompt(Port,
+ [<<"reduce">>, RedSrcs, KVs]),
return_linked_port(Lang, Port),
- {ok, tuple_to_list(Results)}.
+ {ok, Results}.
init([]) ->
@@ -182,18 +187,27 @@ init([]) ->
?MODULE:stop()
end),
- QueryServerList = couch_config:lookup_match(
+ QueryServers = couch_config:lookup_match(
{{"query_servers", '$1'}, '$2'}, []),
+ QueryServers2 =
+ [{list_to_binary(Lang), Path} || {Lang, Path} <- QueryServers],
- {ok, {QueryServerList, []}}.
+ {ok, {QueryServers2, []}}.
terminate(_Reason, _Server) ->
ok.
handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
- case lists:keysearch(Lang, 1, LangPorts) of
- {value, {_, Port}=LangPort} ->
+ case proplists:get_value(Lang, LangPorts) of
+ undefined ->
+ case proplists:get_value(Lang, QueryServerList) of
+ undefined -> % not a supported language
+ {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}};
+ ServerCmd ->
+ {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}}
+ end;
+ Port ->
Result =
case catch port_connect(Port, FromPid) of
true ->
@@ -203,14 +217,7 @@ handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) ->
catch port_close(Port),
Error
end,
- {reply, Result, {QueryServerList, LangPorts -- [LangPort]}};
- false ->
- case lists:keysearch(Lang, 1, QueryServerList) of
- {value, {_, ServerCmd}} ->
- {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}};
- false -> % not a supported language
- {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}}
- end
+ {reply, Result, {QueryServerList, LangPorts -- [{Lang,Port}]}}
end;
handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) ->
case catch port_connect(Port, self()) of
@@ -241,11 +248,11 @@ code_change(_OldVsn, State, _Extra) ->
% test() ->
% test("../js/js -f main.js").
-
-test() ->
- start_link(),
- {ok, DocMap} = start_doc_map("javascript", ["function(doc) {if (doc[0] == 'a') return doc[1];}"]),
- {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
- io:format("Results: ~w~n", [Results]),
- stop_doc_map(DocMap),
- ok.
+%
+% test(Cmd) ->
+% start_link(Cmd),
+% {ok, DocMap} = start_doc_map(<<"javascript">>, [<<"function(doc) {if (doc[0] == 'a') return doc[1];}">>]),
+% {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]),
+% io:format("Results: ~w~n", [Results]),
+% stop_doc_map(DocMap),
+% ok.