diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-10-19 11:37:08 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-10-19 11:37:08 -0400 |
commit | 4074a1f187193df9d13c5b9f35545833ae245a63 (patch) | |
tree | 1c645fb52db89149b649cc25b557c86178f1da8a /apps/fabric/src | |
parent | daf6bdcfe7d2d4eb2f69083973751ff85bb63a3d (diff) |
improved read quorum and repair for open_revs
There are important distinctions between requests with open_revs=all,
requests with an explicit revision list and latest=true, and a simple
revision list. When open_revs=all, we take the union of all revisions
returned by the workers, then reduce it by filtering out ancestors on
each edit branch. An ancestor counts towards the descendant's read
quorum.
If an explicit revision list is given, a worker can respond with
{{not_found, missing}, Rev}, an {ok, #doc{}} corresponding to the rev
in question, or an {ok, #doc{}} with a newer revision if latest=true
is supplied. not_found is considered to be an ancestor of any {ok,_}
revision for quorum purposes.
Down workers do not contribute to the read quorum.
If workers disagree at all on the response, read repair will run
asynchronously. The repair tries to save all the latest #doc{} records
(i.e. the ones w/o descendants) which were received during the voting.
BugzID 11047
Diffstat (limited to 'apps/fabric/src')
-rw-r--r-- | apps/fabric/src/fabric_doc_open_revs.erl | 318 |
1 files changed, 277 insertions, 41 deletions
diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl index 8dbd1aba..722d23a5 100644 --- a/apps/fabric/src/fabric_doc_open_revs.erl +++ b/apps/fabric/src/fabric_doc_open_revs.erl @@ -1,5 +1,5 @@ % Copyright 2010 Cloudant -% +% % Licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at @@ -19,65 +19,301 @@ -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(state, { + dbname, + worker_count, + reply_count = 0, + r, + revs, + latest, + replies = [] +}). go(DbName, Id, Revs, Options) -> Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, [Id, Revs, Options]), R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), - Acc0 = {length(Workers), list_to_integer(R), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + State = #state{ + dbname = DbName, + worker_count = length(Workers), + r = list_to_integer(R), + revs = Revs, + latest = lists:member(latest, Options), + replies = case Revs of all -> []; Revs -> [{Rev,[]} || Rev <- Revs] end + }, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of {ok, {ok, Reply}} -> {ok, Reply}; Else -> Else end. -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> - case merge_read_reply(make_key(Reply), Reply, Replies) of - {_, KeyCount} when KeyCount =:= R -> - {stop, Reply}; - {NewReplies, KeyCount} when KeyCount < R -> - if WaitingCount =:= 1 -> - % last message arrived, but still no quorum - repair_read_quorum_failure(NewReplies); - true -> - {ok, {WaitingCount-1, R, NewReplies}} +handle_message({rexi_DOWN, _, _, _}, _Worker, State) -> + skip(State); +handle_message({rexi_EXIT, _}, _Worker, State) -> + skip(State); +handle_message({ok, RawReplies}, _Worker, #state{revs = all} = State) -> + #state{ + dbname = DbName, + reply_count = ReplyCount, + worker_count = WorkerCount, + replies = All0, + r = R + } = State, + All = lists:foldl(fun(Reply,D) -> orddict:update_counter(Reply,1,D) end, + All0, RawReplies), + Reduced = remove_ancestors(All, []), + Complete = (ReplyCount =:= (WorkerCount - 1)), + Repair = case Reduced of All -> false; _ -> + [D || {{ok,D}, _} <- Reduced] + end, + case maybe_reply(DbName, Reduced, Complete, Repair, R) of + noreply -> + {ok, State#state{replies = All, reply_count = ReplyCount+1}}; + {reply, FinalReply} -> + {stop, FinalReply} + end; +handle_message({ok, RawReplies0}, _Worker, State) -> + % we've got an explicit revision list, but if latest=true the workers may + % return a descendant of the requested revision. Take advantage of the + % fact that revisions are returned in order to keep track. + RawReplies = strip_not_found_missing(RawReplies0), + #state{ + dbname = DbName, + reply_count = ReplyCount, + worker_count = WorkerCount, + replies = All0, + r = R + } = State, + All = lists:zipwith(fun({Rev, D}, Reply) -> + if Reply =:= error -> {Rev, D}; true -> + {Rev, orddict:update_counter(Reply, 1, D)} end + end, All0, RawReplies), + Reduced = [remove_ancestors(X, []) || {_, X} <- All], + FinalReplies = [choose_winner(X, R) || X <- Reduced], + Complete = (ReplyCount =:= (WorkerCount - 1)), + case is_repair_needed(All, FinalReplies) of + true -> + Repair = [D || {{ok,D}, _} <- lists:flatten(Reduced)]; + false -> + Repair = false + end, + case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of + noreply -> + {ok, State#state{replies = All, reply_count = ReplyCount+1}}; + {reply, FinalReply} -> + {stop, FinalReply} end. -skip_message({1, _R, Replies}) -> - repair_read_quorum_failure(Replies); -skip_message({WaitingCount, R, Replies}) -> - {ok, {WaitingCount-1, R, Replies}}. +skip(#state{revs=all} = State) -> + handle_message({ok, []}, nil, State); +skip(#state{revs=Revs} = State) -> + handle_message({ok, [error || _Rev <- Revs]}, nil, State). -merge_read_reply(Key, Reply, Replies) -> - case lists:keyfind(Key, 1, Replies) of +maybe_reply(DbName, ReplyDict, IsComplete, RepairDocs, R) -> + case lists:all(fun({_, C}) -> C >= R end, ReplyDict) of + true -> + maybe_execute_read_repair(DbName, RepairDocs), + {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))}; false -> - {[{Key, Reply, 1} | Replies], 1}; - {Key, _, N} -> - {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} + case IsComplete of false -> noreply; true -> + maybe_execute_read_repair(DbName, RepairDocs), + {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))} + end end. -make_key({ok, L}) when is_list(L) -> - make_key(L); -make_key([]) -> - []; -make_key([{ok, #doc{revs= {Pos,[RevId | _]}}} | Rest]) -> - [{ok, {Pos, RevId}} | make_key(Rest)]; -make_key([{{not_found, missing}, Rev} | Rest]) -> - [{not_found, Rev} | make_key(Rest)]. +choose_winner(Options, R) -> + case lists:dropwhile(fun({_Reply, C}) -> C < R end, Options) of + [] -> + case [Elem || {{ok, #doc{}}, _} = Elem <- Options] of + [] -> + hd(Options); + Docs -> + lists:last(lists:sort(Docs)) + end; + [QuorumMet | _] -> + QuorumMet + end. -repair_read_quorum_failure(Replies) -> - case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of +% repair needed if any reply other than the winner has been received for a rev +is_repair_needed([], []) -> + false; +is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) -> + is_repair_needed(Tail1, Tail2); +is_repair_needed([H1|_], [H2|_]) -> + true. + +% this presumes the incoming list is sorted, i.e. shorter revlists come first +remove_ancestors([], Acc) -> + lists:reverse(Acc); +remove_ancestors([{{not_found, _}, Count} = Head | Tail], Acc) -> + % any document is a descendant + case lists:filter(fun({{ok, #doc{}}, _}) -> true; (_) -> false end, Tail) of + [{{ok, #doc{}} = Descendant, _} | _] -> + remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc); [] -> - {stop, {not_found, missing}}; - [Doc|_] -> - % TODO merge docs to find the winner as determined by replication - {stop, {ok, Doc}} + remove_ancestors(Tail, [Head | Acc]) + end; +remove_ancestors([{{ok, #doc{revs = {Pos, Revs}}}, Count} = Head | Tail], Acc) -> + Descendants = lists:dropwhile(fun + ({{ok, #doc{revs = {Pos2, Revs2}}}, _}) -> + case lists:nthtail(Pos2 - Pos, Revs2) of + [] -> + % impossible to tell if Revs2 is a descendant - assume no + true; + History -> + % if Revs2 is a descendant, History is a prefix of Revs + not lists:prefix(History, Revs) + end + end, Tail), + case Descendants of [] -> + remove_ancestors(Tail, [Head | Acc]); + [{Descendant, _} | _] -> + remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc) end. -
\ No newline at end of file +maybe_execute_read_repair(_Db, false) -> + ok; +maybe_execute_read_repair(Db, Docs) -> + spawn(fun() -> + [#doc{id=Id} | _] = Docs, + Ctx = #user_ctx{roles=[<<"_admin">>]}, + Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]), + ?LOG_INFO("read_repair ~s ~s ~p", [Db, Id, Res]) + end). + +% hackery required so that not_found sorts first +strip_not_found_missing([]) -> + []; +strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) -> + [{not_found, Rev} | strip_not_found_missing(Rest)]; +strip_not_found_missing([Else | Rest]) -> + [Else | strip_not_found_missing(Rest)]. + +unstrip_not_found_missing([]) -> + []; +unstrip_not_found_missing([{not_found, Rev} | Rest]) -> + [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)]; +unstrip_not_found_missing([Else | Rest]) -> + [Else | unstrip_not_found_missing(Rest)]. + +remove_ancestors_test() -> + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Bar2 = {not_found, {1,<<"bar">>}}, + ?assertEqual( + [{Bar1,1}, {Foo1,1}], + remove_ancestors([{Bar1,1}, {Foo1,1}], []) + ), + ?assertEqual( + [{Bar1,1}, {Foo2,2}], + remove_ancestors([{Bar1,1}, {Foo1,1}, {Foo2,1}], []) + ), + ?assertEqual( + [{Bar1,2}], + remove_ancestors([{Bar2,1}, {Bar1,1}], []) + ). + +all_revs_test() -> + State0 = #state{worker_count = 3, r = 2, revs = all}, + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo1, Bar1]}, nil, State0) + ), + {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0), + + % the normal case - workers agree + ?assertEqual( + {stop, [Bar1, Foo1]}, + handle_message({ok, [Foo1, Bar1]}, nil, State1) + ), + + % a case where the 2nd worker has a newer Foo - currently we're considering + % Foo to have reached quorum and execute_read_repair() + ?assertEqual( + {stop, [Bar1, Foo2]}, + handle_message({ok, [Foo2, Bar1]}, nil, State1) + ), + + % a case where quorum has not yet been reached for Foo + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Bar1]}, nil, State1) + ), + {ok, State2} = handle_message({ok, [Bar1]}, nil, State1), + + % still no quorum, but all workers have responded. We include Foo1 in the + % response and execute_read_repair() + ?assertEqual( + {stop, [Bar1, Foo1]}, + handle_message({ok, [Bar1]}, nil, State2) + ). + +specific_revs_test() -> + Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}], + State0 = #state{ + worker_count = 3, + r = 2, + revs = Revs, + latest = false, + replies = [{Rev,[]} || Rev <- Revs] + }, + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Baz1 = {{not_found, missing}, {1,<<"baz">>}}, + Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}}, + + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0) + ), + {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0), + + % the normal case - workers agree + ?assertEqual( + {stop, [Foo1, Bar1, Baz1]}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1) + ), + + % latest=true, worker responds with Foo2 and we return it + State0L = State0#state{latest = true}, + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L) + ), + {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L), + ?assertEqual( + {stop, [Foo2, Bar1, Baz1]}, + handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L) + ), + + % Foo1 is included in the read quorum for Foo2 + ?assertEqual( + {stop, [Foo2, Bar1, Baz1]}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L) + ), + + % {not_found, missing} is included in the quorum for any found revision + ?assertEqual( + {stop, [Foo2, Bar1, Baz2]}, + handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L) + ), + + % a worker failure is skipped + ?assertMatch( + {ok, #state{}}, + handle_message({rexi_EXIT, foo}, nil, State1L) + ), + {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L), + ?assertEqual( + {stop, [Foo2, Bar1, Baz2]}, + handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L) + ). |