diff options
-rw-r--r-- | include/common.hrl | 1 | ||||
-rw-r--r-- | src/mem3.erl | 142 | ||||
-rw-r--r-- | test/mem3_test.erl | 33 |
3 files changed, 114 insertions, 62 deletions
diff --git a/include/common.hrl b/include/common.hrl index 4315a54c..59f5b9a1 100644 --- a/include/common.hrl +++ b/include/common.hrl @@ -42,7 +42,6 @@ %% version 3 of membership state -record(mem, {header=3, - node, nodes=[], clock=[], args diff --git a/src/mem3.erl b/src/mem3.erl index c5c558da..25945761 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -20,13 +20,13 @@ %% API -export([start_link/0, start_link/1, stop/0, stop/1, reset/0]). --export([join/2, clock/0, state/0]). +-export([join/3, clock/0, state/0, start_gossip/0]). -export([partitions/0, fullmap/0]). -export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]). -export([parts_for_node/1]). %% for testing more than anything else --export([merge_nodes/2]). +-export([merge_nodes/2, next_up_node/1, next_up_node/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -46,9 +46,11 @@ -type options() :: list(). -type mem_node() :: {join_order(), node(), options()}. -type mem_node_list() :: [mem_node()]. --type arg_options() :: {test, boolean()} | {config, #config{}}. +-type config() :: #config{}. +-type arg_options() :: {test, boolean()} | {config, config()}. -type args() :: [] | [arg_options()]. -type mem_state() :: #mem{}. +-type test() :: undefined | node(). -type epoch() :: float(). -type clock() :: {node(), epoch()}. -type vector_clock() :: [clock()]. @@ -77,9 +79,9 @@ stop(Server) -> gen_server:cast(Server, stop). --spec join(join_type(), mem_node_list()) -> ok. -join(JoinType, Nodes) -> - gen_server:call(?SERVER, {join, JoinType, Nodes}). +-spec join(join_type(), mem_node_list(), node() | nil) -> ok. +join(JoinType, Nodes, PingNode) -> + gen_server:call(?SERVER, {join, JoinType, Nodes, PingNode}). -spec clock() -> vector_clock(). @@ -92,6 +94,11 @@ state() -> gen_server:call(?SERVER, state). +-spec start_gossip() -> ok. +start_gossip() -> + gen_server:call(?SERVER, start_gossip). + + -spec reset() -> ok | not_reset. reset() -> gen_server:call(?SERVER, reset). @@ -156,7 +163,7 @@ all_nodes_parts(true) -> init(Args) -> process_flag(trap_exit,true), Config = get_config(Args), - Test = proplists:get_value(test, Args), + Test = get_test(Args), OldState = read_latest_state_file(Test, Config), showroom_log:message(info, "membership: membership server starting...", []), net_kernel:monitor_nodes(true), @@ -165,9 +172,11 @@ init(Args) -> %% new node(s) joining to this node -handle_call({join, JoinType, ExtNodes}, _From, State) -> +handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) -> + % NewState = handle_join(JoinType, ExtNodes, PingNode, State), + % {reply, ok, NewState}; try - NewState = handle_join(JoinType, ExtNodes, State), + NewState = handle_join(JoinType, ExtNodes, PingNode, State), {reply, ok, NewState} catch _:Error -> showroom_log:message(error, "~p", [Error]), @@ -184,7 +193,7 @@ handle_call(state, _From, State) -> %% reset - but only if we're in test mode handle_call(reset, _From, #mem{args=Args} = State) -> - Test = proplists:get_value(test, Args), + Test = get_test(Args), case Test of undefined -> {reply, not_reset, State}; _ -> {reply, ok, int_reset(Test, State)} @@ -196,11 +205,16 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> {reply, {ok, NodeList}, State}; %% gossip -handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) -> +handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) -> showroom_log:message(info, "membership: received gossip from ~p", - [RemoteNode]), + [erlang:node(Pid)]), handle_gossip(From, RemoteState, LocalState); +% start_gossip +handle_call(start_gossip, _From, State) -> + NewState = gossip(State), + {reply, ok, NewState}; + %% ignored call handle_call(Msg, _From, State) -> showroom_log:message(info, "membership: ignored call: ~p", [Msg]), @@ -259,6 +273,10 @@ get_config(Args) -> end. +get_test(Args) -> + proplists:get_value(test, Args). + + % we could be automatically: % 1. rejoining a cluster after some downtime % @@ -277,7 +295,7 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> {_, NodeList, _} = lists:unzip3(Nodes), ping_all_yall(NodeList), {RemoteStates, _BadNodes} = get_remote_states(NodeList), - Test = proplists:get_value(test, Args), + Test = get_test(Args), case compare_state_with_rest(OldState, RemoteStates) of match -> showroom_log:message(info, "membership: rejoined successfully", []), @@ -289,32 +307,43 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> %% handle join activities, return NewState -handle_join(first, ExtNodes, State) -> +handle_join(first, ExtNodes, nil, State) -> {_,Nodes,_} = lists:unzip3(ExtNodes), ping_all_yall(Nodes), int_join(ExtNodes, State); -handle_join(new, ExtNodes, State) -> - {_,Nodes,_} = lists:unzip3(ExtNodes), - ping_all_yall(Nodes), - int_join(ExtNodes, State); +handle_join(new, ExtNodes, PingNode, #mem{args=Args} = State) -> + NewState = case get_test(Args) of + undefined -> + % ping the PingNode and get its state + pong = net_adm:ping(PingNode), + timer:sleep(1000), % let dist. erl get set up... sigh. + {ok, RemoteState} = rpc:call(PingNode, mem3, state, []), + RemoteState; + _ -> + % testing, so meh + State + end, + % now use this info to join the ring + int_join(ExtNodes, NewState); -handle_join(replace, [_OldNode | _], _State) -> +handle_join(replace, [_OldNode | _], _PingNode, _State) -> % TODO implement me ok; -handle_join(JoinType, _, _) -> - showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]), +handle_join(JoinType, _, PingNode, _) -> + showroom_log:message(info, "membership: unknown join type: ~p " + "for ping node: ~p", [JoinType, PingNode]), {error, {unknown_join_type, JoinType}}. -int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) -> +int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) -> NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) -> check_pos(Pos, N, Nodes), [New|AccIn] end, Nodes, ExtNodes), NewNodes1 = lists:sort(NewNodes), - NewClock = vector_clock:increment(Node, Clock), + NewClock = vector_clock:increment(node(), Clock), NewState = State#mem{nodes=NewNodes1, clock=NewClock}, install_new_state(NewState), NewState. @@ -333,7 +362,7 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, greater -> % local node needs updating gen_server:reply(From, ok), % reply to sender first - install_new_state(RemoteState); + {noreply, install_new_state(RemoteState)}; concurrent -> % ick, so let's resolve and merge states showroom_log:message(info, @@ -342,7 +371,7 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, , [RemoteState, LocalState]), MergedState = merge_states(RemoteState, LocalState), gen_server:reply(From, {new_state, MergedState}), % reply to sender - install_new_state(MergedState) + {noreply, install_new_state(MergedState)} end. @@ -367,18 +396,19 @@ merge_nodes(Remote, Local) -> gossip(#mem{args=Args} = NewState) -> - Test = proplists:get_value(test, Args), + Test = get_test(Args), gossip(Test, NewState). -gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) -> +-spec gossip(test(), mem_state()) -> mem_state(). +gossip(undefined, #mem{nodes=StateNodes} = State) -> {_, Nodes, _} = lists:unzip3(StateNodes), - TargetNode = next_up_node(Node, Nodes), + TargetNode = next_up_node(Nodes), showroom_log:message(info, "membership: firing gossip from ~p to ~p", - [Node, TargetNode]), + [node(), TargetNode]), case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of - ok -> ok; - {new_state, _NewState} -> ?debugHere,ok; + ok -> State; + {new_state, NewState} -> NewState; Error -> throw({unknown_gossip_response, Error}) end; @@ -387,11 +417,19 @@ gossip(_,_) -> ok. -next_up_node(Node, Nodes) -> +next_up_node(Nodes) -> + Node = node(), + next_up_node(Node, Nodes, up_nodes()). + + +next_up_node(Node, Nodes, UpNodes) -> {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes), - List = lists:append([B, A, [Node]]), - UpNodes = lists:delete(fun(N) -> lists:member(N, up_nodes()) end, List), - hd(UpNodes). % TODO: empty list? + List = lists:append(B, A), % be sure to eliminate Node + DownNodes = Nodes -- UpNodes, + case List -- DownNodes of + [Target|_] -> Target; + [] -> throw({error, no_gossip_targets_available}) + end. up_nodes() -> @@ -425,8 +463,7 @@ read_latest_state_file(undefined, Config) -> {ok, File} = find_latest_state_filename(Config), case file:consult(File) of {ok, [#mem{}=State]} -> State; - Else -> - ?debugFmt("~nElse: ~p~n", [Else]), + _Else -> throw({error, bad_mem_state_file}) end catch _:Error -> @@ -439,13 +476,15 @@ read_latest_state_file(_, _) -> install_new_state(#mem{args=Args} = State) -> Config = get_config(Args), - save_state_file(State, Config), + Test = get_test(Args), + save_state_file(Test, State, Config), gossip(State). %% @doc save the state file to disk, with current timestamp. %% thx to riak_ring_manager:do_write_ringfile/1 -save_state_file(State, Config) -> +-spec save_state_file(test(), mem_state(), config()) -> ok. +save_state_file(undefined, State, Config) -> Dir = Config#config.directory, {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(), TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", @@ -454,7 +493,9 @@ save_state_file(State, Config) -> ok = filelib:ensure_dir(FN), {ok, File} = file:open(FN, [binary, write]), io:format(File, "~w.~n", [State]), - file:close(File). + file:close(File); + +save_state_file(_,_,_) -> ok. % don't save if testing check_pos(Pos, Node, Nodes) -> @@ -476,12 +517,8 @@ int_reset(Test) -> int_reset(Test, #mem{}). -int_reset(Test, State) -> - Node = case Test of - undefined -> node(); - _ -> Test - end, - State#mem{node=Node, nodes=[], clock=[]}. +int_reset(_Test, State) -> + State#mem{nodes=[], clock=[]}. ping_all_yall(Nodes) -> @@ -492,18 +529,19 @@ get_remote_states(NodeList) -> NodeList1 = lists:delete(node(), NodeList), {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000), {_Status, States2} = lists:unzip(States1), - {States2, BadNodes}. + {lists:zip(NodeList1,States2), BadNodes}. %% @doc compare state with states based on vector clock %% return match | {bad_state_match, Node, NodesThatDontMatch} -compare_state_with_rest(#mem{node=Node, clock=Clock} = _State, States) -> - Results = lists:map(fun(#mem{node=Node1, clock=Clock1}) -> - {vector_clock:equals(Clock, Clock1), Node1} +compare_state_with_rest(#mem{clock=Clock} = _State, States) -> + Results = lists:map(fun({Node, #mem{clock=Clock1}}) -> + {vector_clock:equals(Clock, Clock1), Node} end, States), BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn; - ({false, N}, AccIn) -> [N | AccIn] end, [], Results), + ({false, N}, AccIn) -> [N | AccIn] + end, [], Results), if length(BadResults) == 0 -> match; - true -> {bad_state_match, Node, BadResults} + true -> {bad_state_match, node(), BadResults} end. diff --git a/test/mem3_test.erl b/test/mem3_test.erl index 069d897b..80699559 100644 --- a/test/mem3_test.erl +++ b/test/mem3_test.erl @@ -66,7 +66,7 @@ clock(_Pid) -> join_first(_Pid) -> mem3:reset(), - mem3:join(first, [{1, a, []}, {2, b, []}]), + mem3:join(first, [{1, a, []}, {2, b, []}], nil), {ok, Nodes} = mem3:nodes(), ?assertEqual(2, length(Nodes)), ok. @@ -78,7 +78,8 @@ join_first_with_hints(_Pid) -> {2, b, []}, {3, c, [{hints, [?HINT_C1, ?HINT_C2]}]}, {4, d, []}, - {5, e, []}]), + {5, e, []}], + nil), {ok, Nodes} = mem3:nodes(), ?assertEqual(5, length(Nodes)), %?debugFmt("~nFullmap: ~p~n", [Fullmap]), @@ -89,22 +90,21 @@ join_first_with_hints(_Pid) -> join_new_node(_Pid) -> mem3:reset(), - mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]), + mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}], nil), {ok, Nodes1} = mem3:nodes(), ?assertEqual(3, length(Nodes1)), - mem3:join(new, [{4, d, []}]), + mem3:join(new, [{4, d, []}], a), {ok, Nodes2} = mem3:nodes(), ?assertEqual(4, length(Nodes2)), - ?debugFmt("~nNodes: ~p~n", [Nodes2]), ok. join_two_new_nodes(_Pid) -> mem3:reset(), - mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]), + mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}], nil), {ok, Nodes1} = mem3:nodes(), ?assertEqual(3, length(Nodes1)), - Res = mem3:join(new, [{4, d, []}, {5, e, []}]), + Res = mem3:join(new, [{4, d, []}, {5, e, []}], b), ?assertEqual(ok, Res), {ok, Nodes2} = mem3:nodes(), ?assertEqual(5, length(Nodes2)), @@ -114,15 +114,18 @@ join_two_new_nodes(_Pid) -> join_with_wrong_order(_Pid) -> mem3:reset(), - mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]), + mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}], nil), % ?assertEqual([], mem3:parts_for_node(d)), %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]), - Res = mem3:join(new, [{3, d, []}]), + Res = mem3:join(new, [{3, d, []}], c), ?assertEqual({error,{position_exists,3,c}}, Res), %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]), ok. +%% +%% tests without running gen_server +%% merge_nodes_test() -> A = [{1,a1,[]},{2,a2,[]},{3,a3,[]}], B = [{1,a1,[]},{2,a2,[]},{3,b3,[]}], @@ -140,3 +143,15 @@ merge_nodes_with_init_nodelist_test() -> ?assertEqual(A, mem3:merge_nodes(A,B)), ?assertEqual(mem3:merge_nodes(A,B), mem3:merge_nodes(B,A)), ok. + + +next_up_nodes_test() -> + Nodes = [a,b,c,d], + UpNodes = [a,b,d], + ?assertEqual(b, mem3:next_up_node(a,Nodes,UpNodes)), + ?assertEqual(d, mem3:next_up_node(b,Nodes,UpNodes)), + ?assertEqual(a, mem3:next_up_node(d,Nodes,UpNodes)), + ?assertThrow({error, no_gossip_targets_available}, + mem3:next_up_node(a,[a,b,c],[])), + ?assertEqual(b, mem3:next_up_node(a,[a,b],[a,b])), + ok. |