summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mem3.erl100
-rw-r--r--test/mem3_test.erl34
2 files changed, 105 insertions, 29 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
index 44ab4d4c..ca5bec0b 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -19,7 +19,7 @@
-behaviour(gen_server).
%% API
--export([start_link/0, start_link/1, stop/0, stop/1]).
+-export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
-export([join/2, clock/0, state/0]).
-export([partitions/0, fullmap/0]).
-export([nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]).
@@ -87,6 +87,11 @@ state() ->
gen_server:call(?MODULE, state).
+-spec reset() -> ok | not_reset.
+reset() ->
+ gen_server:call(?MODULE, reset).
+
+
%% @doc retrieve the primary partition map. This is a list of partitions and
%% their corresponding primary node, no replication partner nodes.
partitions() ->
@@ -146,21 +151,37 @@ init(Args) ->
{ok, State#mem{args=Args}}.
-%% new node joining to this node
+%% new node(s) joining to this node
handle_call({join, JoinType, ExtNodes}, _From,
- State = #mem{args=Args}) ->
+ #mem{args=Args} = State) ->
Config = get_config(Args),
- NewState = handle_join(JoinType, ExtNodes, State, Config),
- {reply, ok, NewState};
+ try
+ NewState = handle_join(JoinType, ExtNodes, State, Config),
+ {reply, ok, NewState}
+ catch
+ _:Error ->
+ showroom_log:message(error, "~p", [Error]),
+ {reply, Error, State}
+ end;
%% clock
-handle_call(clock, _From, State = #mem{clock=Clock}) ->
+handle_call(clock, _From, #mem{clock=Clock} = State) ->
{reply, Clock, State};
%% state
handle_call(state, _From, State) ->
{reply, State, State};
+%% reset - but only if we're in test mode
+handle_call(reset, _From, #mem{args=Args} = State) ->
+ case proplists:get_value(test, Args) of
+ undefined -> {reply, not_reset, State};
+ _ ->
+ mochiglobal:delete(pmap),
+ mochiglobal:delete(fullmap),
+ {reply, ok, int_reset(State)}
+ end;
+
%% ignored call
handle_call(Msg, _From, State) ->
showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
@@ -230,10 +251,7 @@ get_config(Args) ->
handle_init(nil) ->
showroom_log:message(info, "membership: membership server starting...", []),
net_kernel:monitor_nodes(true),
- Node = node(),
- Nodes = [{0, Node, []}],
- Clock = vector_clock:create(Node),
- #mem{node=Node, nodes=Nodes, clock=Clock};
+ int_reset();
handle_init(_OldState) ->
?debugHere,
@@ -245,9 +263,10 @@ handle_init(_OldState) ->
%% handle join activities, return NewState
-handle_join(JoinType, ExtNodes, #mem{node=Node, clock=Clock} = State, Config)
+handle_join(JoinType, ExtNodes,
+ #mem{node=Node, nodes=Nodes, clock=Clock} = State, Config)
when JoinType == first orelse JoinType == new ->
- {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes),
+ {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
update_cache(Pmap, Fullmap),
NewClock = vector_clock:increment(Node, Clock),
% TODO: gossip
@@ -297,37 +316,64 @@ read_latest_state_file(_, Config) ->
end.
-%% @doc given Config and a list of Nodes, construct a {Pmap,Fullmap}
+%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}
%% This is basically replaying all the mem events that have happened.
-create_maps(#config{q=Q} = Config, JoinType, Nodes) ->
- [{_,FirstNode,_}|_] = Nodes,
- Fun = fun({_Pos, Node, Options}, Map) ->
+create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) ->
+ [{_,FirstNode,_}|_] = ExtNodes,
+ Fun = fun({Pos, Node, Options}, Pmap) ->
+ check_pos(Pos, Node, Nodes),
Hints = proplists:get_value(hints, Options),
- {ok, NewMap} = partitions:join(Node, Map, Hints),
- NewMap
+ {ok, NewPmap} = partitions:join(Node, Pmap, Hints),
+ NewPmap
end,
Acc0 = case JoinType of
first -> partitions:create_partitions(Q, FirstNode);
new -> mochiglobal:get(pmap)
end,
- Pmap = lists:foldl(Fun, Acc0, lists:keysort(1, Nodes)),
+ Pmap = lists:foldl(Fun, Acc0, lists:keysort(1, ExtNodes)),
{Pmap, make_fullmap(Pmap, Config)}.
%% @doc construct a table with all partitions, with the primary node and all
%% replication partner nodes as well.
make_fullmap(PMap, Config) ->
- {Nodes, _Parts} = lists:unzip(PMap),
- NodeParts = lists:flatmap(
- fun({Node,Part}) ->
- Partners = replication:partners(Node, lists:usort(Nodes), Config),
- PartnerList = [{Partner, Part} || Partner <- Partners],
- [{Node, Part} | PartnerList]
- end, PMap),
- NodeParts.
+ {Nodes, _Parts} = lists:unzip(PMap),
+ NodeParts = lists:flatmap(
+ fun({Node,Part}) ->
+ Partners = replication:partners(Node, lists:usort(Nodes), Config),
+ PartnerList = [{Partner, Part} || Partner <- Partners],
+ [{Node, Part} | PartnerList]
+ end, PMap),
+ NodeParts.
%% cache table helper functions
update_cache(Pmap, Fullmap) ->
mochiglobal:put(pmap, Pmap),
mochiglobal:put(fullmap, Fullmap).
+
+
+check_pos(Pos, Node, Nodes) ->
+ Found = lists:keyfind(Pos, 1, Nodes),
+ case Found of
+ false -> ok;
+ _ ->
+ {_,OldNode,_} = Found,
+ if
+ OldNode =:= Node ->
+ throw({error, {node_exists_at_position, Pos, Node}});
+ true ->
+ throw({error, {position_exists, Pos, OldNode}})
+ end
+ end.
+
+
+int_reset() ->
+ int_reset(#mem{}).
+
+
+int_reset(State) ->
+ Node = node(),
+ Nodes = [{0, Node, []}],
+ Clock = vector_clock:create(Node),
+ State#mem{node=Node, nodes=Nodes, clock=Clock}.
diff --git a/test/mem3_test.erl b/test/mem3_test.erl
index 139187df..5d8a004c 100644
--- a/test/mem3_test.erl
+++ b/test/mem3_test.erl
@@ -12,6 +12,8 @@
913438523331814323877303020447676887284957839360,
1096126227998177188652763624537212264741949407232,
1278813932664540053428224228626747642198940975104]).
+-define(x40, 365375409332725729550921208179070754913983135744).
+-define(x60, 548063113999088594326381812268606132370974703616).
%% TEST SETUP
@@ -28,7 +30,9 @@ all_tests_test_() ->
fun clock/1,
fun join_first/1,
fun join_first_with_hints/1,
- fun join_new_node/1
+ fun join_new_node/1,
+ fun join_two_new_nodes/1,
+ fun join_with_wrong_order/1
]}
end}
]
@@ -61,6 +65,7 @@ clock(_Pid) ->
join_first(_Pid) ->
+ mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}]),
Fullmap = mem3:fullmap(),
?assertEqual(16, length(Fullmap)),
@@ -70,6 +75,7 @@ join_first(_Pid) ->
join_first_with_hints(_Pid) ->
+ mem3:reset(),
mem3:join(first, [{1, a, []},
{2, b, []},
{3, c, [{hints, [?HINT_C1, ?HINT_C2]}]},
@@ -86,10 +92,34 @@ join_first_with_hints(_Pid) ->
join_new_node(_Pid) ->
+ mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
?assertEqual(24, length(mem3:fullmap())),
?assertEqual([], mem3:parts_for_node(d)),
mem3:join(new, [{4, d, []}]),
?assertEqual(?PARTS_FOR_D1, mem3:parts_for_node(d)),
- ?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
+ %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
+ ok.
+
+
+join_two_new_nodes(_Pid) ->
+ mem3:reset(),
+ mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
+ ?assertEqual([], mem3:parts_for_node(d)),
+ Res = mem3:join(new, [{4, d, []}, {5, e, []}]),
+ ?assertEqual(ok, Res),
+ ?assertEqual([a,d,e], mem3:nodes_for_part(?x40)),
+ ?assertEqual([c,d,e], mem3:nodes_for_part(?x60)),
+ %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
+ ok.
+
+
+join_with_wrong_order(_Pid) ->
+ mem3:reset(),
+ mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
+ ?assertEqual([], mem3:parts_for_node(d)),
+ %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
+ Res = mem3:join(new, [{3, d, []}]),
+ ?assertEqual({error,{position_exists,3,c}}, Res),
+ %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
ok.