summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-04-16 23:40:13 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-09 22:56:24 -0400
commitf23371c2ec884628e73abd783c3beedfaa25d490 (patch)
tree65ea906b0387dd1fcd5adbb67c60ca09ea9336ce
parent75005e7dfc78bc9232e1364799a03ad541ce0244 (diff)
writing state to disk now, and handle_init installs disk state, if vector clocks match other nodes in cluster. Tests Needed
-rw-r--r--src/mem3.erl109
1 files changed, 83 insertions, 26 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
index bc2b1d7e..cb69d27d 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -35,6 +35,7 @@
-include("../include/common.hrl").
-define(SERVER, membership).
+-define(STATE_FILE_PREFIX, "membership").
%% types - stick somewhere in includes?
-type join_type() :: first | new | replace.
@@ -154,6 +155,8 @@ init(Args) ->
Config = get_config(Args),
Test = proplists:get_value(test, Args),
OldState = read_latest_state_file(Test, Config),
+ showroom_log:message(info, "membership: membership server starting...", []),
+ net_kernel:monitor_nodes(true),
State = handle_init(Test, OldState),
{ok, State#mem{args=Args}}.
@@ -164,7 +167,6 @@ handle_call({join, JoinType, ExtNodes}, _From,
Config = get_config(Args),
try
NewState = handle_join(JoinType, ExtNodes, State, Config),
- gossip(NewState),
{reply, ok, NewState}
catch _:Error ->
showroom_log:message(error, "~p", [Error]),
@@ -191,9 +193,9 @@ handle_call(reset, _From, #mem{args=Args} = State) ->
end;
%% nodes
-handle_call(nodes, _From, #mem{nodes=NodeList} = State) ->
- {_,Nodes,_} = lists:unzip3(NodeList),
- {reply, {ok, Nodes}, State};
+handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
+ {_,NodeList,_} = lists:unzip3(Nodes),
+ {reply, {ok, NodeList}, State};
%% ignored call
handle_call(Msg, _From, State) ->
@@ -214,7 +216,7 @@ handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
merged ->
showroom_log:message(info, "membership: merged new gossip: ~p",
[MergedState]),
- update_cache(MergedState),
+ new_state(MergedState),
gossip(MergedState),
{noreply, MergedState}
end;
@@ -276,27 +278,34 @@ get_config(Args) ->
% 4. replacing a node in an existing cluster
handle_init(Test, nil) ->
- showroom_log:message(info, "membership: membership server starting...", []),
- net_kernel:monitor_nodes(true),
int_reset(Test);
-handle_init(_Test, _OldState) ->
- ?debugHere,
+handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
% there's an old state, let's try to rejoin automatically
- % but only if we can compare our old state to all other
- % available nodes and get a match... otherwise get a human involved
- % TODO implement me
- #mem{}.
+ % but only if we can compare our old state to other available
+ % nodes and get a match... otherwise get a human involved
+ {_, NodeList, _} = lists:unzip3(Nodes),
+ ping_all_yall(NodeList),
+ {RemoteStates, _BadNodes} = get_remote_states(NodeList),
+ Test = proplists:get_value(test, Args),
+ case compare_state_with_rest(OldState, RemoteStates) of
+ match ->
+ showroom_log:message(info, "membership: rejoined successfully", []),
+ OldState;
+ Other ->
+ showroom_log:message(error, "membership: rejoin failed: ~p", [Other]),
+ int_reset(Test)
+ end.
%% handle join activities, return NewState
handle_join(first, ExtNodes, State, Config) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
- join(first, ExtNodes, State, Config);
+ int_join(first, ExtNodes, State, Config);
handle_join(new, ExtNodes, State, Config) ->
- join(new, ExtNodes, State, Config);
+ int_join(new, ExtNodes, State, Config);
handle_join(replace, [_OldNode | _], _State, _Config) ->
% TODO implement me
@@ -307,12 +316,12 @@ handle_join(JoinType, _, _, _) ->
{error, {unknown_join_type, JoinType}}.
-join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
+int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
Config) ->
- {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
- update_cache(Pmap, Fullmap),
NewClock = vector_clock:increment(Node, Clock),
- State#mem{nodes=ExtNodes, clock=NewClock}.
+ NewState = State#mem{nodes=ExtNodes, clock=NewClock},
+ {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
+ new_state(NewState, Pmap, Fullmap, Config).
gossip(#mem{args=Args} = NewState) ->
@@ -337,12 +346,13 @@ find_latest_state_filename(Config) ->
Dir = Config#config.directory,
case file:list_dir(Dir) of
{ok, Filenames} ->
- Timestamps = [list_to_integer(TS) || {"state", TS} <-
+ Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-
[list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
case SortedTimestamps of
[Latest | _] ->
- {ok, Dir ++ "/state." ++ integer_to_list(Latest)};
+ {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++
+ integer_to_list(Latest)};
_ ->
throw({error, mem_state_file_not_found})
end;
@@ -356,8 +366,10 @@ read_latest_state_file(undefined, Config) ->
try
{ok, File} = find_latest_state_filename(Config),
case file:consult(File) of
- {ok, #mem{}=State} -> State;
- _Else -> throw({error, bad_mem_state_file})
+ {ok, [#mem{}=State]} -> State;
+ Else ->
+ ?debugFmt("~nElse: ~p~n", [Else]),
+ throw({error, bad_mem_state_file})
end
catch _:Error ->
showroom_log:message(info, "membership: ~p", [Error]),
@@ -367,6 +379,21 @@ read_latest_state_file(_, _) ->
nil.
+%% @doc save the state file to disk, with current timestamp.
+%% thx to riak_ring_manager:do_write_ringfile/1
+save_state_file(State, Config) ->
+ Dir = Config#config.directory,
+ {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
+ TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
+ [Year, Month, Day, Hour, Minute, Second]),
+ FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
+ ?debugFmt("~nFilename: ~s~n", [FN]),
+ ok = filelib:ensure_dir(FN),
+ {ok, File} = file:open(FN, [binary, write]),
+ io:format(File, "~w.~n", [State]),
+ file:close(File).
+
+
%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}
%% This is basically replaying all the mem events that have happened.
create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) ->
@@ -398,13 +425,22 @@ make_fullmap(PMap, Config) ->
NodeParts.
-%% cache table helper functions
-update_cache(#mem{nodes=Nodes, args=Args}) ->
+%% @doc tasks associated with a new state
+new_state(#mem{nodes=Nodes, args=Args} = State) ->
Config = get_config(Args),
{Pmap, Fullmap} = create_maps(Config, first, Nodes, []),
- update_cache(Pmap, Fullmap).
+ new_state(State, Pmap, Fullmap, Config).
+%% @doc tasks associated with a new state
+new_state(State, Pmap, Fullmap, Config) ->
+ update_cache(Pmap, Fullmap),
+ save_state_file(State, Config),
+ gossip(State),
+ State.
+
+
+%% cache table helper function
update_cache(Pmap, Fullmap) ->
mochiglobal:put(pmap, Pmap),
mochiglobal:put(fullmap, Fullmap).
@@ -476,3 +512,24 @@ merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) ->
ping_all_yall(Nodes) ->
lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes).
+
+
+get_remote_states(NodeList) ->
+ NodeList1 = lists:delete(node(), NodeList),
+ {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
+ {_Status, States2} = lists:unzip(States1),
+ {States2, BadNodes}.
+
+
+%% @doc compare state with states based on vector clock
+%% return match | {bad_state_match, Node, NodesThatDontMatch}
+compare_state_with_rest(#mem{node=Node, clock=Clock} = _State, States) ->
+ Results = lists:map(fun(#mem{node=Node1, clock=Clock1}) ->
+ {vector_clock:equals(Clock, Clock1), Node1}
+ end, States),
+ BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
+ ({false, N}, AccIn) -> [N | AccIn] end, [], Results),
+ if
+ length(BadResults) == 0 -> match;
+ true -> {bad_state_match, Node, BadResults}
+ end.