summaryrefslogtreecommitdiff
path: root/apps
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-10-19 11:37:08 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-10-19 11:37:08 -0400
commit4074a1f187193df9d13c5b9f35545833ae245a63 (patch)
tree1c645fb52db89149b649cc25b557c86178f1da8a /apps
parentdaf6bdcfe7d2d4eb2f69083973751ff85bb63a3d (diff)
improved read quorum and repair for open_revs
There are important distinctions between requests with open_revs=all, requests with an explicit revision list and latest=true, and a simple revision list. When open_revs=all, we take the union of all revisions returned by the workers, then reduce it by filtering out ancestors on each edit branch. An ancestor counts towards the descendant's read quorum. If an explicit revision list is given, a worker can respond with {{not_found, missing}, Rev}, an {ok, #doc{}} corresponding to the rev in question, or an {ok, #doc{}} with a newer revision if latest=true is supplied. not_found is considered to be an ancestor of any {ok,_} revision for quorum purposes. Down workers do not contribute to the read quorum. If workers disagree at all on the response, read repair will run asynchronously. The repair tries to save all the latest #doc{} records (i.e. the ones w/o descendants) which were received during the voting. BugzID 11047
Diffstat (limited to 'apps')
-rw-r--r--apps/fabric/src/fabric_doc_open_revs.erl318
1 files changed, 277 insertions, 41 deletions
diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl
index 8dbd1aba..722d23a5 100644
--- a/apps/fabric/src/fabric_doc_open_revs.erl
+++ b/apps/fabric/src/fabric_doc_open_revs.erl
@@ -1,5 +1,5 @@
% Copyright 2010 Cloudant
-%
+%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
@@ -19,65 +19,301 @@
-include("fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-record(state, {
+ dbname,
+ worker_count,
+ reply_count = 0,
+ r,
+ revs,
+ latest,
+ replies = []
+}).
go(DbName, Id, Revs, Options) ->
Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs,
[Id, Revs, Options]),
R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")),
- Acc0 = {length(Workers), list_to_integer(R), []},
- case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+ State = #state{
+ dbname = DbName,
+ worker_count = length(Workers),
+ r = list_to_integer(R),
+ revs = Revs,
+ latest = lists:member(latest, Options),
+ replies = case Revs of all -> []; Revs -> [{Rev,[]} || Rev <- Revs] end
+ },
+ case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of
{ok, {ok, Reply}} ->
{ok, Reply};
Else ->
Else
end.
-handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message({rexi_EXIT, _}, _Worker, Acc0) ->
- skip_message(Acc0);
-handle_message(Reply, _Worker, {WaitingCount, R, Replies}) ->
- case merge_read_reply(make_key(Reply), Reply, Replies) of
- {_, KeyCount} when KeyCount =:= R ->
- {stop, Reply};
- {NewReplies, KeyCount} when KeyCount < R ->
- if WaitingCount =:= 1 ->
- % last message arrived, but still no quorum
- repair_read_quorum_failure(NewReplies);
- true ->
- {ok, {WaitingCount-1, R, NewReplies}}
+handle_message({rexi_DOWN, _, _, _}, _Worker, State) ->
+ skip(State);
+handle_message({rexi_EXIT, _}, _Worker, State) ->
+ skip(State);
+handle_message({ok, RawReplies}, _Worker, #state{revs = all} = State) ->
+ #state{
+ dbname = DbName,
+ reply_count = ReplyCount,
+ worker_count = WorkerCount,
+ replies = All0,
+ r = R
+ } = State,
+ All = lists:foldl(fun(Reply,D) -> orddict:update_counter(Reply,1,D) end,
+ All0, RawReplies),
+ Reduced = remove_ancestors(All, []),
+ Complete = (ReplyCount =:= (WorkerCount - 1)),
+ Repair = case Reduced of All -> false; _ ->
+ [D || {{ok,D}, _} <- Reduced]
+ end,
+ case maybe_reply(DbName, Reduced, Complete, Repair, R) of
+ noreply ->
+ {ok, State#state{replies = All, reply_count = ReplyCount+1}};
+ {reply, FinalReply} ->
+ {stop, FinalReply}
+ end;
+handle_message({ok, RawReplies0}, _Worker, State) ->
+ % we've got an explicit revision list, but if latest=true the workers may
+ % return a descendant of the requested revision. Take advantage of the
+ % fact that revisions are returned in order to keep track.
+ RawReplies = strip_not_found_missing(RawReplies0),
+ #state{
+ dbname = DbName,
+ reply_count = ReplyCount,
+ worker_count = WorkerCount,
+ replies = All0,
+ r = R
+ } = State,
+ All = lists:zipwith(fun({Rev, D}, Reply) ->
+ if Reply =:= error -> {Rev, D}; true ->
+ {Rev, orddict:update_counter(Reply, 1, D)}
end
+ end, All0, RawReplies),
+ Reduced = [remove_ancestors(X, []) || {_, X} <- All],
+ FinalReplies = [choose_winner(X, R) || X <- Reduced],
+ Complete = (ReplyCount =:= (WorkerCount - 1)),
+ case is_repair_needed(All, FinalReplies) of
+ true ->
+ Repair = [D || {{ok,D}, _} <- lists:flatten(Reduced)];
+ false ->
+ Repair = false
+ end,
+ case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of
+ noreply ->
+ {ok, State#state{replies = All, reply_count = ReplyCount+1}};
+ {reply, FinalReply} ->
+ {stop, FinalReply}
end.
-skip_message({1, _R, Replies}) ->
- repair_read_quorum_failure(Replies);
-skip_message({WaitingCount, R, Replies}) ->
- {ok, {WaitingCount-1, R, Replies}}.
+skip(#state{revs=all} = State) ->
+ handle_message({ok, []}, nil, State);
+skip(#state{revs=Revs} = State) ->
+ handle_message({ok, [error || _Rev <- Revs]}, nil, State).
-merge_read_reply(Key, Reply, Replies) ->
- case lists:keyfind(Key, 1, Replies) of
+maybe_reply(DbName, ReplyDict, IsComplete, RepairDocs, R) ->
+ case lists:all(fun({_, C}) -> C >= R end, ReplyDict) of
+ true ->
+ maybe_execute_read_repair(DbName, RepairDocs),
+ {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))};
false ->
- {[{Key, Reply, 1} | Replies], 1};
- {Key, _, N} ->
- {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1}
+ case IsComplete of false -> noreply; true ->
+ maybe_execute_read_repair(DbName, RepairDocs),
+ {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))}
+ end
end.
-make_key({ok, L}) when is_list(L) ->
- make_key(L);
-make_key([]) ->
- [];
-make_key([{ok, #doc{revs= {Pos,[RevId | _]}}} | Rest]) ->
- [{ok, {Pos, RevId}} | make_key(Rest)];
-make_key([{{not_found, missing}, Rev} | Rest]) ->
- [{not_found, Rev} | make_key(Rest)].
+choose_winner(Options, R) ->
+ case lists:dropwhile(fun({_Reply, C}) -> C < R end, Options) of
+ [] ->
+ case [Elem || {{ok, #doc{}}, _} = Elem <- Options] of
+ [] ->
+ hd(Options);
+ Docs ->
+ lists:last(lists:sort(Docs))
+ end;
+ [QuorumMet | _] ->
+ QuorumMet
+ end.
-repair_read_quorum_failure(Replies) ->
- case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of
+% repair needed if any reply other than the winner has been received for a rev
+is_repair_needed([], []) ->
+ false;
+is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) ->
+ is_repair_needed(Tail1, Tail2);
+is_repair_needed([H1|_], [H2|_]) ->
+ true.
+
+% this presumes the incoming list is sorted, i.e. shorter revlists come first
+remove_ancestors([], Acc) ->
+ lists:reverse(Acc);
+remove_ancestors([{{not_found, _}, Count} = Head | Tail], Acc) ->
+ % any document is a descendant
+ case lists:filter(fun({{ok, #doc{}}, _}) -> true; (_) -> false end, Tail) of
+ [{{ok, #doc{}} = Descendant, _} | _] ->
+ remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc);
[] ->
- {stop, {not_found, missing}};
- [Doc|_] ->
- % TODO merge docs to find the winner as determined by replication
- {stop, {ok, Doc}}
+ remove_ancestors(Tail, [Head | Acc])
+ end;
+remove_ancestors([{{ok, #doc{revs = {Pos, Revs}}}, Count} = Head | Tail], Acc) ->
+ Descendants = lists:dropwhile(fun
+ ({{ok, #doc{revs = {Pos2, Revs2}}}, _}) ->
+ case lists:nthtail(Pos2 - Pos, Revs2) of
+ [] ->
+ % impossible to tell if Revs2 is a descendant - assume no
+ true;
+ History ->
+ % if Revs2 is a descendant, History is a prefix of Revs
+ not lists:prefix(History, Revs)
+ end
+ end, Tail),
+ case Descendants of [] ->
+ remove_ancestors(Tail, [Head | Acc]);
+ [{Descendant, _} | _] ->
+ remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc)
end.
- \ No newline at end of file
+maybe_execute_read_repair(_Db, false) ->
+ ok;
+maybe_execute_read_repair(Db, Docs) ->
+ spawn(fun() ->
+ [#doc{id=Id} | _] = Docs,
+ Ctx = #user_ctx{roles=[<<"_admin">>]},
+ Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]),
+ ?LOG_INFO("read_repair ~s ~s ~p", [Db, Id, Res])
+ end).
+
+% hackery required so that not_found sorts first
+strip_not_found_missing([]) ->
+ [];
+strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) ->
+ [{not_found, Rev} | strip_not_found_missing(Rest)];
+strip_not_found_missing([Else | Rest]) ->
+ [Else | strip_not_found_missing(Rest)].
+
+unstrip_not_found_missing([]) ->
+ [];
+unstrip_not_found_missing([{not_found, Rev} | Rest]) ->
+ [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)];
+unstrip_not_found_missing([Else | Rest]) ->
+ [Else | unstrip_not_found_missing(Rest)].
+
+remove_ancestors_test() ->
+ Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
+ Bar2 = {not_found, {1,<<"bar">>}},
+ ?assertEqual(
+ [{Bar1,1}, {Foo1,1}],
+ remove_ancestors([{Bar1,1}, {Foo1,1}], [])
+ ),
+ ?assertEqual(
+ [{Bar1,1}, {Foo2,2}],
+ remove_ancestors([{Bar1,1}, {Foo1,1}, {Foo2,1}], [])
+ ),
+ ?assertEqual(
+ [{Bar1,2}],
+ remove_ancestors([{Bar2,1}, {Bar1,1}], [])
+ ).
+
+all_revs_test() ->
+ State0 = #state{worker_count = 3, r = 2, revs = all},
+ Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Foo1, Bar1]}, nil, State0)
+ ),
+ {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0),
+
+ % the normal case - workers agree
+ ?assertEqual(
+ {stop, [Bar1, Foo1]},
+ handle_message({ok, [Foo1, Bar1]}, nil, State1)
+ ),
+
+ % a case where the 2nd worker has a newer Foo - currently we're considering
+ % Foo to have reached quorum and execute_read_repair()
+ ?assertEqual(
+ {stop, [Bar1, Foo2]},
+ handle_message({ok, [Foo2, Bar1]}, nil, State1)
+ ),
+
+ % a case where quorum has not yet been reached for Foo
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Bar1]}, nil, State1)
+ ),
+ {ok, State2} = handle_message({ok, [Bar1]}, nil, State1),
+
+ % still no quorum, but all workers have responded. We include Foo1 in the
+ % response and execute_read_repair()
+ ?assertEqual(
+ {stop, [Bar1, Foo1]},
+ handle_message({ok, [Bar1]}, nil, State2)
+ ).
+
+specific_revs_test() ->
+ Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}],
+ State0 = #state{
+ worker_count = 3,
+ r = 2,
+ revs = Revs,
+ latest = false,
+ replies = [{Rev,[]} || Rev <- Revs]
+ },
+ Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
+ Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
+ Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
+ Baz1 = {{not_found, missing}, {1,<<"baz">>}},
+ Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}},
+
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0)
+ ),
+ {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0),
+
+ % the normal case - workers agree
+ ?assertEqual(
+ {stop, [Foo1, Bar1, Baz1]},
+ handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1)
+ ),
+
+ % latest=true, worker responds with Foo2 and we return it
+ State0L = State0#state{latest = true},
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L)
+ ),
+ {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L),
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz1]},
+ handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L)
+ ),
+
+ % Foo1 is included in the read quorum for Foo2
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz1]},
+ handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L)
+ ),
+
+ % {not_found, missing} is included in the quorum for any found revision
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz2]},
+ handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L)
+ ),
+
+ % a worker failure is skipped
+ ?assertMatch(
+ {ok, #state{}},
+ handle_message({rexi_EXIT, foo}, nil, State1L)
+ ),
+ {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L),
+ ?assertEqual(
+ {stop, [Foo2, Bar1, Baz2]},
+ handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L)
+ ).