summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-02 12:07:30 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-06-02 12:07:30 -0400
commit0340953f4ff3cc6ee09b786031442f97f79090c3 (patch)
treee6b87a407b634f6619bf640e5515cc0e744330a4 /src
parentb12bcd244bdf743de8c91808cf03417b1ea9dde2 (diff)
_all_docs with keys, closes BugzID 10218
Diffstat (limited to 'src')
-rw-r--r--src/fabric.erl18
-rw-r--r--src/fabric_all_docs.erl66
-rw-r--r--src/fabric_rpc.erl22
-rw-r--r--src/fabric_util.erl2
4 files changed, 91 insertions, 17 deletions
diff --git a/src/fabric.erl b/src/fabric.erl
index ed75a5ab..fd26abc3 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -1,10 +1,19 @@
-module(fabric).
--export([all_databases/1, create_db/2, delete_db/2, get_db_info/2, db_path/2]).
--export([open_doc/3, open_revs/4, get_missing_revs/2]).
--export([update_doc/3, update_docs/3]).
+% DBs
+-export([all_databases/1, create_db/2, delete_db/2, get_db_info/2,
+ get_doc_count/1]).
+
+% Documents
+-export([open_doc/3, open_revs/4, get_missing_revs/2, update_doc/3,
+ update_docs/3]).
+
+% Views
-export([all_docs/4]).
+% miscellany
+-export([db_path/2]).
+
-include("fabric.hrl").
% db operations
@@ -20,6 +29,9 @@ all_databases(Customer) ->
get_db_info(DbName, Customer) ->
fabric_get_db_info:get_db_info(dbname(DbName), Customer).
+get_doc_count(DbName) ->
+ fabric_doc_count:go(dbname(DbName)).
+
create_db(DbName, Options) ->
fabric_create_db:create_db(dbname(DbName), Options).
diff --git a/src/fabric_all_docs.erl b/src/fabric_all_docs.erl
index 9304570a..77d21bcd 100644
--- a/src/fabric_all_docs.erl
+++ b/src/fabric_all_docs.erl
@@ -1,10 +1,11 @@
-module(fabric_all_docs).
-export([go/4]).
+-export([open_doc/3]). % exported for spawn
-include("fabric.hrl").
-go(DbName, QueryArgs, Callback, Acc0) ->
+go(DbName, #view_query_args{keys=nil} = QueryArgs, Callback, Acc0) ->
Workers = lists:map(fun(#shard{name=Name, node=Node} = Shard) ->
Ref = rexi:cast(Node, {fabric_rpc, all_docs, [Name, QueryArgs]}),
Shard#shard{ref = Ref}
@@ -26,6 +27,26 @@ go(DbName, QueryArgs, Callback, Acc0) ->
{ok, NewState#collector.user_acc};
Error ->
Error
+ end;
+
+go(DbName, QueryArgs, Callback, Acc0) ->
+ #view_query_args{
+ direction = Dir,
+ include_docs = IncludeDocs,
+ limit = Limit0,
+ skip = Skip0,
+ keys = Keys
+ } = QueryArgs,
+ {_, Ref0} = spawn_monitor(fun() -> exit(fabric:get_doc_count(DbName)) end),
+ Monitors0 = [spawn_monitor(?MODULE, open_doc, [DbName, Id, IncludeDocs]) ||
+ Id <- Keys],
+ Monitors = if Dir=:=fwd -> Monitors0; true -> lists:reverse(Monitors0) end,
+ receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
+ {ok, Acc1} = Callback({total_and_offset, TotalRows, 0}, Acc0),
+ {ok, Acc2} = doc_receive_loop(Monitors, Skip0, Limit0, Callback, Acc1),
+ Callback(complete, Acc2)
+ after 10000 ->
+ Callback(timeout, Acc0)
end.
handle_message({rexi_DOWN, _, _, _}, nil, State) ->
@@ -172,6 +193,8 @@ stop(#view_query_args{direction=fwd, end_key=EndKey}, #view_row{id=Id}) ->
stop(#view_query_args{direction=rev, end_key=EndKey}, #view_row{id=Id}) ->
couch_db_updater:less_docid(Id, EndKey).
+transform_row(#view_row{key=Key, id=undefined}) ->
+ {row, {[{key,Key}, {error,not_found}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) ->
{row, {[{key,Key}, {id,Id}, {value,Value}]}};
transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) ->
@@ -236,6 +259,47 @@ is_progress_possible(Counters) ->
end, First, Rest),
Head =:= Tail.
+doc_receive_loop([], _, _, _, Acc) ->
+ {ok, Acc};
+doc_receive_loop(_, _, 0, _, Acc) ->
+ {ok, Acc};
+doc_receive_loop([{Pid,Ref}|Rest], Skip, Limit, Callback, Acc) when Skip > 0 ->
+ receive {'DOWN', Ref, process, Pid, #view_row{}} ->
+ doc_receive_loop(Rest, Skip-1, Limit-1, Callback, Acc)
+ after 10000 ->
+ timeout
+ end;
+doc_receive_loop([{Pid,Ref}|Rest], 0, Limit, Callback, AccIn) ->
+ receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
+ case Callback(transform_row(Row), AccIn) of
+ {ok, Acc} ->
+ doc_receive_loop(Rest, 0, Limit-1, Callback, Acc);
+ {stop, Acc} ->
+ {ok, Acc}
+ end
+ after 10000 ->
+ timeout
+ end.
+
+open_doc(DbName, Id, IncludeDocs) ->
+ Row = case fabric:open_doc(DbName, Id, [deleted]) of
+ {not_found, missing} ->
+ Doc = undefined,
+ #view_row{key=Id};
+ {ok, #doc{deleted=true, revs=Revs}} ->
+ Doc = null,
+ {RevPos, [RevId|_]} = Revs,
+ Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}, {deleted,true}]},
+ #view_row{key=Id, id=Id, value=Value};
+ {ok, #doc{revs=Revs} = Doc0} ->
+ Doc = couch_doc:to_json_obj(Doc0, []),
+ {RevPos, [RevId|_]} = Revs,
+ Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
+ #view_row{key=Id, id=Id, value=Value}
+ end,
+ exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
+
+
% Instead of ets, let's use an ordered keylist. We'll need to revisit if we
% have >> 100 shards, so a private interface is a good idea. - APK June 2010
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 1cae5ac5..c6aef20b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -127,21 +127,17 @@ view_fold({{Key,Id}, Value}, _Offset, Acc) ->
true ->
{stop, Acc};
false ->
- Row = case IncludeDocs of
- true ->
+ Doc = if not IncludeDocs -> undefined; true ->
case couch_db:open_doc(Db, Id, []) of
- {not_found, missing} ->
- #view_row{key=Key, id=Id, value=Value, doc={error,missing}};
- {not_found, deleted} ->
- #view_row{key=Key, id=Id, value=Value};
- {ok, Doc} ->
- JsonDoc = couch_doc:to_json_obj(Doc, []),
- #view_row{key=Key, id=Id, value=Value, doc=JsonDoc}
- end;
- false ->
- #view_row{key=Key, id=Id, value=Value}
+ {not_found, deleted} ->
+ null;
+ {not_found, missing} ->
+ undefined;
+ {ok, Doc0} ->
+ couch_doc:to_json_obj(Doc0, [])
+ end
end,
- rexi:sync_reply(Row),
+ rexi:sync_reply(#view_row{key=Key, id=Id, value=Value, doc=Doc}),
{ok, Acc#view_acc{limit=Limit-1}}
end.
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index e904b456..760e511c 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -26,6 +26,8 @@ receive_loop(Workers, Keypos, Fun, Acc0) ->
%% @doc set up the receive loop with an overall timeout
-spec receive_loop([any()], integer(), function(), any(), timeout(), timeout()) ->
{ok, any()}.
+receive_loop(RefPartMap, Keypos, Fun, Acc0, infinity, PerMsgTO) ->
+ process_mailbox(RefPartMap, Keypos, Fun, Acc0, nil, PerMsgTO);
receive_loop(RefPartMap, Keypos, Fun, Acc0, GlobalTimeout, PerMsgTO) ->
TimeoutRef = erlang:make_ref(),
{ok, TRef} = timer:send_after(GlobalTimeout, {timeout, TimeoutRef}),