From 15a175144d83d6177e9bbb923a7f7157e5ea8917 Mon Sep 17 00:00:00 2001 From: Christopher Lenz Date: Sun, 31 Aug 2008 09:43:41 +0000 Subject: Merged json_term_changes branch back into trunk. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@690668 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_query_servers.erl | 97 ++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 45 deletions(-) (limited to 'src/couchdb/couch_query_servers.erl') diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index ad207a63..9b1324eb 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -18,7 +18,7 @@ -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). -export([reduce/3, rereduce/3]). --export([test/0]). +% -export([test/0]). -include("couch_db.hrl"). @@ -32,7 +32,6 @@ readline(Port) -> readline(Port, []). readline(Port, Acc) -> - case get(query_server_timeout) of undefined -> Timeout = list_to_integer(couch_config:get( @@ -44,7 +43,7 @@ readline(Port, Acc) -> {Port, {data, {noeol, Data}}} -> readline(Port, [Data|Acc]); {Port, {data, {eol, Data}}} -> - lists:flatten(lists:reverse(Acc, Data)); + lists:reverse(Acc, Data); {Port, Err} -> catch port_close(Port), throw({map_process_error, Err}) @@ -54,8 +53,9 @@ readline(Port, Acc) -> end. read_json(Port) -> - case cjson:decode(readline(Port)) of - {obj, [{"log", Msg}]} when is_list(Msg) -> + Line = readline(Port), + case ?JSON_DECODE(Line) of + {[{<<"log">>,Msg}]} when is_binary(Msg) -> % we got a message to log. Log it and continue ?LOG_INFO("Query Server Log Message: ~s", [Msg]), read_json(Port); @@ -63,17 +63,15 @@ read_json(Port) -> Else end. -writeline(Port, String) -> - true = port_command(Port, String ++ "\n"). - % send command and get a response. prompt(Port, Json) -> - writeline(Port, cjson:encode(Json)), + Bin = iolist_to_binary([?JSON_ENCODE(Json) , "\n"]), + true = port_command(Port, Bin), case read_json(Port) of - {obj, [{"error", Id}, {"reason", Reason}]} -> - throw({list_to_atom(Id),Reason}); - {obj, [{"reason", Reason}, {"error", Id}]} -> - throw({list_to_atom(Id),Reason}); + {[{<<"error">>, Id}, {<<"reason">>, Reason}]} -> + throw({list_to_atom(binary_to_list(Id)),Reason}); + {[{<<"reason">>, Reason}, {<<"error">>, Id}]} -> + throw({list_to_atom(binary_to_list(Id)),Reason}); Result -> Result end. @@ -83,7 +81,7 @@ start_doc_map(Lang, Functions) -> Port = get_linked_port(Lang), % send the functions as json strings lists:foreach(fun(FunctionSource) -> - true = prompt(Port, {"add_fun", FunctionSource}) + true = prompt(Port, [<<"add_fun">>, FunctionSource]) end, Functions), {ok, {Lang, Port}}. @@ -93,13 +91,19 @@ map_docs({_Lang, Port}, Docs) -> Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - Results = prompt(Port, {"map_doc", Json}), + + FunsResults = prompt(Port, [<<"map_doc">>, Json]), % the results are a json array of function map yields like this: - % {FunResults1, FunResults2 ...} + % [FunResults1, FunResults2 ...] % where funresults is are json arrays of key value pairs: - % {{Key1, Value1}, {Key2, Value2}} - % Convert to real lists, execept the key, value pairs - [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)] + % [[Key1, Value1], [Key2, Value2]] + % Convert the key, value pairs to tuples like + % [{Key1, Value1}, {Key2, Value2}] + lists:map( + fun(FunRs) -> + [list_to_tuple(FunResult) || FunResult <- FunRs] + end, + FunsResults) end, Docs), {ok, Results}. @@ -114,12 +118,13 @@ get_linked_port(Lang) -> case gen_server:call(couch_query_servers, {get_port, Lang}) of {ok, Port0} -> link(Port0), - true = prompt(Port0, {"reset"}), + true = prompt(Port0, [<<"reset">>]), Port0; {empty, Cmd} -> ?LOG_INFO("Spawning new ~s instance.", [Lang]), open_port({spawn, Cmd}, [stream, {line, 1000}, + binary, exit_status, hide]); Error -> @@ -152,8 +157,8 @@ rereduce(Lang, RedSrcs, ReducedValues) -> Grouped = group_reductions_results(ReducedValues), Results = lists:zipwith( fun(FunSrc, Values) -> - {true, {Result}} = - prompt(Port, {"rereduce", {FunSrc}, list_to_tuple(Values)}), + [true, [Result]] = + prompt(Port, [<<"rereduce">>, [FunSrc], Values]), Result end, RedSrcs, Grouped), @@ -164,10 +169,10 @@ reduce(_Lang, [], _KVs) -> {ok, []}; reduce(Lang, RedSrcs, KVs) -> Port = get_linked_port(Lang), - {true, Results} = prompt(Port, - {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}), + [true, Results] = prompt(Port, + [<<"reduce">>, RedSrcs, KVs]), return_linked_port(Lang, Port), - {ok, tuple_to_list(Results)}. + {ok, Results}. init([]) -> @@ -182,18 +187,27 @@ init([]) -> ?MODULE:stop() end), - QueryServerList = couch_config:lookup_match( + QueryServers = couch_config:lookup_match( {{"query_servers", '$1'}, '$2'}, []), + QueryServers2 = + [{list_to_binary(Lang), Path} || {Lang, Path} <- QueryServers], - {ok, {QueryServerList, []}}. + {ok, {QueryServers2, []}}. terminate(_Reason, _Server) -> ok. handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) -> - case lists:keysearch(Lang, 1, LangPorts) of - {value, {_, Port}=LangPort} -> + case proplists:get_value(Lang, LangPorts) of + undefined -> + case proplists:get_value(Lang, QueryServerList) of + undefined -> % not a supported language + {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}}; + ServerCmd -> + {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}} + end; + Port -> Result = case catch port_connect(Port, FromPid) of true -> @@ -203,14 +217,7 @@ handle_call({get_port, Lang}, {FromPid, _}, {QueryServerList, LangPorts}) -> catch port_close(Port), Error end, - {reply, Result, {QueryServerList, LangPorts -- [LangPort]}}; - false -> - case lists:keysearch(Lang, 1, QueryServerList) of - {value, {_, ServerCmd}} -> - {reply, {empty, ServerCmd}, {QueryServerList, LangPorts}}; - false -> % not a supported language - {reply, {query_language_unknown, Lang}, {QueryServerList, LangPorts}} - end + {reply, Result, {QueryServerList, LangPorts -- [{Lang,Port}]}} end; handle_call({return_port, {Lang, Port}}, _From, {QueryServerList, LangPorts}) -> case catch port_connect(Port, self()) of @@ -241,11 +248,11 @@ code_change(_OldVsn, State, _Extra) -> % test() -> % test("../js/js -f main.js"). - -test() -> - start_link(), - {ok, DocMap} = start_doc_map("javascript", ["function(doc) {if (doc[0] == 'a') return doc[1];}"]), - {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]), - io:format("Results: ~w~n", [Results]), - stop_doc_map(DocMap), - ok. +% +% test(Cmd) -> +% start_link(Cmd), +% {ok, DocMap} = start_doc_map(<<"javascript">>, [<<"function(doc) {if (doc[0] == 'a') return doc[1];}">>]), +% {ok, Results} = map_docs(DocMap, [#doc{body={"a", "b"}}, #doc{body={"c", "d"}},#doc{body={"a", "c"}}]), +% io:format("Results: ~w~n", [Results]), +% stop_doc_map(DocMap), +% ok. -- cgit v1.2.3