author    | Adam Kocoloski <adam@cloudant.com> | 2010-07-02 03:12:45 -0400
committer | Adam Kocoloski <adam@cloudant.com> | 2010-08-12 01:23:30 -0400
commit    | 40c669d1864c4c9eb788240dd4edc533d8a352f2 (patch)
tree      | cdd1ce41c1dc76bf8ad907821b94d61901bc9ecb
parent    | 570649ec3d6c66be8d7900b655cdc1d31ca8fe27 (diff)
mega refactoring of mem3
-rw-r--r-- | ebin/mem3.app           |   7
-rw-r--r-- | include/mem3.hrl        |   8
-rw-r--r-- | src/mem3.erl            |  56
-rw-r--r-- | src/mem3_app.erl        |   2
-rw-r--r-- | src/mem3_cache.erl      |   4
-rw-r--r-- | src/mem3_server.erl     | 552
-rw-r--r-- | src/mem3_sup.erl        |   4
-rw-r--r-- | src/mem3_sync.erl       | 242
-rw-r--r-- | src/mem3_sync_event.erl |  25
-rw-r--r-- | src/mem3_util.erl       | 253
-rw-r--r-- | src/mem3_vclock.erl     | 109
11 files changed, 368 insertions, 894 deletions
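
This commit replaces the gossip-based membership machinery (mem3_server and
mem3_vclock, both deleted below) with a simpler scheme: shard maps are cached
in the partitions ets table by mem3_cache, node membership moves to the new
mem3_nodes server, and mem3_sync becomes a gen_server that queues push
replications of the "nodes" and "dbs" databases. For orientation, here is a
sketch of the new public API in src/mem3.erl as it might look from an Erlang
shell (node names, db name, option values, and printed results are
illustrative; record printing assumes rr("include/mem3.hrl") has been run):

    1> mem3:nodes().
    ['dbcore@node1','dbcore@node2','dbcore@node3']
    2> mem3:choose_shards(<<"testdb">>, [{n,3}, {q,8}]).  % builds a map when none is cached
    [#shard{name = <<"shards/00000000-1fffffff/testdb">>,
            node = 'dbcore@node2', dbname = <<"testdb">>,
            range = [0,536870911], ref = undefined}|...]
    3> mem3:shards(<<"testdb">>, <<"some_doc_id">>).      % shards covering one doc id
    [#shard{...}, #shard{...}, #shard{...}]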
diff --git a/ebin/mem3.app b/ebin/mem3.app
index d0caaeec..05d50748 100644
--- a/ebin/mem3.app
+++ b/ebin/mem3.app
@@ -7,17 +7,16 @@
         mem3_app,
         mem3_cache,
         mem3_httpd,
-        mem3_server,
+        mem3_nodes,
         mem3_sup,
         mem3_sync,
         mem3_sync_event,
-        mem3_util,
-        mem3_vclock
+        mem3_util
     ]},
     {registered, [
         mem3_cache,
         mem3_events,
-        mem3_server,
+        mem3_nodes,
         mem3_sync,
         mem3_sup
     ]},
diff --git a/include/mem3.hrl b/include/mem3.hrl
index a1e6f822..533056f9 100644
--- a/include/mem3.hrl
+++ b/include/mem3.hrl
@@ -10,13 +10,6 @@
 -include_lib("eunit/include/eunit.hrl").
 
-%% version 3 of membership state
--record(mem, {header=3,
-              nodes=[],
-              clock=[],
-              args
-             }).
-
 %% partition record
 -record(shard, {name, node, dbname, range, ref}).
 
@@ -28,7 +21,6 @@
 -type mem_node_list() :: [mem_node()].
 -type arg_options() :: {test, boolean()}.
 -type args() :: [] | [arg_options()].
--type mem_state() :: #mem{}.
 -type test() :: undefined | node().
 -type epoch() :: float().
 -type clock() :: {node(), epoch()}.
diff --git a/src/mem3.erl b/src/mem3.erl
index 4f7c6ade..e6ee5bf8 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -1,7 +1,7 @@
 -module(mem3).
--author('Brad Anderson <brad@cloudant.com>').
--export([start/0, stop/0, restart/0, state/0]).
+-export([start/0, stop/0, restart/0, state/0, nodes/0, shards/1, shards/2,
+    choose_shards/2]).
 
 -include("mem3.hrl").
 
@@ -22,7 +22,7 @@ restart() ->
 %% key and the nodes holding that state as the value.  Also reports member
 %% nodes which fail to respond and nodes which are connected but are not
 %% cluster members.  Useful for debugging.
--spec state() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}].
+-spec state() -> [{any | bad_nodes | non_member_nodes, [node()]}].
 state() ->
     {ok, Nodes} = mem3:nodes(),
     AllNodes = erlang:nodes([this, visible]),
@@ -30,4 +30,52 @@ state() ->
     Dict = lists:foldl(fun({Node, {ok,State}}, D) ->
         orddict:append(State, Node, D)
     end, orddict:new(), Replies),
-    [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
\ No newline at end of file
+    [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec nodes() -> [node()].
+nodes() ->
+    mem3_nodes:get_nodelist().
+
+-spec shards(DbName::binary()) -> [#shard{}].
+shards(DbName) ->
+    case ets:lookup(partitions, DbName) of
+    [] ->
+        % TODO fall back to checking dbs.couch directly
+        erlang:error(database_does_not_exist);
+    Else ->
+        Else
+    end.
+
+-spec shards(DbName::binary(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) ->
+    HashKey = mem3_util:hash(DocId),
+    Head = #shard{
+        name = '_',
+        node = '_',
+        dbname = DbName,
+        range = ['$1','$2'],
+        ref = '_'
+    },
+    % TODO these conditions assume A < B, which we don't require
+    Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+    case ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+    [] ->
+        % TODO fall back to checking dbs.couch directly
+        erlang:error(database_does_not_exist);
+    Shards ->
+        Shards
+    end.
+
+choose_shards(DbName, Options) ->
+    try shards(DbName)
+    catch error:database_does_not_exist ->
+        Nodes = mem3:nodes(),
+        NodeCount = length(Nodes),
+        N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+        Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+            couch_config:get("cluster", "q", "8"))),
+        % rotate to a random entry in the nodelist for even distribution
+        {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
+        RotatedNodes = B ++ A,
+        mem3_util:create_partition_map(DbName, N, Q, RotatedNodes)
+    end.
diff --git a/src/mem3_app.erl b/src/mem3_app.erl
index 70bf1cf9..88cd1ea1 100644
--- a/src/mem3_app.erl
+++ b/src/mem3_app.erl
@@ -3,8 +3,6 @@
 -export([start/2, stop/1]).
 
 start(_Type, []) ->
-    DbName = couch_config:get("mem3", "db", "dbs"),
-    couch_server:create(list_to_binary(DbName), []),
     mem3_sup:start_link().
 
 stop([]) ->
diff --git a/src/mem3_cache.erl b/src/mem3_cache.erl
index b17db1a2..1d1bbe9b 100644
--- a/src/mem3_cache.erl
+++ b/src/mem3_cache.erl
@@ -69,7 +69,7 @@ changes_callback(start, _) ->
 changes_callback({stop, EndSeq}, _) ->
     exit({seq, EndSeq});
 changes_callback({change, {Change}, _}, _) ->
-    DbName = couch_util:get_value(id, Change),
+    DbName = couch_util:get_value(<<"id">>, Change),
     case couch_util:get_value(deleted, Change, false) of
     true ->
         ets:delete(partitions, DbName);
@@ -82,6 +82,6 @@ changes_callback({change, {Change}, _}, _) ->
             ets:insert(partitions, mem3_util:build_shards(DbName, Doc))
         end
     end,
-    {ok, couch_util:get_value(seq, Change)};
+    {ok, couch_util:get_value(<<"seq">>, Change)};
 changes_callback(timeout, _) ->
     {ok, nil}.
diff --git a/src/mem3_server.erl b/src/mem3_server.erl
deleted file mode 100644
index 0d76344d..00000000
--- a/src/mem3_server.erl
+++ /dev/null
@@ -1,552 +0,0 @@
-%%% membership module
-%%%
-%%% State of the gen_server is a #mem record
-%%%
-%%% Nodes and Gossip are the same thing, and are a list of three-tuples like:
-%%%
-%%%  [ {Pos,NodeName,Options} | _ ]
-%%%
-%%% Position is 1-based incrementing in order of node joining
-%%%
-%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node
-%%% is responsible for the extra partitions too.
-%%%
-%%% TODO: dialyzer type specs
-%%%
--module(mem3_server).
--author('brad@cloudant.com').
-
--behaviour(gen_server).
-
-%% API
--export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
--export([join/3, clock/0, state/0, nodes/0, fullnodes/0,
-         start_gossip/0]).
-
-%% for testing more than anything else
--export([merge_nodes/2, next_up_node/1, next_up_node/3]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
-%% includes
--include("mem3.hrl").
-
--define(SERVER, membership).
--define(STATE_FILE_PREFIX, "membership").
-
-
-%%====================================================================
-%% API
-%%====================================================================
-
--spec start_link() -> {ok, pid()}.
-start_link() ->
-    start_link([]).
-
-
--spec start_link(args()) -> {ok, pid()}.
-start_link(Args) ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
-
-
--spec stop() -> ok.
-stop() ->
-    stop(?MODULE).
-
-
--spec stop(atom()) -> ok.
-stop(Server) ->
-    gen_server:cast(Server, stop).
-
-
--spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) ->
-    ok.
-join(JoinType, Payload, PingNode) ->
-    gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}).
-
-
--spec clock() -> vector_clock().
-clock() ->
-    gen_server:call(?SERVER, clock).
-
-
--spec state() -> mem_state().
-state() ->
-    gen_server:call(?SERVER, state).
-
--spec start_gossip() -> ok.
-start_gossip() ->
-    gen_server:call(?SERVER, start_gossip).
-
-
--spec reset() -> ok | not_reset.
-reset() ->
-    gen_server:call(?SERVER, reset).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%%      This may differ from erlang:nodes()
-%%      Guaranteed to be in order of State's node list (1st elem in 3-tuple)
--spec nodes() -> {ok, [node()]}.
-nodes() ->
-    gen_server:call(?SERVER, nodes).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%%      This may differ from erlang:nodes()
-%%      Guaranteed to be in order of State's node list (1st elem in 3-tuple)
--spec fullnodes() -> {ok, [mem_node()]}.
-fullnodes() ->
-    gen_server:call(?SERVER, fullnodes).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%% start up membership server
--spec init(args()) -> {ok, mem_state()}.
-init(Args) ->
-    process_flag(trap_exit,true),
-    Test = get_test(Args),
-    OldState = read_latest_state_file(Test),
-    showroom_log:message(info, "membership: membership server starting...", []),
-    net_kernel:monitor_nodes(true),
-    State = handle_init(Test, OldState),
-    {ok, State#mem{args=Args}}.
-
-
-%% new node(s) joining to this node
-handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
-    try
-        case handle_join(JoinType, ExtNodes, PingNode, State) of
-        {ok, NewState} -> {reply, ok, NewState};
-        Other -> {reply, Other, State}
-        end
-    catch _:Error ->
-        showroom_log:message(error, "~p", [Error]),
-        {reply, Error, State}
-    end;
-
-%% clock
-handle_call(clock, _From, #mem{clock=Clock} = State) ->
-    {reply, {ok, Clock}, State};
-
-%% state
-handle_call(state, _From, State) ->
-    {reply, {ok, State}, State};
-
-%% reset - but only if we're in test mode
-handle_call(reset, _From, #mem{args=Args} = State) ->
-    Test = get_test(Args),
-    case Test of
-    undefined -> {reply, not_reset, State};
-    _ -> {reply, ok, int_reset(Test, State)}
-    end;
-
-%% nodes
-handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
-    {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)),
-    {reply, {ok, NodeList}, State};
-
-%% fullnodes
-handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) ->
-    {reply, {ok, Nodes}, State};
-
-%% gossip
-handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
-    showroom_log:message(info, "membership: received gossip from ~p",
-        [erlang:node(Pid)]),
-    handle_gossip(From, RemoteState, LocalState);
-
-% start_gossip
-handle_call(start_gossip, _From, State) ->
-    NewState = gossip(State),
-    {reply, ok, NewState};
-
-%% ignored call
-handle_call(Msg, _From, State) ->
-    showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
-    {reply, ignored, State}.
-
-
-%% gossip
-handle_cast({gossip, RemoteState}, LocalState) ->
-    State = case handle_gossip(none, RemoteState, LocalState) of
-    {reply, ok, NewState} -> NewState;
-    {reply, {new_state, NewState}, _} -> NewState;
-    {noreply, NewState} -> NewState
-    end,
-    {noreply, State};
-
-%% stop
-handle_cast(stop, State) ->
-    {stop, normal, State};
-
-%% ignored cast
-handle_cast(Msg, State) ->
-    showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
-    {noreply, State}.
-
-
-%% @doc handle nodedown messages because we have
-%%      net_kernel:monitor_nodes(true)
-handle_info({nodedown, Node}, State) ->
-    showroom_log:message(alert, "membership: nodedown ~p", [Node]),
-    notify(nodedown, [Node], State),
-    {noreply, State};
-
-%% @doc handle nodeup messages because we have
-%%      net_kernel:monitor_nodes(true)
-handle_info({nodeup, Node}, State) ->
-    showroom_log:message(alert, "membership: nodeup ~p", [Node]),
-    notify(nodeup, [Node], State),
-    gossip_cast(State),
-    {noreply, State};
-
-%% ignored info
-handle_info(Info, State) ->
-    showroom_log:message(info, "membership: ignored info: ~p", [Info]),
-    {noreply, State}.
-
-
-% terminate
-terminate(_Reason, _State) ->
-    ok.
-
-
-% ignored code change
-code_change(OldVsn, State, _Extra) ->
-    io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
-    {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-%% @doc if Args has config use it, otherwise call configuration module
-%%      most times Args will have config during testing runs
-%get_config(Args) ->
-%    case proplists:get_value(config, Args) of
-%    undefined -> configuration:get_config();
-%    Any -> Any
-%    end.
-
-
-get_test(Args) ->
-    proplists:get_value(test, Args).
-
-
-%% @doc handle_init starts a node
-%%      Most of the time, this puts the node in a single-node cluster setup,
-%%      But, we could be automatically rejoining a cluster after some downtime.
-%%      See handle_join for initing, joining, leaving a cluster, or replacing a
-%%      node.
-%% @end
-handle_init(Test, nil) ->
-    int_reset(Test);
-
-handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
-    % there's an old state, let's try to rejoin automatically
-    %  but only if we can compare our old state to other available
-    %  nodes and get a match... otherwise get a human involved
-    {_, NodeList, _} = lists:unzip3(Nodes),
-    ping_all_yall(NodeList),
-    {RemoteStates, _BadNodes} = get_remote_states(NodeList),
-    Test = get_test(Args),
-    case compare_state_with_rest(OldState, RemoteStates) of
-    match ->
-        showroom_log:message(info, "membership: rejoined successfully", []),
-        OldState;
-    Other ->
-        showroom_log:message(error, "membership: rejoin failed: ~p", [Other]),
-        int_reset(Test)
-    end.
-
-
-%% @doc handle join activities, return {ok,NewState}
--spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) ->
-    {ok, mem_state()}.
-% init
-handle_join(init, ExtNodes, nil, State) ->
-    {_,Nodes,_} = lists:unzip3(ExtNodes),
-    ping_all_yall(Nodes),
-    int_join(ExtNodes, State);
-% join
-handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) ->
-    NewState = case get_test(Args) of
-    undefined -> get_pingnode_state(PingNode);
-    _ -> State % testing, so meh
-    end,
-    % now use this info to join the ring
-    int_join(ExtNodes, NewState);
-% replace
-handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) ->
-    handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, [OldNode | _], PingNode, State) ->
-    handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, {OldNode, NewOpts}, PingNode, State) ->
-    OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode),
-    {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes),
-    NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}),
-    notify(node_leave, [OldNode], State),
-    int_join([], OldState#mem{nodes=NewNodes});
-% leave
-handle_join(leave, [OldNode | _], _PingNode, State) ->
-    % TODO implement me
-    notify(node_leave, [OldNode], State),
-    ok;
-
-handle_join(JoinType, _, PingNode, _) ->
-    showroom_log:message(info, "membership: unknown join type: ~p "
-        "for ping node: ~p", [JoinType, PingNode]),
-    {error, unknown_join_type}.
-
-%% @doc common operations for all join types
-int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
-    NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
-        check_pos(Pos, N, Nodes),
-        notify(node_join, [N], State),
-        [New|AccIn]
-    end, Nodes, ExtNodes),
-    NewNodes1 = lists:sort(NewNodes),
-    NewClock = mem3_vclock:increment(node(), Clock),
-    NewState = State#mem{nodes=NewNodes1, clock=NewClock},
-    install_new_state(NewState),
-    {ok, NewState}.
-
-
-install_new_state(#mem{args=Args} = State) ->
-    Test = get_test(Args),
-    save_state_file(Test, State),
-    gossip(call, Test, State).
-
-
-get_pingnode_state(PingNode) ->
-    {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state),
-    RemoteState.
-
-
-%% @doc handle the gossip messages
-%%      We're not using mem3_vclock:resolve b/c we need custom merge strategy
-handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
-        LocalState=#mem{clock=LocalClock}) ->
-    case mem3_vclock:compare(RemoteClock, LocalClock) of
-    equal ->
-        {reply, ok, LocalState};
-    less ->
-        % remote node needs updating
-        {reply, {new_state, LocalState}, LocalState};
-    greater when From == none->
-        {noreply, install_new_state(RemoteState)};
-    greater ->
-        % local node needs updating
-        gen_server:reply(From, ok), % reply to sender first
-        {noreply, install_new_state(RemoteState)};
-    concurrent ->
-        % ick, so let's resolve and merge states
-        showroom_log:message(info,
-            "membership: Concurrent Clocks~n"
-            "RemoteState : ~p~nLocalState : ~p~n"
-            , [RemoteState, LocalState]),
-        MergedState = merge_states(RemoteState, LocalState),
-        if From =/= none ->
-            % reply to sender
-            gen_server:reply(From, {new_state, MergedState})
-        end,
-        {noreply, install_new_state(MergedState)}
-    end.
-
-
-merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState,
-        #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) ->
-    MergedClock = mem3_vclock:merge(RemoteClock, LocalClock),
-    MergedNodes = merge_nodes(RemoteNodes, LocalNodes),
-    LocalState#mem{clock=MergedClock, nodes=MergedNodes}.
-
-
-%% this will give one of the lists back, deterministically
-merge_nodes(Remote, Local) ->
-    % get rid of the initial 0 node if it's still there, and sort
-    Remote1 = lists:usort(lists:keydelete(0,1,Remote)),
-    Local1 = lists:usort(lists:keydelete(0,1,Local)),
-    % handle empty lists as well as other cases
-    case {Remote1, Local1} of
-    {[], L} -> L;
-    {R, []} -> R;
-    _ -> erlang:min(Remote1, Local1)
-    end.
-
-
-gossip(#mem{args=Args} = NewState) ->
-    Test = get_test(Args),
-    gossip(call, Test, NewState).
-
-
-gossip_cast(#mem{nodes=[]}) -> ok;
-gossip_cast(#mem{args=Args} = NewState) ->
-    Test = get_test(Args),
-    gossip(cast, Test, NewState).
-
-
--spec gossip(gossip_fun(), test(), mem_state()) -> mem_state().
-gossip(_, _, #mem{nodes=[]}) -> ok;
-gossip(Fun, undefined, #mem{nodes=StateNodes} = State) ->
-    {_, Nodes, _} = lists:unzip3(StateNodes),
-    case next_up_node(Nodes) of
-    no_gossip_targets_available ->
-        State; % skip gossip, I'm the only node
-    TargetNode ->
-        showroom_log:message(info, "membership: firing gossip from ~p to ~p",
-            [node(), TargetNode]),
-        case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of
-        ok -> State;
-        {new_state, NewState} -> NewState;
-        Error -> throw({unknown_gossip_response, Error})
-        end
-    end;
-
-gossip(_,_,_) ->
-    % testing, so don't gossip
-    ok.
-
-
-next_up_node(Nodes) ->
-    next_up_node(node(), Nodes, up_nodes()).
-
-
-next_up_node(Node, Nodes, UpNodes) ->
-    {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
-    List = lists:append(B, A), % be sure to eliminate Node
-    DownNodes = Nodes -- UpNodes,
-    case List -- DownNodes of
-    [Target|_] -> Target;
-    [] -> no_gossip_targets_available
-    end.
-
-
-up_nodes() ->
-    % TODO: implement cache (fb 9704 & 9449)
-    erlang:nodes().
-
-
-%% @doc find the latest state file on disk
-find_latest_state_filename() ->
-    Dir = couch_config:get("couchdb", "database_dir"),
-    case file:list_dir(Dir) of
-    {ok, Filenames} ->
-        Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-
-            [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
-        SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
-        case SortedTimestamps of
-        [Latest | _] ->
-            {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++
-                integer_to_list(Latest)};
-        _ ->
-            throw({error, mem_state_file_not_found})
-        end;
-    {error, Reason} ->
-        throw({error, Reason})
-    end.
-
-
-%% (Test, Config)
-read_latest_state_file(undefined) ->
-    try
-        {ok, File} = find_latest_state_filename(),
-        case file:consult(File) of
-        {ok, [#mem{}=State]} -> State;
-        _Else ->
-            throw({error, bad_mem_state_file})
-        end
-    catch _:Error ->
-        showroom_log:message(info, "membership: ~p", [Error]),
-        nil
-    end;
-read_latest_state_file(_) ->
-    nil.
-
-
-%% @doc save the state file to disk, with current timestamp.
-%%      thx to riak_ring_manager:do_write_ringfile/1
--spec save_state_file(test(), mem_state()) -> ok.
-save_state_file(undefined, State) ->
-    Dir = couch_config:get("couchdb", "database_dir"),
-    {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
-    TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
-        [Year, Month, Day, Hour, Minute, Second]),
-    FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
-    ok = filelib:ensure_dir(FN),
-    {ok, File} = file:open(FN, [binary, write]),
-    io:format(File, "~w.~n", [State]),
-    file:close(File);
-
-save_state_file(_,_) -> ok. % don't save if testing
-
-
-check_pos(Pos, Node, Nodes) ->
-    Found = lists:keyfind(Pos, 1, Nodes),
-    case Found of
-    false -> ok;
-    _ ->
-        {_,OldNode,_} = Found,
-        if
-        OldNode =:= Node ->
-            Msg = "node_exists_at_position_" ++ integer_to_list(Pos),
-            throw({error, list_to_binary(Msg)});
-        true ->
-            Msg = "position_exists_" ++ integer_to_list(Pos),
-            throw({error, list_to_binary(Msg)})
-        end
-    end.
-
-
-int_reset(Test) ->
-    int_reset(Test, #mem{}).
-
-
-int_reset(_Test, State) ->
-    State#mem{nodes=[], clock=[]}.
-
-
-ping_all_yall(Nodes) ->
-    lists:foreach(fun(Node) ->
-        net_adm:ping(Node)
-    end, Nodes),
-    timer:sleep(500). % sigh.
-
-
-get_remote_states(NodeList) ->
-    NodeList1 = lists:delete(node(), NodeList),
-    {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
-    {_Status, States2} = lists:unzip(States1),
-    NodeList2 = NodeList1 -- BadNodes,
-    {lists:zip(NodeList2,States2), BadNodes}.
-
-
-%% @doc compare state with states based on vector clock
-%%      return match | {bad_state_match, Node, NodesThatDontMatch}
-compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
-    Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
-        {mem3_vclock:equals(Clock, Clock1), Node}
-    end, States),
-    BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
-        ({false, N}, AccIn) -> [N | AccIn]
-    end, [], Results),
-    if
-    length(BadResults) == 0 -> match;
-    true -> {bad_state_match, node(), BadResults}
-    end.
-
-notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) ->
-    {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)),
-    lists:foreach(fun(Node) ->
-        case lists:member(Node, MemNodes) orelse Type == nodedown of
-        true ->
-            gen_event:notify(membership_events, {Type, Node});
-        _ -> ok % node not in cluster
-        end
-    end, Nodes).
diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl
index 0a9f66d0..353216d4 100644
--- a/src/mem3_sup.erl
+++ b/src/mem3_sup.erl
@@ -7,10 +7,10 @@ start_link() ->
 
 init(_Args) ->
     Children = [
-        child(mem3_server),
         child(mem3_events),
         child(mem3_sync),
-        child(mem3_cache)
+        child(mem3_cache),
+        child(mem3_nodes)
     ],
     {ok, {{one_for_one,10,1}, Children}}.
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index d50514d9..0f402834 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -1,46 +1,214 @@
 -module(mem3_sync).
--behaviour(supervisor).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+    code_change/3]).
 
--export([start_link/0, init/1, childspec/1, sup_upgrade_notify/2]).
+-export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
 
--include("mem3.hrl").
+-include_lib("../../couch/src/couch_db.hrl").
+
+-record(state, {
+    active = [],
+    count = 0,
+    limit,
+    dict = dict:new(),
+    waiting = [],
+    update_notifier
+}).
 
 start_link() ->
-    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+    gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+    gen_server:call(?MODULE, get_queue).
+
+push(Db, Node) ->
+    gen_server:cast(?MODULE, {push, Db, Node}).
+
+remove_node(Node) ->
+    gen_server:cast(?MODULE, {remove_node, Node}).
 
 init([]) ->
-    {ok, MemNodes} = mem3:nodes(),
-    LiveNodes = nodes(),
-    ChildSpecs = [childspec(N) || N <- MemNodes, lists:member(N, LiveNodes)],
-    gen_event:add_handler(membership_events, dbs_event, []),
-    {ok, {{one_for_one, 10, 8}, ChildSpecs}}.
-
-childspec(Node) ->
-    ?LOG_INFO("dbs repl ~p --> ~p starting", [node(), Node]),
+    process_flag(trap_exit, true),
+    Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+    gen_event:add_handler(mem3_events, mem3_sync_event, []),
+    {ok, Pid} = start_update_notifier(),
+    spawn(fun initial_sync/0),
+    {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+    {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+    {reply, State#state.waiting, State}.
+
+handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
+        when Count >= Limit ->
+    {noreply, add_to_queue(State, DbName, Node)};
+
+handle_cast({push, DbName, Node}, State) ->
+    #state{active = L, count = C} = State,
+    case is_running(DbName, Node, L) of
+    true ->
+        {noreply, add_to_queue(State, DbName, Node)};
+    false ->
+        Pid = start_push_replication(DbName, Node),
+        {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
+    end;
+
+handle_cast({remove_node, Node}, State) ->
+    Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
+    Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
+        State#state.dict, [S || {S,N} <- Waiting, N =:= Node]),
+    {noreply, State#state{dict = Dict, waiting = Waiting}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+    {ok, NewPid} = start_update_notifier(),
+    {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+    handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+    case lists:keyfind(Active, 3, State#state.active) of
+    {OldDbName, OldNode, _} ->
+        ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
+            OldNode, Reason]),
+        timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
+    false -> ok end,
+    handle_replication_exit(State, Active);
+
+handle_info(Msg, State) ->
+    ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
+    {noreply, State}.
+
+terminate(_Reason, State) ->
+    [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active],
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+    NewActive = lists:keydelete(Pid, 3, State#state.active),
+    {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+    #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+    Active1 = lists:keydelete(Pid, 3, Active),
+    Count = length(Active1),
+    NewState = if Count < Limit ->
+        case next_replication(Active1, Waiting) of
+        nil -> % all waiting replications are also active
+            State#state{active = Active1, count = Count};
+        {DbName, Node, StillWaiting} ->
+            NewPid = start_push_replication(DbName, Node),
+            State#state{
+                active = [{DbName, Node, NewPid} | Active1],
+                count = Count+1,
+                dict = dict:erase({DbName,Node}, D),
+                waiting = StillWaiting
+            }
+        end;
+    true ->
+        State#state{active = Active1, count=Count}
+    end,
+    {noreply, NewState}.
+
+start_push_replication(DbName, Node) ->
     PostBody = {[
-        {<<"source">>, <<"dbs">>},
-        {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, <<"dbs">>}]}},
-        {<<"continuous">>, true}
+        {<<"source">>, DbName},
+        {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
+        {<<"continuous">>, false},
+        {<<"async">>, true}
     ]},
-    Id = couch_util:to_hex(erlang:md5(term_to_binary([node(), Node]))),
-    MFA = {couch_rep, start_link, [Id, PostBody, #user_ctx{}]},
-    {Node, MFA, permanent, 100, worker, [couch_rep]}.
-
-% from http://code.google.com/p/erlrc/wiki/ErlrcHowto
-sup_upgrade_notify (_Old, _New) ->
-    {ok, {_, Specs}} = init([]),
-
-    Old = sets:from_list(
-        [Name || {Name, _, _, _} <- supervisor:which_children(?MODULE)]),
-    New = sets:from_list([Name || {Name, _, _, _, _, _} <- Specs]),
-    Kill = sets:subtract(Old, New),
-
-    sets:fold(fun(Id, ok) ->
-        supervisor:terminate_child(?MODULE, Id),
-        supervisor:delete_child(?MODULE, Id),
-        ok
-    end,
-    ok,
-    Kill),
-    [supervisor:start_child (?MODULE, Spec) || Spec <- Specs ],
-    ok.
+    ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
+    UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
+    case (catch couch_rep:replicate(PostBody, UserCtx)) of
+    Pid when is_pid(Pid) ->
+        link(Pid),
+        Pid;
+    {db_not_found, _Msg} ->
+        case couch_api:open_db(DbName, []) of
+        {ok, Db} ->
+            % source exists, let's (re)create the target
+            couch_api:close_db(Db),
+            case rpc:call(Node, couch_api, create_db, [DbName, []]) of
+            {ok, Target} ->
+                ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
+                    Node]),
+                couch_api:close_db(Target),
+                start_push_replication(DbName, Node);
+            file_exists ->
+                start_push_replication(DbName, Node);
+            Error ->
+                ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
+                    [?MODULE, DbName, Node, Error]),
+                exit(shutdown)
+            end;
+        {not_found, no_db_file} ->
+            % source is gone, so this is a hack to skip it
+            ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
+                [?MODULE, DbName, Node]),
+            spawn_link(fun() -> ok end)
+        end;
+    {node_not_connected, _} ->
+        % we'll get this one when the node rejoins
+        ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
+        spawn_link(fun() -> ok end);
+    CatchAll ->
+        ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
+        case lists:member(Node, nodes()) of
+        true ->
+            timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
+        false ->
+            ok
+        end,
+        spawn_link(fun() -> ok end)
+    end.
+
+add_to_queue(State, DbName, Node) ->
+    #state{dict=D, waiting=Waiting} = State,
+    case dict:is_key({DbName, Node}, D) of
+    true ->
+        State;
+    false ->
+        ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
+        State#state{
+            dict = dict:store({DbName,Node}, ok, D),
+            waiting = Waiting ++ [{DbName,Node}]
+        }
+    end.
+
+initial_sync() ->
+    Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+    Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+    Nodes = mem3:nodes(),
+    Live = nodes(),
+    [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
+
+start_update_notifier() ->
+    Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+    Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+    couch_db_update_notifier:start_link(fun
+    ({updated, Db}) when Db == Db1; Db == Db2 ->
+        Nodes = mem3:nodes(),
+        Live = nodes(),
+        [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+    (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
+next_replication(Active, Waiting) ->
+    case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
+    {_, []} ->
+        nil;
+    {Running, [{DbName,Node}|Rest]} ->
+        {DbName, Node, Running ++ Rest}
+    end.
+
+is_running(DbName, Node, ActiveList) ->
+    [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].
diff --git a/src/mem3_sync_event.erl b/src/mem3_sync_event.erl
index 55f3840c..45fcb8aa 100644
--- a/src/mem3_sync_event.erl
+++ b/src/mem3_sync_event.erl
@@ -7,11 +7,26 @@
 init(_) ->
     {ok, nil}.
 
-handle_event({Up, Node}, State) when Up == nodeup; Up == node_join ->
-    mem3_sync:add_node(Node);
-
-handle_event({Down, Node}, State) when Down == nodedown; Down == node_leave ->
-    mem3_sync:remove_node(Node);
+handle_event({add_node, Node}, State) ->
+    Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+    Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+    [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]],
+    {ok, State};
+
+handle_event({nodeup, Node}, State) ->
+    case lists:member(Node, mem3:nodes()) of
+    true ->
+        Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+        Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+        [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]];
+    false ->
+        ok
+    end,
+    {ok, State};
+
+handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node ->
+    mem3_sync:remove_node(Node),
+    {ok, State};
 
 handle_event(_Event, State) ->
     {ok, State}.
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 476742b7..b05faa15 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -1,170 +1,52 @@
 -module(mem3_util).
 -author('brad@cloudant.com').
 
-%% API
--export([fullmap/2, fullmap/3, hash/1, install_fullmap/4]).
--export([for_key/2, all_parts/1]).
--export([shard_name/2, build_shards/2]).
+-export([hash/1, name_shard/1, create_partition_map/4, build_shards/2,
+    n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1]).
 
--define(RINGTOP, trunc(math:pow(2,160))).  % SHA-1 space
+-define(RINGTOP, 2 bsl 31).  % CRC32 space
 
 -include("mem3.hrl").
 
-%%====================================================================
-%% API
-%%====================================================================
-
-%% @doc build a full partition map
-fullmap(DbName, Options) ->
-    {ok, Nodes} = mem3:nodes(),
-    fullmap(DbName, Nodes, Options).
-
-fullmap(DbName, Nodes, Options) ->
-    {N,Q} = db_init_constants(Options),
-    NewNodes = ordered_nodes(DbName, Nodes),
-    Pmap = pmap(Q, NewNodes),
-    int_fullmap(DbName, N, Pmap, NewNodes).
-
-%% @spec hash(term()) -> Digest::binary()
-%% @doc uses SHA-1 as its hash
 hash(Item) when is_binary(Item) ->
-    crypto:sha(Item);
+    erlang:crc32(Item);
 hash(Item) ->
-    crypto:sha(term_to_binary(Item)).
-
-install_fullmap(DbName, Fullmap, FullNodes, Options) ->
-    {N,Q} = db_init_constants(Options),
-    Doc = {[{<<"_id">>,DbName},
-            {<<"map">>, jsonify(<<"map">>, Fullmap)},
-            {<<"nodes">>, jsonify(<<"nodes">>, FullNodes)},
-            {<<"n">>,N},
-            {<<"q">>,Q}]},
-    write_db_doc(Doc).
-
-for_key(DbName, Key) ->
-    <<HashKey:160/integer>> = hash(Key),
-    Head = #shard{
-        name = '_',
-        node = '_',
-        dbname = DbName,
-        range = ['$1','$2'],
-        ref = '_'
-    },
-    % TODO these conditions assume A < B, which we don't require
-    Conditions = [{'<', '$1', HashKey}, {'<', HashKey, '$2'}],
-    case ets:select(partitions, [{Head, Conditions, ['$_']}]) of
-    [] ->
-        erlang:error(database_does_not_exist);
-    Shards ->
-        Shards
-    end.
-
-all_parts(DbName) ->
-    case ets:lookup(partitions, DbName) of
-    [] ->
-        erlang:error(database_does_not_exist);
-    Else ->
-        Else
-    end.
-
-%%====================================================================
-%% Internal functions
-%%====================================================================
+    erlang:crc32(term_to_binary(Item)).
+
+name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) ->
+    Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+        couch_util:to_hex(<<E:32/integer>>), "/", DbName],
+    Shard#shard{name = ?l2b(Name)}.
+
+create_partition_map(DbName, N, Q, Nodes) ->
+    UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+    Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+    Shards1 = attach_nodes(Shards0, [], Nodes, []),
+    [name_shard(S#shard{dbname=DbName}) || S <- Shards1].
+
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+    Acc;
+make_key_ranges(Increment, Start, Acc) ->
+    case Start + 2*Increment of
+    X when X > ?RINGTOP ->
+        End = ?RINGTOP - 1;
+    _ ->
+        End = Start + Increment - 1
+    end,
+    make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
 
-%% @doc get cluster constants from options or config
-db_init_constants(Options) ->
-    {const(n, Options), const(q, Options)}.
+attach_nodes([], Acc, _, _) ->
+    lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+    attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+    attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
 
-%% @doc get individual constant
-const(Const, Options) ->
-    ListResult = case couch_util:get_value(Const, Options) of
-    undefined -> couch_config:get("cluster", atom_to_list(Const));
-    Val -> Val
-    end,
-    list_to_integer(ListResult).
-
-%% @doc hash the dbname, and return the corresponding node for seeding a ring
-seednode(DbName, Nodes) ->
-    <<HashInt:160/integer>> = hash(DbName),
-    Size = partition_range(length(Nodes)),
-    Factor = (HashInt div Size),
-    lists:nth(Factor+1, Nodes).
-
-%% @doc take the list of nodes, and rearrange it, starting with the node that
-%%      results from hashing the Term
-ordered_nodes(Term, Nodes) ->
-    SeedNode = seednode(Term, Nodes),
-    {A, B} = lists:splitwith(fun(N) -> N /= SeedNode end, Nodes),
-    lists:append(B,A).
-
-%% @doc create a partition map
-pmap(NumPartitions, Nodes) ->
-    Increment = ?RINGTOP div NumPartitions,
-    Parts = parts(?RINGTOP, Increment, 0, []),
-    make_map(Nodes, Nodes, Parts, []).
-
-%% @doc makes a {beg, end} list of partition ranges
-%%      last range may have an extra few values, because Increment is created
-%%      with Ringtop 'div' NumPartitions above.
-parts(Top, _, Beg, Acc) when Beg > Top -> Acc;
-parts(Top, Increment, Beg, Acc) ->
-    End = case Beg + 2*Increment of
-    Over when Over > Top -> Top;
-    _ -> Beg + Increment - 1
-    end,
-    NewAcc = [{Beg, End} | Acc],
-    parts(Top, Increment, End+1, NewAcc).
-
-%% @doc create a full map, which is a pmap with N-1 replication partner nodes
-%%      added per partition
-int_fullmap(DbName, N, Pmap, Nodes) ->
-    Full = lists:foldl(fun({Node,{B,E} = Part}, AccIn) ->
-        Primary = [#shard{dbname=DbName, node=Node, range=[B,E],
-            name=shard_name(B,DbName)}],
-        Partners = partners(DbName, N, Node, Nodes, Part),
-        lists:append([Primary, Partners, AccIn])
-    end, [], Pmap),
-    lists:reverse(Full).
-
-partners(DbName, N, Node, Nodes, {Beg,End}) ->
-    {A, [Node|B]} = lists:splitwith(fun(Nd) -> Nd /= Node end, Nodes),
-    Nodes1 = lists:append(B,A),
-    Partners = lists:sublist(Nodes1, N-1), % N-1 replication partner nodes
-    lists:map(fun(Partner) ->
-        #shard{dbname=DbName, node=Partner, range=[Beg,End],
-            name=shard_name(Beg,DbName)}
-    end, Partners).
-
-%% @doc size of one partition in the ring
-partition_range(Q) ->
-    trunc( ?RINGTOP / Q ).  % SHA-1 space / Q
-
-%% @doc assign nodes to each of the partitions.  When you run out of nodes,
-%%      start at the beginning of the node list again.
-%%      The provided node list starts with the seed node (seednode fun)
-make_map(_,_,[], Acc) ->
-    lists:keysort(2,Acc);
-make_map(AllNodes, [], Parts, Acc) ->
-    % start back at beginning of node list
-    make_map(AllNodes, AllNodes, Parts, Acc);
-make_map(AllNodes, [Node|RestNodes], [Part|RestParts], Acc) ->
-    % add a node/part combo to the Acc
-    make_map(AllNodes, RestNodes, RestParts, [{Node,Part}|Acc]).
-
-jsonify(<<"map">>, Map) ->
-    lists:map(fun(#shard{node=Node, range=[Beg,End]}) ->
-        {[{<<"node">>, Node}, {<<"b">>, Beg}, {<<"e">>, End}]}
-    end, Map);
-jsonify(<<"nodes">>, Nodes) ->
-    lists:map(fun({Order, Node, Options}) ->
-        {[{<<"order">>, Order}, {<<"node">>, Node}, {<<"options">>, Options}]}
-    end, Nodes).
-
-write_db_doc(EDoc) ->
+write_db_doc(Doc) ->
     {ok, Db} = couch_db:open(<<"dbs">>, []),
     try
-        update_db_doc(Db, couch_doc:from_json_obj(EDoc))
-    catch {conflict, _} ->
+        update_db_doc(Db, Doc)
+    catch conflict ->
         ?LOG_ERROR("conflict writing db doc, must be a race", [])
     after
         couch_db:close(Db)
@@ -180,22 +62,55 @@ update_db_doc(Db, #doc{id=Id, body=Body} = Doc) ->
         {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, [])
     end.
 
-shard_name(Part, DbName) when is_list(DbName) ->
-    shard_name(Part, ?l2b(DbName));
-shard_name(Part, DbName) ->
-    PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)),
-    <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>.
+delete_db_doc(DocId) ->
+    {ok, Db} = couch_db:open(<<"dbs">>, []),
+    try
+        delete_db_doc(Db, DocId)
+    catch conflict ->
+        ok
+    after
+        couch_db:close(Db)
+    end.
+
+delete_db_doc(Db, DocId) ->
+    case couch_db:open_doc(Db, DocId, []) of
+    {not_found, _} ->
+        ok;
+    {ok, OldDoc} ->
+        {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, [])
+    end.
 
 build_shards(DbName, DocProps) ->
-    lists:map(fun({Map}) ->
-        Begin = couch_util:get_value(<<"b">>, Map),
-        #shard{
-            name = mem3_util:shard_name(Begin, DbName),
-            dbname = DbName,
-            node = to_atom(couch_util:get_value(<<"node">>, Map)),
-            range = [Begin, couch_util:get_value(<<"e">>, Map)]
-        }
-    end, couch_util:get_value(<<"map">>, DocProps, {[]})).
+    {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+    lists:flatmap(fun({Node, Ranges}) ->
+        lists:map(fun(Range) ->
+            [B,E] = string:tokens(?b2l(Range), "-"),
+            Beg = httpd_util:hexlist_to_integer(B),
+            End = httpd_util:hexlist_to_integer(E),
+            name_shard(#shard{
+                dbname = DbName,
+                node = to_atom(Node),
+                range = [Beg, End]
+            })
+        end, Ranges)
+    end, ByNode).
 
 to_atom(Node) when is_binary(Node) ->
-    list_to_atom(binary_to_list(Node)).
+    list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+    Node.
+
+to_integer(N) when is_integer(N) ->
+    N;
+to_integer(N) when is_binary(N) ->
+    list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+    list_to_integer(N).
+
+n_val(undefined, NodeCount) ->
+    n_val(list_to_integer(couch_config:get("cluster", "n", "3")), NodeCount);
+n_val(N, NodeCount) when N > NodeCount ->
+    ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+    NodeCount;
+n_val(N, _) ->
+    N.
diff --git a/src/mem3_vclock.erl b/src/mem3_vclock.erl
deleted file mode 100644
index a48da43c..00000000
--- a/src/mem3_vclock.erl
+++ /dev/null
@@ -1,109 +0,0 @@
-%%% @author Cliff Moon <cliff@powerset.com> []
-%%% @copyright 2008 Cliff Moon
-
--module (mem3_vclock).
--export ([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2,
-          equals/2]).
-
-%% -ifdef(TEST).
-%% -include("etest/vector_clock_test.erl").
-%% -endif.
-
-create(NodeName) -> [{NodeName, now_float()}].
-
-truncate(Clock) when length(Clock) > 10 ->
-    lists:nthtail(length(Clock) - 10, lists:keysort(2, Clock));
-
-truncate(Clock) -> Clock.
-
-increment(NodeName, [{NodeName, _Version}|Clocks]) ->
-    [{NodeName, now_float()}|Clocks];
-
-increment(NodeName, [NodeClock|Clocks]) ->
-    [NodeClock|increment(NodeName, Clocks)];
-
-increment(NodeName, []) ->
-    [{NodeName, now_float()}].
-
-resolve({ClockA, ValuesA}, {ClockB, ValuesB}) ->
-    case compare(ClockA, ClockB) of
-    less -> {ClockB, ValuesB};
-    greater -> {ClockA, ValuesA};
-    equal -> {ClockA, ValuesA};
-    concurrent ->
-        showroom_log:message(info,
-            "~nConcurrent Clocks~n"
-            "ClockA : ~p~nClockB : ~p~n"
-            "ValuesA: ~p~nValuesB: ~p~n"
-            , [ClockA, ClockB, ValuesA, ValuesB]),
-        {merge(ClockA,ClockB), ValuesA ++ ValuesB}
-    end;
-resolve(not_found, {Clock, Values}) ->
-    {Clock, Values};
-resolve({Clock, Values}, not_found) ->
-    {Clock, Values}.
-
-merge(ClockA, ClockB) ->
-    merge([], ClockA, ClockB).
-
-merge(Merged, [], ClockB) -> lists:keysort(1, Merged ++ ClockB);
-
-merge(Merged, ClockA, []) -> lists:keysort(1, Merged ++ ClockA);
-
-merge(Merged, [{NodeA, VersionA}|ClockA], ClockB) ->
-    case lists:keytake(NodeA, 1, ClockB) of
-    {value, {NodeA, VersionB}, TrunkClockB} when VersionA > VersionB ->
-        merge([{NodeA,VersionA}|Merged],ClockA,TrunkClockB);
-    {value, {NodeA, VersionB}, TrunkClockB} ->
-        merge([{NodeA,VersionB}|Merged],ClockA,TrunkClockB);
-    false ->
-        merge([{NodeA,VersionA}|Merged],ClockA,ClockB)
-    end.
-
-compare(ClockA, ClockB) ->
-    AltB = less_than(ClockA, ClockB),
-    if AltB -> less; true ->
-        BltA = less_than(ClockB, ClockA),
-        if BltA -> greater; true ->
-            AeqB = equals(ClockA, ClockB),
-            if AeqB -> equal; true -> concurrent end
-        end
-    end.
-
-%% ClockA is less than ClockB if and only if ClockA[z] <= ClockB[z] for all
-%% instances z and there exists an index z' such that ClockA[z'] < ClockB[z']
-less_than(ClockA, ClockB) ->
-    ForAll = lists:all(fun({Node, VersionA}) ->
-        case lists:keysearch(Node, 1, ClockB) of
-        {value, {_NodeB, VersionB}} -> VersionA =< VersionB;
-        false -> false
-        end
-    end, ClockA),
-    Exists = lists:any(fun({NodeA, VersionA}) ->
-        case lists:keysearch(NodeA, 1, ClockB) of
-        {value, {_NodeB, VersionB}} -> VersionA /= VersionB;
-        false -> true
-        end
-    end, ClockA),
-    %length takes care of the case when clockA is shorter than B
-    ForAll and (Exists or (length(ClockA) < length(ClockB))).
-
-equals(ClockA, ClockB) ->
-    Equivalent = lists:all(fun({NodeA, VersionA}) ->
-        lists:any(fun(NodeClockB) ->
-            case NodeClockB of
-            {NodeA, VersionA} -> true;
-            _ -> false
-            end
-        end, ClockB)
-    end, ClockA),
-    Equivalent and (length(ClockA) == length(ClockB)).
-
-now_float() ->
-    time_to_epoch_float(now()).
-
-time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) ->
-    Time;
-
-time_to_epoch_float({Mega,Sec,Micro}) ->
-    Mega * 1000000 + Sec + Micro / 1000000.
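
A closing note on the hashing change in mem3_util above: the ring shrinks from
SHA-1 space (?RINGTOP = 2^160) to CRC32 space (?RINGTOP = 2 bsl 31, i.e. 2^32),
and make_key_ranges/3 carves it into Q contiguous [B,E] ranges, each duplicated
N times and dealt out to nodes round-robin by attach_nodes/4. A minimal sketch
of where a doc id lands on the new ring (not part of the commit; Q and the doc
id are illustrative, and boundary handling inherits the "assume A < B" TODO
noted in mem3:shards/2):

    Ringtop = 2 bsl 31,                        % 4294967296, i.e. 2^32
    Q = 8,                                     % "cluster q" default in choose_shards/2
    Increment = Ringtop div Q,                 % 536870912 keys per unique range
    HashKey = erlang:crc32(<<"some_doc_id">>), % mem3_util:hash/1 for binaries
    Index = HashKey div Increment,
    %% the owning shard's range [B,E] satisfies B < HashKey =< E, per the
    %% ets:select conditions in mem3:shards/2; with Q dividing the ring
    %% exactly, that range is:
    {Index * Increment, (Index + 1) * Increment - 1}.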