summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-04-23 15:26:18 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-09 22:56:24 -0400
commitdb3e28aa026a0e2e22356851d7b93fec8247c159 (patch)
treee3354bbfd05640da66c159a65758893958af4bbf
parentf23371c2ec884628e73abd783c3beedfaa25d490 (diff)
gossip handling revamped, BugzID 10068
-rw-r--r--src/mem3.erl97
-rw-r--r--test/mem3_test.erl19
2 files changed, 70 insertions, 46 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
index cb69d27d..007e1926 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -25,6 +25,9 @@
-export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]).
-export([parts_for_node/1]).
+%% for testing more than anything else
+-export([merge_nodes/2]).
+
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -207,19 +210,11 @@ handle_call(Msg, _From, State) ->
handle_cast(stop, State) ->
{stop, normal, State};
+%% gossip
handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
showroom_log:message(info, "membership: received gossip from ~p",
[RemoteNode]),
- {MergeType, MergedState} = merge_state(RemoteState, LocalState),
- case MergeType of
- equal -> {noreply, MergedState};
- merged ->
- showroom_log:message(info, "membership: merged new gossip: ~p",
- [MergedState]),
- new_state(MergedState),
- gossip(MergedState),
- {noreply, MergedState}
- end;
+ {noreply, handle_gossip(RemoteState, LocalState)};
%% ignored cast
handle_cast(Msg, State) ->
@@ -324,6 +319,51 @@ int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
new_state(NewState, Pmap, Fullmap, Config).
+handle_gossip(RemoteState=#mem{clock=RemoteClock},
+ LocalState=#mem{clock=LocalClock}) ->
+ case vector_clock:compare(RemoteClock, LocalClock) of
+ equal -> LocalState;
+ less -> LocalState;
+ greater ->
+ % this node needs updating
+ new_state(RemoteState);
+ concurrent ->
+ % ick, so let's resolve and merge states
+ showroom_log:message(info,
+ "~nmembership: Concurrent Clocks~n"
+ "~nRemoteState : ~p~nLocalState : ~p~n"
+ , [RemoteState, LocalState]),
+ MergedState = merge_states(RemoteState, LocalState),
+ new_state(MergedState)
+ end.
+
+
+merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState,
+ #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) ->
+ MergedClock = vector_clock:merge(RemoteClock, LocalClock),
+ MergedNodes = merge_nodes(RemoteNodes, LocalNodes),
+ LocalState#mem{clock=MergedClock, nodes=MergedNodes}.
+
+
+%% this will give one of the lists back, deterministically
+merge_nodes(Remote, Local) ->
+ % get rid of the initial 0 node if it's still there, and sort
+ Remote1 = lists:usort(lists:keydelete(0,1,Remote)),
+ Local1 = lists:usort(lists:keydelete(0,1,Local)),
+ % handle empty lists as well as other cases
+ case {Remote1, Local1} of
+ {[], L} -> L;
+ {R, []} -> R;
+ _ -> erlang:min(Remote1, Local1)
+ end.
+
+
+
+% notify(Type, Nodes) ->
+% lists:foreach(fun(Node) ->
+% gen_event:notify(membership_events, {Type, Node})
+% end, Nodes).
+
gossip(#mem{args=Args} = NewState) ->
Test = proplists:get_value(test, Args),
gossip(Test, NewState).
@@ -341,6 +381,7 @@ gossip(_,_) ->
% testing, so don't gossip
ok.
+
%% @doc find the latest state file on disk
find_latest_state_filename(Config) ->
Dir = Config#config.directory,
@@ -387,7 +428,6 @@ save_state_file(State, Config) ->
TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
[Year, Month, Day, Hour, Minute, Second]),
FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
- ?debugFmt("~nFilename: ~s~n", [FN]),
ok = filelib:ensure_dir(FN),
{ok, File} = file:open(FN, [binary, write]),
io:format(File, "~w.~n", [State]),
@@ -475,41 +515,6 @@ int_reset(Test, State) ->
State#mem{node=Node, nodes=Nodes, clock=Clock}.
-merge_state(_RemoteState=#mem{clock=RemoteClock, nodes=RemoteNodes},
- LocalState=#mem{clock=LocalClock, nodes=LocalNodes}) ->
- case vector_clock:equals(RemoteClock, LocalClock) of
- true ->
- {equal, LocalState};
- false ->
- {MergedClock, MergedNodes} =
- merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes),
-
-% % notify of arrivals & departures
-% Arrived = MergedNodes -- LocalNodes,
-% notify(node_join, Arrived),
-% Departed = LocalNodes -- MergedNodes,
-% notify(node_leave, Departed),
-
- {merged, LocalState#mem{clock=MergedClock, nodes=MergedNodes}}
- end.
-
-
-merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) ->
- MergedClock = vector_clock:merge(RemoteClock, LocalClock),
- Merged1 = lists:ukeymerge(1,
- lists:keysort(1, RemoteNodes),
- lists:keysort(1, LocalNodes)),
- Merged = lists:keydelete(0, 1, Merged1),
- % TODO: make sure we don't have dupe keys ?
- {MergedClock, lists:keysort(1, Merged)}.
-
-
-% notify(Type, Nodes) ->
-% lists:foreach(fun(Node) ->
-% gen_event:notify(membership_events, {Type, Node})
-% end, Nodes).
-
-
ping_all_yall(Nodes) ->
lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes).
diff --git a/test/mem3_test.erl b/test/mem3_test.erl
index 8be90ef8..03e55978 100644
--- a/test/mem3_test.erl
+++ b/test/mem3_test.erl
@@ -123,3 +123,22 @@ join_with_wrong_order(_Pid) ->
?assertEqual({error,{position_exists,3,c}}, Res),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
ok.
+
+
+merge_nodes_test() ->
+ A = [{1,a1,[]},{2,a2,[]},{3,a3,[]}],
+ B = [{1,a1,[]},{2,a2,[]},{3,b3,[]}],
+ ?assertEqual(A, mem3:merge_nodes(A,B)),
+ ?assertEqual(mem3:merge_nodes(A,B), mem3:merge_nodes(B,A)),
+ C = [{1,c1,[]},{2,c2,[]},{3,c3,[]}],
+ ?assertEqual(A, mem3:merge_nodes(A,C)),
+ ?assertEqual(A, mem3:merge_nodes(C,A)),
+ ok.
+
+
+merge_nodes_with_init_nodelist_test() ->
+ A = [{1,a1,[]},{2,a2,[]},{3,a3,[]}],
+ B = [{0, b, []}],
+ ?assertEqual(A, mem3:merge_nodes(A,B)),
+ ?assertEqual(mem3:merge_nodes(A,B), mem3:merge_nodes(B,A)),
+ ok.