diff options
-rw-r--r-- | src/couchdb/couch_rep.erl | 72 |
1 files changed, 47 insertions, 25 deletions
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index c356e189..6dfe6fee 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -403,14 +403,14 @@ attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) -> {Name, {Type, {RcvFun, Length}}}. make_attachment_stub_receiver(Url, Headers, Name, Type, Length) -> - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, 10). + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, 10, 1000). -make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0) -> +make_attachment_stub_receiver(Url, _Headers, _Name, _Type, _Length, 0, _Pause) -> ?LOG_ERROR("streaming attachment request failed after 10 retries: ~s", [Url]), exit({attachment_request_failed, ?l2b(["failed to replicate ", Url])}); -make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> +make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) -> %% start the process that receives attachment data from ibrowse #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url), {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port), @@ -420,22 +420,37 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> Opts = [{stream_to, Pid}, {response_format, binary}], ReqId = case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of - {ibrowse_req_id, X} -> X; - {error, Reason} -> - exit({attachment_request_failed, - ?l2b(["ibrowse error on ", Url, " : ", atom_to_list(Reason)])}) + {ibrowse_req_id, X} -> + X; + {error, Reason} -> + ?LOG_INFO("retrying couch_rep attachment request in ~p " ++ + "seconds due to {error, ~p}: ~s", [Pause/1000, Reason, Url]), + catch ibrowse:stop_worker_process(Conn), + timer:sleep(Pause), + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + Retries-1, 2*Pause) end, %% tell our receiver about the ReqId it needs to look for Pid ! {self(), {set_req_id, ReqId}}, - receive {Pid, {ok, ReqId}} -> ok end, + receive + {Pid, {ok, ReqId}} -> + ok; + {'EXIT', Pid, _Reason} -> + catch ibrowse:stop_worker_process(Conn), + timer:sleep(Pause), + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + Retries-1, 2*Pause) + end, %% wait for headers to ensure that we have a 200 status code %% this is where we follow redirects etc Pid ! {self(), gimme_status}, receive {'EXIT', Pid, attachment_request_failed} -> - make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries-1); + catch ibrowse:stop_worker_process(Conn), + make_attachment_stub_receiver(Url, Headers, Name, Type, Length, + Retries-1, Pause); {Pid, {status, StreamStatus, StreamHeaders}} -> ?LOG_DEBUG("streaming attachment Status ~p Headers ~p", [StreamStatus, StreamHeaders]), @@ -462,8 +477,9 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> Pid ! {self(), stop_ok}, RedirectUrl = mochiweb_headers:get_value("Location", mochiweb_headers:make(StreamHeaders)), - make_attachment_stub_receiver(RedirectUrl, Headers, Name, Type, Length, - Retries - 1); + catch ibrowse:stop_worker_process(Conn), + make_attachment_stub_receiver(RedirectUrl, Headers, Name, Type, + Length, Retries - 1, Pause); ResponseCode >= 400, ResponseCode < 500 -> % an error... log and fail ?LOG_ERROR("streaming attachment failed with code ~p: ~s", @@ -472,10 +488,13 @@ make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) -> exit(attachment_request_failed); ResponseCode == 500 -> % an error... log and retry - ?LOG_INFO("retrying streaming attachment request due to 500 error: ~s", [Url]), + ?LOG_INFO("retrying couch_rep attachment request in ~p " ++ + "seconds due to 500 response: ~s", [Pause/1000, Url]), Pid ! {self(), fail}, + catch ibrowse:stop_worker_process(Conn), + timer:sleep(Pause), make_attachment_stub_receiver(Url, Headers, Name, Type, Length, - Retries - 1) + Retries - 1, 2*Pause) end end. @@ -588,15 +607,15 @@ do_http_request(Url, Action, Headers) -> do_http_request(Url, Action, Headers, []). do_http_request(Url, Action, Headers, JsonBody) -> - do_http_request(Url, Action, Headers, JsonBody, 10). + do_http_request(Url, Action, Headers, JsonBody, 10, 1000). -do_http_request(Url, Action, Headers, Body, Retries) when is_binary(Url) -> - do_http_request(?b2l(Url), Action, Headers, Body, Retries); -do_http_request(Url, Action, _Headers, _JsonBody, 0) -> +do_http_request(Url, Action, Headers, Body, Retries, Pause) when is_binary(Url) -> + do_http_request(?b2l(Url), Action, Headers, Body, Retries, Pause); +do_http_request(Url, Action, _Headers, _JsonBody, 0, _Pause) -> ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s", [Action, Url]), exit({http_request_failed, ?l2b(["failed to replicate ", Url])}); -do_http_request(Url, Action, Headers, JsonBody, Retries) -> +do_http_request(Url, Action, Headers, JsonBody, Retries, Pause) -> ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]), Body = case JsonBody of @@ -622,18 +641,21 @@ do_http_request(Url, Action, Headers, JsonBody, Retries) -> ResponseCode >= 300, ResponseCode < 400 -> RedirectUrl = mochiweb_headers:get_value("Location", mochiweb_headers:make(ResponseHeaders)), - do_http_request(RedirectUrl, Action, Headers, JsonBody, Retries-1); + do_http_request(RedirectUrl, Action, Headers, JsonBody, Retries-1, + Pause); ResponseCode >= 400, ResponseCode < 500 -> ?JSON_DECODE(ResponseBody); ResponseCode == 500 -> - ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~s", - [Action, Url]), - do_http_request(Url, Action, Headers, JsonBody, Retries - 1) + ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds " ++ + "due to 500 error: ~s", [Action, Pause/1000, Url]), + timer:sleep(Pause), + do_http_request(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause) end; {error, Reason} -> - ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~s", - [Action, Reason, Url]), - do_http_request(Url, Action, Headers, JsonBody, Retries - 1) + ?LOG_INFO("retrying couch_rep HTTP ~p request in ~p seconds due to " ++ + "{error, ~p}: ~s", [Action, Pause/1000, Reason, Url]), + timer:sleep(Pause), + do_http_request(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause) end. ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) -> |