diff options
Diffstat (limited to 'apps/mem3/src')
-rw-r--r-- | apps/mem3/src/mem3.erl | 103 | ||||
-rw-r--r-- | apps/mem3/src/mem3_app.erl | 9 | ||||
-rw-r--r-- | apps/mem3/src/mem3_cache.erl | 92 | ||||
-rw-r--r-- | apps/mem3/src/mem3_httpd.erl | 39 | ||||
-rw-r--r-- | apps/mem3/src/mem3_nodes.erl | 120 | ||||
-rw-r--r-- | apps/mem3/src/mem3_sup.erl | 21 | ||||
-rw-r--r-- | apps/mem3/src/mem3_sync.erl | 215 | ||||
-rw-r--r-- | apps/mem3/src/mem3_sync_event.erl | 44 | ||||
-rw-r--r-- | apps/mem3/src/mem3_util.erl | 139 |
9 files changed, 782 insertions, 0 deletions
diff --git a/apps/mem3/src/mem3.erl b/apps/mem3/src/mem3.erl new file mode 100644 index 00000000..1485c7fe --- /dev/null +++ b/apps/mem3/src/mem3.erl @@ -0,0 +1,103 @@ +-module(mem3). + +-export([start/0, stop/0, restart/0, nodes/0, shards/1, shards/2, + choose_shards/2]). +-export([compare_nodelists/0, compare_shards/1]). + +-include("mem3.hrl"). + +start() -> + application:start(mem3). + +stop() -> + application:stop(mem3). + +restart() -> + stop(), + start(). + +%% @doc Detailed report of cluster-wide membership state. Queries the state +%% on all member nodes and builds a dictionary with unique states as the +%% key and the nodes holding that state as the value. Also reports member +%% nodes which fail to respond and nodes which are connected but are not +%% cluster members. Useful for debugging. +-spec compare_nodelists() -> [{{cluster_nodes, [node()]} | bad_nodes + | non_member_nodes, [node()]}]. +compare_nodelists() -> + Nodes = mem3:nodes(), + AllNodes = erlang:nodes([this, visible]), + {Replies, BadNodes} = gen_server:multi_call(Nodes, mem3_nodes, get_nodelist), + Dict = lists:foldl(fun({Node, Nodelist}, D) -> + orddict:append({cluster_nodes, Nodelist}, Node, D) + end, orddict:new(), Replies), + [{non_member_nodes, AllNodes -- Nodes}, {bad_nodes, BadNodes} | Dict]. + +-spec compare_shards(DbName::iodata()) -> [{bad_nodes | [#shard{}], [node()]}]. +compare_shards(DbName) when is_list(DbName) -> + compare_shards(list_to_binary(DbName)); +compare_shards(DbName) -> + Nodes = mem3:nodes(), + {Replies, BadNodes} = rpc:multicall(mem3, shards, [DbName]), + GoodNodes = [N || N <- Nodes, not lists:member(N, BadNodes)], + Dict = lists:foldl(fun({Shards, Node}, D) -> + orddict:append(Shards, Node, D) + end, orddict:new(), lists:zip(Replies, GoodNodes)), + [{bad_nodes, BadNodes} | Dict]. + +-spec nodes() -> [node()]. +nodes() -> + mem3_nodes:get_nodelist(). + +-spec shards(DbName::iodata()) -> [#shard{}]. +shards(DbName) when is_list(DbName) -> + shards(list_to_binary(DbName)); +shards(DbName) -> + try ets:lookup(partitions, DbName) of + [] -> + mem3_util:load_shards_from_disk(DbName); + Else -> + Else + catch error:badarg -> + mem3_util:load_shards_from_disk(DbName) + end. + +-spec shards(DbName::iodata(), DocId::binary()) -> [#shard{}]. +shards(DbName, DocId) when is_list(DbName) -> + shards(list_to_binary(DbName), DocId); +shards(DbName, DocId) when is_list(DocId) -> + shards(DbName, list_to_binary(DocId)); +shards(DbName, DocId) -> + HashKey = mem3_util:hash(DocId), + Head = #shard{ + name = '_', + node = '_', + dbname = DbName, + range = ['$1','$2'], + ref = '_' + }, + Conditions = [{'<', '$1', HashKey}, {'=<', HashKey, '$2'}], + try ets:select(partitions, [{Head, Conditions, ['$_']}]) of + [] -> + mem3_util:load_shards_from_disk(DbName, DocId); + Shards -> + Shards + catch error:badarg -> + mem3_util:load_shards_from_disk(DbName, DocId) + end. + +-spec choose_shards(DbName::iodata(), Options::list()) -> [#shard{}]. +choose_shards(DbName, Options) when is_list(DbName) -> + choose_shards(list_to_binary(DbName), Options); +choose_shards(DbName, Options) -> + try shards(DbName) + catch error:E when E==database_does_not_exist; E==badarg -> + Nodes = mem3:nodes(), + NodeCount = length(Nodes), + N = mem3_util:n_val(couch_util:get_value(n, Options), NodeCount), + Q = mem3_util:to_integer(couch_util:get_value(q, Options, + couch_config:get("cluster", "q", "8"))), + % rotate to a random entry in the nodelist for even distribution + {A, B} = lists:split(crypto:rand_uniform(1,length(Nodes)+1), Nodes), + RotatedNodes = B ++ A, + mem3_util:create_partition_map(DbName, N, Q, RotatedNodes) + end. diff --git a/apps/mem3/src/mem3_app.erl b/apps/mem3/src/mem3_app.erl new file mode 100644 index 00000000..88cd1ea1 --- /dev/null +++ b/apps/mem3/src/mem3_app.erl @@ -0,0 +1,9 @@ +-module(mem3_app). +-behaviour(application). +-export([start/2, stop/1]). + +start(_Type, []) -> + mem3_sup:start_link(). + +stop([]) -> + ok. diff --git a/apps/mem3/src/mem3_cache.erl b/apps/mem3/src/mem3_cache.erl new file mode 100644 index 00000000..2a29ca4c --- /dev/null +++ b/apps/mem3/src/mem3_cache.erl @@ -0,0 +1,92 @@ +-module(mem3_cache). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0]). + +-record(state, {changes_pid}). + +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(partitions, [bag, public, named_table, {keypos,#shard.dbname}]), + {Pid, _} = spawn_monitor(fun() -> listen_for_changes(0) end), + {ok, #state{changes_pid = Pid}}. + +handle_call(_Call, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', _, _, Pid, {badarg, [{ets,delete,[partitions,_]}|_]}}, + #state{changes_pid=Pid} = State) -> + % fatal error, somebody deleted our ets table + {stop, ets_table_error, State}; +handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> + ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), + Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> 0 end, + timer:send_after(5000, {start_listener, Seq}), + {noreply, State}; +handle_info({start_listener, Seq}, State) -> + {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), + {noreply, State#state{changes_pid=NewPid}}; +handle_info(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, #state{changes_pid=Pid}) -> + exit(Pid, kill), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% internal functions + +listen_for_changes(Since) -> + DbName = ?l2b(couch_config:get("mem3", "db", "dbs")), + {ok, Db} = ensure_exists(DbName), + Args = #changes_args{ + feed = "continuous", + since = Since, + heartbeat = true, + include_docs = true + }, + ChangesFun = couch_changes:handle_changes(Args, nil, Db), + ChangesFun(fun changes_callback/2). + +ensure_exists(DbName) -> + Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], + case couch_db:open(DbName, Options) of + {ok, Db} -> + {ok, Db}; + _ -> + couch_server:create(DbName, Options) + end. + +changes_callback(start, _) -> + {ok, nil}; +changes_callback({stop, EndSeq}, _) -> + exit({seq, EndSeq}); +changes_callback({change, {Change}, _}, _) -> + DbName = couch_util:get_value(<<"id">>, Change), + case couch_util:get_value(deleted, Change, false) of + true -> + ets:delete(partitions, DbName); + false -> + case couch_util:get_value(doc, Change) of + {error, Reason} -> + ?LOG_ERROR("missing partition table for ~s: ~p", [DbName, Reason]); + {Doc} -> + ets:delete(partitions, DbName), + ets:insert(partitions, mem3_util:build_shards(DbName, Doc)) + end + end, + {ok, couch_util:get_value(<<"seq">>, Change)}; +changes_callback(timeout, _) -> + {ok, nil}. diff --git a/apps/mem3/src/mem3_httpd.erl b/apps/mem3/src/mem3_httpd.erl new file mode 100644 index 00000000..cbfaea95 --- /dev/null +++ b/apps/mem3/src/mem3_httpd.erl @@ -0,0 +1,39 @@ +-module(mem3_httpd). + +-export([handle_membership_req/1]). + +%% includes +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +handle_membership_req(#httpd{method='GET', + path_parts=[<<"_membership">>]} = Req) -> + ClusterNodes = try mem3:nodes() + catch _:_ -> {ok,[]} end, + couch_httpd:send_json(Req, {[ + {all_nodes, lists:sort([node()|nodes()])}, + {cluster_nodes, lists:sort(ClusterNodes)} + ]}); +handle_membership_req(#httpd{method='GET', + path_parts=[<<"_membership">>, <<"parts">>, DbName]} = Req) -> + ClusterNodes = try mem3:nodes() + catch _:_ -> {ok,[]} end, + Shards = mem3:shards(DbName), + JsonShards = json_shards(Shards, dict:new()), + couch_httpd:send_json(Req, {[ + {all_nodes, lists:sort([node()|nodes()])}, + {cluster_nodes, lists:sort(ClusterNodes)}, + {partitions, JsonShards} + ]}). + +%% +%% internal +%% + +json_shards([], AccIn) -> + List = dict:to_list(AccIn), + {lists:sort(List)}; +json_shards([#shard{node=Node, range=[B,_E]} | Rest], AccIn) -> + HexBeg = couch_util:to_hex(<<B:32/integer>>), + json_shards(Rest, dict:append(HexBeg, Node, AccIn)). diff --git a/apps/mem3/src/mem3_nodes.erl b/apps/mem3/src/mem3_nodes.erl new file mode 100644 index 00000000..6cbf3d9a --- /dev/null +++ b/apps/mem3/src/mem3_nodes.erl @@ -0,0 +1,120 @@ +-module(mem3_nodes). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, get_nodelist/0]). + +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(state, {changes_pid, update_seq, nodes}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_nodelist() -> + gen_server:call(?MODULE, get_nodelist). + +init([]) -> + {Nodes, UpdateSeq} = initialize_nodelist(), + {Pid, _} = spawn_monitor(fun() -> listen_for_changes(UpdateSeq) end), + {ok, #state{changes_pid = Pid, update_seq = UpdateSeq, nodes = Nodes}}. + +handle_call(get_nodelist, _From, State) -> + {reply, State#state.nodes, State}; +handle_call({add_node, Node}, _From, #state{nodes=Nodes} = State) -> + gen_event:notify(mem3_events, {add_node, Node}), + {reply, ok, State#state{nodes = lists:umerge([Node], Nodes)}}; +handle_call({remove_node, Node}, _From, #state{nodes=Nodes} = State) -> + gen_event:notify(mem3_events, {remove_node, Node}), + {reply, ok, State#state{nodes = lists:delete(Node, Nodes)}}; +handle_call(_Call, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> + ?LOG_INFO("~p changes listener died ~p", [?MODULE, Reason]), + StartSeq = State#state.update_seq, + Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end, + timer:send_after(5000, start_listener), + {noreply, State#state{update_seq = Seq}}; +handle_info(start_listener, #state{update_seq = Seq} = State) -> + {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), + {noreply, State#state{changes_pid=NewPid}}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% internal functions + +initialize_nodelist() -> + DbName = couch_config:get("mem3", "nodedb", "nodes"), + {ok, Db} = ensure_exists(DbName), + {ok, _, Nodes0} = couch_btree:fold(Db#db.id_tree, fun first_fold/3, [], []), + % add self if not already present + case lists:member(node(), Nodes0) of + true -> + Nodes = Nodes0; + false -> + Doc = #doc{id = couch_util:to_binary(node())}, + {ok, _} = couch_db:update_doc(Db, Doc, []), + Nodes = [node() | Nodes0] + end, + couch_db:close(Db), + {lists:sort(Nodes), Db#db.update_seq}. + +first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, _, Acc) -> + {ok, Acc}; +first_fold(#full_doc_info{deleted=true}, _, Acc) -> + {ok, Acc}; +first_fold(#full_doc_info{id=Id}, _, Acc) -> + {ok, [mem3_util:to_atom(Id) | Acc]}. + +listen_for_changes(Since) -> + DbName = ?l2b(couch_config:get("mem3", "nodedb", "nodes")), + {ok, Db} = ensure_exists(DbName), + Args = #changes_args{ + feed = "continuous", + since = Since, + heartbeat = true, + include_docs = true + }, + ChangesFun = couch_changes:handle_changes(Args, nil, Db), + ChangesFun(fun changes_callback/2). + +ensure_exists(DbName) when is_list(DbName) -> + ensure_exists(list_to_binary(DbName)); +ensure_exists(DbName) -> + Options = [{user_ctx, #user_ctx{roles=[<<"_admin">>]}}], + case couch_db:open(DbName, Options) of + {ok, Db} -> + {ok, Db}; + _ -> + couch_server:create(DbName, Options) + end. + +changes_callback(start, _) -> + {ok, nil}; +changes_callback({stop, EndSeq}, _) -> + exit({seq, EndSeq}); +changes_callback({change, {Change}, _}, _) -> + Node = couch_util:get_value(<<"id">>, Change), + case Node of <<"_design/", _/binary>> -> ok; _ -> + case couch_util:get_value(deleted, Change, false) of + false -> + gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node)}); + true -> + gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)}) + end + end, + {ok, couch_util:get_value(<<"seq">>, Change)}; +changes_callback(timeout, _) -> + {ok, nil}. diff --git a/apps/mem3/src/mem3_sup.erl b/apps/mem3/src/mem3_sup.erl new file mode 100644 index 00000000..58d0bbf5 --- /dev/null +++ b/apps/mem3/src/mem3_sup.erl @@ -0,0 +1,21 @@ +-module(mem3_sup). +-behaviour(supervisor). +-export([start_link/0, init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init(_Args) -> + Children = [ + child(mem3_events), + child(mem3_sync), + child(mem3_cache), + child(mem3_nodes) + ], + {ok, {{one_for_one,10,1}, Children}}. + +child(mem3_events) -> + MFA = {gen_event, start_link, [{local, mem3_events}]}, + {mem3_events, MFA, permanent, 1000, worker, dynamic}; +child(Child) -> + {Child, {Child, start_link, []}, permanent, 1000, worker, [Child]}. diff --git a/apps/mem3/src/mem3_sync.erl b/apps/mem3/src/mem3_sync.erl new file mode 100644 index 00000000..d3b3ea51 --- /dev/null +++ b/apps/mem3/src/mem3_sync.erl @@ -0,0 +1,215 @@ +-module(mem3_sync). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]). + +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-record(state, { + active = [], + count = 0, + limit, + dict = dict:new(), + waiting = [], + update_notifier +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_active() -> + gen_server:call(?MODULE, get_active). + +get_queue() -> + gen_server:call(?MODULE, get_queue). + +push(Db, Node) -> + gen_server:cast(?MODULE, {push, Db, Node}). + +remove_node(Node) -> + gen_server:cast(?MODULE, {remove_node, Node}). + +init([]) -> + process_flag(trap_exit, true), + Concurrency = couch_config:get("mem3", "sync_concurrency", "10"), + gen_event:add_handler(mem3_events, mem3_sync_event, []), + {ok, Pid} = start_update_notifier(), + spawn(fun initial_sync/0), + {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}. + +handle_call(get_active, _From, State) -> + {reply, State#state.active, State}; + +handle_call(get_queue, _From, State) -> + {reply, State#state.waiting, State}. + +handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State) + when Count >= Limit -> + {noreply, add_to_queue(State, DbName, Node)}; + +handle_cast({push, DbName, Node}, State) -> + #state{active = L, count = C} = State, + case is_running(DbName, Node, L) of + true -> + {noreply, add_to_queue(State, DbName, Node)}; + false -> + Pid = start_push_replication(DbName, Node), + {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}} + end; + +handle_cast({remove_node, Node}, State) -> + Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node], + Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end, + State#state.dict, [S || {S,N} <- Waiting, N =:= Node]), + {noreply, State#state{dict = Dict, waiting = Waiting}}. + +handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) -> + {ok, NewPid} = start_update_notifier(), + {noreply, State#state{update_notifier=NewPid}}; + +handle_info({'EXIT', Active, normal}, State) -> + handle_replication_exit(State, Active); + +handle_info({'EXIT', Active, Reason}, State) -> + case lists:keyfind(Active, 3, State#state.active) of + {OldDbName, OldNode, _} -> + ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName, + OldNode, Reason]), + timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]); + false -> ok end, + handle_replication_exit(State, Active); + +handle_info(Msg, State) -> + ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]), + {noreply, State}. + +terminate(_Reason, State) -> + [exit(Pid, shutdown) || {_,_,Pid} <- State#state.active], + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_replication_exit(#state{waiting=[]} = State, Pid) -> + NewActive = lists:keydelete(Pid, 3, State#state.active), + {noreply, State#state{active=NewActive, count=length(NewActive)}}; +handle_replication_exit(State, Pid) -> + #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State, + Active1 = lists:keydelete(Pid, 3, Active), + Count = length(Active1), + NewState = if Count < Limit -> + case next_replication(Active1, Waiting) of + nil -> % all waiting replications are also active + State#state{active = Active1, count = Count}; + {DbName, Node, StillWaiting} -> + NewPid = start_push_replication(DbName, Node), + State#state{ + active = [{DbName, Node, NewPid} | Active1], + count = Count+1, + dict = dict:erase({DbName,Node}, D), + waiting = StillWaiting + } + end; + true -> + State#state{active = Active1, count=Count} + end, + {noreply, NewState}. + +start_push_replication(DbName, Node) -> + PostBody = {[ + {<<"source">>, DbName}, + {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}}, + {<<"continuous">>, false}, + {<<"async">>, true} + ]}, + ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]), + UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]}, + case (catch couch_rep:replicate(PostBody, UserCtx)) of + Pid when is_pid(Pid) -> + link(Pid), + Pid; + {db_not_found, _Msg} -> + case couch_db:open(DbName, []) of + {ok, Db} -> + % source exists, let's (re)create the target + couch_db:close(Db), + case rpc:call(Node, couch_api, create_db, [DbName, []]) of + {ok, Target} -> + ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName, + Node]), + couch_db:close(Target), + start_push_replication(DbName, Node); + file_exists -> + start_push_replication(DbName, Node); + Error -> + ?LOG_ERROR("~p couldn't create ~s on ~p because ~p", + [?MODULE, DbName, Node, Error]), + exit(shutdown) + end; + {not_found, no_db_file} -> + % source is gone, so this is a hack to skip it + ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted", + [?MODULE, DbName, Node]), + spawn_link(fun() -> ok end) + end; + {node_not_connected, _} -> + % we'll get this one when the node rejoins + ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]), + spawn_link(fun() -> ok end); + CatchAll -> + ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]), + case lists:member(Node, nodes()) of + true -> + timer:apply_after(5000, ?MODULE, push, [DbName, Node]); + false -> + ok + end, + spawn_link(fun() -> ok end) + end. + +add_to_queue(State, DbName, Node) -> + #state{dict=D, waiting=Waiting} = State, + case dict:is_key({DbName, Node}, D) of + true -> + State; + false -> + ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]), + State#state{ + dict = dict:store({DbName,Node}, ok, D), + waiting = Waiting ++ [{DbName,Node}] + } + end. + +initial_sync() -> + Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")), + Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), + Nodes = mem3:nodes(), + Live = nodes(), + [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)]. + +start_update_notifier() -> + Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")), + Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")), + couch_db_update_notifier:start_link(fun + ({updated, Db}) when Db == Db1; Db == Db2 -> + Nodes = mem3:nodes(), + Live = nodes(), + [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)]; + (_) -> ok end). + +%% @doc Finds the next {DbName,Node} pair in the list of waiting replications +%% which does not correspond to an already running replication +-spec next_replication(list(), list()) -> {binary(),node(),list()} | nil. +next_replication(Active, Waiting) -> + case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of + {_, []} -> + nil; + {Running, [{DbName,Node}|Rest]} -> + {DbName, Node, Running ++ Rest} + end. + +is_running(DbName, Node, ActiveList) -> + [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node]. diff --git a/apps/mem3/src/mem3_sync_event.erl b/apps/mem3/src/mem3_sync_event.erl new file mode 100644 index 00000000..45fcb8aa --- /dev/null +++ b/apps/mem3/src/mem3_sync_event.erl @@ -0,0 +1,44 @@ +-module(mem3_sync_event). +-behaviour(gen_event). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, + code_change/3]). + +init(_) -> + {ok, nil}. + +handle_event({add_node, Node}, State) -> + Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")), + Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")), + [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]], + {ok, State}; + +handle_event({nodeup, Node}, State) -> + case lists:member(Node, mem3:nodes()) of + true -> + Db1 = list_to_binary(couch_config:get("mem3", "node_db", "nodes")), + Db2 = list_to_binary(couch_config:get("mem3", "shard_db", "dbs")), + [mem3_sync:push(Db, Node) || Db <- [Db1, Db2]]; + false -> + ok + end, + {ok, State}; + +handle_event({Down, Node}, State) when Down == nodedown; Down == remove_node -> + mem3_sync:remove_node(Node), + {ok, State}; + +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/apps/mem3/src/mem3_util.erl b/apps/mem3/src/mem3_util.erl new file mode 100644 index 00000000..2ed84db6 --- /dev/null +++ b/apps/mem3/src/mem3_util.erl @@ -0,0 +1,139 @@ +-module(mem3_util). + +-export([hash/1, name_shard/1, create_partition_map/4, build_shards/2, + n_val/2, to_atom/1, to_integer/1, write_db_doc/1, delete_db_doc/1, + load_shards_from_disk/1, load_shards_from_disk/2]). + +-define(RINGTOP, 2 bsl 31). % CRC32 space + +-include("mem3.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +hash(Item) when is_binary(Item) -> + erlang:crc32(Item); +hash(Item) -> + erlang:crc32(term_to_binary(Item)). + +name_shard(#shard{dbname = DbName, range=[B,E]} = Shard) -> + Name = ["shards/", couch_util:to_hex(<<B:32/integer>>), "-", + couch_util:to_hex(<<E:32/integer>>), "/", DbName], + Shard#shard{name = ?l2b(Name)}. + +create_partition_map(DbName, N, Q, Nodes) -> + UniqueShards = make_key_ranges((?RINGTOP) div Q, 0, []), + Shards0 = lists:flatten([lists:duplicate(N, S) || S <- UniqueShards]), + Shards1 = attach_nodes(Shards0, [], Nodes, []), + [name_shard(S#shard{dbname=DbName}) || S <- Shards1]. + +make_key_ranges(_, CurrentPos, Acc) when CurrentPos >= ?RINGTOP -> + Acc; +make_key_ranges(Increment, Start, Acc) -> + case Start + 2*Increment of + X when X > ?RINGTOP -> + End = ?RINGTOP - 1; + _ -> + End = Start + Increment - 1 + end, + make_key_ranges(Increment, End+1, [#shard{range=[Start, End]} | Acc]). + +attach_nodes([], Acc, _, _) -> + lists:reverse(Acc); +attach_nodes(Shards, Acc, [], UsedNodes) -> + attach_nodes(Shards, Acc, lists:reverse(UsedNodes), []); +attach_nodes([S | Rest], Acc, [Node | Nodes], UsedNodes) -> + attach_nodes(Rest, [S#shard{node=Node} | Acc], Nodes, [Node | UsedNodes]). + +write_db_doc(Doc) -> + {ok, Db} = couch_db:open(<<"dbs">>, []), + try + update_db_doc(Db, Doc) + catch conflict -> + ?LOG_ERROR("conflict writing db doc, must be a race", []) + after + couch_db:close(Db) + end. + +update_db_doc(Db, #doc{id=Id, body=Body} = Doc) -> + case couch_db:open_doc(Db, Id, []) of + {not_found, _} -> + {ok, _} = couch_db:update_doc(Db, Doc, []); + {ok, #doc{body=Body}} -> + ok; + {ok, OldDoc} -> + {ok, _} = couch_db:update_doc(Db, OldDoc#doc{body=Body}, []) + end. + +delete_db_doc(DocId) -> + {ok, Db} = couch_db:open(<<"dbs">>, []), + try + delete_db_doc(Db, DocId) + catch conflict -> + ok + after + couch_db:close(Db) + end. + +delete_db_doc(Db, DocId) -> + case couch_db:open_doc(Db, DocId, []) of + {not_found, _} -> + ok; + {ok, OldDoc} -> + {ok, _} = couch_db:update_doc(Db, OldDoc#doc{deleted=true}, []) + end. + +build_shards(DbName, DocProps) -> + {ByNode} = couch_util:get_value(<<"by_node">>, DocProps, {[]}), + lists:flatmap(fun({Node, Ranges}) -> + lists:map(fun(Range) -> + [B,E] = string:tokens(?b2l(Range), "-"), + Beg = httpd_util:hexlist_to_integer(B), + End = httpd_util:hexlist_to_integer(E), + name_shard(#shard{ + dbname = DbName, + node = to_atom(Node), + range = [Beg, End] + }) + end, Ranges) + end, ByNode). + +to_atom(Node) when is_binary(Node) -> + list_to_atom(binary_to_list(Node)); +to_atom(Node) when is_atom(Node) -> + Node. + +to_integer(N) when is_integer(N) -> + N; +to_integer(N) when is_binary(N) -> + list_to_integer(binary_to_list(N)); +to_integer(N) when is_list(N) -> + list_to_integer(N). + +n_val(undefined, NodeCount) -> + n_val(couch_config:get("cluster", "n", "3"), NodeCount); +n_val(N, NodeCount) when is_list(N) -> + n_val(list_to_integer(N), NodeCount); +n_val(N, NodeCount) when is_integer(NodeCount), N > NodeCount -> + ?LOG_ERROR("Request to create N=~p DB but only ~p node(s)", [N, NodeCount]), + NodeCount; +n_val(N, _) when N < 1 -> + 1; +n_val(N, _) -> + N. + +load_shards_from_disk(DbName) when is_binary(DbName) -> + {ok, Db} = couch_db:open(<<"dbs">>, []), + try load_shards_from_db(Db, DbName) after couch_db:close(Db) end. + +load_shards_from_db(#db{} = ShardDb, DbName) -> + case couch_db:open_doc(ShardDb, DbName, []) of + {ok, #doc{body = {Props}}} -> + ?LOG_INFO("dbs cache miss for ~s", [DbName]), + build_shards(DbName, Props); + {not_found, _} -> + erlang:error(database_does_not_exist) + end. + +load_shards_from_disk(DbName, DocId)-> + Shards = load_shards_from_disk(DbName), + HashKey = hash(DocId), + [S || #shard{range = [B,E]} = S <- Shards, B < HashKey, HashKey =< E]. |