diff options
author | Adam Kocoloski <adam@cloudant.com> | 2010-06-30 16:23:38 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-12 01:22:33 -0400 |
commit | 48c8fde34591f782be7af77575eaa02dab8659b3 (patch) | |
tree | bb54c0d61a660b9ba019f9b187e901ede7132bfa /src | |
parent | 8a09581aa2252f53047fa0e9e95591eaae4556c9 (diff) |
standardize mem3 naming. app is horribly broken for now
Diffstat (limited to 'src')
-rw-r--r-- | src/mem3.erl | 569 | ||||
-rw-r--r-- | src/mem3_app.erl (renamed from src/membership_app.erl) | 6 | ||||
-rw-r--r-- | src/mem3_cache.erl (renamed from src/dbs_cache.erl) | 4 | ||||
-rw-r--r-- | src/mem3_event.erl | 74 | ||||
-rw-r--r-- | src/mem3_httpd.erl | 77 | ||||
-rw-r--r-- | src/mem3_server.erl | 568 | ||||
-rw-r--r-- | src/mem3_sup.erl | 22 | ||||
-rw-r--r-- | src/mem3_sync.erl (renamed from src/dbs.erl) | 4 | ||||
-rw-r--r-- | src/mem3_util.erl (renamed from src/partitions.erl) | 6 | ||||
-rw-r--r-- | src/mem3_vclock.erl (renamed from src/vector_clock.erl) | 2 | ||||
-rw-r--r-- | src/membership.erl | 15 | ||||
-rw-r--r-- | src/membership_sup.erl | 44 |
12 files changed, 759 insertions, 632 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index 7ae7627c..2b8f0188 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -1,568 +1,15 @@ -%%% membership module -%%% -%%% State of the gen_server is a #mem record -%%% -%%% Nodes and Gossip are the same thing, and are a list of three-tuples like: -%%% -%%% [ {Pos,NodeName,Options} | _ ] -%%% -%%% Position is 1-based incrementing in order of node joining -%%% -%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node -%%% is responsible for the extra partitions too. -%%% -%%% TODO: dialyzer type specs -%%% -module(mem3). --author('brad@cloudant.com'). +-author('Brad Anderson <brad@cloudant.com>'). --behaviour(gen_server). +-export([start/0, stop/0, restart/0]). -%% API --export([start_link/0, start_link/1, stop/0, stop/1, reset/0]). --export([join/3, clock/0, state/0, states/0, nodes/0, fullnodes/0, - start_gossip/0]). -%% for testing more than anything else --export([merge_nodes/2, next_up_node/1, next_up_node/3]). +start() -> + application:start(mem3). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% includes --include("membership.hrl"). - --define(SERVER, membership). --define(STATE_FILE_PREFIX, "membership"). - - -%%==================================================================== -%% API -%%==================================================================== - --spec start_link() -> {ok, pid()}. -start_link() -> - start_link([]). - - --spec start_link(args()) -> {ok, pid()}. -start_link(Args) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []). - - --spec stop() -> ok. stop() -> - stop(?MODULE). - - --spec stop(atom()) -> ok. -stop(Server) -> - gen_server:cast(Server, stop). - - --spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) -> - ok. -join(JoinType, Payload, PingNode) -> - gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}). - - --spec clock() -> vector_clock(). -clock() -> - gen_server:call(?SERVER, clock). - - --spec state() -> mem_state(). -state() -> - gen_server:call(?SERVER, state). - - -%% @doc Detailed report of cluster-wide membership state. Queries the state -%% on all member nodes and builds a dictionary with unique states as the -%% key and the nodes holding that state as the value. Also reports member -%% nodes which fail to respond and nodes which are connected but are not -%% cluster members. Useful for debugging. --spec states() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}]. -states() -> - {ok, Nodes} = mem3:nodes(), - AllNodes = [node()|erlang:nodes()], - {Replies, BadNodes} = gen_server:multi_call(Nodes, ?SERVER, state), - Dict = lists:foldl(fun({Node, {ok,State}}, D) -> - orddict:append(State, Node, D) - end, orddict:new(), Replies), - [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict]. - --spec start_gossip() -> ok. -start_gossip() -> - gen_server:call(?SERVER, start_gossip). - - --spec reset() -> ok | not_reset. -reset() -> - gen_server:call(?SERVER, reset). - - -%% @doc get the list of cluster nodes (according to membership module) -%% This may differ from erlang:nodes() -%% Guaranteed to be in order of State's node list (1st elem in 3-tuple) --spec nodes() -> {ok, [node()]}. -nodes() -> - gen_server:call(?SERVER, nodes). - - -%% @doc get the list of cluster nodes (according to membership module) -%% This may differ from erlang:nodes() -%% Guaranteed to be in order of State's node list (1st elem in 3-tuple) --spec fullnodes() -> {ok, [mem_node()]}. -fullnodes() -> - gen_server:call(?SERVER, fullnodes). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%% start up membership server --spec init(args()) -> {ok, mem_state()}. -init(Args) -> - process_flag(trap_exit,true), - Test = get_test(Args), - OldState = read_latest_state_file(Test), - showroom_log:message(info, "membership: membership server starting...", []), - net_kernel:monitor_nodes(true), - State = handle_init(Test, OldState), - {ok, State#mem{args=Args}}. - - -%% new node(s) joining to this node -handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) -> - try - case handle_join(JoinType, ExtNodes, PingNode, State) of - {ok, NewState} -> {reply, ok, NewState}; - Other -> {reply, Other, State} - end - catch _:Error -> - showroom_log:message(error, "~p", [Error]), - {reply, Error, State} - end; - -%% clock -handle_call(clock, _From, #mem{clock=Clock} = State) -> - {reply, {ok, Clock}, State}; - -%% state -handle_call(state, _From, State) -> - {reply, {ok, State}, State}; - -%% reset - but only if we're in test mode -handle_call(reset, _From, #mem{args=Args} = State) -> - Test = get_test(Args), - case Test of - undefined -> {reply, not_reset, State}; - _ -> {reply, ok, int_reset(Test, State)} - end; - -%% nodes -handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> - {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)), - {reply, {ok, NodeList}, State}; - -%% fullnodes -handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) -> - {reply, {ok, Nodes}, State}; - -%% gossip -handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) -> - showroom_log:message(info, "membership: received gossip from ~p", - [erlang:node(Pid)]), - handle_gossip(From, RemoteState, LocalState); - -% start_gossip -handle_call(start_gossip, _From, State) -> - NewState = gossip(State), - {reply, ok, NewState}; - -%% ignored call -handle_call(Msg, _From, State) -> - showroom_log:message(info, "membership: ignored call: ~p", [Msg]), - {reply, ignored, State}. - - -%% gossip -handle_cast({gossip, RemoteState}, LocalState) -> - State = case handle_gossip(none, RemoteState, LocalState) of - {reply, ok, NewState} -> NewState; - {reply, {new_state, NewState}, _} -> NewState; - {noreply, NewState} -> NewState - end, - {noreply, State}; - -%% stop -handle_cast(stop, State) -> - {stop, normal, State}; - -%% ignored cast -handle_cast(Msg, State) -> - showroom_log:message(info, "membership: ignored cast: ~p", [Msg]), - {noreply, State}. - - -%% @doc handle nodedown messages because we have -%% net_kernel:monitor_nodes(true) -handle_info({nodedown, Node}, State) -> - showroom_log:message(alert, "membership: nodedown ~p", [Node]), - notify(nodedown, [Node], State), - {noreply, State}; - -%% @doc handle nodeup messages because we have -%% net_kernel:monitor_nodes(true) -handle_info({nodeup, Node}, State) -> - showroom_log:message(alert, "membership: nodeup ~p", [Node]), - notify(nodeup, [Node], State), - gossip_cast(State), - {noreply, State}; - -%% ignored info -handle_info(Info, State) -> - showroom_log:message(info, "membership: ignored info: ~p", [Info]), - {noreply, State}. - - -% terminate -terminate(_Reason, _State) -> - ok. - - -% ignored code change -code_change(OldVsn, State, _Extra) -> - io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]), - {ok, State}. - - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - -%% @doc if Args has config use it, otherwise call configuration module -%% most times Args will have config during testing runs -%get_config(Args) -> -% case proplists:get_value(config, Args) of -% undefined -> configuration:get_config(); -% Any -> Any -% end. - - -get_test(Args) -> - proplists:get_value(test, Args). - - -%% @doc handle_init starts a node -%% Most of the time, this puts the node in a single-node cluster setup, -%% But, we could be automatically rejoining a cluster after some downtime. -%% See handle_join for initing, joining, leaving a cluster, or replacing a -%% node. -%% @end -handle_init(Test, nil) -> - int_reset(Test); - -handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> - % there's an old state, let's try to rejoin automatically - % but only if we can compare our old state to other available - % nodes and get a match... otherwise get a human involved - {_, NodeList, _} = lists:unzip3(Nodes), - ping_all_yall(NodeList), - {RemoteStates, _BadNodes} = get_remote_states(NodeList), - Test = get_test(Args), - case compare_state_with_rest(OldState, RemoteStates) of - match -> - showroom_log:message(info, "membership: rejoined successfully", []), - OldState; - Other -> - showroom_log:message(error, "membership: rejoin failed: ~p", [Other]), - int_reset(Test) - end. - - -%% @doc handle join activities, return {ok,NewState} --spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) -> - {ok, mem_state()}. -% init -handle_join(init, ExtNodes, nil, State) -> - {_,Nodes,_} = lists:unzip3(ExtNodes), - ping_all_yall(Nodes), - int_join(ExtNodes, State); -% join -handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) -> - NewState = case get_test(Args) of - undefined -> get_pingnode_state(PingNode); - _ -> State % testing, so meh - end, - % now use this info to join the ring - int_join(ExtNodes, NewState); -% replace -handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) -> - handle_join(replace, {OldNode, []}, PingNode, State); -handle_join(replace, [OldNode | _], PingNode, State) -> - handle_join(replace, {OldNode, []}, PingNode, State); -handle_join(replace, {OldNode, NewOpts}, PingNode, State) -> - OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode), - {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes), - NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}), - notify(node_leave, [OldNode], State), - int_join([], OldState#mem{nodes=NewNodes}); -% leave -handle_join(leave, [OldNode | _], _PingNode, State) -> - % TODO implement me - notify(node_leave, [OldNode], State), - ok; - -handle_join(JoinType, _, PingNode, _) -> - showroom_log:message(info, "membership: unknown join type: ~p " - "for ping node: ~p", [JoinType, PingNode]), - {error, unknown_join_type}. - -%% @doc common operations for all join types -int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) -> - NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) -> - check_pos(Pos, N, Nodes), - notify(node_join, [N], State), - [New|AccIn] - end, Nodes, ExtNodes), - NewNodes1 = lists:sort(NewNodes), - NewClock = vector_clock:increment(node(), Clock), - NewState = State#mem{nodes=NewNodes1, clock=NewClock}, - install_new_state(NewState), - {ok, NewState}. - - -install_new_state(#mem{args=Args} = State) -> - Test = get_test(Args), - save_state_file(Test, State), - gossip(call, Test, State). - - -get_pingnode_state(PingNode) -> - {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state), - RemoteState. - - -%% @doc handle the gossip messages -%% We're not using vector_clock:resolve b/c we need custom merge strategy -handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, - LocalState=#mem{clock=LocalClock}) -> - case vector_clock:compare(RemoteClock, LocalClock) of - equal -> - {reply, ok, LocalState}; - less -> - % remote node needs updating - {reply, {new_state, LocalState}, LocalState}; - greater when From == none-> - {noreply, install_new_state(RemoteState)}; - greater -> - % local node needs updating - gen_server:reply(From, ok), % reply to sender first - {noreply, install_new_state(RemoteState)}; - concurrent -> - % ick, so let's resolve and merge states - showroom_log:message(info, - "membership: Concurrent Clocks~n" - "RemoteState : ~p~nLocalState : ~p~n" - , [RemoteState, LocalState]), - MergedState = merge_states(RemoteState, LocalState), - if From =/= none -> - % reply to sender - gen_server:reply(From, {new_state, MergedState}) - end, - {noreply, install_new_state(MergedState)} - end. - - -merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState, - #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) -> - MergedClock = vector_clock:merge(RemoteClock, LocalClock), - MergedNodes = merge_nodes(RemoteNodes, LocalNodes), - LocalState#mem{clock=MergedClock, nodes=MergedNodes}. - - -%% this will give one of the lists back, deterministically -merge_nodes(Remote, Local) -> - % get rid of the initial 0 node if it's still there, and sort - Remote1 = lists:usort(lists:keydelete(0,1,Remote)), - Local1 = lists:usort(lists:keydelete(0,1,Local)), - % handle empty lists as well as other cases - case {Remote1, Local1} of - {[], L} -> L; - {R, []} -> R; - _ -> erlang:min(Remote1, Local1) - end. - - -gossip(#mem{args=Args} = NewState) -> - Test = get_test(Args), - gossip(call, Test, NewState). - - -gossip_cast(#mem{nodes=[]}) -> ok; -gossip_cast(#mem{args=Args} = NewState) -> - Test = get_test(Args), - gossip(cast, Test, NewState). - - --spec gossip(gossip_fun(), test(), mem_state()) -> mem_state(). -gossip(_, _, #mem{nodes=[]}) -> ok; -gossip(Fun, undefined, #mem{nodes=StateNodes} = State) -> - {_, Nodes, _} = lists:unzip3(StateNodes), - case next_up_node(Nodes) of - no_gossip_targets_available -> - State; % skip gossip, I'm the only node - TargetNode -> - showroom_log:message(info, "membership: firing gossip from ~p to ~p", - [node(), TargetNode]), - case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of - ok -> State; - {new_state, NewState} -> NewState; - Error -> throw({unknown_gossip_response, Error}) - end - end; - -gossip(_,_,_) -> - % testing, so don't gossip - ok. - - -next_up_node(Nodes) -> - next_up_node(node(), Nodes, up_nodes()). - - -next_up_node(Node, Nodes, UpNodes) -> - {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes), - List = lists:append(B, A), % be sure to eliminate Node - DownNodes = Nodes -- UpNodes, - case List -- DownNodes of - [Target|_] -> Target; - [] -> no_gossip_targets_available - end. - - -up_nodes() -> - % TODO: implement cache (fb 9704 & 9449) - erlang:nodes(). - - -%% @doc find the latest state file on disk -find_latest_state_filename() -> - Dir = couch_config:get("couchdb", "database_dir"), - case file:list_dir(Dir) of - {ok, Filenames} -> - Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <- - [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]], - SortedTimestamps = lists:reverse(lists:sort(Timestamps)), - case SortedTimestamps of - [Latest | _] -> - {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ - integer_to_list(Latest)}; - _ -> - throw({error, mem_state_file_not_found}) - end; - {error, Reason} -> - throw({error, Reason}) - end. - - -%% (Test, Config) -read_latest_state_file(undefined) -> - try - {ok, File} = find_latest_state_filename(), - case file:consult(File) of - {ok, [#mem{}=State]} -> State; - _Else -> - throw({error, bad_mem_state_file}) - end - catch _:Error -> - showroom_log:message(info, "membership: ~p", [Error]), - nil - end; -read_latest_state_file(_) -> - nil. - - -%% @doc save the state file to disk, with current timestamp. -%% thx to riak_ring_manager:do_write_ringfile/1 --spec save_state_file(test(), mem_state()) -> ok. -save_state_file(undefined, State) -> - Dir = couch_config:get("couchdb", "database_dir"), - {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(), - TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", - [Year, Month, Day, Hour, Minute, Second]), - FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS, - ok = filelib:ensure_dir(FN), - {ok, File} = file:open(FN, [binary, write]), - io:format(File, "~w.~n", [State]), - file:close(File); - -save_state_file(_,_) -> ok. % don't save if testing - - -check_pos(Pos, Node, Nodes) -> - Found = lists:keyfind(Pos, 1, Nodes), - case Found of - false -> ok; - _ -> - {_,OldNode,_} = Found, - if - OldNode =:= Node -> - Msg = "node_exists_at_position_" ++ integer_to_list(Pos), - throw({error, list_to_binary(Msg)}); - true -> - Msg = "position_exists_" ++ integer_to_list(Pos), - throw({error, list_to_binary(Msg)}) - end - end. - - -int_reset(Test) -> - int_reset(Test, #mem{}). - - -int_reset(_Test, State) -> - State#mem{nodes=[], clock=[]}. - - -ping_all_yall(Nodes) -> - lists:foreach(fun(Node) -> - net_adm:ping(Node) - end, Nodes), - timer:sleep(500). % sigh. - - -get_remote_states(NodeList) -> - NodeList1 = lists:delete(node(), NodeList), - {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000), - {_Status, States2} = lists:unzip(States1), - NodeList2 = NodeList1 -- BadNodes, - {lists:zip(NodeList2,States2), BadNodes}. - - -%% @doc compare state with states based on vector clock -%% return match | {bad_state_match, Node, NodesThatDontMatch} -compare_state_with_rest(#mem{clock=Clock} = _State, States) -> - Results = lists:map(fun({Node, #mem{clock=Clock1}}) -> - {vector_clock:equals(Clock, Clock1), Node} - end, States), - BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn; - ({false, N}, AccIn) -> [N | AccIn] - end, [], Results), - if - length(BadResults) == 0 -> match; - true -> {bad_state_match, node(), BadResults} - end. + application:stop(mem3). -notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) -> - {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)), - lists:foreach(fun(Node) -> - case lists:member(Node, MemNodes) orelse Type == nodedown of - true -> - gen_event:notify(membership_events, {Type, Node}); - _ -> ok % node not in cluster - end - end, Nodes). +restart() -> + stop(), + start(). diff --git a/src/membership_app.erl b/src/mem3_app.erl index df0f4fee..70bf1cf9 100644 --- a/src/membership_app.erl +++ b/src/mem3_app.erl @@ -1,11 +1,11 @@ --module(membership_app). +-module(mem3_app). -behaviour(application). -export([start/2, stop/1]). start(_Type, []) -> - DbName = couch_config:get("membership", "db", "dbs"), + DbName = couch_config:get("mem3", "db", "dbs"), couch_server:create(list_to_binary(DbName), []), - membership_sup:start_link(). + mem3_sup:start_link(). stop([]) -> ok. diff --git a/src/dbs_cache.erl b/src/mem3_cache.erl index 1afb873b..8f5c372a 100644 --- a/src/dbs_cache.erl +++ b/src/mem3_cache.erl @@ -1,11 +1,11 @@ --module(dbs_cache). +-module(mem3_cache). -behaviour(gen_server). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([start_link/0]). --include("membership.hrl"). +-include("mem3.hrl"). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). diff --git a/src/mem3_event.erl b/src/mem3_event.erl new file mode 100644 index 00000000..59156adc --- /dev/null +++ b/src/mem3_event.erl @@ -0,0 +1,74 @@ +-module(mem3_event). + +-behaviour(gen_event). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, + code_change/3]). + +-include("mem3.hrl"). + +init([]) -> + {ok, []}. + +handle_event({node_join, Node}, State) -> + start_repl({node_join, Node}, State); + +handle_event({nodeup, Node}, State) -> + start_repl({nodeup, Node}, State); + +handle_event({node_leave, Node}, State) -> + stop_repl({node_leave, Node}, State); + +handle_event({nodedown, Node}, State) -> + stop_repl({nodedown, Node}, State); + +handle_event(Event, State) -> + ?LOG_ERROR("unexpected event in dbs handler ~p", [Event]), + {ok, State}. + +handle_call(Request, State) -> + ?LOG_ERROR("unexpected call in dbs handler ~p", [Request]), + {ok, ok, State}. + +handle_info(Info, State) -> + ?LOG_ERROR("unexpected msg in dbs handler ~p", [Info]), + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% +%% internal +%% + +start_repl({Reason, Node}, State) -> + ChildSpec = dbs:childspec(Node), + case supervisor:start_child(dbs, ChildSpec) of + {ok, _} -> + ok; + {error, {already_started, _Child}} -> + ok; + {error, running} -> + ok; + {error, already_present} -> + case supervisor:restart_child(dbs, ChildSpec) of + {ok, _} -> + ok; + {error, running} -> + ok; + {error, Reason} -> + ?LOG_ERROR("dbs repl restart failed ~p", [Reason]) + end; + {error, Reason} -> + ?LOG_ERROR("dbs repl start failed ~p", [Reason]) + end, + {ok, State}. + +stop_repl({Reason, Node}, State) -> + ?LOG_INFO("dbs repl ~p --> ~p terminating (~p)", [node(), Node, Reason]), + supervisor:terminate_child(dbs, Node), + supervisor:delete_child(dbs, Node), + {ok, State}. diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl new file mode 100644 index 00000000..2b29b488 --- /dev/null +++ b/src/mem3_httpd.erl @@ -0,0 +1,77 @@ +-module(mem3_httpd). + +-export([handle_membership_req/1]). + +%% includes +-include("mem3.hrl"). + + +handle_membership_req(#httpd{method='GET', + path_parts=[<<"_membership">>]} = Req) -> + {ok,ClusterNodes} = try mem3:nodes() + catch _:_ -> {ok,[]} end, + couch_httpd:send_json(Req, {[ + {all_nodes, lists:sort([node()|nodes()])}, + {cluster_nodes, lists:sort(ClusterNodes)} + ]}); + +handle_membership_req(#httpd{method='POST', + path_parts=[<<"_membership">>]} = Req) -> + {JsonProps} = couch_httpd:json_body_obj(Req), + Method = couch_util:get_value(<<"method">>, JsonProps), + Params = couch_util:get_value(<<"params">>, JsonProps), + Id = couch_util:get_value(<<"id">>, JsonProps), + {Result, Error} = membership_dispatch(Method, Params), + couch_httpd:send_json(Req, {[ + {result, Result}, + {error, Error}, + {id, Id} + ]}). + +%% +%% internal +%% +membership_dispatch(<<"replace">>, Params) -> + OldNode = get_oldnode(Params), + NewNodeOpts = get_value_json(<<"newnode_options">>, Params, []), + PingNode = get_pingnode(Params), + send_join(replace, {OldNode, NewNodeOpts}, PingNode); +membership_dispatch(TypeBin, Params) -> + Type = list_to_atom(?b2l(TypeBin)), + NodeList = get_value_json(<<"nodes">>, Params, []), + Nodes = lists:map(fun({List}) -> node_info(List) end, NodeList), + PingNode = get_pingnode(Params), + send_join(Type, Nodes, PingNode). + +get_pingnode(Params) -> + PingNodeBin = get_value_json(<<"pingnode">>, Params, <<"nil">>), + list_to_atom(?b2l(PingNodeBin)). + +get_oldnode(Params) -> + NodeBin = get_value_json(<<"oldnode">>, Params, undefined), + NodeList = ?b2l(NodeBin), + list_to_atom(NodeList). + +%% @doc send join command to mem module +send_join(Type, Payload, PingNode) -> + case mem3:join(Type, Payload, PingNode) of + ok -> {ok, null}; + {error, Error} -> {Type, Error}; + Other -> + ?LOG_ERROR("membership dispatch error ~p", [Other]), + {Type, unknown_error} + end. + +node_info(List) -> + Order = couch_util:get_value(<<"order">>, List), + Node1 = couch_util:get_value(<<"node">>, List), + Node2 = list_to_atom(?b2l(Node1)), + Options = couch_util:get_value(<<"options">>, List), + {Order, Node2, Options}. + +get_value_json(_,[], Default) -> Default; +get_value_json(Key, [JsonProp|Rest], Default) -> + case JsonProp of + {[{Key, Value}]} -> Value; + _ -> get_value_json(Key, Rest, Default) + end. diff --git a/src/mem3_server.erl b/src/mem3_server.erl new file mode 100644 index 00000000..863e752f --- /dev/null +++ b/src/mem3_server.erl @@ -0,0 +1,568 @@ +%%% membership module +%%% +%%% State of the gen_server is a #mem record +%%% +%%% Nodes and Gossip are the same thing, and are a list of three-tuples like: +%%% +%%% [ {Pos,NodeName,Options} | _ ] +%%% +%%% Position is 1-based incrementing in order of node joining +%%% +%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node +%%% is responsible for the extra partitions too. +%%% +%%% TODO: dialyzer type specs +%%% +-module(mem3_server). +-author('brad@cloudant.com'). + +-behaviour(gen_server). + +%% API +-export([start_link/0, start_link/1, stop/0, stop/1, reset/0]). +-export([join/3, clock/0, state/0, states/0, nodes/0, fullnodes/0, + start_gossip/0]). + +%% for testing more than anything else +-export([merge_nodes/2, next_up_node/1, next_up_node/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% includes +-include("mem3.hrl"). + +-define(SERVER, membership). +-define(STATE_FILE_PREFIX, "membership"). + + +%%==================================================================== +%% API +%%==================================================================== + +-spec start_link() -> {ok, pid()}. +start_link() -> + start_link([]). + + +-spec start_link(args()) -> {ok, pid()}. +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []). + + +-spec stop() -> ok. +stop() -> + stop(?MODULE). + + +-spec stop(atom()) -> ok. +stop(Server) -> + gen_server:cast(Server, stop). + + +-spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) -> + ok. +join(JoinType, Payload, PingNode) -> + gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}). + + +-spec clock() -> vector_clock(). +clock() -> + gen_server:call(?SERVER, clock). + + +-spec state() -> mem_state(). +state() -> + gen_server:call(?SERVER, state). + + +%% @doc Detailed report of cluster-wide membership state. Queries the state +%% on all member nodes and builds a dictionary with unique states as the +%% key and the nodes holding that state as the value. Also reports member +%% nodes which fail to respond and nodes which are connected but are not +%% cluster members. Useful for debugging. +-spec states() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}]. +states() -> + {ok, Nodes} = mem3:nodes(), + AllNodes = [node()|erlang:nodes()], + {Replies, BadNodes} = gen_server:multi_call(Nodes, ?SERVER, state), + Dict = lists:foldl(fun({Node, {ok,State}}, D) -> + orddict:append(State, Node, D) + end, orddict:new(), Replies), + [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict]. + +-spec start_gossip() -> ok. +start_gossip() -> + gen_server:call(?SERVER, start_gossip). + + +-spec reset() -> ok | not_reset. +reset() -> + gen_server:call(?SERVER, reset). + + +%% @doc get the list of cluster nodes (according to membership module) +%% This may differ from erlang:nodes() +%% Guaranteed to be in order of State's node list (1st elem in 3-tuple) +-spec nodes() -> {ok, [node()]}. +nodes() -> + gen_server:call(?SERVER, nodes). + + +%% @doc get the list of cluster nodes (according to membership module) +%% This may differ from erlang:nodes() +%% Guaranteed to be in order of State's node list (1st elem in 3-tuple) +-spec fullnodes() -> {ok, [mem_node()]}. +fullnodes() -> + gen_server:call(?SERVER, fullnodes). + + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%% start up membership server +-spec init(args()) -> {ok, mem_state()}. +init(Args) -> + process_flag(trap_exit,true), + Test = get_test(Args), + OldState = read_latest_state_file(Test), + showroom_log:message(info, "membership: membership server starting...", []), + net_kernel:monitor_nodes(true), + State = handle_init(Test, OldState), + {ok, State#mem{args=Args}}. + + +%% new node(s) joining to this node +handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) -> + try + case handle_join(JoinType, ExtNodes, PingNode, State) of + {ok, NewState} -> {reply, ok, NewState}; + Other -> {reply, Other, State} + end + catch _:Error -> + showroom_log:message(error, "~p", [Error]), + {reply, Error, State} + end; + +%% clock +handle_call(clock, _From, #mem{clock=Clock} = State) -> + {reply, {ok, Clock}, State}; + +%% state +handle_call(state, _From, State) -> + {reply, {ok, State}, State}; + +%% reset - but only if we're in test mode +handle_call(reset, _From, #mem{args=Args} = State) -> + Test = get_test(Args), + case Test of + undefined -> {reply, not_reset, State}; + _ -> {reply, ok, int_reset(Test, State)} + end; + +%% nodes +handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> + {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)), + {reply, {ok, NodeList}, State}; + +%% fullnodes +handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) -> + {reply, {ok, Nodes}, State}; + +%% gossip +handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) -> + showroom_log:message(info, "membership: received gossip from ~p", + [erlang:node(Pid)]), + handle_gossip(From, RemoteState, LocalState); + +% start_gossip +handle_call(start_gossip, _From, State) -> + NewState = gossip(State), + {reply, ok, NewState}; + +%% ignored call +handle_call(Msg, _From, State) -> + showroom_log:message(info, "membership: ignored call: ~p", [Msg]), + {reply, ignored, State}. + + +%% gossip +handle_cast({gossip, RemoteState}, LocalState) -> + State = case handle_gossip(none, RemoteState, LocalState) of + {reply, ok, NewState} -> NewState; + {reply, {new_state, NewState}, _} -> NewState; + {noreply, NewState} -> NewState + end, + {noreply, State}; + +%% stop +handle_cast(stop, State) -> + {stop, normal, State}; + +%% ignored cast +handle_cast(Msg, State) -> + showroom_log:message(info, "membership: ignored cast: ~p", [Msg]), + {noreply, State}. + + +%% @doc handle nodedown messages because we have +%% net_kernel:monitor_nodes(true) +handle_info({nodedown, Node}, State) -> + showroom_log:message(alert, "membership: nodedown ~p", [Node]), + notify(nodedown, [Node], State), + {noreply, State}; + +%% @doc handle nodeup messages because we have +%% net_kernel:monitor_nodes(true) +handle_info({nodeup, Node}, State) -> + showroom_log:message(alert, "membership: nodeup ~p", [Node]), + notify(nodeup, [Node], State), + gossip_cast(State), + {noreply, State}; + +%% ignored info +handle_info(Info, State) -> + showroom_log:message(info, "membership: ignored info: ~p", [Info]), + {noreply, State}. + + +% terminate +terminate(_Reason, _State) -> + ok. + + +% ignored code change +code_change(OldVsn, State, _Extra) -> + io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]), + {ok, State}. + + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +%% @doc if Args has config use it, otherwise call configuration module +%% most times Args will have config during testing runs +%get_config(Args) -> +% case proplists:get_value(config, Args) of +% undefined -> configuration:get_config(); +% Any -> Any +% end. + + +get_test(Args) -> + proplists:get_value(test, Args). + + +%% @doc handle_init starts a node +%% Most of the time, this puts the node in a single-node cluster setup, +%% But, we could be automatically rejoining a cluster after some downtime. +%% See handle_join for initing, joining, leaving a cluster, or replacing a +%% node. +%% @end +handle_init(Test, nil) -> + int_reset(Test); + +handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> + % there's an old state, let's try to rejoin automatically + % but only if we can compare our old state to other available + % nodes and get a match... otherwise get a human involved + {_, NodeList, _} = lists:unzip3(Nodes), + ping_all_yall(NodeList), + {RemoteStates, _BadNodes} = get_remote_states(NodeList), + Test = get_test(Args), + case compare_state_with_rest(OldState, RemoteStates) of + match -> + showroom_log:message(info, "membership: rejoined successfully", []), + OldState; + Other -> + showroom_log:message(error, "membership: rejoin failed: ~p", [Other]), + int_reset(Test) + end. + + +%% @doc handle join activities, return {ok,NewState} +-spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) -> + {ok, mem_state()}. +% init +handle_join(init, ExtNodes, nil, State) -> + {_,Nodes,_} = lists:unzip3(ExtNodes), + ping_all_yall(Nodes), + int_join(ExtNodes, State); +% join +handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) -> + NewState = case get_test(Args) of + undefined -> get_pingnode_state(PingNode); + _ -> State % testing, so meh + end, + % now use this info to join the ring + int_join(ExtNodes, NewState); +% replace +handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) -> + handle_join(replace, {OldNode, []}, PingNode, State); +handle_join(replace, [OldNode | _], PingNode, State) -> + handle_join(replace, {OldNode, []}, PingNode, State); +handle_join(replace, {OldNode, NewOpts}, PingNode, State) -> + OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode), + {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes), + NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}), + notify(node_leave, [OldNode], State), + int_join([], OldState#mem{nodes=NewNodes}); +% leave +handle_join(leave, [OldNode | _], _PingNode, State) -> + % TODO implement me + notify(node_leave, [OldNode], State), + ok; + +handle_join(JoinType, _, PingNode, _) -> + showroom_log:message(info, "membership: unknown join type: ~p " + "for ping node: ~p", [JoinType, PingNode]), + {error, unknown_join_type}. + +%% @doc common operations for all join types +int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) -> + NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) -> + check_pos(Pos, N, Nodes), + notify(node_join, [N], State), + [New|AccIn] + end, Nodes, ExtNodes), + NewNodes1 = lists:sort(NewNodes), + NewClock = vector_clock:increment(node(), Clock), + NewState = State#mem{nodes=NewNodes1, clock=NewClock}, + install_new_state(NewState), + {ok, NewState}. + + +install_new_state(#mem{args=Args} = State) -> + Test = get_test(Args), + save_state_file(Test, State), + gossip(call, Test, State). + + +get_pingnode_state(PingNode) -> + {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state), + RemoteState. + + +%% @doc handle the gossip messages +%% We're not using vector_clock:resolve b/c we need custom merge strategy +handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, + LocalState=#mem{clock=LocalClock}) -> + case vector_clock:compare(RemoteClock, LocalClock) of + equal -> + {reply, ok, LocalState}; + less -> + % remote node needs updating + {reply, {new_state, LocalState}, LocalState}; + greater when From == none-> + {noreply, install_new_state(RemoteState)}; + greater -> + % local node needs updating + gen_server:reply(From, ok), % reply to sender first + {noreply, install_new_state(RemoteState)}; + concurrent -> + % ick, so let's resolve and merge states + showroom_log:message(info, + "membership: Concurrent Clocks~n" + "RemoteState : ~p~nLocalState : ~p~n" + , [RemoteState, LocalState]), + MergedState = merge_states(RemoteState, LocalState), + if From =/= none -> + % reply to sender + gen_server:reply(From, {new_state, MergedState}) + end, + {noreply, install_new_state(MergedState)} + end. + + +merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState, + #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) -> + MergedClock = vector_clock:merge(RemoteClock, LocalClock), + MergedNodes = merge_nodes(RemoteNodes, LocalNodes), + LocalState#mem{clock=MergedClock, nodes=MergedNodes}. + + +%% this will give one of the lists back, deterministically +merge_nodes(Remote, Local) -> + % get rid of the initial 0 node if it's still there, and sort + Remote1 = lists:usort(lists:keydelete(0,1,Remote)), + Local1 = lists:usort(lists:keydelete(0,1,Local)), + % handle empty lists as well as other cases + case {Remote1, Local1} of + {[], L} -> L; + {R, []} -> R; + _ -> erlang:min(Remote1, Local1) + end. + + +gossip(#mem{args=Args} = NewState) -> + Test = get_test(Args), + gossip(call, Test, NewState). + + +gossip_cast(#mem{nodes=[]}) -> ok; +gossip_cast(#mem{args=Args} = NewState) -> + Test = get_test(Args), + gossip(cast, Test, NewState). + + +-spec gossip(gossip_fun(), test(), mem_state()) -> mem_state(). +gossip(_, _, #mem{nodes=[]}) -> ok; +gossip(Fun, undefined, #mem{nodes=StateNodes} = State) -> + {_, Nodes, _} = lists:unzip3(StateNodes), + case next_up_node(Nodes) of + no_gossip_targets_available -> + State; % skip gossip, I'm the only node + TargetNode -> + showroom_log:message(info, "membership: firing gossip from ~p to ~p", + [node(), TargetNode]), + case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of + ok -> State; + {new_state, NewState} -> NewState; + Error -> throw({unknown_gossip_response, Error}) + end + end; + +gossip(_,_,_) -> + % testing, so don't gossip + ok. + + +next_up_node(Nodes) -> + next_up_node(node(), Nodes, up_nodes()). + + +next_up_node(Node, Nodes, UpNodes) -> + {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes), + List = lists:append(B, A), % be sure to eliminate Node + DownNodes = Nodes -- UpNodes, + case List -- DownNodes of + [Target|_] -> Target; + [] -> no_gossip_targets_available + end. + + +up_nodes() -> + % TODO: implement cache (fb 9704 & 9449) + erlang:nodes(). + + +%% @doc find the latest state file on disk +find_latest_state_filename() -> + Dir = couch_config:get("couchdb", "database_dir"), + case file:list_dir(Dir) of + {ok, Filenames} -> + Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <- + [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]], + SortedTimestamps = lists:reverse(lists:sort(Timestamps)), + case SortedTimestamps of + [Latest | _] -> + {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ + integer_to_list(Latest)}; + _ -> + throw({error, mem_state_file_not_found}) + end; + {error, Reason} -> + throw({error, Reason}) + end. + + +%% (Test, Config) +read_latest_state_file(undefined) -> + try + {ok, File} = find_latest_state_filename(), + case file:consult(File) of + {ok, [#mem{}=State]} -> State; + _Else -> + throw({error, bad_mem_state_file}) + end + catch _:Error -> + showroom_log:message(info, "membership: ~p", [Error]), + nil + end; +read_latest_state_file(_) -> + nil. + + +%% @doc save the state file to disk, with current timestamp. +%% thx to riak_ring_manager:do_write_ringfile/1 +-spec save_state_file(test(), mem_state()) -> ok. +save_state_file(undefined, State) -> + Dir = couch_config:get("couchdb", "database_dir"), + {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(), + TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B", + [Year, Month, Day, Hour, Minute, Second]), + FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS, + ok = filelib:ensure_dir(FN), + {ok, File} = file:open(FN, [binary, write]), + io:format(File, "~w.~n", [State]), + file:close(File); + +save_state_file(_,_) -> ok. % don't save if testing + + +check_pos(Pos, Node, Nodes) -> + Found = lists:keyfind(Pos, 1, Nodes), + case Found of + false -> ok; + _ -> + {_,OldNode,_} = Found, + if + OldNode =:= Node -> + Msg = "node_exists_at_position_" ++ integer_to_list(Pos), + throw({error, list_to_binary(Msg)}); + true -> + Msg = "position_exists_" ++ integer_to_list(Pos), + throw({error, list_to_binary(Msg)}) + end + end. + + +int_reset(Test) -> + int_reset(Test, #mem{}). + + +int_reset(_Test, State) -> + State#mem{nodes=[], clock=[]}. + + +ping_all_yall(Nodes) -> + lists:foreach(fun(Node) -> + net_adm:ping(Node) + end, Nodes), + timer:sleep(500). % sigh. + + +get_remote_states(NodeList) -> + NodeList1 = lists:delete(node(), NodeList), + {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000), + {_Status, States2} = lists:unzip(States1), + NodeList2 = NodeList1 -- BadNodes, + {lists:zip(NodeList2,States2), BadNodes}. + + +%% @doc compare state with states based on vector clock +%% return match | {bad_state_match, Node, NodesThatDontMatch} +compare_state_with_rest(#mem{clock=Clock} = _State, States) -> + Results = lists:map(fun({Node, #mem{clock=Clock1}}) -> + {vector_clock:equals(Clock, Clock1), Node} + end, States), + BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn; + ({false, N}, AccIn) -> [N | AccIn] + end, [], Results), + if + length(BadResults) == 0 -> match; + true -> {bad_state_match, node(), BadResults} + end. + +notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) -> + {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)), + lists:foreach(fun(Node) -> + case lists:member(Node, MemNodes) orelse Type == nodedown of + true -> + gen_event:notify(membership_events, {Type, Node}); + _ -> ok % node not in cluster + end + end, Nodes). diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl new file mode 100644 index 00000000..122e68d7 --- /dev/null +++ b/src/mem3_sup.erl @@ -0,0 +1,22 @@ +-module(mem3_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link(?MODULE, []). + +init(_Args) -> + Children = [ + child(mem3_server), + child(mem3_event), + child(mem3_sync), + child(mem3_cache) + ], + {ok, {{one_for_one,10,1}, Children}}. + +child(mem3_event) -> + MFA = {gen_event, start_link, [{local,mem3_event}]}, + {mem3_event, MFA, permanent, 1000, worker, dynamic}; +child(Child) -> + {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}. +
\ No newline at end of file diff --git a/src/dbs.erl b/src/mem3_sync.erl index 345788ef..d50514d9 100644 --- a/src/dbs.erl +++ b/src/mem3_sync.erl @@ -1,9 +1,9 @@ --module(dbs). +-module(mem3_sync). -behaviour(supervisor). -export([start_link/0, init/1, childspec/1, sup_upgrade_notify/2]). --include("membership.hrl"). +-include("mem3.hrl"). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). diff --git a/src/partitions.erl b/src/mem3_util.erl index 3e024264..f6c94748 100644 --- a/src/partitions.erl +++ b/src/mem3_util.erl @@ -1,4 +1,4 @@ --module(partitions). +-module(mem3_util). -author('brad@cloudant.com'). %% API @@ -8,9 +8,7 @@ -define(RINGTOP, trunc(math:pow(2,160))). % SHA-1 space --include("../../couch/src/couch_db.hrl"). --include("../../dynomite/include/membership.hrl"). --include_lib("eunit/include/eunit.hrl"). +-include("mem3.hrl"). %%==================================================================== %% API diff --git a/src/vector_clock.erl b/src/mem3_vclock.erl index 0a422334..a48da43c 100644 --- a/src/vector_clock.erl +++ b/src/mem3_vclock.erl @@ -1,7 +1,7 @@ %%% @author Cliff Moon <cliff@powerset.com> [] %%% @copyright 2008 Cliff Moon --module (vector_clock). +-module (mem3_vclock). -export ([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2, equals/2]). diff --git a/src/membership.erl b/src/membership.erl deleted file mode 100644 index 1e06e798..00000000 --- a/src/membership.erl +++ /dev/null @@ -1,15 +0,0 @@ --module(membership). --author('Brad Anderson <brad@cloudant.com>'). - --export([start/0, stop/0, restart/0]). - - -start() -> - application:start(membership). - -stop() -> - application:stop(membership). - -restart() -> - stop(), - start(). diff --git a/src/membership_sup.erl b/src/membership_sup.erl deleted file mode 100644 index f203924d..00000000 --- a/src/membership_sup.erl +++ /dev/null @@ -1,44 +0,0 @@ --module(membership_sup). --author('brad@cloudant.com'). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - --define(SERVER, ?MODULE). - -start_link() -> - supervisor:start_link(?MODULE, []). - -init(_Args) -> - Membership = {membership, - {mem3, start_link, []}, - permanent, - 1000, - worker, - [mem3]}, - MemEventMgr = {mem_event_manager, - {gen_event, start_link, [{local, membership_events}]}, - permanent, - 1000, - worker, - []}, - DbsRepl = - {dbs, - {dbs, start_link, []}, - permanent, - infinity, - supervisor, - [dbs]}, - DbsCache = - {dbs_cache, - {dbs_cache, start_link, []}, - permanent, - 1000, - worker, - [dbs_cache]}, - {ok, {{one_for_one,10,1}, [Membership, MemEventMgr, DbsRepl, DbsCache]}}. |