summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-05-28 10:17:30 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-05-28 10:19:12 -0400
commita60895fedef53626b622f04af4d20a5792cfbd7d (patch)
treea97fd5d14f32e57cf7ae0d71c2c1b413c176674d
parent6c7e3665cd941083dedb8ead5e9314f3c531ff89 (diff)
lots of work, expecially update_docs
-rw-r--r--src/fabric_doc.erl143
-rw-r--r--src/fabric_rpc.erl8
-rw-r--r--src/fabric_util.erl2
3 files changed, 122 insertions, 31 deletions
diff --git a/src/fabric_doc.erl b/src/fabric_doc.erl
index 407a9187..c3cc0f47 100644
--- a/src/fabric_doc.erl
+++ b/src/fabric_doc.erl
@@ -1,15 +1,14 @@
-module(fabric_doc).
--export([open_doc/3, open_doc_revs/4, get_missing_revs/2, update_docs/3]).
+-export([open_doc/3, open_revs/4, get_missing_revs/2, update_docs/3]).
-include("../../couch/src/couch_db.hrl").
-include("../../dynomite/include/membership.hrl").
-open_doc(DbName, DocId, Opts) ->
- Shards = partitions:for_key(DbName, DocId),
- Workers = fabric_util:submit_jobs(Shards, open_doc, [DocId, [deleted|Opts]]),
- Acc0 = {length(Workers), couch_util:get_value(r, Opts, 1), []},
- ?LOG_INFO("Workers ~p Acc0 ~p", [Workers, Acc0]),
- SuppressDeletedDoc = not lists:member(deleted, Opts),
+open_doc(DbName, Id, Options) ->
+ Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_doc,
+ [Id, Options]),
+ SuppressDeletedDoc = not lists:member(deleted, Options),
+ Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
case fabric_util:recv(Workers, #shard.ref, fun handle_open_doc/3, Acc0) of
{ok, #doc{deleted=true}} when SuppressDeletedDoc ->
{not_found, deleted};
@@ -19,31 +18,36 @@ open_doc(DbName, DocId, Opts) ->
Else
end.
-open_doc_revs(DbName, DocId, Revs, Options) ->
- ok.
+open_revs(DbName, Id, Revs, Options) ->
+ Workers = fabric_util:submit_jobs(partitions:for_key(DbName,Id), open_revs,
+ [Id, Revs, Options]),
+ Acc0 = {length(Workers), couch_util:get_value(r, Options, 1), []},
+ fabric_util:recv(Workers, #shard.ref, fun handle_open_revs/3, Acc0).
-update_docs(DbName, Docs, Options) ->
- ok.
+update_docs(DbName, AllDocs, Options) ->
+ GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
+ Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+ {Shard#shard{ref=Ref}, Docs}
+ end, group_docs_by_shard(DbName, AllDocs)),
+ {Workers, _} = lists:unzip(GroupedDocs),
+ Acc0 = {length(Workers), length(AllDocs), couch_util:get_value(w, Options, 1),
+ GroupedDocs, dict:new()},
+ case fabric_util:recv(Workers, #shard.ref, fun handle_update_docs/3, Acc0) of
+ {ok, Results} ->
+ {ok, couch_util:reorder_results(AllDocs, Results)};
+ Else ->
+ Else
+ end.
-get_missing_revs(DbName, IdsRevs) ->
+get_missing_revs(_DbName, _IdsRevs) ->
ok.
-handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, {WaitingCount, R, Replies}) ->
- if WaitingCount =:= 1 ->
- repair_read_quorum_failure(Replies);
- true ->
- {ok, {WaitingCount-1, R, Replies}}
- end;
-handle_open_doc(_Worker, {rexi_EXIT, Reason}, {WaitingCount, R, Replies}) ->
- ?LOG_ERROR("open_doc rexi_EXIT ~p", [Reason]),
- if WaitingCount =:= 1 ->
- repair_read_quorum_failure(Replies);
- true ->
- {ok, {WaitingCount-1, R, Replies}}
- end;
+handle_open_doc(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_open_doc(_Worker, {rexi_EXIT, _Reason}, Acc0) ->
+ skip_message(Acc0);
handle_open_doc(_Worker, Reply, {WaitingCount, R, Replies}) ->
- ?LOG_INFO("got ~p when ~p ~p ~p", [Reply, WaitingCount, R, Replies]),
- case merge_replies(make_key(Reply), Reply, Replies) of
+ case merge_read_reply(make_key(Reply), Reply, Replies) of
{_, KeyCount} when KeyCount =:= R ->
{stop, Reply};
{NewReplies, KeyCount} when KeyCount < R ->
@@ -55,7 +59,82 @@ handle_open_doc(_Worker, Reply, {WaitingCount, R, Replies}) ->
end
end.
-merge_replies(Key, Reply, Replies) ->
+handle_open_revs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_open_revs(_Worker, {rexi_EXIT, _}, Acc0) ->
+ skip_message(Acc0);
+handle_open_revs(_Worker, _Reply, {_WaitingCount, _R, _Replies}) ->
+ {stop, not_implemented}.
+
+handle_update_docs(_Worker, {rexi_DOWN, _, _, _}, Acc0) ->
+ skip_message(Acc0);
+handle_update_docs(_Worker, {rexi_EXIT, _}, Acc0) ->
+ skip_message(Acc0);
+handle_update_docs(Worker, {ok, Replies}, Acc0) ->
+ {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
+ Docs = couch_util:get_value(Worker, GroupedDocs),
+ DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
+ case {WaitingCount, dict:size(DocReplyDict)} of
+ {1, _} ->
+ % last message has arrived, we need to conclude things
+ {W, Reply} = dict:fold(fun force_update_reply/3, {W,[]}, DocReplyDict),
+ {stop, Reply};
+ {_, DocCount} ->
+ % we've got at least one reply for each document, let's take a look
+ case dict:fold(fun maybe_update_reply/3, {stop,W,[]}, DocReplyDict) of
+ continue ->
+ {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}};
+ {stop, W, FinalReplies} ->
+ {stop, FinalReplies}
+ end;
+ {_, N} when N < DocCount ->
+ % no point in trying to finalize anything yet
+ {ok, {WaitingCount - 1, DocCount, W, GroupedDocs, DocReplyDict}}
+ end.
+
+force_update_reply(Doc, Replies, {W, Acc}) ->
+ % TODO make a real decision here
+ case Replies of
+ [] ->
+ {W, [{Doc, {error, internal_server_error}} | Acc]};
+ [Reply| _] ->
+ {W, [{Doc, Reply} | Acc]}
+ end.
+
+maybe_update_reply(_, _, continue) ->
+ % we didn't meet quorum for all docs, so we're fast-forwarding the fold
+ continue;
+maybe_update_reply(Doc, Replies, {stop, W, Acc}) ->
+ case update_quorum_met(W, Replies) of
+ {true, Reply} ->
+ {stop, W, [{Doc, Reply} | Acc]};
+ false ->
+ continue
+ end.
+
+update_quorum_met(W, Replies) ->
+ % TODO make a real decision here
+ case length(Replies) >= W of
+ true ->
+ {true, hd(Replies)};
+ false ->
+ false
+ end.
+
+-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
+group_docs_by_shard(DbName, Docs) ->
+ dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
+ lists:foldl(fun(Shard, D1) ->
+ dict:append(Shard, Doc, D1)
+ end, D0, partitions:for_key(DbName,Id))
+ end, dict:new(), Docs)).
+
+skip_message({1, _R, Replies}) ->
+ repair_read_quorum_failure(Replies);
+skip_message({WaitingCount, R, Replies}) ->
+ {ok, {WaitingCount-1, R, Replies}}.
+
+merge_read_reply(Key, Reply, Replies) ->
case lists:keyfind(Key, 1, Replies) of
false ->
{[{Key, Reply, 1} | Replies], 1};
@@ -63,6 +142,12 @@ merge_replies(Key, Reply, Replies) ->
{lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
end.
+append_update_replies([], [], DocReplyDict) ->
+ DocReplyDict;
+append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
+ % TODO what if the same document shows up twice in one update_docs call?
+ append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
+
make_key({ok, #doc{id=Id, revs=Revs}}) ->
{Id, Revs};
make_key({not_found, missing}) ->
@@ -72,7 +157,7 @@ repair_read_quorum_failure(Replies) ->
case [Doc || {ok, Doc} <- Replies] of
[] ->
{stop, {not_found, missing}};
- [Doc|Rest] ->
+ [Doc|_] ->
% TODO merge docs to find the winner as determined by replication
{stop, {ok, Doc}}
end.
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index a0c0a568..c54ec247 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -1,10 +1,11 @@
-module(fabric_rpc).
--export([open_doc/3, open_doc/4, get_db_info/1]).
+-export([open_doc/3, open_doc/4, get_db_info/1, update_docs/3]).
-include_lib("eunit/include/eunit.hrl").
open_doc(DbName, DocId, Options) ->
- with_db(DbName, {couch_db, open_doc, [DocId, Options]}).
+ io:format("~p ~p ~p ~p~n", [?MODULE, DbName, DocId, Options]),
+ with_db(DbName, {couch_db, open_doc_int, [DocId, Options]}).
%% rpc endpoints
%% call to with_db will supply your M:F with a #db{} and then remaining args
@@ -15,6 +16,9 @@ open_doc(DbName, DocId, Revs, Options) ->
get_db_info(DbName) ->
with_db(DbName, {couch_db, get_db_info, []}).
+update_docs(DbName, Docs, Options) ->
+ with_db(DbName, {couch_db, update_docs, [Docs, Options]}).
+
%%
%% internal
%%
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 38f0b9f3..936ba4ee 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -7,6 +7,7 @@
submit_jobs(Shards, EndPoint, ExtraArgs) ->
lists:map(fun(#shard{node=Node, name=ShardName} = Shard) ->
+ io:format("submitting ~p ~p~n", [Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}]),
Ref = rexi:cast(Node, {fabric_rpc, EndPoint, [ShardName | ExtraArgs]}),
Shard#shard{ref = Ref}
end, Shards).
@@ -50,6 +51,7 @@ process_message(RefList, Keypos, Fun, Acc0, TimeoutRef, PerMsgTO) ->
{timeout, TimeoutRef} ->
timeout;
{Ref, Msg} ->
+ io:format("process_message ~p ~p~n", [Ref, Msg]),
case lists:keyfind(Ref, Keypos, RefList) of
false ->
% this was some non-matching message which we will ignore