diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 34 | ||||
-rw-r--r-- | src/couchdb/couch_rep_missing_revs.erl | 17 | ||||
-rw-r--r-- | src/couchdb/couch_rep_writer.erl | 10 |
3 files changed, 46 insertions, 15 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index c2f691eb..14ecdf53 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -27,6 +27,7 @@ source, target, + continuous, init_args, checkpoint_scheduled = nil, @@ -59,7 +60,7 @@ replicate({Props}=PostBody, UserCtx) -> {BaseId, Extension} = make_replication_id(PostBody, UserCtx), Replicator = {BaseId ++ Extension, {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, - transient, + temporary, 1, worker, [?MODULE] @@ -100,6 +101,8 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> SourceProps = proplists:get_value(<<"source">>, PostProps), TargetProps = proplists:get_value(<<"target">>, PostProps), + Continuous = proplists:get_value(<<"continuous">>, PostProps, false), + Source = open_db(SourceProps, UserCtx), Target = open_db(TargetProps, UserCtx), @@ -139,6 +142,7 @@ do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> source = Source, target = Target, + continuous = Continuous, init_args = InitArgs, stats = Stats, checkpoint_scheduled = nil, @@ -182,6 +186,11 @@ handle_info({update_stats, Key, N}, State) -> ets:update_counter(State#state.stats, Key, N), {noreply, State}; +handle_info({'DOWN', _, _, _, _}, State) -> + ?LOG_INFO("replication terminating because local DB is shutting down", []), + timer:cancel(State#state.checkpoint_scheduled), + {stop, shutdown, State}; + handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) -> case State#state.listeners of [] -> @@ -192,8 +201,12 @@ handle_info({'EXIT', Writer, normal}, #state{writer=Writer} = State) -> handle_info({'EXIT', _, normal}, State) -> {noreply, State}; -handle_info({'EXIT', Pid, Reason}, State) -> - ?LOG_ERROR("exit of linked Pid ~p with reason ~p", [Pid, Reason]), +handle_info({'EXIT', _Pid, {Err, Reason}}, State) when Err == source_error; + Err == target_error -> + ?LOG_INFO("replication terminating due to ~p: ~p", [Err, Reason]), + timer:cancel(State#state.checkpoint_scheduled), + {stop, shutdown, State}; +handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. terminate(normal, #state{checkpoint_scheduled=nil} = State) -> @@ -316,6 +329,7 @@ do_terminate(State) -> listeners = Listeners, source = Source, target = Target, + continuous = Continuous, stats = Stats, source_log = #doc{body={OldHistory}} } = State, @@ -331,8 +345,14 @@ do_terminate(State) -> end, %% reply to original requester - [Original|OtherListeners] = lists:reverse(Listeners), - gen_server:reply(Original, {ok, NewRepHistory}), + OtherListeners = case Continuous of + true -> + []; % continuous replications have no listeners + _ -> + [Original|Rest] = lists:reverse(Listeners), + gen_server:reply(Original, {ok, NewRepHistory}), + Rest + end, %% maybe trigger another replication. If this replicator uses a local %% source Db, changes to that Db since we started will not be included in @@ -430,7 +450,9 @@ open_db(<<"https://",_/binary>>=Url, _) -> open_db({[{<<"url">>,Url}]}, []); open_db(<<DbName/binary>>, UserCtx) -> case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> Db; + {ok, Db} -> + couch_db:monitor(Db), + Db; {not_found, no_db_file} -> throw({db_not_found, DbName}) end. diff --git a/src/couchdb/couch_rep_missing_revs.erl b/src/couchdb/couch_rep_missing_revs.erl index 59ab30ec..847a00db 100644 --- a/src/couchdb/couch_rep_missing_revs.erl +++ b/src/couchdb/couch_rep_missing_revs.erl @@ -131,8 +131,7 @@ handle_changes_loop_exit(normal, State) -> {noreply, State#state{complete=true, changes_loop=nil}} end; handle_changes_loop_exit(Reason, State) -> - ?LOG_ERROR("changes_loop died with reason ~p", [Reason]), - {stop, changes_loop_died, State#state{changes_loop=nil}}. + {stop, Reason, State#state{changes_loop=nil}}. changes_loop(OurServer, SourceChangesServer, Target) -> case couch_rep_changes_feed:next(SourceChangesServer) of @@ -156,11 +155,15 @@ get_missing_revs(#http_db{}=Target, Changes) -> body = {IdRevsList} }, {Resp} = couch_rep_httpc:request(Request), - {MissingRevs} = proplists:get_value(<<"missing_revs">>, Resp), - X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} || - {Id,RevStrs} <- MissingRevs], - {HighSeq, X}; - + case proplists:get_value(<<"missing_revs">>, Resp) of + {MissingRevs} -> + X = [{Id, dict:fetch(Id, SeqDict), couch_doc:parse_revs(RevStrs)} || + {Id,RevStrs} <- MissingRevs], + {HighSeq, X}; + _ -> + exit({target_error, proplists:get_value(<<"error">>, Resp)}) + end; + get_missing_revs(Target, Changes) -> Transform = fun({[{<<"seq">>,_}, {<<"id">>,Id}, {<<"changes">>,C}]}) -> {Id, [R || {[{<<"rev">>, R}]} <- C]} end, diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index d88c3ee2..ffe9768f 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -46,12 +46,18 @@ writer_loop(Parent, Reader, Target) -> write_docs(#http_db{} = Db, Docs) -> JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], - ErrorsJson = couch_rep_httpc:request(Db#http_db{ + Request = Db#http_db{ resource = "_bulk_docs", method = post, body = {[{new_edits, false}, {docs, JsonDocs}]}, headers = [{"x-couch-full-commit", "false"} | Db#http_db.headers] - }), + }, + ErrorsJson = case couch_rep_httpc:request(Request) of + {FailProps} -> + exit({target_error, proplists:get_value(<<"error">>, FailProps)}); + List when is_list(List) -> + List + end, ErrorsList = lists:map( fun({Props}) -> |