diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 262 |
1 files changed, 141 insertions, 121 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index 6f1f0443..4e0a929e 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -18,98 +18,113 @@ -include("couch_db.hrl"). --define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id). --define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). +-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). +-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id). -record(state, { changes_feed_loop = nil, - changes_queue = nil, - changes_processor = nil, - db_notifier = nil + db_notifier = nil, + rep_db_name = nil, + rep_start_pids = [] }). +-import(couch_util, [ + get_value/2, + get_value/3 +]). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). init(_) -> process_flag(trap_exit, true), - {ok, Queue} = couch_work_queue:new( - [{max_size, 1024 * 1024}, {max_items, 1000}]), - {ok, Processor} = changes_processor(Queue), - {ok, Loop} = changes_feed_loop(Queue), + ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]), + ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]), Server = self(), ok = couch_config:register( - fun("replicator", "db") -> - ok = gen_server:cast(Server, rep_db_changed) + fun("replicator", "db", NewName) -> + ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)}) end ), + {Loop, RepDbName} = changes_feed_loop(), {ok, #state{ changes_feed_loop = Loop, - changes_queue = Queue, - changes_processor = Processor, + rep_db_name = RepDbName, db_notifier = db_update_notifier()} }. +handle_call({rep_db_update, Change}, _From, State) -> + {reply, ok, process_update(State, Change)}; + handle_call(Msg, From, State) -> ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p", [Msg, From]), {stop, {error, {unexpected_call, Msg}}, State}. -handle_cast(rep_db_changed, State) -> - #state{ - changes_feed_loop = Loop, - changes_queue = Queue - } = State, - catch unlink(Loop), - catch exit(Loop, rep_db_changed), - couch_work_queue:queue(Queue, stop_all_replications), - {ok, NewLoop} = changes_feed_loop(Queue), - {noreply, State#state{changes_feed_loop = NewLoop}}; - -handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) -> - catch unlink(Loop), - catch exit(Loop, rep_db_changed), - {ok, NewLoop} = changes_feed_loop(State#state.changes_queue), - {noreply, State#state{changes_feed_loop = NewLoop}}; +handle_cast({rep_db_changed, NewName}, + #state{rep_db_name = NewName} = State) -> + {noreply, State}; + +handle_cast({rep_db_changed, _NewName}, State) -> + {noreply, restart(State)}; + +handle_cast({rep_db_created, NewName}, + #state{rep_db_name = NewName} = State) -> + {noreply, State}; + +handle_cast({rep_db_created, _NewName}, State) -> + {noreply, restart(State)}; handle_cast(Msg, State) -> ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]), {stop, {error, {unexpected_cast, Msg}}, State}. + handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) -> % replicator DB deleted - couch_work_queue:queue(State#state.changes_queue, stop_all_replications), - {noreply, State#state{changes_feed_loop = nil}}; + {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}}; handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) -> ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]), {stop, {db_update_notifier_died, Reason}, State}; -handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) -> - ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]), - {stop, {rep_db_changes_processor_died, Reason}, State}. +handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> + % one of the replication start processes terminated successfully + {noreply, State#state{rep_start_pids = Pids -- [From]}}; + +handle_info(Msg, State) -> + ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]), + {stop, {unexpected_msg, Msg}, State}. terminate(_Reason, State) -> #state{ + rep_start_pids = StartPids, changes_feed_loop = Loop, - changes_queue = Queue + db_notifier = Notifier } = State, - exit(Loop, stop), - % closing the queue will cause changes_processor to shutdown - couch_work_queue:close(Queue), - ok. + stop_all_replications(), + lists:foreach( + fun(Pid) -> + catch unlink(Pid), + catch exit(Pid, stop) + end, + [Loop | StartPids]), + true = ets:delete(?REP_ID_TO_DOC_ID), + true = ets:delete(?DOC_ID_TO_REP_ID), + couch_db_update_notifier:stop(Notifier). code_change(_OldVsn, State, _Extra) -> {ok, State}. -changes_feed_loop(ChangesQueue) -> +changes_feed_loop() -> {ok, RepDb} = couch_rep:ensure_rep_db_exists(), + Server = self(), Pid = spawn_link( fun() -> ChangesFeedFun = couch_changes:handle_changes( @@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) -> fun({change, Change, _}, _) -> case has_valid_rep_id(Change) of true -> - couch_work_queue:queue(ChangesQueue, Change); + ok = gen_server:call( + Server, {rep_db_update, Change}, infinity); false -> ok end; @@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) -> end ), couch_db:close(RepDb), - {ok, Pid}. + {Pid, couch_db:name(RepDb)}. + + +has_valid_rep_id({Change}) -> + has_valid_rep_id(get_value(<<"id">>, Change)); +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> + false; +has_valid_rep_id(_Else) -> + true. db_update_notifier() -> @@ -146,121 +170,106 @@ db_update_notifier() -> fun({created, DbName}) -> case ?l2b(couch_config:get("replicator", "db", "_replicator")) of DbName -> - ok = gen_server:cast(Server, rep_db_created); + ok = gen_server:cast(Server, {rep_db_created, DbName}); _ -> ok end; (_) -> + % no need to handle the 'deleted' event - the changes feed loop + % dies when the database is deleted ok end ), Notifier. -changes_processor(ChangesQueue) -> - Pid = spawn_link( - fun() -> - ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]), - ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]), - consume_changes(ChangesQueue), - true = ets:delete(?REP_ID_TO_DOC_ID_MAP), - true = ets:delete(?DOC_TO_REP_ID_MAP) - end - ), - {ok, Pid}. - - -consume_changes(ChangesQueue) -> - case couch_work_queue:dequeue(ChangesQueue) of - closed -> - ok; - {ok, Changes} -> - lists:foreach(fun process_change/1, Changes), - consume_changes(ChangesQueue) - end. - - -has_valid_rep_id({Change}) -> - has_valid_rep_id(couch_util:get_value(<<"id">>, Change)); -has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> - false; -has_valid_rep_id(_Else) -> - true. +restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) -> + stop_all_replications(), + lists:foreach( + fun(Pid) -> + catch unlink(Pid), + catch exit(Pid, rep_db_changed) + end, + [Loop | StartPids]), + {NewLoop, NewRepDbName} = changes_feed_loop(), + State#state{ + changes_feed_loop = NewLoop, + rep_db_name = NewRepDbName, + rep_start_pids = [] + }. -process_change(stop_all_replications) -> - ?LOG_INFO("Stopping all ongoing replications because the replicator DB " - "was deleted or changed", []), - stop_all_replications(); -process_change({Change}) -> - {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), - DocId = couch_util:get_value(<<"_id">>, RepProps), - case couch_util:get_value(<<"deleted">>, Change, false) of +process_update(State, {Change}) -> + {RepProps} = JsonRepDoc = get_value(doc, Change), + DocId = get_value(<<"_id">>, RepProps), + case get_value(<<"deleted">>, Change, false) of true -> - rep_doc_deleted(DocId); + rep_doc_deleted(DocId), + State; false -> - case couch_util:get_value(<<"_replication_state">>, RepProps) of + case get_value(<<"_replication_state">>, RepProps) of <<"completed">> -> - replication_complete(DocId); + replication_complete(DocId), + State; <<"error">> -> - stop_replication(DocId); + stop_replication(DocId), + State; <<"triggered">> -> - maybe_start_replication(DocId, JsonRepDoc); + maybe_start_replication(State, DocId, JsonRepDoc); undefined -> - maybe_start_replication(DocId, JsonRepDoc); - _ -> - ?LOG_ERROR("Invalid value for the `_replication_state` property" - " of the replication document `~s`", [DocId]) + maybe_start_replication(State, DocId, JsonRepDoc) end - end, - ok. + end. rep_user_ctx({RepDoc}) -> - case couch_util:get_value(<<"user_ctx">>, RepDoc) of + case get_value(<<"user_ctx">>, RepDoc) of undefined -> #user_ctx{roles = [<<"_admin">>]}; {UserCtx} -> #user_ctx{ - name = couch_util:get_value(<<"name">>, UserCtx, null), - roles = couch_util:get_value(<<"roles">>, UserCtx, []) + name = get_value(<<"name">>, UserCtx, null), + roles = get_value(<<"roles">>, UserCtx, []) } end. -maybe_start_replication(DocId, JsonRepDoc) -> +maybe_start_replication(State, DocId, JsonRepDoc) -> UserCtx = rep_user_ctx(JsonRepDoc), {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), - case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of + case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of [] -> - true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}), - true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), - spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end); + true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), + true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), + Pid = spawn_link(fun() -> + start_replication(JsonRepDoc, RepId, UserCtx) + end), + State#state{rep_start_pids = [Pid | State#state.rep_start_pids]}; [{BaseId, DocId}] -> - ok; + State; [{BaseId, OtherDocId}] -> - maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId) + ?LOG_INFO("The replication specified by the document `~s` was already" + " triggered by the document `~s`", [DocId, OtherDocId]), + maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), + State end. -maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) -> - case couch_util:get_value(<<"_replication_id">>, Props) of +maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> + case get_value(<<"_replication_id">>, Props) of RepId -> ok; _ -> - ?LOG_INFO("The replication specified by the document `~s` was already" - " triggered by the document `~s`", [DocId, OtherDocId]), couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}]) end. - -start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> +start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) -> case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of - RepPid when is_pid(RepPid) -> + Pid when is_pid(Pid) -> ?LOG_INFO("Document `~s` triggered replication `~s`", - [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]), - couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx); + [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), + couch_rep:get_result(Pid, RepId, RepDoc, UserCtx); Error -> couch_rep:update_rep_doc( RepDoc, @@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> {<<"_replication_id">>, ?l2b(Base)} ] ), - ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error]) + ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error]) end. + rep_doc_deleted(DocId) -> case stop_replication(DocId) of - {ok, {Base, Ext}} -> + {ok, RepId} -> ?LOG_INFO("Stopped replication `~s` because replication document `~s`" - " was deleted", [Base ++ Ext, DocId]); + " was deleted", [pp_rep_id(RepId), DocId]); none -> ok end. + replication_complete(DocId) -> case stop_replication(DocId) of - {ok, {Base, Ext}} -> + {ok, RepId} -> ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", - [Base ++ Ext, DocId]); + [pp_rep_id(RepId), DocId]); none -> ok end. + stop_replication(DocId) -> - case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of + case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of [{DocId, {BaseId, _} = RepId}] -> couch_rep:end_replication(RepId), - true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId), - true = ets:delete(?DOC_TO_REP_ID_MAP, DocId), + true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), + true = ets:delete(?DOC_ID_TO_REP_ID, DocId), {ok, RepId}; [] -> none end. + stop_all_replications() -> + ?LOG_INFO("Stopping all ongoing replications because the replicator DB " + "was deleted or changed", []), ets:foldl( fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end, ok, - ?DOC_TO_REP_ID_MAP + ?DOC_ID_TO_REP_ID ), - true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP), - true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP). + true = ets:delete_all_objects(?REP_ID_TO_DOC_ID), + true = ets:delete_all_objects(?DOC_ID_TO_REP_ID). + + +% pretty-print replication id +pp_rep_id({Base, Extension}) -> + Base ++ Extension. |