summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/fabric_doc_update.erl36
1 files changed, 21 insertions, 15 deletions
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index 77c182f1..ea7d8d74 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -10,8 +10,10 @@ go(DbName, AllDocs, Options) ->
{Shard#shard{ref=Ref}, Docs}
end, group_docs_by_shard(DbName, AllDocs)),
{Workers, _} = lists:unzip(GroupedDocs),
- Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1),
- GroupedDocs, dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+ W = list_to_integer(couch_util:get_value(w, Options,
+ couch_config:get("cluster", "w", "2"))),
+ Acc0 = {length(Workers), length(AllDocs), W, GroupedDocs,
+ dict:from_list([{Doc,[]} || Doc <- AllDocs])},
case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
{ok, Results} ->
Reordered = couch_util:reorder_results(AllDocs, Results),
@@ -50,13 +52,16 @@ handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
Docs = couch_util:get_value(Worker, GroupedDocs),
handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
-force_reply(Doc, Replies, {W, Acc}) ->
- % TODO make a real decision here
- case Replies of
- [] ->
- {W, [{Doc, {error, internal_server_error}} | Acc]};
- [Reply| _] ->
- {W, [{Doc, Reply} | Acc]}
+force_reply(Doc, [], {W, Acc}) ->
+ {W, [{Doc, {error, internal_server_error}} | Acc]};
+force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) ->
+ case update_quorum_met(W, Replies) of
+ {true, Reply} ->
+ {W, [{Doc,Reply} | Acc]};
+ false ->
+ ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]),
+ % TODO make a smarter choice than just picking the first reply
+ {W, [{Doc,FirstReply} | Acc]}
end.
maybe_reply(_, _, continue) ->
@@ -71,12 +76,13 @@ maybe_reply(Doc, Replies, {stop, W, Acc}) ->
end.
update_quorum_met(W, Replies) ->
- % TODO make a real decision here
- case length(Replies) >= W of
- true ->
- {true, hd(Replies)};
- false ->
- false
+ Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end,
+ orddict:new(), Replies),
+ case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of
+ [] ->
+ false;
+ [{FinalReply, _} | _] ->
+ {true, FinalReply}
end.
-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].