diff options
Diffstat (limited to 'apps/couch/src/couch_rep_att.erl')
-rw-r--r-- | apps/couch/src/couch_rep_att.erl | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/apps/couch/src/couch_rep_att.erl b/apps/couch/src/couch_rep_att.erl new file mode 100644 index 00000000..9988c5db --- /dev/null +++ b/apps/couch/src/couch_rep_att.erl @@ -0,0 +1,118 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_att). + +-export([convert_stub/2, cleanup/0]). + +-include("couch_db.hrl"). + +convert_stub(#att{data=stub, name=Name} = Attachment, + {#http_db{} = Db, Id, Rev}) -> + {Pos, [RevId|_]} = Rev, + Request = Db#http_db{ + resource = lists:flatten([couch_util:url_encode(Id), "/", + couch_util:url_encode(Name)]), + qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}] + }, + Ref = make_ref(), + RcvFun = fun() -> attachment_receiver(Ref, Request) end, + Attachment#att{data=RcvFun}. + +cleanup() -> + receive + {ibrowse_async_response, _, _} -> + %% TODO maybe log, didn't expect to have data here + cleanup(); + {ibrowse_async_response_end, _} -> + cleanup(); + {ibrowse_async_headers, _, _, _} -> + cleanup() + after 0 -> + erase(), + ok + end. + +% internal funs + +attachment_receiver(Ref, Request) -> + try case get(Ref) of + undefined -> + {ReqId, ContentEncoding} = start_http_request(Request), + put(Ref, {ReqId, ContentEncoding}), + receive_data(Ref, ReqId, ContentEncoding); + {ReqId, ContentEncoding} -> + receive_data(Ref, ReqId, ContentEncoding) + end + catch + throw:{attachment_request_failed, _} -> + case {Request#http_db.retries, Request#http_db.pause} of + {0, _} -> + ?LOG_INFO("request for ~p failed", [Request#http_db.resource]), + throw({attachment_request_failed, max_retries_reached}); + {N, Pause} when N > 0 -> + ?LOG_INFO("request for ~p timed out, retrying in ~p seconds", + [Request#http_db.resource, Pause/1000]), + timer:sleep(Pause), + cleanup(), + attachment_receiver(Ref, Request#http_db{retries = N-1}) + end + end. + +receive_data(Ref, ReqId, ContentEncoding) -> + receive + {ibrowse_async_response, ReqId, {chunk_start,_}} -> + receive_data(Ref, ReqId, ContentEncoding); + {ibrowse_async_response, ReqId, chunk_end} -> + receive_data(Ref, ReqId, ContentEncoding); + {ibrowse_async_response, ReqId, {error, Err}} -> + ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]), + throw({attachment_request_failed, Err}); + {ibrowse_async_response, ReqId, Data} -> + Data; + {ibrowse_async_response_end, ReqId} -> + ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]), + throw({attachment_request_failed, premature_end}) + after 31000 -> + throw({attachment_request_failed, timeout}) + end. + +start_http_request(Req) -> + %% set stream_to here because self() has changed + Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2), + receive {ibrowse_async_headers, ReqId, Code, Headers} -> + case validate_headers(Req2, list_to_integer(Code), Headers) of + {ok, ContentEncoding} -> + {ReqId, ContentEncoding}; + {ok, ContentEncoding, NewReqId} -> + {NewReqId, ContentEncoding} + end + after 10000 -> + throw({attachment_request_failed, timeout}) + end. + +validate_headers(_Req, 200, Headers) -> + MochiHeaders = mochiweb_headers:make(Headers), + {ok, mochiweb_headers:get_value("Content-Encoding", MochiHeaders)}; +validate_headers(Req, Code, Headers) when Code > 299, Code < 400 -> + NewReq = couch_rep_httpc:redirected_request(Code, Headers, Req), + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq), + receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} -> + {ok, Encoding} = validate_headers(NewReq, list_to_integer(NewCode), + NewHeaders) + end, + {ok, Encoding, ReqId}; +validate_headers(Req, Code, _Headers) -> + #http_db{url=Url, resource=Resource} = Req, + ?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]), + throw({attachment_request_failed, {bad_code, Code}}). |