diff options
-rw-r--r-- | ebin/fabric.app | 4 | ||||
-rw-r--r-- | include/fabric.hrl | 2 | ||||
-rw-r--r-- | src/fabric.erl | 8 | ||||
-rw-r--r-- | src/fabric_doc.erl | 228 | ||||
-rw-r--r-- | src/fabric_missing_revs.erl | 61 | ||||
-rw-r--r-- | src/fabric_open_doc.erl | 61 | ||||
-rw-r--r-- | src/fabric_open_revs.erl | 60 | ||||
-rw-r--r-- | src/fabric_update_docs.erl | 92 |
8 files changed, 284 insertions, 232 deletions
diff --git a/ebin/fabric.app b/ebin/fabric.app index e08f560b..363655b2 100644 --- a/ebin/fabric.app +++ b/ebin/fabric.app @@ -7,7 +7,11 @@ fabric, fabric_db, fabric_doc, + fabric_missing_revs, + fabric_open_doc, + fabric_open_revs, fabric_rpc, + fabric_update_docs, fabric_util ]}, {registered, []}, diff --git a/include/fabric.hrl b/include/fabric.hrl new file mode 100644 index 00000000..5426addc --- /dev/null +++ b/include/fabric.hrl @@ -0,0 +1,2 @@ +-include("../../couch/src/couch_db.hrl"). +-include("../../dynomite/include/membership.hrl"). diff --git a/src/fabric.erl b/src/fabric.erl index 26b379af..399b76d6 100644 --- a/src/fabric.erl +++ b/src/fabric.erl @@ -30,21 +30,21 @@ delete_db(DbName, Options) -> open_doc(DbName, Id, Options) -> - fabric_doc:open_doc(dbname(DbName), docid(Id), Options). + fabric_open_doc:go(dbname(DbName), docid(Id), Options). open_revs(DbName, Id, Revs, Options) -> - fabric_doc:open_revs(dbname(DbName), docid(Id), Revs, Options). + fabric_open_revs:go(dbname(DbName), docid(Id), Revs, Options). get_missing_revs(DbName, IdsRevs) when is_list(IdsRevs) -> Sanitized = [idrevs(IdR) || IdR <- IdsRevs], - fabric_doc:get_missing_revs(dbname(DbName), Sanitized). + fabric_missing_revs:go(dbname(DbName), Sanitized). update_doc(DbName, Doc, Options) -> {ok, [Result]} = update_docs(DbName, [Doc], Options), Result. update_docs(DbName, Docs, Options) -> - fabric_doc:update_docs(dbname(DbName), docs(Docs), Options). + fabric_update_docs:go(dbname(DbName), docs(Docs), Options). %% some simple type validation and transcoding diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl deleted file mode 100644 index e641d286..00000000 --- a/src/fabric_doc.erl +++ /dev/null @@ -1,228 +0,0 @@ --module(fabric_doc). --export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). - --include("../../couch/src/couch_db.hrl"). --include("../../dynomite/include/membership.hrl"). - -open_doc(DbName, Id, Options) -> - Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc, - [Id, Options]), - SuppressDeletedDoc = not lists:member(deleted, Options), - Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_open_doc/3, Acc0) of - {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc -> - {not_found, deleted}; - {ok, Else} -> - Else; - Error -> - Error - end. - -open_revs(DbName, Id, Revs, Options) -> - Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs, - [Id, Revs, Options]), - Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0) of - {ok, {ok, Reply}} -> - {ok, Reply}; - Else -> - Else - end. - -get_missing_revs(DbName, AllIdsRevs) -> - Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> - Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), - Shard#shard{ref=Ref} - end, group_idrevs_by_shard(DbName, AllIdsRevs)), - ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), - Acc0 = {length(Workers), ResultDict}, - fabric_util:recv(Workers, #shard.ref, fun handle_missing_revs/3, Acc0). - -update_docs(DbName, AllDocs, Options) -> - GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> - Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), - {Shard#shard{ref=Ref}, Docs} - end, group_docs_by_shard(DbName, AllDocs)), - {Workers, _} = lists:unzip(GroupedDocs), - Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1), - GroupedDocs, dict:new()}, - case fabric_util:recv(Workers, #shard.ref, fun handle_update_docs/3, Acc0) of - {ok, Results} -> - {ok, couch_util:reorder_results(AllDocs, Results)}; - Else -> - Else - end. - -handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> - skip_message(Acc0); -handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) -> - skip_message(Acc0); -handle_open_doc(_Worker, Reply, {WaitingCount, R, Replies}) -> - case merge_read_reply(make_key(Reply), Reply, Replies) of - {_, KeyCount} when KeyCount =:= R -> - {stop, Reply}; - {NewReplies, KeyCount} when KeyCount < R -> - if WaitingCount =:= 1 -> - % last message arrived, but still no quorum - repair_read_quorum_failure(NewReplies); - true -> - {ok, {WaitingCount-1, R, NewReplies}} - end - end. - -handle_open_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> - skip_message(Acc0); -handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) -> - skip_message(Acc0); -handle_open_revs(_Worker, Reply, {WaitingCount, R, Replies}) -> - case merge_read_reply(make_key(Reply), Reply, Replies) of - {_, KeyCount} when KeyCount =:= R -> - {stop, Reply}; - {NewReplies, KeyCount} when KeyCount < R -> - if WaitingCount =:= 1 -> - % last message arrived, but still no quorum - repair_read_quorum_failure(NewReplies); - true -> - {ok, {WaitingCount-1, R, NewReplies}} - end - end. - -handle_missing_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> - skip_message(Acc0); -handle_missing_revs(_Worker, {rexi_EXIT, _, _, _}, Acc0) -> - skip_message(Acc0); -handle_missing_revs(_Worker, {ok, Results}, {1, D0}) -> - D = update_dict(D0, Results), - {stop, dict:fold(fun force_missing_revs_reply/3, [], D)}; -handle_missing_revs(_Worker, {ok, Results}, {WaitingCount, D0}) -> - D = update_dict(D0, Results), - case dict:fold(fun maybe_missing_revs_reply/3, {stop, []}, D) of - continue -> - % still haven't heard about some Ids - {ok, {WaitingCount - 1, D}}; - {stop, FinalReply} -> - {stop, FinalReply} - end. - -handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> - skip_message(Acc0); -handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) -> - skip_message(Acc0); -handle_update_docs(Worker, {ok, Replies}, Acc0) -> - {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, - Docs = couch_util:get_value(Worker, GroupedDocs), - DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), - case {WaitingCount, dict:size(DocReplyDict)} of - {1, _} -> - % last message has arrived, we need to conclude things - {W, Reply} = dict:fold(fun force_update_reply/3, {W,[]}, DocReplyDict), - {stop, Reply}; - {_, DocCount} -> - % we've got at least one reply for each document, let's take a look - case dict:fold(fun maybe_update_reply/3, {stop,W,[]}, DocReplyDict) of - continue -> - {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}; - {stop, W, FinalReplies} -> - {stop, FinalReplies} - end; - {_, N} when N < DocCount -> - % no point in trying to finalize anything yet - {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} - end. - -force_missing_revs_reply(Id, {nil,Revs}, Acc) -> - % never heard about this ID, assume it's missing - [{Id, Revs} | Acc]; -force_missing_revs_reply(_, [], Acc) -> - Acc; -force_missing_revs_reply(Id, Revs, Acc) -> - [{Id, Revs} | Acc]. - -maybe_missing_revs_reply(_, _, continue) -> - continue; -maybe_missing_revs_reply(_, {nil, _}, _) -> - continue; -maybe_missing_revs_reply(_, [], {stop, Acc}) -> - {stop, Acc}; -maybe_missing_revs_reply(Id, Revs, {stop, Acc}) -> - {stop, [{Id, Revs} | Acc]}. - -force_update_reply(Doc, Replies, {W, Acc}) -> - % TODO make a real decision here - case Replies of - [] -> - {W, [{Doc, {error, internal_server_error}} | Acc]}; - [Reply| _] -> - {W, [{Doc, Reply} | Acc]} - end. - -maybe_update_reply(_, _, continue) -> - % we didn't meet quorum for all docs, so we're fast-forwarding the fold - continue; -maybe_update_reply(Doc, Replies, {stop, W, Acc}) -> - case update_quorum_met(W, Replies) of - {true, Reply} -> - {stop, W, [{Doc, Reply} | Acc]}; - false -> - continue - end. - -update_quorum_met(W, Replies) -> - % TODO make a real decision here - case length(Replies) >= W of - true -> - {true, hd(Replies)}; - false -> - false - end. - --spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. -group_docs_by_shard(DbName, Docs) -> - dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, Doc, D1) - end, D0, partitions:for_key(DbName,Id)) - end, dict:new(), Docs)). - -group_idrevs_by_shard(DbName, IdsRevs) -> - dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> - lists:foldl(fun(Shard, D1) -> - dict:append(Shard, {Id, Revs}, D1) - end, D0, partitions:for_key(DbName,Id)) - end, dict:new(), IdsRevs)). - -skip_message({1, _R, Replies}) -> - repair_read_quorum_failure(Replies); -skip_message({WaitingCount, R, Replies}) -> - {ok, {WaitingCount-1, R, Replies}}. - -merge_read_reply(Key, Reply, Replies) -> - case lists:keyfind(Key, 1, Replies) of - false -> - {[{Key, Reply, 1} | Replies], 1}; - {Key, _, N} -> - {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} - end. - -update_dict(D0, KVs) -> - lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs). - -append_update_replies([], [], DocReplyDict) -> - DocReplyDict; -append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> - % TODO what if the same document shows up twice in one update_docs call? - append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). - -make_key({ok, #doc{id=Id, revs=Revs}}) -> - {Id, Revs}; -make_key(Else) -> - Else. - -repair_read_quorum_failure(Replies) -> - case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of - [] -> - {stop, {not_found, missing}}; - [Doc|_] -> - % TODO merge docs to find the winner as determined by replication - {stop, {ok, Doc}} - end. diff --git a/src/fabric_missing_revs.erl b/src/fabric_missing_revs.erl new file mode 100644 index 00000000..d329d2aa --- /dev/null +++ b/src/fabric_missing_revs.erl @@ -0,0 +1,61 @@ +-module(fabric_missing_revs). +-export([go/2]). +-include("fabric.hrl"). + +go(DbName, AllIdsRevs) -> + Workers = lists:map(fun({#shard{name=Name, node=Node} = Shard, IdsRevs}) -> + Ref = rexi:cast(Node, {fabric_rpc, get_missing_revs, [Name, IdsRevs]}), + Shard#shard{ref=Ref} + end, group_idrevs_by_shard(DbName, AllIdsRevs)), + ResultDict = dict:from_list([{Id, {nil,Revs}} || {Id, Revs} <- AllIdsRevs]), + Acc0 = {length(Workers), ResultDict}, + fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0). + +handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, {rexi_EXIT, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, {ok, Results}, {1, D0}) -> + D = update_dict(D0, Results), + {stop, dict:fold(fun force_reply/3, [], D)}; +handle_message(_Worker, {ok, Results}, {WaitingCount, D0}) -> + D = update_dict(D0, Results), + case dict:fold(fun maybe_reply/3, {stop, []}, D) of + continue -> + % still haven't heard about some Ids + {ok, {WaitingCount - 1, D}}; + {stop, FinalReply} -> + {stop, FinalReply} + end. + +force_reply(Id, {nil,Revs}, Acc) -> + % never heard about this ID, assume it's missing + [{Id, Revs} | Acc]; +force_reply(_, [], Acc) -> + Acc; +force_reply(Id, Revs, Acc) -> + [{Id, Revs} | Acc]. + +maybe_reply(_, _, continue) -> + continue; +maybe_reply(_, {nil, _}, _) -> + continue; +maybe_reply(_, [], {stop, Acc}) -> + {stop, Acc}; +maybe_reply(Id, Revs, {stop, Acc}) -> + {stop, [{Id, Revs} | Acc]}. + +group_idrevs_by_shard(DbName, IdsRevs) -> + dict:to_list(lists:foldl(fun({Id, Revs}, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, {Id, Revs}, D1) + end, D0, partitions:for_key(DbName,Id)) + end, dict:new(), IdsRevs)). + +update_dict(D0, KVs) -> + lists:foldl(fun({K,V}, D1) -> dict:store(K, V, D1) end, D0, KVs). + +skip_message({1, Dict}) -> + {stop, dict:fold(fun force_reply/3, [], Dict)}; +skip_message({WaitingCount, Dict}) -> + {ok, {WaitingCount-1, Dict}}. diff --git a/src/fabric_open_doc.erl b/src/fabric_open_doc.erl new file mode 100644 index 00000000..996282d8 --- /dev/null +++ b/src/fabric_open_doc.erl @@ -0,0 +1,61 @@ +-module(fabric_open_doc). +-export([go/3]). +-include("fabric.hrl"). + +go(DbName, Id, Options) -> + Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc, + [Id, Options]), + SuppressDeletedDoc = not lists:member(deleted, Options), + Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc -> + {not_found, deleted}; + {ok, Else} -> + Else; + Error -> + Error + end. + +handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, {rexi_EXIT, _Reason}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, Reply, {WaitingCount, R, Replies}) -> + case merge_read_reply(make_key(Reply), Reply, Replies) of + {_, KeyCount} when KeyCount =:= R -> + {stop, Reply}; + {NewReplies, KeyCount} when KeyCount < R -> + if WaitingCount =:= 1 -> + % last message arrived, but still no quorum + repair_read_quorum_failure(NewReplies); + true -> + {ok, {WaitingCount-1, R, NewReplies}} + end + end. + +skip_message({1, _R, Replies}) -> + repair_read_quorum_failure(Replies); +skip_message({WaitingCount, R, Replies}) -> + {ok, {WaitingCount-1, R, Replies}}. + +merge_read_reply(Key, Reply, Replies) -> + case lists:keyfind(Key, 1, Replies) of + false -> + {[{Key, Reply, 1} | Replies], 1}; + {Key, _, N} -> + {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} + end. + +make_key({ok, #doc{id=Id, revs=Revs}}) -> + {Id, Revs}; +make_key(Else) -> + Else. + +repair_read_quorum_failure(Replies) -> + case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of + [] -> + {stop, {not_found, missing}}; + [Doc|_] -> + % TODO merge docs to find the winner as determined by replication + {stop, {ok, Doc}} + end.
\ No newline at end of file diff --git a/src/fabric_open_revs.erl b/src/fabric_open_revs.erl new file mode 100644 index 00000000..c5bd586d --- /dev/null +++ b/src/fabric_open_revs.erl @@ -0,0 +1,60 @@ +-module(fabric_open_revs). +-export([go/4]). +-include("fabric.hrl"). + +go(DbName, Id, Revs, Options) -> + Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs, + [Id, Revs, Options]), + Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, {ok, Reply}} -> + {ok, Reply}; + Else -> + Else + end. + +handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, {rexi_EXIT, _}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, Reply, {WaitingCount, R, Replies}) -> + case merge_read_reply(make_key(Reply), Reply, Replies) of + {_, KeyCount} when KeyCount =:= R -> + {stop, Reply}; + {NewReplies, KeyCount} when KeyCount < R -> + if WaitingCount =:= 1 -> + % last message arrived, but still no quorum + repair_read_quorum_failure(NewReplies); + true -> + {ok, {WaitingCount-1, R, NewReplies}} + end + end. + +skip_message({1, _R, Replies}) -> + repair_read_quorum_failure(Replies); +skip_message({WaitingCount, R, Replies}) -> + {ok, {WaitingCount-1, R, Replies}}. + +merge_read_reply(Key, Reply, Replies) -> + case lists:keyfind(Key, 1, Replies) of + false -> + {[{Key, Reply, 1} | Replies], 1}; + {Key, _, N} -> + {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} + end. + +make_key({ok, #doc{id=Id, revs=Revs}}) -> + {Id, Revs}; +make_key(Else) -> + Else. + +repair_read_quorum_failure(Replies) -> + case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of + [] -> + {stop, {not_found, missing}}; + [Doc|_] -> + % TODO merge docs to find the winner as determined by replication + {stop, {ok, Doc}} + end. + +
\ No newline at end of file diff --git a/src/fabric_update_docs.erl b/src/fabric_update_docs.erl new file mode 100644 index 00000000..00c7e9d7 --- /dev/null +++ b/src/fabric_update_docs.erl @@ -0,0 +1,92 @@ +-module(fabric_update_docs). +-export([go/3]). +-include("fabric.hrl"). + +go(DbName, AllDocs, Options) -> + GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> + Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), + {Shard#shard{ref=Ref}, Docs} + end, group_docs_by_shard(DbName, AllDocs)), + {Workers, _} = lists:unzip(GroupedDocs), + Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1), + GroupedDocs, dict:new()}, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + {ok, Results} -> + {ok, couch_util:reorder_results(AllDocs, Results)}; + Else -> + Else + end. + +handle_message(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_message(_Worker, {rexi_EXIT, _}, Acc0) -> + skip_message(Acc0); +handle_message(Worker, {ok, Replies}, Acc0) -> + {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, + Docs = couch_util:get_value(Worker, GroupedDocs), + DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), + case {WaitingCount, dict:size(DocReplyDict)} of + {1, _} -> + % last message has arrived, we need to conclude things + {W, Reply} = dict:fold(fun force_reply/3, {W,[]}, DocReplyDict), + {stop, Reply}; + {_, DocCount} -> + % we've got at least one reply for each document, let's take a look + case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of + continue -> + {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}; + {stop, W, FinalReplies} -> + {stop, FinalReplies} + end; + {_, N} when N < DocCount -> + % no point in trying to finalize anything yet + {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} + end. + +force_reply(Doc, Replies, {W, Acc}) -> + % TODO make a real decision here + case Replies of + [] -> + {W, [{Doc, {error, internal_server_error}} | Acc]}; + [Reply| _] -> + {W, [{Doc, Reply} | Acc]} + end. + +maybe_reply(_, _, continue) -> + % we didn't meet quorum for all docs, so we're fast-forwarding the fold + continue; +maybe_reply(Doc, Replies, {stop, W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {stop, W, [{Doc, Reply} | Acc]}; + false -> + continue + end. + +update_quorum_met(W, Replies) -> + % TODO make a real decision here + case length(Replies) >= W of + true -> + {true, hd(Replies)}; + false -> + false + end. + +-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. +group_docs_by_shard(DbName, Docs) -> + dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, D0, partitions:for_key(DbName,Id)) + end, dict:new(), Docs)). + +append_update_replies([], [], DocReplyDict) -> + DocReplyDict; +append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> + % TODO what if the same document shows up twice in one update_docs call? + append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). + +skip_message(Acc0) -> + % TODO fix this + {ok, Acc0}. + |