summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_rep.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r--src/couchdb/couch_rep.erl34
1 files changed, 28 insertions, 6 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index c2f691eb..14ecdf53 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -27,6 +27,7 @@
source,
target,
+ continuous,
init_args,
checkpoint_scheduled = nil,
@@ -59,7 +60,7 @@ replicate({Props}=PostBody, UserCtx) ->
{BaseId, Extension} = make_replication_id(PostBody, UserCtx),
Replicator = {BaseId ++ Extension,
{gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
- transient,
+ temporary,
1,
worker,
[?MODULE]
@@ -100,6 +101,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceProps = proplists:get_value(<<"source">>, PostProps),
TargetProps = proplists:get_value(<<"target">>, PostProps),
+ Continuous = proplists:get_value(<<"continuous">>, PostProps, false),
+
Source = open_db(SourceProps, UserCtx),
Target = open_db(TargetProps, UserCtx),
@@ -139,6 +142,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
source = Source,
target = Target,
+ continuous = Continuous,
init_args = InitArgs,
stats = Stats,
checkpoint_scheduled = nil,
@@ -182,6 +186,11 @@ handle_info({update_stats, Key, N}, State) ->
ets:update_counter(State#state.stats, Key, N),
{noreply, State};
+handle_info({'DOWN', _, _, _, _}, State) ->
+ ?LOG_INFO("replication terminating because local DB is shutting down", []),
+ timer:cancel(State#state.checkpoint_scheduled),
+ {stop, shutdown, State};
+
handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
case State#state.listeners of
[] ->
@@ -192,8 +201,12 @@ handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
handle_info({'EXIT', _, normal}, State) ->
{noreply, State};
-handle_info({'EXIT', Pid, Reason}, State) ->
- ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
+handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
+ Err == target_error ->
+ ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
+ timer:cancel(State#state.checkpoint_scheduled),
+ {stop, shutdown, State};
+handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
@@ -316,6 +329,7 @@ do_terminate(State) ->
listeners = Listeners,
source = Source,
target = Target,
+ continuous = Continuous,
stats = Stats,
source_log = #doc{body={OldHistory}}
} = State,
@@ -331,8 +345,14 @@ do_terminate(State) ->
end,
%% reply to original requester
- [Original|OtherListeners] = lists:reverse(Listeners),
- gen_server:reply(Original, {ok, NewRepHistory}),
+ OtherListeners = case Continuous of
+ true ->
+ []; % continuous replications have no listeners
+ _ ->
+ [Original|Rest] = lists:reverse(Listeners),
+ gen_server:reply(Original, {ok, NewRepHistory}),
+ Rest
+ end,
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
@@ -430,7 +450,9 @@ open_db(<<"https://",_/binary>>=Url, _) ->
open_db({[{<<"url">>,Url}]}, []);
open_db(<<DbName/binary>>, UserCtx) ->
case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} -> Db;
+ {ok, Db} ->
+ couch_db:monitor(Db),
+ Db;
{not_found, no_db_file} -> throw({db_not_found, DbName})
end.