author     Adam Kocoloski <adam@cloudant.com>    2010-06-30 16:23:38 -0400
committer  Adam Kocoloski <adam@cloudant.com>    2010-08-12 01:22:33 -0400
commit     48c8fde34591f782be7af77575eaa02dab8659b3 (patch)
tree       bb54c0d61a660b9ba019f9b187e901ede7132bfa /src
parent     8a09581aa2252f53047fa0e9e95591eaae4556c9 (diff)
standardize mem3 naming. app is horribly broken for now
Diffstat (limited to 'src')
-rw-r--r--   src/mem3.erl                                               569
-rw-r--r--   src/mem3_app.erl    (renamed from src/membership_app.erl)    6
-rw-r--r--   src/mem3_cache.erl  (renamed from src/dbs_cache.erl)          4
-rw-r--r--   src/mem3_event.erl                                           74
-rw-r--r--   src/mem3_httpd.erl                                           77
-rw-r--r--   src/mem3_server.erl                                         568
-rw-r--r--   src/mem3_sup.erl                                             22
-rw-r--r--   src/mem3_sync.erl   (renamed from src/dbs.erl)                4
-rw-r--r--   src/mem3_util.erl   (renamed from src/partitions.erl)         6
-rw-r--r--   src/mem3_vclock.erl (renamed from src/vector_clock.erl)       2
-rw-r--r--   src/membership.erl                                           15
-rw-r--r--   src/membership_sup.erl                                       44
12 files changed, 759 insertions(+), 632 deletions(-)
diff --git a/src/mem3.erl b/src/mem3.erl
index 7ae7627c..2b8f0188 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -1,568 +1,15 @@
-%%% membership module
-%%%
-%%% State of the gen_server is a #mem record
-%%%
-%%% Nodes and Gossip are the same thing, and are a list of three-tuples like:
-%%%
-%%% [ {Pos,NodeName,Options} | _ ]
-%%%
-%%% Position is 1-based incrementing in order of node joining
-%%%
-%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node
-%%% is responsible for the extra partitions too.
-%%%
-%%% TODO: dialyzer type specs
-%%%
-module(mem3).
--author('brad@cloudant.com').
+-author('Brad Anderson <brad@cloudant.com>').
--behaviour(gen_server).
+-export([start/0, stop/0, restart/0]).
-%% API
--export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
--export([join/3, clock/0, state/0, states/0, nodes/0, fullnodes/0,
- start_gossip/0]).
-%% for testing more than anything else
--export([merge_nodes/2, next_up_node/1, next_up_node/3]).
+start() ->
+ application:start(mem3).
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-%% includes
--include("membership.hrl").
-
--define(SERVER, membership).
--define(STATE_FILE_PREFIX, "membership").
-
-
-%%====================================================================
-%% API
-%%====================================================================
-
--spec start_link() -> {ok, pid()}.
-start_link() ->
- start_link([]).
-
-
--spec start_link(args()) -> {ok, pid()}.
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
-
-
--spec stop() -> ok.
stop() ->
- stop(?MODULE).
-
-
--spec stop(atom()) -> ok.
-stop(Server) ->
- gen_server:cast(Server, stop).
-
-
--spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) ->
- ok.
-join(JoinType, Payload, PingNode) ->
- gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}).
-
-
--spec clock() -> vector_clock().
-clock() ->
- gen_server:call(?SERVER, clock).
-
-
--spec state() -> mem_state().
-state() ->
- gen_server:call(?SERVER, state).
-
-
-%% @doc Detailed report of cluster-wide membership state. Queries the state
-%% on all member nodes and builds a dictionary with unique states as the
-%% key and the nodes holding that state as the value. Also reports member
-%% nodes which fail to respond and nodes which are connected but are not
-%% cluster members. Useful for debugging.
--spec states() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}].
-states() ->
- {ok, Nodes} = mem3:nodes(),
- AllNodes = [node()|erlang:nodes()],
- {Replies, BadNodes} = gen_server:multi_call(Nodes, ?SERVER, state),
- Dict = lists:foldl(fun({Node, {ok,State}}, D) ->
- orddict:append(State, Node, D)
- end, orddict:new(), Replies),
- [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
-
--spec start_gossip() -> ok.
-start_gossip() ->
- gen_server:call(?SERVER, start_gossip).
-
-
--spec reset() -> ok | not_reset.
-reset() ->
- gen_server:call(?SERVER, reset).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%% This may differ from erlang:nodes()
-%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
--spec nodes() -> {ok, [node()]}.
-nodes() ->
- gen_server:call(?SERVER, nodes).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%% This may differ from erlang:nodes()
-%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
--spec fullnodes() -> {ok, [mem_node()]}.
-fullnodes() ->
- gen_server:call(?SERVER, fullnodes).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%% start up membership server
--spec init(args()) -> {ok, mem_state()}.
-init(Args) ->
- process_flag(trap_exit,true),
- Test = get_test(Args),
- OldState = read_latest_state_file(Test),
- showroom_log:message(info, "membership: membership server starting...", []),
- net_kernel:monitor_nodes(true),
- State = handle_init(Test, OldState),
- {ok, State#mem{args=Args}}.
-
-
-%% new node(s) joining to this node
-handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
- try
- case handle_join(JoinType, ExtNodes, PingNode, State) of
- {ok, NewState} -> {reply, ok, NewState};
- Other -> {reply, Other, State}
- end
- catch _:Error ->
- showroom_log:message(error, "~p", [Error]),
- {reply, Error, State}
- end;
-
-%% clock
-handle_call(clock, _From, #mem{clock=Clock} = State) ->
- {reply, {ok, Clock}, State};
-
-%% state
-handle_call(state, _From, State) ->
- {reply, {ok, State}, State};
-
-%% reset - but only if we're in test mode
-handle_call(reset, _From, #mem{args=Args} = State) ->
- Test = get_test(Args),
- case Test of
- undefined -> {reply, not_reset, State};
- _ -> {reply, ok, int_reset(Test, State)}
- end;
-
-%% nodes
-handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
- {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)),
- {reply, {ok, NodeList}, State};
-
-%% fullnodes
-handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) ->
- {reply, {ok, Nodes}, State};
-
-%% gossip
-handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
- showroom_log:message(info, "membership: received gossip from ~p",
- [erlang:node(Pid)]),
- handle_gossip(From, RemoteState, LocalState);
-
-% start_gossip
-handle_call(start_gossip, _From, State) ->
- NewState = gossip(State),
- {reply, ok, NewState};
-
-%% ignored call
-handle_call(Msg, _From, State) ->
- showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
- {reply, ignored, State}.
-
-
-%% gossip
-handle_cast({gossip, RemoteState}, LocalState) ->
- State = case handle_gossip(none, RemoteState, LocalState) of
- {reply, ok, NewState} -> NewState;
- {reply, {new_state, NewState}, _} -> NewState;
- {noreply, NewState} -> NewState
- end,
- {noreply, State};
-
-%% stop
-handle_cast(stop, State) ->
- {stop, normal, State};
-
-%% ignored cast
-handle_cast(Msg, State) ->
- showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
- {noreply, State}.
-
-
-%% @doc handle nodedown messages because we have
-%% net_kernel:monitor_nodes(true)
-handle_info({nodedown, Node}, State) ->
- showroom_log:message(alert, "membership: nodedown ~p", [Node]),
- notify(nodedown, [Node], State),
- {noreply, State};
-
-%% @doc handle nodeup messages because we have
-%% net_kernel:monitor_nodes(true)
-handle_info({nodeup, Node}, State) ->
- showroom_log:message(alert, "membership: nodeup ~p", [Node]),
- notify(nodeup, [Node], State),
- gossip_cast(State),
- {noreply, State};
-
-%% ignored info
-handle_info(Info, State) ->
- showroom_log:message(info, "membership: ignored info: ~p", [Info]),
- {noreply, State}.
-
-
-% terminate
-terminate(_Reason, _State) ->
- ok.
-
-
-% ignored code change
-code_change(OldVsn, State, _Extra) ->
- io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
- {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-%% @doc if Args has config use it, otherwise call configuration module
-%% most times Args will have config during testing runs
-%get_config(Args) ->
-% case proplists:get_value(config, Args) of
-% undefined -> configuration:get_config();
-% Any -> Any
-% end.
-
-
-get_test(Args) ->
- proplists:get_value(test, Args).
-
-
-%% @doc handle_init starts a node
-%% Most of the time, this puts the node in a single-node cluster setup,
-%% But, we could be automatically rejoining a cluster after some downtime.
-%% See handle_join for initing, joining, leaving a cluster, or replacing a
-%% node.
-%% @end
-handle_init(Test, nil) ->
- int_reset(Test);
-
-handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
- % there's an old state, let's try to rejoin automatically
- % but only if we can compare our old state to other available
- % nodes and get a match... otherwise get a human involved
- {_, NodeList, _} = lists:unzip3(Nodes),
- ping_all_yall(NodeList),
- {RemoteStates, _BadNodes} = get_remote_states(NodeList),
- Test = get_test(Args),
- case compare_state_with_rest(OldState, RemoteStates) of
- match ->
- showroom_log:message(info, "membership: rejoined successfully", []),
- OldState;
- Other ->
- showroom_log:message(error, "membership: rejoin failed: ~p", [Other]),
- int_reset(Test)
- end.
-
-
-%% @doc handle join activities, return {ok,NewState}
--spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) ->
- {ok, mem_state()}.
-% init
-handle_join(init, ExtNodes, nil, State) ->
- {_,Nodes,_} = lists:unzip3(ExtNodes),
- ping_all_yall(Nodes),
- int_join(ExtNodes, State);
-% join
-handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) ->
- NewState = case get_test(Args) of
- undefined -> get_pingnode_state(PingNode);
- _ -> State % testing, so meh
- end,
- % now use this info to join the ring
- int_join(ExtNodes, NewState);
-% replace
-handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) ->
- handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, [OldNode | _], PingNode, State) ->
- handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, {OldNode, NewOpts}, PingNode, State) ->
- OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode),
- {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes),
- NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}),
- notify(node_leave, [OldNode], State),
- int_join([], OldState#mem{nodes=NewNodes});
-% leave
-handle_join(leave, [OldNode | _], _PingNode, State) ->
- % TODO implement me
- notify(node_leave, [OldNode], State),
- ok;
-
-handle_join(JoinType, _, PingNode, _) ->
- showroom_log:message(info, "membership: unknown join type: ~p "
- "for ping node: ~p", [JoinType, PingNode]),
- {error, unknown_join_type}.
-
-%% @doc common operations for all join types
-int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
- NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
- check_pos(Pos, N, Nodes),
- notify(node_join, [N], State),
- [New|AccIn]
- end, Nodes, ExtNodes),
- NewNodes1 = lists:sort(NewNodes),
- NewClock = vector_clock:increment(node(), Clock),
- NewState = State#mem{nodes=NewNodes1, clock=NewClock},
- install_new_state(NewState),
- {ok, NewState}.
-
-
-install_new_state(#mem{args=Args} = State) ->
- Test = get_test(Args),
- save_state_file(Test, State),
- gossip(call, Test, State).
-
-
-get_pingnode_state(PingNode) ->
- {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state),
- RemoteState.
-
-
-%% @doc handle the gossip messages
-%% We're not using vector_clock:resolve b/c we need custom merge strategy
-handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
- LocalState=#mem{clock=LocalClock}) ->
- case vector_clock:compare(RemoteClock, LocalClock) of
- equal ->
- {reply, ok, LocalState};
- less ->
- % remote node needs updating
- {reply, {new_state, LocalState}, LocalState};
- greater when From == none->
- {noreply, install_new_state(RemoteState)};
- greater ->
- % local node needs updating
- gen_server:reply(From, ok), % reply to sender first
- {noreply, install_new_state(RemoteState)};
- concurrent ->
- % ick, so let's resolve and merge states
- showroom_log:message(info,
- "membership: Concurrent Clocks~n"
- "RemoteState : ~p~nLocalState : ~p~n"
- , [RemoteState, LocalState]),
- MergedState = merge_states(RemoteState, LocalState),
- if From =/= none ->
- % reply to sender
- gen_server:reply(From, {new_state, MergedState})
- end,
- {noreply, install_new_state(MergedState)}
- end.
-
-
-merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState,
- #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) ->
- MergedClock = vector_clock:merge(RemoteClock, LocalClock),
- MergedNodes = merge_nodes(RemoteNodes, LocalNodes),
- LocalState#mem{clock=MergedClock, nodes=MergedNodes}.
-
-
-%% this will give one of the lists back, deterministically
-merge_nodes(Remote, Local) ->
- % get rid of the initial 0 node if it's still there, and sort
- Remote1 = lists:usort(lists:keydelete(0,1,Remote)),
- Local1 = lists:usort(lists:keydelete(0,1,Local)),
- % handle empty lists as well as other cases
- case {Remote1, Local1} of
- {[], L} -> L;
- {R, []} -> R;
- _ -> erlang:min(Remote1, Local1)
- end.
-
-
-gossip(#mem{args=Args} = NewState) ->
- Test = get_test(Args),
- gossip(call, Test, NewState).
-
-
-gossip_cast(#mem{nodes=[]}) -> ok;
-gossip_cast(#mem{args=Args} = NewState) ->
- Test = get_test(Args),
- gossip(cast, Test, NewState).
-
-
--spec gossip(gossip_fun(), test(), mem_state()) -> mem_state().
-gossip(_, _, #mem{nodes=[]}) -> ok;
-gossip(Fun, undefined, #mem{nodes=StateNodes} = State) ->
- {_, Nodes, _} = lists:unzip3(StateNodes),
- case next_up_node(Nodes) of
- no_gossip_targets_available ->
- State; % skip gossip, I'm the only node
- TargetNode ->
- showroom_log:message(info, "membership: firing gossip from ~p to ~p",
- [node(), TargetNode]),
- case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of
- ok -> State;
- {new_state, NewState} -> NewState;
- Error -> throw({unknown_gossip_response, Error})
- end
- end;
-
-gossip(_,_,_) ->
- % testing, so don't gossip
- ok.
-
-
-next_up_node(Nodes) ->
- next_up_node(node(), Nodes, up_nodes()).
-
-
-next_up_node(Node, Nodes, UpNodes) ->
- {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
- List = lists:append(B, A), % be sure to eliminate Node
- DownNodes = Nodes -- UpNodes,
- case List -- DownNodes of
- [Target|_] -> Target;
- [] -> no_gossip_targets_available
- end.
-
-
-up_nodes() ->
- % TODO: implement cache (fb 9704 & 9449)
- erlang:nodes().
-
-
-%% @doc find the latest state file on disk
-find_latest_state_filename() ->
- Dir = couch_config:get("couchdb", "database_dir"),
- case file:list_dir(Dir) of
- {ok, Filenames} ->
- Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-
- [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
- SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
- case SortedTimestamps of
- [Latest | _] ->
- {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++
- integer_to_list(Latest)};
- _ ->
- throw({error, mem_state_file_not_found})
- end;
- {error, Reason} ->
- throw({error, Reason})
- end.
-
-
-%% (Test, Config)
-read_latest_state_file(undefined) ->
- try
- {ok, File} = find_latest_state_filename(),
- case file:consult(File) of
- {ok, [#mem{}=State]} -> State;
- _Else ->
- throw({error, bad_mem_state_file})
- end
- catch _:Error ->
- showroom_log:message(info, "membership: ~p", [Error]),
- nil
- end;
-read_latest_state_file(_) ->
- nil.
-
-
-%% @doc save the state file to disk, with current timestamp.
-%% thx to riak_ring_manager:do_write_ringfile/1
--spec save_state_file(test(), mem_state()) -> ok.
-save_state_file(undefined, State) ->
- Dir = couch_config:get("couchdb", "database_dir"),
- {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
- TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
- [Year, Month, Day, Hour, Minute, Second]),
- FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
- ok = filelib:ensure_dir(FN),
- {ok, File} = file:open(FN, [binary, write]),
- io:format(File, "~w.~n", [State]),
- file:close(File);
-
-save_state_file(_,_) -> ok. % don't save if testing
-
-
-check_pos(Pos, Node, Nodes) ->
- Found = lists:keyfind(Pos, 1, Nodes),
- case Found of
- false -> ok;
- _ ->
- {_,OldNode,_} = Found,
- if
- OldNode =:= Node ->
- Msg = "node_exists_at_position_" ++ integer_to_list(Pos),
- throw({error, list_to_binary(Msg)});
- true ->
- Msg = "position_exists_" ++ integer_to_list(Pos),
- throw({error, list_to_binary(Msg)})
- end
- end.
-
-
-int_reset(Test) ->
- int_reset(Test, #mem{}).
-
-
-int_reset(_Test, State) ->
- State#mem{nodes=[], clock=[]}.
-
-
-ping_all_yall(Nodes) ->
- lists:foreach(fun(Node) ->
- net_adm:ping(Node)
- end, Nodes),
- timer:sleep(500). % sigh.
-
-
-get_remote_states(NodeList) ->
- NodeList1 = lists:delete(node(), NodeList),
- {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
- {_Status, States2} = lists:unzip(States1),
- NodeList2 = NodeList1 -- BadNodes,
- {lists:zip(NodeList2,States2), BadNodes}.
-
-
-%% @doc compare state with states based on vector clock
-%% return match | {bad_state_match, Node, NodesThatDontMatch}
-compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
- Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
- {vector_clock:equals(Clock, Clock1), Node}
- end, States),
- BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
- ({false, N}, AccIn) -> [N | AccIn]
- end, [], Results),
- if
- length(BadResults) == 0 -> match;
- true -> {bad_state_match, node(), BadResults}
- end.
+ application:stop(mem3).
-notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) ->
- {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)),
- lists:foreach(fun(Node) ->
- case lists:member(Node, MemNodes) orelse Type == nodedown of
- true ->
- gen_event:notify(membership_events, {Type, Node});
- _ -> ok % node not in cluster
- end
- end, Nodes).
+restart() ->
+ stop(),
+ start().
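The rewritten mem3.erl above is now just a thin wrapper around the OTP application; the old 568-line gen_server moves to mem3_server further down. A minimal shell sketch of the resulting API, assuming the mem3 application and its dependencies are already on the code path (return values shown are the success case only):

    %% illustrative shell session, not part of the commit
    1> mem3:start().     %% application:start(mem3)
    ok
    2> mem3:restart().   %% stop/0 followed by start/0
    ok
    3> mem3:stop().
    ok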
diff --git a/src/membership_app.erl b/src/mem3_app.erl
index df0f4fee..70bf1cf9 100644
--- a/src/membership_app.erl
+++ b/src/mem3_app.erl
@@ -1,11 +1,11 @@
--module(membership_app).
+-module(mem3_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_Type, []) ->
- DbName = couch_config:get("membership", "db", "dbs"),
+ DbName = couch_config:get("mem3", "db", "dbs"),
couch_server:create(list_to_binary(DbName), []),
- membership_sup:start_link().
+ mem3_sup:start_link().
stop([]) ->
ok.
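mem3_app:start/2 now reads the shard-db name from a "mem3" config section instead of "membership". Assuming couch_config is backed by the usual CouchDB .ini files, the corresponding entry would look roughly like the sketch below; the section and key names come from the call above, and the value shown is just the built-in default.

    %% hedged sketch: the couch_config lookup above maps to an ini entry like
    %%
    %%   [mem3]
    %%   db = dbs
    %%
    %% i.e. couch_config:get("mem3", "db", "dbs") falls back to "dbs" when unset.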
diff --git a/src/dbs_cache.erl b/src/mem3_cache.erl
index 1afb873b..8f5c372a 100644
--- a/src/dbs_cache.erl
+++ b/src/mem3_cache.erl
@@ -1,11 +1,11 @@
--module(dbs_cache).
+-module(mem3_cache).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start_link/0]).
--include("membership.hrl").
+-include("mem3.hrl").
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
diff --git a/src/mem3_event.erl b/src/mem3_event.erl
new file mode 100644
index 00000000..59156adc
--- /dev/null
+++ b/src/mem3_event.erl
@@ -0,0 +1,74 @@
+-module(mem3_event).
+
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-include("mem3.hrl").
+
+init([]) ->
+ {ok, []}.
+
+handle_event({node_join, Node}, State) ->
+ start_repl({node_join, Node}, State);
+
+handle_event({nodeup, Node}, State) ->
+ start_repl({nodeup, Node}, State);
+
+handle_event({node_leave, Node}, State) ->
+ stop_repl({node_leave, Node}, State);
+
+handle_event({nodedown, Node}, State) ->
+ stop_repl({nodedown, Node}, State);
+
+handle_event(Event, State) ->
+ ?LOG_ERROR("unexpected event in dbs handler ~p", [Event]),
+ {ok, State}.
+
+handle_call(Request, State) ->
+ ?LOG_ERROR("unexpected call in dbs handler ~p", [Request]),
+ {ok, ok, State}.
+
+handle_info(Info, State) ->
+ ?LOG_ERROR("unexpected msg in dbs handler ~p", [Info]),
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%
+%% internal
+%%
+
+start_repl({Reason, Node}, State) ->
+ ChildSpec = dbs:childspec(Node),
+ case supervisor:start_child(dbs, ChildSpec) of
+ {ok, _} ->
+ ok;
+ {error, {already_started, _Child}} ->
+ ok;
+ {error, running} ->
+ ok;
+ {error, already_present} ->
+ case supervisor:restart_child(dbs, ChildSpec) of
+ {ok, _} ->
+ ok;
+ {error, running} ->
+ ok;
+ {error, Reason} ->
+ ?LOG_ERROR("dbs repl restart failed ~p", [Reason])
+ end;
+ {error, Reason} ->
+ ?LOG_ERROR("dbs repl start failed ~p", [Reason])
+ end,
+ {ok, State}.
+
+stop_repl({Reason, Node}, State) ->
+ ?LOG_INFO("dbs repl ~p --> ~p terminating (~p)", [node(), Node, Reason]),
+ supervisor:terminate_child(dbs, Node),
+ supervisor:delete_child(dbs, Node),
+ {ok, State}.
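mem3_event translates membership events into supervisor:start_child/restart_child/terminate_child calls against the dbs replication supervisor. Note that mem3_server below still posts its notifications to an event manager registered as membership_events, while mem3_sup registers its manager as mem3_event, which matches the "app is horribly broken for now" caveat in the commit message. A hedged sketch of wiring the handler up by hand, assuming the manager started by mem3_sup and a placeholder node name:

    %% hedged sketch only: attach this handler to the manager that mem3_sup
    %% registers as {local, mem3_event}; init([]) expects an empty argument list
    ok = gen_event:add_handler(mem3_event, mem3_event, []),
    %% a join notification then reaches handle_event({node_join, Node}, State)
    ok = gen_event:notify(mem3_event, {node_join, 'node2@host2'}).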
diff --git a/src/mem3_httpd.erl b/src/mem3_httpd.erl
new file mode 100644
index 00000000..2b29b488
--- /dev/null
+++ b/src/mem3_httpd.erl
@@ -0,0 +1,77 @@
+-module(mem3_httpd).
+
+-export([handle_membership_req/1]).
+
+%% includes
+-include("mem3.hrl").
+
+
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>]} = Req) ->
+ {ok,ClusterNodes} = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)}
+ ]});
+
+handle_membership_req(#httpd{method='POST',
+ path_parts=[<<"_membership">>]} = Req) ->
+ {JsonProps} = couch_httpd:json_body_obj(Req),
+ Method = couch_util:get_value(<<"method">>, JsonProps),
+ Params = couch_util:get_value(<<"params">>, JsonProps),
+ Id = couch_util:get_value(<<"id">>, JsonProps),
+ {Result, Error} = membership_dispatch(Method, Params),
+ couch_httpd:send_json(Req, {[
+ {result, Result},
+ {error, Error},
+ {id, Id}
+ ]}).
+
+%%
+%% internal
+%%
+membership_dispatch(<<"replace">>, Params) ->
+ OldNode = get_oldnode(Params),
+ NewNodeOpts = get_value_json(<<"newnode_options">>, Params, []),
+ PingNode = get_pingnode(Params),
+ send_join(replace, {OldNode, NewNodeOpts}, PingNode);
+membership_dispatch(TypeBin, Params) ->
+ Type = list_to_atom(?b2l(TypeBin)),
+ NodeList = get_value_json(<<"nodes">>, Params, []),
+ Nodes = lists:map(fun({List}) -> node_info(List) end, NodeList),
+ PingNode = get_pingnode(Params),
+ send_join(Type, Nodes, PingNode).
+
+get_pingnode(Params) ->
+ PingNodeBin = get_value_json(<<"pingnode">>, Params, <<"nil">>),
+ list_to_atom(?b2l(PingNodeBin)).
+
+get_oldnode(Params) ->
+ NodeBin = get_value_json(<<"oldnode">>, Params, undefined),
+ NodeList = ?b2l(NodeBin),
+ list_to_atom(NodeList).
+
+%% @doc send join command to mem module
+send_join(Type, Payload, PingNode) ->
+ case mem3:join(Type, Payload, PingNode) of
+ ok -> {ok, null};
+ {error, Error} -> {Type, Error};
+ Other ->
+ ?LOG_ERROR("membership dispatch error ~p", [Other]),
+ {Type, unknown_error}
+ end.
+
+node_info(List) ->
+ Order = couch_util:get_value(<<"order">>, List),
+ Node1 = couch_util:get_value(<<"node">>, List),
+ Node2 = list_to_atom(?b2l(Node1)),
+ Options = couch_util:get_value(<<"options">>, List),
+ {Order, Node2, Options}.
+
+get_value_json(_,[], Default) -> Default;
+get_value_json(Key, [JsonProp|Rest], Default) ->
+ case JsonProp of
+ {[{Key, Value}]} -> Value;
+ _ -> get_value_json(Key, Rest, Default)
+ end.
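For reference, membership_dispatch/2 and get_value_json/3 above imply a small JSON-RPC-style body for POST /_membership, with "params" given as a list of single-pair objects. The shapes below are a hedged reconstruction from the field names in this file; node names are placeholders:

    %% hedged examples of request bodies the POST handler above would accept
    %%
    %% join (the non-"replace" clause; "method" becomes the join type atom):
    %%   {"method": "join",
    %%    "params": [{"nodes": [{"order": 1, "node": "node1@host1", "options": []},
    %%                          {"order": 2, "node": "node2@host2", "options": []}]},
    %%               {"pingnode": "node1@host1"}],
    %%    "id": 1}
    %%
    %% replace:
    %%   {"method": "replace",
    %%    "params": [{"oldnode": "node2@host2"},
    %%               {"newnode_options": []},
    %%               {"pingnode": "node1@host1"}],
    %%    "id": 2}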
diff --git a/src/mem3_server.erl b/src/mem3_server.erl
new file mode 100644
index 00000000..863e752f
--- /dev/null
+++ b/src/mem3_server.erl
@@ -0,0 +1,568 @@
+%%% membership module
+%%%
+%%% State of the gen_server is a #mem record
+%%%
+%%% Nodes and Gossip are the same thing, and are a list of three-tuples like:
+%%%
+%%% [ {Pos,NodeName,Options} | _ ]
+%%%
+%%% Position is 1-based incrementing in order of node joining
+%%%
+%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node
+%%% is responsible for the extra partitions too.
+%%%
+%%% TODO: dialyzer type specs
+%%%
+-module(mem3_server).
+-author('brad@cloudant.com').
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
+-export([join/3, clock/0, state/0, states/0, nodes/0, fullnodes/0,
+ start_gossip/0]).
+
+%% for testing more than anything else
+-export([merge_nodes/2, next_up_node/1, next_up_node/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% includes
+-include("mem3.hrl").
+
+-define(SERVER, membership).
+-define(STATE_FILE_PREFIX, "membership").
+
+
+%%====================================================================
+%% API
+%%====================================================================
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ start_link([]).
+
+
+-spec start_link(args()) -> {ok, pid()}.
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
+
+
+-spec stop() -> ok.
+stop() ->
+ stop(?MODULE).
+
+
+-spec stop(atom()) -> ok.
+stop(Server) ->
+ gen_server:cast(Server, stop).
+
+
+-spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) ->
+ ok.
+join(JoinType, Payload, PingNode) ->
+ gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}).
+
+
+-spec clock() -> vector_clock().
+clock() ->
+ gen_server:call(?SERVER, clock).
+
+
+-spec state() -> mem_state().
+state() ->
+ gen_server:call(?SERVER, state).
+
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
+-spec states() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}].
+states() ->
+ {ok, Nodes} = mem3:nodes(),
+ AllNodes = [node()|erlang:nodes()],
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, ?SERVER, state),
+ Dict = lists:foldl(fun({Node, {ok,State}}, D) ->
+ orddict:append(State, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec start_gossip() -> ok.
+start_gossip() ->
+ gen_server:call(?SERVER, start_gossip).
+
+
+-spec reset() -> ok | not_reset.
+reset() ->
+ gen_server:call(?SERVER, reset).
+
+
+%% @doc get the list of cluster nodes (according to membership module)
+%% This may differ from erlang:nodes()
+%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
+-spec nodes() -> {ok, [node()]}.
+nodes() ->
+ gen_server:call(?SERVER, nodes).
+
+
+%% @doc get the list of cluster nodes (according to membership module)
+%% This may differ from erlang:nodes()
+%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
+-spec fullnodes() -> {ok, [mem_node()]}.
+fullnodes() ->
+ gen_server:call(?SERVER, fullnodes).
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%% start up membership server
+-spec init(args()) -> {ok, mem_state()}.
+init(Args) ->
+ process_flag(trap_exit,true),
+ Test = get_test(Args),
+ OldState = read_latest_state_file(Test),
+ showroom_log:message(info, "membership: membership server starting...", []),
+ net_kernel:monitor_nodes(true),
+ State = handle_init(Test, OldState),
+ {ok, State#mem{args=Args}}.
+
+
+%% new node(s) joining to this node
+handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
+ try
+ case handle_join(JoinType, ExtNodes, PingNode, State) of
+ {ok, NewState} -> {reply, ok, NewState};
+ Other -> {reply, Other, State}
+ end
+ catch _:Error ->
+ showroom_log:message(error, "~p", [Error]),
+ {reply, Error, State}
+ end;
+
+%% clock
+handle_call(clock, _From, #mem{clock=Clock} = State) ->
+ {reply, {ok, Clock}, State};
+
+%% state
+handle_call(state, _From, State) ->
+ {reply, {ok, State}, State};
+
+%% reset - but only if we're in test mode
+handle_call(reset, _From, #mem{args=Args} = State) ->
+ Test = get_test(Args),
+ case Test of
+ undefined -> {reply, not_reset, State};
+ _ -> {reply, ok, int_reset(Test, State)}
+ end;
+
+%% nodes
+handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
+ {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)),
+ {reply, {ok, NodeList}, State};
+
+%% fullnodes
+handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) ->
+ {reply, {ok, Nodes}, State};
+
+%% gossip
+handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
+ showroom_log:message(info, "membership: received gossip from ~p",
+ [erlang:node(Pid)]),
+ handle_gossip(From, RemoteState, LocalState);
+
+% start_gossip
+handle_call(start_gossip, _From, State) ->
+ NewState = gossip(State),
+ {reply, ok, NewState};
+
+%% ignored call
+handle_call(Msg, _From, State) ->
+ showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
+ {reply, ignored, State}.
+
+
+%% gossip
+handle_cast({gossip, RemoteState}, LocalState) ->
+ State = case handle_gossip(none, RemoteState, LocalState) of
+ {reply, ok, NewState} -> NewState;
+ {reply, {new_state, NewState}, _} -> NewState;
+ {noreply, NewState} -> NewState
+ end,
+ {noreply, State};
+
+%% stop
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
+%% ignored cast
+handle_cast(Msg, State) ->
+ showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
+ {noreply, State}.
+
+
+%% @doc handle nodedown messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodedown, Node}, State) ->
+ showroom_log:message(alert, "membership: nodedown ~p", [Node]),
+ notify(nodedown, [Node], State),
+ {noreply, State};
+
+%% @doc handle nodeup messages because we have
+%% net_kernel:monitor_nodes(true)
+handle_info({nodeup, Node}, State) ->
+ showroom_log:message(alert, "membership: nodeup ~p", [Node]),
+ notify(nodeup, [Node], State),
+ gossip_cast(State),
+ {noreply, State};
+
+%% ignored info
+handle_info(Info, State) ->
+ showroom_log:message(info, "membership: ignored info: ~p", [Info]),
+ {noreply, State}.
+
+
+% terminate
+terminate(_Reason, _State) ->
+ ok.
+
+
+% ignored code change
+code_change(OldVsn, State, _Extra) ->
+ io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
+ {ok, State}.
+
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+%% @doc if Args has config use it, otherwise call configuration module
+%% most times Args will have config during testing runs
+%get_config(Args) ->
+% case proplists:get_value(config, Args) of
+% undefined -> configuration:get_config();
+% Any -> Any
+% end.
+
+
+get_test(Args) ->
+ proplists:get_value(test, Args).
+
+
+%% @doc handle_init starts a node
+%% Most of the time, this puts the node in a single-node cluster setup,
+%% But, we could be automatically rejoining a cluster after some downtime.
+%% See handle_join for initing, joining, leaving a cluster, or replacing a
+%% node.
+%% @end
+handle_init(Test, nil) ->
+ int_reset(Test);
+
+handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
+ % there's an old state, let's try to rejoin automatically
+ % but only if we can compare our old state to other available
+ % nodes and get a match... otherwise get a human involved
+ {_, NodeList, _} = lists:unzip3(Nodes),
+ ping_all_yall(NodeList),
+ {RemoteStates, _BadNodes} = get_remote_states(NodeList),
+ Test = get_test(Args),
+ case compare_state_with_rest(OldState, RemoteStates) of
+ match ->
+ showroom_log:message(info, "membership: rejoined successfully", []),
+ OldState;
+ Other ->
+ showroom_log:message(error, "membership: rejoin failed: ~p", [Other]),
+ int_reset(Test)
+ end.
+
+
+%% @doc handle join activities, return {ok,NewState}
+-spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) ->
+ {ok, mem_state()}.
+% init
+handle_join(init, ExtNodes, nil, State) ->
+ {_,Nodes,_} = lists:unzip3(ExtNodes),
+ ping_all_yall(Nodes),
+ int_join(ExtNodes, State);
+% join
+handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) ->
+ NewState = case get_test(Args) of
+ undefined -> get_pingnode_state(PingNode);
+ _ -> State % testing, so meh
+ end,
+ % now use this info to join the ring
+ int_join(ExtNodes, NewState);
+% replace
+handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) ->
+ handle_join(replace, {OldNode, []}, PingNode, State);
+handle_join(replace, [OldNode | _], PingNode, State) ->
+ handle_join(replace, {OldNode, []}, PingNode, State);
+handle_join(replace, {OldNode, NewOpts}, PingNode, State) ->
+ OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode),
+ {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes),
+ NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}),
+ notify(node_leave, [OldNode], State),
+ int_join([], OldState#mem{nodes=NewNodes});
+% leave
+handle_join(leave, [OldNode | _], _PingNode, State) ->
+ % TODO implement me
+ notify(node_leave, [OldNode], State),
+ ok;
+
+handle_join(JoinType, _, PingNode, _) ->
+ showroom_log:message(info, "membership: unknown join type: ~p "
+ "for ping node: ~p", [JoinType, PingNode]),
+ {error, unknown_join_type}.
+
+%% @doc common operations for all join types
+int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
+ NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
+ check_pos(Pos, N, Nodes),
+ notify(node_join, [N], State),
+ [New|AccIn]
+ end, Nodes, ExtNodes),
+ NewNodes1 = lists:sort(NewNodes),
+ NewClock = vector_clock:increment(node(), Clock),
+ NewState = State#mem{nodes=NewNodes1, clock=NewClock},
+ install_new_state(NewState),
+ {ok, NewState}.
+
+
+install_new_state(#mem{args=Args} = State) ->
+ Test = get_test(Args),
+ save_state_file(Test, State),
+ gossip(call, Test, State).
+
+
+get_pingnode_state(PingNode) ->
+ {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state),
+ RemoteState.
+
+
+%% @doc handle the gossip messages
+%% We're not using vector_clock:resolve b/c we need custom merge strategy
+handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
+ LocalState=#mem{clock=LocalClock}) ->
+ case vector_clock:compare(RemoteClock, LocalClock) of
+ equal ->
+ {reply, ok, LocalState};
+ less ->
+ % remote node needs updating
+ {reply, {new_state, LocalState}, LocalState};
+ greater when From == none->
+ {noreply, install_new_state(RemoteState)};
+ greater ->
+ % local node needs updating
+ gen_server:reply(From, ok), % reply to sender first
+ {noreply, install_new_state(RemoteState)};
+ concurrent ->
+ % ick, so let's resolve and merge states
+ showroom_log:message(info,
+ "membership: Concurrent Clocks~n"
+ "RemoteState : ~p~nLocalState : ~p~n"
+ , [RemoteState, LocalState]),
+ MergedState = merge_states(RemoteState, LocalState),
+ if From =/= none ->
+ % reply to sender
+ gen_server:reply(From, {new_state, MergedState})
+ end,
+ {noreply, install_new_state(MergedState)}
+ end.
+
+
+merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState,
+ #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) ->
+ MergedClock = vector_clock:merge(RemoteClock, LocalClock),
+ MergedNodes = merge_nodes(RemoteNodes, LocalNodes),
+ LocalState#mem{clock=MergedClock, nodes=MergedNodes}.
+
+
+%% this will give one of the lists back, deterministically
+merge_nodes(Remote, Local) ->
+ % get rid of the initial 0 node if it's still there, and sort
+ Remote1 = lists:usort(lists:keydelete(0,1,Remote)),
+ Local1 = lists:usort(lists:keydelete(0,1,Local)),
+ % handle empty lists as well as other cases
+ case {Remote1, Local1} of
+ {[], L} -> L;
+ {R, []} -> R;
+ _ -> erlang:min(Remote1, Local1)
+ end.
+
+
+gossip(#mem{args=Args} = NewState) ->
+ Test = get_test(Args),
+ gossip(call, Test, NewState).
+
+
+gossip_cast(#mem{nodes=[]}) -> ok;
+gossip_cast(#mem{args=Args} = NewState) ->
+ Test = get_test(Args),
+ gossip(cast, Test, NewState).
+
+
+-spec gossip(gossip_fun(), test(), mem_state()) -> mem_state().
+gossip(_, _, #mem{nodes=[]}) -> ok;
+gossip(Fun, undefined, #mem{nodes=StateNodes} = State) ->
+ {_, Nodes, _} = lists:unzip3(StateNodes),
+ case next_up_node(Nodes) of
+ no_gossip_targets_available ->
+ State; % skip gossip, I'm the only node
+ TargetNode ->
+ showroom_log:message(info, "membership: firing gossip from ~p to ~p",
+ [node(), TargetNode]),
+ case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of
+ ok -> State;
+ {new_state, NewState} -> NewState;
+ Error -> throw({unknown_gossip_response, Error})
+ end
+ end;
+
+gossip(_,_,_) ->
+ % testing, so don't gossip
+ ok.
+
+
+next_up_node(Nodes) ->
+ next_up_node(node(), Nodes, up_nodes()).
+
+
+next_up_node(Node, Nodes, UpNodes) ->
+ {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
+ List = lists:append(B, A), % be sure to eliminate Node
+ DownNodes = Nodes -- UpNodes,
+ case List -- DownNodes of
+ [Target|_] -> Target;
+ [] -> no_gossip_targets_available
+ end.
+
+
+up_nodes() ->
+ % TODO: implement cache (fb 9704 & 9449)
+ erlang:nodes().
+
+
+%% @doc find the latest state file on disk
+find_latest_state_filename() ->
+ Dir = couch_config:get("couchdb", "database_dir"),
+ case file:list_dir(Dir) of
+ {ok, Filenames} ->
+ Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-
+ [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
+ SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
+ case SortedTimestamps of
+ [Latest | _] ->
+ {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++
+ integer_to_list(Latest)};
+ _ ->
+ throw({error, mem_state_file_not_found})
+ end;
+ {error, Reason} ->
+ throw({error, Reason})
+ end.
+
+
+%% (Test, Config)
+read_latest_state_file(undefined) ->
+ try
+ {ok, File} = find_latest_state_filename(),
+ case file:consult(File) of
+ {ok, [#mem{}=State]} -> State;
+ _Else ->
+ throw({error, bad_mem_state_file})
+ end
+ catch _:Error ->
+ showroom_log:message(info, "membership: ~p", [Error]),
+ nil
+ end;
+read_latest_state_file(_) ->
+ nil.
+
+
+%% @doc save the state file to disk, with current timestamp.
+%% thx to riak_ring_manager:do_write_ringfile/1
+-spec save_state_file(test(), mem_state()) -> ok.
+save_state_file(undefined, State) ->
+ Dir = couch_config:get("couchdb", "database_dir"),
+ {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
+ TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
+ [Year, Month, Day, Hour, Minute, Second]),
+ FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
+ ok = filelib:ensure_dir(FN),
+ {ok, File} = file:open(FN, [binary, write]),
+ io:format(File, "~w.~n", [State]),
+ file:close(File);
+
+save_state_file(_,_) -> ok. % don't save if testing
+
+
+check_pos(Pos, Node, Nodes) ->
+ Found = lists:keyfind(Pos, 1, Nodes),
+ case Found of
+ false -> ok;
+ _ ->
+ {_,OldNode,_} = Found,
+ if
+ OldNode =:= Node ->
+ Msg = "node_exists_at_position_" ++ integer_to_list(Pos),
+ throw({error, list_to_binary(Msg)});
+ true ->
+ Msg = "position_exists_" ++ integer_to_list(Pos),
+ throw({error, list_to_binary(Msg)})
+ end
+ end.
+
+
+int_reset(Test) ->
+ int_reset(Test, #mem{}).
+
+
+int_reset(_Test, State) ->
+ State#mem{nodes=[], clock=[]}.
+
+
+ping_all_yall(Nodes) ->
+ lists:foreach(fun(Node) ->
+ net_adm:ping(Node)
+ end, Nodes),
+ timer:sleep(500). % sigh.
+
+
+get_remote_states(NodeList) ->
+ NodeList1 = lists:delete(node(), NodeList),
+ {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
+ {_Status, States2} = lists:unzip(States1),
+ NodeList2 = NodeList1 -- BadNodes,
+ {lists:zip(NodeList2,States2), BadNodes}.
+
+
+%% @doc compare state with states based on vector clock
+%% return match | {bad_state_match, Node, NodesThatDontMatch}
+compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
+ Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
+ {vector_clock:equals(Clock, Clock1), Node}
+ end, States),
+ BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
+ ({false, N}, AccIn) -> [N | AccIn]
+ end, [], Results),
+ if
+ length(BadResults) == 0 -> match;
+ true -> {bad_state_match, node(), BadResults}
+ end.
+
+notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) ->
+ {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)),
+ lists:foreach(fun(Node) ->
+ case lists:member(Node, MemNodes) orelse Type == nodedown of
+ true ->
+ gen_event:notify(membership_events, {Type, Node});
+ _ -> ok % node not in cluster
+ end
+ end, Nodes).
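mem3_server carries the old membership gen_server over essentially unchanged apart from the module name and include; the header comment still documents the node list as 1-based {Pos, NodeName, Options} three-tuples. A hedged sketch of a cluster bootstrap against that API, with placeholder node names and hint partitions omitted:

    %% hedged sketch only -- placeholder node names, server assumed running
    Nodes = [{1, 'node1@host1', []},
             {2, 'node2@host2', []},   % Options may also carry [{hints, [Part|_]}]
             {3, 'node3@host3', []}],
    %% 'init' join type bootstraps a new cluster; PingNode is nil in that case
    ok = mem3_server:join(init, Nodes, nil),
    %% node names come back ordered by their position in the state
    {ok, ['node1@host1', 'node2@host2', 'node3@host3']} = mem3_server:nodes().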
diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl
new file mode 100644
index 00000000..122e68d7
--- /dev/null
+++ b/src/mem3_sup.erl
@@ -0,0 +1,22 @@
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link(?MODULE, []).
+
+init(_Args) ->
+ Children = [
+ child(mem3_server),
+ child(mem3_event),
+ child(mem3_sync),
+ child(mem3_cache)
+ ],
+ {ok, {{one_for_one,10,1}, Children}}.
+
+child(mem3_event) ->
+ MFA = {gen_event, start_link, [{local,mem3_event}]},
+ {mem3_event, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+ {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
\ No newline at end of file

diff --git a/src/dbs.erl b/src/mem3_sync.erl
index 345788ef..d50514d9 100644
--- a/src/dbs.erl
+++ b/src/mem3_sync.erl
@@ -1,9 +1,9 @@
--module(dbs).
+-module(mem3_sync).
-behaviour(supervisor).
-export([start_link/0, init/1, childspec/1, sup_upgrade_notify/2]).
--include("membership.hrl").
+-include("mem3.hrl").
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
diff --git a/src/partitions.erl b/src/mem3_util.erl
index 3e024264..f6c94748 100644
--- a/src/partitions.erl
+++ b/src/mem3_util.erl
@@ -1,4 +1,4 @@
--module(partitions).
+-module(mem3_util).
-author('brad@cloudant.com').
%% API
@@ -8,9 +8,7 @@
-define(RINGTOP, trunc(math:pow(2,160))). % SHA-1 space
--include("../../couch/src/couch_db.hrl").
--include("../../dynomite/include/membership.hrl").
--include_lib("eunit/include/eunit.hrl").
+-include("mem3.hrl").
%%====================================================================
%% API
diff --git a/src/vector_clock.erl b/src/mem3_vclock.erl
index 0a422334..a48da43c 100644
--- a/src/vector_clock.erl
+++ b/src/mem3_vclock.erl
@@ -1,7 +1,7 @@
%%% @author Cliff Moon <cliff@powerset.com> []
%%% @copyright 2008 Cliff Moon
--module (vector_clock).
+-module (mem3_vclock).
-export ([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2,
equals/2]).
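The renamed vclock module is what drives the gossip protocol in mem3_server: int_join increments the local entry, handle_gossip branches on compare/2 (equal | less | greater | concurrent), and concurrent states are resolved with merge/2. A hedged sketch of that flow follows; create/1 taking the local node name is an assumption, since the function bodies are outside this hunk, and the result atoms are placeholders rather than real API:

    %% hedged sketch of the calls mem3_server makes (old module name kept,
    %% since that is what mem3_server still references in this commit)
    LocalClock  = vector_clock:create(node()),              %% assumed argument
    RemoteClock = vector_clock:increment(node(), LocalClock),
    case vector_clock:compare(RemoteClock, LocalClock) of
        equal      -> nothing_to_do;
        less       -> send_local_state_back;
        greater    -> install_remote_state;
        concurrent -> vector_clock:merge(RemoteClock, LocalClock)
    end.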
diff --git a/src/membership.erl b/src/membership.erl
deleted file mode 100644
index 1e06e798..00000000
--- a/src/membership.erl
+++ /dev/null
@@ -1,15 +0,0 @@
--module(membership).
--author('Brad Anderson <brad@cloudant.com>').
-
--export([start/0, stop/0, restart/0]).
-
-
-start() ->
- application:start(membership).
-
-stop() ->
- application:stop(membership).
-
-restart() ->
- stop(),
- start().
diff --git a/src/membership_sup.erl b/src/membership_sup.erl
deleted file mode 100644
index f203924d..00000000
--- a/src/membership_sup.erl
+++ /dev/null
@@ -1,44 +0,0 @@
--module(membership_sup).
--author('brad@cloudant.com').
-
--behaviour(supervisor).
-
-%% API
--export([start_link/0]).
-
-%% Supervisor callbacks
--export([init/1]).
-
--define(SERVER, ?MODULE).
-
-start_link() ->
- supervisor:start_link(?MODULE, []).
-
-init(_Args) ->
- Membership = {membership,
- {mem3, start_link, []},
- permanent,
- 1000,
- worker,
- [mem3]},
- MemEventMgr = {mem_event_manager,
- {gen_event, start_link, [{local, membership_events}]},
- permanent,
- 1000,
- worker,
- []},
- DbsRepl =
- {dbs,
- {dbs, start_link, []},
- permanent,
- infinity,
- supervisor,
- [dbs]},
- DbsCache =
- {dbs_cache,
- {dbs_cache, start_link, []},
- permanent,
- 1000,
- worker,
- [dbs_cache]},
- {ok, {{one_for_one,10,1}, [Membership, MemEventMgr, DbsRepl, DbsCache]}}.