summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl74
-rw-r--r--src/couchdb/couch_rep_missing_revs.erl12
-rw-r--r--src/couchdb/couch_rep_reader.erl14
-rw-r--r--src/couchdb/couch_rep_writer.erl9
-rwxr-xr-xtest/etap/112-replication-missing-revs.t19
5 files changed, 94 insertions, 34 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index e288efa6..b5918abc 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -53,7 +53,9 @@
committed_seq = 0,
stats = nil,
- rep_doc = nil
+ rep_doc = nil,
+ source_db_update_notifier = nil,
+ target_db_update_notifier = nil
}).
%% convenience function to do a simple replication from the shell
@@ -187,7 +189,9 @@ do_init([RepId, {PostProps} = RepDoc, UserCtx] = InitArgs) ->
rep_starttime = httpd_util:rfc1123_date(),
src_starttime = couch_util:get_value(instance_start_time, SourceInfo),
tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo),
- rep_doc = RepDoc
+ rep_doc = RepDoc,
+ source_db_update_notifier = source_db_update_notifier(Source),
+ target_db_update_notifier = target_db_update_notifier(Target)
},
{ok, State}.
@@ -195,7 +199,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) ->
{stop, normal, State#state{listeners=[From]}};
handle_call(get_result, From, State) ->
Listeners = State#state.listeners,
- {noreply, State#state{listeners=[From|Listeners]}}.
+ {noreply, State#state{listeners=[From|Listeners]}};
+
+handle_call(get_source_db, _From, #state{source = Source} = State) ->
+ {reply, {ok, Source}, State};
+
+handle_call(get_target_db, _From, #state{target = Target} = State) ->
+ {reply, {ok, Target}, State}.
+
+handle_cast(reopen_source_db, #state{source = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast(reopen_target_db, #state{target = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
handle_cast(do_checkpoint, State) ->
{noreply, do_checkpoint(State)};
@@ -255,11 +273,11 @@ terminate(normal, State) ->
terminate(shutdown, #state{listeners = Listeners} = State) ->
% continuous replication stopped
[gen_server:reply(L, {ok, stopped}) || L <- Listeners],
- do_forced_terminate(State);
+ terminate_cleanup(State);
terminate(Reason, #state{listeners = Listeners} = State) ->
[gen_server:reply(L, {error, Reason}) || L <- Listeners],
- do_forced_terminate(State),
+ terminate_cleanup(State),
update_rep_doc(State#state.rep_doc, [{<<"state">>, <<"error">>}]).
code_change(_OldVsn, State, _Extra) ->
@@ -267,11 +285,6 @@ code_change(_OldVsn, State, _Extra) ->
% internal funs
-do_forced_terminate(#state{source = Source, target = Target, stats = Stats}) ->
- ets:delete(Stats),
- close_db(Target),
- close_db(Source).
-
start_replication_server(Replicator) ->
RepId = element(1, Replicator),
case supervisor:start_child(couch_rep_sup, Replicator) of
@@ -399,13 +412,20 @@ do_terminate(State) ->
false ->
[gen_server:reply(R, retry) || R <- OtherListeners]
end,
+ couch_task_status:update("Finishing"),
terminate_cleanup(State).
-terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) ->
- couch_task_status:update("Finishing"),
- close_db(Target),
- close_db(Source),
- ets:delete(Stats).
+terminate_cleanup(State) ->
+ close_db(State#state.source),
+ close_db(State#state.target),
+ stop_db_update_notifier(State#state.source_db_update_notifier),
+ stop_db_update_notifier(State#state.target_db_update_notifier),
+ ets:delete(State#state.stats).
+
+stop_db_update_notifier(nil) ->
+ ok;
+stop_db_update_notifier(Notifier) ->
+ couch_db_update_notifier:stop(Notifier).
has_session_id(_SessionId, []) ->
false;
@@ -887,3 +907,27 @@ ensure_rep_ddoc_exists(RepDb, DDocID) ->
{ok, _Rev} = couch_db:update_doc(RepDb, DDoc, [])
end,
ok.
+
+source_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_source_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+source_db_update_notifier(_) ->
+ nil.
+
+target_db_update_notifier(#db{name = DbName}) ->
+ Server = self(),
+ {ok, Notifier} = couch_db_update_notifier:start_link(
+ fun({compacted, DbName1}) when DbName1 =:= DbName ->
+ ok = gen_server:cast(Server, reopen_target_db);
+ (_) ->
+ ok
+ end),
+ Notifier;
+target_db_update_notifier(_) ->
+ nil.
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl
index 1eff6774..9809ca5e 100644
--- a/src/couchdb/couch_rep_missing_revs.erl
+++ b/src/couchdb/couch_rep_missing_revs.erl
@@ -24,7 +24,6 @@
-record (state, {
changes_loop,
changes_from = nil,
- target,
parent,
complete = false,
count = 0,
@@ -44,11 +43,11 @@ next(Server) ->
stop(Server) ->
gen_server:call(Server, stop).
-init([Parent, Target, ChangesFeed, _PostProps]) ->
+init([Parent, _Target, ChangesFeed, _PostProps]) ->
process_flag(trap_exit, true),
Self = self(),
- Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end),
- {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}.
+ Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end),
+ {ok, #state{changes_loop=Pid, parent=Parent}}.
handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) ->
State#state.parent ! {update_stats, missing_revs, length(Revs)},
@@ -133,15 +132,16 @@ handle_changes_loop_exit(normal, State) ->
handle_changes_loop_exit(Reason, State) ->
{stop, Reason, State#state{changes_loop=nil}}.
-changes_loop(OurServer, SourceChangesServer, Target) ->
+changes_loop(OurServer, SourceChangesServer, Parent) ->
case couch_rep_changes_feed:next(SourceChangesServer) of
complete ->
exit(normal);
Changes ->
+ {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
MissingRevs = get_missing_revs(Target, Changes),
gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity)
end,
- changes_loop(OurServer, SourceChangesServer, Target).
+ changes_loop(OurServer, SourceChangesServer, Parent).
get_missing_revs(#http_db{}=Target, Changes) ->
Transform = fun({Props}) ->
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index bdef3dfc..9252bc8c 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -57,7 +57,8 @@ init([Parent, Source, MissingRevs, _PostProps]) ->
ibrowse:set_max_pipeline_size(Host, Port, ?MAX_PIPELINE_SIZE);
true -> ok end,
Self = self(),
- ReaderLoop = spawn_link(fun() -> reader_loop(Self, Source, MissingRevs) end),
+ ReaderLoop = spawn_link(
+ fun() -> reader_loop(Self, Parent, Source, MissingRevs) end),
State = #state{
parent = Parent,
source = Source,
@@ -247,7 +248,7 @@ open_doc_revs(#http_db{url = Url} = DbS, DocId, Revs) ->
end,
lists:reverse(lists:foldl(Transform, [], JsonResults)).
-reader_loop(ReaderServer, Source, MissingRevsServer) ->
+reader_loop(ReaderServer, Parent, Source, MissingRevsServer) ->
case couch_rep_missing_revs:next(MissingRevsServer) of
complete ->
exit(complete);
@@ -260,22 +261,23 @@ reader_loop(ReaderServer, Source, MissingRevsServer) ->
#http_db{} ->
[gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs},
infinity) || {Id,Seq,Revs} <- SortedIdsRevs],
- reader_loop(ReaderServer, Source, MissingRevsServer);
+ reader_loop(ReaderServer, Parent, Source, MissingRevsServer);
_Local ->
- Source2 = maybe_reopen_db(Source, HighSeq),
+ {ok, Source1} = gen_server:call(Parent, get_source_db, infinity),
+ Source2 = maybe_reopen_db(Source1, HighSeq),
lists:foreach(fun({Id,Seq,Revs}) ->
{ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]),
JustTheDocs = [Doc || {ok, Doc} <- Docs],
gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs},
infinity)
end, SortedIdsRevs),
- reader_loop(ReaderServer, Source2, MissingRevsServer)
+ couch_db:close(Source2),
+ reader_loop(ReaderServer, Parent, Source2, MissingRevsServer)
end
end.
maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq ->
{ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]),
- couch_db:close(Db),
NewDb;
maybe_reopen_db(Db, _HighSeq) ->
Db.
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index 622bfb27..12d6dec5 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -16,16 +16,17 @@
-include("couch_db.hrl").
-start_link(Parent, Target, Reader, _PostProps) ->
- {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}.
+start_link(Parent, _Target, Reader, _PostProps) ->
+ {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}.
-writer_loop(Parent, Reader, Target) ->
+writer_loop(Parent, Reader) ->
case couch_rep_reader:next(Reader) of
{complete, FinalSeq} ->
Parent ! {writer_checkpoint, FinalSeq},
ok;
{HighSeq, Docs} ->
DocCount = length(Docs),
+ {ok, Target} = gen_server:call(Parent, get_target_db, infinity),
try write_docs(Target, Docs) of
{ok, []} ->
Parent ! {update_stats, docs_written, DocCount};
@@ -41,7 +42,7 @@ writer_loop(Parent, Reader, Target) ->
Parent ! {writer_checkpoint, HighSeq},
couch_rep_att:cleanup(),
couch_util:should_flush(),
- writer_loop(Parent, Reader, Target)
+ writer_loop(Parent, Reader)
end.
write_docs(#http_db{} = Db, Docs) ->
diff --git a/test/etap/112-replication-missing-revs.t b/test/etap/112-replication-missing-revs.t
index 750334b9..71971088 100755
--- a/test/etap/112-replication-missing-revs.t
+++ b/test/etap/112-replication-missing-revs.t
@@ -188,8 +188,21 @@ start_changes_feed(remote, Since, Continuous) ->
Db = #http_db{url = "http://127.0.0.1:5984/etap-test-source/"},
couch_rep_changes_feed:start_link(self(), Db, Since, Props).
+couch_rep_pid(Db) ->
+ spawn(fun() -> couch_rep_pid_loop(Db) end).
+
+couch_rep_pid_loop(Db) ->
+ receive
+ {'$gen_call', From, get_target_db} ->
+ gen_server:reply(From, {ok, Db})
+ end,
+ couch_rep_pid_loop(Db).
+
start_missing_revs(local, Changes) ->
- couch_rep_missing_revs:start_link(self(), get_db(target), Changes, []);
+ TargetDb = get_db(target),
+ MainPid = couch_rep_pid(TargetDb),
+ couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []);
start_missing_revs(remote, Changes) ->
- Db = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"},
- couch_rep_missing_revs:start_link(self(), Db, Changes, []).
+ TargetDb = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"},
+ MainPid = couch_rep_pid(TargetDb),
+ couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []).