summaryrefslogtreecommitdiff
path: root/src/mem3.erl
diff options
context:
space:
mode:
authorBrad Anderson <brad@cloudant.com>2010-04-26 15:54:17 -0400
committerBrad Anderson <brad@cloudant.com>2010-05-09 22:56:24 -0400
commiteb593c0557710c29f8c476f43d72fb54172b8e4e (patch)
treeaf7f185ce7be819282aacc8fef82044ebaac2d3a /src/mem3.erl
parent0e88f2bd418737f9df5383b82811b07135e179c8 (diff)
reworking gossip, BugzID 10069
Diffstat (limited to 'src/mem3.erl')
-rw-r--r--src/mem3.erl163
1 files changed, 64 insertions, 99 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
index cec2631c..c5c558da 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -165,11 +165,9 @@ init(Args) ->
%% new node(s) joining to this node
-handle_call({join, JoinType, ExtNodes}, _From,
- #mem{args=Args} = State) ->
- Config = get_config(Args),
+handle_call({join, JoinType, ExtNodes}, _From, State) ->
try
- NewState = handle_join(JoinType, ExtNodes, State, Config),
+ NewState = handle_join(JoinType, ExtNodes, State),
{reply, ok, NewState}
catch _:Error ->
showroom_log:message(error, "~p", [Error]),
@@ -189,10 +187,7 @@ handle_call(reset, _From, #mem{args=Args} = State) ->
Test = proplists:get_value(test, Args),
case Test of
undefined -> {reply, not_reset, State};
- _ ->
- mochiglobal:delete(pmap),
- mochiglobal:delete(fullmap),
- {reply, ok, int_reset(Test, State)}
+ _ -> {reply, ok, int_reset(Test, State)}
end;
%% nodes
@@ -200,6 +195,12 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
{_,NodeList,_} = lists:unzip3(Nodes),
{reply, {ok, NodeList}, State};
+%% gossip
+handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) ->
+ showroom_log:message(info, "membership: received gossip from ~p",
+ [RemoteNode]),
+ handle_gossip(From, RemoteState, LocalState);
+
%% ignored call
handle_call(Msg, _From, State) ->
showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
@@ -210,12 +211,6 @@ handle_call(Msg, _From, State) ->
handle_cast(stop, State) ->
{stop, normal, State};
-%% gossip
-handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
- showroom_log:message(info, "membership: received gossip from ~p",
- [RemoteNode]),
- {noreply, handle_gossip(RemoteState, LocalState)};
-
%% ignored cast
handle_cast(Msg, State) ->
showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
@@ -294,52 +289,60 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
%% handle join activities, return NewState
-handle_join(first, ExtNodes, State, Config) ->
+handle_join(first, ExtNodes, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
- int_join(first, ExtNodes, State, Config);
+ int_join(ExtNodes, State);
-handle_join(new, ExtNodes, State, Config) ->
- int_join(new, ExtNodes, State, Config);
+handle_join(new, ExtNodes, State) ->
+ {_,Nodes,_} = lists:unzip3(ExtNodes),
+ ping_all_yall(Nodes),
+ int_join(ExtNodes, State);
-handle_join(replace, [_OldNode | _], _State, _Config) ->
+handle_join(replace, [_OldNode | _], _State) ->
% TODO implement me
ok;
-handle_join(JoinType, _, _, _) ->
+handle_join(JoinType, _, _) ->
showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]),
{error, {unknown_join_type, JoinType}}.
-int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
- Config) ->
+int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) ->
+ NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
+ check_pos(Pos, N, Nodes),
+ [New|AccIn]
+ end, Nodes, ExtNodes),
+ NewNodes1 = lists:sort(NewNodes),
NewClock = vector_clock:increment(Node, Clock),
- NewState = State#mem{nodes=ExtNodes, clock=NewClock},
- {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
- new_state(NewState, Pmap, Fullmap, Config).
+ NewState = State#mem{nodes=NewNodes1, clock=NewClock},
+ install_new_state(NewState),
+ NewState.
%% @doc handle the gossip messages
%% We're not using vector_clock:resolve b/c we need custom merge strategy
-handle_gossip(RemoteState=#mem{clock=RemoteClock, node=RemoteNode},
+handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
LocalState=#mem{clock=LocalClock}) ->
case vector_clock:compare(RemoteClock, LocalClock) of
- equal -> LocalState;
+ equal ->
+ {reply, ok, LocalState};
less ->
% remote node needs updating
- gen_server:cast({?SERVER, RemoteNode}, {gossip, LocalState}),
- LocalState;
+ {reply, {new_state, LocalState}, LocalState};
greater ->
% local node needs updating
- new_state(RemoteState);
+ gen_server:reply(From, ok), % reply to sender first
+ install_new_state(RemoteState);
concurrent ->
% ick, so let's resolve and merge states
showroom_log:message(info,
- "~nmembership: Concurrent Clocks~n"
- "~nRemoteState : ~p~nLocalState : ~p~n"
+ "membership: Concurrent Clocks~n"
+ "RemoteState : ~p~nLocalState : ~p~n"
, [RemoteState, LocalState]),
MergedState = merge_states(RemoteState, LocalState),
- new_state(MergedState)
+ gen_server:reply(From, {new_state, MergedState}), % reply to sender
+ install_new_state(MergedState)
end.
@@ -363,29 +366,39 @@ merge_nodes(Remote, Local) ->
end.
-% notify(Type, Nodes) ->
-% lists:foreach(fun(Node) ->
-% gen_event:notify(membership_events, {Type, Node})
-% end, Nodes).
-
gossip(#mem{args=Args} = NewState) ->
Test = proplists:get_value(test, Args),
gossip(Test, NewState).
-gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) ->
+gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) ->
{_, Nodes, _} = lists:unzip3(StateNodes),
- PartnersPlus = replication:partners_plus(Node, Nodes),
- lists:foreach(fun(TargetNode) ->
- showroom_log:message(info, "membership: firing gossip from ~p to ~p",
+ TargetNode = next_up_node(Node, Nodes),
+ showroom_log:message(info, "membership: firing gossip from ~p to ~p",
[Node, TargetNode]),
- gen_server:cast({?SERVER, TargetNode}, {gossip, NewState})
- end, PartnersPlus);
+ case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of
+ ok -> ok;
+ {new_state, _NewState} -> ?debugHere,ok;
+ Error -> throw({unknown_gossip_response, Error})
+ end;
+
gossip(_,_) ->
% testing, so don't gossip
ok.
+next_up_node(Node, Nodes) ->
+ {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
+ List = lists:append([B, A, [Node]]),
+ UpNodes = lists:delete(fun(N) -> lists:member(N, up_nodes()) end, List),
+ hd(UpNodes). % TODO: empty list?
+
+
+up_nodes() ->
+ % TODO: implement cache (fb 9704 & 9449)
+ erlang:nodes().
+
+
%% @doc find the latest state file on disk
find_latest_state_filename(Config) ->
Dir = Config#config.directory,
@@ -424,6 +437,12 @@ read_latest_state_file(_, _) ->
nil.
+install_new_state(#mem{args=Args} = State) ->
+ Config = get_config(Args),
+ save_state_file(State, Config),
+ gossip(State).
+
+
%% @doc save the state file to disk, with current timestamp.
%% thx to riak_ring_manager:do_write_ringfile/1
save_state_file(State, Config) ->
@@ -438,58 +457,6 @@ save_state_file(State, Config) ->
file:close(File).
-%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}
-%% This is basically replaying all the mem events that have happened.
-create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) ->
- [{_,FirstNode,_}|_] = ExtNodes,
- Fun = fun({Pos, Node, Options}, Pmap) ->
- check_pos(Pos, Node, Nodes),
- Hints = proplists:get_value(hints, Options),
- {ok, NewPmap} = partitions:join(Node, Pmap, Hints),
- NewPmap
- end,
- Acc0 = case JoinType of
- first -> partitions:create_partitions(Q, FirstNode);
- new -> mochiglobal:get(pmap)
- end,
- Pmap = lists:foldl(Fun, Acc0, lists:keysort(1, ExtNodes)),
- {Pmap, make_fullmap(Pmap, Config)}.
-
-
-%% @doc construct a table with all partitions, with the primary node and all
-%% replication partner nodes as well.
-make_fullmap(PMap, Config) ->
- {Nodes, _Parts} = lists:unzip(PMap),
- NodeParts = lists:flatmap(
- fun({Node,Part}) ->
- Partners = replication:partners(Node, lists:usort(Nodes), Config),
- PartnerList = [{Partner, Part} || Partner <- Partners],
- [{Node, Part} | PartnerList]
- end, PMap),
- NodeParts.
-
-
-%% @doc tasks associated with a new state
-new_state(#mem{nodes=Nodes, args=Args} = State) ->
- Config = get_config(Args),
- {Pmap, Fullmap} = create_maps(Config, first, Nodes, []),
- new_state(State, Pmap, Fullmap, Config).
-
-
-%% @doc tasks associated with a new state
-new_state(State, Pmap, Fullmap, Config) ->
- update_cache(Pmap, Fullmap),
- save_state_file(State, Config),
- gossip(State),
- State.
-
-
-%% cache table helper function
-update_cache(Pmap, Fullmap) ->
- mochiglobal:put(pmap, Pmap),
- mochiglobal:put(fullmap, Fullmap).
-
-
check_pos(Pos, Node, Nodes) ->
Found = lists:keyfind(Pos, 1, Nodes),
case Found of
@@ -514,9 +481,7 @@ int_reset(Test, State) ->
undefined -> node();
_ -> Test
end,
- Nodes = [{0, Node, []}],
- Clock = vector_clock:create(Node),
- State#mem{node=Node, nodes=Nodes, clock=Clock}.
+ State#mem{node=Node, nodes=[], clock=[]}.
ping_all_yall(Nodes) ->