summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-05-28 13:15:38 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-05-28 13:18:57 -0400
commit450bc69ea891d9ad03c26c31c181c28ee5cbe8f9 (patch)
treec0a1b4db1fbd6354ef5c22eb832aad3f623d6962
parent8dc112522f8baef4454f3c0ca74fbbd920fc451d (diff)
implement fabric missing_revs, BugzID 10217
-rw-r--r--src/fabric.erl15
-rw-r--r--src/fabric_doc.erl56
-rw-r--r--src/fabric_rpc.erl23
3 files changed, 88 insertions, 6 deletions
diff --git a/src/fabric.erl b/src/fabric.erl
index 3a4575bd..26b379af 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -2,7 +2,8 @@
-export([all_databases/1, create_db/2, delete_db/2, get_db_info/2,
db_path/2]).
--export([open_doc/3, open_revs/4, update_doc/3, update_docs/3]).
+-export([open_doc/3, open_revs/4, get_missing_revs/2]).
+-export([update_doc/3, update_docs/3]).
-include("../../couch/src/couch_db.hrl").
@@ -34,6 +35,10 @@ open_doc(DbName, Id, Options) ->
open_revs(DbName, Id, Revs, Options) ->
fabric_doc:open_revs(dbname(DbName), docid(Id), Revs, Options).
+get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) ->
+ Sanitized = [idrevs(IdR) || IdR <- IdsRevs],
+ fabric_doc:get_missing_revs(dbname(DbName), Sanitized).
+
update_doc(DbName, Doc, Options) ->
{ok, [Result]} = update_docs(DbName, [Doc], Options),
Result.
@@ -70,6 +75,14 @@ doc({_} = Doc) ->
doc(Doc) ->
erlang:error({illegal_doc_format, Doc}).
+idrevs({Id, Revs}) when is_list(Revs) ->
+ {docid(Id), [rev(R) || R <- Revs]}.
+
+rev(Rev) when is_list(Rev); is_binary(Rev) ->
+ couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+ Rev.
+
generate_customer_path("/", _Customer) ->
"";
generate_customer_path("/favicon.ico", _Customer) ->
diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl
index 039b6aab..97e7cfb6 100644
--- a/src/fabric_doc.erl
+++ b/src/fabric_doc.erl
@@ -24,6 +24,15 @@ open_revs(DbName, Id, Revs, Options) ->
Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0).
+get_missing_revs(DbName, AllIdsRevs) ->
+ Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
+ Shard#shard{ref=Ref}
+ end, group_idrevs_by_shard(DbName, AllIdsRevs)),
+ ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
+ Acc0 = {length(Workers), ResultDict},
+ fabric_util:recv(Workers, #shard.ref, fun handle_missing_revs/3, Acc0).
+
update_docs(DbName, AllDocs, Options) ->
GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
@@ -39,9 +48,6 @@ update_docs(DbName, AllDocs, Options) ->
Else
end.
-get_missing_revs(_DbName, _IdsRevs) ->
- ok.
-
handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
skip_message(Acc0);
handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) ->
@@ -66,6 +72,23 @@ handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) ->
handle_open_revs(_Worker, _Reply, {_WaitingCount, _R, _Replies}) ->
{stop, not_implemented}.
+handle_missing_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_missing_revs(_Worker, {rexi_EXIT, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_missing_revs(_Worker, {ok, Results}, {1, D0}) ->
+ D = update_dict(D0, Results),
+ {stop, dict:fold(fun force_missing_revs_reply/3, [], D)};
+handle_missing_revs(_Worker, {ok, Results}, {WaitingCount, D0}) ->
+ D = update_dict(D0, Results),
+ case dict:fold(fun maybe_missing_revs_reply/3, {stop, []}, D) of
+ continue ->
+ % still haven't heard about some Ids
+ {ok, {WaitingCount - 1, D}};
+ {stop, FinalReply} ->
+ {stop, FinalReply}
+ end.
+
handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
skip_message(Acc0);
handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) ->
@@ -92,6 +115,23 @@ handle_update_docs(Worker, {ok, Replies}, Acc0) ->
{ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
end.
+force_missing_revs_reply(Id, {nil,Revs}, Acc) ->
+ % never heard about this ID, assume it's missing
+ [{Id, Revs} | Acc];
+force_missing_revs_reply(_, [], Acc) ->
+ Acc;
+force_missing_revs_reply(Id, Revs, Acc) ->
+ [{Id, Revs} | Acc].
+
+maybe_missing_revs_reply(_, _, continue) ->
+ continue;
+maybe_missing_revs_reply(_, {nil, _}, _) ->
+ continue;
+maybe_missing_revs_reply(_, [], {stop, Acc}) ->
+ {stop, Acc};
+maybe_missing_revs_reply(Id, Revs, {stop, Acc}) ->
+ {stop, [{Id, Revs} | Acc]}.
+
force_update_reply(Doc, Replies, {W, Acc}) ->
% TODO make a real decision here
case Replies of
@@ -129,6 +169,13 @@ group_docs_by_shard(DbName, Docs) ->
end, D0, partitions:for_key(DbName,Id))
end, dict:new(), Docs)).
+group_idrevs_by_shard(DbName, IdsRevs) ->
+ dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, {Id, Revs}, D1)
+ end, D0, partitions:for_key(DbName,Id))
+ end, dict:new(), IdsRevs)).
+
skip_message({1, _R, Replies}) ->
repair_read_quorum_failure(Replies);
skip_message({WaitingCount, R, Replies}) ->
@@ -142,6 +189,9 @@ merge_read_reply(Key, Reply, Replies) ->
{lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
end.
+update_dict(D0, KVs) ->
+ lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs).
+
append_update_replies([], [], DocReplyDict) ->
DocReplyDict;
append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index c54ec247..75ce5e90 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -1,12 +1,31 @@
-module(fabric_rpc).
--export([open_doc/3, open_doc/4, get_db_info/1, update_docs/3]).
+-export([open_doc/3, open_doc/4, get_db_info/1, update_docs/3,
+ get_missing_revs/2]).
+-include("../../couch/src/couch_db.hrl").
-include_lib("eunit/include/eunit.hrl").
+
open_doc(DbName, DocId, Options) ->
- io:format("~p ~p ~p ~p~n", [?MODULE, DbName, DocId, Options]),
with_db(DbName, {couch_db, open_doc_int, [DocId, Options]}).
+get_missing_revs(DbName, IdRevsList) ->
+ % reimplement here so we get [] for Ids with no missing revs in response
+ rexi:reply(case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
+ {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) ->
+ case FullDocInfoResult of
+ {ok, #full_doc_info{rev_tree=RevisionTree}} ->
+ {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
+ not_found ->
+ {Id, Revs}
+ end
+ end, IdRevsList, couch_btree:lookup(Db#db.id_tree, Ids))};
+ Error ->
+ Error
+ end).
+
%% rpc endpoints
%% call to with_db will supply your M:F with a #db{} and then remaining args