summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_query_servers.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_query_servers.erl')
-rw-r--r--src/couchdb/couch_query_servers.erl111
1 files changed, 76 insertions, 35 deletions
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index a6e1750a..1fb2462c 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -17,7 +17,7 @@
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
-
+-export([reduce/3, combine/3]).
-export([test/0, test/1]).
-include("couch_db.hrl").
@@ -65,32 +65,21 @@ writeline(Port, String) ->
% send command and get a response.
prompt(Port, Json) ->
writeline(Port, cjson:encode(Json)),
- read_json(Port).
+ case read_json(Port) of
+ {obj, [{"error", Id}, {"reason", Reason}]} ->
+ throw({list_to_atom(Id),Reason});
+ {obj, [{"reason", Reason}, {"error", Id}]} ->
+ throw({list_to_atom(Id),Reason});
+ Result ->
+ Result
+ end.
start_doc_map(Lang, Functions) ->
- Port =
- case gen_server:call(couch_query_servers, {get_port, Lang}) of
- {ok, Port0} ->
- link(Port0),
- Port0;
- {empty, Cmd} ->
- ?LOG_INFO("Spawning new ~s instance.", [Lang]),
- open_port({spawn, Cmd}, [stream,
- {line, 1000},
- exit_status,
- hide]);
- Error ->
- throw(Error)
- end,
- true = prompt(Port, {"reset"}),
+ Port = get_linked_port(Lang),
% send the functions as json strings
lists:foreach(fun(FunctionSource) ->
- case prompt(Port, {"add_fun", FunctionSource}) of
- true -> ok;
- {obj, [{"error", Id}, {"reason", Reason}]} ->
- throw({Id, Reason})
- end
+ true = prompt(Port, {"add_fun", FunctionSource})
end,
Functions),
{ok, {Lang, Port}}.
@@ -100,19 +89,13 @@ map_docs({_Lang, Port}, Docs) ->
Results = lists:map(
fun(Doc) ->
Json = couch_doc:to_json_obj(Doc, []),
- case prompt(Port, {"map_doc", Json}) of
- {obj, [{"error", Id}, {"reason", Reason}]} ->
- throw({list_to_atom(Id),Reason});
- {obj, [{"reason", Reason}, {"error", Id}]} ->
- throw({list_to_atom(Id),Reason});
- Results when is_tuple(Results) ->
- % the results are a json array of function map yields like this:
- % {FunResults1, FunResults2 ...}
- % where funresults is are json arrays of key value pairs:
- % {{Key1, Value1}, {Key2, Value2}}
- % Convert to real lists, execept the key, value pairs
- [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
- end
+ Results = prompt(Port, {"map_doc", Json}),
+ % the results are a json array of function map yields like this:
+ % {FunResults1, FunResults2 ...}
+ % where funresults is are json arrays of key value pairs:
+ % {{Key1, Value1}, {Key2, Value2}}
+ % Convert to real lists, execept the key, value pairs
+ [tuple_to_list(FunResult) || FunResult <- tuple_to_list(Results)]
end,
Docs),
{ok, Results}.
@@ -121,10 +104,68 @@ map_docs({_Lang, Port}, Docs) ->
stop_doc_map(nil) ->
ok;
stop_doc_map({Lang, Port}) ->
+ return_linked_port(Lang, Port).
+
+get_linked_port(Lang) ->
+ case gen_server:call(couch_query_servers, {get_port, Lang}) of
+ {ok, Port0} ->
+ link(Port0),
+ true = prompt(Port0, {"reset"}),
+ Port0;
+ {empty, Cmd} ->
+ ?LOG_INFO("Spawning new ~s instance.", [Lang]),
+ open_port({spawn, Cmd}, [stream,
+ {line, 1000},
+ exit_status,
+ hide]);
+ Error ->
+ throw(Error)
+ end.
+
+return_linked_port(Lang, Port) ->
ok = gen_server:call(couch_query_servers, {return_port, {Lang, Port}}),
true = unlink(Port),
ok.
+group_reductions_results([]) ->
+ [];
+group_reductions_results(List) ->
+ {Heads, Tails} = lists:foldl(
+ fun([H|T], {HAcc,TAcc}) ->
+ {[H|HAcc], [T|TAcc]}
+ end, {[], []}, List),
+ case Tails of
+ [[]|_] -> % no tails left
+ [Heads];
+ _ ->
+ [Heads | group_reductions_results(Tails)]
+ end.
+
+combine(_Lang, [], _ReducedValues) ->
+ {ok, []};
+combine(Lang, RedSrcs, ReducedValues) ->
+ Port = get_linked_port(Lang),
+ Grouped = group_reductions_results(ReducedValues),
+ Results = lists:zipwith(
+ fun(FunSrc, Values) ->
+ {true, {Result}} =
+ prompt(Port, {"combine", {FunSrc}, list_to_tuple(Values)}),
+ Result
+ end, RedSrcs, Grouped),
+
+ return_linked_port(Lang, Port),
+ {ok, Results}.
+
+reduce(_Lang, [], _KVs) ->
+ {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+ Port = get_linked_port(Lang),
+ {true, Results} = prompt(Port,
+ {"reduce", list_to_tuple(RedSrcs), list_to_tuple(KVs)}),
+ return_linked_port(Lang, Port),
+ {ok, tuple_to_list(Results)}.
+
+
init(QueryServerList) ->
{ok, {QueryServerList, []}}.