diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-02 12:07:30 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-06-02 12:07:30 -0400 |
commit | 0340953f4ff3cc6ee09b786031442f97f79090c3 (patch) | |
tree | e6b87a407b634f6619bf640e5515cc0e744330a4 /src/fabric_all_docs.erl | |
parent | b12bcd244bdf743de8c91808cf03417b1ea9dde2 (diff) |
_all_docs with keys, closes BugzID 10218
Diffstat (limited to 'src/fabric_all_docs.erl')
-rw-r--r-- | src/fabric_all_docs.erl | 66 |
1 files changed, 65 insertions, 1 deletions
diff --git a/src/fabric_all_docs.erl b/src/fabric_all_docs.erl index 9304570a..77d21bcd 100644 --- a/src/fabric_all_docs.erl +++ b/src/fabric_all_docs.erl @@ -1,10 +1,11 @@ -module(fabric_all_docs). -export([go/4]). +-export([open_doc/3]). % exported for spawn -include("fabric.hrl"). -go(DbName, QueryArgs, Callback, Acc0) -> +go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) -> Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) -> Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}), Shard#shard{ref = Ref} @@ -26,6 +27,26 @@ go(DbName, QueryArgs, Callback, Acc0) -> {ok, NewState#collector.user_acc}; Error -> Error + end; + +go(DbName, QueryArgs, Callback, Acc0) -> + #view_query_args{ + direction = Dir, + include_docs = IncludeDocs, + limit = Limit0, + skip = Skip0, + keys = Keys + } = QueryArgs, + {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end), + Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) || + Id <- Keys], + Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end, + receive {'DOWN', Ref0, _, _, {ok, TotalRows}} -> + {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0), + {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1), + Callback(complete, Acc2) + after 10000 -> + Callback(timeout, Acc0) end. handle_message({rexi_DOWN, _, _, _}, nil, State) -> @@ -172,6 +193,8 @@ stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) -> stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) -> couch_db_updater:less_docid(Id, EndKey). +transform_row(#view_row{key=Key, id=undefined}) -> + {row, {[{key,Key}, {error,not_found}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> {row, {[{key,Key}, {id,Id}, {value,Value}]}}; transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> @@ -236,6 +259,47 @@ is_progress_possible(Counters) -> end, First, Rest), Head =:= Tail. +doc_receive_loop([], _, _, _, Acc) -> + {ok, Acc}; +doc_receive_loop(_, _, 0, _, Acc) -> + {ok, Acc}; +doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 -> + receive {'DOWN', Ref, process, Pid, #view_row{}} -> + doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc) + after 10000 -> + timeout + end; +doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) -> + receive {'DOWN', Ref, process, Pid, #view_row{} = Row} -> + case Callback(transform_row(Row), AccIn) of + {ok, Acc} -> + doc_receive_loop(Rest, 0, Limit-1, Callback, Acc); + {stop, Acc} -> + {ok, Acc} + end + after 10000 -> + timeout + end. + +open_doc(DbName, Id, IncludeDocs) -> + Row = case fabric:open_doc(DbName, Id, [deleted]) of + {not_found, missing} -> + Doc = undefined, + #view_row{key=Id}; + {ok, #doc{deleted=true, revs=Revs}} -> + Doc = null, + {RevPos, [RevId|_]} = Revs, + Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]}, + #view_row{key=Id, id=Id, value=Value}; + {ok, #doc{revs=Revs} = Doc0} -> + Doc = couch_doc:to_json_obj(Doc0, []), + {RevPos, [RevId|_]} = Revs, + Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]}, + #view_row{key=Id, id=Id, value=Value} + end, + exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end). + + % Instead of ets, let's use an ordered keylist. We'll need to revisit if we % have >> 100 shards, so a private interface is a good idea. - APK June 2010 |