diff options
author | Brad Anderson <brad@cloudant.com> | 2010-04-26 15:54:17 -0400 |
---|---|---|
committer | Brad Anderson <brad@cloudant.com> | 2010-05-09 22:56:24 -0400 |
commit | eb593c0557710c29f8c476f43d72fb54172b8e4e (patch) | |
tree | af7f185ce7be819282aacc8fef82044ebaac2d3a /src/mem3.erl | |
parent | 0e88f2bd418737f9df5383b82811b07135e179c8 (diff) |
reworking gossip, BugzID 10069
Diffstat (limited to 'src/mem3.erl')
-rw-r--r-- | src/mem3.erl | 163 |
1 files changed, 64 insertions, 99 deletions
diff --git a/src/mem3.erl b/src/mem3.erl index cec2631c..c5c558da 100644 --- a/src/mem3.erl +++ b/src/mem3.erl @@ -165,11 +165,9 @@ init(Args) -> %% new node(s) joining to this node -handle_call({join, JoinType, ExtNodes}, _From, - #mem{args=Args} = State) -> - Config = get_config(Args), +handle_call({join, JoinType, ExtNodes}, _From, State) -> try - NewState = handle_join(JoinType, ExtNodes, State, Config), + NewState = handle_join(JoinType, ExtNodes, State), {reply, ok, NewState} catch _:Error -> showroom_log:message(error, "~p", [Error]), @@ -189,10 +187,7 @@ handle_call(reset, _From, #mem{args=Args} = State) -> Test = proplists:get_value(test, Args), case Test of undefined -> {reply, not_reset, State}; - _ -> - mochiglobal:delete(pmap), - mochiglobal:delete(fullmap), - {reply, ok, int_reset(Test, State)} + _ -> {reply, ok, int_reset(Test, State)} end; %% nodes @@ -200,6 +195,12 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) -> {_,NodeList,_} = lists:unzip3(Nodes), {reply, {ok, NodeList}, State}; +%% gossip +handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) -> + showroom_log:message(info, "membership: received gossip from ~p", + [RemoteNode]), + handle_gossip(From, RemoteState, LocalState); + %% ignored call handle_call(Msg, _From, State) -> showroom_log:message(info, "membership: ignored call: ~p", [Msg]), @@ -210,12 +211,6 @@ handle_call(Msg, _From, State) -> handle_cast(stop, State) -> {stop, normal, State}; -%% gossip -handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) -> - showroom_log:message(info, "membership: received gossip from ~p", - [RemoteNode]), - {noreply, handle_gossip(RemoteState, LocalState)}; - %% ignored cast handle_cast(Msg, State) -> showroom_log:message(info, "membership: ignored cast: ~p", [Msg]), @@ -294,52 +289,60 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) -> %% handle join activities, return NewState -handle_join(first, ExtNodes, State, Config) -> +handle_join(first, ExtNodes, State) -> {_,Nodes,_} = lists:unzip3(ExtNodes), ping_all_yall(Nodes), - int_join(first, ExtNodes, State, Config); + int_join(ExtNodes, State); -handle_join(new, ExtNodes, State, Config) -> - int_join(new, ExtNodes, State, Config); +handle_join(new, ExtNodes, State) -> + {_,Nodes,_} = lists:unzip3(ExtNodes), + ping_all_yall(Nodes), + int_join(ExtNodes, State); -handle_join(replace, [_OldNode | _], _State, _Config) -> +handle_join(replace, [_OldNode | _], _State) -> % TODO implement me ok; -handle_join(JoinType, _, _, _) -> +handle_join(JoinType, _, _) -> showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]), {error, {unknown_join_type, JoinType}}. -int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State, - Config) -> +int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) -> + NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) -> + check_pos(Pos, N, Nodes), + [New|AccIn] + end, Nodes, ExtNodes), + NewNodes1 = lists:sort(NewNodes), NewClock = vector_clock:increment(Node, Clock), - NewState = State#mem{nodes=ExtNodes, clock=NewClock}, - {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes), - new_state(NewState, Pmap, Fullmap, Config). + NewState = State#mem{nodes=NewNodes1, clock=NewClock}, + install_new_state(NewState), + NewState. %% @doc handle the gossip messages %% We're not using vector_clock:resolve b/c we need custom merge strategy -handle_gossip(RemoteState=#mem{clock=RemoteClock, node=RemoteNode}, +handle_gossip(From, RemoteState=#mem{clock=RemoteClock}, LocalState=#mem{clock=LocalClock}) -> case vector_clock:compare(RemoteClock, LocalClock) of - equal -> LocalState; + equal -> + {reply, ok, LocalState}; less -> % remote node needs updating - gen_server:cast({?SERVER, RemoteNode}, {gossip, LocalState}), - LocalState; + {reply, {new_state, LocalState}, LocalState}; greater -> % local node needs updating - new_state(RemoteState); + gen_server:reply(From, ok), % reply to sender first + install_new_state(RemoteState); concurrent -> % ick, so let's resolve and merge states showroom_log:message(info, - "~nmembership: Concurrent Clocks~n" - "~nRemoteState : ~p~nLocalState : ~p~n" + "membership: Concurrent Clocks~n" + "RemoteState : ~p~nLocalState : ~p~n" , [RemoteState, LocalState]), MergedState = merge_states(RemoteState, LocalState), - new_state(MergedState) + gen_server:reply(From, {new_state, MergedState}), % reply to sender + install_new_state(MergedState) end. @@ -363,29 +366,39 @@ merge_nodes(Remote, Local) -> end. -% notify(Type, Nodes) -> -% lists:foreach(fun(Node) -> -% gen_event:notify(membership_events, {Type, Node}) -% end, Nodes). - gossip(#mem{args=Args} = NewState) -> Test = proplists:get_value(test, Args), gossip(Test, NewState). -gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) -> +gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) -> {_, Nodes, _} = lists:unzip3(StateNodes), - PartnersPlus = replication:partners_plus(Node, Nodes), - lists:foreach(fun(TargetNode) -> - showroom_log:message(info, "membership: firing gossip from ~p to ~p", + TargetNode = next_up_node(Node, Nodes), + showroom_log:message(info, "membership: firing gossip from ~p to ~p", [Node, TargetNode]), - gen_server:cast({?SERVER, TargetNode}, {gossip, NewState}) - end, PartnersPlus); + case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of + ok -> ok; + {new_state, _NewState} -> ?debugHere,ok; + Error -> throw({unknown_gossip_response, Error}) + end; + gossip(_,_) -> % testing, so don't gossip ok. +next_up_node(Node, Nodes) -> + {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes), + List = lists:append([B, A, [Node]]), + UpNodes = lists:delete(fun(N) -> lists:member(N, up_nodes()) end, List), + hd(UpNodes). % TODO: empty list? + + +up_nodes() -> + % TODO: implement cache (fb 9704 & 9449) + erlang:nodes(). + + %% @doc find the latest state file on disk find_latest_state_filename(Config) -> Dir = Config#config.directory, @@ -424,6 +437,12 @@ read_latest_state_file(_, _) -> nil. +install_new_state(#mem{args=Args} = State) -> + Config = get_config(Args), + save_state_file(State, Config), + gossip(State). + + %% @doc save the state file to disk, with current timestamp. %% thx to riak_ring_manager:do_write_ringfile/1 save_state_file(State, Config) -> @@ -438,58 +457,6 @@ save_state_file(State, Config) -> file:close(File). -%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap} -%% This is basically replaying all the mem events that have happened. -create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) -> - [{_,FirstNode,_}|_] = ExtNodes, - Fun = fun({Pos, Node, Options}, Pmap) -> - check_pos(Pos, Node, Nodes), - Hints = proplists:get_value(hints, Options), - {ok, NewPmap} = partitions:join(Node, Pmap, Hints), - NewPmap - end, - Acc0 = case JoinType of - first -> partitions:create_partitions(Q, FirstNode); - new -> mochiglobal:get(pmap) - end, - Pmap = lists:foldl(Fun, Acc0, lists:keysort(1, ExtNodes)), - {Pmap, make_fullmap(Pmap, Config)}. - - -%% @doc construct a table with all partitions, with the primary node and all -%% replication partner nodes as well. -make_fullmap(PMap, Config) -> - {Nodes, _Parts} = lists:unzip(PMap), - NodeParts = lists:flatmap( - fun({Node,Part}) -> - Partners = replication:partners(Node, lists:usort(Nodes), Config), - PartnerList = [{Partner, Part} || Partner <- Partners], - [{Node, Part} | PartnerList] - end, PMap), - NodeParts. - - -%% @doc tasks associated with a new state -new_state(#mem{nodes=Nodes, args=Args} = State) -> - Config = get_config(Args), - {Pmap, Fullmap} = create_maps(Config, first, Nodes, []), - new_state(State, Pmap, Fullmap, Config). - - -%% @doc tasks associated with a new state -new_state(State, Pmap, Fullmap, Config) -> - update_cache(Pmap, Fullmap), - save_state_file(State, Config), - gossip(State), - State. - - -%% cache table helper function -update_cache(Pmap, Fullmap) -> - mochiglobal:put(pmap, Pmap), - mochiglobal:put(fullmap, Fullmap). - - check_pos(Pos, Node, Nodes) -> Found = lists:keyfind(Pos, 1, Nodes), case Found of @@ -514,9 +481,7 @@ int_reset(Test, State) -> undefined -> node(); _ -> Test end, - Nodes = [{0, Node, []}], - Clock = vector_clock:create(Node), - State#mem{node=Node, nodes=Nodes, clock=Clock}. + State#mem{node=Node, nodes=[], clock=[]}. ping_all_yall(Nodes) -> |