summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ebin/fabric.app4
-rw-r--r--include/fabric.hrl2
-rw-r--r--src/fabric.erl8
-rw-r--r--src/fabric_doc.erl228
-rw-r--r--src/fabric_missing_revs.erl61
-rw-r--r--src/fabric_open_doc.erl61
-rw-r--r--src/fabric_open_revs.erl60
-rw-r--r--src/fabric_update_docs.erl92
8 files changed, 284 insertions, 232 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app
index e08f560b..363655b2 100644
--- a/ebin/fabric.app
+++ b/ebin/fabric.app
@@ -7,7 +7,11 @@
fabric,
fabric_db,
fabric_doc,
+ fabric_missing_revs,
+ fabric_open_doc,
+ fabric_open_revs,
fabric_rpc,
+ fabric_update_docs,
fabric_util
]},
{registered, []},
diff --git a/include/fabric.hrl b/include/fabric.hrl
new file mode 100644
index 00000000..5426addc
--- /dev/null
+++ b/include/fabric.hrl
@@ -0,0 +1,2 @@
+-include("../../couch/src/couch_db.hrl").
+-include("../../dynomite/include/membership.hrl").
diff --git a/src/fabric.erl b/src/fabric.erl
index 26b379af..399b76d6 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -30,21 +30,21 @@ delete_db(DbName, Options) ->
open_doc(DbName, Id, Options) ->
- fabric_doc:open_doc(dbname(DbName), docid(Id), Options).
+ fabric_open_doc:go(dbname(DbName), docid(Id), Options).
open_revs(DbName, Id, Revs, Options) ->
- fabric_doc:open_revs(dbname(DbName), docid(Id), Revs, Options).
+ fabric_open_revs:go(dbname(DbName), docid(Id), Revs, Options).
get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) ->
Sanitized = [idrevs(IdR) || IdR <- IdsRevs],
- fabric_doc:get_missing_revs(dbname(DbName), Sanitized).
+ fabric_missing_revs:go(dbname(DbName), Sanitized).
update_doc(DbName, Doc, Options) ->
{ok, [Result]} = update_docs(DbName, [Doc], Options),
Result.
update_docs(DbName, Docs, Options) ->
- fabric_doc:update_docs(dbname(DbName), docs(Docs), Options).
+ fabric_update_docs:go(dbname(DbName), docs(Docs), Options).
%% some simple type validation and transcoding
diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl
deleted file mode 100644
index e641d286..00000000
--- a/src/fabric_doc.erl
+++ /dev/null
@@ -1,228 +0,0 @@
--module(fabric_doc).
--export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
-
--include("../../couch/src/couch_db.hrl").
--include("../../dynomite/include/membership.hrl").
-
-open_doc(DbName, Id, Options) ->
- Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc,
- [Id, Options]),
- SuppressDeletedDoc = not lists:member(deleted, Options),
- Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
- case fabric_util:recv(Workers, #shard.ref, fun handle_open_doc/3, Acc0) of
- {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc ->
- {not_found, deleted};
- {ok, Else} ->
- Else;
- Error ->
- Error
- end.
-
-open_revs(DbName, Id, Revs, Options) ->
- Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs,
- [Id, Revs, Options]),
- Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
- case fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0) of
- {ok, {ok, Reply}} ->
- {ok, Reply};
- Else ->
- Else
- end.
-
-get_missing_revs(DbName, AllIdsRevs) ->
- Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
- Shard#shard{ref=Ref}
- end, group_idrevs_by_shard(DbName, AllIdsRevs)),
- ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
- Acc0 = {length(Workers), ResultDict},
- fabric_util:recv(Workers, #shard.ref, fun handle_missing_revs/3, Acc0).
-
-update_docs(DbName, AllDocs, Options) ->
- GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
- Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
- {Shard#shard{ref=Ref}, Docs}
- end, group_docs_by_shard(DbName, AllDocs)),
- {Workers, _} = lists:unzip(GroupedDocs),
- Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1),
- GroupedDocs, dict:new()},
- case fabric_util:recv(Workers, #shard.ref, fun handle_update_docs/3, Acc0) of
- {ok, Results} ->
- {ok, couch_util:reorder_results(AllDocs, Results)};
- Else ->
- Else
- end.
-
-handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
- skip_message(Acc0);
-handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) ->
- skip_message(Acc0);
-handle_open_doc(_Worker, Reply, {WaitingCount, R, Replies}) ->
- case merge_read_reply(make_key(Reply), Reply, Replies) of
- {_, KeyCount} when KeyCount =:= R ->
- {stop, Reply};
- {NewReplies, KeyCount} when KeyCount < R ->
- if WaitingCount =:= 1 ->
- % last message arrived, but still no quorum
- repair_read_quorum_failure(NewReplies);
- true ->
- {ok, {WaitingCount-1, R, NewReplies}}
- end
- end.
-
-handle_open_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
- skip_message(Acc0);
-handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) ->
- skip_message(Acc0);
-handle_open_revs(_Worker, Reply, {WaitingCount, R, Replies}) ->
- case merge_read_reply(make_key(Reply), Reply, Replies) of
- {_, KeyCount} when KeyCount =:= R ->
- {stop, Reply};
- {NewReplies, KeyCount} when KeyCount < R ->
- if WaitingCount =:= 1 ->
- % last message arrived, but still no quorum
- repair_read_quorum_failure(NewReplies);
- true ->
- {ok, {WaitingCount-1, R, NewReplies}}
- end
- end.
-
-handle_missing_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
- skip_message(Acc0);
-handle_missing_revs(_Worker, {rexi_EXIT, _, _, _}, Acc0) ->
- skip_message(Acc0);
-handle_missing_revs(_Worker, {ok, Results}, {1, D0}) ->
- D = update_dict(D0, Results),
- {stop, dict:fold(fun force_missing_revs_reply/3, [], D)};
-handle_missing_revs(_Worker, {ok, Results}, {WaitingCount, D0}) ->
- D = update_dict(D0, Results),
- case dict:fold(fun maybe_missing_revs_reply/3, {stop, []}, D) of
- continue ->
- % still haven't heard about some Ids
- {ok, {WaitingCount - 1, D}};
- {stop, FinalReply} ->
- {stop, FinalReply}
- end.
-
-handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
- skip_message(Acc0);
-handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) ->
- skip_message(Acc0);
-handle_update_docs(Worker, {ok, Replies}, Acc0) ->
- {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
- Docs = couch_util:get_value(Worker, GroupedDocs),
- DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
- case {WaitingCount, dict:size(DocReplyDict)} of
- {1, _} ->
- % last message has arrived, we need to conclude things
- {W, Reply} = dict:fold(fun force_update_reply/3, {W,[]}, DocReplyDict),
- {stop, Reply};
- {_, DocCount} ->
- % we've got at least one reply for each document, let's take a look
- case dict:fold(fun maybe_update_reply/3, {stop,W,[]}, DocReplyDict) of
- continue ->
- {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}};
- {stop, W, FinalReplies} ->
- {stop, FinalReplies}
- end;
- {_, N} when N < DocCount ->
- % no point in trying to finalize anything yet
- {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
- end.
-
-force_missing_revs_reply(Id, {nil,Revs}, Acc) ->
- % never heard about this ID, assume it's missing
- [{Id, Revs} | Acc];
-force_missing_revs_reply(_, [], Acc) ->
- Acc;
-force_missing_revs_reply(Id, Revs, Acc) ->
- [{Id, Revs} | Acc].
-
-maybe_missing_revs_reply(_, _, continue) ->
- continue;
-maybe_missing_revs_reply(_, {nil, _}, _) ->
- continue;
-maybe_missing_revs_reply(_, [], {stop, Acc}) ->
- {stop, Acc};
-maybe_missing_revs_reply(Id, Revs, {stop, Acc}) ->
- {stop, [{Id, Revs} | Acc]}.
-
-force_update_reply(Doc, Replies, {W, Acc}) ->
- % TODO make a real decision here
- case Replies of
- [] ->
- {W, [{Doc, {error, internal_server_error}} | Acc]};
- [Reply| _] ->
- {W, [{Doc, Reply} | Acc]}
- end.
-
-maybe_update_reply(_, _, continue) ->
- % we didn't meet quorum for all docs, so we're fast-forwarding the fold
- continue;
-maybe_update_reply(Doc, Replies, {stop, W, Acc}) ->
- case update_quorum_met(W, Replies) of
- {true, Reply} ->
- {stop, W, [{Doc, Reply} | Acc]};
- false ->
- continue
- end.
-
-update_quorum_met(W, Replies) ->
- % TODO make a real decision here
- case length(Replies) >= W of
- true ->
- {true, hd(Replies)};
- false ->
- false
- end.
-
--spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
-group_docs_by_shard(DbName, Docs) ->
- dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
- lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, Doc, D1)
- end, D0, partitions:for_key(DbName,Id))
- end, dict:new(), Docs)).
-
-group_idrevs_by_shard(DbName, IdsRevs) ->
- dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
- lists:foldl(fun(Shard, D1) ->
- dict:append(Shard, {Id, Revs}, D1)
- end, D0, partitions:for_key(DbName,Id))
- end, dict:new(), IdsRevs)).
-
-skip_message({1, _R, Replies}) ->
- repair_read_quorum_failure(Replies);
-skip_message({WaitingCount, R, Replies}) ->
- {ok, {WaitingCount-1, R, Replies}}.
-
-merge_read_reply(Key, Reply, Replies) ->
- case lists:keyfind(Key, 1, Replies) of
- false ->
- {[{Key, Reply, 1} | Replies], 1};
- {Key, _, N} ->
- {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
- end.
-
-update_dict(D0, KVs) ->
- lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs).
-
-append_update_replies([], [], DocReplyDict) ->
- DocReplyDict;
-append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
- % TODO what if the same document shows up twice in one update_docs call?
- append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
-
-make_key({ok, #doc{id=Id, revs=Revs}}) ->
- {Id, Revs};
-make_key(Else) ->
- Else.
-
-repair_read_quorum_failure(Replies) ->
- case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
- [] ->
- {stop, {not_found, missing}};
- [Doc|_] ->
- % TODO merge docs to find the winner as determined by replication
- {stop, {ok, Doc}}
- end.
diff --git a/src/fabric_missing_revs.erl b/src/fabric_missing_revs.erl
new file mode 100644
index 00000000..d329d2aa
--- /dev/null
+++ b/src/fabric_missing_revs.erl
@@ -0,0 +1,61 @@
+-module(fabric_missing_revs).
+-export([go/2]).
+-include("fabric.hrl").
+
+go(DbName, AllIdsRevs) ->
+ Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}),
+ Shard#shard{ref=Ref}
+ end, group_idrevs_by_shard(DbName, AllIdsRevs)),
+ ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]),
+ Acc0 = {length(Workers), ResultDict},
+ fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0).
+
+handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, {rexi_EXIT, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, {ok, Results}, {1, D0}) ->
+ D = update_dict(D0, Results),
+ {stop, dict:fold(fun force_reply/3, [], D)};
+handle_message(_Worker, {ok, Results}, {WaitingCount, D0}) ->
+ D = update_dict(D0, Results),
+ case dict:fold(fun maybe_reply/3, {stop, []}, D) of
+ continue ->
+ % still haven't heard about some Ids
+ {ok, {WaitingCount - 1, D}};
+ {stop, FinalReply} ->
+ {stop, FinalReply}
+ end.
+
+force_reply(Id, {nil,Revs}, Acc) ->
+ % never heard about this ID, assume it's missing
+ [{Id, Revs} | Acc];
+force_reply(_, [], Acc) ->
+ Acc;
+force_reply(Id, Revs, Acc) ->
+ [{Id, Revs} | Acc].
+
+maybe_reply(_, _, continue) ->
+ continue;
+maybe_reply(_, {nil, _}, _) ->
+ continue;
+maybe_reply(_, [], {stop, Acc}) ->
+ {stop, Acc};
+maybe_reply(Id, Revs, {stop, Acc}) ->
+ {stop, [{Id, Revs} | Acc]}.
+
+group_idrevs_by_shard(DbName, IdsRevs) ->
+ dict:to_list(lists:foldl(fun({Id, Revs}, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, {Id, Revs}, D1)
+ end, D0, partitions:for_key(DbName,Id))
+ end, dict:new(), IdsRevs)).
+
+update_dict(D0, KVs) ->
+ lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs).
+
+skip_message({1, Dict}) ->
+ {stop, dict:fold(fun force_reply/3, [], Dict)};
+skip_message({WaitingCount, Dict}) ->
+ {ok, {WaitingCount-1, Dict}}.
diff --git a/src/fabric_open_doc.erl b/src/fabric_open_doc.erl
new file mode 100644
index 00000000..996282d8
--- /dev/null
+++ b/src/fabric_open_doc.erl
@@ -0,0 +1,61 @@
+-module(fabric_open_doc).
+-export([go/3]).
+-include("fabric.hrl").
+
+go(DbName, Id, Options) ->
+ Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc,
+ [Id, Options]),
+ SuppressDeletedDoc = not lists:member(deleted, Options),
+ Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc ->
+ {not_found, deleted};
+ {ok, Else} ->
+ Else;
+ Error ->
+ Error
+ end.
+
+handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, {rexi_EXIT, _Reason}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, Reply, {WaitingCount, R, Replies}) ->
+ case merge_read_reply(make_key(Reply), Reply, Replies) of
+ {_, KeyCount} when KeyCount =:= R ->
+ {stop, Reply};
+ {NewReplies, KeyCount} when KeyCount < R ->
+ if WaitingCount =:= 1 ->
+ % last message arrived, but still no quorum
+ repair_read_quorum_failure(NewReplies);
+ true ->
+ {ok, {WaitingCount-1, R, NewReplies}}
+ end
+ end.
+
+skip_message({1, _R, Replies}) ->
+ repair_read_quorum_failure(Replies);
+skip_message({WaitingCount, R, Replies}) ->
+ {ok, {WaitingCount-1, R, Replies}}.
+
+merge_read_reply(Key, Reply, Replies) ->
+ case lists:keyfind(Key, 1, Replies) of
+ false ->
+ {[{Key, Reply, 1} | Replies], 1};
+ {Key, _, N} ->
+ {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
+ end.
+
+make_key({ok, #doc{id=Id, revs=Revs}}) ->
+ {Id, Revs};
+make_key(Else) ->
+ Else.
+
+repair_read_quorum_failure(Replies) ->
+ case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
+ [] ->
+ {stop, {not_found, missing}};
+ [Doc|_] ->
+ % TODO merge docs to find the winner as determined by replication
+ {stop, {ok, Doc}}
+ end. \ No newline at end of file
diff --git a/src/fabric_open_revs.erl b/src/fabric_open_revs.erl
new file mode 100644
index 00000000..c5bd586d
--- /dev/null
+++ b/src/fabric_open_revs.erl
@@ -0,0 +1,60 @@
+-module(fabric_open_revs).
+-export([go/4]).
+-include("fabric.hrl").
+
+go(DbName, Id, Revs, Options) ->
+ Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs,
+ [Id, Revs, Options]),
+ Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, {ok, Reply}} ->
+ {ok, Reply};
+ Else ->
+ Else
+ end.
+
+handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, {rexi_EXIT, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, Reply, {WaitingCount, R, Replies}) ->
+ case merge_read_reply(make_key(Reply), Reply, Replies) of
+ {_, KeyCount} when KeyCount =:= R ->
+ {stop, Reply};
+ {NewReplies, KeyCount} when KeyCount < R ->
+ if WaitingCount =:= 1 ->
+ % last message arrived, but still no quorum
+ repair_read_quorum_failure(NewReplies);
+ true ->
+ {ok, {WaitingCount-1, R, NewReplies}}
+ end
+ end.
+
+skip_message({1, _R, Replies}) ->
+ repair_read_quorum_failure(Replies);
+skip_message({WaitingCount, R, Replies}) ->
+ {ok, {WaitingCount-1, R, Replies}}.
+
+merge_read_reply(Key, Reply, Replies) ->
+ case lists:keyfind(Key, 1, Replies) of
+ false ->
+ {[{Key, Reply, 1} | Replies], 1};
+ {Key, _, N} ->
+ {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
+ end.
+
+make_key({ok, #doc{id=Id, revs=Revs}}) ->
+ {Id, Revs};
+make_key(Else) ->
+ Else.
+
+repair_read_quorum_failure(Replies) ->
+ case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
+ [] ->
+ {stop, {not_found, missing}};
+ [Doc|_] ->
+ % TODO merge docs to find the winner as determined by replication
+ {stop, {ok, Doc}}
+ end.
+
+ \ No newline at end of file
diff --git a/src/fabric_update_docs.erl b/src/fabric_update_docs.erl
new file mode 100644
index 00000000..00c7e9d7
--- /dev/null
+++ b/src/fabric_update_docs.erl
@@ -0,0 +1,92 @@
+-module(fabric_update_docs).
+-export([go/3]).
+-include("fabric.hrl").
+
+go(DbName, AllDocs, Options) ->
+ GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ {Shard#shard{ref=Ref}, Docs}
+ end, group_docs_by_shard(DbName, AllDocs)),
+ {Workers, _} = lists:unzip(GroupedDocs),
+ Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1),
+ GroupedDocs, dict:new()},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ {ok, Results} ->
+ {ok, couch_util:reorder_results(AllDocs, Results)};
+ Else ->
+ Else
+ end.
+
+handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(_Worker, {rexi_EXIT, _}, Acc0) ->
+ skip_message(Acc0);
+handle_message(Worker, {ok, Replies}, Acc0) ->
+ {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
+ Docs = couch_util:get_value(Worker, GroupedDocs),
+ DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
+ case {WaitingCount, dict:size(DocReplyDict)} of
+ {1, _} ->
+ % last message has arrived, we need to conclude things
+ {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict),
+ {stop, Reply};
+ {_, DocCount} ->
+ % we've got at least one reply for each document, let's take a look
+ case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
+ continue ->
+ {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}};
+ {stop, W, FinalReplies} ->
+ {stop, FinalReplies}
+ end;
+ {_, N} when N < DocCount ->
+ % no point in trying to finalize anything yet
+ {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
+ end.
+
+force_reply(Doc, Replies, {W, Acc}) ->
+ % TODO make a real decision here
+ case Replies of
+ [] ->
+ {W, [{Doc, {error, internal_server_error}} | Acc]};
+ [Reply| _] ->
+ {W, [{Doc, Reply} | Acc]}
+ end.
+
+maybe_reply(_, _, continue) ->
+ % we didn't meet quorum for all docs, so we're fast-forwarding the fold
+ continue;
+maybe_reply(Doc, Replies, {stop, W, Acc}) ->
+ case update_quorum_met(W, Replies) of
+ {true, Reply} ->
+ {stop, W, [{Doc, Reply} | Acc]};
+ false ->
+ continue
+ end.
+
+update_quorum_met(W, Replies) ->
+ % TODO make a real decision here
+ case length(Replies) >= W of
+ true ->
+ {true, hd(Replies)};
+ false ->
+ false
+ end.
+
+-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
+group_docs_by_shard(DbName, Docs) ->
+ dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, Doc, D1)
+ end, D0, partitions:for_key(DbName,Id))
+ end, dict:new(), Docs)).
+
+append_update_replies([], [], DocReplyDict) ->
+ DocReplyDict;
+append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
+ % TODO what if the same document shows up twice in one update_docs call?
+ append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+
+skip_message(Acc0) ->
+ % TODO fix this
+ {ok, Acc0}.
+