diff options
| author | Filipe David Borba Manana <fdmanana@apache.org> | 2011-01-24 13:48:28 +0000 | 
|---|---|---|
| committer | Filipe David Borba Manana <fdmanana@apache.org> | 2011-01-24 13:48:28 +0000 | 
| commit | e434d8ae3f605a5e122c8e13c29caae27ae61ce1 (patch) | |
| tree | e78c8a0dd857a97291f00aafb550b818bf19d0a2 | |
| parent | 9ce71f36273bad1f48d0b117799ad8d682451258 (diff) | |
Merge revision 1062772 from trunk
Refactoring of the replicator database listener
Simpler implementation and more reliable behaviour when the replicator
database is deleted or changed on the fly.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1062773 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | src/couchdb/couch_rep_db_listener.erl | 262 | 
1 files changed, 141 insertions, 121 deletions
| diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl index 6f1f0443..4e0a929e 100644 --- a/src/couchdb/couch_rep_db_listener.erl +++ b/src/couchdb/couch_rep_db_listener.erl @@ -18,98 +18,113 @@  -include("couch_db.hrl"). --define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id). --define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id). +-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id). +-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).  -record(state, {      changes_feed_loop = nil, -    changes_queue = nil, -    changes_processor = nil, -    db_notifier = nil +    db_notifier = nil, +    rep_db_name = nil, +    rep_start_pids = []  }). +-import(couch_util, [ +    get_value/2, +    get_value/3 +]). +  start_link() ->      gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).  init(_) ->      process_flag(trap_exit, true), -    {ok, Queue} = couch_work_queue:new( -        [{max_size, 1024 * 1024}, {max_items, 1000}]), -    {ok, Processor} = changes_processor(Queue), -    {ok, Loop} = changes_feed_loop(Queue), +    ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]), +    ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),      Server = self(),      ok = couch_config:register( -        fun("replicator", "db") -> -            ok = gen_server:cast(Server, rep_db_changed) +        fun("replicator", "db", NewName) -> +            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})          end      ), +    {Loop, RepDbName} = changes_feed_loop(),      {ok, #state{          changes_feed_loop = Loop, -        changes_queue = Queue, -        changes_processor = Processor, +        rep_db_name = RepDbName,          db_notifier = db_update_notifier()}      }. +handle_call({rep_db_update, Change}, _From, State) -> +    {reply, ok, process_update(State, Change)}; +  handle_call(Msg, From, State) ->      ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",          [Msg, From]),      {stop, {error, {unexpected_call, Msg}}, State}. -handle_cast(rep_db_changed, State) -> -    #state{ -        changes_feed_loop = Loop, -        changes_queue = Queue -    } = State, -    catch unlink(Loop), -    catch exit(Loop, rep_db_changed), -    couch_work_queue:queue(Queue, stop_all_replications), -    {ok, NewLoop} = changes_feed_loop(Queue), -    {noreply, State#state{changes_feed_loop = NewLoop}}; - -handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) -> -    catch unlink(Loop), -    catch exit(Loop, rep_db_changed), -    {ok, NewLoop} = changes_feed_loop(State#state.changes_queue), -    {noreply, State#state{changes_feed_loop = NewLoop}}; +handle_cast({rep_db_changed, NewName}, +        #state{rep_db_name = NewName} = State) -> +    {noreply, State}; + +handle_cast({rep_db_changed, _NewName}, State) -> +    {noreply, restart(State)}; + +handle_cast({rep_db_created, NewName}, +        #state{rep_db_name = NewName} = State) -> +    {noreply, State}; + +handle_cast({rep_db_created, _NewName}, State) -> +    {noreply, restart(State)};  handle_cast(Msg, State) ->      ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),      {stop, {error, {unexpected_cast, Msg}}, State}. +  handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->      % replicator DB deleted -    couch_work_queue:queue(State#state.changes_queue, stop_all_replications), -    {noreply, State#state{changes_feed_loop = nil}}; +    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};  handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->      ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),      {stop, {db_update_notifier_died, Reason}, State}; -handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) -> -    ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]), -    {stop, {rep_db_changes_processor_died, Reason}, State}. +handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) -> +    % one of the replication start processes terminated successfully +    {noreply, State#state{rep_start_pids = Pids -- [From]}}; + +handle_info(Msg, State) -> +    ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]), +    {stop, {unexpected_msg, Msg}, State}.  terminate(_Reason, State) ->      #state{ +        rep_start_pids = StartPids,          changes_feed_loop = Loop, -        changes_queue = Queue +        db_notifier = Notifier      } = State, -    exit(Loop, stop), -    % closing the queue will cause changes_processor to shutdown -    couch_work_queue:close(Queue), -    ok. +    stop_all_replications(), +    lists:foreach( +        fun(Pid) -> +            catch unlink(Pid), +            catch exit(Pid, stop) +        end, +        [Loop | StartPids]), +    true = ets:delete(?REP_ID_TO_DOC_ID), +    true = ets:delete(?DOC_ID_TO_REP_ID), +    couch_db_update_notifier:stop(Notifier).  code_change(_OldVsn, State, _Extra) ->      {ok, State}. -changes_feed_loop(ChangesQueue) -> +changes_feed_loop() ->      {ok, RepDb} = couch_rep:ensure_rep_db_exists(), +    Server = self(),      Pid = spawn_link(          fun() ->              ChangesFeedFun = couch_changes:handle_changes( @@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) ->                  fun({change, Change, _}, _) ->                      case has_valid_rep_id(Change) of                      true -> -                        couch_work_queue:queue(ChangesQueue, Change); +                        ok = gen_server:call( +                            Server, {rep_db_update, Change}, infinity);                      false ->                          ok                      end; @@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) ->          end      ),      couch_db:close(RepDb), -    {ok, Pid}. +    {Pid, couch_db:name(RepDb)}. + + +has_valid_rep_id({Change}) -> +    has_valid_rep_id(get_value(<<"id">>, Change)); +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> +    false; +has_valid_rep_id(_Else) -> +    true.  db_update_notifier() -> @@ -146,121 +170,106 @@ db_update_notifier() ->          fun({created, DbName}) ->              case ?l2b(couch_config:get("replicator", "db", "_replicator")) of              DbName -> -                ok = gen_server:cast(Server, rep_db_created); +                ok = gen_server:cast(Server, {rep_db_created, DbName});              _ ->                  ok              end;          (_) -> +            % no need to handle the 'deleted' event - the changes feed loop +            % dies when the database is deleted              ok          end      ),      Notifier. -changes_processor(ChangesQueue) -> -    Pid = spawn_link( -        fun() -> -            ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]), -            ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]), -            consume_changes(ChangesQueue), -            true = ets:delete(?REP_ID_TO_DOC_ID_MAP), -            true = ets:delete(?DOC_TO_REP_ID_MAP) -        end -    ), -    {ok, Pid}. - - -consume_changes(ChangesQueue) -> -    case couch_work_queue:dequeue(ChangesQueue) of -    closed -> -        ok; -    {ok, Changes} -> -        lists:foreach(fun process_change/1, Changes), -        consume_changes(ChangesQueue) -    end. - - -has_valid_rep_id({Change}) -> -    has_valid_rep_id(couch_util:get_value(<<"id">>, Change)); -has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) -> -    false; -has_valid_rep_id(_Else) -> -    true. +restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) -> +    stop_all_replications(), +    lists:foreach( +        fun(Pid) -> +            catch unlink(Pid), +            catch exit(Pid, rep_db_changed) +        end, +        [Loop | StartPids]), +    {NewLoop, NewRepDbName} = changes_feed_loop(), +    State#state{ +        changes_feed_loop = NewLoop, +        rep_db_name = NewRepDbName, +        rep_start_pids = [] +    }. -process_change(stop_all_replications) -> -    ?LOG_INFO("Stopping all ongoing replications because the replicator DB " -        "was deleted or changed", []), -    stop_all_replications(); -process_change({Change}) -> -    {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change), -    DocId = couch_util:get_value(<<"_id">>, RepProps), -    case couch_util:get_value(<<"deleted">>, Change, false) of +process_update(State, {Change}) -> +    {RepProps} = JsonRepDoc = get_value(doc, Change), +    DocId = get_value(<<"_id">>, RepProps), +    case get_value(<<"deleted">>, Change, false) of      true -> -        rep_doc_deleted(DocId); +        rep_doc_deleted(DocId), +        State;      false -> -        case couch_util:get_value(<<"_replication_state">>, RepProps) of +        case get_value(<<"_replication_state">>, RepProps) of          <<"completed">> -> -            replication_complete(DocId); +            replication_complete(DocId), +            State;          <<"error">> -> -            stop_replication(DocId); +            stop_replication(DocId), +            State;          <<"triggered">> -> -            maybe_start_replication(DocId, JsonRepDoc); +            maybe_start_replication(State, DocId, JsonRepDoc);          undefined -> -            maybe_start_replication(DocId, JsonRepDoc); -        _ -> -            ?LOG_ERROR("Invalid value for the `_replication_state` property" -                " of the replication document `~s`", [DocId]) +            maybe_start_replication(State, DocId, JsonRepDoc)          end -    end, -    ok. +    end.  rep_user_ctx({RepDoc}) -> -    case couch_util:get_value(<<"user_ctx">>, RepDoc) of +    case get_value(<<"user_ctx">>, RepDoc) of      undefined ->          #user_ctx{roles = [<<"_admin">>]};      {UserCtx} ->          #user_ctx{ -            name = couch_util:get_value(<<"name">>, UserCtx, null), -            roles = couch_util:get_value(<<"roles">>, UserCtx, []) +            name = get_value(<<"name">>, UserCtx, null), +            roles = get_value(<<"roles">>, UserCtx, [])          }      end. -maybe_start_replication(DocId, JsonRepDoc) -> +maybe_start_replication(State, DocId, JsonRepDoc) ->      UserCtx = rep_user_ctx(JsonRepDoc),      {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx), -    case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of +    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of      [] -> -        true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}), -        true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}), -        spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end); +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}), +        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}), +        Pid = spawn_link(fun() -> +            start_replication(JsonRepDoc, RepId, UserCtx) +        end), +        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};      [{BaseId, DocId}] -> -        ok; +        State;      [{BaseId, OtherDocId}] -> -        maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId) +        ?LOG_INFO("The replication specified by the document `~s` was already" +            " triggered by the document `~s`", [DocId, OtherDocId]), +        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)), +        State      end. -maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) -> -    case couch_util:get_value(<<"_replication_id">>, Props) of +maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) -> +    case get_value(<<"_replication_id">>, Props) of      RepId ->          ok;      _ -> -        ?LOG_INFO("The replication specified by the document `~s` was already" -            " triggered by the document `~s`", [DocId, OtherDocId]),          couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])      end. - -start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) -> +start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->      case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of -    RepPid when is_pid(RepPid) -> +    Pid when is_pid(Pid) ->          ?LOG_INFO("Document `~s` triggered replication `~s`", -            [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]), -        couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx); +            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]), +        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);      Error ->          couch_rep:update_rep_doc(              RepDoc, @@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->                  {<<"_replication_id">>, ?l2b(Base)}              ]          ), -        ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error]) +        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])      end. +  rep_doc_deleted(DocId) ->      case stop_replication(DocId) of -    {ok, {Base, Ext}} -> +    {ok, RepId} ->          ?LOG_INFO("Stopped replication `~s` because replication document `~s`" -            " was deleted", [Base ++ Ext, DocId]); +            " was deleted", [pp_rep_id(RepId), DocId]);      none ->          ok      end. +  replication_complete(DocId) ->      case stop_replication(DocId) of -    {ok, {Base, Ext}} -> +    {ok, RepId} ->          ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)", -            [Base ++ Ext, DocId]); +            [pp_rep_id(RepId), DocId]);      none ->          ok      end. +  stop_replication(DocId) -> -    case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of +    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of      [{DocId, {BaseId, _} = RepId}] ->          couch_rep:end_replication(RepId), -        true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId), -        true = ets:delete(?DOC_TO_REP_ID_MAP, DocId), +        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId), +        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),          {ok, RepId};      [] ->          none      end. +  stop_all_replications() -> +    ?LOG_INFO("Stopping all ongoing replications because the replicator DB " +        "was deleted or changed", []),      ets:foldl(          fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,          ok, -        ?DOC_TO_REP_ID_MAP +        ?DOC_ID_TO_REP_ID      ), -    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP), -    true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP). +    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID), +    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID). + + +% pretty-print replication id +pp_rep_id({Base, Extension}) -> +    Base ++ Extension. | 
