summaryrefslogtreecommitdiff
path: root/apps/mem3
diff options
context:
space:
mode:
Diffstat (limited to 'apps/mem3')
-rw-r--r--apps/mem3/ebin/mem3.app24
-rw-r--r--apps/mem3/ebin/mem3.appup3
-rw-r--r--apps/mem3/include/mem3.hrl30
-rw-r--r--apps/mem3/src/mem3.erl103
-rw-r--r--apps/mem3/src/mem3_app.erl9
-rw-r--r--apps/mem3/src/mem3_cache.erl92
-rw-r--r--apps/mem3/src/mem3_httpd.erl39
-rw-r--r--apps/mem3/src/mem3_nodes.erl120
-rw-r--r--apps/mem3/src/mem3_sup.erl21
-rw-r--r--apps/mem3/src/mem3_sync.erl215
-rw-r--r--apps/mem3/src/mem3_sync_event.erl44
-rw-r--r--apps/mem3/src/mem3_util.erl139
-rw-r--r--apps/mem3/test/01-config-default.ini2
-rw-r--r--apps/mem3/test/mem3_util_test.erl140
14 files changed, 981 insertions, 0 deletions
diff --git a/apps/mem3/ebin/mem3.app b/apps/mem3/ebin/mem3.app
new file mode 100644
index 00000000..0613ab26
--- /dev/null
+++ b/apps/mem3/ebin/mem3.app
@@ -0,0 +1,24 @@
+{application, mem3, [
+ {description, "CouchDB Cluster Membership"},
+ {mod, {mem3_app, []}},
+ {vsn, "1.0.1"},
+ {modules, [
+ mem3,
+ mem3_app,
+ mem3_cache,
+ mem3_httpd,
+ mem3_nodes,
+ mem3_sup,
+ mem3_sync,
+ mem3_sync_event,
+ mem3_util
+ ]},
+ {registered, [
+ mem3_cache,
+ mem3_events,
+ mem3_nodes,
+ mem3_sync,
+ mem3_sup
+ ]},
+ {applications, [kernel, stdlib, sasl, crypto, mochiweb, couch]}
+]}.
diff --git a/apps/mem3/ebin/mem3.appup b/apps/mem3/ebin/mem3.appup
new file mode 100644
index 00000000..6e9ebe71
--- /dev/null
+++ b/apps/mem3/ebin/mem3.appup
@@ -0,0 +1,3 @@
+{"1.0.1",[{"1.0",[
+ {load_module, mem3_httpd}
+]}],[{"1.0",[]}]}.
diff --git a/apps/mem3/include/mem3.hrl b/apps/mem3/include/mem3.hrl
new file mode 100644
index 00000000..b9359b44
--- /dev/null
+++ b/apps/mem3/include/mem3.hrl
@@ -0,0 +1,30 @@
+% type specification hacked to suppress dialyzer warning re: match spec
+-record(shard, {
+ name :: binary() | '_',
+ node :: node() | '_',
+ dbname :: binary(),
+ range :: [non_neg_integer() | '$1' | '$2'],
+ ref :: reference() | 'undefined' | '_'
+}).
+
+%% types
+-type join_type() :: init | join | replace | leave.
+-type join_order() :: non_neg_integer().
+-type options() :: list().
+-type mem_node() :: {join_order(), node(), options()}.
+-type mem_node_list() :: [mem_node()].
+-type arg_options() :: {test, boolean()}.
+-type args() :: [] | [arg_options()].
+-type test() :: undefined | node().
+-type epoch() :: float().
+-type clock() :: {node(), epoch()}.
+-type vector_clock() :: [clock()].
+-type ping_node() :: node() | nil.
+-type gossip_fun() :: call | cast.
+
+-type part() :: #shard{}.
+-type fullmap() :: [part()].
+-type ref_part_map() :: {reference(), part()}.
+-type tref() :: reference().
+-type np() :: {node(), part()}.
+-type beg_acc() :: [integer()].
diff --git a/apps/mem3/src/mem3.erl b/apps/mem3/src/mem3.erl
new file mode 100644
index 00000000..1485c7fe
--- /dev/null
+++ b/apps/mem3/src/mem3.erl
@@ -0,0 +1,103 @@
+-module(mem3).
+
+-export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2,
+ choose_shards/2]).
+-export([compare_nodelists/0, compare_shards/1]).
+
+-include("mem3.hrl").
+
+start() ->
+ application:start(mem3).
+
+stop() ->
+ application:stop(mem3).
+
+restart() ->
+ stop(),
+ start().
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
+-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
+ | non_member_nodes, [node()]}].
+compare_nodelists() ->
+ Nodes = mem3:nodes(),
+ AllNodes = erlang:nodes([this, visible]),
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
+ Dict = lists:foldl(fun({Node, Nodelist}, D) ->
+ orddict:append({cluster_nodes, Nodelist}, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
+compare_shards(DbName) when is_list(DbName) ->
+ compare_shards(list_to_binary(DbName));
+compare_shards(DbName) ->
+ Nodes = mem3:nodes(),
+ {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
+ GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
+ Dict = lists:foldl(fun({Shards, Node}, D) ->
+ orddict:append(Shards, Node, D)
+ end, orddict:new(), lists:zip(Replies, GoodNodes)),
+ [{bad_nodes, BadNodes} | Dict].
+
+-spec nodes() -> [node()].
+nodes() ->
+ mem3_nodes:get_nodelist().
+
+-spec shards(DbName::iodata()) -> [#shard{}].
+shards(DbName) when is_list(DbName) ->
+ shards(list_to_binary(DbName));
+shards(DbName) ->
+ try ets:lookup(partitions, DbName) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName);
+ Else ->
+ Else
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName)
+ end.
+
+-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) when is_list(DbName) ->
+ shards(list_to_binary(DbName), DocId);
+shards(DbName, DocId) when is_list(DocId) ->
+ shards(DbName, list_to_binary(DocId));
+shards(DbName, DocId) ->
+ HashKey = mem3_util:hash(DocId),
+ Head = #shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_'
+ },
+ Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+ try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName, DocId);
+ Shards ->
+ Shards
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName, DocId)
+ end.
+
+-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
+choose_shards(DbName, Options) when is_list(DbName) ->
+ choose_shards(list_to_binary(DbName), Options);
+choose_shards(DbName, Options) ->
+ try shards(DbName)
+ catch error:E when E==database_does_not_exist; E==badarg ->
+ Nodes = mem3:nodes(),
+ NodeCount = length(Nodes),
+ N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+ Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+ couch_config:get("cluster", "q", "8"))),
+ % rotate to a random entry in the nodelist for even distribution
+ {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
+ RotatedNodes = B ++ A,
+ mem3_util:create_partition_map(DbName, N, Q, RotatedNodes)
+ end.
diff --git a/apps/mem3/src/mem3_app.erl b/apps/mem3/src/mem3_app.erl
new file mode 100644
index 00000000..88cd1ea1
--- /dev/null
+++ b/apps/mem3/src/mem3_app.erl
@@ -0,0 +1,9 @@
+-module(mem3_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_Type, []) ->
+ mem3_sup:start_link().
+
+stop([]) ->
+ ok.
diff --git a/apps/mem3/src/mem3_cache.erl b/apps/mem3/src/mem3_cache.erl
new file mode 100644
index 00000000..2a29ca4c
--- /dev/null
+++ b/apps/mem3/src/mem3_cache.erl
@@ -0,0 +1,92 @@
+-module(mem3_cache).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0]).
+
+-record(state, {changes_pid}).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
+ {ok, #state{changes_pid = Pid}}.
+
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
+ #state{changes_pid=Pid} = State) ->
+ % fatal error, somebody deleted our ets table
+ {stop, ets_table_error, State};
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
+ timer:send_after(5000, {start_listener, Seq}),
+ {noreply, State};
+handle_info({start_listener, Seq}, State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{changes_pid=Pid}) ->
+ exit(Pid, kill),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+listen_for_changes(Since) ->
+ DbName = ?l2b(couch_config:get("mem3", "db", "dbs")),
+ {ok, Db} = ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
+ end.
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case couch_util:get_value(deleted, Change, false) of
+ true ->
+ ets:delete(partitions, DbName);
+ false ->
+ case couch_util:get_value(doc, Change) of
+ {error, Reason} ->
+ ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]);
+ {Doc} ->
+ ets:delete(partitions, DbName),
+ ets:insert(partitions, mem3_util:build_shards(DbName, Doc))
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
diff --git a/apps/mem3/src/mem3_httpd.erl b/apps/mem3/src/mem3_httpd.erl
new file mode 100644
index 00000000..cbfaea95
--- /dev/null
+++ b/apps/mem3/src/mem3_httpd.erl
@@ -0,0 +1,39 @@
+-module(mem3_httpd).
+
+-export([handle_membership_req/1]).
+
+%% includes
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)}
+ ]});
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ Shards = mem3:shards(DbName),
+ JsonShards = json_shards(Shards, dict:new()),
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)},
+ {partitions, JsonShards}
+ ]}).
+
+%%
+%% internal
+%%
+
+json_shards([], AccIn) ->
+ List = dict:to_list(AccIn),
+ {lists:sort(List)};
+json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
+ HexBeg = couch_util:to_hex(<<B:32/integer>>),
+ json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
diff --git a/apps/mem3/src/mem3_nodes.erl b/apps/mem3/src/mem3_nodes.erl
new file mode 100644
index 00000000..6cbf3d9a
--- /dev/null
+++ b/apps/mem3/src/mem3_nodes.erl
@@ -0,0 +1,120 @@
+-module(mem3_nodes).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_nodelist/0]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {changes_pid, update_seq, nodes}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_nodelist() ->
+ gen_server:call(?MODULE, get_nodelist).
+
+init([]) ->
+ {Nodes, UpdateSeq} = initialize_nodelist(),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
+ {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
+
+handle_call(get_nodelist, _From, State) ->
+ {reply, State#state.nodes, State};
+handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {add_node, Node}),
+ {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}};
+handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {remove_node, Node}),
+ {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}};
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
+ StartSeq = State#state.update_seq,
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+ timer:send_after(5000, start_listener),
+ {noreply, State#state{update_seq = Seq}};
+handle_info(start_listener, #state{update_seq = Seq} = State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+initialize_nodelist() ->
+ DbName = couch_config:get("mem3", "nodedb", "nodes"),
+ {ok, Db} = ensure_exists(DbName),
+ {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []),
+ % add self if not already present
+ case lists:member(node(), Nodes0) of
+ true ->
+ Nodes = Nodes0;
+ false ->
+ Doc = #doc{id = couch_util:to_binary(node())},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ Nodes = [node() | Nodes0]
+ end,
+ couch_db:close(Db),
+ {lists:sort(Nodes), Db#db.update_seq}.
+
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{id=Id}, _, Acc) ->
+ {ok, [mem3_util:to_atom(Id) | Acc]}.
+
+listen_for_changes(Since) ->
+ DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")),
+ {ok, Db} = ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+ensure_exists(DbName) when is_list(DbName) ->
+ ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
+ end.
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ Node = couch_util:get_value(<<"id">>, Change),
+ case Node of <<"_design/", _/binary>> -> ok; _ ->
+ case couch_util:get_value(deleted, Change, false) of
+ false ->
+ gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)});
+ true ->
+ gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
diff --git a/apps/mem3/src/mem3_sup.erl b/apps/mem3/src/mem3_sup.erl
new file mode 100644
index 00000000..58d0bbf5
--- /dev/null
+++ b/apps/mem3/src/mem3_sup.erl
@@ -0,0 +1,21 @@
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+ Children = [
+ child(mem3_events),
+ child(mem3_sync),
+ child(mem3_cache),
+ child(mem3_nodes)
+ ],
+ {ok, {{one_for_one,10,1}, Children}}.
+
+child(mem3_events) ->
+ MFA = {gen_event, start_link, [{local, mem3_events}]},
+ {mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+ {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
diff --git a/apps/mem3/src/mem3_sync.erl b/apps/mem3/src/mem3_sync.erl
new file mode 100644
index 00000000..d3b3ea51
--- /dev/null
+++ b/apps/mem3/src/mem3_sync.erl
@@ -0,0 +1,215 @@
+-module(mem3_sync).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
+}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+
+push(Db, Node) ->
+ gen_server:cast(?MODULE, {push, Db, Node}).
+
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+ {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State}.
+
+handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, DbName, Node)};
+
+handle_cast({push, DbName, Node}, State) ->
+ #state{active = L, count = C} = State,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, DbName, Node)};
+ false ->
+ Pid = start_push_replication(DbName, Node),
+ {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
+ end;
+
+handle_cast({remove_node, Node}, State) ->
+ Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
+ Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
+ State#state.dict, [S || {S,N} <- Waiting, N =:= Node]),
+ {noreply, State#state{dict = Dict, waiting = Waiting}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+ case lists:keyfind(Active, 3, State#state.active) of
+ {OldDbName, OldNode, _} ->
+ ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
+ OldNode, Reason]),
+ timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
+ false -> ok end,
+ handle_replication_exit(State, Active);
+
+handle_info(Msg, State) ->
+ ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active],
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid, 3, State#state.active),
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, 3, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {DbName, Node, StillWaiting} ->
+ NewPid = start_push_replication(DbName, Node),
+ State#state{
+ active = [{DbName, Node, NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+
+start_push_replication(DbName, Node) ->
+ PostBody = {[
+ {<<"source">>, DbName},
+ {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
+ {<<"continuous">>, false},
+ {<<"async">>, true}
+ ]},
+ ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
+ UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
+ case (catch couch_rep:replicate(PostBody, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ link(Pid),
+ Pid;
+ {db_not_found, _Msg} ->
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ % source exists, let's (re)create the target
+ couch_db:close(Db),
+ case rpc:call(Node, couch_api, create_db, [DbName, []]) of
+ {ok, Target} ->
+ ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
+ Node]),
+ couch_db:close(Target),
+ start_push_replication(DbName, Node);
+ file_exists ->
+ start_push_replication(DbName, Node);
+ Error ->
+ ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
+ [?MODULE, DbName, Node, Error]),
+ exit(shutdown)
+ end;
+ {not_found, no_db_file} ->
+ % source is gone, so this is a hack to skip it
+ ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
+ [?MODULE, DbName, Node]),
+ spawn_link(fun() -> ok end)
+ end;
+ {node_not_connected, _} ->
+ % we'll get this one when the node rejoins
+ ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
+ spawn_link(fun() -> ok end);
+ CatchAll ->
+ ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
+ case lists:member(Node, nodes()) of
+ true ->
+ timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
+ false ->
+ ok
+ end,
+ spawn_link(fun() -> ok end)
+ end.
+
+add_to_queue(State, DbName, Node) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [{DbName,Node}]
+ }
+ end.
+
+initial_sync() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
+
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
+next_replication(Active, Waiting) ->
+ case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [{DbName,Node}|Rest]} ->
+ {DbName, Node, Running ++ Rest}
+ end.
+
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].
diff --git a/apps/mem3/src/mem3_sync_event.erl b/apps/mem3/src/mem3_sync_event.erl
new file mode 100644
index 00000000..45fcb8aa
--- /dev/null
+++ b/apps/mem3/src/mem3_sync_event.erl
@@ -0,0 +1,44 @@
+-module(mem3_sync_event).
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+init(_) ->
+ {ok, nil}.
+
+handle_event({add_node, Node}, State) ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]],
+ {ok, State};
+
+handle_event({nodeup, Node}, State) ->
+ case lists:member(Node, mem3:nodes()) of
+ true ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]];
+ false ->
+ ok
+ end,
+ {ok, State};
+
+handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/apps/mem3/src/mem3_util.erl b/apps/mem3/src/mem3_util.erl
new file mode 100644
index 00000000..2ed84db6
--- /dev/null
+++ b/apps/mem3/src/mem3_util.erl
@@ -0,0 +1,139 @@
+-module(mem3_util).
+
+-export([hash/1, name_shard/1, create_partition_map/4, build_shards/2,
+ n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
+ load_shards_from_disk/1, load_shards_from_disk/2]).
+
+-define(RINGTOP, 2 bsl 31). % CRC32 space
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+hash(Item) when is_binary(Item) ->
+ erlang:crc32(Item);
+hash(Item) ->
+ erlang:crc32(term_to_binary(Item)).
+
+name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) ->
+ Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>), "/", DbName],
+ Shard#shard{name = ?l2b(Name)}.
+
+create_partition_map(DbName, N, Q, Nodes) ->
+ UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+ Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+ Shards1 = attach_nodes(Shards0, [], Nodes, []),
+ [name_shard(S#shard{dbname=DbName}) || S <- Shards1].
+
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+ Acc;
+make_key_ranges(Increment, Start, Acc) ->
+ case Start + 2*Increment of
+ X when X > ?RINGTOP ->
+ End = ?RINGTOP - 1;
+ _ ->
+ End = Start + Increment - 1
+ end,
+ make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
+
+attach_nodes([], Acc, _, _) ->
+ lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+ attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+ attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
+
+write_db_doc(Doc) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try
+ update_db_doc(Db, Doc)
+ catch conflict ->
+ ?LOG_ERROR("conflict writing db doc, must be a race", [])
+ after
+ couch_db:close(Db)
+ end.
+
+update_db_doc(Db, #doc{id=Id, body=Body} = Doc) ->
+ case couch_db:open_doc(Db, Id, []) of
+ {not_found, _} ->
+ {ok, _} = couch_db:update_doc(Db, Doc, []);
+ {ok, #doc{body=Body}} ->
+ ok;
+ {ok, OldDoc} ->
+ {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, [])
+ end.
+
+delete_db_doc(DocId) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try
+ delete_db_doc(Db, DocId)
+ catch conflict ->
+ ok
+ after
+ couch_db:close(Db)
+ end.
+
+delete_db_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, []) of
+ {not_found, _} ->
+ ok;
+ {ok, OldDoc} ->
+ {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, [])
+ end.
+
+build_shards(DbName, DocProps) ->
+ {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+ lists:flatmap(fun({Node, Ranges}) ->
+ lists:map(fun(Range) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End]
+ })
+ end, Ranges)
+ end, ByNode).
+
+to_atom(Node) when is_binary(Node) ->
+ list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+ Node.
+
+to_integer(N) when is_integer(N) ->
+ N;
+to_integer(N) when is_binary(N) ->
+ list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+ list_to_integer(N).
+
+n_val(undefined, NodeCount) ->
+ n_val(couch_config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+ n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+ ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+ NodeCount;
+n_val(N, _) when N < 1 ->
+ 1;
+n_val(N, _) ->
+ N.
+
+load_shards_from_disk(DbName) when is_binary(DbName) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
+
+load_shards_from_db(#db{} = ShardDb, DbName) ->
+ case couch_db:open_doc(ShardDb, DbName, []) of
+ {ok, #doc{body = {Props}}} ->
+ ?LOG_INFO("dbs cache miss for ~s", [DbName]),
+ build_shards(DbName, Props);
+ {not_found, _} ->
+ erlang:error(database_does_not_exist)
+ end.
+
+load_shards_from_disk(DbName, DocId)->
+ Shards = load_shards_from_disk(DbName),
+ HashKey = hash(DocId),
+ [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E].
diff --git a/apps/mem3/test/01-config-default.ini b/apps/mem3/test/01-config-default.ini
new file mode 100644
index 00000000..757f7830
--- /dev/null
+++ b/apps/mem3/test/01-config-default.ini
@@ -0,0 +1,2 @@
+[cluster]
+n=3
diff --git a/apps/mem3/test/mem3_util_test.erl b/apps/mem3/test/mem3_util_test.erl
new file mode 100644
index 00000000..0f6d24be
--- /dev/null
+++ b/apps/mem3/test/mem3_util_test.erl
@@ -0,0 +1,140 @@
+-module(mem3_util_test).
+
+-include("mem3.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+hash_test() ->
+ ?assertEqual(1624516141,mem3_util:hash(0)),
+ ?assertEqual(3816901808,mem3_util:hash("0")),
+ ?assertEqual(3523407757,mem3_util:hash(<<0>>)),
+ ?assertEqual(4108050209,mem3_util:hash(<<"0">>)),
+ ?assertEqual(3094724072,mem3_util:hash(zero)),
+ ok.
+
+name_shard_test() ->
+ Shard1 = #shard{},
+ ?assertError(function_clause, mem3_util:name_shard(Shard1)),
+
+ Shard2 = #shard{dbname = <<"testdb">>, range = [0,100]},
+ #shard{name=Name2} = mem3_util:name_shard(Shard2),
+ ?assertEqual(<<"shards/00000000-00000064/testdb">>, Name2),
+
+ ok.
+
+create_partition_map_test() ->
+ {DbName1, N1, Q1, Nodes1} = {<<"testdb1">>, 3, 4, [a,b,c,d]},
+ Map1 = mem3_util:create_partition_map(DbName1, N1, Q1, Nodes1),
+ ?assertEqual(12, length(Map1)),
+
+ {DbName2, N2, Q2, Nodes2} = {<<"testdb2">>, 1, 1, [a,b,c,d]},
+ [#shard{name=Name2,node=Node2}] = Map2 =
+ mem3_util:create_partition_map(DbName2, N2, Q2, Nodes2),
+ ?assertEqual(1, length(Map2)),
+ ?assertEqual(<<"shards/00000000-ffffffff/testdb2">>, Name2),
+ ?assertEqual(a, Node2),
+ ok.
+
+build_shards_test() ->
+ DocProps1 =
+ [{<<"changelog">>,
+ [[<<"add">>,<<"00000000-1fffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"20000000-3fffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"40000000-5fffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"60000000-7fffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"80000000-9fffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"a0000000-bfffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"c0000000-dfffffff">>,
+ <<"dbcore@node.local">>],
+ [<<"add">>,<<"e0000000-ffffffff">>,
+ <<"dbcore@node.local">>]]},
+ {<<"by_node">>,
+ {[{<<"dbcore@node.local">>,
+ [<<"00000000-1fffffff">>,<<"20000000-3fffffff">>,
+ <<"40000000-5fffffff">>,<<"60000000-7fffffff">>,
+ <<"80000000-9fffffff">>,<<"a0000000-bfffffff">>,
+ <<"c0000000-dfffffff">>,<<"e0000000-ffffffff">>]}]}},
+ {<<"by_range">>,
+ {[{<<"00000000-1fffffff">>,[<<"dbcore@node.local">>]},
+ {<<"20000000-3fffffff">>,[<<"dbcore@node.local">>]},
+ {<<"40000000-5fffffff">>,[<<"dbcore@node.local">>]},
+ {<<"60000000-7fffffff">>,[<<"dbcore@node.local">>]},
+ {<<"80000000-9fffffff">>,[<<"dbcore@node.local">>]},
+ {<<"a0000000-bfffffff">>,[<<"dbcore@node.local">>]},
+ {<<"c0000000-dfffffff">>,[<<"dbcore@node.local">>]},
+ {<<"e0000000-ffffffff">>,[<<"dbcore@node.local">>]}]}}],
+ Shards1 = mem3_util:build_shards(<<"testdb1">>, DocProps1),
+ ExpectedShards1 =
+ [{shard,<<"shards/00000000-1fffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [0,536870911],
+ undefined},
+ {shard,<<"shards/20000000-3fffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [536870912,1073741823],
+ undefined},
+ {shard,<<"shards/40000000-5fffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [1073741824,1610612735],
+ undefined},
+ {shard,<<"shards/60000000-7fffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [1610612736,2147483647],
+ undefined},
+ {shard,<<"shards/80000000-9fffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [2147483648,2684354559],
+ undefined},
+ {shard,<<"shards/a0000000-bfffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [2684354560,3221225471],
+ undefined},
+ {shard,<<"shards/c0000000-dfffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [3221225472,3758096383],
+ undefined},
+ {shard,<<"shards/e0000000-ffffffff/testdb1">>,
+ 'dbcore@node.local',<<"testdb1">>,
+ [3758096384,4294967295],
+ undefined}],
+ ?assertEqual(ExpectedShards1, Shards1),
+ ok.
+
+
+%% n_val tests
+
+nval_test() ->
+ ?assertEqual(2, mem3_util:n_val(2,4)),
+ ?assertEqual(1, mem3_util:n_val(-1,4)),
+ ?assertEqual(4, mem3_util:n_val(6,4)),
+ ok.
+
+config_01_setup() ->
+ Ini = filename:join([code:lib_dir(mem3, test), "01-config-default.ini"]),
+ {ok, Pid} = couch_config:start_link([Ini]),
+ Pid.
+
+config_teardown(_Pid) ->
+ couch_config:stop().
+
+n_val_test_() ->
+ {"n_val tests",
+ [
+ {setup,
+ fun config_01_setup/0,
+ fun config_teardown/1,
+ fun(Pid) ->
+ {with, Pid, [
+ fun n_val_1/1
+ ]}
+ end}
+ ]
+ }.
+
+n_val_1(_Pid) ->
+ ?assertEqual(3, mem3_util:n_val(undefined, 4)).