diff options
author | Brad Anderson <brad@cloudant.com> | 2010-04-23 15:26:18 -0400 |
---|---|---|
committer | Brad Anderson <brad@cloudant.com> | 2010-05-09 22:56:24 -0400 |
commit | db3e28aa026a0e2e22356851d7b93fec8247c159 (patch) | |
tree | e3354bbfd05640da66c159a65758893958af4bbf | |
parent | f23371c2ec884628e73abd783c3beedfaa25d490 (diff) |
gossip handling revamped, BugzID 10068
-rw-r--r-- | src/mem3.erl | 97 | ||||
-rw-r--r-- | test/mem3_test.erl | 19 |
2 files changed, 70 insertions, 46 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index cb69d27d..007e1926 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -25,6 +25,9 @@ -export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]). -export([parts_for_node/1]). +%% for testing more than anything else +-export([merge_nodes/2]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -207,19 +210,11 @@ handle_call(Msg, _From, State) -> handle_cast(stop, State) -> {stop, normal, State}; +%% gossip handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) -> showroom_log:message(info, "membership: received gossip from ~p", [RemoteNode]), - {MergeType, MergedState} = merge_state(RemoteState, LocalState), - case MergeType of - equal -> {noreply, MergedState}; - merged -> - showroom_log:message(info, "membership: merged new gossip: ~p", - [MergedState]), - new_state(MergedState), - gossip(MergedState), - {noreply, MergedState} - end; + {noreply, handle_gossip(RemoteState, LocalState)}; %% ignored cast handle_cast(Msg, State) -> @@ -324,6 +319,51 @@ int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, new_state(NewState, Pmap, Fullmap, Config). +handle_gossip(RemoteState=#mem{clock=RemoteClock}, + LocalState=#mem{clock=LocalClock}) -> + case vector_clock:compare(RemoteClock, LocalClock) of + equal -> LocalState; + less -> LocalState; + greater -> + % this node needs updating + new_state(RemoteState); + concurrent -> + % ick, so let's resolve and merge states + showroom_log:message(info, + "~nmembership: Concurrent Clocks~n" + "~nRemoteState : ~p~nLocalState : ~p~n" + , [RemoteState, LocalState]), + MergedState = merge_states(RemoteState, LocalState), + new_state(MergedState) + end. + + +merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState, + #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) -> + MergedClock = vector_clock:merge(RemoteClock, LocalClock), + MergedNodes = merge_nodes(RemoteNodes, LocalNodes), + LocalState#mem{clock=MergedClock, nodes=MergedNodes}. + + +%% this will give one of the lists back, deterministically +merge_nodes(Remote, Local) -> + % get rid of the initial 0 node if it's still there, and sort + Remote1 = lists:usort(lists:keydelete(0,1,Remote)), + Local1 = lists:usort(lists:keydelete(0,1,Local)), + % handle empty lists as well as other cases + case {Remote1, Local1} of + {[], L} -> L; + {R, []} -> R; + _ -> erlang:min(Remote1, Local1) + end. + + + +% notify(Type, Nodes) -> +% lists:foreach(fun(Node) -> +% gen_event:notify(membership_events, {Type, Node}) +% end, Nodes). + gossip(#mem{args=Args} = NewState) -> Test = proplists:get_value(test, Args), gossip(Test, NewState). @@ -341,6 +381,7 @@ gossip(_,_) -> % testing, so don't gossip ok. + %% @doc find the latest state file on disk find_latest_state_filename(Config) -> Dir = Config#config.directory, @@ -387,7 +428,6 @@ save_state_file(State, Config) -> TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", [Year, Month, Day, Hour, Minute, Second]), FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS, - ?debugFmt("~nFilename: ~s~n", [FN]), ok = filelib:ensure_dir(FN), {ok, File} = file:open(FN, [binary, write]), io:format(File, "~w.~n", [State]), @@ -475,41 +515,6 @@ int_reset(Test, State) -> State#mem{node=Node, nodes=Nodes, clock=Clock}. -merge_state(_RemoteState=#mem{clock=RemoteClock, nodes=RemoteNodes}, - LocalState=#mem{clock=LocalClock, nodes=LocalNodes}) -> - case vector_clock:equals(RemoteClock, LocalClock) of - true -> - {equal, LocalState}; - false -> - {MergedClock, MergedNodes} = - merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes), - -% % notify of arrivals & departures -% Arrived = MergedNodes -- LocalNodes, -% notify(node_join, Arrived), -% Departed = LocalNodes -- MergedNodes, -% notify(node_leave, Departed), - - {merged, LocalState#mem{clock=MergedClock, nodes=MergedNodes}} - end. - - -merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) -> - MergedClock = vector_clock:merge(RemoteClock, LocalClock), - Merged1 = lists:ukeymerge(1, - lists:keysort(1, RemoteNodes), - lists:keysort(1, LocalNodes)), - Merged = lists:keydelete(0, 1, Merged1), - % TODO: make sure we don't have dupe keys ? - {MergedClock, lists:keysort(1, Merged)}. - - -% notify(Type, Nodes) -> -% lists:foreach(fun(Node) -> -% gen_event:notify(membership_events, {Type, Node}) -% end, Nodes). - - ping_all_yall(Nodes) -> lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes). diff --git a/test/mem3_test.erl b/test/mem3_test.erl index 8be90ef8..03e55978 100644 --- a/test/mem3_test.erl +++ b/test/mem3_test.erl @@ -123,3 +123,22 @@ join_with_wrong_order(_Pid) -> ?assertEqual({error,{position_exists,3,c}}, Res), %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]), ok. + + +merge_nodes_test() -> + A = [{1,a1,[]},{2,a2,[]},{3,a3,[]}], + B = [{1,a1,[]},{2,a2,[]},{3,b3,[]}], + ?assertEqual(A, mem3:merge_nodes(A,B)), + ?assertEqual(mem3:merge_nodes(A,B), mem3:merge_nodes(B,A)), + C = [{1,c1,[]},{2,c2,[]},{3,c3,[]}], + ?assertEqual(A, mem3:merge_nodes(A,C)), + ?assertEqual(A, mem3:merge_nodes(C,A)), + ok. + + +merge_nodes_with_init_nodelist_test() -> + A = [{1,a1,[]},{2,a2,[]},{3,a3,[]}], + B = [{0, b, []}], + ?assertEqual(A, mem3:merge_nodes(A,B)), + ?assertEqual(mem3:merge_nodes(A,B), mem3:merge_nodes(B,A)), + ok. |