From 7f9a155b4614190aeb0ac1e6bbdb7e8bbec32a08 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 7 Aug 2009 19:12:39 +0000 Subject: ibrowse wrapper for replicator, will replace do_http_request git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@802145 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/Makefile.am | 2 + src/couchdb/couch_db.hrl | 21 ++++++ src/couchdb/couch_rep.erl | 26 +++---- src/couchdb/couch_rep_httpc.erl | 163 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 199 insertions(+), 13 deletions(-) create mode 100644 src/couchdb/couch_rep_httpc.erl (limited to 'src/couchdb') diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index ec707d64..c3806310 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -75,6 +75,7 @@ source_files = \ couch_ref_counter.erl \ couch_rep.erl \ couch_rep_changes_feed.erl \ + couch_rep_httpc.erl \ couch_rep_sup.erl \ couch_server.erl \ couch_server_sup.erl \ @@ -122,6 +123,7 @@ compiled_files = \ couch_ref_counter.beam \ couch_rep.beam \ couch_rep_changes_feed.beam \ + couch_rep_httpc.beam \ couch_rep_sup.beam \ couch_server.beam \ couch_server_sup.beam \ diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index d4e0fbd5..53b2eaf9 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -228,5 +228,26 @@ view_states=nil }). +-record(http_db, { + url, + auth = [], + resource = "", + headers = [ + {"User-Agent", "CouchDb/"++couch_server:get_version()}, + {"Accept", "application/json"}, + {"Accept-Encoding", "gzip"} + ], + qs = [], + method = get, + body = nil, + options = [ + {response_format,binary}, + {inactivity_timeout, 30000} + ], + retries = 10, + pause = 1, + conn = nil +}). + % small value used in revision trees to indicate the revision isn't stored -define(REV_MISSING, []). diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index b63a2610..1692943a 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -82,7 +82,7 @@ replicate(Source, Target) -> %% gen_server callbacks %%============================================================================= --record(http_db, { +-record(old_http_db, { uri, headers, oauth @@ -391,7 +391,7 @@ attachment_loop(ReqId, Conn) -> att_stub_converter(DbS, Id, Rev, #att{name=Name,data=stub,type=Type,len=Length}=Att) -> - #http_db{uri=DbUrl, headers=Headers} = DbS, + #old_http_db{uri=DbUrl, headers=Headers} = DbS, {Pos, [RevId|_]} = Rev, Url = lists:flatten([DbUrl, couch_util:url_encode(Id), "/", couch_util:url_encode(?b2l(Name)), "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]), @@ -498,7 +498,7 @@ make_att_stub_receiver(Url, Headers, Name, Type, Length, Retries, Pause) -> open_db({remote, Url, Headers, Auth})-> - {ok, #http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url}; + {ok, #old_http_db{uri=?b2l(Url), headers=Headers, oauth=Auth}, Url}; open_db({local, DbName, UserCtx})-> case couch_db:open(DbName, [{user_ctx, UserCtx}]) of {ok, Db} -> {ok, Db, DbName}; @@ -506,7 +506,7 @@ open_db({local, DbName, UserCtx})-> end. -close_db(#http_db{})-> +close_db(#old_http_db{})-> ok; close_db(Db)-> couch_db:close(Db). @@ -675,7 +675,7 @@ do_http_request0(Url, Action, Headers, JsonBody, Retries, Pause) -> do_http_request0(Url, Action, Headers, JsonBody, Retries - 1, 2*Pause) end. -ensure_full_commit(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) -> +ensure_full_commit(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) -> {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, OAuth, true), true = proplists:get_value(<<"ok">>, ResultProps), @@ -707,13 +707,13 @@ enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) -> -get_db_info(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) -> +get_db_info(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}) -> {DbProps} = do_http_request(DbUrl, get, Headers, OAuth), {ok, [{list_to_atom(?b2l(K)), V} || {K,V} <- DbProps]}; get_db_info(Db) -> couch_db:get_db_info(Db). -get_doc_info_list(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) -> +get_doc_info_list(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, StartSeq) -> Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey=" ++ integer_to_list(StartSeq), {Results} = do_http_request(Url, get, Headers, OAuth), @@ -739,7 +739,7 @@ get_doc_info_list(DbSource, StartSeq) -> end, {0, []}), lists:reverse(DocInfoList). -get_missing_revs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) -> +get_missing_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocIdRevsList) -> DocIdRevsList2 = [{Id, couch_doc:rev_to_strs(Revs)} || {Id, Revs} <- DocIdRevsList], {ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers, OAuth, {DocIdRevsList2}), @@ -750,7 +750,7 @@ get_missing_revs(Db, DocId) -> couch_db:get_missing_revs(Db, DocId). -open_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) -> +open_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) -> [] = Options, case do_http_request(DbUrl ++ couch_util:url_encode(DocId), get, Headers, OAuth) of {[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} -> @@ -761,7 +761,7 @@ open_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, DocId, Options) -> open_doc(Db, DocId, Options) -> couch_db:open_doc(Db, DocId, Options). -open_doc_revs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0, +open_doc_revs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth} = DbS, DocId, Revs0, [latest]) -> Revs = couch_doc:rev_to_strs(Revs0), BaseUrl = DbUrl ++ couch_util:url_encode(DocId) ++ "?revs=true&latest=true", @@ -845,7 +845,7 @@ binary_memory(Pid) -> lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end, 0, element(2,process_info(Pid, binary))). -update_doc(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) -> +update_doc(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, #doc{id=DocId}=Doc, Options) -> [] = Options, Url = DbUrl ++ couch_util:url_encode(DocId), {ResponseMembers} = do_http_request(Url, put, Headers, OAuth, @@ -857,7 +857,7 @@ update_doc(Db, Doc, Options) -> update_docs(_, [], _, _) -> {ok, []}; -update_docs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) -> +update_docs(#old_http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replicated_changes) -> JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs], ErrorsJson = do_http_request(DbUrl ++ "_bulk_docs", post, Headers, OAuth, @@ -877,7 +877,7 @@ update_docs(#http_db{uri=DbUrl, headers=Headers, oauth=OAuth}, Docs, [], replica update_docs(Db, Docs, Options, UpdateType) -> couch_db:update_docs(Db, Docs, Options, UpdateType). -up_to_date(#http_db{}, _Seq) -> +up_to_date(#old_http_db{}, _Seq) -> true; up_to_date(Source, Seq) -> {ok, NewDb} = couch_db:open(Source#db.name, []), diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl new file mode 100644 index 00000000..aa2b2714 --- /dev/null +++ b/src/couchdb/couch_rep_httpc.erl @@ -0,0 +1,163 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_rep_httpc). +-include("couch_db.hrl"). +-include("../ibrowse/ibrowse.hrl"). + +-export([db_exists/1, full_url/1, request/1, spawn_worker_process/1, + spawn_link_worker_process/1]). + +request(Req) when is_record(Req, http_db) -> + do_request(Req). + +do_request(#http_db{url=Url} = Req) when is_binary(Url) -> + do_request(Req#http_db{url = ?b2l(Url)}); + +do_request(Req) -> + #http_db{ + auth = Auth, + headers = Headers0, + method = Method, + body = B, + options = Opts, + conn = Conn + } = Req, + Url = full_url(Req), + Headers = case proplists:get_value(<<"oauth">>, Auth) of + undefined -> + Headers0; + {OAuthProps} -> + [oauth_header(Url, Method, OAuthProps) | Headers0] + end, + Body = if B =:= nil -> []; true -> iolist_to_binary(?JSON_ENCODE(B)) end, + Resp = case Conn of + nil -> + ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity); + _ -> + ibrowse:send_req_direct(Conn, Url, Headers, Method, Body, Opts, infinity) + end, + process_response(Resp, Req). + +db_exists(Req) -> + #http_db{ + url = Url, + headers = Headers + } = Req, + case catch ibrowse:send_req(Url, Headers, head) of + {ok, "200", _, _} -> + true; + {ok, "301", Headers, _} -> + MochiHeaders = mochiweb_headers:make(Headers), + RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders), + db_exists(Req#http_db{url = RedirectUrl}); + Error -> + ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]), + false + end. + +full_url(#http_db{url=Url} = Req) when is_binary(Url) -> + full_url(Req#http_db{url = ?b2l(Url)}); + +full_url(#http_db{qs=[]} = Req) -> + Req#http_db.url ++ Req#http_db.resource; + +full_url(Req) -> + #http_db{ + url = Url, + resource = Resource, + qs = QS + } = Req, + QStr = lists:map(fun({K,V}) -> io_lib:format("~s=~s", + [couch_util:to_list(K), couch_util:to_list(V)]) end, QS), + lists:flatten([Url, Resource, "?", string:join(QStr, "&")]). + +process_response({ok, Status, Headers, Body}, Req) -> + Code = list_to_integer(Status), + if Code =:= 200; Code =:= 201 -> + ?JSON_DECODE(maybe_decompress(Headers, Body)); + Code =:= 301 -> + MochiHeaders = mochiweb_headers:make(Headers), + RedirectUrl = mochiweb_headers:get_value("Location", MochiHeaders), + do_request(Req#http_db{url = RedirectUrl}); + Code >= 400, Code < 500 -> + ?JSON_DECODE(maybe_decompress(Headers, Body)); + Code =:= 500; Code =:= 502 -> + #http_db{pause = Pause, retries = Retries} = Req, + ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++ + % "due to remote server error: ~s~s", [Pause/1000, Req#http_db.url, + "due to remote server error: ~p Body ~s", [Pause, Code, + Body]), + timer:sleep(1000*Pause), + do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}) + end; + +process_response({ibrowse_req_id, Id}, _Req) -> + {ibrowse_req_id, Id}; + +process_response({error, _Reason}, #http_db{url=Url, retries=0}) -> + ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]), + exit({http_request_failed, ?l2b(["failed to replicate ", Url])}); +process_response({error, Reason}, Req) -> + #http_db{ + method = Method, + retries = Retries, + pause = Pause + } = Req, + ShortReason = case Reason of + connection_closed -> + connection_closed; + {'EXIT', {noproc, _}} -> + noproc; + {'EXIT', {normal, _}} -> + normal; + Else -> + Else + end, + ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++ + "{error, ~p}", [Method, Pause, ShortReason]), + % "{error}", [Method, Pause]), + timer:sleep(1000*Pause), + do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}). + +spawn_worker_process(Req) -> + Url = ibrowse_lib:parse_url(Req#http_db.url), + {ok, Pid} = ibrowse:spawn_worker_process(Url#url.host, Url#url.port), + Pid. + +spawn_link_worker_process(Req) -> + Url = ibrowse_lib:parse_url(Req#http_db.url), + {ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port), + Pid. + +maybe_decompress(Headers, Body) -> + MochiHeaders = mochiweb_headers:make(Headers), + case mochiweb_headers:get_value("Content-Encoding", MochiHeaders) of + "gzip" -> + zlib:gunzip(Body); + _ -> + Body + end. + +oauth_header(Url, Action, Props) -> + ConsumerKey = ?b2l(proplists:get_value(<<"consumer_key">>, Props)), + Token = ?b2l(proplists:get_value(<<"token">>, Props)), + TokenSecret = ?b2l(proplists:get_value(<<"token_secret">>, Props)), + ConsumerSecret = ?b2l(proplists:get_value(<<"consumer_secret">>, Props)), + Consumer = {ConsumerKey, ConsumerSecret, hmac_sha1}, + Method = case Action of + get -> "GET"; + post -> "POST"; + put -> "PUT" + end, + Params = oauth:signed_params(Method, Url, [], Consumer, Token, TokenSecret), + {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}. -- cgit v1.2.3