diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mem3.erl | 66 |
1 files changed, 46 insertions, 20 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index d715e657..7ae7627c 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -188,6 +188,15 @@ handle_call(Msg, _From, State) -> {reply, ignored, State}. +%% gossip +handle_cast({gossip, RemoteState}, LocalState) -> + State = case handle_gossip(none, RemoteState, LocalState) of + {reply, ok, NewState} -> NewState; + {reply, {new_state, NewState}, _} -> NewState; + {noreply, NewState} -> NewState + end, + {noreply, State}; + %% stop handle_cast(stop, State) -> {stop, normal, State}; @@ -202,14 +211,15 @@ handle_cast(Msg, State) -> %% net_kernel:monitor_nodes(true) handle_info({nodedown, Node}, State) -> showroom_log:message(alert, "membership: nodedown ~p", [Node]), - notify(nodedown, [Node]), + notify(nodedown, [Node], State), {noreply, State}; %% @doc handle nodeup messages because we have %% net_kernel:monitor_nodes(true) handle_info({nodeup, Node}, State) -> showroom_log:message(alert, "membership: nodeup ~p", [Node]), - notify(nodeup, [Node]), + notify(nodeup, [Node], State), + gossip_cast(State), {noreply, State}; %% ignored info @@ -294,16 +304,16 @@ handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) -> handle_join(replace, {OldNode, []}, PingNode, State); handle_join(replace, [OldNode | _], PingNode, State) -> handle_join(replace, {OldNode, []}, PingNode, State); -handle_join(replace, {OldNode, NewOpts}, PingNode, _State) -> +handle_join(replace, {OldNode, NewOpts}, PingNode, State) -> OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode), {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes), NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}), - notify(node_leave, [OldNode]), + notify(node_leave, [OldNode], State), int_join([], OldState#mem{nodes=NewNodes}); % leave -handle_join(leave, [OldNode | _], _PingNode, _State) -> +handle_join(leave, [OldNode | _], _PingNode, State) -> % TODO implement me - notify(node_leave, [OldNode]), + notify(node_leave, [OldNode], State), ok; handle_join(JoinType, _, PingNode, _) -> @@ -315,7 +325,7 @@ handle_join(JoinType, _, PingNode, _) -> int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) -> NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) -> check_pos(Pos, N, Nodes), - notify(node_join, [N]), + notify(node_join, [N], State), [New|AccIn] end, Nodes, ExtNodes), NewNodes1 = lists:sort(NewNodes), @@ -328,7 +338,7 @@ int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) -> install_new_state(#mem{args=Args} = State) -> Test = get_test(Args), save_state_file(Test, State), - gossip(Test, State). + gossip(call, Test, State). get_pingnode_state(PingNode) -> @@ -346,6 +356,8 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, less -> % remote node needs updating {reply, {new_state, LocalState}, LocalState}; + greater when From == none-> + {noreply, install_new_state(RemoteState)}; greater -> % local node needs updating gen_server:reply(From, ok), % reply to sender first @@ -357,7 +369,10 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, "RemoteState : ~p~nLocalState : ~p~n" , [RemoteState, LocalState]), MergedState = merge_states(RemoteState, LocalState), - gen_server:reply(From, {new_state, MergedState}), % reply to sender + if From =/= none -> + % reply to sender + gen_server:reply(From, {new_state, MergedState}) + end, {noreply, install_new_state(MergedState)} end. @@ -384,11 +399,18 @@ merge_nodes(Remote, Local) -> gossip(#mem{args=Args} = NewState) -> Test = get_test(Args), - gossip(Test, NewState). + gossip(call, Test, NewState). --spec gossip(test(), mem_state()) -> mem_state(). -gossip(undefined, #mem{nodes=StateNodes} = State) -> +gossip_cast(#mem{nodes=[]}) -> ok; +gossip_cast(#mem{args=Args} = NewState) -> + Test = get_test(Args), + gossip(cast, Test, NewState). + + +-spec gossip(gossip_fun(), test(), mem_state()) -> mem_state(). +gossip(_, _, #mem{nodes=[]}) -> ok; +gossip(Fun, undefined, #mem{nodes=StateNodes} = State) -> {_, Nodes, _} = lists:unzip3(StateNodes), case next_up_node(Nodes) of no_gossip_targets_available -> @@ -396,21 +418,20 @@ gossip(undefined, #mem{nodes=StateNodes} = State) -> TargetNode -> showroom_log:message(info, "membership: firing gossip from ~p to ~p", [node(), TargetNode]), - case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of + case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of ok -> State; {new_state, NewState} -> NewState; Error -> throw({unknown_gossip_response, Error}) end end; -gossip(_,_) -> +gossip(_,_,_) -> % testing, so don't gossip ok. next_up_node(Nodes) -> - Node = node(), - next_up_node(Node, Nodes, up_nodes()). + next_up_node(node(), Nodes, up_nodes()). next_up_node(Node, Nodes, UpNodes) -> @@ -536,7 +557,12 @@ compare_state_with_rest(#mem{clock=Clock} = _State, States) -> true -> {bad_state_match, node(), BadResults} end. -notify(Type, Nodes) -> - lists:foreach(fun(Node) -> - gen_event:notify(membership_events, {Type, Node}) - end, Nodes). +notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) -> + {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)), + lists:foreach(fun(Node) -> + case lists:member(Node, MemNodes) orelse Type == nodedown of + true -> + gen_event:notify(membership_events, {Type, Node}); + _ -> ok % node not in cluster + end + end, Nodes). |