summaryrefslogtreecommitdiff
path: root/src/fabric_all_docs.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-02 12:07:30 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-06-02 12:07:30 -0400
commit0340953f4ff3cc6ee09b786031442f97f79090c3 (patch)
treee6b87a407b634f6619bf640e5515cc0e744330a4 /src/fabric_all_docs.erl
parentb12bcd244bdf743de8c91808cf03417b1ea9dde2 (diff)
_all_docs with keys, closes BugzID 10218
Diffstat (limited to 'src/fabric_all_docs.erl')
-rw-r--r--src/fabric_all_docs.erl66
1 files changed, 65 insertions, 1 deletions
diff --git a/src/fabric_all_docs.erl b/src/fabric_all_docs.erl
index 9304570a..77d21bcd 100644
--- a/src/fabric_all_docs.erl
+++ b/src/fabric_all_docs.erl
@@ -1,10 +1,11 @@
-module(fabric_all_docs).
-export([go/4]).
+-export([open_doc/3]). % exported for spawn
-include("fabric.hrl").
-go(DbName, QueryArgs, Callback, Acc0) ->
+go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
Shard#shard{ref = Ref}
@@ -26,6 +27,26 @@ go(DbName, QueryArgs, Callback, Acc0) ->
{ok, NewState#collector.user_acc};
Error ->
Error
+ end;
+
+go(DbName, QueryArgs, Callback, Acc0) ->
+ #view_query_args{
+ direction = Dir,
+ include_docs = IncludeDocs,
+ limit = Limit0,
+ skip = Skip0,
+ keys = Keys
+ } = QueryArgs,
+ {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
+ Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
+ Id <- Keys],
+ Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+ receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
+ {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
+ {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
+ Callback(complete, Acc2)
+ after 10000 ->
+ Callback(timeout, Acc0)
end.
handle_message({rexi_DOWN, _, _, _}, nil, State) ->
@@ -172,6 +193,8 @@ stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) ->
stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) ->
couch_db_updater:less_docid(Id, EndKey).
+transform_row(#view_row{key=Key, id=undefined}) ->
+ {row, {[{key,Key}, {error,not_found}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
{row, {[{key,Key}, {id,Id}, {value,Value}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
@@ -236,6 +259,47 @@ is_progress_possible(Counters) ->
end, First, Rest),
Head =:= Tail.
+doc_receive_loop([], _, _, _, Acc) ->
+ {ok, Acc};
+doc_receive_loop(_, _, 0, _, Acc) ->
+ {ok, Acc};
+doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
+ receive {'DOWN', Ref, process, Pid, #view_row{}} ->
+ doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
+ after 10000 ->
+ timeout
+ end;
+doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
+ receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+ case Callback(transform_row(Row), AccIn) of
+ {ok, Acc} ->
+ doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
+ {stop, Acc} ->
+ {ok, Acc}
+ end
+ after 10000 ->
+ timeout
+ end.
+
+open_doc(DbName, Id, IncludeDocs) ->
+ Row = case fabric:open_doc(DbName, Id, [deleted]) of
+ {not_found, missing} ->
+ Doc = undefined,
+ #view_row{key=Id};
+ {ok, #doc{deleted=true, revs=Revs}} ->
+ Doc = null,
+ {RevPos, [RevId|_]} = Revs,
+ Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]},
+ #view_row{key=Id, id=Id, value=Value};
+ {ok, #doc{revs=Revs} = Doc0} ->
+ Doc = couch_doc:to_json_obj(Doc0, []),
+ {RevPos, [RevId|_]} = Revs,
+ Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
+ #view_row{key=Id, id=Id, value=Value}
+ end,
+ exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
+
+
% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
% have >> 100 shards, so a private interface is a good idea. - APK June 2010