summaryrefslogtreecommitdiff
path: root/src/mem3_server.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-06-30 16:23:38 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-12 01:22:33 -0400
commit48c8fde34591f782be7af77575eaa02dab8659b3 (patch)
treebb54c0d61a660b9ba019f9b187e901ede7132bfa /src/mem3_server.erl
parent8a09581aa2252f53047fa0e9e95591eaae4556c9 (diff)
standardize mem3 naming. app is horribly broken for now
Diffstat (limited to 'src/mem3_server.erl')
-rw-r--r--src/mem3_server.erl568
1 files changed, 568 insertions, 0 deletions
diff --git a/src/mem3_server.erl b/src/mem3_server.erl
new file mode 100644
index 00000000..863e752f
--- /dev/null
+++ b/src/mem3_server.erl
@@ -0,0 +1,568 @@
+%%% membership module
+%%%
+%%% State of the gen_server is a #mem record
+%%%
+%%% Nodes and Gossip are the same thing, and are a list of three-tuples like:
+%%%
+%%% [ {Pos,NodeName,Options} | _ ]
+%%%
+%%% Position is 1-based incrementing in order of node joining
+%%%
+%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node
+%%% is responsible for the extra partitions too.
+%%%
+%%% TODO: dialyzer type specs
+%%%
+-module(mem3_server).
+-author('brad@cloudant.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
+-export([join/3, clock/0, state/0, states/0, nodes/0, fullnodes/0,
+ start_gossip/0]).
+
+%% for testing more than anything else
+-export([merge_nodes/2, next_up_node/1, next_up_node/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% includes
+-include("mem3.hrl").
+
+-define(SERVER, membership).
+-define(STATE_FILE_PREFIX, "membership").
+
+
+%%====================================================================
+%% API
+%%====================================================================
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ start_link([]).
+
+
+-spec start_link(args()) -> {ok, pid()}.
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
+
+
+-spec stop() -> ok.
+stop() ->
+ stop(?MODULE).
+
+
+-spec stop(atom()) -> ok.
+stop(Server) ->
+ gen_server:cast(Server, stop).
+
+
+-spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) ->
+ ok.
+join(JoinType, Payload, PingNode) ->
+ gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}).
+
+
+-spec clock() -> vector_clock().
+clock() ->
+ gen_server:call(?SERVER, clock).
+
+
+-spec state() -> mem_state().
+state() ->
+ gen_server:call(?SERVER, state).
+
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
+-spec states() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}].
+states() ->
+ {ok, Nodes} = mem3:nodes(),
+ AllNodes = [node()|erlang:nodes()],
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, ?SERVER, state),
+ Dict = lists:foldl(fun({Node, {ok,State}}, D) ->
+ orddict:append(State, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec start_gossip() -> ok.
+start_gossip() ->
+ gen_server:call(?SERVER, start_gossip).
+
+
+-spec reset() -> ok | not_reset.
+reset() ->
+ gen_server:call(?SERVER, reset).
+
+
+%% @doc get the list of cluster nodes (according to membership module)
+%% This may differ from erlang:nodes()
+%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
+-spec nodes() -> {ok, [node()]}.
+nodes() ->
+ gen_server:call(?SERVER, nodes).
+
+
+%% @doc get the list of cluster nodes (according to membership module)
+%% This may differ from erlang:nodes()
+%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
+-spec fullnodes() -> {ok, [mem_node()]}.
+fullnodes() ->
+ gen_server:call(?SERVER, fullnodes).
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%% start up membership server
+-spec init(args()) -> {ok, mem_state()}.
+init(Args) ->
+ process_flag(trap_exit,true),
+ Test = get_test(Args),
+ OldState = read_latest_state_file(Test),
+ showroom_log:message(info, "membership: membership server starting...", []),
+ net_kernel:monitor_nodes(true),
+ State = handle_init(Test, OldState),
+ {ok, State#mem{args=Args}}.
+
+
+%% new node(s) joining to this node
+handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
+ try
+ case handle_join(JoinType, ExtNodes, PingNode, State) of
+ {ok, NewState} -> {reply, ok, NewState};
+ Other -> {reply, Other, State}
+ end
+ catch _:Error ->
+ showroom_log:message(error, "~p", [Error]),
+ {reply, Error, State}
+ end;
+
+%% clock
+handle_call(clock, _From, #mem{clock=Clock} = State) ->
+ {reply, {ok, Clock}, State};
+
+%% state
+handle_call(state, _From, State) ->
+ {reply, {ok, State}, State};
+
+%% reset - but only if we're in test mode
+handle_call(reset, _From, #mem{args=Args} = State) ->
+ Test = get_test(Args),
+ case Test of
+ undefined -> {reply, not_reset, State};
+ _ -> {reply, ok, int_reset(Test, State)}
+ end;
+
+%% nodes
+handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
+ {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)),
+ {reply, {ok, NodeList}, State};
+
+%% fullnodes
+handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) ->
+ {reply, {ok, Nodes}, State};
+
+%% gossip
+handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
+ showroom_log:message(info, "membership: received gossip from ~p",
+ [erlang:node(Pid)]),
+ handle_gossip(From, RemoteState, LocalState);
+
+% start_gossip
+handle_call(start_gossip, _From, State) ->
+ NewState = gossip(State),
+ {reply, ok, NewState};
+
+%% ignored call
+handle_call(Msg, _From, State) ->
+ showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
+ {reply, ignored, State}.
+
+
+%% gossip
+handle_cast({gossip, RemoteState}, LocalState) ->
+ State = case handle_gossip(none, RemoteState, LocalState) of
+ {reply, ok, NewState} -> NewState;
+ {reply, {new_state, NewState}, _} -> NewState;
+ {noreply, NewState} -> NewState
+ end,
+ {noreply, State};
+
+%% stop
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+%% ignored cast
+handle_cast(Msg, State) ->
+ showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
+ {noreply, State}.
+
+
+%% @doc handle nodedown messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodedown, Node}, State) ->
+ showroom_log:message(alert, "membership: nodedown ~p", [Node]),
+ notify(nodedown, [Node], State),
+ {noreply, State};
+
+%% @doc handle nodeup messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodeup, Node}, State) ->
+ showroom_log:message(alert, "membership: nodeup ~p", [Node]),
+ notify(nodeup, [Node], State),
+ gossip_cast(State),
+ {noreply, State};
+
+%% ignored info
+handle_info(Info, State) ->
+ showroom_log:message(info, "membership: ignored info: ~p", [Info]),
+ {noreply, State}.
+
+
+% terminate
+terminate(_Reason, _State) ->
+ ok.
+
+
+% ignored code change
+code_change(OldVsn, State, _Extra) ->
+ io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
+ {ok, State}.
+
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+%% @doc if Args has config use it, otherwise call configuration module
+%% most times Args will have config during testing runs
+%get_config(Args) ->
+% case proplists:get_value(config, Args) of
+% undefined -> configuration:get_config();
+% Any -> Any
+% end.
+
+
+get_test(Args) ->
+ proplists:get_value(test, Args).
+
+
+%% @doc handle_init starts a node
+%% Most of the time, this puts the node in a single-node cluster setup,
+%% But, we could be automatically rejoining a cluster after some downtime.
+%% See handle_join for initing, joining, leaving a cluster, or replacing a
+%% node.
+%% @end
+handle_init(Test, nil) ->
+ int_reset(Test);
+
+handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
+ % there's an old state, let's try to rejoin automatically
+ % but only if we can compare our old state to other available
+ % nodes and get a match... otherwise get a human involved
+ {_, NodeList, _} = lists:unzip3(Nodes),
+ ping_all_yall(NodeList),
+ {RemoteStates, _BadNodes} = get_remote_states(NodeList),
+ Test = get_test(Args),
+ case compare_state_with_rest(OldState, RemoteStates) of
+ match ->
+ showroom_log:message(info, "membership: rejoined successfully", []),
+ OldState;
+ Other ->
+ showroom_log:message(error, "membership: rejoin failed: ~p", [Other]),
+ int_reset(Test)
+ end.
+
+
+%% @doc handle join activities, return {ok,NewState}
+-spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) ->
+ {ok, mem_state()}.
+% init
+handle_join(init, ExtNodes, nil, State) ->
+ {_,Nodes,_} = lists:unzip3(ExtNodes),
+ ping_all_yall(Nodes),
+ int_join(ExtNodes, State);
+% join
+handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) ->
+ NewState = case get_test(Args) of
+ undefined -> get_pingnode_state(PingNode);
+ _ -> State % testing, so meh
+ end,
+ % now use this info to join the ring
+ int_join(ExtNodes, NewState);
+% replace
+handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) ->
+ handle_join(replace, {OldNode, []}, PingNode, State);
+handle_join(replace, [OldNode | _], PingNode, State) ->
+ handle_join(replace, {OldNode, []}, PingNode, State);
+handle_join(replace, {OldNode, NewOpts}, PingNode, State) ->
+ OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode),
+ {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes),
+ NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}),
+ notify(node_leave, [OldNode], State),
+ int_join([], OldState#mem{nodes=NewNodes});
+% leave
+handle_join(leave, [OldNode | _], _PingNode, State) ->
+ % TODO implement me
+ notify(node_leave, [OldNode], State),
+ ok;
+
+handle_join(JoinType, _, PingNode, _) ->
+ showroom_log:message(info, "membership: unknown join type: ~p "
+ "for ping node: ~p", [JoinType, PingNode]),
+ {error, unknown_join_type}.
+
+%% @doc common operations for all join types
+int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
+ NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
+ check_pos(Pos, N, Nodes),
+ notify(node_join, [N], State),
+ [New|AccIn]
+ end, Nodes, ExtNodes),
+ NewNodes1 = lists:sort(NewNodes),
+ NewClock = vector_clock:increment(node(), Clock),
+ NewState = State#mem{nodes=NewNodes1, clock=NewClock},
+ install_new_state(NewState),
+ {ok, NewState}.
+
+
+install_new_state(#mem{args=Args} = State) ->
+ Test = get_test(Args),
+ save_state_file(Test, State),
+ gossip(call, Test, State).
+
+
+get_pingnode_state(PingNode) ->
+ {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state),
+ RemoteState.
+
+
+%% @doc handle the gossip messages
+%% We're not using vector_clock:resolve b/c we need custom merge strategy
+handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
+ LocalState=#mem{clock=LocalClock}) ->
+ case vector_clock:compare(RemoteClock, LocalClock) of
+ equal ->
+ {reply, ok, LocalState};
+ less ->
+ % remote node needs updating
+ {reply, {new_state, LocalState}, LocalState};
+ greater when From == none->
+ {noreply, install_new_state(RemoteState)};
+ greater ->
+ % local node needs updating
+ gen_server:reply(From, ok), % reply to sender first
+ {noreply, install_new_state(RemoteState)};
+ concurrent ->
+ % ick, so let's resolve and merge states
+ showroom_log:message(info,
+ "membership: Concurrent Clocks~n"
+ "RemoteState : ~p~nLocalState : ~p~n"
+ , [RemoteState, LocalState]),
+ MergedState = merge_states(RemoteState, LocalState),
+ if From =/= none ->
+ % reply to sender
+ gen_server:reply(From, {new_state, MergedState})
+ end,
+ {noreply, install_new_state(MergedState)}
+ end.
+
+
+merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState,
+ #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) ->
+ MergedClock = vector_clock:merge(RemoteClock, LocalClock),
+ MergedNodes = merge_nodes(RemoteNodes, LocalNodes),
+ LocalState#mem{clock=MergedClock, nodes=MergedNodes}.
+
+
+%% this will give one of the lists back, deterministically
+merge_nodes(Remote, Local) ->
+ % get rid of the initial 0 node if it's still there, and sort
+ Remote1 = lists:usort(lists:keydelete(0,1,Remote)),
+ Local1 = lists:usort(lists:keydelete(0,1,Local)),
+ % handle empty lists as well as other cases
+ case {Remote1, Local1} of
+ {[], L} -> L;
+ {R, []} -> R;
+ _ -> erlang:min(Remote1, Local1)
+ end.
+
+
+gossip(#mem{args=Args} = NewState) ->
+ Test = get_test(Args),
+ gossip(call, Test, NewState).
+
+
+gossip_cast(#mem{nodes=[]}) -> ok;
+gossip_cast(#mem{args=Args} = NewState) ->
+ Test = get_test(Args),
+ gossip(cast, Test, NewState).
+
+
+-spec gossip(gossip_fun(), test(), mem_state()) -> mem_state().
+gossip(_, _, #mem{nodes=[]}) -> ok;
+gossip(Fun, undefined, #mem{nodes=StateNodes} = State) ->
+ {_, Nodes, _} = lists:unzip3(StateNodes),
+ case next_up_node(Nodes) of
+ no_gossip_targets_available ->
+ State; % skip gossip, I'm the only node
+ TargetNode ->
+ showroom_log:message(info, "membership: firing gossip from ~p to ~p",
+ [node(), TargetNode]),
+ case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of
+ ok -> State;
+ {new_state, NewState} -> NewState;
+ Error -> throw({unknown_gossip_response, Error})
+ end
+ end;
+
+gossip(_,_,_) ->
+ % testing, so don't gossip
+ ok.
+
+
+next_up_node(Nodes) ->
+ next_up_node(node(), Nodes, up_nodes()).
+
+
+next_up_node(Node, Nodes, UpNodes) ->
+ {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
+ List = lists:append(B, A), % be sure to eliminate Node
+ DownNodes = Nodes -- UpNodes,
+ case List -- DownNodes of
+ [Target|_] -> Target;
+ [] -> no_gossip_targets_available
+ end.
+
+
+up_nodes() ->
+ % TODO: implement cache (fb 9704 & 9449)
+ erlang:nodes().
+
+
+%% @doc find the latest state file on disk
+find_latest_state_filename() ->
+ Dir = couch_config:get("couchdb", "database_dir"),
+ case file:list_dir(Dir) of
+ {ok, Filenames} ->
+ Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-
+ [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
+ SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
+ case SortedTimestamps of
+ [Latest | _] ->
+ {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++
+ integer_to_list(Latest)};
+ _ ->
+ throw({error, mem_state_file_not_found})
+ end;
+ {error, Reason} ->
+ throw({error, Reason})
+ end.
+
+
+%% (Test, Config)
+read_latest_state_file(undefined) ->
+ try
+ {ok, File} = find_latest_state_filename(),
+ case file:consult(File) of
+ {ok, [#mem{}=State]} -> State;
+ _Else ->
+ throw({error, bad_mem_state_file})
+ end
+ catch _:Error ->
+ showroom_log:message(info, "membership: ~p", [Error]),
+ nil
+ end;
+read_latest_state_file(_) ->
+ nil.
+
+
+%% @doc save the state file to disk, with current timestamp.
+%% thx to riak_ring_manager:do_write_ringfile/1
+-spec save_state_file(test(), mem_state()) -> ok.
+save_state_file(undefined, State) ->
+ Dir = couch_config:get("couchdb", "database_dir"),
+ {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
+ TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
+ [Year, Month, Day, Hour, Minute, Second]),
+ FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
+ ok = filelib:ensure_dir(FN),
+ {ok, File} = file:open(FN, [binary, write]),
+ io:format(File, "~w.~n", [State]),
+ file:close(File);
+
+save_state_file(_,_) -> ok. % don't save if testing
+
+
+check_pos(Pos, Node, Nodes) ->
+ Found = lists:keyfind(Pos, 1, Nodes),
+ case Found of
+ false -> ok;
+ _ ->
+ {_,OldNode,_} = Found,
+ if
+ OldNode =:= Node ->
+ Msg = "node_exists_at_position_" ++ integer_to_list(Pos),
+ throw({error, list_to_binary(Msg)});
+ true ->
+ Msg = "position_exists_" ++ integer_to_list(Pos),
+ throw({error, list_to_binary(Msg)})
+ end
+ end.
+
+
+int_reset(Test) ->
+ int_reset(Test, #mem{}).
+
+
+int_reset(_Test, State) ->
+ State#mem{nodes=[], clock=[]}.
+
+
+ping_all_yall(Nodes) ->
+ lists:foreach(fun(Node) ->
+ net_adm:ping(Node)
+ end, Nodes),
+ timer:sleep(500). % sigh.
+
+
+get_remote_states(NodeList) ->
+ NodeList1 = lists:delete(node(), NodeList),
+ {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
+ {_Status, States2} = lists:unzip(States1),
+ NodeList2 = NodeList1 -- BadNodes,
+ {lists:zip(NodeList2,States2), BadNodes}.
+
+
+%% @doc compare state with states based on vector clock
+%% return match | {bad_state_match, Node, NodesThatDontMatch}
+compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
+ Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
+ {vector_clock:equals(Clock, Clock1), Node}
+ end, States),
+ BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
+ ({false, N}, AccIn) -> [N | AccIn]
+ end, [], Results),
+ if
+ length(BadResults) == 0 -> match;
+ true -> {bad_state_match, node(), BadResults}
+ end.
+
+notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) ->
+ {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)),
+ lists:foreach(fun(Node) ->
+ case lists:member(Node, MemNodes) orelse Type == nodedown of
+ true ->
+ gen_event:notify(membership_events, {Type, Node});
+ _ -> ok % node not in cluster
+ end
+ end, Nodes).