diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/mem3.erl | 109 | 
1 files changed, 83 insertions, 26 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index bc2b1d7e..cb69d27d 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -35,6 +35,7 @@  -include("../include/common.hrl").  -define(SERVER, membership). +-define(STATE_FILE_PREFIX, "membership").  %% types - stick somewhere in includes?  -type join_type() :: first | new | replace. @@ -154,6 +155,8 @@ init(Args) ->      Config = get_config(Args),      Test = proplists:get_value(test, Args),      OldState = read_latest_state_file(Test, Config), +    showroom_log:message(info, "membership: membership server starting...", []), +    net_kernel:monitor_nodes(true),      State = handle_init(Test, OldState),      {ok, State#mem{args=Args}}. @@ -164,7 +167,6 @@ handle_call({join, JoinType, ExtNodes}, _From,      Config = get_config(Args),      try          NewState = handle_join(JoinType, ExtNodes, State, Config), -        gossip(NewState),          {reply, ok, NewState}      catch _:Error ->          showroom_log:message(error, "~p", [Error]), @@ -191,9 +193,9 @@ handle_call(reset, _From, #mem{args=Args} = State) ->      end;  %% nodes -handle_call(nodes, _From, #mem{nodes=NodeList} = State) -> -    {_,Nodes,_} = lists:unzip3(NodeList), -    {reply, {ok, Nodes}, State}; +handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> +    {_,NodeList,_} = lists:unzip3(Nodes), +    {reply, {ok, NodeList}, State};  %% ignored call  handle_call(Msg, _From, State) -> @@ -214,7 +216,7 @@ handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->      merged ->          showroom_log:message(info, "membership: merged new gossip: ~p",                               [MergedState]), -        update_cache(MergedState), +        new_state(MergedState),          gossip(MergedState),          {noreply, MergedState}      end; @@ -276,27 +278,34 @@ get_config(Args) ->  %  4. replacing a node in an existing cluster  handle_init(Test, nil) -> -    showroom_log:message(info, "membership: membership server starting...", []), -    net_kernel:monitor_nodes(true),      int_reset(Test); -handle_init(_Test, _OldState) -> -    ?debugHere, +handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->      % there's an old state, let's try to rejoin automatically -    %  but only if we can compare our old state to all other -    %  available nodes and get a match... otherwise get a human involved -    % TODO implement me -    #mem{}. +    %  but only if we can compare our old state to other available +    %  nodes and get a match... otherwise get a human involved +    {_, NodeList, _} = lists:unzip3(Nodes), +    ping_all_yall(NodeList), +    {RemoteStates, _BadNodes} = get_remote_states(NodeList), +    Test = proplists:get_value(test, Args), +    case compare_state_with_rest(OldState, RemoteStates) of +    match -> +        showroom_log:message(info, "membership: rejoined successfully", []), +        OldState; +    Other -> +        showroom_log:message(error, "membership: rejoin failed: ~p", [Other]), +        int_reset(Test) +    end.  %% handle join activities, return NewState  handle_join(first, ExtNodes, State, Config) ->      {_,Nodes,_} = lists:unzip3(ExtNodes),      ping_all_yall(Nodes), -    join(first, ExtNodes, State, Config); +    int_join(first, ExtNodes, State, Config);  handle_join(new, ExtNodes, State, Config) -> -    join(new, ExtNodes, State, Config); +    int_join(new, ExtNodes, State, Config);  handle_join(replace, [_OldNode | _], _State, _Config) ->      % TODO implement me @@ -307,12 +316,12 @@ handle_join(JoinType, _, _, _) ->      {error, {unknown_join_type, JoinType}}. -join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, +int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,       Config) -> -    {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), -    update_cache(Pmap, Fullmap),      NewClock = vector_clock:increment(Node, Clock), -    State#mem{nodes=ExtNodes, clock=NewClock}. +    NewState = State#mem{nodes=ExtNodes, clock=NewClock}, +    {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), +    new_state(NewState, Pmap, Fullmap, Config).  gossip(#mem{args=Args} = NewState) -> @@ -337,12 +346,13 @@ find_latest_state_filename(Config) ->      Dir = Config#config.directory,      case file:list_dir(Dir) of      {ok, Filenames} -> -        Timestamps = [list_to_integer(TS) || {"state", TS} <- +        Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-             [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],          SortedTimestamps = lists:reverse(lists:sort(Timestamps)),          case SortedTimestamps of          [Latest | _] -> -            {ok, Dir ++ "/state." ++ integer_to_list(Latest)}; +            {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ +             integer_to_list(Latest)};          _ ->              throw({error, mem_state_file_not_found})          end; @@ -356,8 +366,10 @@ read_latest_state_file(undefined, Config) ->      try          {ok, File} = find_latest_state_filename(Config),          case file:consult(File) of -        {ok, #mem{}=State} -> State; -        _Else -> throw({error, bad_mem_state_file}) +        {ok, [#mem{}=State]} -> State; +        Else -> +                ?debugFmt("~nElse: ~p~n", [Else]), +                throw({error, bad_mem_state_file})          end      catch _:Error ->          showroom_log:message(info, "membership: ~p", [Error]), @@ -367,6 +379,21 @@ read_latest_state_file(_, _) ->      nil. +%% @doc save the state file to disk, with current timestamp. +%%      thx to riak_ring_manager:do_write_ringfile/1 +save_state_file(State, Config) -> +    Dir = Config#config.directory, +    {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(), +    TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", +                       [Year, Month, Day, Hour, Minute, Second]), +    FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS, +    ?debugFmt("~nFilename: ~s~n", [FN]), +    ok = filelib:ensure_dir(FN), +    {ok, File} = file:open(FN, [binary, write]), +    io:format(File, "~w.~n", [State]), +    file:close(File). + +  %% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}  %%      This is basically replaying all the mem events that have happened.  create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) -> @@ -398,13 +425,22 @@ make_fullmap(PMap, Config) ->      NodeParts. -%% cache table helper functions -update_cache(#mem{nodes=Nodes, args=Args}) -> +%% @doc tasks associated with a new state +new_state(#mem{nodes=Nodes, args=Args} = State) ->      Config = get_config(Args),      {Pmap, Fullmap} = create_maps(Config, first, Nodes, []), -    update_cache(Pmap, Fullmap). +    new_state(State, Pmap, Fullmap, Config). +%% @doc tasks associated with a new state +new_state(State, Pmap, Fullmap, Config) -> +    update_cache(Pmap, Fullmap), +    save_state_file(State, Config), +    gossip(State), +    State. + + +%% cache table helper function  update_cache(Pmap, Fullmap) ->      mochiglobal:put(pmap, Pmap),      mochiglobal:put(fullmap, Fullmap). @@ -476,3 +512,24 @@ merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) ->  ping_all_yall(Nodes) ->      lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes). + + +get_remote_states(NodeList) -> +    NodeList1 = lists:delete(node(), NodeList), +    {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000), +    {_Status, States2} = lists:unzip(States1), +    {States2, BadNodes}. + + +%% @doc compare state with states based on vector clock +%%      return match | {bad_state_match, Node, NodesThatDontMatch} +compare_state_with_rest(#mem{node=Node, clock=Clock} = _State, States) -> +    Results = lists:map(fun(#mem{node=Node1, clock=Clock1}) -> +        {vector_clock:equals(Clock, Clock1), Node1} +    end, States), +    BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn; +        ({false, N}, AccIn) -> [N | AccIn] end, [], Results), +    if +    length(BadResults) == 0 -> match; +    true -> {bad_state_match, Node, BadResults} +    end.  | 
