summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- include/common.hrl 4
-rw-r--r-- src/mem3.erl 163
-rw-r--r-- test/mem3_test.erl 36
3 files changed, 83 insertions, 120 deletions
diff --git a/include/common.hrl b/include/common.hrl
index 88c3fa9a..4315a54c 100644
--- a/include/common.hrl
+++ b/include/common.hrl
@@ -43,7 +43,7 @@
%% version 3 of membership state
-record(mem, {header=3,
node, % node this state belongs to
- nodes,
- clock,
+ nodes=[], % list of {Pos, Node, Options} tuples describing the members
+ clock=[], % vector clock; compared and incremented through the vector_clock module
args % proplist of start arguments; the `test` key is looked up to detect test mode
}).
diff --git a/src/mem3.erl b/src/mem3.erl
index cec2631c..c5c558da 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -165,11 +165,9 @@ init(Args) ->
%% new node(s) joining to this node
-handle_call({join, JoinType, ExtNodes}, _From,
- #mem{args=Args} = State) ->
- Config = get_config(Args),
+handle_call({join, JoinType, ExtNodes}, _From, State) ->
try
- NewState = handle_join(JoinType, ExtNodes, State, Config),
+ NewState = handle_join(JoinType, ExtNodes, State),
{reply, ok, NewState}
catch _:Error ->
showroom_log:message(error, "~p", [Error]),
@@ -189,10 +187,7 @@ handle_call(reset, _From, #mem{args=Args} = State) ->
Test = proplists:get_value(test, Args),
case Test of
undefined -> {reply, not_reset, State};
- _ ->
- mochiglobal:delete(pmap),
- mochiglobal:delete(fullmap),
- {reply, ok, int_reset(Test, State)}
+ _ -> {reply, ok, int_reset(Test, State)}
end;
%% nodes
@@ -200,6 +195,12 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
{_,NodeList,_} = lists:unzip3(Nodes),
{reply, {ok, NodeList}, State};
+%% gossip (synchronous): whatever handle_gossip/3 returns is returned to gen_server as-is
+handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) ->
+ showroom_log:message(info, "membership: received gossip from ~p",
+ [RemoteNode]),
+ handle_gossip(From, RemoteState, LocalState);
+
%% ignored call
handle_call(Msg, _From, State) ->
showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
@@ -210,12 +211,6 @@ handle_call(Msg, _From, State) ->
handle_cast(stop, State) ->
{stop, normal, State};
-%% gossip
-handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
- showroom_log:message(info, "membership: received gossip from ~p",
- [RemoteNode]),
- {noreply, handle_gossip(RemoteState, LocalState)};
-
%% ignored cast
handle_cast(Msg, State) ->
showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
@@ -294,52 +289,60 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
%% handle join activities, return NewState
+%% Every clause either returns a #mem{} record or throws. The join clause of
+%% handle_call/3 binds the result as the new server state and replies `ok`,
+%% so returning a bare `ok` or `{error, _}` here would silently replace the
+%% state record. Throwing instead lands in that clause's `catch _:Error`,
+%% which leaves the previous state untouched.
+%% NOTE(review): the reply built by that catch clause is outside this view;
+%% the position_exists test suggests the thrown term is sent back - confirm.
-handle_join(first, ExtNodes, State, Config) ->
+handle_join(first, ExtNodes, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
- int_join(first, ExtNodes, State, Config);
+ int_join(ExtNodes, State);
-handle_join(new, ExtNodes, State, Config) ->
- int_join(new, ExtNodes, State, Config);
+handle_join(new, ExtNodes, State) ->
+ {_,Nodes,_} = lists:unzip3(ExtNodes),
+ ping_all_yall(Nodes),
+ int_join(ExtNodes, State);
-handle_join(replace, [_OldNode | _], _State, _Config) ->
+handle_join(replace, [_OldNode | _], _State) ->
% TODO implement me
- ok;
+ % not a state, so it must not be returned: report it as an error instead
+ throw({error, {not_implemented, replace}});
-handle_join(JoinType, _, _, _) ->
+handle_join(JoinType, _, _) ->
showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]),
- {error, {unknown_join_type, JoinType}}.
+ throw({error, {unknown_join_type, JoinType}}).
-int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
- Config) ->
+%% @doc add ExtNodes to the known member list, bump this node's entry in the
+%% vector clock, hand the result to install_new_state/1 and return it.
+%% check_pos/3 is run against the accumulator rather than the pre-join list,
+%% so a position clash between two entries of the same ExtNodes list is
+%% checked exactly like a clash with an already-known member.
+int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) ->
+ NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
+ check_pos(Pos, N, AccIn),
+ [New|AccIn]
+ end, Nodes, ExtNodes),
+ % entries are {Pos, Node, Options}, so a plain sort orders them by position
+ NewNodes1 = lists:sort(NewNodes),
NewClock = vector_clock:increment(Node, Clock),
- NewState = State#mem{nodes=ExtNodes, clock=NewClock},
- {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
- new_state(NewState, Pmap, Fullmap, Config).
+ NewState = State#mem{nodes=NewNodes1, clock=NewClock},
+ install_new_state(NewState),
+ NewState.
%% @doc handle the gossip messages
%% We're not using vector_clock:resolve b/c we need custom merge strategy
+%% Called from the gossip clause of handle_call/3, which returns this
+%% function's result unchanged, so every branch must yield a gen_server
+%% return tuple. Where the local state is replaced, From is answered first
+%% with gen_server:reply/2 and the branch ends in {noreply, NewState};
+%% install_new_state/1 itself only returns the result of gossip/1.
-handle_gossip(RemoteState=#mem{clock=RemoteClock, node=RemoteNode},
+handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
LocalState=#mem{clock=LocalClock}) ->
case vector_clock:compare(RemoteClock, LocalClock) of
- equal -> LocalState;
+ equal ->
+ {reply, ok, LocalState};
less ->
% remote node needs updating
- gen_server:cast({?SERVER, RemoteNode}, {gossip, LocalState}),
- LocalState;
+ {reply, {new_state, LocalState}, LocalState};
greater ->
% local node needs updating
- new_state(RemoteState);
+ % NOTE(review): RemoteState is adopted wholesale, including its `node`
+ % and `args` fields - confirm that is intended
+ gen_server:reply(From, ok), % reply to sender first
+ install_new_state(RemoteState),
+ {noreply, RemoteState};
concurrent ->
% ick, so let's resolve and merge states
showroom_log:message(info,
- "~nmembership: Concurrent Clocks~n"
- "~nRemoteState : ~p~nLocalState : ~p~n"
+ "membership: Concurrent Clocks~n"
+ "RemoteState : ~p~nLocalState : ~p~n"
, [RemoteState, LocalState]),
MergedState = merge_states(RemoteState, LocalState),
- new_state(MergedState)
+ gen_server:reply(From, {new_state, MergedState}), % reply to sender
+ install_new_state(MergedState),
+ {noreply, MergedState}
end.
@@ -363,29 +366,39 @@ merge_nodes(Remote, Local) ->
end.
-% notify(Type, Nodes) ->
-% lists:foreach(fun(Node) ->
-% gen_event:notify(membership_events, {Type, Node})
-% end, Nodes).
-
gossip(#mem{args=Args} = NewState) -> % entry point: picks the real or the test variant of gossip/2
Test = proplists:get_value(test, Args), % `undefined` when Args has no `test` entry
gossip(Test, NewState).
-gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) ->
+%% @doc send State to one peer: the next node after Node in the member list
+%% that up_nodes/0 reports as reachable. The call is synchronous; `ok` and
+%% {new_state, _} are the accepted answers, anything else is thrown as
+%% {unknown_gossip_response, Response}. When no peer is reachable there is
+%% nobody to tell, so the function returns `ok` without sending anything.
+gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) ->
{_, Nodes, _} = lists:unzip3(StateNodes),
- PartnersPlus = replication:partners_plus(Node, Nodes),
- lists:foreach(fun(TargetNode) ->
- showroom_log:message(info, "membership: firing gossip from ~p to ~p",
+ case next_up_node(Node, Nodes) of
+ none ->
+ ok;
+ {ok, TargetNode} ->
+ showroom_log:message(info, "membership: firing gossip from ~p to ~p",
[Node, TargetNode]),
- gen_server:cast({?SERVER, TargetNode}, {gossip, NewState})
- end, PartnersPlus);
+ case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of
+ ok -> ok;
+ % NOTE(review): the state sent back by the peer is discarded here
+ {new_state, _NewState} -> ok;
+ Error -> throw({unknown_gossip_response, Error})
+ end
+ end;
+
gossip(_,_) ->
% testing, so don't gossip
ok.
+%% @doc walk the member list starting just after Node (wrapping around, Node
+%% itself last) and return {ok, N} for the first entry found in up_nodes/0,
+%% or `none` when no entry is up. Node must be a member of Nodes.
+next_up_node(Node, Nodes) ->
+ {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
+ List = lists:append([B, A, [Node]]),
+ Up = up_nodes(), % looked up once, not once per candidate
+ case lists:filter(fun(N) -> lists:member(N, Up) end, List) of
+ [Next|_] -> {ok, Next};
+ [] -> none
+ end.
+
+
+up_nodes() -> % nodes treated as reachable when picking a gossip target
+ % TODO: implement cache (fb 9704 & 9449)
+ erlang:nodes(). % visible nodes connected to this one; the local node is not part of the list
+
+
%% @doc find the latest state file on disk
find_latest_state_filename(Config) ->
Dir = Config#config.directory,
@@ -424,6 +437,12 @@ read_latest_state_file(_, _) ->
nil.
+install_new_state(#mem{args=Args} = State) -> % persist State, then gossip it
+ Config = get_config(Args),
+ save_state_file(State, Config), % return value is not inspected
+ gossip(State). % the result of gossip/1 is what this function returns, not State
+
+
%% @doc save the state file to disk, with current timestamp.
%% thx to riak_ring_manager:do_write_ringfile/1
save_state_file(State, Config) ->
@@ -438,58 +457,6 @@ save_state_file(State, Config) ->
file:close(File).
-%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}
-%% This is basically replaying all the mem events that have happened.
-create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) ->
- [{_,FirstNode,_}|_] = ExtNodes,
- Fun = fun({Pos, Node, Options}, Pmap) ->
- check_pos(Pos, Node, Nodes),
- Hints = proplists:get_value(hints, Options),
- {ok, NewPmap} = partitions:join(Node, Pmap, Hints),
- NewPmap
- end,
- Acc0 = case JoinType of
- first -> partitions:create_partitions(Q, FirstNode);
- new -> mochiglobal:get(pmap)
- end,
- Pmap = lists:foldl(Fun, Acc0, lists:keysort(1, ExtNodes)),
- {Pmap, make_fullmap(Pmap, Config)}.
-
-
-%% @doc construct a table with all partitions, with the primary node and all
-%% replication partner nodes as well.
-make_fullmap(PMap, Config) ->
- {Nodes, _Parts} = lists:unzip(PMap),
- NodeParts = lists:flatmap(
- fun({Node,Part}) ->
- Partners = replication:partners(Node, lists:usort(Nodes), Config),
- PartnerList = [{Partner, Part} || Partner <- Partners],
- [{Node, Part} | PartnerList]
- end, PMap),
- NodeParts.
-
-
-%% @doc tasks associated with a new state
-new_state(#mem{nodes=Nodes, args=Args} = State) ->
- Config = get_config(Args),
- {Pmap, Fullmap} = create_maps(Config, first, Nodes, []),
- new_state(State, Pmap, Fullmap, Config).
-
-
-%% @doc tasks associated with a new state
-new_state(State, Pmap, Fullmap, Config) ->
- update_cache(Pmap, Fullmap),
- save_state_file(State, Config),
- gossip(State),
- State.
-
-
-%% cache table helper function
-update_cache(Pmap, Fullmap) ->
- mochiglobal:put(pmap, Pmap),
- mochiglobal:put(fullmap, Fullmap).
-
-
check_pos(Pos, Node, Nodes) ->
Found = lists:keyfind(Pos, 1, Nodes),
case Found of
@@ -514,9 +481,7 @@ int_reset(Test, State) ->
undefined -> node();
_ -> Test
end,
- Nodes = [{0, Node, []}],
- Clock = vector_clock:create(Node),
- State#mem{node=Node, nodes=Nodes, clock=Clock}.
+ State#mem{node=Node, nodes=[], clock=[]}.
ping_all_yall(Nodes) ->
diff --git a/test/mem3_test.erl b/test/mem3_test.erl
index 03e55978..069d897b 100644
--- a/test/mem3_test.erl
+++ b/test/mem3_test.erl
@@ -61,16 +61,14 @@ init(_Pid) ->
%% mem3:clock/0 must answer {ok, Clock}; Clock is expected to be empty here.
clock(_Pid) ->
{ok, Clock} = mem3:clock(),
- ?assertMatch([{?TEST_NODE_NAME, _}], Clock).
+ ?assertMatch([], Clock).
%% After a reset, a `first` join of two nodes must leave two nodes in mem3:nodes/0.
join_first(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}]), % return value is not asserted here
- Fullmap = mem3:fullmap(),
- ?assertEqual(16, length(Fullmap)),
- Pmap = mem3:partitions(),
- ?assertEqual(8, length(Pmap)),
+ {ok, Nodes} = mem3:nodes(),
+ ?assertEqual(2, length(Nodes)),
ok.
@@ -81,35 +79,35 @@ join_first_with_hints(_Pid) ->
{3, c, [{hints, [?HINT_C1, ?HINT_C2]}]},
{4, d, []},
{5, e, []}]),
- Fullmap = mem3:fullmap(),
- ?assertEqual(24, length(Fullmap)),
- Pmap = mem3:partitions(),
- ?assertEqual(8, length(Pmap)),
+ {ok, Nodes} = mem3:nodes(),
+ ?assertEqual(5, length(Nodes)),
%?debugFmt("~nFullmap: ~p~n", [Fullmap]),
- ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C1)),
- ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C2)),
+% ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C1)),
+% ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C2)),
ok.
%% A `new` join on top of a three-node `first` join must grow the node list to four.
join_new_node(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
- ?assertEqual(24, length(mem3:fullmap())),
- ?assertEqual([], mem3:parts_for_node(d)),
+ {ok, Nodes1} = mem3:nodes(),
+ ?assertEqual(3, length(Nodes1)),
mem3:join(new, [{4, d, []}]),
- ?assertEqual(?PARTS_FOR_D1, mem3:parts_for_node(d)),
- %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
+ {ok, Nodes2} = mem3:nodes(),
+ ?assertEqual(4, length(Nodes2)),
+ ?debugFmt("~nNodes: ~p~n", [Nodes2]), % prints the node list on every run of this test
ok.
%% Joining two further nodes in one `new` call must return ok and yield five nodes.
join_two_new_nodes(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
- ?assertEqual([], mem3:parts_for_node(d)),
+ {ok, Nodes1} = mem3:nodes(),
+ ?assertEqual(3, length(Nodes1)),
Res = mem3:join(new, [{4, d, []}, {5, e, []}]),
?assertEqual(ok, Res),
- ?assertEqual([a,d,e], mem3:nodes_for_part(?x40)),
- ?assertEqual([c,d,e], mem3:nodes_for_part(?x60)),
+ {ok, Nodes2} = mem3:nodes(),
+ ?assertEqual(5, length(Nodes2)),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
ok.
@@ -117,7 +115,7 @@ join_two_new_nodes(_Pid) ->
join_with_wrong_order(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
- ?assertEqual([], mem3:parts_for_node(d)),
+% ?assertEqual([], mem3:parts_for_node(d)),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
Res = mem3:join(new, [{3, d, []}]),
?assertEqual({error,{position_exists,3,c}}, Res),