From 8afaff9a115a1d70586b642b23b37019bab205dc Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sun, 17 May 2009 16:02:39 +0000 Subject: replicator should never hang when attachment receiver dies git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@775685 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep.erl | 48 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 14 deletions(-) (limited to 'src/couchdb/couch_rep.erl') diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 399f41e9..7aa491ae 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -211,13 +211,19 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) case should_flush(lists:flatlength([Docs|Buffer])) of true -> Docs2 = lists:flatten([Docs|Buffer]), - {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes), - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, length(Docs2) - - length(Errors)), - {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), - {[], Ctxt}; + try update_docs(Target, Docs2, [], replicated_changes) of + {ok, Errors} -> + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, length(Docs2) - + length(Errors)), + {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), + {[], Ctxt} + catch + throw:attachment_write_failed -> + ?LOG_ERROR("attachment request failed during write to disk", []), + exit({internal_server_error, replication_link_failure}) + end; false -> {[Docs | Buffer], Context} end, @@ -272,11 +278,17 @@ terminate(normal, State) -> stats = Stats } = State, - {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes), - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - - length(Errors)), + try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of + {ok, Errors} -> + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - + length(Errors)) + catch + throw:attachment_write_failed -> + ?LOG_ERROR("attachment request failed during final write", []), + exit({internal_server_error, replication_link_failure}) + end, couch_task_status:update("Finishing"), @@ -421,7 +433,10 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> %% wait for headers to ensure that we have a 200 status code %% this is where we follow redirects etc Pid ! {self(), gimme_status}, - receive {Pid, {status, StreamStatus, StreamHeaders}} -> + receive + {'EXIT', Pid, attachment_request_failed} -> + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries-1); + {Pid, {status, StreamStatus, StreamHeaders}} -> ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", [StreamStatus, StreamHeaders]), @@ -435,7 +450,12 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> %% be the one to actually receive the ibrowse data. {ok, fun() -> Pid ! {self(), gimme_data}, - receive {Pid, Data} -> Data end + receive + {Pid, Data} -> + Data; + {'EXIT', Pid, attachment_request_failed} -> + throw(attachment_write_failed) + end end}; ResponseCode >= 300, ResponseCode < 400 -> % follow the redirect -- cgit v1.2.3