From 899fedf56790c3c4c213d02ce698e796dbbb6b43 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Sun, 15 Mar 2009 17:47:29 +0000 Subject: Streaming attachment replication now follows redirects and checks for error codes. Includes tests that design doc attachments are replicated. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@754704 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep.erl | 83 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 9 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index 3647f6db..aa107cdf 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -327,9 +327,30 @@ attachment_loop(ReqId) -> couch_util:should_flush(), receive {From, {set_req_id, NewId}} -> + %% we learn the ReqId to listen for From ! {self(), {ok, NewId}}, attachment_loop(NewId); - {ibrowse_async_headers, ReqId, _Status, _Headers} -> + {ibrowse_async_headers, ReqId, Status, Headers} -> + %% we got header, give the controlling process a chance to react + receive + {From, gimme_status} -> + %% send status/headers to controller + From ! {self(), {status, Status, Headers}}, + receive + {From, continue} -> + %% normal case + attachment_loop(ReqId); + {From, fail} -> + %% error, failure code + ?LOG_ERROR( + "streaming attachment failed with status ~p", + [Status]), + exit(attachment_request_failed); + {From, stop_ok} -> + %% stop looping, controller will start a new loop + stop_ok + end + end, attachment_loop(ReqId); {ibrowse_async_response, ReqId, {chunk_start,_}} -> attachment_loop(ReqId); @@ -349,7 +370,19 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) -> % TODO worry about revisions Url = DbUrl ++ url_encode(Id) ++ "/" ++ ?b2l(Name), ?LOG_DEBUG("Attachment URL ~p", [Url]), + {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name, + Type, Length), + {Name, {Type, {RcvFun, Length}}}. + +make_attachment_stub_receiver(Url, Headers, Name, Type, Length) -> + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, 10). + +make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0) -> + ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s", + [Url]), + exit(attachment_request_failed); +make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> %% start the process that receives attachment data from ibrowse Pid = spawn_link(fun() -> attachment_loop(nil) end), @@ -364,14 +397,46 @@ attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) -> Pid ! {self(), {set_req_id, ReqId}}, receive {Pid, {ok, ReqId}} -> ok end, - %% this is the function that goes into the streaming attachment code. - %% It gets executed by the replication gen_server, so it can't - %% be the one to actually receive the ibrowse data. - RcvFun = fun() -> - Pid ! {self(), gimme_data}, - receive {Pid, Data} -> Data end - end, - {Name, {Type, {RcvFun, Length}}}. + %% wait for headers to ensure that we have a 200 status code + %% this is where we follow redirects etc + Pid ! {self(), gimme_status}, + receive {Pid, {status, StreamStatus, StreamHeaders}} -> + ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", + [StreamStatus, StreamHeaders]), + + ResponseCode = list_to_integer(StreamStatus), + if + ResponseCode >= 200, ResponseCode < 300 -> + % the normal case + Pid ! {self(), continue}, + %% this function goes into the streaming attachment code. + %% It gets executed by the replication gen_server, so it can't + %% be the one to actually receive the ibrowse data. + {ok, fun() -> + Pid ! {self(), gimme_data}, + receive {Pid, Data} -> Data end + end}; + ResponseCode >= 300, ResponseCode < 400 -> + % follow the redirect + Pid ! {self(), stop_ok}, + RedirectUrl = mochiweb_headers:get_value("Location", + mochiweb_headers:make(StreamHeaders)), + make_attachment_stub_receiver(RedirectUrl, Headers, Name, Type, Length, + Retries - 1); + ResponseCode >= 400, ResponseCode < 500 -> + % an error... log and fail + ?LOG_ERROR("streaming attachment failed with code ~p: ~s", + [ResponseCode, Url]), + Pid ! {self(), fail}, + exit(attachment_request_failed); + ResponseCode == 500 -> + % an error... log and retry + ?LOG_INFO("retrying streaming attachment request due to 500 error: ~s", [Url]), + Pid ! {self(), fail}, + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + Retries - 1) + end + end. open_db({remote, Url, Headers})-> -- cgit v1.2.3