diff options
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 65 |
1 files changed, 57 insertions, 8 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 90b065c0..ba387285 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -47,7 +47,9 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + source_db_update_notifier = nil, + target_db_update_notifier = nil }). %% convenience function to do a simple replication from the shell @@ -196,7 +198,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + source_db_update_notifier = source_db_update_notifier(Source), + target_db_update_notifier = target_db_update_notifier(Target) }, {ok, State}. @@ -204,7 +208,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> {stop, normal, State#state{listeners=[From]}}; handle_call(get_result, From, State) -> Listeners = State#state.listeners, - {noreply, State#state{listeners=[From|Listeners]}}. + {noreply, State#state{listeners=[From|Listeners]}}; + +handle_call(get_source_db, _From, #state{source = Source} = State) -> + {reply, {ok, Source}, State}; + +handle_call(get_target_db, _From, #state{target = Target} = State) -> + {reply, {ok, Target}, State}. + +handle_cast(reopen_source_db, #state{source = Source} = State) -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + +handle_cast(reopen_target_db, #state{target = Target} = State) -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; handle_cast(do_checkpoint, State) -> {noreply, do_checkpoint(State)}; @@ -422,13 +440,20 @@ do_terminate(State) -> false -> [gen_server:reply(R, retry) || R <- OtherListeners] end, + couch_task_status:update("Finishing"), terminate_cleanup(State). -terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) -> - couch_task_status:update("Finishing"), - close_db(Target), - close_db(Source), - ets:delete(Stats). +terminate_cleanup(State) -> + close_db(State#state.source), + close_db(State#state.target), + stop_db_update_notifier(State#state.source_db_update_notifier), + stop_db_update_notifier(State#state.target_db_update_notifier), + ets:delete(State#state.stats). + +stop_db_update_notifier(nil) -> + ok; +stop_db_update_notifier(Notifier) -> + couch_db_update_notifier:stop(Notifier). has_session_id(_SessionId, []) -> false; @@ -752,3 +777,27 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +source_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_source_db); + (_) -> + ok + end), + Notifier; +source_db_update_notifier(_) -> + nil. + +target_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_target_db); + (_) -> + ok + end), + Notifier; +target_db_update_notifier(_) -> + nil. |