diff options
-rw-r--r-- | apps/fabric/src/fabric_doc_open_revs.erl | 318 |
1 files changed, 277 insertions, 41 deletions
diff --git a/apps/fabric/src/fabric_doc_open_revs.erl b/apps/fabric/src/fabric_doc_open_revs.erl index 8dbd1aba..722d23a5 100644 --- a/apps/fabric/src/fabric_doc_open_revs.erl +++ b/apps/fabric/src/fabric_doc_open_revs.erl @@ -1,5 +1,5 @@ % Copyright 2010 Cloudant -% +% % Licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at @@ -19,65 +19,301 @@ -include("fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-record(state, { + dbname, + worker_count, + reply_count = 0, + r, + revs, + latest, + replies = [] +}). go(DbName, Id, Revs, Options) -> Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), open_revs, [Id, Revs, Options]), R = couch_util:get_value(r, Options, couch_config:get("cluster","r","2")), - Acc0 = {length(Workers), list_to_integer(R), []}, - case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of + State = #state{ + dbname = DbName, + worker_count = length(Workers), + r = list_to_integer(R), + revs = Revs, + latest = lists:member(latest, Options), + replies = case Revs of all -> []; Revs -> [{Rev,[]} || Rev <- Revs] end + }, + case fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of {ok, {ok, Reply}} -> {ok, Reply}; Else -> Else end. -handle_message({rexi_DOWN, _, _, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message({rexi_EXIT, _}, _Worker, Acc0) -> - skip_message(Acc0); -handle_message(Reply, _Worker, {WaitingCount, R, Replies}) -> - case merge_read_reply(make_key(Reply), Reply, Replies) of - {_, KeyCount} when KeyCount =:= R -> - {stop, Reply}; - {NewReplies, KeyCount} when KeyCount < R -> - if WaitingCount =:= 1 -> - % last message arrived, but still no quorum - repair_read_quorum_failure(NewReplies); - true -> - {ok, {WaitingCount-1, R, NewReplies}} +handle_message({rexi_DOWN, _, _, _}, _Worker, State) -> + skip(State); +handle_message({rexi_EXIT, _}, _Worker, State) -> + skip(State); +handle_message({ok, RawReplies}, _Worker, #state{revs = all} = State) -> + #state{ + dbname = DbName, + reply_count = ReplyCount, + worker_count = WorkerCount, + replies = All0, + r = R + } = State, + All = lists:foldl(fun(Reply,D) -> orddict:update_counter(Reply,1,D) end, + All0, RawReplies), + Reduced = remove_ancestors(All, []), + Complete = (ReplyCount =:= (WorkerCount - 1)), + Repair = case Reduced of All -> false; _ -> + [D || {{ok,D}, _} <- Reduced] + end, + case maybe_reply(DbName, Reduced, Complete, Repair, R) of + noreply -> + {ok, State#state{replies = All, reply_count = ReplyCount+1}}; + {reply, FinalReply} -> + {stop, FinalReply} + end; +handle_message({ok, RawReplies0}, _Worker, State) -> + % we've got an explicit revision list, but if latest=true the workers may + % return a descendant of the requested revision. Take advantage of the + % fact that revisions are returned in order to keep track. + RawReplies = strip_not_found_missing(RawReplies0), + #state{ + dbname = DbName, + reply_count = ReplyCount, + worker_count = WorkerCount, + replies = All0, + r = R + } = State, + All = lists:zipwith(fun({Rev, D}, Reply) -> + if Reply =:= error -> {Rev, D}; true -> + {Rev, orddict:update_counter(Reply, 1, D)} end + end, All0, RawReplies), + Reduced = [remove_ancestors(X, []) || {_, X} <- All], + FinalReplies = [choose_winner(X, R) || X <- Reduced], + Complete = (ReplyCount =:= (WorkerCount - 1)), + case is_repair_needed(All, FinalReplies) of + true -> + Repair = [D || {{ok,D}, _} <- lists:flatten(Reduced)]; + false -> + Repair = false + end, + case maybe_reply(DbName, FinalReplies, Complete, Repair, R) of + noreply -> + {ok, State#state{replies = All, reply_count = ReplyCount+1}}; + {reply, FinalReply} -> + {stop, FinalReply} end. -skip_message({1, _R, Replies}) -> - repair_read_quorum_failure(Replies); -skip_message({WaitingCount, R, Replies}) -> - {ok, {WaitingCount-1, R, Replies}}. +skip(#state{revs=all} = State) -> + handle_message({ok, []}, nil, State); +skip(#state{revs=Revs} = State) -> + handle_message({ok, [error || _Rev <- Revs]}, nil, State). -merge_read_reply(Key, Reply, Replies) -> - case lists:keyfind(Key, 1, Replies) of +maybe_reply(DbName, ReplyDict, IsComplete, RepairDocs, R) -> + case lists:all(fun({_, C}) -> C >= R end, ReplyDict) of + true -> + maybe_execute_read_repair(DbName, RepairDocs), + {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))}; false -> - {[{Key, Reply, 1} | Replies], 1}; - {Key, _, N} -> - {lists:keyreplace(Key, 1, Replies, {Key, Reply, N+1}), N+1} + case IsComplete of false -> noreply; true -> + maybe_execute_read_repair(DbName, RepairDocs), + {reply, unstrip_not_found_missing(orddict:fetch_keys(ReplyDict))} + end end. -make_key({ok, L}) when is_list(L) -> - make_key(L); -make_key([]) -> - []; -make_key([{ok, #doc{revs= {Pos,[RevId | _]}}} | Rest]) -> - [{ok, {Pos, RevId}} | make_key(Rest)]; -make_key([{{not_found, missing}, Rev} | Rest]) -> - [{not_found, Rev} | make_key(Rest)]. +choose_winner(Options, R) -> + case lists:dropwhile(fun({_Reply, C}) -> C < R end, Options) of + [] -> + case [Elem || {{ok, #doc{}}, _} = Elem <- Options] of + [] -> + hd(Options); + Docs -> + lists:last(lists:sort(Docs)) + end; + [QuorumMet | _] -> + QuorumMet + end. -repair_read_quorum_failure(Replies) -> - case [Doc || {_Key, {ok, Doc}, _Count} <- Replies] of +% repair needed if any reply other than the winner has been received for a rev +is_repair_needed([], []) -> + false; +is_repair_needed([{_Rev, [Reply]} | Tail1], [Reply | Tail2]) -> + is_repair_needed(Tail1, Tail2); +is_repair_needed([H1|_], [H2|_]) -> + true. + +% this presumes the incoming list is sorted, i.e. shorter revlists come first +remove_ancestors([], Acc) -> + lists:reverse(Acc); +remove_ancestors([{{not_found, _}, Count} = Head | Tail], Acc) -> + % any document is a descendant + case lists:filter(fun({{ok, #doc{}}, _}) -> true; (_) -> false end, Tail) of + [{{ok, #doc{}} = Descendant, _} | _] -> + remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc); [] -> - {stop, {not_found, missing}}; - [Doc|_] -> - % TODO merge docs to find the winner as determined by replication - {stop, {ok, Doc}} + remove_ancestors(Tail, [Head | Acc]) + end; +remove_ancestors([{{ok, #doc{revs = {Pos, Revs}}}, Count} = Head | Tail], Acc) -> + Descendants = lists:dropwhile(fun + ({{ok, #doc{revs = {Pos2, Revs2}}}, _}) -> + case lists:nthtail(Pos2 - Pos, Revs2) of + [] -> + % impossible to tell if Revs2 is a descendant - assume no + true; + History -> + % if Revs2 is a descendant, History is a prefix of Revs + not lists:prefix(History, Revs) + end + end, Tail), + case Descendants of [] -> + remove_ancestors(Tail, [Head | Acc]); + [{Descendant, _} | _] -> + remove_ancestors(orddict:update_counter(Descendant, Count, Tail), Acc) end. -
\ No newline at end of file +maybe_execute_read_repair(_Db, false) -> + ok; +maybe_execute_read_repair(Db, Docs) -> + spawn(fun() -> + [#doc{id=Id} | _] = Docs, + Ctx = #user_ctx{roles=[<<"_admin">>]}, + Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]), + ?LOG_INFO("read_repair ~s ~s ~p", [Db, Id, Res]) + end). + +% hackery required so that not_found sorts first +strip_not_found_missing([]) -> + []; +strip_not_found_missing([{{not_found, missing}, Rev} | Rest]) -> + [{not_found, Rev} | strip_not_found_missing(Rest)]; +strip_not_found_missing([Else | Rest]) -> + [Else | strip_not_found_missing(Rest)]. + +unstrip_not_found_missing([]) -> + []; +unstrip_not_found_missing([{not_found, Rev} | Rest]) -> + [{{not_found, missing}, Rev} | unstrip_not_found_missing(Rest)]; +unstrip_not_found_missing([Else | Rest]) -> + [Else | unstrip_not_found_missing(Rest)]. + +remove_ancestors_test() -> + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Bar2 = {not_found, {1,<<"bar">>}}, + ?assertEqual( + [{Bar1,1}, {Foo1,1}], + remove_ancestors([{Bar1,1}, {Foo1,1}], []) + ), + ?assertEqual( + [{Bar1,1}, {Foo2,2}], + remove_ancestors([{Bar1,1}, {Foo1,1}, {Foo2,1}], []) + ), + ?assertEqual( + [{Bar1,2}], + remove_ancestors([{Bar2,1}, {Bar1,1}], []) + ). + +all_revs_test() -> + State0 = #state{worker_count = 3, r = 2, revs = all}, + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo1, Bar1]}, nil, State0) + ), + {ok, State1} = handle_message({ok, [Foo1, Bar1]}, nil, State0), + + % the normal case - workers agree + ?assertEqual( + {stop, [Bar1, Foo1]}, + handle_message({ok, [Foo1, Bar1]}, nil, State1) + ), + + % a case where the 2nd worker has a newer Foo - currently we're considering + % Foo to have reached quorum and execute_read_repair() + ?assertEqual( + {stop, [Bar1, Foo2]}, + handle_message({ok, [Foo2, Bar1]}, nil, State1) + ), + + % a case where quorum has not yet been reached for Foo + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Bar1]}, nil, State1) + ), + {ok, State2} = handle_message({ok, [Bar1]}, nil, State1), + + % still no quorum, but all workers have responded. We include Foo1 in the + % response and execute_read_repair() + ?assertEqual( + {stop, [Bar1, Foo1]}, + handle_message({ok, [Bar1]}, nil, State2) + ). + +specific_revs_test() -> + Revs = [{1,<<"foo">>}, {1,<<"bar">>}, {1,<<"baz">>}], + State0 = #state{ + worker_count = 3, + r = 2, + revs = Revs, + latest = false, + replies = [{Rev,[]} || Rev <- Revs] + }, + Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}, + Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}}, + Baz1 = {{not_found, missing}, {1,<<"baz">>}}, + Baz2 = {ok, #doc{revs = {1, [<<"baz">>]}}}, + + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0) + ), + {ok, State1} = handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State0), + + % the normal case - workers agree + ?assertEqual( + {stop, [Foo1, Bar1, Baz1]}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1) + ), + + % latest=true, worker responds with Foo2 and we return it + State0L = State0#state{latest = true}, + ?assertMatch( + {ok, #state{}}, + handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L) + ), + {ok, State1L} = handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State0L), + ?assertEqual( + {stop, [Foo2, Bar1, Baz1]}, + handle_message({ok, [Foo2, Bar1, Baz1]}, nil, State1L) + ), + + % Foo1 is included in the read quorum for Foo2 + ?assertEqual( + {stop, [Foo2, Bar1, Baz1]}, + handle_message({ok, [Foo1, Bar1, Baz1]}, nil, State1L) + ), + + % {not_found, missing} is included in the quorum for any found revision + ?assertEqual( + {stop, [Foo2, Bar1, Baz2]}, + handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State1L) + ), + + % a worker failure is skipped + ?assertMatch( + {ok, #state{}}, + handle_message({rexi_EXIT, foo}, nil, State1L) + ), + {ok, State2L} = handle_message({rexi_EXIT, foo}, nil, State1L), + ?assertEqual( + {stop, [Foo2, Bar1, Baz2]}, + handle_message({ok, [Foo2, Bar1, Baz2]}, nil, State2L) + ). |