summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/couchdb/couch_rep.erl34
-rw-r--r--src/couchdb/couch_rep_missing_revs.erl17
-rw-r--r--src/couchdb/couch_rep_writer.erl10
3 files changed, 46 insertions, 15 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index c2f691eb..14ecdf53 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -27,6 +27,7 @@
source,
target,
+ continuous,
init_args,
checkpoint_scheduled = nil,
@@ -59,7 +60,7 @@ replicate({Props}=PostBody, UserCtx) ->
{BaseId, Extension} = make_replication_id(PostBody, UserCtx),
Replicator = {BaseId ++ Extension,
{gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
- transient,
+ temporary,
1,
worker,
[?MODULE]
@@ -100,6 +101,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
SourceProps = proplists:get_value(<<"source">>, PostProps),
TargetProps = proplists:get_value(<<"target">>, PostProps),
+ Continuous = proplists:get_value(<<"continuous">>, PostProps, false),
+
Source = open_db(SourceProps, UserCtx),
Target = open_db(TargetProps, UserCtx),
@@ -139,6 +142,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
source = Source,
target = Target,
+ continuous = Continuous,
init_args = InitArgs,
stats = Stats,
checkpoint_scheduled = nil,
@@ -182,6 +186,11 @@ handle_info({update_stats, Key, N}, State) ->
ets:update_counter(State#state.stats, Key, N),
{noreply, State};
+handle_info({'DOWN', _, _, _, _}, State) ->
+ ?LOG_INFO("replication terminating because local DB is shutting down", []),
+ timer:cancel(State#state.checkpoint_scheduled),
+ {stop, shutdown, State};
+
handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
case State#state.listeners of
[] ->
@@ -192,8 +201,12 @@ handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) ->
handle_info({'EXIT', _, normal}, State) ->
{noreply, State};
-handle_info({'EXIT', Pid, Reason}, State) ->
- ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]),
+handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error;
+ Err == target_error ->
+ ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]),
+ timer:cancel(State#state.checkpoint_scheduled),
+ {stop, shutdown, State};
+handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
terminate(normal, #state{checkpoint_scheduled=nil} = State) ->
@@ -316,6 +329,7 @@ do_terminate(State) ->
listeners = Listeners,
source = Source,
target = Target,
+ continuous = Continuous,
stats = Stats,
source_log = #doc{body={OldHistory}}
} = State,
@@ -331,8 +345,14 @@ do_terminate(State) ->
end,
%% reply to original requester
- [Original|OtherListeners] = lists:reverse(Listeners),
- gen_server:reply(Original, {ok, NewRepHistory}),
+ OtherListeners = case Continuous of
+ true ->
+ []; % continuous replications have no listeners
+ _ ->
+ [Original|Rest] = lists:reverse(Listeners),
+ gen_server:reply(Original, {ok, NewRepHistory}),
+ Rest
+ end,
%% maybe trigger another replication. If this replicator uses a local
%% source Db, changes to that Db since we started will not be included in
@@ -430,7 +450,9 @@ open_db(<<"https://",_/binary>>=Url, _) ->
open_db({[{<<"url">>,Url}]}, []);
open_db(<<DbName/binary>>, UserCtx) ->
case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} -> Db;
+ {ok, Db} ->
+ couch_db:monitor(Db),
+ Db;
{not_found, no_db_file} -> throw({db_not_found, DbName})
end.
diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl
index 59ab30ec..847a00db 100644
--- a/src/couchdb/couch_rep_missing_revs.erl
+++ b/src/couchdb/couch_rep_missing_revs.erl
@@ -131,8 +131,7 @@ handle_changes_loop_exit(normal, State) ->
{noreply, State#state{complete=true, changes_loop=nil}}
end;
handle_changes_loop_exit(Reason, State) ->
- ?LOG_ERROR("changes_loop died with reason ~p", [Reason]),
- {stop, changes_loop_died, State#state{changes_loop=nil}}.
+ {stop, Reason, State#state{changes_loop=nil}}.
changes_loop(OurServer, SourceChangesServer, Target) ->
case couch_rep_changes_feed:next(SourceChangesServer) of
@@ -156,11 +155,15 @@ get_missing_revs(#http_db{}=Target, Changes) ->
body = {IdRevsList}
},
{Resp} = couch_rep_httpc:request(Request),
- {MissingRevs} = proplists:get_value(<<"missing_revs">>, Resp),
- X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} ||
- {Id,RevStrs} <- MissingRevs],
- {HighSeq, X};
-
+ case proplists:get_value(<<"missing_revs">>, Resp) of
+ {MissingRevs} ->
+ X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} ||
+ {Id,RevStrs} <- MissingRevs],
+ {HighSeq, X};
+ _ ->
+ exit({target_error, proplists:get_value(<<"error">>, Resp)})
+ end;
+
get_missing_revs(Target, Changes) ->
Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) ->
{Id, [R || {[{<<"rev">>, R}]} <- C]} end,
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index d88c3ee2..ffe9768f 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -46,12 +46,18 @@ writer_loop(Parent, Reader, Target) ->
write_docs(#http_db{} = Db, Docs) ->
JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- ErrorsJson = couch_rep_httpc:request(Db#http_db{
+ Request = Db#http_db{
resource = "_bulk_docs",
method = post,
body = {[{new_edits, false}, {docs, JsonDocs}]},
headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers]
- }),
+ },
+ ErrorsJson = case couch_rep_httpc:request(Request) of
+ {FailProps} ->
+ exit({target_error, proplists:get_value(<<"error">>, FailProps)});
+ List when is_list(List) ->
+ List
+ end,
ErrorsList =
lists:map(
fun({Props}) ->