From 5dcbc2290ac780f1a625b5c9435cfb35eac4e1ef Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Mon, 10 Aug 2009 18:37:43 +0000 Subject: new replicator using _changes feed for continuous replication git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802888 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_att.erl | 100 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 src/couchdb/couch_rep_att.erl (limited to 'src/couchdb/couch_rep_att.erl') diff --git a/src/couchdb/couch_rep_att.erl b/src/couchdb/couch_rep_att.erl new file mode 100644 index 00000000..baeb6c65 --- /dev/null +++ b/src/couchdb/couch_rep_att.erl @@ -0,0 +1,100 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_att). + +-export([convert_stub/2, cleanup/0]). + +-include("couch_db.hrl"). + +convert_stub(#att{data=stub} = Attachment, {#http_db{} = Db, Id, Rev}) -> + {Pos, [RevId|_]} = Rev, + Name = Attachment#att.name, + Request = Db#http_db{ + resource = lists:flatten([couch_util:url_encode(Id), "/", + couch_util:url_encode(Name)]), + qs = [{rev, couch_doc:rev_to_str({Pos,RevId})}] + }, + Ref = make_ref(), + RcvFun = fun() -> attachment_receiver(Ref, Request) end, + Attachment#att{data=RcvFun}. + +cleanup() -> + receive + {ibrowse_async_response, _, _} -> + %% TODO maybe log, didn't expect to have data here + cleanup(); + {ibrowse_async_response_end, _} -> + cleanup() + after 0 -> + erase(), + ok + end. + +% internal funs + +attachment_receiver(Ref, Request) -> + case get(Ref) of + undefined -> + ReqId = start_http_request(Request), + put(Ref, ReqId), + receive_data(Ref, ReqId); + ReqId -> + receive_data(Ref, ReqId) + end. + +receive_data(Ref, ReqId) -> + receive + {ibrowse_async_response, ReqId, {chunk_start,_}} -> + receive_data(Ref, ReqId); + {ibrowse_async_response, ReqId, chunk_end} -> + receive_data(Ref, ReqId); + {ibrowse_async_response, ReqId, {error, Err}} -> + ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]), + throw({attachment_request_failed, Err}); + {ibrowse_async_response, ReqId, Data} -> + % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]), + Data; + {ibrowse_async_response_end, ReqId} -> + ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]), + throw({attachment_request_failed, premature_end}) + end. + +start_http_request(Req) -> + %% set stream_to here because self() has changed + Req2 = Req#http_db{options = [{stream_to,self()} | Req#http_db.options]}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(Req2), + receive {ibrowse_async_headers, ReqId, Code, Headers} -> + case validate_headers(Req2, list_to_integer(Code), Headers) of + ok -> + ReqId; + {ok, NewReqId} -> + NewReqId + end + end. + +validate_headers(_Req, 200, _Headers) -> + ok; +validate_headers(Req, Code, Headers) when Code > 299, Code < 400 -> + %% TODO check that the qs is actually included in the Location header + %% TODO this only supports one level of redirection + Url = mochiweb_headers:get_value("Location",mochiweb_headers:make(Headers)), + NewReq = Req#http_db{url=Url, resource="", qs=[]}, + {ibrowse_req_id, ReqId} = couch_rep_httpc:request(NewReq), + receive {ibrowse_async_headers, ReqId, NewCode, NewHeaders} -> + ok = validate_headers(NewReq, list_to_integer(NewCode), NewHeaders) + end, + {ok, ReqId}; +validate_headers(Req, Code, _Headers) -> + #http_db{url=Url, resource=Resource} = Req, + ?LOG_ERROR("got ~p for ~s~s", [Code, Url, Resource]), + throw({attachment_request_failed, {bad_code, Code}}). -- cgit v1.2.3