diff options
-rw-r--r-- | include/common.hrl | 4 | ||||
-rw-r--r-- | src/mem3.erl | 163 | ||||
-rw-r--r-- | test/mem3_test.erl | 36 |
3 files changed, 83 insertions, 120 deletions
diff --git a/include/common.hrl b/include/common.hrl index 88c3fa9a..4315a54c 100644 --- a/include/common.hrl +++ b/include/common.hrl @@ -43,7 +43,7 @@ %% version 3 of membership state -record(mem, {header=3, node, - nodes, - clock, + nodes=[], + clock=[], args }). diff --git a/src/mem3.erl b/src/mem3.erl index cec2631c..c5c558da 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -165,11 +165,9 @@ init(Args) -> %% new node(s) joining to this node -handle_call({join, JoinType, ExtNodes}, _From, - #mem{args=Args} = State) -> - Config = get_config(Args), +handle_call({join, JoinType, ExtNodes}, _From, State) -> try - NewState = handle_join(JoinType, ExtNodes, State, Config), + NewState = handle_join(JoinType, ExtNodes, State), {reply, ok, NewState} catch _:Error -> showroom_log:message(error, "~p", [Error]), @@ -189,10 +187,7 @@ handle_call(reset, _From, #mem{args=Args} = State) -> Test = proplists:get_value(test, Args), case Test of undefined -> {reply, not_reset, State}; - _ -> - mochiglobal:delete(pmap), - mochiglobal:delete(fullmap), - {reply, ok, int_reset(Test, State)} + _ -> {reply, ok, int_reset(Test, State)} end; %% nodes @@ -200,6 +195,12 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> {_,NodeList,_} = lists:unzip3(Nodes), {reply, {ok, NodeList}, State}; +%% gossip +handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) -> + showroom_log:message(info, "membership: received gossip from ~p", + [RemoteNode]), + handle_gossip(From, RemoteState, LocalState); + %% ignored call handle_call(Msg, _From, State) -> showroom_log:message(info, "membership: ignored call: ~p", [Msg]), @@ -210,12 +211,6 @@ handle_call(Msg, _From, State) -> handle_cast(stop, State) -> {stop, normal, State}; -%% gossip -handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) -> - showroom_log:message(info, "membership: received gossip from ~p", - [RemoteNode]), - {noreply, handle_gossip(RemoteState, LocalState)}; - %% ignored cast handle_cast(Msg, State) -> showroom_log:message(info, "membership: ignored cast: ~p", [Msg]), @@ -294,52 +289,60 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> %% handle join activities, return NewState -handle_join(first, ExtNodes, State, Config) -> +handle_join(first, ExtNodes, State) -> {_,Nodes,_} = lists:unzip3(ExtNodes), ping_all_yall(Nodes), - int_join(first, ExtNodes, State, Config); + int_join(ExtNodes, State); -handle_join(new, ExtNodes, State, Config) -> - int_join(new, ExtNodes, State, Config); +handle_join(new, ExtNodes, State) -> + {_,Nodes,_} = lists:unzip3(ExtNodes), + ping_all_yall(Nodes), + int_join(ExtNodes, State); -handle_join(replace, [_OldNode | _], _State, _Config) -> +handle_join(replace, [_OldNode | _], _State) -> % TODO implement me ok; -handle_join(JoinType, _, _, _) -> +handle_join(JoinType, _, _) -> showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]), {error, {unknown_join_type, JoinType}}. -int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, - Config) -> +int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) -> + NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) -> + check_pos(Pos, N, Nodes), + [New|AccIn] + end, Nodes, ExtNodes), + NewNodes1 = lists:sort(NewNodes), NewClock = vector_clock:increment(Node, Clock), - NewState = State#mem{nodes=ExtNodes, clock=NewClock}, - {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), - new_state(NewState, Pmap, Fullmap, Config). + NewState = State#mem{nodes=NewNodes1, clock=NewClock}, + install_new_state(NewState), + NewState. %% @doc handle the gossip messages %% We're not using vector_clock:resolve b/c we need custom merge strategy -handle_gossip(RemoteState=#mem{clock=RemoteClock, node=RemoteNode}, +handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, LocalState=#mem{clock=LocalClock}) -> case vector_clock:compare(RemoteClock, LocalClock) of - equal -> LocalState; + equal -> + {reply, ok, LocalState}; less -> % remote node needs updating - gen_server:cast({?SERVER, RemoteNode}, {gossip, LocalState}), - LocalState; + {reply, {new_state, LocalState}, LocalState}; greater -> % local node needs updating - new_state(RemoteState); + gen_server:reply(From, ok), % reply to sender first + install_new_state(RemoteState); concurrent -> % ick, so let's resolve and merge states showroom_log:message(info, - "~nmembership: Concurrent Clocks~n" - "~nRemoteState : ~p~nLocalState : ~p~n" + "membership: Concurrent Clocks~n" + "RemoteState : ~p~nLocalState : ~p~n" , [RemoteState, LocalState]), MergedState = merge_states(RemoteState, LocalState), - new_state(MergedState) + gen_server:reply(From, {new_state, MergedState}), % reply to sender + install_new_state(MergedState) end. @@ -363,29 +366,39 @@ merge_nodes(Remote, Local) -> end. -% notify(Type, Nodes) -> -% lists:foreach(fun(Node) -> -% gen_event:notify(membership_events, {Type, Node}) -% end, Nodes). - gossip(#mem{args=Args} = NewState) -> Test = proplists:get_value(test, Args), gossip(Test, NewState). -gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) -> +gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) -> {_, Nodes, _} = lists:unzip3(StateNodes), - PartnersPlus = replication:partners_plus(Node, Nodes), - lists:foreach(fun(TargetNode) -> - showroom_log:message(info, "membership: firing gossip from ~p to ~p", + TargetNode = next_up_node(Node, Nodes), + showroom_log:message(info, "membership: firing gossip from ~p to ~p", [Node, TargetNode]), - gen_server:cast({?SERVER, TargetNode}, {gossip, NewState}) - end, PartnersPlus); + case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of + ok -> ok; + {new_state, _NewState} -> ?debugHere,ok; + Error -> throw({unknown_gossip_response, Error}) + end; + gossip(_,_) -> % testing, so don't gossip ok. +next_up_node(Node, Nodes) -> + {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes), + List = lists:append([B, A, [Node]]), + UpNodes = lists:delete(fun(N) -> lists:member(N, up_nodes()) end, List), + hd(UpNodes). % TODO: empty list? + + +up_nodes() -> + % TODO: implement cache (fb 9704 & 9449) + erlang:nodes(). + + %% @doc find the latest state file on disk find_latest_state_filename(Config) -> Dir = Config#config.directory, @@ -424,6 +437,12 @@ read_latest_state_file(_, _) -> nil. +install_new_state(#mem{args=Args} = State) -> + Config = get_config(Args), + save_state_file(State, Config), + gossip(State). + + %% @doc save the state file to disk, with current timestamp. %% thx to riak_ring_manager:do_write_ringfile/1 save_state_file(State, Config) -> @@ -438,58 +457,6 @@ save_state_file(State, Config) -> file:close(File). -%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap} -%% This is basically replaying all the mem events that have happened. -create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) -> - [{_,FirstNode,_}|_] = ExtNodes, - Fun = fun({Pos, Node, Options}, Pmap) -> - check_pos(Pos, Node, Nodes), - Hints = proplists:get_value(hints, Options), - {ok, NewPmap} = partitions:join(Node, Pmap, Hints), - NewPmap - end, - Acc0 = case JoinType of - first -> partitions:create_partitions(Q, FirstNode); - new -> mochiglobal:get(pmap) - end, - Pmap = lists:foldl(Fun, Acc0, lists:keysort(1, ExtNodes)), - {Pmap, make_fullmap(Pmap, Config)}. - - -%% @doc construct a table with all partitions, with the primary node and all -%% replication partner nodes as well. -make_fullmap(PMap, Config) -> - {Nodes, _Parts} = lists:unzip(PMap), - NodeParts = lists:flatmap( - fun({Node,Part}) -> - Partners = replication:partners(Node, lists:usort(Nodes), Config), - PartnerList = [{Partner, Part} || Partner <- Partners], - [{Node, Part} | PartnerList] - end, PMap), - NodeParts. - - -%% @doc tasks associated with a new state -new_state(#mem{nodes=Nodes, args=Args} = State) -> - Config = get_config(Args), - {Pmap, Fullmap} = create_maps(Config, first, Nodes, []), - new_state(State, Pmap, Fullmap, Config). - - -%% @doc tasks associated with a new state -new_state(State, Pmap, Fullmap, Config) -> - update_cache(Pmap, Fullmap), - save_state_file(State, Config), - gossip(State), - State. - - -%% cache table helper function -update_cache(Pmap, Fullmap) -> - mochiglobal:put(pmap, Pmap), - mochiglobal:put(fullmap, Fullmap). - - check_pos(Pos, Node, Nodes) -> Found = lists:keyfind(Pos, 1, Nodes), case Found of @@ -514,9 +481,7 @@ int_reset(Test, State) -> undefined -> node(); _ -> Test end, - Nodes = [{0, Node, []}], - Clock = vector_clock:create(Node), - State#mem{node=Node, nodes=Nodes, clock=Clock}. + State#mem{node=Node, nodes=[], clock=[]}. ping_all_yall(Nodes) -> diff --git a/test/mem3_test.erl b/test/mem3_test.erl index 03e55978..069d897b 100644 --- a/test/mem3_test.erl +++ b/test/mem3_test.erl @@ -61,16 +61,14 @@ init(_Pid) -> clock(_Pid) -> {ok, Clock} = mem3:clock(), - ?assertMatch([{?TEST_NODE_NAME, _}], Clock). + ?assertMatch([], Clock). join_first(_Pid) -> mem3:reset(), mem3:join(first, [{1, a, []}, {2, b, []}]), - Fullmap = mem3:fullmap(), - ?assertEqual(16, length(Fullmap)), - Pmap = mem3:partitions(), - ?assertEqual(8, length(Pmap)), + {ok, Nodes} = mem3:nodes(), + ?assertEqual(2, length(Nodes)), ok. @@ -81,35 +79,35 @@ join_first_with_hints(_Pid) -> {3, c, [{hints, [?HINT_C1, ?HINT_C2]}]}, {4, d, []}, {5, e, []}]), - Fullmap = mem3:fullmap(), - ?assertEqual(24, length(Fullmap)), - Pmap = mem3:partitions(), - ?assertEqual(8, length(Pmap)), + {ok, Nodes} = mem3:nodes(), + ?assertEqual(5, length(Nodes)), %?debugFmt("~nFullmap: ~p~n", [Fullmap]), - ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C1)), - ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C2)), +% ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C1)), +% ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C2)), ok. join_new_node(_Pid) -> mem3:reset(), mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]), - ?assertEqual(24, length(mem3:fullmap())), - ?assertEqual([], mem3:parts_for_node(d)), + {ok, Nodes1} = mem3:nodes(), + ?assertEqual(3, length(Nodes1)), mem3:join(new, [{4, d, []}]), - ?assertEqual(?PARTS_FOR_D1, mem3:parts_for_node(d)), - %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]), + {ok, Nodes2} = mem3:nodes(), + ?assertEqual(4, length(Nodes2)), + ?debugFmt("~nNodes: ~p~n", [Nodes2]), ok. join_two_new_nodes(_Pid) -> mem3:reset(), mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]), - ?assertEqual([], mem3:parts_for_node(d)), + {ok, Nodes1} = mem3:nodes(), + ?assertEqual(3, length(Nodes1)), Res = mem3:join(new, [{4, d, []}, {5, e, []}]), ?assertEqual(ok, Res), - ?assertEqual([a,d,e], mem3:nodes_for_part(?x40)), - ?assertEqual([c,d,e], mem3:nodes_for_part(?x60)), + {ok, Nodes2} = mem3:nodes(), + ?assertEqual(5, length(Nodes2)), %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]), ok. @@ -117,7 +115,7 @@ join_two_new_nodes(_Pid) -> join_with_wrong_order(_Pid) -> mem3:reset(), mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]), - ?assertEqual([], mem3:parts_for_node(d)), +% ?assertEqual([], mem3:parts_for_node(d)), %?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]), Res = mem3:join(new, [{3, d, []}]), ?assertEqual({error,{position_exists,3,c}}, Res), |