diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mem3.erl | 97 |
1 files changed, 51 insertions, 46 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index cb69d27d..007e1926 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -25,6 +25,9 @@ -export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]). -export([parts_for_node/1]). +%% for testing more than anything else +-export([merge_nodes/2]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -207,19 +210,11 @@ handle_call(Msg, _From, State) -> handle_cast(stop, State) -> {stop, normal, State}; +%% gossip handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) -> showroom_log:message(info, "membership: received gossip from ~p", [RemoteNode]), - {MergeType, MergedState} = merge_state(RemoteState, LocalState), - case MergeType of - equal -> {noreply, MergedState}; - merged -> - showroom_log:message(info, "membership: merged new gossip: ~p", - [MergedState]), - new_state(MergedState), - gossip(MergedState), - {noreply, MergedState} - end; + {noreply, handle_gossip(RemoteState, LocalState)}; %% ignored cast handle_cast(Msg, State) -> @@ -324,6 +319,51 @@ int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, new_state(NewState, Pmap, Fullmap, Config). +handle_gossip(RemoteState=#mem{clock=RemoteClock}, + LocalState=#mem{clock=LocalClock}) -> + case vector_clock:compare(RemoteClock, LocalClock) of + equal -> LocalState; + less -> LocalState; + greater -> + % this node needs updating + new_state(RemoteState); + concurrent -> + % ick, so let's resolve and merge states + showroom_log:message(info, + "~nmembership: Concurrent Clocks~n" + "~nRemoteState : ~p~nLocalState : ~p~n" + , [RemoteState, LocalState]), + MergedState = merge_states(RemoteState, LocalState), + new_state(MergedState) + end. + + +merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState, + #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) -> + MergedClock = vector_clock:merge(RemoteClock, LocalClock), + MergedNodes = merge_nodes(RemoteNodes, LocalNodes), + LocalState#mem{clock=MergedClock, nodes=MergedNodes}. + + +%% this will give one of the lists back, deterministically +merge_nodes(Remote, Local) -> + % get rid of the initial 0 node if it's still there, and sort + Remote1 = lists:usort(lists:keydelete(0,1,Remote)), + Local1 = lists:usort(lists:keydelete(0,1,Local)), + % handle empty lists as well as other cases + case {Remote1, Local1} of + {[], L} -> L; + {R, []} -> R; + _ -> erlang:min(Remote1, Local1) + end. + + + +% notify(Type, Nodes) -> +% lists:foreach(fun(Node) -> +% gen_event:notify(membership_events, {Type, Node}) +% end, Nodes). + gossip(#mem{args=Args} = NewState) -> Test = proplists:get_value(test, Args), gossip(Test, NewState). @@ -341,6 +381,7 @@ gossip(_,_) -> % testing, so don't gossip ok. + %% @doc find the latest state file on disk find_latest_state_filename(Config) -> Dir = Config#config.directory, @@ -387,7 +428,6 @@ save_state_file(State, Config) -> TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", [Year, Month, Day, Hour, Minute, Second]), FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS, - ?debugFmt("~nFilename: ~s~n", [FN]), ok = filelib:ensure_dir(FN), {ok, File} = file:open(FN, [binary, write]), io:format(File, "~w.~n", [State]), @@ -475,41 +515,6 @@ int_reset(Test, State) -> State#mem{node=Node, nodes=Nodes, clock=Clock}. -merge_state(_RemoteState=#mem{clock=RemoteClock, nodes=RemoteNodes}, - LocalState=#mem{clock=LocalClock, nodes=LocalNodes}) -> - case vector_clock:equals(RemoteClock, LocalClock) of - true -> - {equal, LocalState}; - false -> - {MergedClock, MergedNodes} = - merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes), - -% % notify of arrivals & departures -% Arrived = MergedNodes -- LocalNodes, -% notify(node_join, Arrived), -% Departed = LocalNodes -- MergedNodes, -% notify(node_leave, Departed), - - {merged, LocalState#mem{clock=MergedClock, nodes=MergedNodes}} - end. - - -merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) -> - MergedClock = vector_clock:merge(RemoteClock, LocalClock), - Merged1 = lists:ukeymerge(1, - lists:keysort(1, RemoteNodes), - lists:keysort(1, LocalNodes)), - Merged = lists:keydelete(0, 1, Merged1), - % TODO: make sure we don't have dupe keys ? - {MergedClock, lists:keysort(1, Merged)}. - - -% notify(Type, Nodes) -> -% lists:foreach(fun(Node) -> -% gen_event:notify(membership_events, {Type, Node}) -% end, Nodes). - - ping_all_yall(Nodes) -> lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes). |