diff options
author | Adam Kocoloski <adam.kocoloski@gmail.com> | 2011-01-20 12:43:37 -0500 |
---|---|---|
committer | Adam Kocoloski <adam.kocoloski@gmail.com> | 2011-01-20 13:05:41 -0500 |
commit | f79d0a666a5fb9541a0925db5111208a94631065 (patch) | |
tree | 392d85a8a9887ddc8f6268a48a65537b21734a7d /apps/couch/src/couch_rep.erl | |
parent | 2ea18bdaa19ea7f2da1a5dccce65d50cf0efc64d (diff) | |
parent | 94286611038e661487382ed834103853e88fdf69 (diff) |
Merge CouchDB 1.0.2 release candidate
Conflicts:
Makefile.am
acinclude.m4.in
apps/couch/src/couch_db.erl
apps/couch/src/couch_db_updater.erl
apps/couch/src/couch_rep.erl
apps/couch/src/couch_rep_reader.erl
apps/couch/src/couch_view.erl
apps/couch/src/couch_view_group.erl
rel/overlay/etc/default.ini
share/Makefile.am
src/couchdb/couch_query_servers.erl
src/ibrowse/Makefile.am
src/ibrowse/ibrowse.app.in
src/ibrowse/ibrowse.erl
src/ibrowse/ibrowse_app.erl
src/ibrowse/ibrowse_http_client.erl
src/ibrowse/ibrowse_lb.erl
src/ibrowse/ibrowse_lib.erl
src/ibrowse/ibrowse_sup.erl
src/ibrowse/ibrowse_test.erl
src/mochiweb/mochijson2.erl
test/etap/112-replication-missing-revs.t
test/etap/113-replication-attachment-comp.t
test/etap/140-attachment-comp.t
Diffstat (limited to 'apps/couch/src/couch_rep.erl')
-rw-r--r-- | apps/couch/src/couch_rep.erl | 324 |
1 files changed, 162 insertions, 162 deletions
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 126639e0..c804b49d 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -15,7 +15,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2, checkpoint/1, start_link/3]). +-export([replicate/2, checkpoint/1]). -include("couch_db.hrl"). @@ -34,6 +34,7 @@ start_seq, history, + session_id, source_log, target_log, rep_starttime, @@ -46,12 +47,11 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + source_db_update_notifier = nil, + target_db_update_notifier = nil }). -start_link(Id, PostBody, UserCtx) -> - gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []). - %% convenience function to do a simple replication from the shell replicate(Source, Target) when is_list(Source) -> replicate(?l2b(Source), Target); @@ -64,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> replicate({Props}=PostBody, UserCtx) -> {BaseId, Extension} = make_replication_id(PostBody, UserCtx), Replicator = {BaseId ++ Extension, - {?MODULE, start_link, [BaseId, PostBody, UserCtx]}, + {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, temporary, 1, worker, @@ -83,15 +83,10 @@ replicate({Props}=PostBody, UserCtx) -> false -> Server = start_replication_server(Replicator), - Continuous = couch_util:get_value(<<"continuous">>, Props, false), - Async = couch_util:get_value(<<"async">>, Props, false), - case {Continuous, Async} of - {true, _} -> + case couch_util:get_value(<<"continuous">>, Props, false) of + true -> {ok, {continuous, ?l2b(BaseId)}}; - {_, true} -> - spawn(fun() -> get_result(Server, PostBody, UserCtx) end), - Server; - _ -> + false -> get_result(Server, PostBody, UserCtx) end end. @@ -113,8 +108,10 @@ get_result(Server, PostBody, UserCtx) -> end. init(InitArgs) -> - try do_init(InitArgs) - catch _:Error -> + try + do_init(InitArgs) + catch + throw:Error -> {stop, Error} end. @@ -199,12 +196,15 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> start_seq = StartSeq, history = History, + session_id = couch_uuids:random(), source_log = SourceLog, target_log = TargetLog, rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + source_db_update_notifier = source_db_update_notifier(Source), + target_db_update_notifier = target_db_update_notifier(Target) }, {ok, State}. @@ -212,7 +212,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> {stop, normal, State#state{listeners=[From]}}; handle_call(get_result, From, State) -> Listeners = State#state.listeners, - {noreply, State#state{listeners=[From|Listeners]}}. + {noreply, State#state{listeners=[From|Listeners]}}; + +handle_call(get_source_db, _From, #state{source = Source} = State) -> + {reply, {ok, Source}, State}; + +handle_call(get_target_db, _From, #state{target = Target} = State) -> + {reply, {ok, Target}, State}. + +handle_cast(reopen_source_db, #state{source = Source} = State) -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + +handle_cast(reopen_target_db, #state{target = Target} = State) -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; handle_cast(do_checkpoint, State) -> {noreply, do_checkpoint(State)}; @@ -221,16 +235,14 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({missing_revs_checkpoint, SourceSeq}, State) -> - couch_task_status:update("MR Processed source update #~p of ~p", - [SourceSeq, seqnum(State#state.source)]), + couch_task_status:update("MR Processed source update #~p", [SourceSeq]), {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), - couch_task_status:update("W Processed source update #~p of ~p", - [SourceSeq, seqnum(State#state.source)]), + couch_task_status:update("W Processed source update #~p", [SourceSeq]), {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, _}, State) -> {noreply, State}; @@ -239,14 +251,8 @@ handle_info({update_stats, Key, N}, State) -> ets:update_counter(State#state.stats, Key, N), {noreply, State}; -handle_info({'DOWN', _, _, Pid, _}, State) -> - Me = node(), - case erlang:node(Pid) of - Me -> - ?LOG_INFO("replication terminating - local DB is shutting down", []); - Node -> - ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node]) - end, +handle_info({'DOWN', _, _, _, _}, State) -> + ?LOG_INFO("replication terminating because local DB is shutting down", []), timer:cancel(State#state.checkpoint_scheduled), {stop, shutdown, State}; @@ -293,35 +299,40 @@ code_change(_OldVsn, State, _Extra) -> % internal funs start_replication_server(Replicator) -> - start_replication_server(Replicator, fun start_child/1). - -start_replication_server(Replicator, StartFun) -> - case StartFun(Replicator) of + RepId = element(1, Replicator), + case supervisor:start_child(couch_rep_sup, Replicator) of {ok, Pid} -> + ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), Pid; {error, already_present} -> - start_replication_server(Replicator, fun restart_child/1); + case supervisor:restart_child(couch_rep_sup, RepId) of + {ok, Pid} -> + ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), + Pid; + {error, running} -> + %% this error occurs if multiple replicators are racing + %% each other to start and somebody else won. Just grab + %% the Pid by calling start_child again. + {error, {already_started, Pid}} = + supervisor:start_child(couch_rep_sup, Replicator), + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), + Pid; + {error, {db_not_found, DbUrl}} -> + throw({db_not_found, <<"could not open ", DbUrl/binary>>}); + {error, {unauthorized, DbUrl}} -> + throw({unauthorized, + <<"unauthorized to access database ", DbUrl/binary>>}) + end; {error, {already_started, Pid}} -> + ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), Pid; - {error, running} -> - Children = supervisor:which_children(couch_rep_sup), - {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children), - Pid; - % sadly both seem to be needed. I don't know why. {error, {{db_not_found, DbUrl}, _}} -> throw({db_not_found, <<"could not open ", DbUrl/binary>>}); - {error, {db_not_found, DbUrl}} -> - throw({db_not_found, <<"could not open ", DbUrl/binary>>}); - {error, {node_not_connected, Node}} -> - throw({node_not_connected, Node}) + {error, {{unauthorized, DbUrl}, _}} -> + throw({unauthorized, + <<"unauthorized to access database ", DbUrl/binary>>}) end. -start_child(Replicator) -> - supervisor:start_child(couch_rep_sup, Replicator). - -restart_child(Replicator) -> - supervisor:restart_child(couch_rep_sup, element(1, Replicator)). - compare_replication_logs(SrcDoc, TgtDoc) -> #doc{body={RepRecProps}} = SrcDoc, #doc{body={RepRecPropsTgt}} = TgtDoc, @@ -373,15 +384,9 @@ close_db(Db) -> couch_db:close(Db). dbname(#http_db{url = Url}) -> - strip_password(Url); -dbname(#db{name = Name, main_pid = MainPid}) -> - ?l2b([Name, " (", pid_to_list(MainPid), ")"]). - -strip_password(Url) -> - re:replace(Url, - "http(s)?://([^:]+):[^@]+@(.*)$", - "http\\1://\\2:*****@\\3", - [{return, list}]). + couch_util:url_strip_password(Url); +dbname(#db{name = Name}) -> + Name. dbinfo(#http_db{} = Db) -> {DbProps} = couch_rep_httpc:request(Db), @@ -445,13 +450,20 @@ do_terminate(State) -> false -> [gen_server:reply(R, retry) || R <- OtherListeners] end, + couch_task_status:update("Finishing"), terminate_cleanup(State). -terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) -> - couch_task_status:update("Finishing"), - close_db(Target), - close_db(Source), - ets:delete(Stats). +terminate_cleanup(State) -> + close_db(State#state.source), + close_db(State#state.target), + stop_db_update_notifier(State#state.source_db_update_notifier), + stop_db_update_notifier(State#state.target_db_update_notifier), + ets:delete(State#state.stats). + +stop_db_update_notifier(nil) -> + ok; +stop_db_update_notifier(Notifier) -> + couch_db_update_notifier:stop(Notifier). has_session_id(_SessionId, []) -> false; @@ -476,12 +488,7 @@ maybe_append_options(Options, Props) -> make_replication_id({Props}, UserCtx) -> %% funky algorithm to preserve backwards compatibility - case couch_util:get_value(<<"use_hostname">>, Props, false) of - true -> - {ok, HostName} = inet:gethostname(); - false -> - HostName = couch_config:get("replication", "hostname", "cloudant.com") - end, + {ok, HostName} = inet:gethostname(), % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), @@ -504,22 +511,15 @@ make_replication_id({Props}, UserCtx) -> maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). -get_rep_endpoint(UserCtx, {Props}) -> - case couch_util:get_value(<<"url">>, Props) of +get_rep_endpoint(_UserCtx, {Props}) -> + Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), + {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), + {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), + case couch_util:get_value(<<"oauth">>, Auth) of undefined -> - Node = couch_util:get_value(<<"node">>, Props), - Name = couch_util:get_value(<<"name">>, Props), - {Node, Name, UserCtx}; - RawUrl -> - Url = maybe_add_trailing_slash(RawUrl), - {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), - {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), - case couch_util:get_value(<<"oauth">>, Auth) of - undefined -> - {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; - {OAuth} -> - {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} - end + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; + {OAuth} -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} end; get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) -> {remote, maybe_add_trailing_slash(Url), []}; @@ -533,43 +533,27 @@ open_replication_log(#http_db{}=Db, RepId) -> Req = Db#http_db{resource=couch_util:url_encode(DocId)}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> - % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), #doc{id=?l2b(DocId)}; Doc -> - % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), + ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), couch_doc:from_json_obj(Doc) end; open_replication_log(Db, RepId) -> DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), case couch_db:open_doc(Db, DocId, []) of {ok, Doc} -> - % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), + ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), Doc; _ -> - % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), + ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), #doc{id=DocId} end. open_db(Props, UserCtx, ProxyParams) -> open_db(Props, UserCtx, ProxyParams, false). -open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) -> - open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create); -open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) -> - open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create); -open_db({Props}, UserCtx, ProxyParams, Create) -> - case couch_util:get_value(<<"url">>, Props) of - undefined -> - Node = couch_util:get_value(<<"node">>, Props, node()), - DbName = couch_util:get_value(<<"name">>, Props), - open_local_db(Node, DbName, UserCtx, Create); - _Url -> - open_remote_db({Props}, ProxyParams, Create) - end; -open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) -> - open_local_db(node(), DbName, UserCtx, Create). - -open_remote_db({Props}, ProxyParams, CreateTarget) -> +open_db({Props}, _UserCtx, ProxyParams, CreateTarget) -> Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}), {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), @@ -580,33 +564,34 @@ open_remote_db({Props}, ProxyParams, CreateTarget) -> auth = AuthProps, headers = lists:ukeymerge(1, Headers, DefaultHeaders) }, - Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams}, - couch_rep_httpc:db_exists(Db, CreateTarget). - -open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) -> - try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create) - catch error:badarg -> - ?LOG_ERROR("unknown replication node ~s", [Node]), - throw({node_not_connected, Node}) end; -open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) -> - case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of - {ok, #db{} = Db} -> - couch_db:monitor(Db), - Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)}; - {ok, MainPid} when is_pid(MainPid) -> - {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx), - couch_db:monitor(Db), - Db; - {not_found, no_db_file} when Create =:= false-> - throw({db_not_found, DbName}); - {not_found, no_db_file} -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); - {'EXIT', {{nodedown, Node}, _Stack}} -> - throw({node_not_connected, couch_util:to_binary(Node)}); - {'EXIT', {noproc, {gen_server,call,_}}} -> - timer:sleep(1000), - throw({noproc, couch_server, Node}) + Db = Db1#http_db{ + options = Db1#http_db.options ++ ProxyParams ++ + couch_rep_httpc:ssl_options(Db1) + }, + couch_rep_httpc:db_exists(Db, CreateTarget); +open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> + open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); +open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> + open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); +open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> + try + case CreateTarget of + true -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); + false -> + ok + end, + + case couch_db:open(DbName, [{user_ctx, UserCtx}]) of + {ok, Db} -> + couch_db:monitor(Db), + Db; + {not_found, no_db_file} -> + throw({db_not_found, DbName}) + end + catch throw:{unauthorized, _} -> + throw({unauthorized, DbName}) end. schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) -> @@ -628,6 +613,7 @@ do_checkpoint(State) -> committed_seq = NewSeqNum, start_seq = StartSeqNum, history = OldHistory, + session_id = SessionId, source_log = SourceLog, target_log = TargetLog, rep_starttime = ReplicationStartTime, @@ -637,14 +623,8 @@ do_checkpoint(State) -> } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> - ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p" - " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]), - SessionId = couch_uuids:new(), - TargetNode = case Target of #db{main_pid=MainPid} -> - erlang:node(MainPid); - _ -> - http - end, + ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", + [dbname(Source), dbname(Target), NewSeqNum]), NewHistoryEntry = {[ {<<"session_id">>, SessionId}, {<<"start_time">>, list_to_binary(ReplicationStartTime)}, @@ -663,7 +643,6 @@ do_checkpoint(State) -> NewRepHistory = {[ {<<"session_id">>, SessionId}, {<<"source_last_seq">>, NewSeqNum}, - {<<"target_node">>, TargetNode}, {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} ]}, @@ -683,9 +662,7 @@ do_checkpoint(State) -> "yourself?)", []), State end; - Else -> - ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [ - {SrcInstanceStartTime, TgtInstanceStartTime}, Else]), + _Else -> ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint", [dbname(Source), dbname(Target)]), #state{ @@ -717,31 +694,30 @@ commit_to_both(Source, Target, RequiredSeq) -> {SrcCommitPid, Timestamp} -> Timestamp; {'EXIT', SrcCommitPid, {http_request_failed, _}} -> - nil; - {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} -> - nil; % DB crashed, this should trigger a reboot - {'EXIT', SrcCommitPid, Else} -> - ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]), - nil + exit(replication_link_failure) end, {SourceStartTime, TargetStartTime}. ensure_full_commit(#http_db{headers = Headers} = Target) -> + Headers1 = [ + {"Content-Length", 0} | + couch_util:proplist_apply_field( + {"Content-Type", "application/json"}, Headers) + ], Req = Target#http_db{ resource = "_ensure_full_commit", method = post, - headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers) + headers = Headers1 }, {ResultProps} = couch_rep_httpc:request(Req), true = couch_util:get_value(<<"ok">>, ResultProps), couch_util:get_value(<<"instance_start_time">>, ResultProps); -ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) -> - TargetNode = erlang:node(Pid), - {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]), +ensure_full_commit(Target) -> + {ok, NewDb} = couch_db:open_int(Target#db.name, []), UpdateSeq = couch_db:get_update_seq(Target), CommitSeq = couch_db:get_committed_update_seq(NewDb), InstanceStartTime = NewDb#db.instance_start_time, - catch couch_db:close(NewDb), + couch_db:close(NewDb), if UpdateSeq > CommitSeq -> ?LOG_DEBUG("target needs a full commit: update ~p commit ~p", [UpdateSeq, CommitSeq]), @@ -753,11 +729,16 @@ ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) -> end. ensure_full_commit(#http_db{headers = Headers} = Source, RequiredSeq) -> + Headers1 = [ + {"Content-Length", 0} | + couch_util:proplist_apply_field( + {"Content-Type", "application/json"}, Headers) + ], Req = Source#http_db{ resource = "_ensure_full_commit", method = post, qs = [{seq, RequiredSeq}], - headers = couch_util:proplist_apply_field({"Content-Type", "application/json"}, Headers) + headers = Headers1 }, {ResultProps} = couch_rep_httpc:request(Req), case couch_util:get_value(<<"ok">>, ResultProps) of @@ -801,11 +782,6 @@ up_to_date(Source, Seq) -> couch_db:close(NewDb), T. -seqnum(#http_db{}) -> - -1; -seqnum(Db) -> - Db#db.update_seq. - parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> parse_proxy_params(?b2l(ProxyUrl)); parse_proxy_params([]) -> @@ -820,3 +796,27 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +source_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_source_db); + (_) -> + ok + end), + Notifier; +source_db_update_notifier(_) -> + nil. + +target_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_target_db); + (_) -> + ok + end), + Notifier; +target_db_update_notifier(_) -> + nil. |