diff options
-rw-r--r-- | src/mem3.erl | 117 | ||||
-rw-r--r-- | test/mem3_test.erl | 8 |
2 files changed, 98 insertions, 27 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index ca5bec0b..eb4feea2 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -147,7 +147,7 @@ init(Args) -> Config = get_config(Args), Test = proplists:get_value(test, Args), OldState = read_latest_state_file(Test, Config), - State = handle_init(OldState), + State = handle_init(Test, OldState), {ok, State#mem{args=Args}}. @@ -157,11 +157,11 @@ handle_call({join, JoinType, ExtNodes}, _From, Config = get_config(Args), try NewState = handle_join(JoinType, ExtNodes, State, Config), + gossip(NewState), {reply, ok, NewState} - catch - _:Error -> - showroom_log:message(error, "~p", [Error]), - {reply, Error, State} + catch _:Error -> + showroom_log:message(error, "~p", [Error]), + {reply, Error, State} end; %% clock @@ -174,12 +174,13 @@ handle_call(state, _From, State) -> %% reset - but only if we're in test mode handle_call(reset, _From, #mem{args=Args} = State) -> - case proplists:get_value(test, Args) of + Test = proplists:get_value(test, Args), + case Test of undefined -> {reply, not_reset, State}; _ -> mochiglobal:delete(pmap), mochiglobal:delete(fullmap), - {reply, ok, int_reset(State)} + {reply, ok, int_reset(Test, State)} end; %% ignored call @@ -192,6 +193,19 @@ handle_call(Msg, _From, State) -> handle_cast(stop, State) -> {stop, normal, State}; +handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) -> + showroom_log:message(info, "membership: received gossip from ~p", + [RemoteNode]), + {MergeType, MergedState} = merge_state(RemoteState, LocalState), + case MergeType of + equal -> {noreply, MergedState}; + merged -> + showroom_log:message(info, "membership: merged new gossip", []), + update_cache(MergedState), + gossip(MergedState), + {noreply, MergedState} + end; + %% ignored cast handle_cast(Msg, State) -> showroom_log:message(info, "membership: ignored cast: ~p", [Msg]), @@ -248,12 +262,12 @@ get_config(Args) -> % 3. joining a cluster as a new node % 4. replacing a node in an existing cluster -handle_init(nil) -> +handle_init(Test, nil) -> showroom_log:message(info, "membership: membership server starting...", []), net_kernel:monitor_nodes(true), - int_reset(); + int_reset(Test); -handle_init(_OldState) -> +handle_init(_Test, _OldState) -> ?debugHere, % there's an old state, let's try to rejoin automatically % but only if we can compare our old state to all other @@ -269,10 +283,10 @@ handle_join(JoinType, ExtNodes, {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), update_cache(Pmap, Fullmap), NewClock = vector_clock:increment(Node, Clock), - % TODO: gossip State#mem{nodes=ExtNodes, clock=NewClock}; handle_join(replace, [_OldNode | _], _State, _Config) -> + % TODO implement me ok; handle_join(JoinType, _, _, _) -> @@ -280,6 +294,23 @@ handle_join(JoinType, _, _, _) -> {error, {unknown_join_type, JoinType}}. +gossip(#mem{args=Args} = NewState) -> + Test = proplists:get_value(test, Args), + gossip(Test, NewState). + + +gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) -> + {_, Nodes, _} = lists:unzip3(StateNodes), + PartnersPlus = replication:partners_plus(Node, Nodes), + lists:foreach(fun(TargetNode) -> + showroom_log:message(info, "membership: firing gossip from ~p to ~p", + [Node, TargetNode]), + gen_server:cast({?MODULE, TargetNode}, {gossip, NewState}) + end, PartnersPlus); +gossip(_,_) -> + % testing, so don't gossip + ok. + %% @doc find the latest state file on disk find_latest_state_filename(Config) -> Dir = Config#config.directory, @@ -300,20 +331,19 @@ find_latest_state_filename(Config) -> %% (Test, Config) -read_latest_state_file(true, _) -> - nil; -read_latest_state_file(_, Config) -> +read_latest_state_file(undefined, Config) -> try {ok, File} = find_latest_state_filename(Config), case file:consult(File) of {ok, #mem{}=State} -> State; _Else -> throw({error, bad_mem_state_file}) end - catch - _:Error -> - showroom_log:message(info, "membership: ~p", [Error]), - nil - end. + catch _:Error -> + showroom_log:message(info, "membership: ~p", [Error]), + nil + end; +read_latest_state_file(_, _) -> + nil. %% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap} @@ -348,6 +378,12 @@ make_fullmap(PMap, Config) -> %% cache table helper functions +update_cache(#mem{nodes=Nodes, args=Args}) -> + Config = get_config(Args), + {Pmap, Fullmap} = create_maps(Config, first, Nodes, []), + update_cache(Pmap, Fullmap). + + update_cache(Pmap, Fullmap) -> mochiglobal:put(pmap, Pmap), mochiglobal:put(fullmap, Fullmap). @@ -368,12 +404,47 @@ check_pos(Pos, Node, Nodes) -> end. -int_reset() -> - int_reset(#mem{}). +int_reset(Test) -> + int_reset(Test, #mem{}). -int_reset(State) -> - Node = node(), +int_reset(Test, State) -> + Node = case Test of + undefined -> node(); + _ -> Test + end, Nodes = [{0, Node, []}], Clock = vector_clock:create(Node), State#mem{node=Node, nodes=Nodes, clock=Clock}. + + +merge_state(_RemoteState=#mem{clock=RemoteClock, nodes=RemoteNodes}, + LocalState=#mem{clock=LocalClock, nodes=LocalNodes}) -> + case vector_clock:equals(RemoteClock, LocalClock) of + true -> + {equal, LocalState}; + false -> + {MergedClock, MergedNodes} = + merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes), + +% % notify of arrivals & departures +% Arrived = MergedNodes -- LocalNodes, +% notify(node_join, Arrived), +% Departed = LocalNodes -- MergedNodes, +% notify(node_leave, Departed), + + {merged, LocalState#mem{clock=MergedClock, nodes=MergedNodes}} + end. + + +merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) -> + {MergedClock, Merged} = + vector_clock:resolve({RemoteClock, RemoteNodes}, + {LocalClock, LocalNodes}), + {MergedClock, lists:keysort(1, Merged)}. + + +% notify(Type, Nodes) -> +% lists:foreach(fun(Node) -> +% gen_event:notify(membership_events, {Type, Node}) +% end, Nodes). diff --git a/test/mem3_test.erl b/test/mem3_test.erl index 5d8a004c..c47878d1 100644 --- a/test/mem3_test.erl +++ b/test/mem3_test.erl @@ -4,6 +4,7 @@ -include("../include/config.hrl"). -include_lib("eunit/include/eunit.hrl"). +-define(TEST_NODE_NAME, a). -define(HINT_C1, 365375409332725729550921208179070754913983135744). -define(HINT_C2, 1096126227998177188652763624537212264741949407232). -define(PARTS_FOR_D1, [365375409332725729550921208179070754913983135744, @@ -42,7 +43,7 @@ all_tests_test_() -> test_setup() -> Config = #config{n=3,r=2,w=2,q=3,directory="/srv/db", storage_mod="dynomite_couch_storage"}, - {ok, Pid} = mem3:start_link([{test,true}, {config, Config}]), + {ok, Pid} = mem3:start_link([{test,?TEST_NODE_NAME}, {config, Config}]), Pid. @@ -55,13 +56,12 @@ test_teardown(Pid) -> init(_Pid) -> #mem{args=Args} = mem3:state(), Test = proplists:get_value(test, Args), - ?assertEqual(true, Test). + ?assertEqual(?TEST_NODE_NAME, Test). clock(_Pid) -> - Node = node(), Clock = mem3:clock(), - ?assertMatch([{Node, _}], Clock). + ?assertMatch([{?TEST_NODE_NAME, _}], Clock). join_first(_Pid) -> |