summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/membership.hrl1
-rw-r--r--src/mem3.erl66
2 files changed, 47 insertions, 20 deletions
diff --git a/include/membership.hrl b/include/membership.hrl
index 031c12c4..a1e6f822 100644
--- a/include/membership.hrl
+++ b/include/membership.hrl
@@ -34,6 +34,7 @@
-type clock() :: {node(), epoch()}.
-type vector_clock() :: [clock()].
-type ping_node() :: node() | nil.
+-type gossip_fun() :: call | cast.
-type part() :: #shard{}.
-type fullmap() :: [part()].
diff --git a/src/mem3.erl b/src/mem3.erl
index d715e657..7ae7627c 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -188,6 +188,15 @@ handle_call(Msg, _From, State) ->
{reply, ignored, State}.
+%% gossip
+handle_cast({gossip, RemoteState}, LocalState) ->
+ State = case handle_gossip(none, RemoteState, LocalState) of
+ {reply, ok, NewState} -> NewState;
+ {reply, {new_state, NewState}, _} -> NewState;
+ {noreply, NewState} -> NewState
+ end,
+ {noreply, State};
+
%% stop
handle_cast(stop, State) ->
{stop, normal, State};
@@ -202,14 +211,15 @@ handle_cast(Msg, State) ->
%% net_kernel:monitor_nodes(true)
handle_info({nodedown, Node}, State) ->
showroom_log:message(alert, "membership: nodedown ~p", [Node]),
- notify(nodedown, [Node]),
+ notify(nodedown, [Node], State),
{noreply, State};
%% @doc handle nodeup messages because we have
%% net_kernel:monitor_nodes(true)
handle_info({nodeup, Node}, State) ->
showroom_log:message(alert, "membership: nodeup ~p", [Node]),
- notify(nodeup, [Node]),
+ notify(nodeup, [Node], State),
+ gossip_cast(State),
{noreply, State};
%% ignored info
@@ -294,16 +304,16 @@ handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) ->
handle_join(replace, {OldNode, []}, PingNode, State);
handle_join(replace, [OldNode | _], PingNode, State) ->
handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, {OldNode, NewOpts}, PingNode, _State) ->
+handle_join(replace, {OldNode, NewOpts}, PingNode, State) ->
OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode),
{Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes),
NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}),
- notify(node_leave, [OldNode]),
+ notify(node_leave, [OldNode], State),
int_join([], OldState#mem{nodes=NewNodes});
% leave
-handle_join(leave, [OldNode | _], _PingNode, _State) ->
+handle_join(leave, [OldNode | _], _PingNode, State) ->
% TODO implement me
- notify(node_leave, [OldNode]),
+ notify(node_leave, [OldNode], State),
ok;
handle_join(JoinType, _, PingNode, _) ->
@@ -315,7 +325,7 @@ handle_join(JoinType, _, PingNode, _) ->
int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
check_pos(Pos, N, Nodes),
- notify(node_join, [N]),
+ notify(node_join, [N], State),
[New|AccIn]
end, Nodes, ExtNodes),
NewNodes1 = lists:sort(NewNodes),
@@ -328,7 +338,7 @@ int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
install_new_state(#mem{args=Args} = State) ->
Test = get_test(Args),
save_state_file(Test, State),
- gossip(Test, State).
+ gossip(call, Test, State).
get_pingnode_state(PingNode) ->
@@ -346,6 +356,8 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
less ->
% remote node needs updating
{reply, {new_state, LocalState}, LocalState};
+ greater when From == none->
+ {noreply, install_new_state(RemoteState)};
greater ->
% local node needs updating
gen_server:reply(From, ok), % reply to sender first
@@ -357,7 +369,10 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
"RemoteState : ~p~nLocalState : ~p~n"
, [RemoteState, LocalState]),
MergedState = merge_states(RemoteState, LocalState),
- gen_server:reply(From, {new_state, MergedState}), % reply to sender
+ if From =/= none ->
+ % reply to sender
+ gen_server:reply(From, {new_state, MergedState})
+ end,
{noreply, install_new_state(MergedState)}
end.
@@ -384,11 +399,18 @@ merge_nodes(Remote, Local) ->
gossip(#mem{args=Args} = NewState) ->
Test = get_test(Args),
- gossip(Test, NewState).
+ gossip(call, Test, NewState).
--spec gossip(test(), mem_state()) -> mem_state().
-gossip(undefined, #mem{nodes=StateNodes} = State) ->
+gossip_cast(#mem{nodes=[]}) -> ok;
+gossip_cast(#mem{args=Args} = NewState) ->
+ Test = get_test(Args),
+ gossip(cast, Test, NewState).
+
+
+-spec gossip(gossip_fun(), test(), mem_state()) -> mem_state().
+gossip(_, _, #mem{nodes=[]}) -> ok;
+gossip(Fun, undefined, #mem{nodes=StateNodes} = State) ->
{_, Nodes, _} = lists:unzip3(StateNodes),
case next_up_node(Nodes) of
no_gossip_targets_available ->
@@ -396,21 +418,20 @@ gossip(undefined, #mem{nodes=StateNodes} = State) ->
TargetNode ->
showroom_log:message(info, "membership: firing gossip from ~p to ~p",
[node(), TargetNode]),
- case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of
+ case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of
ok -> State;
{new_state, NewState} -> NewState;
Error -> throw({unknown_gossip_response, Error})
end
end;
-gossip(_,_) ->
+gossip(_,_,_) ->
% testing, so don't gossip
ok.
next_up_node(Nodes) ->
- Node = node(),
- next_up_node(Node, Nodes, up_nodes()).
+ next_up_node(node(), Nodes, up_nodes()).
next_up_node(Node, Nodes, UpNodes) ->
@@ -536,7 +557,12 @@ compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
true -> {bad_state_match, node(), BadResults}
end.
-notify(Type, Nodes) ->
- lists:foreach(fun(Node) ->
- gen_event:notify(membership_events, {Type, Node})
- end, Nodes).
+notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) ->
+ {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)),
+ lists:foreach(fun(Node) ->
+ case lists:member(Node, MemNodes) orelse Type == nodedown of
+ true ->
+ gen_event:notify(membership_events, {Type, Node});
+ _ -> ok % node not in cluster
+ end
+ end, Nodes).