summaryrefslogtreecommitdiff
path: root/src/mem3.erl
blob: 6f53ed231422adefd15d8b7216c87480e6d2a675 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204

-module(mem3).
-author('brad@cloudant.com').

-behaviour(gen_server).

%% API
-export([start_link/2, start_link/3, stop/0, stop/1]).
-export([clock/0, state/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% includes
-include("../include/config.hrl").
-include("../include/common.hrl").


%%====================================================================
%% API
%%====================================================================

start_link(Node, ErlNodes) ->
    start_link(Node, ErlNodes, []).


start_link(Node, ErlNodes, Args) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Node, ErlNodes, Args], []).


stop() ->
    stop(?MODULE).


stop(Server) ->
    gen_server:cast(Server, stop).


clock() ->
    gen_server:call(?MODULE, clock).


state() ->
    gen_server:call(?MODULE, state).


%%====================================================================
%% gen_server callbacks
%%====================================================================

%% start up membership server
init([Node, Nodes, Args]) ->
    process_flag(trap_exit,true),
    showroom_log:message(info, "membership: membership server starting...", []),
    net_kernel:monitor_nodes(true),
    Options = lists:flatten(Args),
    Config = configuration:get_config(),
    OldState = read_latest_state_file(Config),
    State = handle_init(Node, Nodes, Options, OldState, Config),
    {ok, State}.


%% new node joining to this node
handle_call({join, _JoiningNode, _Options}, _From, State) ->
    {reply, ok, State};

%% clock
handle_call(clock, _From, State = #mem{clock=Clock}) ->
    {reply, Clock, State};

%% state
handle_call(state, _From, State) ->
    {reply, State, State};

%% ignored call
handle_call(Msg, _From, State) ->
    showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
    {reply, ignored, State}.


%% stop
handle_cast(stop, State) ->
    {stop, normal, State};

%% ignored cast
handle_cast(Msg, State) ->
    showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
    {noreply, State}.


%% @doc handle nodedown messages because we have
%%      net_kernel:monitor_nodes(true)
handle_info({nodedown, Node}, State) ->
    showroom_log:message(alert, "membership: nodedown from ~p", [Node]),
    {noreply, State};

%% @doc handle nodeup messages because we have
%%      net_kernel:monitor_nodes(true)
handle_info({nodeup, Node}, State) ->
    showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]),
    {noreply, State};

%% ignored info
handle_info(Info, State) ->
    showroom_log:message(info, "membership: ignored info: ~p", [Info]),
    {noreply, State}.


% terminate
terminate(_Reason, _State) ->
    ok.


% ignored code change
code_change(OldVsn, State, _Extra) ->
    io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
    {ok, State}.


%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

% we could be:
%  1. starting fresh node into a fresh cluster (we're one of first nodes)
%  2. starting fresh node into an existing cluster (need to join)
%  3. rejoining a cluster after some downtime
%  4. replacing a node in an existing cluster

handle_init(Node, [], nil, Options, Config) ->
    % no other erlang nodes, no old state
    Hints = proplists:get_value(hints, Options),
    Map = create_map(Config, [{Node, Hints}]),
    ?debugFmt("~nmap: ~p~n", [Map]);

handle_init(_Node, [], _OldState, _Options, _Config) ->
    % no other erlang nodes, old state
    % network partition?
    ok;

handle_init(_Node, _ErlNodes, nil, _Options, _Config) ->
    % other erlang nodes, no old state
    ok;

handle_init(_Node, _ErlNodes, _OldState, _Options, _Config) ->
    % other erlang nodes, old state
    % network partition?
    ok.


find_latest_state_filename(Config) ->
    ?debugFmt("~nConfig: ~p~n", [Config]),
    Dir = Config#config.directory,
    case file:list_dir(Dir) of
    {ok, Filenames} ->
        Timestamps = [list_to_integer(TS) || {"state", TS} <-
           [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
        SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
        case SortedTimestamps of
        [Latest | _] ->
            {ok, Dir ++ "/state." ++ integer_to_list(Latest)};
        _ ->
            throw({error, not_found})
        end;
    {error, Reason} ->
        throw({error, Reason})
    end.


read_latest_state_file(Config) ->
    try
        {ok, File} = find_latest_state_filename(Config),
        case file:consult(File) of
        {ok, #mem{}=State} -> State;
        _Else -> throw({error, bad_mem_state_file})
        end
    catch
        _:Error ->
            showroom_log:message(info, "membership: ~p", [Error]),
            nil
    end.


%% @doc given Config and a list of Nodes, construct a Fullmap
create_map(#config{q=Q}, Nodes) ->
    [{FirstNode,_}|_] = Nodes,
    Pmap = lists:foldl(fun({Node, Hints}, Map) ->
        partitions:join(Node, Map, Hints)
    end, partitions:create(Q, FirstNode), Nodes),
    make_fullmap(Pmap).


%% @doc construct a table with all partitions, with the primary node and all
%%      replication partner nodes as well.
make_fullmap(PMap) ->
  {Nodes, _Parts} = lists:unzip(PMap),
  NodeParts = lists:flatmap(
    fun({Node,Part}) ->
        Partners = replication:partners(Node, lists:usort(Nodes)),
        PartnerList = [{Partner, Part, partner} || Partner <- Partners],
        [{Node, Part, primary} | PartnerList]
    end, PMap),
  NodeParts.