summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_rep_httpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'apps/couch/src/couch_rep_httpc.erl')
-rw-r--r--apps/couch/src/couch_rep_httpc.erl317
1 files changed, 317 insertions, 0 deletions
diff --git a/apps/couch/src/couch_rep_httpc.erl b/apps/couch/src/couch_rep_httpc.erl
new file mode 100644
index 00000000..e22c8f81
--- /dev/null
+++ b/apps/couch/src/couch_rep_httpc.erl
@@ -0,0 +1,317 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_rep_httpc).
+-include("couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-export([db_exists/1, db_exists/2]).
+-export([full_url/1, request/1, redirected_request/3]).
+-export([spawn_worker_process/1, spawn_link_worker_process/1]).
+-export([ssl_options/1]).
+
+request(#http_db{} = Req) ->
+ do_request(Req).
+
+do_request(#http_db{url=Url} = Req) when is_binary(Url) ->
+ do_request(Req#http_db{url = ?b2l(Url)});
+
+do_request(Req) ->
+ #http_db{
+ auth = Auth,
+ body = B,
+ conn = Conn,
+ headers = Headers0,
+ method = Method,
+ options = Opts,
+ qs = QS
+ } = Req,
+ Url = full_url(Req),
+ Headers = case couch_util:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ Headers0;
+ {OAuthProps} ->
+ [oauth_header(Url, QS, Method, OAuthProps) | Headers0]
+ end,
+ Body = case B of
+ {Fun, InitialState} when is_function(Fun) ->
+ {Fun, InitialState};
+ nil ->
+ [];
+ _Else ->
+ iolist_to_binary(?JSON_ENCODE(B))
+ end,
+ Resp = case Conn of
+ nil ->
+ ibrowse:send_req(Url, Headers, Method, Body, Opts, infinity);
+ _ ->
+ ibrowse:send_req_direct(Conn, Url, Headers, Method, Body, Opts, infinity)
+ end,
+ process_response(Resp, Req).
+
+db_exists(Req) ->
+ db_exists(Req, Req#http_db.url).
+
+db_exists(Req, true) ->
+ db_exists(Req, Req#http_db.url, true);
+
+db_exists(Req, false) ->
+ db_exists(Req, Req#http_db.url, false);
+
+db_exists(Req, CanonicalUrl) ->
+ db_exists(Req, CanonicalUrl, false).
+
+db_exists(Req, CanonicalUrl, CreateDB) ->
+ #http_db{
+ auth = Auth,
+ headers = Headers0,
+ options = Options,
+ url = Url
+ } = Req,
+ HeadersFun = fun(Method) ->
+ case couch_util:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ Headers0;
+ {OAuthProps} ->
+ [oauth_header(Url, [], Method, OAuthProps) | Headers0]
+ end
+ end,
+ case CreateDB of
+ true ->
+ Headers = [{"Content-Length", 0} | HeadersFun(put)],
+ catch ibrowse:send_req(Url, Headers, put, [], Options);
+ _Else -> ok
+ end,
+ case catch ibrowse:send_req(Url, HeadersFun(head), head, [], Options) of
+ {ok, "200", _, _} ->
+ config_http(CanonicalUrl),
+ Req#http_db{url = CanonicalUrl};
+ {ok, "301", RespHeaders, _} ->
+ RedirectUrl = redirect_url(RespHeaders, Req#http_db.url),
+ db_exists(Req#http_db{url = RedirectUrl}, RedirectUrl);
+ {ok, "302", RespHeaders, _} ->
+ RedirectUrl = redirect_url(RespHeaders, Req#http_db.url),
+ db_exists(Req#http_db{url = RedirectUrl}, CanonicalUrl);
+ {ok, "303", RespHeaders, _} ->
+ RedirectUrl = redirect_url(RespHeaders, Req#http_db.url),
+ db_exists(Req#http_db{method = get, url = RedirectUrl}, CanonicalUrl);
+ {ok, "401", _, _} ->
+ throw({unauthorized, ?l2b(Url)});
+ Error ->
+ ?LOG_DEBUG("DB at ~s could not be found because ~p", [Url, Error]),
+ throw({db_not_found, ?l2b(Url)})
+ end.
+
+config_http(Url) ->
+ #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+ ok = ibrowse:set_max_sessions(Host, Port, list_to_integer(
+ couch_config:get("replicator", "max_http_sessions", "20"))),
+ ok = ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(
+ couch_config:get("replicator", "max_http_pipeline_size", "50"))),
+ ok = couch_config:register(
+ fun("replicator", "max_http_sessions", MaxSessions) ->
+ ibrowse:set_max_sessions(Host, Port, list_to_integer(MaxSessions));
+ ("replicator", "max_http_pipeline_size", PipeSize) ->
+ ibrowse:set_max_pipeline_size(Host, Port, list_to_integer(PipeSize))
+ end).
+
+redirect_url(RespHeaders, OrigUrl) ->
+ MochiHeaders = mochiweb_headers:make(RespHeaders),
+ RedUrl = mochiweb_headers:get_value("Location", MochiHeaders),
+ #url{
+ host = Host, host_type = HostType, port = Port,
+ path = Path, protocol = Proto
+ } = ibrowse_lib:parse_url(RedUrl),
+ #url{username = User, password = Passwd} = ibrowse_lib:parse_url(OrigUrl),
+ Creds = case is_list(User) andalso is_list(Passwd) of
+ true ->
+ User ++ ":" ++ Passwd ++ "@";
+ false ->
+ []
+ end,
+ HostPart = case HostType of
+ ipv6_address ->
+ "[" ++ Host ++ "]";
+ _ ->
+ Host
+ end,
+ atom_to_list(Proto) ++ "://" ++ Creds ++ HostPart ++ ":" ++
+ integer_to_list(Port) ++ Path.
+
+full_url(#http_db{url=Url} = Req) when is_binary(Url) ->
+ full_url(Req#http_db{url = ?b2l(Url)});
+
+full_url(#http_db{qs=[]} = Req) ->
+ Req#http_db.url ++ Req#http_db.resource;
+
+full_url(Req) ->
+ #http_db{
+ url = Url,
+ resource = Resource,
+ qs = QS
+ } = Req,
+ QStr = lists:map(fun({K,V}) -> io_lib:format("~s=~s",
+ [couch_util:to_list(K), couch_util:to_list(V)]) end, QS),
+ lists:flatten([Url, Resource, "?", string:join(QStr, "&")]).
+
+process_response({ok, Status, Headers, Body}, Req) ->
+ Code = list_to_integer(Status),
+ if Code =:= 200; Code =:= 201 ->
+ ?JSON_DECODE(maybe_decompress(Headers, Body));
+ Code =:= 301; Code =:= 302 ; Code =:= 303 ->
+ do_request(redirected_request(Code, Headers, Req));
+ Code =:= 409 ->
+ throw(conflict);
+ Code >= 400, Code < 500 ->
+ ?JSON_DECODE(maybe_decompress(Headers, Body));
+ Code =:= 500; Code =:= 502; Code =:= 503 ->
+ #http_db{pause = Pause, retries = Retries} = Req,
+ ?LOG_INFO("retrying couch_rep_httpc request in ~p seconds " ++
+ % "due to remote server error: ~s~s", [Pause/1000, Req#http_db.url,
+ "due to remote server error: ~p Body ~s", [Pause/1000, Code,
+ Body]),
+ timer:sleep(Pause),
+ do_request(Req#http_db{retries = Retries-1, pause = 2*Pause});
+ true ->
+ exit({http_request_failed, ?l2b(["unhandled response code ", Status])})
+ end;
+
+process_response({ibrowse_req_id, Id}, _Req) ->
+ {ibrowse_req_id, Id};
+
+process_response({error, _Reason}, #http_db{url=Url, retries=0}) ->
+ ?LOG_ERROR("couch_rep_httpc request failed after 10 retries: ~s", [Url]),
+ exit({http_request_failed, ?l2b(["failed to replicate ", Url])});
+process_response({error, Reason}, Req) ->
+ #http_db{
+ method = Method,
+ retries = Retries,
+ pause = Pause
+ } = Req,
+ ShortReason = case Reason of
+ sel_conn_closed ->
+ connection_closed;
+ {'EXIT', {noproc, _}} ->
+ noproc;
+ {'EXIT', {normal, _}} ->
+ normal;
+ Else ->
+ Else
+ end,
+ ?LOG_ERROR("~p retry ~p ~s in ~p seconds due to {error, ~p}",
+ [?MODULE, Method, full_url(Req), Pause/1000, ShortReason]),
+ timer:sleep(Pause),
+ if Reason == worker_is_dead ->
+ C = spawn_link_worker_process(Req),
+ do_request(Req#http_db{retries = Retries-1, pause = 2*Pause, conn=C});
+ true ->
+ do_request(Req#http_db{retries = Retries-1, pause = 2*Pause})
+ end.
+
+redirected_request(Code, Headers, Req) ->
+ RedirectUrl = redirect_url(Headers, Req#http_db.url),
+ {Base, QStr, _} = mochiweb_util:urlsplit_path(RedirectUrl),
+ QS = mochiweb_util:parse_qs(QStr),
+ ReqHeaders = case couch_util:get_value(<<"oauth">>, Req#http_db.auth) of
+ undefined ->
+ Req#http_db.headers;
+ _Else ->
+ lists:keydelete("Authorization", 1, Req#http_db.headers)
+ end,
+ Req#http_db{
+ method = case couch_util:to_integer(Code) of
+ 303 -> get;
+ _ -> Req#http_db.method
+ end,
+ url = Base,
+ resource = "",
+ qs = QS,
+ headers = ReqHeaders
+ }.
+
+spawn_worker_process(Req) ->
+ Url = ibrowse_lib:parse_url(Req#http_db.url),
+ {ok, Pid} = ibrowse_http_client:start(Url),
+ Pid.
+
+spawn_link_worker_process(Req) ->
+ {ok, Pid} = ibrowse:spawn_link_worker_process(Req#http_db.url),
+ Pid.
+
+maybe_decompress(Headers, Body) ->
+ MochiHeaders = mochiweb_headers:make(Headers),
+ case mochiweb_headers:get_value("Content-Encoding", MochiHeaders) of
+ "gzip" ->
+ zlib:gunzip(Body);
+ _ ->
+ Body
+ end.
+
+oauth_header(Url, QS, Action, Props) ->
+ % erlang-oauth doesn't like iolists
+ QSL = [{couch_util:to_list(K), ?b2l(?l2b(couch_util:to_list(V)))} ||
+ {K,V} <- QS],
+ ConsumerKey = ?b2l(couch_util:get_value(<<"consumer_key">>, Props)),
+ Token = ?b2l(couch_util:get_value(<<"token">>, Props)),
+ TokenSecret = ?b2l(couch_util:get_value(<<"token_secret">>, Props)),
+ ConsumerSecret = ?b2l(couch_util:get_value(<<"consumer_secret">>, Props)),
+ SignatureMethodStr = ?b2l(couch_util:get_value(<<"signature_method">>, Props, <<"HMAC-SHA1">>)),
+ SignatureMethodAtom = case SignatureMethodStr of
+ "PLAINTEXT" ->
+ plaintext;
+ "HMAC-SHA1" ->
+ hmac_sha1;
+ "RSA-SHA1" ->
+ rsa_sha1
+ end,
+ Consumer = {ConsumerKey, ConsumerSecret, SignatureMethodAtom},
+ Method = case Action of
+ get -> "GET";
+ post -> "POST";
+ put -> "PUT";
+ head -> "HEAD"
+ end,
+ Params = oauth:signed_params(Method, Url, QSL, Consumer, Token, TokenSecret)
+ -- QSL,
+ {"Authorization", "OAuth " ++ oauth_uri:params_to_header_string(Params)}.
+
+ssl_options(#http_db{url = Url}) ->
+ case ibrowse_lib:parse_url(Url) of
+ #url{protocol = https} ->
+ Depth = list_to_integer(
+ couch_config:get("replicator", "ssl_certificate_max_depth", "3")
+ ),
+ SslOpts = [{depth, Depth} |
+ case couch_config:get("replicator", "verify_ssl_certificates") of
+ "true" ->
+ ssl_verify_options(true);
+ _ ->
+ ssl_verify_options(false)
+ end],
+ [{is_ssl, true}, {ssl_options, SslOpts}];
+ #url{protocol = http} ->
+ []
+ end.
+
+ssl_verify_options(Value) ->
+ ssl_verify_options(Value, erlang:system_info(otp_release)).
+
+ssl_verify_options(true, OTPVersion) when OTPVersion >= "R14" ->
+ CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, verify_peer}, {cacertfile, CAFile}];
+ssl_verify_options(false, OTPVersion) when OTPVersion >= "R14" ->
+ [{verify, verify_none}];
+ssl_verify_options(true, _OTPVersion) ->
+ CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+ [{verify, 2}, {cacertfile, CAFile}];
+ssl_verify_options(false, _OTPVersion) ->
+ [{verify, 0}].