summaryrefslogtreecommitdiff
path: root/apps/mem3/src
diff options
context:
space:
mode:
Diffstat (limited to 'apps/mem3/src')
-rw-r--r--apps/mem3/src/mem3.erl103
-rw-r--r--apps/mem3/src/mem3_app.erl9
-rw-r--r--apps/mem3/src/mem3_cache.erl92
-rw-r--r--apps/mem3/src/mem3_httpd.erl39
-rw-r--r--apps/mem3/src/mem3_nodes.erl120
-rw-r--r--apps/mem3/src/mem3_sup.erl21
-rw-r--r--apps/mem3/src/mem3_sync.erl215
-rw-r--r--apps/mem3/src/mem3_sync_event.erl44
-rw-r--r--apps/mem3/src/mem3_util.erl139
9 files changed, 782 insertions, 0 deletions
diff --git a/apps/mem3/src/mem3.erl b/apps/mem3/src/mem3.erl
new file mode 100644
index 00000000..1485c7fe
--- /dev/null
+++ b/apps/mem3/src/mem3.erl
@@ -0,0 +1,103 @@
+-module(mem3).
+
+-export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2,
+ choose_shards/2]).
+-export([compare_nodelists/0, compare_shards/1]).
+
+-include("mem3.hrl").
+
+start() ->
+ application:start(mem3).
+
+stop() ->
+ application:stop(mem3).
+
+restart() ->
+ stop(),
+ start().
+
+%% @doc Detailed report of cluster-wide membership state. Queries the state
+%% on all member nodes and builds a dictionary with unique states as the
+%% key and the nodes holding that state as the value. Also reports member
+%% nodes which fail to respond and nodes which are connected but are not
+%% cluster members. Useful for debugging.
+-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes
+ | non_member_nodes, [node()]}].
+compare_nodelists() ->
+ Nodes = mem3:nodes(),
+ AllNodes = erlang:nodes([this, visible]),
+ {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist),
+ Dict = lists:foldl(fun({Node, Nodelist}, D) ->
+ orddict:append({cluster_nodes, Nodelist}, Node, D)
+ end, orddict:new(), Replies),
+ [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict].
+
+-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}].
+compare_shards(DbName) when is_list(DbName) ->
+ compare_shards(list_to_binary(DbName));
+compare_shards(DbName) ->
+ Nodes = mem3:nodes(),
+ {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]),
+ GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)],
+ Dict = lists:foldl(fun({Shards, Node}, D) ->
+ orddict:append(Shards, Node, D)
+ end, orddict:new(), lists:zip(Replies, GoodNodes)),
+ [{bad_nodes, BadNodes} | Dict].
+
+-spec nodes() -> [node()].
+nodes() ->
+ mem3_nodes:get_nodelist().
+
+-spec shards(DbName::iodata()) -> [#shard{}].
+shards(DbName) when is_list(DbName) ->
+ shards(list_to_binary(DbName));
+shards(DbName) ->
+ try ets:lookup(partitions, DbName) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName);
+ Else ->
+ Else
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName)
+ end.
+
+-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+shards(DbName, DocId) when is_list(DbName) ->
+ shards(list_to_binary(DbName), DocId);
+shards(DbName, DocId) when is_list(DocId) ->
+ shards(DbName, list_to_binary(DocId));
+shards(DbName, DocId) ->
+ HashKey = mem3_util:hash(DocId),
+ Head = #shard{
+ name = '_',
+ node = '_',
+ dbname = DbName,
+ range = ['$1','$2'],
+ ref = '_'
+ },
+ Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}],
+ try ets:select(partitions, [{Head, Conditions, ['$_']}]) of
+ [] ->
+ mem3_util:load_shards_from_disk(DbName, DocId);
+ Shards ->
+ Shards
+ catch error:badarg ->
+ mem3_util:load_shards_from_disk(DbName, DocId)
+ end.
+
+-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}].
+choose_shards(DbName, Options) when is_list(DbName) ->
+ choose_shards(list_to_binary(DbName), Options);
+choose_shards(DbName, Options) ->
+ try shards(DbName)
+ catch error:E when E==database_does_not_exist; E==badarg ->
+ Nodes = mem3:nodes(),
+ NodeCount = length(Nodes),
+ N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount),
+ Q = mem3_util:to_integer(couch_util:get_value(q, Options,
+ couch_config:get("cluster", "q", "8"))),
+ % rotate to a random entry in the nodelist for even distribution
+ {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes),
+ RotatedNodes = B ++ A,
+ mem3_util:create_partition_map(DbName, N, Q, RotatedNodes)
+ end.
diff --git a/apps/mem3/src/mem3_app.erl b/apps/mem3/src/mem3_app.erl
new file mode 100644
index 00000000..88cd1ea1
--- /dev/null
+++ b/apps/mem3/src/mem3_app.erl
@@ -0,0 +1,9 @@
+-module(mem3_app).
+-behaviour(application).
+-export([start/2, stop/1]).
+
+start(_Type, []) ->
+ mem3_sup:start_link().
+
+stop([]) ->
+ ok.
diff --git a/apps/mem3/src/mem3_cache.erl b/apps/mem3/src/mem3_cache.erl
new file mode 100644
index 00000000..2a29ca4c
--- /dev/null
+++ b/apps/mem3/src/mem3_cache.erl
@@ -0,0 +1,92 @@
+-module(mem3_cache).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0]).
+
+-record(state, {changes_pid}).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end),
+ {ok, #state{changes_pid = Pid}}.
+
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}},
+ #state{changes_pid=Pid} = State) ->
+ % fatal error, somebody deleted our ets table
+ {stop, ets_table_error, State};
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end,
+ timer:send_after(5000, {start_listener, Seq}),
+ {noreply, State};
+handle_info({start_listener, Seq}, State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #state{changes_pid=Pid}) ->
+ exit(Pid, kill),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+listen_for_changes(Since) ->
+ DbName = ?l2b(couch_config:get("mem3", "db", "dbs")),
+ {ok, Db} = ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
+ end.
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ DbName = couch_util:get_value(<<"id">>, Change),
+ case couch_util:get_value(deleted, Change, false) of
+ true ->
+ ets:delete(partitions, DbName);
+ false ->
+ case couch_util:get_value(doc, Change) of
+ {error, Reason} ->
+ ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]);
+ {Doc} ->
+ ets:delete(partitions, DbName),
+ ets:insert(partitions, mem3_util:build_shards(DbName, Doc))
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
diff --git a/apps/mem3/src/mem3_httpd.erl b/apps/mem3/src/mem3_httpd.erl
new file mode 100644
index 00000000..cbfaea95
--- /dev/null
+++ b/apps/mem3/src/mem3_httpd.erl
@@ -0,0 +1,39 @@
+-module(mem3_httpd).
+
+-export([handle_membership_req/1]).
+
+%% includes
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)}
+ ]});
+handle_membership_req(#httpd{method='GET',
+ path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) ->
+ ClusterNodes = try mem3:nodes()
+ catch _:_ -> {ok,[]} end,
+ Shards = mem3:shards(DbName),
+ JsonShards = json_shards(Shards, dict:new()),
+ couch_httpd:send_json(Req, {[
+ {all_nodes, lists:sort([node()|nodes()])},
+ {cluster_nodes, lists:sort(ClusterNodes)},
+ {partitions, JsonShards}
+ ]}).
+
+%%
+%% internal
+%%
+
+json_shards([], AccIn) ->
+ List = dict:to_list(AccIn),
+ {lists:sort(List)};
+json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) ->
+ HexBeg = couch_util:to_hex(<<B:32/integer>>),
+ json_shards(Rest, dict:append(HexBeg, Node, AccIn)).
diff --git a/apps/mem3/src/mem3_nodes.erl b/apps/mem3/src/mem3_nodes.erl
new file mode 100644
index 00000000..6cbf3d9a
--- /dev/null
+++ b/apps/mem3/src/mem3_nodes.erl
@@ -0,0 +1,120 @@
+-module(mem3_nodes).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_nodelist/0]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {changes_pid, update_seq, nodes}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_nodelist() ->
+ gen_server:call(?MODULE, get_nodelist).
+
+init([]) ->
+ {Nodes, UpdateSeq} = initialize_nodelist(),
+ {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end),
+ {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}.
+
+handle_call(get_nodelist, _From, State) ->
+ {reply, State#state.nodes, State};
+handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {add_node, Node}),
+ {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}};
+handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) ->
+ gen_event:notify(mem3_events, {remove_node, Node}),
+ {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}};
+handle_call(_Call, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+ ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]),
+ StartSeq = State#state.update_seq,
+ Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+ timer:send_after(5000, start_listener),
+ {noreply, State#state{update_seq = Seq}};
+handle_info(start_listener, #state{update_seq = Seq} = State) ->
+ {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
+ {noreply, State#state{changes_pid=NewPid}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% internal functions
+
+initialize_nodelist() ->
+ DbName = couch_config:get("mem3", "nodedb", "nodes"),
+ {ok, Db} = ensure_exists(DbName),
+ {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []),
+ % add self if not already present
+ case lists:member(node(), Nodes0) of
+ true ->
+ Nodes = Nodes0;
+ false ->
+ Doc = #doc{id = couch_util:to_binary(node())},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ Nodes = [node() | Nodes0]
+ end,
+ couch_db:close(Db),
+ {lists:sort(Nodes), Db#db.update_seq}.
+
+first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{deleted=true}, _, Acc) ->
+ {ok, Acc};
+first_fold(#full_doc_info{id=Id}, _, Acc) ->
+ {ok, [mem3_util:to_atom(Id) | Acc]}.
+
+listen_for_changes(Since) ->
+ DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")),
+ {ok, Db} = ensure_exists(DbName),
+ Args = #changes_args{
+ feed = "continuous",
+ since = Since,
+ heartbeat = true,
+ include_docs = true
+ },
+ ChangesFun = couch_changes:handle_changes(Args, nil, Db),
+ ChangesFun(fun changes_callback/2).
+
+ensure_exists(DbName) when is_list(DbName) ->
+ ensure_exists(list_to_binary(DbName));
+ensure_exists(DbName) ->
+ Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}],
+ case couch_db:open(DbName, Options) of
+ {ok, Db} ->
+ {ok, Db};
+ _ ->
+ couch_server:create(DbName, Options)
+ end.
+
+changes_callback(start, _) ->
+ {ok, nil};
+changes_callback({stop, EndSeq}, _) ->
+ exit({seq, EndSeq});
+changes_callback({change, {Change}, _}, _) ->
+ Node = couch_util:get_value(<<"id">>, Change),
+ case Node of <<"_design/", _/binary>> -> ok; _ ->
+ case couch_util:get_value(deleted, Change, false) of
+ false ->
+ gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)});
+ true ->
+ gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+ end
+ end,
+ {ok, couch_util:get_value(<<"seq">>, Change)};
+changes_callback(timeout, _) ->
+ {ok, nil}.
diff --git a/apps/mem3/src/mem3_sup.erl b/apps/mem3/src/mem3_sup.erl
new file mode 100644
index 00000000..58d0bbf5
--- /dev/null
+++ b/apps/mem3/src/mem3_sup.erl
@@ -0,0 +1,21 @@
+-module(mem3_sup).
+-behaviour(supervisor).
+-export([start_link/0, init/1]).
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_Args) ->
+ Children = [
+ child(mem3_events),
+ child(mem3_sync),
+ child(mem3_cache),
+ child(mem3_nodes)
+ ],
+ {ok, {{one_for_one,10,1}, Children}}.
+
+child(mem3_events) ->
+ MFA = {gen_event, start_link, [{local, mem3_events}]},
+ {mem3_events, MFA, permanent, 1000, worker, dynamic};
+child(Child) ->
+ {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}.
diff --git a/apps/mem3/src/mem3_sync.erl b/apps/mem3/src/mem3_sync.erl
new file mode 100644
index 00000000..d3b3ea51
--- /dev/null
+++ b/apps/mem3/src/mem3_sync.erl
@@ -0,0 +1,215 @@
+-module(mem3_sync).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
+}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+
+push(Db, Node) ->
+ gen_server:cast(?MODULE, {push, Db, Node}).
+
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+
+handle_call(get_active, _From, State) ->
+ {reply, State#state.active, State};
+
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State}.
+
+handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, DbName, Node)};
+
+handle_cast({push, DbName, Node}, State) ->
+ #state{active = L, count = C} = State,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, DbName, Node)};
+ false ->
+ Pid = start_push_replication(DbName, Node),
+ {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
+ end;
+
+handle_cast({remove_node, Node}, State) ->
+ Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
+ Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
+ State#state.dict, [S || {S,N} <- Waiting, N =:= Node]),
+ {noreply, State#state{dict = Dict, waiting = Waiting}}.
+
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+
+handle_info({'EXIT', Active, Reason}, State) ->
+ case lists:keyfind(Active, 3, State#state.active) of
+ {OldDbName, OldNode, _} ->
+ ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
+ OldNode, Reason]),
+ timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
+ false -> ok end,
+ handle_replication_exit(State, Active);
+
+handle_info(Msg, State) ->
+ ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active],
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid, 3, State#state.active),
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, 3, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {DbName, Node, StillWaiting} ->
+ NewPid = start_push_replication(DbName, Node),
+ State#state{
+ active = [{DbName, Node, NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+
+start_push_replication(DbName, Node) ->
+ PostBody = {[
+ {<<"source">>, DbName},
+ {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
+ {<<"continuous">>, false},
+ {<<"async">>, true}
+ ]},
+ ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
+ UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
+ case (catch couch_rep:replicate(PostBody, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ link(Pid),
+ Pid;
+ {db_not_found, _Msg} ->
+ case couch_db:open(DbName, []) of
+ {ok, Db} ->
+ % source exists, let's (re)create the target
+ couch_db:close(Db),
+ case rpc:call(Node, couch_api, create_db, [DbName, []]) of
+ {ok, Target} ->
+ ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
+ Node]),
+ couch_db:close(Target),
+ start_push_replication(DbName, Node);
+ file_exists ->
+ start_push_replication(DbName, Node);
+ Error ->
+ ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
+ [?MODULE, DbName, Node, Error]),
+ exit(shutdown)
+ end;
+ {not_found, no_db_file} ->
+ % source is gone, so this is a hack to skip it
+ ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
+ [?MODULE, DbName, Node]),
+ spawn_link(fun() -> ok end)
+ end;
+ {node_not_connected, _} ->
+ % we'll get this one when the node rejoins
+ ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
+ spawn_link(fun() -> ok end);
+ CatchAll ->
+ ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
+ case lists:member(Node, nodes()) of
+ true ->
+ timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
+ false ->
+ ok
+ end,
+ spawn_link(fun() -> ok end)
+ end.
+
+add_to_queue(State, DbName, Node) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [{DbName,Node}]
+ }
+ end.
+
+initial_sync() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
+
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ (_) -> ok end).
+
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
+next_replication(Active, Waiting) ->
+ case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [{DbName,Node}|Rest]} ->
+ {DbName, Node, Running ++ Rest}
+ end.
+
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].
diff --git a/apps/mem3/src/mem3_sync_event.erl b/apps/mem3/src/mem3_sync_event.erl
new file mode 100644
index 00000000..45fcb8aa
--- /dev/null
+++ b/apps/mem3/src/mem3_sync_event.erl
@@ -0,0 +1,44 @@
+-module(mem3_sync_event).
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+init(_) ->
+ {ok, nil}.
+
+handle_event({add_node, Node}, State) ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]],
+ {ok, State};
+
+handle_event({nodeup, Node}, State) ->
+ case lists:member(Node, mem3:nodes()) of
+ true ->
+ Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")),
+ [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]];
+ false ->
+ ok
+ end,
+ {ok, State};
+
+handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node ->
+ mem3_sync:remove_node(Node),
+ {ok, State};
+
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, ok, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/apps/mem3/src/mem3_util.erl b/apps/mem3/src/mem3_util.erl
new file mode 100644
index 00000000..2ed84db6
--- /dev/null
+++ b/apps/mem3/src/mem3_util.erl
@@ -0,0 +1,139 @@
+-module(mem3_util).
+
+-export([hash/1, name_shard/1, create_partition_map/4, build_shards/2,
+ n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1,
+ load_shards_from_disk/1, load_shards_from_disk/2]).
+
+-define(RINGTOP, 2 bsl 31). % CRC32 space
+
+-include("mem3.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+hash(Item) when is_binary(Item) ->
+ erlang:crc32(Item);
+hash(Item) ->
+ erlang:crc32(term_to_binary(Item)).
+
+name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) ->
+ Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-",
+ couch_util:to_hex(<<E:32/integer>>), "/", DbName],
+ Shard#shard{name = ?l2b(Name)}.
+
+create_partition_map(DbName, N, Q, Nodes) ->
+ UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []),
+ Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]),
+ Shards1 = attach_nodes(Shards0, [], Nodes, []),
+ [name_shard(S#shard{dbname=DbName}) || S <- Shards1].
+
+make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP ->
+ Acc;
+make_key_ranges(Increment, Start, Acc) ->
+ case Start + 2*Increment of
+ X when X > ?RINGTOP ->
+ End = ?RINGTOP - 1;
+ _ ->
+ End = Start + Increment - 1
+ end,
+ make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]).
+
+attach_nodes([], Acc, _, _) ->
+ lists:reverse(Acc);
+attach_nodes(Shards, Acc, [], UsedNodes) ->
+ attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []);
+attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) ->
+ attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]).
+
+write_db_doc(Doc) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try
+ update_db_doc(Db, Doc)
+ catch conflict ->
+ ?LOG_ERROR("conflict writing db doc, must be a race", [])
+ after
+ couch_db:close(Db)
+ end.
+
+update_db_doc(Db, #doc{id=Id, body=Body} = Doc) ->
+ case couch_db:open_doc(Db, Id, []) of
+ {not_found, _} ->
+ {ok, _} = couch_db:update_doc(Db, Doc, []);
+ {ok, #doc{body=Body}} ->
+ ok;
+ {ok, OldDoc} ->
+ {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, [])
+ end.
+
+delete_db_doc(DocId) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try
+ delete_db_doc(Db, DocId)
+ catch conflict ->
+ ok
+ after
+ couch_db:close(Db)
+ end.
+
+delete_db_doc(Db, DocId) ->
+ case couch_db:open_doc(Db, DocId, []) of
+ {not_found, _} ->
+ ok;
+ {ok, OldDoc} ->
+ {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, [])
+ end.
+
+build_shards(DbName, DocProps) ->
+ {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}),
+ lists:flatmap(fun({Node, Ranges}) ->
+ lists:map(fun(Range) ->
+ [B,E] = string:tokens(?b2l(Range), "-"),
+ Beg = httpd_util:hexlist_to_integer(B),
+ End = httpd_util:hexlist_to_integer(E),
+ name_shard(#shard{
+ dbname = DbName,
+ node = to_atom(Node),
+ range = [Beg, End]
+ })
+ end, Ranges)
+ end, ByNode).
+
+to_atom(Node) when is_binary(Node) ->
+ list_to_atom(binary_to_list(Node));
+to_atom(Node) when is_atom(Node) ->
+ Node.
+
+to_integer(N) when is_integer(N) ->
+ N;
+to_integer(N) when is_binary(N) ->
+ list_to_integer(binary_to_list(N));
+to_integer(N) when is_list(N) ->
+ list_to_integer(N).
+
+n_val(undefined, NodeCount) ->
+ n_val(couch_config:get("cluster", "n", "3"), NodeCount);
+n_val(N, NodeCount) when is_list(N) ->
+ n_val(list_to_integer(N), NodeCount);
+n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount ->
+ ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]),
+ NodeCount;
+n_val(N, _) when N < 1 ->
+ 1;
+n_val(N, _) ->
+ N.
+
+load_shards_from_disk(DbName) when is_binary(DbName) ->
+ {ok, Db} = couch_db:open(<<"dbs">>, []),
+ try load_shards_from_db(Db, DbName) after couch_db:close(Db) end.
+
+load_shards_from_db(#db{} = ShardDb, DbName) ->
+ case couch_db:open_doc(ShardDb, DbName, []) of
+ {ok, #doc{body = {Props}}} ->
+ ?LOG_INFO("dbs cache miss for ~s", [DbName]),
+ build_shards(DbName, Props);
+ {not_found, _} ->
+ erlang:error(database_does_not_exist)
+ end.
+
+load_shards_from_disk(DbName, DocId)->
+ Shards = load_shards_from_disk(DbName),
+ HashKey = hash(DocId),
+ [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E].