diff options
-rw-r--r-- | src/fabric_doc_update.erl | 36 |
1 files changed, 21 insertions, 15 deletions
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl index 77c182f1..ea7d8d74 100644 --- a/src/fabric_doc_update.erl +++ b/src/fabric_doc_update.erl @@ -10,8 +10,10 @@ go(DbName, AllDocs, Options) -> {Shard#shard{ref=Ref}, Docs} end, group_docs_by_shard(DbName, AllDocs)), {Workers, _} = lists:unzip(GroupedDocs), - Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1), - GroupedDocs, dict:from_list([{Doc,[]} || Doc <- AllDocs])}, + W = list_to_integer(couch_util:get_value(w, Options, + couch_config:get("cluster", "w", "2"))), + Acc0 = {length(Workers), length(AllDocs), W, GroupedDocs, + dict:from_list([{Doc,[]} || Doc <- AllDocs])}, case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of {ok, Results} -> Reordered = couch_util:reorder_results(AllDocs, Results), @@ -50,13 +52,16 @@ handle_message({not_found, no_db_file} = X, Worker, Acc0) -> Docs = couch_util:get_value(Worker, GroupedDocs), handle_message({ok, [X || _D <- Docs]}, Worker, Acc0). -force_reply(Doc, Replies, {W, Acc}) -> - % TODO make a real decision here - case Replies of - [] -> - {W, [{Doc, {error, internal_server_error}} | Acc]}; - [Reply| _] -> - {W, [{Doc, Reply} | Acc]} +force_reply(Doc, [], {W, Acc}) -> + {W, [{Doc, {error, internal_server_error}} | Acc]}; +force_reply(Doc, [FirstReply|_] = Replies, {W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {W, [{Doc,Reply} | Acc]}; + false -> + ?LOG_ERROR("write quorum (~p) failed, reply ~p", [W, FirstReply]), + % TODO make a smarter choice than just picking the first reply + {W, [{Doc,FirstReply} | Acc]} end. maybe_reply(_, _, continue) -> @@ -71,12 +76,13 @@ maybe_reply(Doc, Replies, {stop, W, Acc}) -> end. update_quorum_met(W, Replies) -> - % TODO make a real decision here - case length(Replies) >= W of - true -> - {true, hd(Replies)}; - false -> - false + Counters = lists:foldl(fun(R,D) -> orddict:update_counter(R,1,D) end, + orddict:new(), Replies), + case lists:dropwhile(fun({_, Count}) -> Count < W end, Counters) of + [] -> + false; + [{FinalReply, _} | _] -> + {true, FinalReply} end. -spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. |