summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mem3.erl42
-rw-r--r--src/vector_clock.erl9
2 files changed, 34 insertions, 17 deletions
diff --git a/src/mem3.erl b/src/mem3.erl
index e0bd1df0..bc2b1d7e 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -212,7 +212,8 @@ handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
case MergeType of
equal -> {noreply, MergedState};
merged ->
- showroom_log:message(info, "membership: merged new gossip", []),
+ showroom_log:message(info, "membership: merged new gossip: ~p",
+ [MergedState]),
update_cache(MergedState),
gossip(MergedState),
{noreply, MergedState}
@@ -289,13 +290,13 @@ handle_init(_Test, _OldState) ->
%% handle join activities, return NewState
-handle_join(JoinType, ExtNodes,
- #mem{node=Node, nodes=Nodes, clock=Clock} = State, Config)
- when JoinType == first orelse JoinType == new ->
- {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
- update_cache(Pmap, Fullmap),
- NewClock = vector_clock:increment(Node, Clock),
- State#mem{nodes=ExtNodes, clock=NewClock};
+handle_join(first, ExtNodes, State, Config) ->
+ {_,Nodes,_} = lists:unzip3(ExtNodes),
+ ping_all_yall(Nodes),
+ join(first, ExtNodes, State, Config);
+
+handle_join(new, ExtNodes, State, Config) ->
+ join(new, ExtNodes, State, Config);
handle_join(replace, [_OldNode | _], _State, _Config) ->
% TODO implement me
@@ -306,6 +307,14 @@ handle_join(JoinType, _, _, _) ->
{error, {unknown_join_type, JoinType}}.
+join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
+ Config) ->
+ {Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
+ update_cache(Pmap, Fullmap),
+ NewClock = vector_clock:increment(Node, Clock),
+ State#mem{nodes=ExtNodes, clock=NewClock}.
+
+
gossip(#mem{args=Args} = NewState) ->
Test = proplists:get_value(test, Args),
gossip(Test, NewState).
@@ -317,7 +326,7 @@ gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) ->
lists:foreach(fun(TargetNode) ->
showroom_log:message(info, "membership: firing gossip from ~p to ~p",
[Node, TargetNode]),
- gen_server:cast({?MODULE, TargetNode}, {gossip, NewState})
+ gen_server:cast({?SERVER, TargetNode}, {gossip, NewState})
end, PartnersPlus);
gossip(_,_) ->
% testing, so don't gossip
@@ -335,7 +344,7 @@ find_latest_state_filename(Config) ->
[Latest | _] ->
{ok, Dir ++ "/state." ++ integer_to_list(Latest)};
_ ->
- throw({error, not_found})
+ throw({error, mem_state_file_not_found})
end;
{error, Reason} ->
throw({error, Reason})
@@ -450,9 +459,12 @@ merge_state(_RemoteState=#mem{clock=RemoteClock, nodes=RemoteNodes},
merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) ->
- {MergedClock, Merged} =
- vector_clock:resolve({RemoteClock, RemoteNodes},
- {LocalClock, LocalNodes}),
+ MergedClock = vector_clock:merge(RemoteClock, LocalClock),
+ Merged1 = lists:ukeymerge(1,
+ lists:keysort(1, RemoteNodes),
+ lists:keysort(1, LocalNodes)),
+ Merged = lists:keydelete(0, 1, Merged1),
+ % TODO: make sure we don't have dupe keys ?
{MergedClock, lists:keysort(1, Merged)}.
@@ -460,3 +472,7 @@ merge_nodes(RemoteClock, RemoteNodes, LocalClock, LocalNodes) ->
% lists:foreach(fun(Node) ->
% gen_event:notify(membership_events, {Type, Node})
% end, Nodes).
+
+
+ping_all_yall(Nodes) ->
+ lists:map(fun(Node) -> net_adm:ping(Node) end, Nodes).
diff --git a/src/vector_clock.erl b/src/vector_clock.erl
index 0a89d41e..740d1520 100644
--- a/src/vector_clock.erl
+++ b/src/vector_clock.erl
@@ -31,10 +31,11 @@ resolve({ClockA, ValuesA}, {ClockB, ValuesB}) ->
greater -> {ClockA, ValuesA};
equal -> {ClockA, ValuesA};
concurrent ->
- io:format("~nConcurrent Clocks~n"
- "ClockA : ~p~nClockB : ~p~n"
- "ValuesA: ~p~nValuesB: ~p~n"
- , [ClockA, ClockB, ValuesA, ValuesB]),
+ showroom_log:message(info,
+ "~nConcurrent Clocks~n"
+ "ClockA : ~p~nClockB : ~p~n"
+ "ValuesA: ~p~nValuesB: ~p~n"
+ , [ClockA, ClockB, ValuesA, ValuesB]),
{merge(ClockA,ClockB), ValuesA ++ ValuesB}
end;
resolve(not_found, {Clock, Values}) ->