summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-11-19 01:38:41 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-11-19 01:38:41 +0000
commit1074767a6b2a254fc6abcb08ec8ccc156e5aa6e9 (patch)
tree5b7701295e50f1f795011b441f483b77b7ccbadf /src/couchdb/couch_rep.erl
parent8c607abe413839f428a6a7209aab385b0fa22308 (diff)
Make sure that after a local database compaction the old database reference counters don't get unreleased forever because of a
continuous (or long) replication is going on. Same type of issue as in COUCHDB-926. Thanks Adam Kocoloski for some suggestions. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1036705 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl74
1 files changed, 59 insertions, 15 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index e288efa6..b5918abc 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -53,7 +53,9 @@
committed_seq = 0,
stats = nil,
- rep_doc = nil
+ rep_doc = nil,
+ source_db_update_notifier = nil,
+ target_db_update_notifier = nil
}).
%% convenience function to do a simple replication from the shell
@@ -187,7 +189,9 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- rep_doc = RepDoc
+ rep_doc = RepDoc,
+ source_db_update_notifier = source_db_update_notifier(Source),
+ target_db_update_notifier = target_db_update_notifier(Target)
},
{ok, State}.
@@ -195,7 +199,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
{stop, normal, State#state{listeners=[From]}};
handle_call(get_result, From, State) ->
Listeners = State#state.listeners,
- {noreply, State#state{listeners=[From|Listeners]}}.
+ {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+ {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+ {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
handle_cast(do_checkpoint, State) ->
{noreply, do_checkpoint(State)};
@@ -255,11 +273,11 @@ terminate(normal, State) ->
terminate(shutdown, #state{listeners = Listeners} = State) ->
% continuous replication stopped
[gen_server:reply(L, {ok, stopped}) || L <- Listeners],
- do_forced_terminate(State);
+ terminate_cleanup(State);
terminate(Reason, #state{listeners = Listeners} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
- do_forced_terminate(State),
+ terminate_cleanup(State),
update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
code_change(_OldVsn, State, _Extra) ->
@@ -267,11 +285,6 @@ code_change(_OldVsn, State, _Extra) ->
% internal funs
-do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
- ets:delete(Stats),
- close_db(Target),
- close_db(Source).
-
start_replication_server(Replicator) ->
RepId = element(1, Replicator),
case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -399,13 +412,20 @@ do_terminate(State) ->
false ->
[gen_server:reply(R, retry) || R <- OtherListeners]
end,
+ couch_task_status:update("Finishing"),
terminate_cleanup(State).
-terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
- couch_task_status:update("Finishing"),
- close_db(Target),
- close_db(Source),
- ets:delete(Stats).
+terminate_cleanup(State) ->
+ close_db(State#state.source),
+ close_db(State#state.target),
+ stop_db_update_notifier(State#state.source_db_update_notifier),
+ stop_db_update_notifier(State#state.target_db_update_notifier),
+ ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+ ok;
+stop_db_update_notifier(Notifier) ->
+ couch_db_update_notifier:stop(Notifier).
has_session_id(_SessionId, []) ->
false;
@@ -887,3 +907,27 @@ ensure_rep_ddoc_exists(RepDb, DDocID) ->
{ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
end,
ok.
+
+source_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_source_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+source_db_update_notifier(_) ->
+ nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_target_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+target_db_update_notifier(_) ->
+ nil.