path: root/src/mem3_sync.erl
1 files changed, 205 insertions, 37 deletions
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
--export([start_link/0, init/1, childspec/1, sup_upgrade_notify/2]).
+-export([start_link/0, get_active/0, get_queue/0, push/2, remove_node/1]).
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
start_link() ->
- supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+push(Db, Node) ->
+ gen_server:cast(?MODULE, {push, Db, Node}).
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
init([]) ->
- {ok, MemNodes} = mem3:nodes(),
- LiveNodes = nodes(),
- ChildSpecs = [childspec(N) || N <- MemNodes, lists:member(N, LiveNodes)],
- gen_event:add_handler(membership_events, dbs_event, []),
- {ok, {{one_for_one, 10, 8}, ChildSpecs}}.
-childspec(Node) ->
- ?LOG_INFO("dbs repl ~p --> ~p starting", [node(), Node]),
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+handle_call(get_active, _From, State) ->
+ {reply,, State};
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State}.
+handle_cast({push, DbName, Node}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, DbName, Node)};
+handle_cast({push, DbName, Node}, State) ->
+ #state{active = L, count = C} = State,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, DbName, Node)};
+ false ->
+ Pid = start_push_replication(DbName, Node),
+ {noreply, State#state{active=[{DbName, Node, Pid}|L], count=C+1}}
+ end;
+handle_cast({remove_node, Node}, State) ->
+ Waiting = [{S,N} || {S,N} <- State#state.waiting, N =/= Node],
+ Dict = lists:foldl(fun(DbName,D) -> dict:erase({DbName,Node}, D) end,
+ State#state.dict, [S || {S,N} <- Waiting, N =:= Node]),
+ {noreply, State#state{dict = Dict, waiting = Waiting}}.
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+handle_info({'EXIT', Active, Reason}, State) ->
+ case lists:keyfind(Active, 3, of
+ {OldDbName, OldNode, _} ->
+ ?LOG_ERROR("~p replication ~s -> ~p died:~n~p", [?MODULE, OldDbName,
+ OldNode, Reason]),
+ timer:apply_after(5000, ?MODULE, push, [OldDbName, OldNode]);
+ false -> ok end,
+ handle_replication_exit(State, Active);
+handle_info(Msg, State) ->
+ ?LOG_ERROR("unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || {_,_,Pid} <-],
+ ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid, 3,,
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid, 3, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {DbName, Node, StillWaiting} ->
+ NewPid = start_push_replication(DbName, Node),
+ State#state{
+ active = [{DbName, Node, NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+start_push_replication(DbName, Node) ->
PostBody = {[
- {<<"source">>, <<"dbs">>},
- {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, <<"dbs">>}]}},
- {<<"continuous">>, true}
+ {<<"source">>, DbName},
+ {<<"target">>, {[{<<"node">>, Node}, {<<"name">>, DbName}]}},
+ {<<"continuous">>, false},
+ {<<"async">>, true}
- Id = couch_util:to_hex(erlang:md5(term_to_binary([node(), Node]))),
- MFA = {couch_rep, start_link, [Id, PostBody, #user_ctx{}]},
- {Node, MFA, permanent, 100, worker, [couch_rep]}.
-% from
-sup_upgrade_notify (_Old, _New) ->
- {ok, {_, Specs}} = init([]),
- Old = sets:from_list(
- [Name || {Name, _, _, _} <- supervisor:which_children(?MODULE)]),
- New = sets:from_list([Name || {Name, _, _, _, _, _} <- Specs]),
- Kill = sets:subtract(Old, New),
- sets:fold(fun(Id, ok) ->
- supervisor:terminate_child(?MODULE, Id),
- supervisor:delete_child(?MODULE, Id),
- ok
- end,
- ok,
- Kill),
- [supervisor:start_child (?MODULE, Spec) || Spec <- Specs ],
- ok.
+ ?LOG_INFO("starting ~s -> ~p internal replication", [DbName, Node]),
+ UserCtx = #user_ctx{name = <<"replicator">>, roles = [<<"_admin">>]},
+ case (catch couch_rep:replicate(PostBody, UserCtx)) of
+ Pid when is_pid(Pid) ->
+ link(Pid),
+ Pid;
+ {db_not_found, _Msg} ->
+ case couch_api:open_db(DbName, []) of
+ {ok, Db} ->
+ % source exists, let's (re)create the target
+ couch_api:close_db(Db),
+ case rpc:call(Node, couch_api, create_db, [DbName, []]) of
+ {ok, Target} ->
+ ?LOG_INFO("~p successfully created ~s on ~p", [?MODULE, DbName,
+ Node]),
+ couch_api:close_db(Target),
+ start_push_replication(DbName, Node);
+ file_exists ->
+ start_push_replication(DbName, Node);
+ Error ->
+ ?LOG_ERROR("~p couldn't create ~s on ~p because ~p",
+ [?MODULE, DbName, Node, Error]),
+ exit(shutdown)
+ end;
+ {not_found, no_db_file} ->
+ % source is gone, so this is a hack to skip it
+ ?LOG_INFO("~p tried to push ~s to ~p but it was already deleted",
+ [?MODULE, DbName, Node]),
+ spawn_link(fun() -> ok end)
+ end;
+ {node_not_connected, _} ->
+ % we'll get this one when the node rejoins
+ ?LOG_ERROR("~p exiting because ~p is not connected", [?MODULE, Node]),
+ spawn_link(fun() -> ok end);
+ CatchAll ->
+ ?LOG_INFO("~p strange error ~p", [?MODULE, CatchAll]),
+ case lists:member(Node, nodes()) of
+ true ->
+ timer:apply_after(5000, ?MODULE, push, [DbName, Node]);
+ false ->
+ ok
+ end,
+ spawn_link(fun() -> ok end)
+ end.
+add_to_queue(State, DbName, Node) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ ?LOG_DEBUG("adding ~s -> ~p to internal queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [{DbName,Node}]
+ }
+ end.
+initial_sync() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(Db, N) || Db <- [Db1,Db2]] || N <- Nodes, lists:member(N, Live)].
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ (_) -> ok end).
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication(list(), list()) -> {binary(),node(),list()} | nil.
+next_replication(Active, Waiting) ->
+ case lists:splitwith(fun({S,N}) -> is_running(S,N,Active) end, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [{DbName,Node}|Rest]} ->
+ {DbName, Node, Running ++ Rest}
+ end.
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || {S,N,_} <- ActiveList, S=:=DbName, N=:=Node].