author     Adam Kocoloski <adam@cloudant.com>        2010-06-02 12:07:30 -0400
committer  Adam Kocoloski <adam@cloudant.com>        2010-06-02 12:07:30 -0400
commit     0340953f4ff3cc6ee09b786031442f97f79090c3
tree       e6b87a407b634f6619bf640e5515cc0e744330a4 /src
parent     b12bcd244bdf743de8c91808cf03417b1ea9dde2
_all_docs with keys, closes BugzID 10218
Diffstat (limited to 'src')
-rw-r--r--  src/fabric.erl          | 18
-rw-r--r--  src/fabric_all_docs.erl | 66
-rw-r--r--  src/fabric_rpc.erl      | 22
-rw-r--r--  src/fabric_util.erl     |  2
4 files changed, 91 insertions, 17 deletions
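Before the patch itself, a minimal sketch of how a caller might exercise the new keys path. The callback clauses mirror the messages emitted by fabric_all_docs:go/4 in this patch ({total_and_offset, ...}, {row, ...}, complete, timeout). The wrapper module name, the assumption that fabric:all_docs/4 simply delegates to fabric_all_docs:go/4, and the #view_query_args defaults taken from fabric.hrl are illustrative assumptions, not part of the commit.

```erlang
% Hypothetical caller; not part of this patch.
-module(all_docs_keys_example).
-export([fetch/2]).

-include("fabric.hrl").

fetch(DbName, Keys) when is_list(Keys) ->
    % keys =/= nil selects the new per-key clause of fabric_all_docs:go/4
    QueryArgs = #view_query_args{keys = Keys, include_docs = true},
    Callback = fun
        ({total_and_offset, Total, Offset}, Acc) ->
            {ok, [{meta, Total, Offset} | Acc]};
        ({row, Props}, Acc) ->
            % one row per requested key, in request order; a missing id
            % comes back as {key, Id} plus {error, not_found}
            {ok, [Props | Acc]};
        (complete, Acc) ->
            {ok, lists:reverse(Acc)};
        (timeout, Acc) ->
            {timeout, Acc}
    end,
    fabric:all_docs(DbName, QueryArgs, Callback, []).
```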
diff --git a/src/fabric.erl b/src/fabric.erl
index ed75a5ab..fd26abc3 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -1,10 +1,19 @@
 -module(fabric).
 
--export([all_databases/1, create_db/2, delete_db/2, get_db_info/2, db_path/2]).
--export([open_doc/3, open_revs/4, get_missing_revs/2]).
--export([update_doc/3, update_docs/3]).
+% DBs
+-export([all_databases/1, create_db/2, delete_db/2, get_db_info/2,
+    get_doc_count/1]).
+
+% Documents
+-export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
+    update_docs/3]).
+
+% Views
 -export([all_docs/4]).
 
+% miscellany
+-export([db_path/2]).
+
 -include("fabric.hrl").
 
 % db operations
@@ -20,6 +29,9 @@ all_databases(Customer) ->
 get_db_info(DbName, Customer) ->
     fabric_get_db_info:get_db_info(dbname(DbName), Customer).
 
+get_doc_count(DbName) ->
+    fabric_doc_count:go(dbname(DbName)).
+
 create_db(DbName, Options) ->
     fabric_create_db:create_db(dbname(DbName), Options).
 
diff --git a/src/fabric_all_docs.erl b/src/fabric_all_docs.erl
index 9304570a..77d21bcd 100644
--- a/src/fabric_all_docs.erl
+++ b/src/fabric_all_docs.erl
@@ -1,10 +1,11 @@
 -module(fabric_all_docs).
 
 -export([go/4]).
+-export([open_doc/3]). % exported for spawn
 
 -include("fabric.hrl").
 
-go(DbName, QueryArgs, Callback, Acc0) ->
+go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
     Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
         Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
         Shard#shard{ref = Ref}
@@ -26,6 +27,26 @@ go(DbName, QueryArgs, Callback, Acc0) ->
         {ok, NewState#collector.user_acc};
     Error ->
         Error
+    end;
+
+go(DbName, QueryArgs, Callback, Acc0) ->
+    #view_query_args{
+        direction = Dir,
+        include_docs = IncludeDocs,
+        limit = Limit0,
+        skip = Skip0,
+        keys = Keys
+    } = QueryArgs,
+    {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
+    Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
+        Id <- Keys],
+    Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+    receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
+        {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
+        {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
+        Callback(complete, Acc2)
+    after 10000 ->
+        Callback(timeout, Acc0)
     end.
 
 handle_message({rexi_DOWN, _, _, _}, nil, State) ->
@@ -172,6 +193,8 @@ stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) ->
 stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) ->
     couch_db_updater:less_docid(Id, EndKey).
 
+transform_row(#view_row{key=Key, id=undefined}) ->
+    {row, {[{key,Key}, {error,not_found}]}};
 transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
     {row, {[{key,Key}, {id,Id}, {value,Value}]}};
 transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
@@ -236,6 +259,47 @@ is_progress_possible(Counters) ->
     end, First, Rest),
     Head =:= Tail.
 
+doc_receive_loop([], _, _, _, Acc) ->
+    {ok, Acc};
+doc_receive_loop(_, _, 0, _, Acc) ->
+    {ok, Acc};
+doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
+    receive {'DOWN', Ref, process, Pid, #view_row{}} ->
+        doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
+    after 10000 ->
+        timeout
+    end;
+doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
+    receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+        case Callback(transform_row(Row), AccIn) of
+        {ok, Acc} ->
+            doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
+        {stop, Acc} ->
+            {ok, Acc}
+        end
+    after 10000 ->
+        timeout
+    end.
+
+open_doc(DbName, Id, IncludeDocs) ->
+    Row = case fabric:open_doc(DbName, Id, [deleted]) of
+    {not_found, missing} ->
+        Doc = undefined,
+        #view_row{key=Id};
+    {ok, #doc{deleted=true, revs=Revs}} ->
+        Doc = null,
+        {RevPos, [RevId|_]} = Revs,
+        Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]},
+        #view_row{key=Id, id=Id, value=Value};
+    {ok, #doc{revs=Revs} = Doc0} ->
+        Doc = couch_doc:to_json_obj(Doc0, []),
+        {RevPos, [RevId|_]} = Revs,
+        Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
+        #view_row{key=Id, id=Id, value=Value}
+    end,
+    exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
+
+
 % Instead of ets, let's use an ordered keylist. We'll need to revisit if we
 % have >> 100 shards, so a private interface is a good idea. - APK June 2010
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 1cae5ac5..c6aef20b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -127,21 +127,17 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) ->
     true ->
         {stop, Acc};
     false ->
-        Row = case IncludeDocs of
-        true ->
+        Doc = if not IncludeDocs -> undefined; true ->
             case couch_db:open_doc(Db, Id, []) of
-            {not_found, missing} ->
-                #view_row{key=Key, id=Id, value=Value, doc={error,missing}};
-            {not_found, deleted} ->
-                #view_row{key=Key, id=Id, value=Value};
-            {ok, Doc} ->
-                JsonDoc = couch_doc:to_json_obj(Doc, []),
-                #view_row{key=Key, id=Id, value=Value, doc=JsonDoc}
-            end;
-        false ->
-            #view_row{key=Key, id=Id, value=Value}
+            {not_found, deleted} ->
+                null;
+            {not_found, missing} ->
+                undefined;
+            {ok, Doc0} ->
+                couch_doc:to_json_obj(Doc0, [])
+            end
         end,
-        rexi:sync_reply(Row),
+        rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}),
         {ok, Acc#view_acc{limit=Limit-1}}
     end.
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index e904b456..760e511c 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -26,6 +26,8 @@ receive_loop(Workers, Keypos, Fun, Acc0) ->
 %% @doc set up the receive loop with an overall timeout
 -spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) ->
     {ok, any()}.
+receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
+    process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO);
+receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
 receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
     TimeoutRef = erlang:make_ref(),
     {ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),
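The concurrency pattern the per-key clause relies on is worth spelling out: each key gets its own process via spawn_monitor, the process returns its #view_row{} as its exit reason, and the parent reads results from the resulting 'DOWN' messages in monitor order, which preserves the request order of the keys. The stripped-down sketch below (a hypothetical module, not from the patch) shows the same spawn_monitor/exit/'DOWN' round trip in isolation.

```erlang
% Hypothetical illustration of the spawn_monitor + exit-as-return pattern
% used by fabric_all_docs:go/4 and open_doc/3 above; not part of the patch.
-module(down_collect_example).
-export([map_ordered/2]).

map_ordered(Fun, Items) ->
    % one worker per item; the worker's result becomes its exit reason
    Monitors = [spawn_monitor(fun() -> exit(Fun(Item)) end) || Item <- Items],
    % selective receive on each {Pid, Ref} keeps results in request order
    [receive
         {'DOWN', Ref, process, Pid, Result} -> Result
     after 10000 ->
         timeout
     end || {Pid, Ref} <- Monitors].
```

For example, map_ordered(fun(X) -> X * X end, [1, 2, 3]) yields [1, 4, 9]. Using the exit reason as the return channel avoids a separate reply protocol, and the monitor doubles as failure detection: a worker that crashes surfaces its crash reason in the same 'DOWN' message instead of hanging the collector.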