diff options
author | Adam Kocoloski <kocolosk@apache.org> | 2009-05-17 16:02:39 +0000 |
---|---|---|
committer | Adam Kocoloski <kocolosk@apache.org> | 2009-05-17 16:02:39 +0000 |
commit | 8afaff9a115a1d70586b642b23b37019bab205dc (patch) | |
tree | b5cf512fc140f5ca1d7dace3b60d8cd87ffac86d /src/couchdb/couch_rep.erl | |
parent | fea7af460e91492224ce15356358ddf376c25e86 (diff) |
replicator should never hang when attachment receiver dies
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@775685 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_rep.erl')
-rw-r--r-- | src/couchdb/couch_rep.erl | 48 |
1 files changed, 34 insertions, 14 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 399f41e9..7aa491ae 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -211,13 +211,19 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) case should_flush(lists:flatlength([Docs|Buffer])) of true -> Docs2 = lists:flatten([Docs|Buffer]), - {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes), - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, length(Docs2) - - length(Errors)), - {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), - {[], Ctxt}; + try update_docs(Target, Docs2, [], replicated_changes) of + {ok, Errors} -> + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, length(Docs2) - + length(Errors)), + {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats), + {[], Ctxt} + catch + throw:attachment_write_failed -> + ?LOG_ERROR("attachment request failed during write to disk", []), + exit({internal_server_error, replication_link_failure}) + end; false -> {[Docs | Buffer], Context} end, @@ -272,11 +278,17 @@ terminate(normal, State) -> stats = Stats } = State, - {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes), - dump_update_errors(Errors), - ets:update_counter(Stats, doc_write_failures, length(Errors)), - ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - - length(Errors)), + try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of + {ok, Errors} -> + dump_update_errors(Errors), + ets:update_counter(Stats, doc_write_failures, length(Errors)), + ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) - + length(Errors)) + catch + throw:attachment_write_failed -> + ?LOG_ERROR("attachment request failed during final write", []), + exit({internal_server_error, replication_link_failure}) + end, couch_task_status:update("Finishing"), @@ -421,7 +433,10 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> %% wait for headers to ensure that we have a 200 status code %% this is where we follow redirects etc Pid ! {self(), gimme_status}, - receive {Pid, {status, StreamStatus, StreamHeaders}} -> + receive + {'EXIT', Pid, attachment_request_failed} -> + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries-1); + {Pid, {status, StreamStatus, StreamHeaders}} -> ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", [StreamStatus, StreamHeaders]), @@ -435,7 +450,12 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> %% be the one to actually receive the ibrowse data. {ok, fun() -> Pid ! {self(), gimme_data}, - receive {Pid, Data} -> Data end + receive + {Pid, Data} -> + Data; + {'EXIT', Pid, attachment_request_failed} -> + throw(attachment_write_failed) + end end}; ResponseCode >= 300, ResponseCode < 400 -> % follow the redirect |