diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-05-28 10:17:30 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-05-28 10:19:12 -0400 |
commit | a60895fedef53626b622f04af4d20a5792cfbd7d (patch) | |
tree | a97fd5d14f32e57cf7ae0d71c2c1b413c176674d | |
parent | 6c7e3665cd941083dedb8ead5e9314f3c531ff89 (diff) |
lots of work, expecially update_docs
-rw-r--r-- | src/fabric_doc.erl | 143 | ||||
-rw-r--r-- | src/fabric_rpc.erl | 8 | ||||
-rw-r--r-- | src/fabric_util.erl | 2 |
3 files changed, 122 insertions, 31 deletions
diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl index 407a9187..c3cc0f47 100644 --- a/src/fabric_doc.erl +++ b/src/fabric_doc.erl @@ -1,15 +1,14 @@ -module(fabric_doc). --export([open_doc/3, open_doc_revs/4, get_missing_revs/2, update_docs/3]). +-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]). -include("../../couch/src/couch_db.hrl"). -include("../../dynomite/include/membership.hrl"). -open_doc(DbName, DocId, Opts) -> - Shards = partitions:for_key(DbName, DocId), - Workers = fabric_util:submit_jobs(Shards, open_doc, [DocId, [deleted|Opts]]), - Acc0 = {length(Workers), couch_util:get_value(r, Opts, 1), []}, - ?LOG_INFO("Workers ~p Acc0 ~p", [Workers, Acc0]), - SuppressDeletedDoc = not lists:member(deleted, Opts), +open_doc(DbName, Id, Options) -> + Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc, + [Id, Options]), + SuppressDeletedDoc = not lists:member(deleted, Options), + Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, case fabric_util:recv(Workers, #shard.ref, fun handle_open_doc/3, Acc0) of {ok, #doc{deleted=true}} when SuppressDeletedDoc -> {not_found, deleted}; @@ -19,31 +18,36 @@ open_doc(DbName, DocId, Opts) -> Else end. -open_doc_revs(DbName, DocId, Revs, Options) -> - ok. +open_revs(DbName, Id, Revs, Options) -> + Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs, + [Id, Revs, Options]), + Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []}, + fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0). -update_docs(DbName, Docs, Options) -> - ok. +update_docs(DbName, AllDocs, Options) -> + GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) -> + Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}), + {Shard#shard{ref=Ref}, Docs} + end, group_docs_by_shard(DbName, AllDocs)), + {Workers, _} = lists:unzip(GroupedDocs), + Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1), + GroupedDocs, dict:new()}, + case fabric_util:recv(Workers, #shard.ref, fun handle_update_docs/3, Acc0) of + {ok, Results} -> + {ok, couch_util:reorder_results(AllDocs, Results)}; + Else -> + Else + end. -get_missing_revs(DbName, IdsRevs) -> +get_missing_revs(_DbName, _IdsRevs) -> ok. -handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, {WaitingCount, R, Replies}) -> - if WaitingCount =:= 1 -> - repair_read_quorum_failure(Replies); - true -> - {ok, {WaitingCount-1, R, Replies}} - end; -handle_open_doc(_Worker, {rexi_EXIT, Reason}, {WaitingCount, R, Replies}) -> - ?LOG_ERROR("open_doc rexi_EXIT ~p", [Reason]), - if WaitingCount =:= 1 -> - repair_read_quorum_failure(Replies); - true -> - {ok, {WaitingCount-1, R, Replies}} - end; +handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) -> + skip_message(Acc0); handle_open_doc(_Worker, Reply, {WaitingCount, R, Replies}) -> - ?LOG_INFO("got ~p when ~p ~p ~p", [Reply, WaitingCount, R, Replies]), - case merge_replies(make_key(Reply), Reply, Replies) of + case merge_read_reply(make_key(Reply), Reply, Replies) of {_, KeyCount} when KeyCount =:= R -> {stop, Reply}; {NewReplies, KeyCount} when KeyCount < R -> @@ -55,7 +59,82 @@ handle_open_doc(_Worker, Reply, {WaitingCount, R, Replies}) -> end end. -merge_replies(Key, Reply, Replies) -> +handle_open_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) -> + skip_message(Acc0); +handle_open_revs(_Worker, _Reply, {_WaitingCount, _R, _Replies}) -> + {stop, not_implemented}. + +handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) -> + skip_message(Acc0); +handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) -> + skip_message(Acc0); +handle_update_docs(Worker, {ok, Replies}, Acc0) -> + {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0, + Docs = couch_util:get_value(Worker, GroupedDocs), + DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0), + case {WaitingCount, dict:size(DocReplyDict)} of + {1, _} -> + % last message has arrived, we need to conclude things + {W, Reply} = dict:fold(fun force_update_reply/3, {W,[]}, DocReplyDict), + {stop, Reply}; + {_, DocCount} -> + % we've got at least one reply for each document, let's take a look + case dict:fold(fun maybe_update_reply/3, {stop,W,[]}, DocReplyDict) of + continue -> + {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}; + {stop, W, FinalReplies} -> + {stop, FinalReplies} + end; + {_, N} when N < DocCount -> + % no point in trying to finalize anything yet + {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}} + end. + +force_update_reply(Doc, Replies, {W, Acc}) -> + % TODO make a real decision here + case Replies of + [] -> + {W, [{Doc, {error, internal_server_error}} | Acc]}; + [Reply| _] -> + {W, [{Doc, Reply} | Acc]} + end. + +maybe_update_reply(_, _, continue) -> + % we didn't meet quorum for all docs, so we're fast-forwarding the fold + continue; +maybe_update_reply(Doc, Replies, {stop, W, Acc}) -> + case update_quorum_met(W, Replies) of + {true, Reply} -> + {stop, W, [{Doc, Reply} | Acc]}; + false -> + continue + end. + +update_quorum_met(W, Replies) -> + % TODO make a real decision here + case length(Replies) >= W of + true -> + {true, hd(Replies)}; + false -> + false + end. + +-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}]. +group_docs_by_shard(DbName, Docs) -> + dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) -> + lists:foldl(fun(Shard, D1) -> + dict:append(Shard, Doc, D1) + end, D0, partitions:for_key(DbName,Id)) + end, dict:new(), Docs)). + +skip_message({1, _R, Replies}) -> + repair_read_quorum_failure(Replies); +skip_message({WaitingCount, R, Replies}) -> + {ok, {WaitingCount-1, R, Replies}}. + +merge_read_reply(Key, Reply, Replies) -> case lists:keyfind(Key, 1, Replies) of false -> {[{Key, Reply, 1} | Replies], 1}; @@ -63,6 +142,12 @@ merge_replies(Key, Reply, Replies) -> {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} end. +append_update_replies([], [], DocReplyDict) -> + DocReplyDict; +append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) -> + % TODO what if the same document shows up twice in one update_docs call? + append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)). + make_key({ok, #doc{id=Id, revs=Revs}}) -> {Id, Revs}; make_key({not_found, missing}) -> @@ -72,7 +157,7 @@ repair_read_quorum_failure(Replies) -> case [Doc || {ok, Doc} <- Replies] of [] -> {stop, {not_found, missing}}; - [Doc|Rest] -> + [Doc|_] -> % TODO merge docs to find the winner as determined by replication {stop, {ok, Doc}} end. diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl index a0c0a568..c54ec247 100644 --- a/src/fabric_rpc.erl +++ b/src/fabric_rpc.erl @@ -1,10 +1,11 @@ -module(fabric_rpc). --export([open_doc/3, open_doc/4, get_db_info/1]). +-export([open_doc/3, open_doc/4, get_db_info/1, update_docs/3]). -include_lib("eunit/include/eunit.hrl"). open_doc(DbName, DocId, Options) -> - with_db(DbName, {couch_db, open_doc, [DocId, Options]}). + io:format("~p ~p ~p ~p~n", [?MODULE, DbName, DocId, Options]), + with_db(DbName, {couch_db, open_doc_int, [DocId, Options]}). %% rpc endpoints %% call to with_db will supply your M:F with a #db{} and then remaining args @@ -15,6 +16,9 @@ open_doc(DbName, DocId, Revs, Options) -> get_db_info(DbName) -> with_db(DbName, {couch_db, get_db_info, []}). +update_docs(DbName, Docs, Options) -> + with_db(DbName, {couch_db, update_docs, [Docs, Options]}). + %% %% internal %% diff --git a/src/fabric_util.erl b/src/fabric_util.erl index 38f0b9f3..936ba4ee 100644 --- a/src/fabric_util.erl +++ b/src/fabric_util.erl @@ -7,6 +7,7 @@ submit_jobs(Shards, EndPoint, ExtraArgs) -> lists:map(fun(#shard{node=Node, name=ShardName} = Shard) -> + io:format("submitting ~p ~p~n", [Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}]), Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}), Shard#shard{ref = Ref} end, Shards). @@ -50,6 +51,7 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) -> {timeout, TimeoutRef} -> timeout; {Ref, Msg} -> + io:format("process_message ~p ~p~n", [Ref, Msg]), case lists:keyfind(Ref, Keypos, RefList) of false -> % this was some non-matching message which we will ignore |