summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-03-09 11:54:53 -0800
committerBrad Anderson <brad@cloudant.com>2010-05-09 22:56:22 -0400
commite522c645e3d34403ae64193db3422ad8c574d256 (patch)
tree73d6a15c2ad38ea6db427a5c6f009ff68fd9309c
parentcb079ba3ee4496b97e73e9fa08525878b88de413 (diff)
moving mem3 into rebar position
-rw-r--r--src/mem3.erl204
1 files changed, 204 insertions, 0 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
new file mode 100644
index 00000000..6f53ed23
--- /dev/null
+++ b/src/mem3.erl
@@ -0,0 +1,204 @@
+
+-module(mem3).
+-author('brad@cloudant.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/2, start_link/3, stop/0, stop/1]).
+-export([clock/0, state/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% includes
+-include("../include/config.hrl").
+-include("../include/common.hrl").
+
+
+%%====================================================================
+%% API
+%%====================================================================
+
+start_link(Node, ErlNodes) ->
+ start_link(Node, ErlNodes, []).
+
+
+start_link(Node, ErlNodes, Args) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [Node, ErlNodes, Args], []).
+
+
+stop() ->
+ stop(?MODULE).
+
+
+stop(Server) ->
+ gen_server:cast(Server, stop).
+
+
+clock() ->
+ gen_server:call(?MODULE, clock).
+
+
+state() ->
+ gen_server:call(?MODULE, state).
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%% start up membership server
+init([Node, Nodes, Args]) ->
+ process_flag(trap_exit,true),
+ showroom_log:message(info, "membership: membership server starting...", []),
+ net_kernel:monitor_nodes(true),
+ Options = lists:flatten(Args),
+ Config = configuration:get_config(),
+ OldState = read_latest_state_file(Config),
+ State = handle_init(Node, Nodes, Options, OldState, Config),
+ {ok, State}.
+
+
+%% new node joining to this node
+handle_call({join, _JoiningNode, _Options}, _From, State) ->
+ {reply, ok, State};
+
+%% clock
+handle_call(clock, _From, State = #mem{clock=Clock}) ->
+ {reply, Clock, State};
+
+%% state
+handle_call(state, _From, State) ->
+ {reply, State, State};
+
+%% ignored call
+handle_call(Msg, _From, State) ->
+ showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
+ {reply, ignored, State}.
+
+
+%% stop
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+%% ignored cast
+handle_cast(Msg, State) ->
+ showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
+ {noreply, State}.
+
+
+%% @doc handle nodedown messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodedown, Node}, State) ->
+ showroom_log:message(alert, "membership: nodedown from ~p", [Node]),
+ {noreply, State};
+
+%% @doc handle nodeup messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodeup, Node}, State) ->
+ showroom_log:message(alert, "membership: nodeup Node: ~p", [Node]),
+ {noreply, State};
+
+%% ignored info
+handle_info(Info, State) ->
+ showroom_log:message(info, "membership: ignored info: ~p", [Info]),
+ {noreply, State}.
+
+
+% terminate
+terminate(_Reason, _State) ->
+ ok.
+
+
+% ignored code change
+code_change(OldVsn, State, _Extra) ->
+ io:format("Unknown Old Version!~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
+ {ok, State}.
+
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+% we could be:
+% 1. starting fresh node into a fresh cluster (we're one of first nodes)
+% 2. starting fresh node into an existing cluster (need to join)
+% 3. rejoining a cluster after some downtime
+% 4. replacing a node in an existing cluster
+
+handle_init(Node, [], nil, Options, Config) ->
+ % no other erlang nodes, no old state
+ Hints = proplists:get_value(hints, Options),
+ Map = create_map(Config, [{Node, Hints}]),
+ ?debugFmt("~nmap: ~p~n", [Map]);
+
+handle_init(_Node, [], _OldState, _Options, _Config) ->
+ % no other erlang nodes, old state
+ % network partition?
+ ok;
+
+handle_init(_Node, _ErlNodes, nil, _Options, _Config) ->
+ % other erlang nodes, no old state
+ ok;
+
+handle_init(_Node, _ErlNodes, _OldState, _Options, _Config) ->
+ % other erlang nodes, old state
+ % network partition?
+ ok.
+
+
+find_latest_state_filename(Config) ->
+ ?debugFmt("~nConfig: ~p~n", [Config]),
+ Dir = Config#config.directory,
+ case file:list_dir(Dir) of
+ {ok, Filenames} ->
+ Timestamps = [list_to_integer(TS) || {"state", TS} <-
+ [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
+ SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
+ case SortedTimestamps of
+ [Latest | _] ->
+ {ok, Dir ++ "/state." ++ integer_to_list(Latest)};
+ _ ->
+ throw({error, not_found})
+ end;
+ {error, Reason} ->
+ throw({error, Reason})
+ end.
+
+
+read_latest_state_file(Config) ->
+ try
+ {ok, File} = find_latest_state_filename(Config),
+ case file:consult(File) of
+ {ok, #mem{}=State} -> State;
+ _Else -> throw({error, bad_mem_state_file})
+ end
+ catch
+ _:Error ->
+ showroom_log:message(info, "membership: ~p", [Error]),
+ nil
+ end.
+
+
+%% @doc given Config and a list of Nodes, construct a Fullmap
+create_map(#config{q=Q}, Nodes) ->
+ [{FirstNode,_}|_] = Nodes,
+ Pmap = lists:foldl(fun({Node, Hints}, Map) ->
+ partitions:join(Node, Map, Hints)
+ end, partitions:create(Q, FirstNode), Nodes),
+ make_fullmap(Pmap).
+
+
+%% @doc construct a table with all partitions, with the primary node and all
+%% replication partner nodes as well.
+make_fullmap(PMap) ->
+ {Nodes, _Parts} = lists:unzip(PMap),
+ NodeParts = lists:flatmap(
+ fun({Node,Part}) ->
+ Partners = replication:partners(Node, lists:usort(Nodes)),
+ PartnerList = [{Partner, Part, partner} || Partner <- Partners],
+ [{Node, Part, primary} | PartnerList]
+ end, PMap),
+ NodeParts.