% Licensed under the Apache License, Version 2.0 (the "License"); you may not % use this file except in compliance with the License. You may obtain a copy of % the License at % % http://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the % License for the specific language governing permissions and limitations under % the License. -module(couch_rep_httpc). -include("couch_db.hrl"). -include_lib("ibrowse/include/ibrowse.hrl"). -export([db_exists/1, db_exists/2]). -export([full_url/1, request/1, redirected_request/3]). -export([spawn_worker_process/1, spawn_link_worker_process/1]). -export([ssl_options/1]). request(#http_db{} = Req) -> do_request(Req). do_request(#http_db{url=Url} = Req) when is_binary(Url) -> do_request(Req#http_db{url = ?b2l(Url)}); do_request(Req) -> #http_db{ auth = Auth, body = B, conn = Conn, headers = Headers0, method = Method, options = Opts, qs = QS } = Req, Url = full_url(Req), Headers = case couch_util:get_value(<<"oauth">>, Auth) of undefined -> Headers0; {OAuthProps} -> [oauth_header(Url, QS, Method, OAuthProps) | Headers0] end, Body = case B of {Fun, InitialState} when is_function(Fun) -> {Fun, InitialState}; nil -> []; _Else -> iolist_to_binary(?JSON_ENCODE(B)) end, Resp = case Conn of nil -> ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity); _ -> ibrowse:send_req_direct(Conn, Url, Headers, Method, Body, Opts, infinity) end, process_response(Resp, Req). db_exists(Req) -> db_exists(Req, Req#http_db.url). db_exists(Req, true) -> db_exists(Req, Req#http_db.url, true); db_exists(Req, false) -> db_exists(Req, Req#http_db.url, false); db_exists(Req, CanonicalUrl) -> db_exists(Req, CanonicalUrl, false). db_exists(Req, CanonicalUrl, CreateDB) -> #http_db{ auth = Auth, headers = Headers0, options = Options, url = Url } = Req, HeadersFun = fun(Method) -> case couch_util:get_value(<<"oauth">>, Auth) of undefined -> Headers0; {OAuthProps} -> [oauth_header(Url, [], Method, OAuthProps) | Headers0] end end, case CreateDB of true -> Headers = [{"Content-Length", 0} | HeadersFun(put)], catch ibrowse:send_req(Url, Headers, put, [], Options); _Else -> ok end, case catch ibrowse:send_req(Url, HeadersFun(head), head, [], Options) of {ok, "200", _, _} -> config_http(CanonicalUrl), Req#http_db{url = CanonicalUrl}; {ok, "301", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), db_exists(Req#http_db{url = RedirectUrl}, RedirectUrl); {ok, "302", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl); {ok, "303", RespHeaders, _} -> RedirectUrl = redirect_url(RespHeaders, Req#http_db.url), db_exists(Req#http_db{method = get, url = RedirectUrl}, CanonicalUrl); {ok, "401", _, _} -> throw({unauthorized, ?l2b(Url)}); Error -> ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]), throw({db_not_found, ?l2b(Url)}) end. config_http(Url) -> #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url), ok = ibrowse:set_max_sessions(Host, Port, list_to_integer( couch_config:get("replicator", "max_http_sessions", "20"))), ok = ibrowse:set_max_pipeline_size(Host, Port, list_to_integer( couch_config:get("replicator", "max_http_pipeline_size", "50"))), ok = couch_config:register( fun("replicator", "max_http_sessions", MaxSessions) -> ibrowse:set_max_sessions(Host, Port, list_to_integer(MaxSessions)); ("replicator", "max_http_pipeline_size", PipeSize) -> ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(PipeSize)) end). redirect_url(RespHeaders, OrigUrl) -> MochiHeaders = mochiweb_headers:make(RespHeaders), RedUrl = mochiweb_headers:get_value("Location", MochiHeaders), #url{ host = Host, host_type = HostType, port = Port, path = Path, protocol = Proto } = ibrowse_lib:parse_url(RedUrl), #url{username = User, password = Passwd} = ibrowse_lib:parse_url(OrigUrl), Creds = case is_list(User) andalso is_list(Passwd) of true -> User ++ ":" ++ Passwd ++ "@"; false -> [] end, HostPart = case HostType of ipv6_address -> "[" ++ Host ++ "]"; _ -> Host end, atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++ integer_to_list(Port) ++ Path. full_url(#http_db{url=Url} = Req) when is_binary(Url) -> full_url(Req#http_db{url = ?b2l(Url)}); full_url(#http_db{qs=[]} = Req) -> Req#http_db.url ++ Req#http_db.resource; full_url(Req) -> #http_db{ url = Url, resource = Resource, qs = QS } = Req, QStr = lists:map(fun({K,V}) -> io_lib:format("~s=~s", [couch_util:to_list(K), couch_util:to_list(V)]) end, QS), lists:flatten([Url, Resource, "?", string:join(QStr, "&")]). process_response({ok, Status, Headers, Body}, Req) -> Code = list_to_integer(Status), if Code =:= 200; Code =:= 201; Code =:= 202 -> ?JSON_DECODE(maybe_decompress(Headers, Body)); Code =:= 301; Code =:= 302 ; Code =:= 303 -> do_request(redirected_request(Code, Headers, Req)); Code =:= 409 -> throw(conflict); Code >= 400, Code < 500 -> ?JSON_DECODE(maybe_decompress(Headers, Body)); Code =:= 500; Code =:= 502; Code =:= 503 -> #http_db{pause = Pause, retries = Retries} = Req, ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++ % "due to remote server error: ~s~s", [Pause/1000, Req#http_db.url, "due to remote server error: ~p Body ~s", [Pause/1000, Code, Body]), timer:sleep(Pause), do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}); true -> exit({http_request_failed, ?l2b(["unhandled response code ", Status])}) end; process_response({ibrowse_req_id, Id}, _Req) -> {ibrowse_req_id, Id}; process_response({error, _Reason}, #http_db{url=Url, retries=0}) -> ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]), exit({http_request_failed, ?l2b(["failed to replicate ", Url])}); process_response({error, Reason}, Req) -> #http_db{ method = Method, retries = Retries, pause = Pause } = Req, ShortReason = case Reason of sel_conn_closed -> connection_closed; {'EXIT', {noproc, _}} -> noproc; {'EXIT', {normal, _}} -> normal; Else -> Else end, ?LOG_ERROR("~p retry ~p ~s in ~p seconds due to {error, ~p}", [?MODULE, Method, full_url(Req), Pause/1000, ShortReason]), timer:sleep(Pause), if Reason == worker_is_dead -> C = spawn_link_worker_process(Req), do_request(Req#http_db{retries = Retries-1, pause = 2*Pause, conn=C}); true -> do_request(Req#http_db{retries = Retries-1, pause = 2*Pause}) end. redirected_request(Code, Headers, Req) -> RedirectUrl = redirect_url(Headers, Req#http_db.url), {Base, QStr, _} = mochiweb_util:urlsplit_path(RedirectUrl), QS = mochiweb_util:parse_qs(QStr), ReqHeaders = case couch_util:get_value(<<"oauth">>, Req#http_db.auth) of undefined -> Req#http_db.headers; _Else -> lists:keydelete("Authorization", 1, Req#http_db.headers) end, Req#http_db{ method = case couch_util:to_integer(Code) of 303 -> get; _ -> Req#http_db.method end, url = Base, resource = "", qs = QS, headers = ReqHeaders }. spawn_worker_process(Req) -> Url = ibrowse_lib:parse_url(Req#http_db.url), {ok, Pid} = ibrowse_http_client:start(Url), Pid. spawn_link_worker_process(Req) -> {ok, Pid} = ibrowse:spawn_link_worker_process(Req#http_db.url), Pid. maybe_decompress(Headers, Body) -> MochiHeaders = mochiweb_headers:make(Headers), case mochiweb_headers:get_value("Content-Encoding", MochiHeaders) of "gzip" -> zlib:gunzip(Body); _ -> Body end. oauth_header(Url, QS, Action, Props) -> % erlang-oauth doesn't like iolists QSL = [{couch_util:to_list(K), ?b2l(?l2b(couch_util:to_list(V)))} || {K,V} <- QS], ConsumerKey = ?b2l(couch_util:get_value(<<"consumer_key">>, Props)), Token = ?b2l(couch_util:get_value(<<"token">>, Props)), TokenSecret = ?b2l(couch_util:get_value(<<"token_secret">>, Props)), ConsumerSecret = ?b2l(couch_util:get_value(<<"consumer_secret">>, Props)), SignatureMethodStr = ?b2l(couch_util:get_value(<<"signature_method">>, Props, <<"HMAC-SHA1">>)), SignatureMethodAtom = case SignatureMethodStr of "PLAINTEXT" -> plaintext; "HMAC-SHA1" -> hmac_sha1; "RSA-SHA1" -> rsa_sha1 end, Consumer = {ConsumerKey, ConsumerSecret, SignatureMethodAtom}, Method = case Action of get -> "GET"; post -> "POST"; put -> "PUT"; head -> "HEAD" end, Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret) -- QSL, {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}. ssl_options(#http_db{url = Url}) -> case ibrowse_lib:parse_url(Url) of #url{protocol = https} -> Depth = list_to_integer( couch_config:get("replicator", "ssl_certificate_max_depth", "3") ), SslOpts = [{depth, Depth} | case couch_config:get("replicator", "verify_ssl_certificates") of "true" -> ssl_verify_options(true); _ -> ssl_verify_options(false) end], [{is_ssl, true}, {ssl_options, SslOpts}]; #url{protocol = http} -> [] end. ssl_verify_options(Value) -> ssl_verify_options(Value, erlang:system_info(otp_release)). ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" -> CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"), [{verify, verify_peer}, {cacertfile, CAFile}]; ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" -> [{verify, verify_none}]; ssl_verify_options(true, _OTPVersion) -> CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"), [{verify, 2}, {cacertfile, CAFile}]; ssl_verify_options(false, _OTPVersion) -> [{verify, 0}].