summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mem3.erl117
-rw-r--r--test/mem3_test.erl8
2 files changed, 98 insertions, 27 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
index ca5bec0b..eb4feea2 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -147,7 +147,7 @@ init(Args) ->
Config = get_config(Args),
Test = proplists:get_value(test, Args),
OldState = read_latest_state_file(Test, Config),
- State = handle_init(OldState),
+ State = handle_init(Test, OldState),
{ok, State#mem{args=Args}}.
@@ -157,11 +157,11 @@ handle_call({join, JoinType, ExtNodes}, _From,
Config = get_config(Args),
try
NewState = handle_join(JoinType, ExtNodes, State, Config),
+ gossip(NewState),
{reply, ok, NewState}
- catch
- _:Error ->
- showroom_log:message(error, "~p", [Error]),
- {reply, Error, State}
+ catch _:Error ->
+ showroom_log:message(error, "~p", [Error]),
+ {reply, Error, State}
end;
%% clock
@@ -174,12 +174,13 @@ handle_call(state, _From, State) ->
%% reset - but only if we're in test mode
handle_call(reset, _From, #mem{args=Args} = State) ->
- case proplists:get_value(test, Args) of
+ Test = proplists:get_value(test, Args),
+ case Test of
undefined -> {reply, not_reset, State};
_ ->
mochiglobal:delete(pmap),
mochiglobal:delete(fullmap),
- {reply, ok, int_reset(State)}
+ {reply, ok, int_reset(Test, State)}
end;
%% ignored call
@@ -192,6 +193,19 @@ handle_call(Msg, _From, State) ->
handle_cast(stop, State) ->
{stop, normal, State};
+handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
+ showroom_log:message(info, "membership: received gossip from ~p",
+ [RemoteNode]),
+ {MergeType, MergedState} = merge_state(RemoteState, LocalState),
+ case MergeType of
+ equal -> {noreply, MergedState};
+ merged ->
+ showroom_log:message(info, "membership: merged new gossip", []),
+ update_cache(MergedState),
+ gossip(MergedState),
+ {noreply, MergedState}
+ end;
+
%% ignored cast
handle_cast(Msg, State) ->
showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
@@ -248,12 +262,12 @@ get_config(Args) ->
% 3. joining a cluster as a new node
% 4. replacing a node in an existing cluster
-handle_init(nil) ->
+handle_init(Test, nil) ->
showroom_log:message(info, "membership: membership server starting...", []),
net_kernel:monitor_nodes(true),
- int_reset();
+ int_reset(Test);
-handle_init(_OldState) ->
+handle_init(_Test, _OldState) ->
?debugHere,
% there's an old state, let's try to rejoin automatically
% but only if we can compare our old state to all other
@@ -269,10 +283,10 @@ handle_join(JoinType, ExtNodes,
{Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
update_cache(Pmap, Fullmap),
NewClock = vector_clock:increment(Node, Clock),
- % TODO: gossip
State#mem{nodes=ExtNodes, clock=NewClock};
handle_join(replace, [_OldNode | _], _State, _Config) ->
+ % TODO implement me
ok;
handle_join(JoinType, _, _, _) ->
@@ -280,6 +294,23 @@ handle_join(JoinType, _, _, _) ->
{error, {unknown_join_type, JoinType}}.
+gossip(#mem{args=Args} = NewState) ->
+ Test = proplists:get_value(test, Args),
+ gossip(Test, NewState).
+
+
+gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) ->
+ {_, Nodes, _} = lists:unzip3(StateNodes),
+ PartnersPlus = replication:partners_plus(Node, Nodes),
+ lists:foreach(fun(TargetNode) ->
+ showroom_log:message(info, "membership: firing gossip from ~p to ~p",
+ [Node, TargetNode]),
+ gen_server:cast({?MODULE, TargetNode}, {gossip, NewState})
+ end, PartnersPlus);
+gossip(_,_) ->
+ % testing, so don't gossip
+ ok.
+
%% @doc find the latest state file on disk
find_latest_state_filename(Config) ->
Dir = Config#config.directory,
@@ -300,20 +331,19 @@ find_latest_state_filename(Config) ->
%% (Test, Config)
-read_latest_state_file(true, _) ->
- nil;
-read_latest_state_file(_, Config) ->
+read_latest_state_file(undefined, Config) ->
try
{ok, File} = find_latest_state_filename(Config),
case file:consult(File) of
{ok, #mem{}=State} -> State;
_Else -> throw({error, bad_mem_state_file})
end
- catch
- _:Error ->
- showroom_log:message(info, "membership: ~p", [Error]),
- nil
- end.
+ catch _:Error ->
+ showroom_log:message(info, "membership: ~p", [Error]),
+ nil
+ end;
+read_latest_state_file(_, _) ->
+ nil.
%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}
@@ -348,6 +378,12 @@ make_fullmap(PMap, Config) ->
%% cache table helper functions
+update_cache(#mem{nodes=Nodes, args=Args}) ->
+ Config = get_config(Args),
+ {Pmap, Fullmap} = create_maps(Config, first, Nodes, []),
+ update_cache(Pmap, Fullmap).
+
+
update_cache(Pmap, Fullmap) ->
mochiglobal:put(pmap, Pmap),
mochiglobal:put(fullmap, Fullmap).
@@ -368,12 +404,47 @@ check_pos(Pos, Node, Nodes) ->
end.
-int_reset() ->
- int_reset(#mem{}).
+int_reset(Test) ->
+ int_reset(Test, #mem{}).
-int_reset(State) ->
- Node = node(),
+int_reset(Test, State) ->
+ Node = case Test of
+ undefined -> node();
+ _ -> Test
+ end,
Nodes = [{0, Node, []}],
Clock = vector_clock:create(Node),
State#mem{node=Node, nodes=Nodes, clock=Clock}.
+
+
+merge_state(_RemoteState=#mem{clock=RemoteClock, nodes=RemoteNodes},
+ LocalState=#mem{clock=LocalClock, nodes=LocalNodes}) ->
+ case vector_clock:equals(RemoteClock, LocalClock) of
+ true ->
+ {equal, LocalState};
+ false ->
+ {MergedClock, MergedNodes} =
+ merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes),
+
+% % notify of arrivals & departures
+% Arrived = MergedNodes -- LocalNodes,
+% notify(node_join, Arrived),
+% Departed = LocalNodes -- MergedNodes,
+% notify(node_leave, Departed),
+
+ {merged, LocalState#mem{clock=MergedClock, nodes=MergedNodes}}
+ end.
+
+
+merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) ->
+ {MergedClock, Merged} =
+ vector_clock:resolve({RemoteClock, RemoteNodes},
+ {LocalClock, LocalNodes}),
+ {MergedClock, lists:keysort(1, Merged)}.
+
+
+% notify(Type, Nodes) ->
+% lists:foreach(fun(Node) ->
+% gen_event:notify(membership_events, {Type, Node})
+% end, Nodes).
diff --git a/test/mem3_test.erl b/test/mem3_test.erl
index 5d8a004c..c47878d1 100644
--- a/test/mem3_test.erl
+++ b/test/mem3_test.erl
@@ -4,6 +4,7 @@
-include("../include/config.hrl").
-include_lib("eunit/include/eunit.hrl").
+-define(TEST_NODE_NAME, a).
-define(HINT_C1, 365375409332725729550921208179070754913983135744).
-define(HINT_C2, 1096126227998177188652763624537212264741949407232).
-define(PARTS_FOR_D1, [365375409332725729550921208179070754913983135744,
@@ -42,7 +43,7 @@ all_tests_test_() ->
test_setup() ->
Config = #config{n=3,r=2,w=2,q=3,directory="/srv/db",
storage_mod="dynomite_couch_storage"},
- {ok, Pid} = mem3:start_link([{test,true}, {config, Config}]),
+ {ok, Pid} = mem3:start_link([{test,?TEST_NODE_NAME}, {config, Config}]),
Pid.
@@ -55,13 +56,12 @@ test_teardown(Pid) ->
init(_Pid) ->
#mem{args=Args} = mem3:state(),
Test = proplists:get_value(test, Args),
- ?assertEqual(true, Test).
+ ?assertEqual(?TEST_NODE_NAME, Test).
clock(_Pid) ->
- Node = node(),
Clock = mem3:clock(),
- ?assertMatch([{Node, _}], Clock).
+ ?assertMatch([{?TEST_NODE_NAME, _}], Clock).
join_first(_Pid) ->