diff options
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 111 |
1 files changed, 76 insertions, 35 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index a6e1750a..1fb2462c 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -17,7 +17,7 @@ -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). - +-export([reduce/3, combine/3]). -export([test/0, test/1]). -include("couch_db.hrl"). @@ -65,32 +65,21 @@ writeline(Port, String) -> % send command and get a response. prompt(Port, Json) -> writeline(Port, cjson:encode(Json)), - read_json(Port). + case read_json(Port) of + {obj, [{"error", Id}, {"reason", Reason}]} -> + throw({list_to_atom(Id),Reason}); + {obj, [{"reason", Reason}, {"error", Id}]} -> + throw({list_to_atom(Id),Reason}); + Result -> + Result + end. start_doc_map(Lang, Functions) -> - Port = - case gen_server:call(couch_query_servers, {get_port, Lang}) of - {ok, Port0} -> - link(Port0), - Port0; - {empty, Cmd} -> - ?LOG_INFO("Spawning new ~s instance.", [Lang]), - open_port({spawn, Cmd}, [stream, - {line, 1000}, - exit_status, - hide]); - Error -> - throw(Error) - end, - true = prompt(Port, {"reset"}), + Port = get_linked_port(Lang), % send the functions as json strings lists:foreach(fun(FunctionSource) -> - case prompt(Port, {"add_fun", FunctionSource}) of - true -> ok; - {obj, [{"error", Id}, {"reason", Reason}]} -> - throw({Id, Reason}) - end + true = prompt(Port, {"add_fun", FunctionSource}) end, Functions), {ok, {Lang, Port}}. @@ -100,19 +89,13 @@ map_docs({_Lang, Port}, Docs) -> Results = lists:map( fun(Doc) -> Json = couch_doc:to_json_obj(Doc, []), - case prompt(Port, {"map_doc", Json}) of - {obj, [{"error", Id}, {"reason", Reason}]} -> - throw({list_to_atom(Id),Reason}); - {obj, [{"reason", Reason}, {"error", Id}]} -> - throw({list_to_atom(Id),Reason}); - Results when is_tuple(Results) -> - % the results are a json array of function map yields like this: - % {FunResults1, FunResults2 ...} - % where funresults is are json arrays of key value pairs: - % {{Key1, Value1}, {Key2, Value2}} - % Convert to real lists, execept the key, value pairs - [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)] - end + Results = prompt(Port, {"map_doc", Json}), + % the results are a json array of function map yields like this: + % {FunResults1, FunResults2 ...} + % where funresults is are json arrays of key value pairs: + % {{Key1, Value1}, {Key2, Value2}} + % Convert to real lists, execept the key, value pairs + [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)] end, Docs), {ok, Results}. @@ -121,10 +104,68 @@ map_docs({_Lang, Port}, Docs) -> stop_doc_map(nil) -> ok; stop_doc_map({Lang, Port}) -> + return_linked_port(Lang, Port). + +get_linked_port(Lang) -> + case gen_server:call(couch_query_servers, {get_port, Lang}) of + {ok, Port0} -> + link(Port0), + true = prompt(Port0, {"reset"}), + Port0; + {empty, Cmd} -> + ?LOG_INFO("Spawning new ~s instance.", [Lang]), + open_port({spawn, Cmd}, [stream, + {line, 1000}, + exit_status, + hide]); + Error -> + throw(Error) + end. + +return_linked_port(Lang, Port) -> ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}), true = unlink(Port), ok. +group_reductions_results([]) -> + []; +group_reductions_results(List) -> + {Heads, Tails} = lists:foldl( + fun([H|T], {HAcc,TAcc}) -> + {[H|HAcc], [T|TAcc]} + end, {[], []}, List), + case Tails of + [[]|_] -> % no tails left + [Heads]; + _ -> + [Heads | group_reductions_results(Tails)] + end. + +combine(_Lang, [], _ReducedValues) -> + {ok, []}; +combine(Lang, RedSrcs, ReducedValues) -> + Port = get_linked_port(Lang), + Grouped = group_reductions_results(ReducedValues), + Results = lists:zipwith( + fun(FunSrc, Values) -> + {true, {Result}} = + prompt(Port, {"combine", {FunSrc}, list_to_tuple(Values)}), + Result + end, RedSrcs, Grouped), + + return_linked_port(Lang, Port), + {ok, Results}. + +reduce(_Lang, [], _KVs) -> + {ok, []}; +reduce(Lang, RedSrcs, KVs) -> + Port = get_linked_port(Lang), + {true, Results} = prompt(Port, + {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}), + return_linked_port(Lang, Port), + {ok, tuple_to_list(Results)}. + + init(QueryServerList) -> {ok, {QueryServerList, []}}. |