summaryrefslogtreecommitdiff
path: root/src/fabric_doc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric_doc.erl')
-rw-r--r--src/fabric_doc.erl56
1 files changed, 53 insertions, 3 deletions
diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl
index 039b6aab..97e7cfb6 100644
--- a/src/fabric_doc.erl
+++ b/src/fabric_doc.erl
@@ -24,6 +24,15 @@ open_revs(DbName, Id, Revs, Options) ->
Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0).
+get_missing_revs(DbName, AllIdsRevs) ->
+ Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
+ Shard#shard{ref=Ref}
+ end, group_idrevs_by_shard(DbName, AllIdsRevs)),
+ ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
+ Acc0 = {length(Workers), ResultDict},
+ fabric_util:recv(Workers, #shard.ref, fun handle_missing_revs/3, Acc0).
+
update_docs(DbName, AllDocs, Options) ->
GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
@@ -39,9 +48,6 @@ update_docs(DbName, AllDocs, Options) ->
Else
end.
-get_missing_revs(_DbName, _IdsRevs) ->
- ok.
-
handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
skip_message(Acc0);
handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) ->
@@ -66,6 +72,23 @@ handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) ->
handle_open_revs(_Worker, _Reply, {_WaitingCount, _R, _Replies}) ->
{stop, not_implemented}.
+handle_missing_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_missing_revs(_Worker, {rexi_EXIT, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_missing_revs(_Worker, {ok, Results}, {1, D0}) ->
+ D = update_dict(D0, Results),
+ {stop, dict:fold(fun force_missing_revs_reply/3, [], D)};
+handle_missing_revs(_Worker, {ok, Results}, {WaitingCount, D0}) ->
+ D = update_dict(D0, Results),
+ case dict:fold(fun maybe_missing_revs_reply/3, {stop, []}, D) of
+ continue ->
+ % still haven't heard about some Ids
+ {ok, {WaitingCount - 1, D}};
+ {stop, FinalReply} ->
+ {stop, FinalReply}
+ end.
+
handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
skip_message(Acc0);
handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) ->
@@ -92,6 +115,23 @@ handle_update_docs(Worker, {ok, Replies}, Acc0) ->
{ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
end.
+force_missing_revs_reply(Id, {nil,Revs}, Acc) ->
+ % never heard about this ID, assume it's missing
+ [{Id, Revs} | Acc];
+force_missing_revs_reply(_, [], Acc) ->
+ Acc;
+force_missing_revs_reply(Id, Revs, Acc) ->
+ [{Id, Revs} | Acc].
+
+maybe_missing_revs_reply(_, _, continue) ->
+ continue;
+maybe_missing_revs_reply(_, {nil, _}, _) ->
+ continue;
+maybe_missing_revs_reply(_, [], {stop, Acc}) ->
+ {stop, Acc};
+maybe_missing_revs_reply(Id, Revs, {stop, Acc}) ->
+ {stop, [{Id, Revs} | Acc]}.
+
force_update_reply(Doc, Replies, {W, Acc}) ->
% TODO make a real decision here
case Replies of
@@ -129,6 +169,13 @@ group_docs_by_shard(DbName, Docs) ->
end, D0, partitions:for_key(DbName,Id))
end, dict:new(), Docs)).
+group_idrevs_by_shard(DbName, IdsRevs) ->
+ dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, {Id, Revs}, D1)
+ end, D0, partitions:for_key(DbName,Id))
+ end, dict:new(), IdsRevs)).
+
skip_message({1, _R, Replies}) ->
repair_read_quorum_failure(Replies);
skip_message({WaitingCount, R, Replies}) ->
@@ -142,6 +189,9 @@ merge_read_reply(Key, Reply, Replies) ->
{lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
end.
+update_dict(D0, KVs) ->
+ lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs).
+
append_update_replies([], [], DocReplyDict) ->
DocReplyDict;
append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->