summaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-10-19 17:17:08 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-10-19 17:17:08 -0400
commited231d0fd1cf7647cc7a154de81e85cd584bc8b5 (patch)
treeda3c49e4ef1fa06f122ee27c9d2cf3b1a100f92c /apps
parent1fc998d21aeb437064324b560f1ac84887e37d6f (diff)
use new read repair and ancestry checks in fabric:open_doc/3
Diffstat (limited to 'apps')
-rw-r--r--apps/fabric/src/fabric_doc_open.erl101
1 files changed, 70 insertions, 31 deletions
diff --git a/apps/fabric/src/fabric_doc_open.erl b/apps/fabric/src/fabric_doc_open.erl
index 1530515d..9131c94b 100644
--- a/apps/fabric/src/fabric_doc_open.erl
+++ b/apps/fabric/src/fabric_doc_open.erl
@@ -27,54 +27,93 @@ go(DbName, Id, Options) ->
R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
Acc0 = {length(Workers), list_to_integer(R), []},
case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
- {ok, {ok, #doc{deleted=true}}} when SuppressDeletedDoc ->
- {not_found, deleted};
- {ok, Else} ->
- Else;
+ {ok, Reply} ->
+ format_reply(Reply, SuppressDeletedDoc);
+ {error, needs_repair, Reply} ->
+ spawn(fabric, open_revs, [DbName, Id, all, Options]),
+ format_reply(Reply, SuppressDeletedDoc);
+ {error, needs_repair} ->
+ % we couldn't determine the correct reply, so we'll run a sync repair
+ {ok, Results} = fabric:open_revs(DbName, Id, all, Options),
+ case lists:partition(fun({ok, #doc{deleted=Del}}) -> Del end, Results) of
+ {[], []} ->
+ {not_found, missing};
+ {_DeletedDocs, []} when SuppressDeletedDoc ->
+ {not_found, deleted};
+ {DeletedDocs, []} ->
+ lists:last(lists:sort(DeletedDocs));
+ {_, LiveDocs} ->
+ lists:last(lists:sort(LiveDocs))
+ end;
Error ->
Error
end.
+format_reply({ok, #doc{deleted=true}}, true) ->
+ {not_found, deleted};
+format_reply(Else, _) ->
+ Else.
+
handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
skip_message(Acc0);
handle_message({rexi_EXIT, _Reason}, _Worker, Acc0) ->
skip_message(Acc0);
handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
- case merge_read_reply(make_key(Reply), Reply, Replies) of
- {_, KeyCount} when KeyCount =:= R ->
- {stop, Reply};
- {NewReplies, KeyCount} when KeyCount < R ->
+ NewReplies = orddict:update_counter(Reply, 1, Replies),
+ Reduced = fabric_util:remove_ancestors(NewReplies, []),
+ case lists:dropwhile(fun({_, Count}) -> Count < R end, Reduced) of
+ [{QuorumReply, _} | _] ->
+ if length(NewReplies) =:= 1 ->
+ {stop, QuorumReply};
+ true ->
+ % we had some disagreement amongst the workers, so repair is useful
+ {error, needs_repair, QuorumReply}
+ end;
+ [] ->
if WaitingCount =:= 1 ->
- % last message arrived, but still no quorum
- repair_read_quorum_failure(NewReplies);
+ {error, needs_repair};
true ->
{ok, {WaitingCount-1, R, NewReplies}}
end
end.
-skip_message({1, _R, Replies}) ->
- repair_read_quorum_failure(Replies);
+skip_message({1, _R, _Replies}) ->
+ {error, needs_repair};
skip_message({WaitingCount, R, Replies}) ->
{ok, {WaitingCount-1, R, Replies}}.
-merge_read_reply(Key, Reply, Replies) ->
- case lists:keyfind(Key, 1, Replies) of
- false ->
- {[{Key, Reply, 1} | Replies], 1};
- {Key, _, N} ->
- {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
- end.
-make_key({ok, #doc{id=Id, revs=Revs}}) ->
- {Id, Revs};
-make_key(Else) ->
- Else.
+open_doc_test() ->
+ Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}},
+ Baz1 = {ok, #doc{revs = {1,[<<"baz">>]}}},
+ NF = {not_found, missing},
+ State0 = {3, 2, []},
+ State1 = {2, 2, [{Foo1,1}]},
+ State2 = {1, 2, [{Bar1,1}, {Foo1,1}]},
+ ?assertEqual({ok, State1}, handle_message(Foo1, nil, State0)),
-repair_read_quorum_failure(Replies) ->
- case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
- [] ->
- {stop, {not_found, missing}};
- [Doc|_] ->
- % TODO merge docs to find the winner as determined by replication
- {stop, {ok, Doc}}
- end. \ No newline at end of file
+ % normal case - quorum reached, no disagreement
+ ?assertEqual({stop, Foo1}, handle_message(Foo1, nil, State1)),
+
+ % 2nd worker disagrees, voting continues
+ ?assertEqual({ok, State2}, handle_message(Bar1, nil, State1)),
+
+ % 3rd worker resolves voting, but repair is needed
+ ?assertEqual({error, needs_repair, Foo1}, handle_message(Foo1, nil, State2)),
+
+ % 2nd worker comes up with descendant of Foo1, voting resolved, run repair
+ ?assertEqual({error, needs_repair, Foo2}, handle_message(Foo2, nil, State1)),
+
+ % not_found is considered to be an ancestor of everybody
+ ?assertEqual({error, needs_repair, Foo1}, handle_message(NF, nil, State1)),
+
+ % 3 distinct edit branches result in quorum failure
+ ?assertEqual({error, needs_repair}, handle_message(Baz1, nil, State2)),
+
+ % bad node concludes voting w/o success, run sync repair to get the result
+ ?assertEqual(
+ {error, needs_repair},
+ handle_message({rexi_DOWN, 1, 2, 3}, nil, State2)
+ ).