+% Copyright 2010 Cloudant
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
+ remove_node/1, initial_sync/1]).
+-record(state, {
+ active = [],
+ count = 0,
+ limit,
+ dict = dict:new(),
+ waiting = [],
+ update_notifier
+-record(job, {name, node, count=nil, pid=nil}).
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+get_active() ->
+ gen_server:call(?MODULE, get_active).
+get_queue() ->
+ gen_server:call(?MODULE, get_queue).
+push(#shard{name = Name}, Target) ->
+ push(Name, Target);
+push(Name, #shard{node=Node}) ->
+ push(Name, Node);
+push(Name, Node) ->
+ push(#job{name = Name, node = Node}).
+push(#job{node = Node} = Job) when Node =/= node() ->
+ gen_server:cast(?MODULE, {push, Job});
+push(_) ->
+ ok.
+remove_node(Node) ->
+ gen_server:cast(?MODULE, {remove_node, Node}).
+init([]) ->
+ process_flag(trap_exit, true),
+ Concurrency = couch_config:get("mem3", "sync_concurrency", "10"),
+ gen_event:add_handler(mem3_events, mem3_sync_event, []),
+ {ok, Pid} = start_update_notifier(),
+ spawn(fun initial_sync/0),
+ {ok, #state{limit = list_to_integer(Concurrency), update_notifier=Pid}}.
+handle_call(get_active, _From, State) ->
+ {reply,, State};
+handle_call(get_queue, _From, State) ->
+ {reply, State#state.waiting, State};
+handle_call(get_backlog, _From, #state{active=A, waiting=W} = State) ->
+ CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]),
+ CW = lists:sum([C || #job{count=C} <- W, is_integer(C)]),
+ {reply, CA+CW, State}.
+handle_cast({push, DbName, Node}, State) ->
+ handle_cast({push, #job{name = DbName, node = Node}}, State);
+handle_cast({push, Job}, #state{count=Count, limit=Limit} = State)
+ when Count >= Limit ->
+ {noreply, add_to_queue(State, Job)};
+handle_cast({push, Job}, State) ->
+ #state{active = L, count = C} = State,
+ #job{name = DbName, node = Node} = Job,
+ case is_running(DbName, Node, L) of
+ true ->
+ {noreply, add_to_queue(State, Job)};
+ false ->
+ Pid = start_push_replication(Job),
+ {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}}
+ end;
+handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, W0),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{node=N, pid=Pid} <-,
+ N =:= Node],
+ {noreply, State#state{dict = Dict, waiting = Alive}};
+handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
+ {Alive, Dead} = lists:partition(fun(#job{name=S}) -> S =/= Shard end, W0),
+ Dict = remove_entries(State#state.dict, Dead),
+ [exit(Pid, die_now) || #job{name=S, pid=Pid} <-,
+ S =:= Shard],
+ {noreply, State#state{dict = Dict, waiting = Alive}}.
+handle_info({'EXIT', Pid, _}, #state{update_notifier=Pid} = State) ->
+ {ok, NewPid} = start_update_notifier(),
+ {noreply, State#state{update_notifier=NewPid}};
+handle_info({'EXIT', Active, normal}, State) ->
+ handle_replication_exit(State, Active);
+handle_info({'EXIT', Active, die_now}, State) ->
+ % we forced this one ourselves, do not retry
+ handle_replication_exit(State, Active);
+handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
+ % target doesn't exist, do not retry
+ handle_replication_exit(State, Active);
+handle_info({'EXIT', Active, Reason}, State) ->
+ NewState = case lists:keyfind(Active,, of
+ #job{name=OldDbName, node=OldNode} = Job ->
+ twig:log(warn, "~p ~s -> ~p ~p", [?MODULE, OldDbName, OldNode,
+ Reason]),
+ case Reason of {pending_changes, Count} ->
+ add_to_queue(State, Job#job{pid = nil, count = Count});
+ _ ->
+ timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]),
+ State
+ end;
+ false -> State end,
+ handle_replication_exit(NewState, Active);
+handle_info(Msg, State) ->
+ twig:log(notice, "unexpected msg at replication manager ~p", [Msg]),
+ {noreply, State}.
+terminate(_Reason, State) ->
+ [exit(Pid, shutdown) || #job{pid=Pid} <-],
+ ok.
+code_change(_, #state{waiting = [{_,_}|_] = W, active=A} = State, _) ->
+ Waiting = [#job{name=Name, node=Node} || {Name,Node} <- W],
+ Active = [#job{name=Name, node=Node, pid=Pid} || {Name,Node,Pid} <- A],
+ {ok, State#state{active = Active, waiting = Waiting}};
+code_change(_, State, _) ->
+ {ok, State}.
+handle_replication_exit(#state{waiting=[]} = State, Pid) ->
+ NewActive = lists:keydelete(Pid,,,
+ {noreply, State#state{active=NewActive, count=length(NewActive)}};
+handle_replication_exit(State, Pid) ->
+ #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
+ Active1 = lists:keydelete(Pid,, Active),
+ Count = length(Active1),
+ NewState = if Count < Limit ->
+ case next_replication(Active1, Waiting) of
+ nil -> % all waiting replications are also active
+ State#state{active = Active1, count = Count};
+ {#job{name=DbName, node=Node} = Job, StillWaiting} ->
+ NewPid = start_push_replication(Job),
+ State#state{
+ active = [Job#job{pid = NewPid} | Active1],
+ count = Count+1,
+ dict = dict:erase({DbName,Node}, D),
+ waiting = StillWaiting
+ }
+ end;
+ true ->
+ State#state{active = Active1, count=Count}
+ end,
+ {noreply, NewState}.
+start_push_replication(#job{name=Name, node=Node}) ->
+ spawn_link(mem3_rep, go, [Name, Node]).
+add_to_queue(State, #job{name=DbName, node=Node} = Job) ->
+ #state{dict=D, waiting=Waiting} = State,
+ case dict:is_key({DbName, Node}, D) of
+ true ->
+ State;
+ false ->
+ twig:log(debug, "adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
+ State#state{
+ dict = dict:store({DbName,Node}, ok, D),
+ waiting = Waiting ++ [Job]
+ }
+ end.
+sync_nodes_and_dbs() ->
+ Db1 = couch_config:get("mem3", "node_db", "nodes"),
+ Db2 = couch_config:get("mem3", "shard_db", "dbs"),
+ Db3 = couch_config:get("couch_httpd_auth", "authentication_db", "_users"),
+ Dbs = [Db1, Db2, Db3],
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [[push(?l2b(Db), N) || Db <- Dbs] || N <- Nodes, lists:member(N, Live)].
+initial_sync() ->
+ [net_kernel:connect_node(Node) || Node <- mem3:nodes()],
+ sync_nodes_and_dbs(),
+ initial_sync(nodes()).
+initial_sync(Live) ->
+ Self = node(),
+ {ok, AllDbs} = fabric:all_dbs(),
+ lists:foreach(fun(Db) ->
+ LocalShards = [S || #shard{node=N} = S <- mem3:shards(Db), N =:= Self],
+ lists:foreach(fun(#shard{name=ShardName}) ->
+ Targets = [S || #shard{node=N, name=Name} = S <- mem3:shards(Db),
+ N =/= Self, Name =:= ShardName],
+ [?MODULE:push(ShardName, N) || #shard{node=N} <- Targets,
+ lists:member(N, Live)]
+ end, LocalShards)
+ end, AllDbs).
+start_update_notifier() ->
+ Db1 = ?l2b(couch_config:get("mem3", "node_db", "nodes")),
+ Db2 = ?l2b(couch_config:get("mem3", "shard_db", "dbs")),
+ Db3 = ?l2b(couch_config:get("couch_httpd_auth", "authentication_db",
+ "_users")),
+ couch_db_update_notifier:start_link(fun
+ ({updated, Db}) when Db == Db1; Db == Db2; Db == Db3 ->
+ Nodes = mem3:nodes(),
+ Live = nodes(),
+ [?MODULE:push(Db, N) || N <- Nodes, lists:member(N, Live)];
+ ({updated, <<"shards/", _/binary>> = ShardName}) ->
+ % TODO deal with split/merged partitions by comparing keyranges
+ try mem3:shards(mem3:dbname(ShardName)) of
+ Shards ->
+ Targets = [S || #shard{node=N, name=Name} = S <- Shards,
+ N =/= node(), Name =:= ShardName],
+ Live = nodes(),
+ [?MODULE:push(ShardName,N) || #shard{node=N} <- Targets,
+ lists:member(N, Live)]
+ catch error:database_does_not_exist ->
+ ok
+ end;
+ ({deleted, <<"shards/", _:18/binary, _/binary>> = ShardName}) ->
+ gen_server:cast(?MODULE, {remove_shard, ShardName});
+ (_) -> ok end).
+%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
+%% which does not correspond to an already running replication
+-spec next_replication([#job{}], [#job{}]) -> {#job{}, [#job{}]} | nil.
+next_replication(Active, Waiting) ->
+ Fun = fun(#job{name=S, node=N}) -> is_running(S,N,Active) end,
+ case lists:splitwith(Fun, Waiting) of
+ {_, []} ->
+ nil;
+ {Running, [Job|Rest]} ->
+ {Job, Running ++ Rest}
+ end.
+is_running(DbName, Node, ActiveList) ->
+ [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node].
+remove_entries(Dict, Entries) ->
+ lists:foldl(fun(Entry, D) -> dict:erase(Entry, D) end, Dict, Entries).