diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 65 | ||||
-rw-r--r-- | src/couchdb/couch_rep_missing_revs.erl | 12 | ||||
-rw-r--r-- | src/couchdb/couch_rep_reader.erl | 18 | ||||
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 9 | ||||
-rwxr-xr-x | test/etap/112-replication-missing-revs.t | 19 |
5 files changed, 94 insertions, 29 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 90b065c0..ba387285 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -47,7 +47,9 @@ committed_seq = 0, stats = nil, - doc_ids = nil + doc_ids = nil, + source_db_update_notifier = nil, + target_db_update_notifier = nil }). %% convenience function to do a simple replication from the shell @@ -196,7 +198,9 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> rep_starttime = httpd_util:rfc1123_date(), src_starttime = couch_util:get_value(instance_start_time, SourceInfo), tgt_starttime = couch_util:get_value(instance_start_time, TargetInfo), - doc_ids = DocIds + doc_ids = DocIds, + source_db_update_notifier = source_db_update_notifier(Source), + target_db_update_notifier = target_db_update_notifier(Target) }, {ok, State}. @@ -204,7 +208,21 @@ handle_call(get_result, From, #state{complete=true, listeners=[]} = State) -> {stop, normal, State#state{listeners=[From]}}; handle_call(get_result, From, State) -> Listeners = State#state.listeners, - {noreply, State#state{listeners=[From|Listeners]}}. + {noreply, State#state{listeners=[From|Listeners]}}; + +handle_call(get_source_db, _From, #state{source = Source} = State) -> + {reply, {ok, Source}, State}; + +handle_call(get_target_db, _From, #state{target = Target} = State) -> + {reply, {ok, Target}, State}. + +handle_cast(reopen_source_db, #state{source = Source} = State) -> + {ok, NewSource} = couch_db:reopen(Source), + {noreply, State#state{source = NewSource}}; + +handle_cast(reopen_target_db, #state{target = Target} = State) -> + {ok, NewTarget} = couch_db:reopen(Target), + {noreply, State#state{target = NewTarget}}; handle_cast(do_checkpoint, State) -> {noreply, do_checkpoint(State)}; @@ -422,13 +440,20 @@ do_terminate(State) -> false -> [gen_server:reply(R, retry) || R <- OtherListeners] end, + couch_task_status:update("Finishing"), terminate_cleanup(State). -terminate_cleanup(#state{source=Source, target=Target, stats=Stats}) -> - couch_task_status:update("Finishing"), - close_db(Target), - close_db(Source), - ets:delete(Stats). +terminate_cleanup(State) -> + close_db(State#state.source), + close_db(State#state.target), + stop_db_update_notifier(State#state.source_db_update_notifier), + stop_db_update_notifier(State#state.target_db_update_notifier), + ets:delete(State#state.stats). + +stop_db_update_notifier(nil) -> + ok; +stop_db_update_notifier(Notifier) -> + couch_db_update_notifier:stop(Notifier). has_session_id(_SessionId, []) -> false; @@ -752,3 +777,27 @@ parse_proxy_params(ProxyUrl) -> true -> [{proxy_user, User}, {proxy_password, Passwd}] end. + +source_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_source_db); + (_) -> + ok + end), + Notifier; +source_db_update_notifier(_) -> + nil. + +target_db_update_notifier(#db{name = DbName}) -> + Server = self(), + {ok, Notifier} = couch_db_update_notifier:start_link( + fun({compacted, DbName1}) when DbName1 =:= DbName -> + ok = gen_server:cast(Server, reopen_target_db); + (_) -> + ok + end), + Notifier; +target_db_update_notifier(_) -> + nil. diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl index 1eff6774..9809ca5e 100644 --- a/src/couchdb/couch_rep_missing_revs.erl +++ b/src/couchdb/couch_rep_missing_revs.erl @@ -24,7 +24,6 @@ -record (state, { changes_loop, changes_from = nil, - target, parent, complete = false, count = 0, @@ -44,11 +43,11 @@ next(Server) -> stop(Server) -> gen_server:call(Server, stop). -init([Parent, Target, ChangesFeed, _PostProps]) -> +init([Parent, _Target, ChangesFeed, _PostProps]) -> process_flag(trap_exit, true), Self = self(), - Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Target) end), - {ok, #state{changes_loop=Pid, target=Target, parent=Parent}}. + Pid = spawn_link(fun() -> changes_loop(Self, ChangesFeed, Parent) end), + {ok, #state{changes_loop=Pid, parent=Parent}}. handle_call({add_missing_revs, {HighSeq, Revs}}, From, State) -> State#state.parent ! {update_stats, missing_revs, length(Revs)}, @@ -133,15 +132,16 @@ handle_changes_loop_exit(normal, State) -> handle_changes_loop_exit(Reason, State) -> {stop, Reason, State#state{changes_loop=nil}}. -changes_loop(OurServer, SourceChangesServer, Target) -> +changes_loop(OurServer, SourceChangesServer, Parent) -> case couch_rep_changes_feed:next(SourceChangesServer) of complete -> exit(normal); Changes -> + {ok, Target} = gen_server:call(Parent, get_target_db, infinity), MissingRevs = get_missing_revs(Target, Changes), gen_server:call(OurServer, {add_missing_revs, MissingRevs}, infinity) end, - changes_loop(OurServer, SourceChangesServer, Target). + changes_loop(OurServer, SourceChangesServer, Parent). get_missing_revs(#http_db{}=Target, Changes) -> Transform = fun({Props}) -> diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index 5c824cbc..0930599c 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -60,7 +60,7 @@ init([Parent, Source, MissingRevs_or_DocIds, _PostProps]) -> true -> ok end, Self = self(), ReaderLoop = spawn_link( - fun() -> reader_loop(Self, Source, MissingRevs_or_DocIds) end + fun() -> reader_loop(Self, Parent, Source, MissingRevs_or_DocIds) end ), MissingRevs = case MissingRevs_or_DocIds of Pid when is_pid(Pid) -> @@ -281,12 +281,13 @@ open_doc(#http_db{url = Url} = DbS, DocId) -> [] end. -reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> - case Source of +reader_loop(ReaderServer, Parent, Source1, DocIds) when is_list(DocIds) -> + case Source1 of #http_db{} -> [gen_server:call(ReaderServer, {open_remote_doc, Id, nil, nil}, infinity) || Id <- DocIds]; _LocalDb -> + {ok, Source} = gen_server:call(Parent, get_source_db, infinity), Docs = lists:foldr(fun(Id, Acc) -> case couch_db:open_doc(Source, Id) of {ok, Doc} -> @@ -299,7 +300,7 @@ reader_loop(ReaderServer, Source, DocIds) when is_list(DocIds) -> end, exit(complete); -reader_loop(ReaderServer, Source, MissingRevsServer) -> +reader_loop(ReaderServer, Parent, Source, MissingRevsServer) -> case couch_rep_missing_revs:next(MissingRevsServer) of complete -> exit(complete); @@ -312,22 +313,23 @@ reader_loop(ReaderServer, Source, MissingRevsServer) -> #http_db{} -> [gen_server:call(ReaderServer, {open_remote_doc, Id, Seq, Revs}, infinity) || {Id,Seq,Revs} <- SortedIdsRevs], - reader_loop(ReaderServer, Source, MissingRevsServer); + reader_loop(ReaderServer, Parent, Source, MissingRevsServer); _Local -> - Source2 = maybe_reopen_db(Source, HighSeq), + {ok, Source1} = gen_server:call(Parent, get_source_db, infinity), + Source2 = maybe_reopen_db(Source1, HighSeq), lists:foreach(fun({Id,Seq,Revs}) -> {ok, Docs} = couch_db:open_doc_revs(Source2, Id, Revs, [latest]), JustTheDocs = [Doc || {ok, Doc} <- Docs], gen_server:call(ReaderServer, {add_docs, Seq, JustTheDocs}, infinity) end, SortedIdsRevs), - reader_loop(ReaderServer, Source2, MissingRevsServer) + couch_db:close(Source2), + reader_loop(ReaderServer, Parent, Source2, MissingRevsServer) end end. maybe_reopen_db(#db{update_seq=OldSeq} = Db, HighSeq) when HighSeq > OldSeq -> {ok, NewDb} = couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]), - couch_db:close(Db), NewDb; maybe_reopen_db(Db, _HighSeq) -> Db. diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index dd6396fd..cf98ccfb 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -16,10 +16,10 @@ -include("couch_db.hrl"). -start_link(Parent, Target, Reader, _PostProps) -> - {ok, spawn_link(fun() -> writer_loop(Parent, Reader, Target) end)}. +start_link(Parent, _Target, Reader, _PostProps) -> + {ok, spawn_link(fun() -> writer_loop(Parent, Reader) end)}. -writer_loop(Parent, Reader, Target) -> +writer_loop(Parent, Reader) -> case couch_rep_reader:next(Reader) of {complete, nil} -> ok; @@ -28,6 +28,7 @@ writer_loop(Parent, Reader, Target) -> ok; {HighSeq, Docs} -> DocCount = length(Docs), + {ok, Target} = gen_server:call(Parent, get_target_db, infinity), try write_docs(Target, Docs) of {ok, []} -> Parent ! {update_stats, docs_written, DocCount}; @@ -48,7 +49,7 @@ writer_loop(Parent, Reader, Target) -> end, couch_rep_att:cleanup(), couch_util:should_flush(), - writer_loop(Parent, Reader, Target) + writer_loop(Parent, Reader) end. write_docs(#http_db{} = Db, Docs) -> diff --git a/test/etap/112-replication-missing-revs.t b/test/etap/112-replication-missing-revs.t index 750334b9..71971088 100755 --- a/test/etap/112-replication-missing-revs.t +++ b/test/etap/112-replication-missing-revs.t @@ -188,8 +188,21 @@ start_changes_feed(remote, Since, Continuous) -> Db = #http_db{url = "http://127.0.0.1:5984/etap-test-source/"}, couch_rep_changes_feed:start_link(self(), Db, Since, Props). +couch_rep_pid(Db) -> + spawn(fun() -> couch_rep_pid_loop(Db) end). + +couch_rep_pid_loop(Db) -> + receive + {'$gen_call', From, get_target_db} -> + gen_server:reply(From, {ok, Db}) + end, + couch_rep_pid_loop(Db). + start_missing_revs(local, Changes) -> - couch_rep_missing_revs:start_link(self(), get_db(target), Changes, []); + TargetDb = get_db(target), + MainPid = couch_rep_pid(TargetDb), + couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []); start_missing_revs(remote, Changes) -> - Db = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"}, - couch_rep_missing_revs:start_link(self(), Db, Changes, []). + TargetDb = #http_db{url = "http://127.0.0.1:5984/etap-test-target/"}, + MainPid = couch_rep_pid(TargetDb), + couch_rep_missing_revs:start_link(MainPid, TargetDb, Changes, []). |