summary refs log tree commit diff
diff options
context:
space:
mode:
author Brad Anderson <brad@cloudant.com> 2010-04-28 10:41:20 -0400
committer Brad Anderson <brad@cloudant.com> 2010-05-09 22:56:24 -0400
commit 454b9aff017c4bbda9fa01cf9875c44f04644210 (patch)
tree 442c5662a2cd4e3714ce2593ff2eb70ae650de40
parent eb593c0557710c29f8c476f43d72fb54172b8e4e (diff)
dang, large commit.
* node removed from #mem{}
* start_gossip api call added
* some dialyzer specs
* 'new' join accepts PingNode, calls into cluster itself
* get_test convenience method
* don't save state when testing
-rw-r--r-- include/common.hrl 1
-rw-r--r-- src/mem3.erl 142
-rw-r--r-- test/mem3_test.erl 33
3 files changed, 114 insertions, 62 deletions
diff --git a/include/common.hrl b/include/common.hrl
index 4315a54c..59f5b9a1 100644
--- a/include/common.hrl
+++ b/include/common.hrl
@@ -42,7 +42,6 @@
%% version 3 of membership state
-record(mem, {header=3,
- node,
nodes=[],
clock=[],
args
diff --git a/src/mem3.erl b/src/mem3.erl
index c5c558da..25945761 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -20,13 +20,13 @@
%% API
-export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
--export([join/2, clock/0, state/0]).
+-export([join/3, clock/0, state/0, start_gossip/0]).
-export([partitions/0, fullmap/0]).
-export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]).
-export([parts_for_node/1]).
%% for testing more than anything else
--export([merge_nodes/2]).
+-export([merge_nodes/2, next_up_node/1, next_up_node/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -46,9 +46,11 @@
-type options() :: list().
-type mem_node() :: {join_order(), node(), options()}.
-type mem_node_list() :: [mem_node()].
--type arg_options() :: {test, boolean()} | {config, #config{}}.
+-type config() :: #config{}.
+-type arg_options() :: {test, boolean()} | {config, config()}.
-type args() :: [] | [arg_options()].
-type mem_state() :: #mem{}.
+-type test() :: undefined | node().
-type epoch() :: float().
-type clock() :: {node(), epoch()}.
-type vector_clock() :: [clock()].
@@ -77,9 +79,9 @@ stop(Server) ->
gen_server:cast(Server, stop).
--spec join(join_type(), mem_node_list()) -> ok.
-join(JoinType, Nodes) ->
- gen_server:call(?SERVER, {join, JoinType, Nodes}).
+-spec join(join_type(), mem_node_list(), node() | nil) -> ok.
+join(JoinType, Nodes, PingNode) ->
+ gen_server:call(?SERVER, {join, JoinType, Nodes, PingNode}).
-spec clock() -> vector_clock().
@@ -92,6 +94,11 @@ state() ->
gen_server:call(?SERVER, state).
+-spec start_gossip() -> ok.
+start_gossip() ->
+ gen_server:call(?SERVER, start_gossip).
+
+
-spec reset() -> ok | not_reset.
reset() ->
gen_server:call(?SERVER, reset).
@@ -156,7 +163,7 @@ all_nodes_parts(true) ->
init(Args) ->
process_flag(trap_exit,true),
Config = get_config(Args),
- Test = proplists:get_value(test, Args),
+ Test = get_test(Args),
OldState = read_latest_state_file(Test, Config),
showroom_log:message(info, "membership: membership server starting...", []),
net_kernel:monitor_nodes(true),
@@ -165,9 +172,11 @@ init(Args) ->
%% new node(s) joining to this node
-handle_call({join, JoinType, ExtNodes}, _From, State) ->
+handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
+ % NewState = handle_join(JoinType, ExtNodes, PingNode, State),
+ % {reply, ok, NewState};
try
- NewState = handle_join(JoinType, ExtNodes, State),
+ NewState = handle_join(JoinType, ExtNodes, PingNode, State),
{reply, ok, NewState}
catch _:Error ->
showroom_log:message(error, "~p", [Error]),
@@ -184,7 +193,7 @@ handle_call(state, _From, State) ->
%% reset - but only if we're in test mode
handle_call(reset, _From, #mem{args=Args} = State) ->
- Test = proplists:get_value(test, Args),
+ Test = get_test(Args),
case Test of
undefined -> {reply, not_reset, State};
_ -> {reply, ok, int_reset(Test, State)}
@@ -196,11 +205,16 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
{reply, {ok, NodeList}, State};
%% gossip
-handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) ->
+handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
showroom_log:message(info, "membership: received gossip from ~p",
- [RemoteNode]),
+ [erlang:node(Pid)]),
handle_gossip(From, RemoteState, LocalState);
+% start_gossip
+handle_call(start_gossip, _From, State) ->
+ NewState = gossip(State),
+ {reply, ok, NewState};
+
%% ignored call
handle_call(Msg, _From, State) ->
showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
@@ -259,6 +273,10 @@ get_config(Args) ->
end.
+get_test(Args) ->
+ proplists:get_value(test, Args).
+
+
% we could be automatically:
% 1. rejoining a cluster after some downtime
%
@@ -277,7 +295,7 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
{_, NodeList, _} = lists:unzip3(Nodes),
ping_all_yall(NodeList),
{RemoteStates, _BadNodes} = get_remote_states(NodeList),
- Test = proplists:get_value(test, Args),
+ Test = get_test(Args),
case compare_state_with_rest(OldState, RemoteStates) of
match ->
showroom_log:message(info, "membership: rejoined successfully", []),
@@ -289,32 +307,43 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
%% handle join activities, return NewState
-handle_join(first, ExtNodes, State) ->
+handle_join(first, ExtNodes, nil, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
int_join(ExtNodes, State);
-handle_join(new, ExtNodes, State) ->
- {_,Nodes,_} = lists:unzip3(ExtNodes),
- ping_all_yall(Nodes),
- int_join(ExtNodes, State);
+handle_join(new, ExtNodes, PingNode, #mem{args=Args} = State) ->
+ NewState = case get_test(Args) of
+ undefined ->
+ % ping the PingNode and get its state
+ pong = net_adm:ping(PingNode),
+ timer:sleep(1000), % let dist. erl get set up... sigh.
+ {ok, RemoteState} = rpc:call(PingNode, mem3, state, []),
+ RemoteState;
+ _ ->
+ % testing, so meh
+ State
+ end,
+ % now use this info to join the ring
+ int_join(ExtNodes, NewState);
-handle_join(replace, [_OldNode | _], _State) ->
+handle_join(replace, [_OldNode | _], _PingNode, _State) ->
% TODO implement me
ok;
-handle_join(JoinType, _, _) ->
- showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]),
+handle_join(JoinType, _, PingNode, _) ->
+ showroom_log:message(info, "membership: unknown join type: ~p "
+ "for ping node: ~p", [JoinType, PingNode]),
{error, {unknown_join_type, JoinType}}.
-int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) ->
+int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
check_pos(Pos, N, Nodes),
[New|AccIn]
end, Nodes, ExtNodes),
NewNodes1 = lists:sort(NewNodes),
- NewClock = vector_clock:increment(Node, Clock),
+ NewClock = vector_clock:increment(node(), Clock),
NewState = State#mem{nodes=NewNodes1, clock=NewClock},
install_new_state(NewState),
NewState.
@@ -333,7 +362,7 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
greater ->
% local node needs updating
gen_server:reply(From, ok), % reply to sender first
- install_new_state(RemoteState);
+ {noreply, install_new_state(RemoteState)};
concurrent ->
% ick, so let's resolve and merge states
showroom_log:message(info,
@@ -342,7 +371,7 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
, [RemoteState, LocalState]),
MergedState = merge_states(RemoteState, LocalState),
gen_server:reply(From, {new_state, MergedState}), % reply to sender
- install_new_state(MergedState)
+ {noreply, install_new_state(MergedState)}
end.
@@ -367,18 +396,19 @@ merge_nodes(Remote, Local) ->
gossip(#mem{args=Args} = NewState) ->
- Test = proplists:get_value(test, Args),
+ Test = get_test(Args),
gossip(Test, NewState).
-gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) ->
+-spec gossip(test(), mem_state()) -> mem_state().
+gossip(undefined, #mem{nodes=StateNodes} = State) ->
{_, Nodes, _} = lists:unzip3(StateNodes),
- TargetNode = next_up_node(Node, Nodes),
+ TargetNode = next_up_node(Nodes),
showroom_log:message(info, "membership: firing gossip from ~p to ~p",
- [Node, TargetNode]),
+ [node(), TargetNode]),
case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of
- ok -> ok;
- {new_state, _NewState} -> ?debugHere,ok;
+ ok -> State;
+ {new_state, NewState} -> NewState;
Error -> throw({unknown_gossip_response, Error})
end;
@@ -387,11 +417,19 @@ gossip(_,_) ->
ok.
-next_up_node(Node, Nodes) ->
+next_up_node(Nodes) ->
+ Node = node(),
+ next_up_node(Node, Nodes, up_nodes()).
+
+
+next_up_node(Node, Nodes, UpNodes) ->
{A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
- List = lists:append([B, A, [Node]]),
- UpNodes = lists:delete(fun(N) -> lists:member(N, up_nodes()) end, List),
- hd(UpNodes). % TODO: empty list?
+ List = lists:append(B, A), % be sure to eliminate Node
+ DownNodes = Nodes -- UpNodes,
+ case List -- DownNodes of
+ [Target|_] -> Target;
+ [] -> throw({error, no_gossip_targets_available})
+ end.
up_nodes() ->
@@ -425,8 +463,7 @@ read_latest_state_file(undefined, Config) ->
{ok, File} = find_latest_state_filename(Config),
case file:consult(File) of
{ok, [#mem{}=State]} -> State;
- Else ->
- ?debugFmt("~nElse: ~p~n", [Else]),
+ _Else ->
throw({error, bad_mem_state_file})
end
catch _:Error ->
@@ -439,13 +476,15 @@ read_latest_state_file(_, _) ->
install_new_state(#mem{args=Args} = State) ->
Config = get_config(Args),
- save_state_file(State, Config),
+ Test = get_test(Args),
+ save_state_file(Test, State, Config),
gossip(State).
%% @doc save the state file to disk, with current timestamp.
%% thx to riak_ring_manager:do_write_ringfile/1
-save_state_file(State, Config) ->
+-spec save_state_file(test(), mem_state(), config()) -> ok.
+save_state_file(undefined, State, Config) ->
Dir = Config#config.directory,
{{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
@@ -454,7 +493,9 @@ save_state_file(State, Config) ->
ok = filelib:ensure_dir(FN),
{ok, File} = file:open(FN, [binary, write]),
io:format(File, "~w.~n", [State]),
- file:close(File).
+ file:close(File);
+
+save_state_file(_,_,_) -> ok. % don't save if testing
check_pos(Pos, Node, Nodes) ->
@@ -476,12 +517,8 @@ int_reset(Test) ->
int_reset(Test, #mem{}).
-int_reset(Test, State) ->
- Node = case Test of
- undefined -> node();
- _ -> Test
- end,
- State#mem{node=Node, nodes=[], clock=[]}.
+int_reset(_Test, State) ->
+ State#mem{nodes=[], clock=[]}.
ping_all_yall(Nodes) ->
@@ -492,18 +529,19 @@ get_remote_states(NodeList) ->
NodeList1 = lists:delete(node(), NodeList),
{States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
{_Status, States2} = lists:unzip(States1),
- {States2, BadNodes}.
+ {lists:zip(NodeList1,States2), BadNodes}.
%% @doc compare state with states based on vector clock
%% return match | {bad_state_match, Node, NodesThatDontMatch}
-compare_state_with_rest(#mem{node=Node, clock=Clock} = _State, States) ->
- Results = lists:map(fun(#mem{node=Node1, clock=Clock1}) ->
- {vector_clock:equals(Clock, Clock1), Node1}
+compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
+ Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
+ {vector_clock:equals(Clock, Clock1), Node}
end, States),
BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
- ({false, N}, AccIn) -> [N | AccIn] end, [], Results),
+ ({false, N}, AccIn) -> [N | AccIn]
+ end, [], Results),
if
length(BadResults) == 0 -> match;
- true -> {bad_state_match, Node, BadResults}
+ true -> {bad_state_match, node(), BadResults}
end.
diff --git a/test/mem3_test.erl b/test/mem3_test.erl
index 069d897b..80699559 100644
--- a/test/mem3_test.erl
+++ b/test/mem3_test.erl
@@ -66,7 +66,7 @@ clock(_Pid) ->
join_first(_Pid) ->
mem3:reset(),
- mem3:join(first, [{1, a, []}, {2, b, []}]),
+ mem3:join(first, [{1, a, []}, {2, b, []}], nil),
{ok, Nodes} = mem3:nodes(),
?assertEqual(2, length(Nodes)),
ok.
@@ -78,7 +78,8 @@ join_first_with_hints(_Pid) ->
{2, b, []},
{3, c, [{hints, [?HINT_C1, ?HINT_C2]}]},
{4, d, []},
- {5, e, []}]),
+ {5, e, []}],
+ nil),
{ok, Nodes} = mem3:nodes(),
?assertEqual(5, length(Nodes)),
%?debugFmt("~nFullmap: ~p~n", [Fullmap]),
@@ -89,22 +90,21 @@ join_first_with_hints(_Pid) ->
join_new_node(_Pid) ->
mem3:reset(),
- mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
+ mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}], nil),
{ok, Nodes1} = mem3:nodes(),
?assertEqual(3, length(Nodes1)),
- mem3:join(new, [{4, d, []}]),
+ mem3:join(new, [{4, d, []}], a),
{ok, Nodes2} = mem3:nodes(),
?assertEqual(4, length(Nodes2)),
- ?debugFmt("~nNodes: ~p~n", [Nodes2]),
ok.
join_two_new_nodes(_Pid) ->
mem3:reset(),
- mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
+ mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}], nil),
{ok, Nodes1} = mem3:nodes(),
?assertEqual(3, length(Nodes1)),
- Res = mem3:join(new, [{4, d, []}, {5, e, []}]),
+ Res = mem3:join(new, [{4, d, []}, {5, e, []}], b),
?assertEqual(ok, Res),
{ok, Nodes2} = mem3:nodes(),
?assertEqual(5, length(Nodes2)),
@@ -114,15 +114,18 @@ join_two_new_nodes(_Pid) ->
join_with_wrong_order(_Pid) ->
mem3:reset(),
- mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
+ mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}], nil),
% ?assertEqual([], mem3:parts_for_node(d)),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
- Res = mem3:join(new, [{3, d, []}]),
+ Res = mem3:join(new, [{3, d, []}], c),
?assertEqual({error,{position_exists,3,c}}, Res),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
ok.
+%%
+%% tests without running gen_server
+%%
merge_nodes_test() ->
A = [{1,a1,[]},{2,a2,[]},{3,a3,[]}],
B = [{1,a1,[]},{2,a2,[]},{3,b3,[]}],
@@ -140,3 +143,15 @@ merge_nodes_with_init_nodelist_test() ->
?assertEqual(A, mem3:merge_nodes(A,B)),
?assertEqual(mem3:merge_nodes(A,B), mem3:merge_nodes(B,A)),
ok.
+
+
+next_up_nodes_test() ->
+ Nodes = [a,b,c,d],
+ UpNodes = [a,b,d],
+ ?assertEqual(b, mem3:next_up_node(a,Nodes,UpNodes)),
+ ?assertEqual(d, mem3:next_up_node(b,Nodes,UpNodes)),
+ ?assertEqual(a, mem3:next_up_node(d,Nodes,UpNodes)),
+ ?assertThrow({error, no_gossip_targets_available},
+ mem3:next_up_node(a,[a,b,c],[])),
+ ?assertEqual(b, mem3:next_up_node(a,[a,b],[a,b])),
+ ok.