summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2011-01-24 13:48:28 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2011-01-24 13:48:28 +0000
commite434d8ae3f605a5e122c8e13c29caae27ae61ce1 (patch)
treee78c8a0dd857a97291f00aafb550b818bf19d0a2 /src
parent9ce71f36273bad1f48d0b117799ad8d682451258 (diff)
Merge revision 1062772 from trunk
Refactoring of the replicator database listener Simpler implementation and more reliable behaviour when the replicator database is deleted or changed on the fly. git-svn-id: https://svn.apache.org/repos/asf/couchdb/branches/1.1.x@1062773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep_db_listener.erl262
1 files changed, 141 insertions, 121 deletions
diff --git a/src/couchdb/couch_rep_db_listener.erl b/src/couchdb/couch_rep_db_listener.erl
index 6f1f0443..4e0a929e 100644
--- a/src/couchdb/couch_rep_db_listener.erl
+++ b/src/couchdb/couch_rep_db_listener.erl
@@ -18,98 +18,113 @@
-include("couch_db.hrl").
--define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
--define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
+-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
+-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
-record(state, {
changes_feed_loop = nil,
- changes_queue = nil,
- changes_processor = nil,
- db_notifier = nil
+ db_notifier = nil,
+ rep_db_name = nil,
+ rep_start_pids = []
}).
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_) ->
process_flag(trap_exit, true),
- {ok, Queue} = couch_work_queue:new(
- [{max_size, 1024 * 1024}, {max_items, 1000}]),
- {ok, Processor} = changes_processor(Queue),
- {ok, Loop} = changes_feed_loop(Queue),
+ ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
+ ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
Server = self(),
ok = couch_config:register(
- fun("replicator", "db") ->
- ok = gen_server:cast(Server, rep_db_changed)
+ fun("replicator", "db", NewName) ->
+ ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
end
),
+ {Loop, RepDbName} = changes_feed_loop(),
{ok, #state{
changes_feed_loop = Loop,
- changes_queue = Queue,
- changes_processor = Processor,
+ rep_db_name = RepDbName,
db_notifier = db_update_notifier()}
}.
+handle_call({rep_db_update, Change}, _From, State) ->
+ {reply, ok, process_update(State, Change)};
+
handle_call(Msg, From, State) ->
?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
[Msg, From]),
{stop, {error, {unexpected_call, Msg}}, State}.
-handle_cast(rep_db_changed, State) ->
- #state{
- changes_feed_loop = Loop,
- changes_queue = Queue
- } = State,
- catch unlink(Loop),
- catch exit(Loop, rep_db_changed),
- couch_work_queue:queue(Queue, stop_all_replications),
- {ok, NewLoop} = changes_feed_loop(Queue),
- {noreply, State#state{changes_feed_loop = NewLoop}};
-
-handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) ->
- catch unlink(Loop),
- catch exit(Loop, rep_db_changed),
- {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
- {noreply, State#state{changes_feed_loop = NewLoop}};
+handle_cast({rep_db_changed, NewName},
+ #state{rep_db_name = NewName} = State) ->
+ {noreply, State};
+
+handle_cast({rep_db_changed, _NewName}, State) ->
+ {noreply, restart(State)};
+
+handle_cast({rep_db_created, NewName},
+ #state{rep_db_name = NewName} = State) ->
+ {noreply, State};
+
+handle_cast({rep_db_created, _NewName}, State) ->
+ {noreply, restart(State)};
handle_cast(Msg, State) ->
?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
{stop, {error, {unexpected_cast, Msg}}, State}.
+
handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
% replicator DB deleted
- couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
- {noreply, State#state{changes_feed_loop = nil}};
+ {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
{stop, {db_update_notifier_died, Reason}, State};
-handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
- ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
- {stop, {rep_db_changes_processor_died, Reason}, State}.
+handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
+ % one of the replication start processes terminated successfully
+ {noreply, State#state{rep_start_pids = Pids -- [From]}};
+
+handle_info(Msg, State) ->
+ ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]),
+ {stop, {unexpected_msg, Msg}, State}.
terminate(_Reason, State) ->
#state{
+ rep_start_pids = StartPids,
changes_feed_loop = Loop,
- changes_queue = Queue
+ db_notifier = Notifier
} = State,
- exit(Loop, stop),
- % closing the queue will cause changes_processor to shutdown
- couch_work_queue:close(Queue),
- ok.
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ catch unlink(Pid),
+ catch exit(Pid, stop)
+ end,
+ [Loop | StartPids]),
+ true = ets:delete(?REP_ID_TO_DOC_ID),
+ true = ets:delete(?DOC_ID_TO_REP_ID),
+ couch_db_update_notifier:stop(Notifier).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-changes_feed_loop(ChangesQueue) ->
+changes_feed_loop() ->
{ok, RepDb} = couch_rep:ensure_rep_db_exists(),
+ Server = self(),
Pid = spawn_link(
fun() ->
ChangesFeedFun = couch_changes:handle_changes(
@@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) ->
fun({change, Change, _}, _) ->
case has_valid_rep_id(Change) of
true ->
- couch_work_queue:queue(ChangesQueue, Change);
+ ok = gen_server:call(
+ Server, {rep_db_update, Change}, infinity);
false ->
ok
end;
@@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) ->
end
),
couch_db:close(RepDb),
- {ok, Pid}.
+ {Pid, couch_db:name(RepDb)}.
+
+
+has_valid_rep_id({Change}) ->
+ has_valid_rep_id(get_value(<<"id">>, Change));
+has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
+ false;
+has_valid_rep_id(_Else) ->
+ true.
db_update_notifier() ->
@@ -146,121 +170,106 @@ db_update_notifier() ->
fun({created, DbName}) ->
case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
DbName ->
- ok = gen_server:cast(Server, rep_db_created);
+ ok = gen_server:cast(Server, {rep_db_created, DbName});
_ ->
ok
end;
(_) ->
+ % no need to handle the 'deleted' event - the changes feed loop
+ % dies when the database is deleted
ok
end
),
Notifier.
-changes_processor(ChangesQueue) ->
- Pid = spawn_link(
- fun() ->
- ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
- ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
- consume_changes(ChangesQueue),
- true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
- true = ets:delete(?DOC_TO_REP_ID_MAP)
- end
- ),
- {ok, Pid}.
-
-
-consume_changes(ChangesQueue) ->
- case couch_work_queue:dequeue(ChangesQueue) of
- closed ->
- ok;
- {ok, Changes} ->
- lists:foreach(fun process_change/1, Changes),
- consume_changes(ChangesQueue)
- end.
-
-
-has_valid_rep_id({Change}) ->
- has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
-has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
- false;
-has_valid_rep_id(_Else) ->
- true.
+restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
+ stop_all_replications(),
+ lists:foreach(
+ fun(Pid) ->
+ catch unlink(Pid),
+ catch exit(Pid, rep_db_changed)
+ end,
+ [Loop | StartPids]),
+ {NewLoop, NewRepDbName} = changes_feed_loop(),
+ State#state{
+ changes_feed_loop = NewLoop,
+ rep_db_name = NewRepDbName,
+ rep_start_pids = []
+ }.
-process_change(stop_all_replications) ->
- ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
- "was deleted or changed", []),
- stop_all_replications();
-process_change({Change}) ->
- {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
- DocId = couch_util:get_value(<<"_id">>, RepProps),
- case couch_util:get_value(<<"deleted">>, Change, false) of
+process_update(State, {Change}) ->
+ {RepProps} = JsonRepDoc = get_value(doc, Change),
+ DocId = get_value(<<"_id">>, RepProps),
+ case get_value(<<"deleted">>, Change, false) of
true ->
- rep_doc_deleted(DocId);
+ rep_doc_deleted(DocId),
+ State;
false ->
- case couch_util:get_value(<<"_replication_state">>, RepProps) of
+ case get_value(<<"_replication_state">>, RepProps) of
<<"completed">> ->
- replication_complete(DocId);
+ replication_complete(DocId),
+ State;
<<"error">> ->
- stop_replication(DocId);
+ stop_replication(DocId),
+ State;
<<"triggered">> ->
- maybe_start_replication(DocId, JsonRepDoc);
+ maybe_start_replication(State, DocId, JsonRepDoc);
undefined ->
- maybe_start_replication(DocId, JsonRepDoc);
- _ ->
- ?LOG_ERROR("Invalid value for the `_replication_state` property"
- " of the replication document `~s`", [DocId])
+ maybe_start_replication(State, DocId, JsonRepDoc)
end
- end,
- ok.
+ end.
rep_user_ctx({RepDoc}) ->
- case couch_util:get_value(<<"user_ctx">>, RepDoc) of
+ case get_value(<<"user_ctx">>, RepDoc) of
undefined ->
#user_ctx{roles = [<<"_admin">>]};
{UserCtx} ->
#user_ctx{
- name = couch_util:get_value(<<"name">>, UserCtx, null),
- roles = couch_util:get_value(<<"roles">>, UserCtx, [])
+ name = get_value(<<"name">>, UserCtx, null),
+ roles = get_value(<<"roles">>, UserCtx, [])
}
end.
-maybe_start_replication(DocId, JsonRepDoc) ->
+maybe_start_replication(State, DocId, JsonRepDoc) ->
UserCtx = rep_user_ctx(JsonRepDoc),
{BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
- case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of
+ case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
[] ->
- true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}),
- true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
- spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
+ true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
+ true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
+ Pid = spawn_link(fun() ->
+ start_replication(JsonRepDoc, RepId, UserCtx)
+ end),
+ State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
[{BaseId, DocId}] ->
- ok;
+ State;
[{BaseId, OtherDocId}] ->
- maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId)
+ ?LOG_INFO("The replication specified by the document `~s` was already"
+ " triggered by the document `~s`", [DocId, OtherDocId]),
+ maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
+ State
end.
-maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) ->
- case couch_util:get_value(<<"_replication_id">>, Props) of
+maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
+ case get_value(<<"_replication_id">>, Props) of
RepId ->
ok;
_ ->
- ?LOG_INFO("The replication specified by the document `~s` was already"
- " triggered by the document `~s`", [DocId, OtherDocId]),
couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
end.
-
-start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
+start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
- RepPid when is_pid(RepPid) ->
+ Pid when is_pid(Pid) ->
?LOG_INFO("Document `~s` triggered replication `~s`",
- [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]),
- couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
+ [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
+ couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
Error ->
couch_rep:update_rep_doc(
RepDoc,
@@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
{<<"_replication_id">>, ?l2b(Base)}
]
),
- ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error])
+ ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
end.
+
rep_doc_deleted(DocId) ->
case stop_replication(DocId) of
- {ok, {Base, Ext}} ->
+ {ok, RepId} ->
?LOG_INFO("Stopped replication `~s` because replication document `~s`"
- " was deleted", [Base ++ Ext, DocId]);
+ " was deleted", [pp_rep_id(RepId), DocId]);
none ->
ok
end.
+
replication_complete(DocId) ->
case stop_replication(DocId) of
- {ok, {Base, Ext}} ->
+ {ok, RepId} ->
?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
- [Base ++ Ext, DocId]);
+ [pp_rep_id(RepId), DocId]);
none ->
ok
end.
+
stop_replication(DocId) ->
- case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
+ case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
[{DocId, {BaseId, _} = RepId}] ->
couch_rep:end_replication(RepId),
- true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId),
- true = ets:delete(?DOC_TO_REP_ID_MAP, DocId),
+ true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
+ true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
{ok, RepId};
[] ->
none
end.
+
stop_all_replications() ->
+ ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
+ "was deleted or changed", []),
ets:foldl(
fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
ok,
- ?DOC_TO_REP_ID_MAP
+ ?DOC_ID_TO_REP_ID
),
- true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
- true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).
+ true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
+ true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
+
+
+% pretty-print replication id
+pp_rep_id({Base, Extension}) ->
+ Base ++ Extension.