diff options
author | Brad Anderson <brad@cloudant.com> | 2010-04-16 23:40:13 -0400 |
---|---|---|
committer | Brad Anderson <brad@cloudant.com> | 2010-05-09 22:56:24 -0400 |
commit | f23371c2ec884628e73abd783c3beedfaa25d490 (patch) | |
tree | 65ea906b0387dd1fcd5adbb67c60ca09ea9336ce | |
parent | 75005e7dfc78bc9232e1364799a03ad541ce0244 (diff) |
writing state to disk now, and handle_init installs disk state, if vector clocks match other nodes in cluster. Tests Needed
-rw-r--r-- | src/mem3.erl | 109 |
1 files changed, 83 insertions, 26 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index bc2b1d7e..cb69d27d 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -35,6 +35,7 @@ -include("../include/common.hrl"). -define(SERVER, membership). +-define(STATE_FILE_PREFIX, "membership"). %% types - stick somewhere in includes? -type join_type() :: first | new | replace. @@ -154,6 +155,8 @@ init(Args) -> Config = get_config(Args), Test = proplists:get_value(test, Args), OldState = read_latest_state_file(Test, Config), + showroom_log:message(info, "membership: membership server starting...", []), + net_kernel:monitor_nodes(true), State = handle_init(Test, OldState), {ok, State#mem{args=Args}}. @@ -164,7 +167,6 @@ handle_call({join, JoinType, ExtNodes}, _From, Config = get_config(Args), try NewState = handle_join(JoinType, ExtNodes, State, Config), - gossip(NewState), {reply, ok, NewState} catch _:Error -> showroom_log:message(error, "~p", [Error]), @@ -191,9 +193,9 @@ handle_call(reset, _From, #mem{args=Args} = State) -> end; %% nodes -handle_call(nodes, _From, #mem{nodes=NodeList} = State) -> - {_,Nodes,_} = lists:unzip3(NodeList), - {reply, {ok, Nodes}, State}; +handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> + {_,NodeList,_} = lists:unzip3(Nodes), + {reply, {ok, NodeList}, State}; %% ignored call handle_call(Msg, _From, State) -> @@ -214,7 +216,7 @@ handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) -> merged -> showroom_log:message(info, "membership: merged new gossip: ~p", [MergedState]), - update_cache(MergedState), + new_state(MergedState), gossip(MergedState), {noreply, MergedState} end; @@ -276,27 +278,34 @@ get_config(Args) -> % 4. replacing a node in an existing cluster handle_init(Test, nil) -> - showroom_log:message(info, "membership: membership server starting...", []), - net_kernel:monitor_nodes(true), int_reset(Test); -handle_init(_Test, _OldState) -> - ?debugHere, +handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> % there's an old state, let's try to rejoin automatically - % but only if we can compare our old state to all other - % available nodes and get a match... otherwise get a human involved - % TODO implement me - #mem{}. + % but only if we can compare our old state to other available + % nodes and get a match... otherwise get a human involved + {_, NodeList, _} = lists:unzip3(Nodes), + ping_all_yall(NodeList), + {RemoteStates, _BadNodes} = get_remote_states(NodeList), + Test = proplists:get_value(test, Args), + case compare_state_with_rest(OldState, RemoteStates) of + match -> + showroom_log:message(info, "membership: rejoined successfully", []), + OldState; + Other -> + showroom_log:message(error, "membership: rejoin failed: ~p", [Other]), + int_reset(Test) + end. %% handle join activities, return NewState handle_join(first, ExtNodes, State, Config) -> {_,Nodes,_} = lists:unzip3(ExtNodes), ping_all_yall(Nodes), - join(first, ExtNodes, State, Config); + int_join(first, ExtNodes, State, Config); handle_join(new, ExtNodes, State, Config) -> - join(new, ExtNodes, State, Config); + int_join(new, ExtNodes, State, Config); handle_join(replace, [_OldNode | _], _State, _Config) -> % TODO implement me @@ -307,12 +316,12 @@ handle_join(JoinType, _, _, _) -> {error, {unknown_join_type, JoinType}}. -join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, +int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, Config) -> - {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), - update_cache(Pmap, Fullmap), NewClock = vector_clock:increment(Node, Clock), - State#mem{nodes=ExtNodes, clock=NewClock}. + NewState = State#mem{nodes=ExtNodes, clock=NewClock}, + {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), + new_state(NewState, Pmap, Fullmap, Config). gossip(#mem{args=Args} = NewState) -> @@ -337,12 +346,13 @@ find_latest_state_filename(Config) -> Dir = Config#config.directory, case file:list_dir(Dir) of {ok, Filenames} -> - Timestamps = [list_to_integer(TS) || {"state", TS} <- + Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <- [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]], SortedTimestamps = lists:reverse(lists:sort(Timestamps)), case SortedTimestamps of [Latest | _] -> - {ok, Dir ++ "/state." ++ integer_to_list(Latest)}; + {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ + integer_to_list(Latest)}; _ -> throw({error, mem_state_file_not_found}) end; @@ -356,8 +366,10 @@ read_latest_state_file(undefined, Config) -> try {ok, File} = find_latest_state_filename(Config), case file:consult(File) of - {ok, #mem{}=State} -> State; - _Else -> throw({error, bad_mem_state_file}) + {ok, [#mem{}=State]} -> State; + Else -> + ?debugFmt("~nElse: ~p~n", [Else]), + throw({error, bad_mem_state_file}) end catch _:Error -> showroom_log:message(info, "membership: ~p", [Error]), @@ -367,6 +379,21 @@ read_latest_state_file(_, _) -> nil. +%% @doc save the state file to disk, with current timestamp. +%% thx to riak_ring_manager:do_write_ringfile/1 +save_state_file(State, Config) -> + Dir = Config#config.directory, + {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(), + TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", + [Year, Month, Day, Hour, Minute, Second]), + FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS, + ?debugFmt("~nFilename: ~s~n", [FN]), + ok = filelib:ensure_dir(FN), + {ok, File} = file:open(FN, [binary, write]), + io:format(File, "~w.~n", [State]), + file:close(File). + + %% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap} %% This is basically replaying all the mem events that have happened. create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) -> @@ -398,13 +425,22 @@ make_fullmap(PMap, Config) -> NodeParts. -%% cache table helper functions -update_cache(#mem{nodes=Nodes, args=Args}) -> +%% @doc tasks associated with a new state +new_state(#mem{nodes=Nodes, args=Args} = State) -> Config = get_config(Args), {Pmap, Fullmap} = create_maps(Config, first, Nodes, []), - update_cache(Pmap, Fullmap). + new_state(State, Pmap, Fullmap, Config). +%% @doc tasks associated with a new state +new_state(State, Pmap, Fullmap, Config) -> + update_cache(Pmap, Fullmap), + save_state_file(State, Config), + gossip(State), + State. + + +%% cache table helper function update_cache(Pmap, Fullmap) -> mochiglobal:put(pmap, Pmap), mochiglobal:put(fullmap, Fullmap). @@ -476,3 +512,24 @@ merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) -> ping_all_yall(Nodes) -> lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes). + + +get_remote_states(NodeList) -> + NodeList1 = lists:delete(node(), NodeList), + {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000), + {_Status, States2} = lists:unzip(States1), + {States2, BadNodes}. + + +%% @doc compare state with states based on vector clock +%% return match | {bad_state_match, Node, NodesThatDontMatch} +compare_state_with_rest(#mem{node=Node, clock=Clock} = _State, States) -> + Results = lists:map(fun(#mem{node=Node1, clock=Clock1}) -> + {vector_clock:equals(Clock, Clock1), Node1} + end, States), + BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn; + ({false, N}, AccIn) -> [N | AccIn] end, [], Results), + if + length(BadResults) == 0 -> match; + true -> {bad_state_match, Node, BadResults} + end. |