diff options
Diffstat (limited to 'src/fabric_doc.erl')
-rw-r--r-- | src/fabric_doc.erl | 56 |
1 files changed, 53 insertions, 3 deletions
diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl index 039b6aab..97e7cfb6 100644 --- a/src/fabric_doc.erl +++ b/src/fabric_doc.erl @@ -24,6 +24,15 @@ open_revs(DbName, Id, Revs, Options) -> Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0). +get_missing_revs(DbName, AllIdsRevs) -> + Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> + Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), + Shard#shard{ref=Ref} + end, group_idrevs_by_shard(DbName, AllIdsRevs)), + ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), + Acc0 = {length(Workers), ResultDict}, + fabric_util:recv(Workers, #shard.ref, fun handle_missing_revs/3, Acc0). + update_docs(DbName, AllDocs, Options) -> GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), @@ -39,9 +48,6 @@ update_docs(DbName, AllDocs, Options) -> Else end. -get_missing_revs(_DbName, _IdsRevs) -> - ok. - handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> skip_message(Acc0); handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) -> @@ -66,6 +72,23 @@ handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) -> handle_open_revs(_Worker, _Reply, {_WaitingCount, _R, _Replies}) -> {stop, not_implemented}. +handle_missing_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_missing_revs(_Worker, {rexi_EXIT, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_missing_revs(_Worker, {ok, Results}, {1, D0}) -> + D = update_dict(D0, Results), + {stop, dict:fold(fun force_missing_revs_reply/3, [], D)}; +handle_missing_revs(_Worker, {ok, Results}, {WaitingCount, D0}) -> + D = update_dict(D0, Results), + case dict:fold(fun maybe_missing_revs_reply/3, {stop, []}, D) of + continue -> + % still haven't heard about some Ids + {ok, {WaitingCount - 1, D}}; + {stop, FinalReply} -> + {stop, FinalReply} + end. + handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> skip_message(Acc0); handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) -> @@ -92,6 +115,23 @@ handle_update_docs(Worker, {ok, Replies}, Acc0) -> {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} end. +force_missing_revs_reply(Id, {nil,Revs}, Acc) -> + % never heard about this ID, assume it's missing + [{Id, Revs} | Acc]; +force_missing_revs_reply(_, [], Acc) -> + Acc; +force_missing_revs_reply(Id, Revs, Acc) -> + [{Id, Revs} | Acc]. + +maybe_missing_revs_reply(_, _, continue) -> + continue; +maybe_missing_revs_reply(_, {nil, _}, _) -> + continue; +maybe_missing_revs_reply(_, [], {stop, Acc}) -> + {stop, Acc}; +maybe_missing_revs_reply(Id, Revs, {stop, Acc}) -> + {stop, [{Id, Revs} | Acc]}. + force_update_reply(Doc, Replies, {W, Acc}) -> % TODO make a real decision here case Replies of @@ -129,6 +169,13 @@ group_docs_by_shard(DbName, Docs) -> end, D0, partitions:for_key(DbName,Id)) end, dict:new(), Docs)). +group_idrevs_by_shard(DbName, IdsRevs) -> + dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, {Id, Revs}, D1) + end, D0, partitions:for_key(DbName,Id)) + end, dict:new(), IdsRevs)). + skip_message({1, _R, Replies}) -> repair_read_quorum_failure(Replies); skip_message({WaitingCount, R, Replies}) -> @@ -142,6 +189,9 @@ merge_read_reply(Key, Reply, Replies) -> {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} end. +update_dict(D0, KVs) -> + lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs). + append_update_replies([], [], DocReplyDict) -> DocReplyDict; append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> |