author    Adam Kocoloski <adam@cloudant.com>  2010-07-02 03:12:45 -0400
committer Adam Kocoloski <adam@cloudant.com>  2010-08-12 01:23:30 -0400
commit    40c669d1864c4c9eb788240dd4edc533d8a352f2 (patch)
tree      cdd1ce41c1dc76bf8ad907821b94d61901bc9ecb
parent    570649ec3d6c66be8d7900b655cdc1d31ca8fe27 (diff)
mega refactoring of mem3
-rw-r--r--  ebin/mem3.app            |   7
-rw-r--r--  include/mem3.hrl         |   8
-rw-r--r--  src/mem3.erl             |  56
-rw-r--r--  src/mem3_app.erl         |   2
-rw-r--r--  src/mem3_cache.erl       |   4
-rw-r--r--  src/mem3_server.erl      | 552
-rw-r--r--  src/mem3_sup.erl         |   4
-rw-r--r--  src/mem3_sync.erl        | 242
-rw-r--r--  src/mem3_sync_event.erl  |  25
-rw-r--r--  src/mem3_util.erl        | 253
-rw-r--r--  src/mem3_vclock.erl      | 109
11 files changed, 368 insertions(+), 894 deletions(-)
diff --git a/ebin/mem3.app b/ebin/mem3.app
index d0caaeec..05d50748 100644
--- a/ebin/mem3.app
+++ b/ebin/mem3.app
@@ -7,17 +7,16 @@
mem3_app,
mem3_cache,
mem3_httpd,
- mem3_server,
+ mem3_nodes,
mem3_sup,
mem3_sync,
mem3_sync_event,
- mem3_util,
- mem3_vclock
+ mem3_util
]},
{registered, [
mem3_cache,
mem3_events,
- mem3_server,
+ mem3_nodes,
mem3_sync,
mem3_sup
]},
diff --git a/include/mem3.hrl b/include/mem3.hrl
index a1e6f822..533056f9 100644
--- a/include/mem3.hrl
+++ b/include/mem3.hrl
@@ -10,13 +10,6 @@
-include_lib("eunit/include/eunit.hrl").
-%% version 3 of membership state
--record(mem, {header=3,
- nodes=[],
- clock=[],
- args
- }).
-
%% partition record
-record(shard, {name, node, dbname, range, ref}).
@@ -28,7 +21,6 @@
-type mem_node_list() :: [mem_node()].
-type arg_options() :: {test, boolean()}.
-type args() :: [] | [arg_options()].
--type mem_state() :: #mem{}.
-type test() :: undefined | node().
-type epoch() :: float().
-type clock() :: {node(), epoch()}.
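
With the #mem record and its state machinery gone, #shard is the data structure the rest of the application revolves around. For orientation, a populated record looks roughly like this (values are hypothetical; the name format comes from mem3_util:name_shard/1 further down):

    #shard{name   = <<"shards/00000000-3fffffff/mydb">>,
           node   = 'node1@host',
           dbname = <<"mydb">>,
           range  = [16#00000000, 16#3fffffff],
           ref    = undefined}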
diff --git a/src/mem3.erl b/src/mem3.erl
index 4f7c6ade..e6ee5bf8 100644
--- a/src/mem3.erl
+++ b/src/mem3.erl
@@ -1,7 +1,7 @@
-module(mem3).
--author('Brad Anderson <brad@cloudant.com>').
--export([start/0, stop/0, restart/0, state/0]).
+-export([start/0, stop/0, restart/0, state/0, nodes/0, shards/1, shards/2,
+ choose_shards/2]).
-include("mem3.hrl").
@@ -22,7 +22,7 @@ restart() ->
%% key and the nodes holding that state as the value. Also reports member
%% nodes which fail to respond and nodes which are connected but are not
%% cluster members. Useful for debugging.
--spec state() -> [{mem_state() | bad_nodes | non_member_nodes, [node()]}].
+-spec state() -> [{any | bad_nodes | non_member_nodes, [node()]}].
state() ->
{ok, Nodes} = mem3:nodes(),
AllNodes = erlang:nodes([this, visible]),
@@ -30,4 +30,52 @@ state() ->
Dict = lists:foldl(fun({Node, {ok,State}}, D) ->
orddict:append(State, Node, D)
end, orddict:new(), Replies),
- [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
\ No newline at end of file
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec nodes() -> [node()].
+nodes() ->
+ mem3_nodes:get_nodelist().
+
+-spec shards(DbName::binary()) -> [#shard{}].
+shards(DbName) ->
+ case ets:lookup(partitions, DbName) of
+ [] ->
+ % TODO fall back to checking dbs.couch directly
+ erlang:error(database_does_not_exist);
+ Else ->
+ Else
+ end.
+
+-spec shards(DbName::binary(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) ->
+ HashKey = mem3_util:hash(DocId),
+ Head = #shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_'
+ },
+ % TODO these conditions assume A < B, which we don't require
+ Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+ case ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+ [] ->
+ % TODO fall back to checking dbs.couch directly
+ erlang:error(database_does_not_exist);
+ Shards ->
+ Shards
+ end.
+
+choose_shards(DbName, Options) ->
+ try shards(DbName)
+ catch error:database_does_not_exist ->
+ Nodes = mem3:nodes(),
+ NodeCount = length(Nodes),
+ N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+ Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+ couch_config:get("cluster", "q", "8"))),
+ % rotate to a random entry in the nodelist for even distribution
+ {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
+ RotatedNodes = B ++ A,
+ mem3_util:create_partition_map(DbName, N, Q, RotatedNodes)
+ end.
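
The new shards/2 pairs with the CRC32 hash introduced in mem3_util below: a document id hashes to a point on the 2^32 ring, and the ets select keeps every shard whose range contains that point. A minimal standalone sketch of the same containment test (module name and sample data are hypothetical):

    -module(shard_lookup_sketch).
    -export([lookup/2]).
    -record(shard, {name, node, dbname, range, ref}).

    %% Same predicate as the match spec above: B < Key =< E.
    lookup(DocId, Shards) when is_binary(DocId) ->
        Key = erlang:crc32(DocId),
        [S || #shard{range = [B, E]} = S <- Shards, B < Key, Key =< E].

    %% 1> S = [#shard{node = a, range = [0, 16#7fffffff]},
    %%         #shard{node = b, range = [16#80000000, 16#ffffffff]}],
    %% 2> shard_lookup_sketch:lookup(<<"some_doc">>, S).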
diff --git a/src/mem3_app.erl b/src/mem3_app.erl
index 70bf1cf9..88cd1ea1 100644
--- a/src/mem3_app.erl
+++ b/src/mem3_app.erl
@@ -3,8 +3,6 @@
-export([start/2, stop/1]).
start(_Type, []) ->
- DbName = couch_config:get("mem3", "db", "dbs"),
- couch_server:create(list_to_binary(DbName), []),
mem3_sup:start_link().
stop([]) ->
diff --git a/src/mem3_cache.erl b/src/mem3_cache.erl
index b17db1a2..1d1bbe9b 100644
--- a/src/mem3_cache.erl
+++ b/src/mem3_cache.erl
@@ -69,7 +69,7 @@ changes_callback(start, _) ->
changes_callback({stop, EndSeq}, _) ->
exit({seq, EndSeq});
changes_callback({change, {Change}, _}, _) ->
- DbName = couch_util:get_value(id, Change),
+ DbName = couch_util:get_value(<<"id">>, Change),
case couch_util:get_value(deleted, Change, false) of
true ->
ets:delete(partitions, DbName);
@@ -82,6 +82,6 @@ changes_callback({change, {Change}, _}, _) ->
ets:insert(partitions, mem3_util:build_shards(DbName, Doc))
end
end,
- {ok, couch_util:get_value(seq, Change)};
+ {ok, couch_util:get_value(<<"seq">>, Change)};
changes_callback(timeout, _) ->
{ok, nil}.
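
The fix in this hunk is the switch from atom to binary keys: rows decoded from the _changes feed arrive as EJSON proplists keyed by binaries, so the atom lookups always came back undefined. Roughly (the row shape here is an assumption):

    Change = [{<<"seq">>, 42}, {<<"id">>, <<"mydb">>}],
    undefined  = couch_util:get_value(id, Change),        %% atom key never matches
    <<"mydb">> = couch_util:get_value(<<"id">>, Change).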
diff --git a/src/mem3_server.erl b/src/mem3_server.erl
deleted file mode 100644
index 0d76344d..00000000
--- a/src/mem3_server.erl
+++ /dev/null
@@ -1,552 +0,0 @@
-%%% membership module
-%%%
-%%% State of the gen_server is a #mem record
-%%%
-%%% Nodes and Gossip are the same thing, and are a list of three-tuples like:
-%%%
-%%% [ {Pos,NodeName,Options} | _ ]
-%%%
-%%% Position is 1-based incrementing in order of node joining
-%%%
-%%% Options is a proplist, with [{hints, [Part1|_]}] denoting that the node
-%%% is responsible for the extra partitions too.
-%%%
-%%% TODO: dialyzer type specs
-%%%
--module(mem3_server).
--author('brad@cloudant.com').
-
--behaviour(gen_server).
-
-%% API
--export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
--export([join/3, clock/0, state/0, nodes/0, fullnodes/0,
- start_gossip/0]).
-
-%% for testing more than anything else
--export([merge_nodes/2, next_up_node/1, next_up_node/3]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-%% includes
--include("mem3.hrl").
-
--define(SERVER, membership).
--define(STATE_FILE_PREFIX, "membership").
-
-
-%%====================================================================
-%% API
-%%====================================================================
-
--spec start_link() -> {ok, pid()}.
-start_link() ->
- start_link([]).
-
-
--spec start_link(args()) -> {ok, pid()}.
-start_link(Args) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, Args, []).
-
-
--spec stop() -> ok.
-stop() ->
- stop(?MODULE).
-
-
--spec stop(atom()) -> ok.
-stop(Server) ->
- gen_server:cast(Server, stop).
-
-
--spec join(join_type(), mem_node_list() | {node(), options()}, node() | nil) ->
- ok.
-join(JoinType, Payload, PingNode) ->
- gen_server:call(?SERVER, {join, JoinType, Payload, PingNode}).
-
-
--spec clock() -> vector_clock().
-clock() ->
- gen_server:call(?SERVER, clock).
-
-
--spec state() -> mem_state().
-state() ->
- gen_server:call(?SERVER, state).
-
--spec start_gossip() -> ok.
-start_gossip() ->
- gen_server:call(?SERVER, start_gossip).
-
-
--spec reset() -> ok | not_reset.
-reset() ->
- gen_server:call(?SERVER, reset).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%% This may differ from erlang:nodes()
-%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
--spec nodes() -> {ok, [node()]}.
-nodes() ->
- gen_server:call(?SERVER, nodes).
-
-
-%% @doc get the list of cluster nodes (according to membership module)
-%% This may differ from erlang:nodes()
-%% Guaranteed to be in order of State's node list (1st elem in 3-tuple)
--spec fullnodes() -> {ok, [mem_node()]}.
-fullnodes() ->
- gen_server:call(?SERVER, fullnodes).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%% start up membership server
--spec init(args()) -> {ok, mem_state()}.
-init(Args) ->
- process_flag(trap_exit,true),
- Test = get_test(Args),
- OldState = read_latest_state_file(Test),
- showroom_log:message(info, "membership: membership server starting...", []),
- net_kernel:monitor_nodes(true),
- State = handle_init(Test, OldState),
- {ok, State#mem{args=Args}}.
-
-
-%% new node(s) joining to this node
-handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
- try
- case handle_join(JoinType, ExtNodes, PingNode, State) of
- {ok, NewState} -> {reply, ok, NewState};
- Other -> {reply, Other, State}
- end
- catch _:Error ->
- showroom_log:message(error, "~p", [Error]),
- {reply, Error, State}
- end;
-
-%% clock
-handle_call(clock, _From, #mem{clock=Clock} = State) ->
- {reply, {ok, Clock}, State};
-
-%% state
-handle_call(state, _From, State) ->
- {reply, {ok, State}, State};
-
-%% reset - but only if we're in test mode
-handle_call(reset, _From, #mem{args=Args} = State) ->
- Test = get_test(Args),
- case Test of
- undefined -> {reply, not_reset, State};
- _ -> {reply, ok, int_reset(Test, State)}
- end;
-
-%% nodes
-handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
- {_,NodeList,_} = lists:unzip3(lists:keysort(1, Nodes)),
- {reply, {ok, NodeList}, State};
-
-%% fullnodes
-handle_call(fullnodes, _From, #mem{nodes=Nodes} = State) ->
- {reply, {ok, Nodes}, State};
-
-%% gossip
-handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
- showroom_log:message(info, "membership: received gossip from ~p",
- [erlang:node(Pid)]),
- handle_gossip(From, RemoteState, LocalState);
-
-% start_gossip
-handle_call(start_gossip, _From, State) ->
- NewState = gossip(State),
- {reply, ok, NewState};
-
-%% ignored call
-handle_call(Msg, _From, State) ->
- showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
- {reply, ignored, State}.
-
-
-%% gossip
-handle_cast({gossip, RemoteState}, LocalState) ->
- State = case handle_gossip(none, RemoteState, LocalState) of
- {reply, ok, NewState} -> NewState;
- {reply, {new_state, NewState}, _} -> NewState;
- {noreply, NewState} -> NewState
- end,
- {noreply, State};
-
-%% stop
-handle_cast(stop, State) ->
- {stop, normal, State};
-
-%% ignored cast
-handle_cast(Msg, State) ->
- showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
- {noreply, State}.
-
-
-%% @doc handle nodedown messages because we have
-%% net_kernel:monitor_nodes(true)
-handle_info({nodedown, Node}, State) ->
- showroom_log:message(alert, "membership: nodedown ~p", [Node]),
- notify(nodedown, [Node], State),
- {noreply, State};
-
-%% @doc handle nodeup messages because we have
-%% net_kernel:monitor_nodes(true)
-handle_info({nodeup, Node}, State) ->
- showroom_log:message(alert, "membership: nodeup ~p", [Node]),
- notify(nodeup, [Node], State),
- gossip_cast(State),
- {noreply, State};
-
-%% ignored info
-handle_info(Info, State) ->
- showroom_log:message(info, "membership: ignored info: ~p", [Info]),
- {noreply, State}.
-
-
-% terminate
-terminate(_Reason, _State) ->
- ok.
-
-
-% ignored code change
-code_change(OldVsn, State, _Extra) ->
- io:format("Unknown Old Version~nOldVsn: ~p~nState : ~p~n", [OldVsn, State]),
- {ok, State}.
-
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-%% @doc if Args has config use it, otherwise call configuration module
-%% most times Args will have config during testing runs
-%get_config(Args) ->
-% case proplists:get_value(config, Args) of
-% undefined -> configuration:get_config();
-% Any -> Any
-% end.
-
-
-get_test(Args) ->
- proplists:get_value(test, Args).
-
-
-%% @doc handle_init starts a node
-%% Most of the time, this puts the node in a single-node cluster setup,
-%% But, we could be automatically rejoining a cluster after some downtime.
-%% See handle_join for initing, joining, leaving a cluster, or replacing a
-%% node.
-%% @end
-handle_init(Test, nil) ->
- int_reset(Test);
-
-handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
- % there's an old state, let's try to rejoin automatically
- % but only if we can compare our old state to other available
- % nodes and get a match... otherwise get a human involved
- {_, NodeList, _} = lists:unzip3(Nodes),
- ping_all_yall(NodeList),
- {RemoteStates, _BadNodes} = get_remote_states(NodeList),
- Test = get_test(Args),
- case compare_state_with_rest(OldState, RemoteStates) of
- match ->
- showroom_log:message(info, "membership: rejoined successfully", []),
- OldState;
- Other ->
- showroom_log:message(error, "membership: rejoin failed: ~p", [Other]),
- int_reset(Test)
- end.
-
-
-%% @doc handle join activities, return {ok,NewState}
--spec handle_join(join_type(), [mem_node()], ping_node(), mem_state()) ->
- {ok, mem_state()}.
-% init
-handle_join(init, ExtNodes, nil, State) ->
- {_,Nodes,_} = lists:unzip3(ExtNodes),
- ping_all_yall(Nodes),
- int_join(ExtNodes, State);
-% join
-handle_join(join, ExtNodes, PingNode, #mem{args=Args} = State) ->
- NewState = case get_test(Args) of
- undefined -> get_pingnode_state(PingNode);
- _ -> State % testing, so meh
- end,
- % now use this info to join the ring
- int_join(ExtNodes, NewState);
-% replace
-handle_join(replace, OldNode, PingNode, State) when is_atom(OldNode) ->
- handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, [OldNode | _], PingNode, State) ->
- handle_join(replace, {OldNode, []}, PingNode, State);
-handle_join(replace, {OldNode, NewOpts}, PingNode, State) ->
- OldState = #mem{nodes=OldNodes} = get_pingnode_state(PingNode),
- {Order, OldNode, _OldOpts} = lists:keyfind(OldNode, 2, OldNodes),
- NewNodes = lists:keyreplace(OldNode, 2, OldNodes, {Order, node(), NewOpts}),
- notify(node_leave, [OldNode], State),
- int_join([], OldState#mem{nodes=NewNodes});
-% leave
-handle_join(leave, [OldNode | _], _PingNode, State) ->
- % TODO implement me
- notify(node_leave, [OldNode], State),
- ok;
-
-handle_join(JoinType, _, PingNode, _) ->
- showroom_log:message(info, "membership: unknown join type: ~p "
- "for ping node: ~p", [JoinType, PingNode]),
- {error, unknown_join_type}.
-
-%% @doc common operations for all join types
-int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
- NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
- check_pos(Pos, N, Nodes),
- notify(node_join, [N], State),
- [New|AccIn]
- end, Nodes, ExtNodes),
- NewNodes1 = lists:sort(NewNodes),
- NewClock = mem3_vclock:increment(node(), Clock),
- NewState = State#mem{nodes=NewNodes1, clock=NewClock},
- install_new_state(NewState),
- {ok, NewState}.
-
-
-install_new_state(#mem{args=Args} = State) ->
- Test = get_test(Args),
- save_state_file(Test, State),
- gossip(call, Test, State).
-
-
-get_pingnode_state(PingNode) ->
- {ok, RemoteState} = gen_server:call({?SERVER, PingNode}, state),
- RemoteState.
-
-
-%% @doc handle the gossip messages
-%% We're not using mem3_vclock:resolve b/c we need custom merge strategy
-handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
- LocalState=#mem{clock=LocalClock}) ->
- case mem3_vclock:compare(RemoteClock, LocalClock) of
- equal ->
- {reply, ok, LocalState};
- less ->
- % remote node needs updating
- {reply, {new_state, LocalState}, LocalState};
- greater when From == none->
- {noreply, install_new_state(RemoteState)};
- greater ->
- % local node needs updating
- gen_server:reply(From, ok), % reply to sender first
- {noreply, install_new_state(RemoteState)};
- concurrent ->
- % ick, so let's resolve and merge states
- showroom_log:message(info,
- "membership: Concurrent Clocks~n"
- "RemoteState : ~p~nLocalState : ~p~n"
- , [RemoteState, LocalState]),
- MergedState = merge_states(RemoteState, LocalState),
- if From =/= none ->
- % reply to sender
- gen_server:reply(From, {new_state, MergedState})
- end,
- {noreply, install_new_state(MergedState)}
- end.
-
-
-merge_states(#mem{clock=RemoteClock, nodes=RemoteNodes} = _RemoteState,
- #mem{clock=LocalClock, nodes=LocalNodes} = LocalState) ->
- MergedClock = mem3_vclock:merge(RemoteClock, LocalClock),
- MergedNodes = merge_nodes(RemoteNodes, LocalNodes),
- LocalState#mem{clock=MergedClock, nodes=MergedNodes}.
-
-
-%% this will give one of the lists back, deterministically
-merge_nodes(Remote, Local) ->
- % get rid of the initial 0 node if it's still there, and sort
- Remote1 = lists:usort(lists:keydelete(0,1,Remote)),
- Local1 = lists:usort(lists:keydelete(0,1,Local)),
- % handle empty lists as well as other cases
- case {Remote1, Local1} of
- {[], L} -> L;
- {R, []} -> R;
- _ -> erlang:min(Remote1, Local1)
- end.
-
-
-gossip(#mem{args=Args} = NewState) ->
- Test = get_test(Args),
- gossip(call, Test, NewState).
-
-
-gossip_cast(#mem{nodes=[]}) -> ok;
-gossip_cast(#mem{args=Args} = NewState) ->
- Test = get_test(Args),
- gossip(cast, Test, NewState).
-
-
--spec gossip(gossip_fun(), test(), mem_state()) -> mem_state().
-gossip(_, _, #mem{nodes=[]}) -> ok;
-gossip(Fun, undefined, #mem{nodes=StateNodes} = State) ->
- {_, Nodes, _} = lists:unzip3(StateNodes),
- case next_up_node(Nodes) of
- no_gossip_targets_available ->
- State; % skip gossip, I'm the only node
- TargetNode ->
- showroom_log:message(info, "membership: firing gossip from ~p to ~p",
- [node(), TargetNode]),
- case gen_server:Fun({?SERVER, TargetNode}, {gossip, State}) of
- ok -> State;
- {new_state, NewState} -> NewState;
- Error -> throw({unknown_gossip_response, Error})
- end
- end;
-
-gossip(_,_,_) ->
- % testing, so don't gossip
- ok.
-
-
-next_up_node(Nodes) ->
- next_up_node(node(), Nodes, up_nodes()).
-
-
-next_up_node(Node, Nodes, UpNodes) ->
- {A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
- List = lists:append(B, A), % be sure to eliminate Node
- DownNodes = Nodes -- UpNodes,
- case List -- DownNodes of
- [Target|_] -> Target;
- [] -> no_gossip_targets_available
- end.
-
-
-up_nodes() ->
- % TODO: implement cache (fb 9704 & 9449)
- erlang:nodes().
-
-
-%% @doc find the latest state file on disk
-find_latest_state_filename() ->
- Dir = couch_config:get("couchdb", "database_dir"),
- case file:list_dir(Dir) of
- {ok, Filenames} ->
- Timestamps = [list_to_integer(TS) || {?STATE_FILE_PREFIX, TS} <-
- [list_to_tuple(string:tokens(FN, ".")) || FN <- Filenames]],
- SortedTimestamps = lists:reverse(lists:sort(Timestamps)),
- case SortedTimestamps of
- [Latest | _] ->
- {ok, Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++
- integer_to_list(Latest)};
- _ ->
- throw({error, mem_state_file_not_found})
- end;
- {error, Reason} ->
- throw({error, Reason})
- end.
-
-
-%% (Test, Config)
-read_latest_state_file(undefined) ->
- try
- {ok, File} = find_latest_state_filename(),
- case file:consult(File) of
- {ok, [#mem{}=State]} -> State;
- _Else ->
- throw({error, bad_mem_state_file})
- end
- catch _:Error ->
- showroom_log:message(info, "membership: ~p", [Error]),
- nil
- end;
-read_latest_state_file(_) ->
- nil.
-
-
-%% @doc save the state file to disk, with current timestamp.
-%% thx to riak_ring_manager:do_write_ringfile/1
--spec save_state_file(test(), mem_state()) -> ok.
-save_state_file(undefined, State) ->
- Dir = couch_config:get("couchdb", "database_dir"),
- {{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
- TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
- [Year, Month, Day, Hour, Minute, Second]),
- FN = Dir ++ "/" ++ ?STATE_FILE_PREFIX ++ "." ++ TS,
- ok = filelib:ensure_dir(FN),
- {ok, File} = file:open(FN, [binary, write]),
- io:format(File, "~w.~n", [State]),
- file:close(File);
-
-save_state_file(_,_) -> ok. % don't save if testing
-
-
-check_pos(Pos, Node, Nodes) ->
- Found = lists:keyfind(Pos, 1, Nodes),
- case Found of
- false -> ok;
- _ ->
- {_,OldNode,_} = Found,
- if
- OldNode =:= Node ->
- Msg = "node_exists_at_position_" ++ integer_to_list(Pos),
- throw({error, list_to_binary(Msg)});
- true ->
- Msg = "position_exists_" ++ integer_to_list(Pos),
- throw({error, list_to_binary(Msg)})
- end
- end.
-
-
-int_reset(Test) ->
- int_reset(Test, #mem{}).
-
-
-int_reset(_Test, State) ->
- State#mem{nodes=[], clock=[]}.
-
-
-ping_all_yall(Nodes) ->
- lists:foreach(fun(Node) ->
- net_adm:ping(Node)
- end, Nodes),
- timer:sleep(500). % sigh.
-
-
-get_remote_states(NodeList) ->
- NodeList1 = lists:delete(node(), NodeList),
- {States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
- {_Status, States2} = lists:unzip(States1),
- NodeList2 = NodeList1 -- BadNodes,
- {lists:zip(NodeList2,States2), BadNodes}.
-
-
-%% @doc compare state with states based on vector clock
-%% return match | {bad_state_match, Node, NodesThatDontMatch}
-compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
- Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
- {mem3_vclock:equals(Clock, Clock1), Node}
- end, States),
- BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
- ({false, N}, AccIn) -> [N | AccIn]
- end, [], Results),
- if
- length(BadResults) == 0 -> match;
- true -> {bad_state_match, node(), BadResults}
- end.
-
-notify(Type, Nodes, #mem{nodes=MemNodesList} = _State) ->
- {_,MemNodes,_} = lists:unzip3(lists:keysort(1, MemNodesList)),
- lists:foreach(fun(Node) ->
- case lists:member(Node, MemNodes) orelse Type == nodedown of
- true ->
- gen_event:notify(membership_events, {Type, Node});
- _ -> ok % node not in cluster
- end
- end, Nodes).
diff --git a/src/mem3_sup.erl b/src/mem3_sup.erl
index 0a9f66d0..353216d4 100644
--- a/src/mem3_sup.erl
+++ b/src/mem3_sup.erl
@@ -7,10 +7,10 @@ start_link() ->
init(_Args) ->
Children = [
- child(mem3_server),
child(mem3_events),
child(mem3_sync),
- child(mem3_cache)
+ child(mem3_cache),
+ child(mem3_nodes)
],
{ok, {{one_for_one,10,1}, Children}}.
diff --git a/src/mem3_sync.erl b/src/mem3_sync.erl
index d50514d9..0f402834 100644
--- a/src/mem3_sync.erl
+++ b/src/mem3_sync.erl
@@ -1,46 +1,214 @@
-module(mem3_sync).
--behaviour(supervisor).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
--export([start_link/0, init/1, childspec/1, sup_upgrade_notify/2]).
+-export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
--include("mem3.hrl").
+-include_lib("../../couch/src/couch_db.hrl").
+
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
+}).
start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+
+push(Db, Node) ->
+ gen_server:cast(?MODULE, {push, Db, Node}).
+
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
init([]) ->
- {ok, MemNodes} = mem3:nodes(),
- LiveNodes = nodes(),
- ChildSpecs = [childspec(N) || N <- MemNodes, lists:member(N, LiveNodes)],
- gen_event:add_handler(membership_events, dbs_event, []),
- {ok, {{one_for_one, 10, 8}, ChildSpecs}}.
-
-childspec(Node) ->
- ?LOG_INFO("dbs repl ~p --> ~p starting", [node(), Node]),
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+ {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State}.
+
+handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, DbName, Node)};
+
+handle_cast({push, DbName, Node}, State) ->
+ #state{active = L, count = C} = State,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, DbName, Node)};
+ false ->
+ Pid = start_push_replication(DbName, Node),
+ {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
+ end;
+
+handle_cast({remove_node, Node}, State) ->
+ Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
+ Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
+ State#state.dict, [S || {S,N} <- State#state.waiting, N =:= Node]),
+ {noreply, State#state{dict = Dict, waiting = Waiting}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+ case lists:keyfind(Active, 3, State#state.active) of
+ {OldDbName, OldNode, _} ->
+ ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
+ OldNode, Reason]),
+ timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
+ false -> ok end,
+ handle_replication_exit(State, Active);
+
+handle_info(Msg, State) ->
+ ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active],
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid, 3, State#state.active),
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, 3, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {DbName, Node, StillWaiting} ->
+ NewPid = start_push_replication(DbName, Node),
+ State#state{
+ active = [{DbName, Node, NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+
+start_push_replication(DbName, Node) ->
PostBody = {[
- {<<"source">>, <<"dbs">>},
- {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, <<"dbs">>}]}},
- {<<"continuous">>, true}
+ {<<"source">>, DbName},
+ {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
+ {<<"continuous">>, false},
+ {<<"async">>, true}
]},
- Id = couch_util:to_hex(erlang:md5(term_to_binary([node(), Node]))),
- MFA = {couch_rep, start_link, [Id, PostBody, #user_ctx{}]},
- {Node, MFA, permanent, 100, worker, [couch_rep]}.
-
-% from http://code.google.com/p/erlrc/wiki/ErlrcHowto
-sup_upgrade_notify (_Old, _New) ->
- {ok, {_, Specs}} = init([]),
-
- Old = sets:from_list(
- [Name || {Name, _, _, _} <- supervisor:which_children(?MODULE)]),
- New = sets:from_list([Name || {Name, _, _, _, _, _} <- Specs]),
- Kill = sets:subtract(Old, New),
-
- sets:fold(fun(Id, ok) ->
- supervisor:terminate_child(?MODULE, Id),
- supervisor:delete_child(?MODULE, Id),
- ok
- end,
- ok,
- Kill),
- [supervisor:start_child (?MODULE, Spec) || Spec <- Specs ],
- ok.
+ ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
+ UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
+ case (catch couch_rep:replicate(PostBody, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ link(Pid),
+ Pid;
+ {db_not_found, _Msg} ->
+ case couch_api:open_db(DbName, []) of
+ {ok, Db} ->
+ % source exists, let's (re)create the target
+ couch_api:close_db(Db),
+ case rpc:call(Node, couch_api, create_db, [DbName, []]) of
+ {ok, Target} ->
+ ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
+ Node]),
+ couch_api:close_db(Target),
+ start_push_replication(DbName, Node);
+ file_exists ->
+ start_push_replication(DbName, Node);
+ Error ->
+ ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
+ [?MODULE, DbName, Node, Error]),
+ exit(shutdown)
+ end;
+ {not_found, no_db_file} ->
+ % source is gone, so this is a hack to skip it
+ ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
+ [?MODULE, DbName, Node]),
+ spawn_link(fun() -> ok end)
+ end;
+ {node_not_connected, _} ->
+ % we'll get this one when the node rejoins
+ ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
+ spawn_link(fun() -> ok end);
+ CatchAll ->
+ ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
+ case lists:member(Node, nodes()) of
+ true ->
+ timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
+ false ->
+ ok
+ end,
+ spawn_link(fun() -> ok end)
+ end.
+
+add_to_queue(State, DbName, Node) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [{DbName,Node}]
+ }
+ end.
+
+initial_sync() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
+
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
+next_replication(Active, Waiting) ->
+ case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [{DbName,Node}|Rest]} ->
+ {DbName, Node, Running ++ Rest}
+ end.
+
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].
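
The scheduler above caps concurrent internal replications at the configured limit, queues overflow pushes exactly once per {Db, Node} pair (the dict is the dedup index), and on every replication exit promotes the first waiting pair that is not already active. A worked trace of that last step (next_replication/2 is not exported, so this is illustrative only; names are hypothetical):

    Pid = spawn(fun() -> ok end),
    Active  = [{<<"dbs">>, 'a@host', Pid}],
    Waiting = [{<<"dbs">>, 'a@host'},    %% already active -> skipped
               {<<"nodes">>, 'b@host'},  %% first runnable pair
               {<<"dbs">>, 'c@host'}],
    %% next_replication(Active, Waiting) returns:
    %% {<<"nodes">>, 'b@host', [{<<"dbs">>, 'a@host'}, {<<"dbs">>, 'c@host'}]}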
diff --git a/src/mem3_sync_event.erl b/src/mem3_sync_event.erl
index 55f3840c..45fcb8aa 100644
--- a/src/mem3_sync_event.erl
+++ b/src/mem3_sync_event.erl
@@ -7,11 +7,26 @@
init(_) ->
{ok, nil}.
-handle_event({Up, Node}, State) when Up == nodeup; Up == node_join ->
- mem3_sync:add_node(Node);
-
-handle_event({Down, Node}, State) when Down == nodedown; Down == node_leave ->
- mem3_sync:remove_node(Node);
+handle_event({add_node, Node}, State) ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]],
+ {ok, State};
+
+handle_event({nodeup, Node}, State) ->
+ case lists:member(Node, mem3:nodes()) of
+ true ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]];
+ false ->
+ ok
+ end,
+ {ok, State};
+
+handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
handle_event(_Event, State) ->
{ok, State}.
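
These handlers run off the mem3_events manager, so a membership change translates directly into pushes of the two system databases; e.g., under the default config (node name hypothetical):

    gen_event:notify(mem3_events, {add_node, 'new@host'}).
    %% -> mem3_sync:push(<<"nodes">>, 'new@host') and
    %%    mem3_sync:push(<<"dbs">>, 'new@host')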
diff --git a/src/mem3_util.erl b/src/mem3_util.erl
index 476742b7..b05faa15 100644
--- a/src/mem3_util.erl
+++ b/src/mem3_util.erl
@@ -1,170 +1,52 @@
-module(mem3_util).
-author('brad@cloudant.com').
-%% API
--export([fullmap/2, fullmap/3, hash/1, install_fullmap/4]).
--export([for_key/2, all_parts/1]).
--export([shard_name/2, build_shards/2]).
+-export([hash/1, name_shard/1, create_partition_map/4, build_shards/2,
+ n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1]).
--define(RINGTOP, trunc(math:pow(2,160))). % SHA-1 space
+-define(RINGTOP, 2 bsl 31). % CRC32 space
-include("mem3.hrl").
-%%====================================================================
-%% API
-%%====================================================================
-
-%% @doc build a full partition map
-fullmap(DbName, Options) ->
- {ok, Nodes} = mem3:nodes(),
- fullmap(DbName, Nodes, Options).
-
-fullmap(DbName, Nodes, Options) ->
- {N,Q} = db_init_constants(Options),
- NewNodes = ordered_nodes(DbName, Nodes),
- Pmap = pmap(Q, NewNodes),
- int_fullmap(DbName, N, Pmap, NewNodes).
-
-%% @spec hash(term()) -> Digest::binary()
-%% @doc uses SHA-1 as its hash
hash(Item) when is_binary(Item) ->
- crypto:sha(Item);
+ erlang:crc32(Item);
hash(Item) ->
- crypto:sha(term_to_binary(Item)).
-
-install_fullmap(DbName, Fullmap, FullNodes, Options) ->
- {N,Q} = db_init_constants(Options),
- Doc = {[{<<"_id">>,DbName},
- {<<"map">>, jsonify(<<"map">>, Fullmap)},
- {<<"nodes">>, jsonify(<<"nodes">>, FullNodes)},
- {<<"n">>,N},
- {<<"q">>,Q}]},
- write_db_doc(Doc).
-
-for_key(DbName, Key) ->
- <<HashKey:160/integer>> = hash(Key),
- Head = #shard{
- name = '_',
- node = '_',
- dbname = DbName,
- range = ['$1','$2'],
- ref = '_'
- },
- % TODO these conditions assume A < B, which we don't require
- Conditions = [{'<', '$1', HashKey}, {'<', HashKey, '$2'}],
- case ets:select(partitions, [{Head, Conditions, ['$_']}]) of
- [] ->
- erlang:error(database_does_not_exist);
- Shards ->
- Shards
- end.
-
-all_parts(DbName) ->
- case ets:lookup(partitions, DbName) of
- [] ->
- erlang:error(database_does_not_exist);
- Else ->
- Else
- end.
-
-%%====================================================================
-%% Internal functions
-%%====================================================================
+ erlang:crc32(term_to_binary(Item)).
+
+name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) ->
+ Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>), "/", DbName],
+ Shard#shard{name = ?l2b(Name)}.
+
+create_partition_map(DbName, N, Q, Nodes) ->
+ UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+ Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+ Shards1 = attach_nodes(Shards0, [], Nodes, []),
+ [name_shard(S#shard{dbname=DbName}) || S <- Shards1].
+
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+ Acc;
+make_key_ranges(Increment, Start, Acc) ->
+ case Start + 2*Increment of
+ X when X > ?RINGTOP ->
+ End = ?RINGTOP - 1;
+ _ ->
+ End = Start + Increment - 1
+ end,
+ make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
-%% @doc get cluster constants from options or config
-db_init_constants(Options) ->
- {const(n, Options), const(q, Options)}.
+attach_nodes([], Acc, _, _) ->
+ lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+ attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+ attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
-%% @doc get individual constant
-const(Const, Options) ->
- ListResult = case couch_util:get_value(Const, Options) of
- undefined -> couch_config:get("cluster", atom_to_list(Const));
- Val -> Val
- end,
- list_to_integer(ListResult).
-
-%% @doc hash the dbname, and return the corresponding node for seeding a ring
-seednode(DbName, Nodes) ->
- <<HashInt:160/integer>> = hash(DbName),
- Size = partition_range(length(Nodes)),
- Factor = (HashInt div Size),
- lists:nth(Factor+1, Nodes).
-
-%% @doc take the list of nodes, and rearrange it, starting with the node that
-%% results from hashing the Term
-ordered_nodes(Term, Nodes) ->
- SeedNode = seednode(Term, Nodes),
- {A, B} = lists:splitwith(fun(N) -> N /= SeedNode end, Nodes),
- lists:append(B,A).
-
-%% @doc create a partition map
-pmap(NumPartitions, Nodes) ->
- Increment = ?RINGTOP div NumPartitions,
- Parts = parts(?RINGTOP, Increment, 0, []),
- make_map(Nodes, Nodes, Parts, []).
-
-%% @doc makes a {beg, end} list of partition ranges
-%% last range may have an extra few values, because Increment is created
-%% with Ringtop 'div' NumPartitions above.
-parts(Top, _, Beg, Acc) when Beg > Top -> Acc;
-parts(Top, Increment, Beg, Acc) ->
- End = case Beg + 2*Increment of
- Over when Over > Top -> Top;
- _ -> Beg + Increment - 1
- end,
- NewAcc = [{Beg, End} | Acc],
- parts(Top, Increment, End+1, NewAcc).
-
-%% @doc create a full map, which is a pmap with N-1 replication partner nodes
-%% added per partition
-int_fullmap(DbName, N, Pmap, Nodes) ->
- Full = lists:foldl(fun({Node,{B,E} = Part}, AccIn) ->
- Primary = [#shard{dbname=DbName, node=Node, range=[B,E],
- name=shard_name(B,DbName)}],
- Partners = partners(DbName, N, Node, Nodes, Part),
- lists:append([Primary, Partners, AccIn])
- end, [], Pmap),
- lists:reverse(Full).
-
-partners(DbName, N, Node, Nodes, {Beg,End}) ->
- {A, [Node|B]} = lists:splitwith(fun(Nd) -> Nd /= Node end, Nodes),
- Nodes1 = lists:append(B,A),
- Partners = lists:sublist(Nodes1, N-1), % N-1 replication partner nodes
- lists:map(fun(Partner) ->
- #shard{dbname=DbName, node=Partner, range=[Beg,End],
- name=shard_name(Beg,DbName)}
- end, Partners).
-
-%% @doc size of one partition in the ring
-partition_range(Q) ->
- trunc( ?RINGTOP / Q ). % SHA-1 space / Q
-
-%% @doc assign nodes to each of the partitions. When you run out of nodes,
-%% start at the beginning of the node list again.
-%% The provided node list starts with the seed node (seednode fun)
-make_map(_,_,[], Acc) ->
- lists:keysort(2,Acc);
-make_map(AllNodes, [], Parts, Acc) ->
- % start back at beginning of node list
- make_map(AllNodes, AllNodes, Parts, Acc);
-make_map(AllNodes, [Node|RestNodes], [Part|RestParts], Acc) ->
- % add a node/part combo to the Acc
- make_map(AllNodes, RestNodes, RestParts, [{Node,Part}|Acc]).
-
-jsonify(<<"map">>, Map) ->
- lists:map(fun(#shard{node=Node, range=[Beg,End]}) ->
- {[{<<"node">>, Node}, {<<"b">>, Beg}, {<<"e">>, End}]}
- end, Map);
-jsonify(<<"nodes">>, Nodes) ->
- lists:map(fun({Order, Node, Options}) ->
- {[{<<"order">>, Order}, {<<"node">>, Node}, {<<"options">>, Options}]}
- end, Nodes).
-
-write_db_doc(EDoc) ->
+write_db_doc(Doc) ->
{ok, Db} = couch_db:open(<<"dbs">>, []),
try
- update_db_doc(Db, couch_doc:from_json_obj(EDoc))
- catch {conflict, _} ->
+ update_db_doc(Db, Doc)
+ catch conflict ->
?LOG_ERROR("conflict writing db doc, must be a race", [])
after
couch_db:close(Db)
@@ -180,22 +62,55 @@ update_db_doc(Db, #doc{id=Id, body=Body} = Doc) ->
{ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, [])
end.
-shard_name(Part, DbName) when is_list(DbName) ->
- shard_name(Part, ?l2b(DbName));
-shard_name(Part, DbName) ->
- PartHex = ?l2b(showroom_utils:int_to_hexstr(Part)),
- <<"x", PartHex/binary, "/", DbName/binary, "_", PartHex/binary>>.
+delete_db_doc(DocId) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try
+ delete_db_doc(Db, DocId)
+ catch conflict ->
+ ok
+ after
+ couch_db:close(Db)
+ end.
+
+delete_db_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, []) of
+ {not_found, _} ->
+ ok;
+ {ok, OldDoc} ->
+ {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, [])
+ end.
build_shards(DbName, DocProps) ->
- lists:map(fun({Map}) ->
- Begin = couch_util:get_value(<<"b">>, Map),
- #shard{
- name = mem3_util:shard_name(Begin, DbName),
- dbname = DbName,
- node = to_atom(couch_util:get_value(<<"node">>, Map)),
- range = [Begin, couch_util:get_value(<<"e">>, Map)]
- }
- end, couch_util:get_value(<<"map">>, DocProps, {[]})).
+ {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+ lists:flatmap(fun({Node, Ranges}) ->
+ lists:map(fun(Range) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End]
+ })
+ end, Ranges)
+ end, ByNode).
to_atom(Node) when is_binary(Node) ->
- list_to_atom(binary_to_list(Node)).
+ list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+ Node.
+
+to_integer(N) when is_integer(N) ->
+ N;
+to_integer(N) when is_binary(N) ->
+ list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+ list_to_integer(N).
+
+n_val(undefined, NodeCount) ->
+ n_val(list_to_integer(couch_config:get("cluster", "n", "3")), NodeCount);
+n_val(N, NodeCount) when N > NodeCount ->
+ ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+ NodeCount;
+n_val(N, _) ->
+ N.
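
The new partition map is plain arithmetic on the 2^32 CRC32 ring: ?RINGTOP div Q gives the width of each unique range, every range is duplicated N times, and attach_nodes/4 deals nodes out round-robin. For example (hypothetical nodes), create_partition_map(<<"mydb">>, 3, 4, ['a@h', 'b@h', 'c@h']) yields 12 shards over the four ranges

    [16#00000000, 16#3fffffff]   [16#40000000, 16#7fffffff]
    [16#80000000, 16#bfffffff]   [16#c0000000, 16#ffffffff]

with each range carried by all three nodes and names such as <<"shards/00000000-3fffffff/mydb">>.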
diff --git a/src/mem3_vclock.erl b/src/mem3_vclock.erl
deleted file mode 100644
index a48da43c..00000000
--- a/src/mem3_vclock.erl
+++ /dev/null
@@ -1,109 +0,0 @@
-%%% @author Cliff Moon <cliff@powerset.com> []
-%%% @copyright 2008 Cliff Moon
-
--module (mem3_vclock).
--export ([create/1, truncate/1, increment/2, compare/2, resolve/2, merge/2,
- equals/2]).
-
-%% -ifdef(TEST).
-%% -include("etest/vector_clock_test.erl").
-%% -endif.
-
-create(NodeName) -> [{NodeName, now_float()}].
-
-truncate(Clock) when length(Clock) > 10 ->
- lists:nthtail(length(Clock) - 10, lists:keysort(2, Clock));
-
-truncate(Clock) -> Clock.
-
-increment(NodeName, [{NodeName, _Version}|Clocks]) ->
- [{NodeName, now_float()}|Clocks];
-
-increment(NodeName, [NodeClock|Clocks]) ->
- [NodeClock|increment(NodeName, Clocks)];
-
-increment(NodeName, []) ->
- [{NodeName, now_float()}].
-
-resolve({ClockA, ValuesA}, {ClockB, ValuesB}) ->
- case compare(ClockA, ClockB) of
- less -> {ClockB, ValuesB};
- greater -> {ClockA, ValuesA};
- equal -> {ClockA, ValuesA};
- concurrent ->
- showroom_log:message(info,
- "~nConcurrent Clocks~n"
- "ClockA : ~p~nClockB : ~p~n"
- "ValuesA: ~p~nValuesB: ~p~n"
- , [ClockA, ClockB, ValuesA, ValuesB]),
- {merge(ClockA,ClockB), ValuesA ++ ValuesB}
- end;
-resolve(not_found, {Clock, Values}) ->
- {Clock, Values};
-resolve({Clock, Values}, not_found) ->
- {Clock, Values}.
-
-merge(ClockA, ClockB) ->
- merge([], ClockA, ClockB).
-
-merge(Merged, [], ClockB) -> lists:keysort(1, Merged ++ ClockB);
-
-merge(Merged, ClockA, []) -> lists:keysort(1, Merged ++ ClockA);
-
-merge(Merged, [{NodeA, VersionA}|ClockA], ClockB) ->
- case lists:keytake(NodeA, 1, ClockB) of
- {value, {NodeA, VersionB}, TrunkClockB} when VersionA > VersionB ->
- merge([{NodeA,VersionA}|Merged],ClockA,TrunkClockB);
- {value, {NodeA, VersionB}, TrunkClockB} ->
- merge([{NodeA,VersionB}|Merged],ClockA,TrunkClockB);
- false ->
- merge([{NodeA,VersionA}|Merged],ClockA,ClockB)
- end.
-
-compare(ClockA, ClockB) ->
- AltB = less_than(ClockA, ClockB),
- if AltB -> less; true ->
- BltA = less_than(ClockB, ClockA),
- if BltA -> greater; true ->
- AeqB = equals(ClockA, ClockB),
- if AeqB -> equal; true -> concurrent end
- end
- end.
-
-%% ClockA is less than ClockB if and only if ClockA[z] <= ClockB[z] for all
-%% instances z and there exists an index z' such that ClockA[z'] < ClockB[z']
-less_than(ClockA, ClockB) ->
- ForAll = lists:all(fun({Node, VersionA}) ->
- case lists:keysearch(Node, 1, ClockB) of
- {value, {_NodeB, VersionB}} -> VersionA =< VersionB;
- false -> false
- end
- end, ClockA),
- Exists = lists:any(fun({NodeA, VersionA}) ->
- case lists:keysearch(NodeA, 1, ClockB) of
- {value, {_NodeB, VersionB}} -> VersionA /= VersionB;
- false -> true
- end
- end, ClockA),
- %length takes care of the case when clockA is shorter than B
- ForAll and (Exists or (length(ClockA) < length(ClockB))).
-
-equals(ClockA, ClockB) ->
- Equivalent = lists:all(fun({NodeA, VersionA}) ->
- lists:any(fun(NodeClockB) ->
- case NodeClockB of
- {NodeA, VersionA} -> true;
- _ -> false
- end
- end, ClockB)
- end, ClockA),
- Equivalent and (length(ClockA) == length(ClockB)).
-
-now_float() ->
- time_to_epoch_float(now()).
-
-time_to_epoch_float(Time) when is_integer(Time) or is_float(Time) ->
- Time;
-
-time_to_epoch_float({Mega,Sec,Micro}) ->
- Mega * 1000000 + Sec + Micro / 1000000.
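
For the record, the ordering the deleted module implemented: clock A is less than clock B iff every entry of A is =< the matching entry of B and the two clocks differ somewhere; when neither dominates, the clocks are concurrent and merge/2 keeps the per-node maximum. A worked example (node names hypothetical):

    A = [{n1, 1.0}, {n2, 2.0}],
    B = [{n1, 1.0}, {n2, 3.0}],   %% compare(A, B) -> less
    C = [{n1, 2.0}, {n2, 1.0}],   %% compare(B, C) -> concurrent
    %% merge(B, C)                -> [{n1, 2.0}, {n2, 3.0}]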