summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-05-17 16:02:39 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-05-17 16:02:39 +0000
commit8afaff9a115a1d70586b642b23b37019bab205dc (patch)
treeb5cf512fc140f5ca1d7dace3b60d8cd87ffac86d /src
parentfea7af460e91492224ce15356358ddf376c25e86 (diff)
replicator should never hang when attachment receiver dies
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@775685 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_rep.erl48
1 files changed, 34 insertions, 14 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index 399f41e9..7aa491ae 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -211,13 +211,19 @@ handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State)
case should_flush(lists:flatlength([Docs|Buffer])) of
true ->
Docs2 = lists:flatten([Docs|Buffer]),
- {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, length(Docs2) -
- length(Errors)),
- {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
- {[], Ctxt};
+ try update_docs(Target, Docs2, [], replicated_changes) of
+ {ok, Errors} ->
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, length(Docs2) -
+ length(Errors)),
+ {ok, _, Ctxt} = do_checkpoint(Source, Target, Context, Seq, Stats),
+ {[], Ctxt}
+ catch
+ throw:attachment_write_failed ->
+ ?LOG_ERROR("attachment request failed during write to disk", []),
+ exit({internal_server_error, replication_link_failure})
+ end;
false ->
{[Docs | Buffer], Context}
end,
@@ -272,11 +278,17 @@ terminate(normal, State) ->
stats = Stats
} = State,
- {ok, Errors} = update_docs(Target, lists:flatten(Buffer), [], replicated_changes),
- dump_update_errors(Errors),
- ets:update_counter(Stats, doc_write_failures, length(Errors)),
- ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
- length(Errors)),
+ try update_docs(Target, lists:flatten(Buffer), [], replicated_changes) of
+ {ok, Errors} ->
+ dump_update_errors(Errors),
+ ets:update_counter(Stats, doc_write_failures, length(Errors)),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer) -
+ length(Errors))
+ catch
+ throw:attachment_write_failed ->
+ ?LOG_ERROR("attachment request failed during final write", []),
+ exit({internal_server_error, replication_link_failure})
+ end,
couch_task_status:update("Finishing"),
@@ -421,7 +433,10 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
%% wait for headers to ensure that we have a 200 status code
%% this is where we follow redirects etc
Pid ! {self(), gimme_status},
- receive {Pid, {status, StreamStatus, StreamHeaders}} ->
+ receive
+ {'EXIT', Pid, attachment_request_failed} ->
+ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries-1);
+ {Pid, {status, StreamStatus, StreamHeaders}} ->
?LOG_DEBUG("streaming attachment Status ~p Headers ~p",
[StreamStatus, StreamHeaders]),
@@ -435,7 +450,12 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
%% be the one to actually receive the ibrowse data.
{ok, fun() ->
Pid ! {self(), gimme_data},
- receive {Pid, Data} -> Data end
+ receive
+ {Pid, Data} ->
+ Data;
+ {'EXIT', Pid, attachment_request_failed} ->
+ throw(attachment_write_failed)
+ end
end};
ResponseCode >= 300, ResponseCode < 400 ->
% follow the redirect