summaryrefslogtreecommitdiff
path: root/src/couch_inets/httpc_handler.erl
diff options
context:
space:
mode:
authorChristopher Lenz <cmlenz@apache.org>2008-03-28 23:32:19 +0000
committerChristopher Lenz <cmlenz@apache.org>2008-03-28 23:32:19 +0000
commit544a38dd45f6a58d34296c6c768afd086eb2ac70 (patch)
treec84cc02340b06aae189cff0dbfaee698f273f1f5 /src/couch_inets/httpc_handler.erl
parent804cbbe033b8e7a3e8d7058aaf31bdf69ef18ac5 (diff)
Imported trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@642432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couch_inets/httpc_handler.erl')
-rw-r--r--src/couch_inets/httpc_handler.erl953
1 files changed, 953 insertions, 0 deletions
diff --git a/src/couch_inets/httpc_handler.erl b/src/couch_inets/httpc_handler.erl
new file mode 100644
index 00000000..8019b72b
--- /dev/null
+++ b/src/couch_inets/httpc_handler.erl
@@ -0,0 +1,953 @@
+% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% $Id$
+
+-module(httpc_handler).
+
+-behaviour(gen_server).
+
+-include("httpc_internal.hrl").
+-include("http_internal.hrl").
+
+%%--------------------------------------------------------------------
+%% Application API
+-export([start_link/2, send/2, cancel/2, stream/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(timers, {request_timers = [], % [ref()]
+ pipeline_timer % ref()
+ }).
+
+-record(state, {request, % #request{}
+ session, % #tcp_session{}
+ status_line, % {Version, StatusCode, ReasonPharse}
+ headers, % #http_response_h{}
+ body, % binary()
+ mfa, % {Moduel, Function, Args}
+ pipeline = queue:new(),% queue()
+ status = new, % new | pipeline | close | ssl_tunnel
+ canceled = [], % [RequestId]
+ max_header_size = nolimit, % nolimit | integer()
+ max_body_size = nolimit, % nolimit | integer()
+ options, % #options{}
+ timers = #timers{} % #timers{}
+ }).
+
+%%====================================================================
+%% External functions
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok, Pid}
+%%
+%% Description: Starts a http-request handler process. Intended to be
+%% called by the httpc manager process.
+%%
+%% Note: Uses proc_lib and gen_server:enter_loop so that waiting
+%% for gen_tcp:connect to timeout in init/1 will not
+%% block the httpc manager process in odd cases such as trying to call
+%% a server that does not exist. (See OTP-6735) The only API function
+%% sending messages to the handler process that can be called before
+%% init has compleated is cancel and that is not a problem! (Send and
+%% stream will not be called before the first request has been sent and
+%% the reply or part of it has arrived.)
+%%--------------------------------------------------------------------
+start_link(Request, ProxyOptions) ->
+ {ok, proc_lib:spawn_link(?MODULE, init, [[Request, ProxyOptions]])}.
+
+%%--------------------------------------------------------------------
+%% Function: send(Request, Pid) -> ok
+%% Request = #request{}
+%% Pid = pid() - the pid of the http-request handler process.
+%%
+%% Description: Uses this handlers session to send a request. Intended
+%% to be called by the httpc manager process.
+%%--------------------------------------------------------------------
+send(Request, Pid) ->
+ call(Request, Pid, 5000).
+
+%%--------------------------------------------------------------------
+%% Function: cancel(RequestId, Pid) -> ok
+%% RequestId = ref()
+%% Pid = pid() - the pid of the http-request handler process.
+%%
+%% Description: Cancels a request. Intended to be called by the httpc
+%% manager process.
+%%--------------------------------------------------------------------
+cancel(RequestId, Pid) ->
+ cast({cancel, RequestId}, Pid).
+
+%%--------------------------------------------------------------------
+%% Function: stream(BodyPart, Request, Code) -> _
+%% BodyPart = binary()
+%% Request = #request{}
+%% Code = integer()
+%%
+%% Description: Stream the HTTP body to the caller process (client)
+%% or to a file. Note that the data that has been stream
+%% does not have to be saved. (We do not want to use up
+%% memory in vain.)
+%%--------------------------------------------------------------------
+%% Request should not be streamed
+stream(BodyPart, Request = #request{stream = none}, _) ->
+ {BodyPart, Request};
+
+stream(BodyPart, Request = #request{stream = self}, 200) -> % Stream to caller
+ httpc_response:send(Request#request.from,
+ {Request#request.id, stream, BodyPart}),
+ {<<>>, Request};
+
+stream(BodyPart, Request = #request{stream = Filename}, 200)
+ when is_list(Filename) -> % Stream to file
+ case file:open(Filename, [write, raw, append, delayed_write]) of
+ {ok, Fd} ->
+ stream(BodyPart, Request#request{stream = Fd}, 200);
+ {error, Reason} ->
+ exit({stream_to_file_failed, Reason})
+ end;
+
+stream(BodyPart, Request = #request{stream = Fd}, 200) -> % Stream to file
+ case file:write(Fd, BodyPart) of
+ ok ->
+ {<<>>, Request};
+ {error, Reason} ->
+ exit({stream_to_file_failed, Reason})
+ end;
+stream(BodyPart, Request,_) -> % only 200 responses can be streamed
+ {BodyPart, Request}.
+
+%%====================================================================
+%% Server functions
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init([Request, Session]) -> {ok, State} |
+%% {ok, State, Timeout} | ignore |{stop, Reason}
+%%
+%% Description: Initiates the httpc_handler process
+%%
+%% Note: The init function may not fail, that will kill the
+%% httpc_manager process. We could make the httpc_manager more comlex
+%% but we do not want that so errors will be handled by the process
+%% sending an init_error message to itself.
+%%--------------------------------------------------------------------
+init([Request, Options]) ->
+ process_flag(trap_exit, true),
+ handle_verbose(Options#options.verbose),
+ Address = handle_proxy(Request#request.address, Options#options.proxy),
+ {ok, State} =
+ case {Address /= Request#request.address, Request#request.scheme} of
+ {true, https} ->
+ Error = https_through_proxy_is_not_currently_supported,
+ self() ! {init_error,
+ Error, httpc_response:error(Request, Error)},
+ {ok, #state{request = Request, options = Options,
+ status = ssl_tunnel}};
+ %% This is what we should do if and when ssl supports
+ %% "socket upgrading"
+ %%send_ssl_tunnel_request(Address, Request,
+ %% #state{options = Options,
+ %% status = ssl_tunnel});
+ {_, _} ->
+ send_first_request(Address, Request, #state{options = Options})
+ end,
+ gen_server:enter_loop(?MODULE, [], State).
+
+%%--------------------------------------------------------------------
+%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} | (terminate/2 is called)
+%% {stop, Reason, State} (terminate/2 is called)
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(Request, _, State = #state{session = Session =
+ #tcp_session{socket = Socket},
+ timers = Timers,
+ options = Options}) ->
+ Address = handle_proxy(Request#request.address, Options#options.proxy),
+
+ case httpc_request:send(Address, Request, Socket) of
+ ok ->
+ %% Activate the request time out for the new request
+ NewState = activate_request_timeout(State#state{request =
+ Request}),
+
+ ClientClose = httpc_request:is_client_closing(
+ Request#request.headers),
+ case State#state.request of
+ #request{} -> %% Old request no yet finished
+ %% Make sure to use the new value of timers in state
+ NewTimers = NewState#state.timers,
+ NewPipeline = queue:in(Request, State#state.pipeline),
+ NewSession =
+ Session#tcp_session{pipeline_length =
+ %% Queue + current
+ queue:len(NewPipeline) + 1,
+ client_close = ClientClose},
+ httpc_manager:insert_session(NewSession),
+ {reply, ok, State#state{pipeline = NewPipeline,
+ session = NewSession,
+ timers = NewTimers}};
+ undefined ->
+ %% Note: tcp-message reciving has already been
+ %% activated by handle_pipeline/2. Also
+ %% the parsing-function #state.mfa is initiated
+ %% by handle_pipeline/2.
+ cancel_timer(Timers#timers.pipeline_timer,
+ timeout_pipeline),
+ NewSession =
+ Session#tcp_session{pipeline_length = 1,
+ client_close = ClientClose},
+ httpc_manager:insert_session(NewSession),
+ {reply, ok,
+ NewState#state{request = Request,
+ session = NewSession,
+ timers =
+ Timers#timers{pipeline_timer =
+ undefined}}}
+ end;
+ {error, Reason} ->
+ {reply, {pipline_failed, Reason}, State}
+ end.
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+
+%% When the request in process has been canceld the handler process is
+%% stopped and the pipelined requests will be reissued. This is is
+%% based on the assumption that it is proably cheaper to reissue the
+%% requests than to wait for a potentiall large response that we then
+%% only throw away. This of course is not always true maybe we could
+%% do something smarter here?! If the request canceled is not
+%% the one handled right now the same effect will take place in
+%% handle_pipeline/2 when the canceled request is on turn.
+handle_cast({cancel, RequestId}, State = #state{request = Request =
+ #request{id = RequestId}}) ->
+ httpc_manager:request_canceled(RequestId),
+ {stop, normal,
+ State#state{canceled = [RequestId | State#state.canceled],
+ request = Request#request{from = answer_sent}}};
+handle_cast({cancel, RequestId}, State) ->
+ httpc_manager:request_canceled(RequestId),
+ {noreply, State#state{canceled = [RequestId | State#state.canceled]}}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State} (terminate/2 is called)
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({Proto, _Socket, Data}, State =
+ #state{mfa = {Module, Function, Args},
+ request = #request{method = Method,
+ stream = Stream} = Request,
+ session = Session, status_line = StatusLine})
+ when Proto == tcp; Proto == ssl; Proto == httpc_handler ->
+
+ case Module:Function([Data | Args]) of
+ {ok, Result} ->
+ handle_http_msg(Result, State);
+ {_, whole_body, _} when Method == head ->
+ handle_response(State#state{body = <<>>});
+ {Module, whole_body, [Body, Length]} ->
+ {_, Code, _} = StatusLine,
+ {NewBody, NewRequest} = stream(Body, Request, Code),
+ %% When we stream we will not keep the already
+ %% streamed data, that would be a waste of memory.
+ NewLength = case Stream of
+ none ->
+ Length;
+ _ ->
+ Length - size(Body)
+ end,
+ http_transport:setopts(socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ {noreply, State#state{mfa = {Module, whole_body,
+ [NewBody, NewLength]},
+ request = NewRequest}};
+ NewMFA ->
+ http_transport:setopts(socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ {noreply, State#state{mfa = NewMFA}}
+ end;
+
+%% The Server may close the connection to indicate that the
+%% whole body is now sent instead of sending an lengh
+%% indicator.
+handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
+ handle_response(State#state{body = hd(Args)});
+handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
+ handle_response(State#state{body = hd(Args)});
+
+%%% Server closes idle pipeline
+handle_info({tcp_closed, _}, State = #state{request = undefined}) ->
+ {stop, normal, State};
+handle_info({ssl_closed, _}, State = #state{request = undefined}) ->
+ {stop, normal, State};
+
+%%% Error cases
+handle_info({tcp_closed, _}, State) ->
+ {stop, session_remotly_closed, State};
+handle_info({ssl_closed, _}, State) ->
+ {stop, session_remotly_closed, State};
+handle_info({tcp_error, _, _} = Reason, State) ->
+ {stop, Reason, State};
+handle_info({ssl_error, _, _} = Reason, State) ->
+ {stop, Reason, State};
+
+%%% Timeouts
+%% Internaly, to a request handling process, a request time out is
+%% seen as a canceld request.
+handle_info({timeout, RequestId}, State =
+ #state{request = Request = #request{id = RequestId}}) ->
+ httpc_response:send(Request#request.from,
+ httpc_response:error(Request,timeout)),
+ {stop, normal,
+ State#state{canceled = [RequestId | State#state.canceled],
+ request = Request#request{from = answer_sent}}};
+handle_info({timeout, RequestId}, State = #state{request = Request}) ->
+ httpc_response:send(Request#request.from,
+ httpc_response:error(Request,timeout)),
+ {noreply, State#state{canceled = [RequestId | State#state.canceled]}};
+
+handle_info(timeout_pipeline, State = #state{request = undefined}) ->
+ {stop, normal, State};
+
+%% Setting up the connection to the server somehow failed.
+handle_info({init_error, _, ClientErrMsg},
+ State = #state{request = Request}) ->
+ NewState = answer_request(Request, ClientErrMsg, State),
+ {stop, normal, NewState};
+
+%%% httpc_manager process dies.
+handle_info({'EXIT', _, _}, State = #state{request = undefined}) ->
+ {stop, normal, State};
+%%Try to finish the current request anyway,
+%% there is a fairly high probability that it can be done successfully.
+%% Then close the connection, hopefully a new manager is started that
+%% can retry requests in the pipeline.
+handle_info({'EXIT', _, _}, State) ->
+ {noreply, State#state{status = close}}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> _ (ignored by gen_server)
+%% Description: Shutdown the httpc_handler
+%%--------------------------------------------------------------------
+terminate(normal, #state{session = undefined}) ->
+ ok; %% Init error there is no socket to be closed.
+terminate(normal, #state{request = Request,
+ session = #tcp_session{id = undefined,
+ socket = Socket}}) ->
+ %% Init error sending, no session information has been setup but
+ %% there is a socket that needs closing.
+ http_transport:close(socket_type(Request), Socket);
+
+terminate(_, State = #state{session = Session, request = undefined,
+ timers = Timers}) ->
+ catch httpc_manager:delete_session(Session#tcp_session.id),
+
+ case queue:is_empty(State#state.pipeline) of
+ false ->
+ retry_pipline(queue:to_list(State#state.pipeline), State);
+ true ->
+ ok
+ end,
+ cancel_timer(Timers#timers.pipeline_timer, timeout_pipeline),
+ http_transport:close(socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket);
+
+terminate(Reason, State = #state{request = Request})->
+ NewState = case Request#request.from of
+ answer_sent ->
+ State;
+ _ ->
+ answer_request(Request,
+ httpc_response:error(Request, Reason),
+ State)
+ end,
+ terminate(Reason, NewState#state{request = undefined}).
+
+
+%%--------------------------------------------------------------------
+%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState}
+%% Purpose: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+send_first_request(Address, Request, State) ->
+ Ipv6 = (State#state.options)#options.ipv6,
+ SocketType = socket_type(Request),
+ TimeOut = (Request#request.settings)#http_options.timeout,
+ case http_transport:connect(SocketType, Address, Ipv6, TimeOut) of
+ {ok, Socket} ->
+ case httpc_request:send(Address, Request, Socket) of
+ ok ->
+ ClientClose =
+ httpc_request:is_client_closing(
+ Request#request.headers),
+ Session =
+ #tcp_session{id = {Request#request.address, self()},
+ scheme = Request#request.scheme,
+ socket = Socket,
+ client_close = ClientClose},
+ TmpState = State#state{request = Request,
+ session = Session,
+ mfa =
+ {httpc_response, parse,
+ [State#state.max_header_size]},
+ status_line = undefined,
+ headers = undefined,
+ body = undefined,
+ status = new},
+ http_transport:setopts(SocketType,
+ Socket, [{active, once}]),
+ NewState = activate_request_timeout(TmpState),
+ {ok, NewState};
+ {error, Reason} ->
+ %% Commented out in wait of ssl support to avoid
+ %% dialyzer warning
+ %%case State#state.status of
+ %% new -> % Called from init/1
+ self() ! {init_error, error_sending,
+ httpc_response:error(Request, Reason)},
+ {ok, State#state{request = Request,
+ session =
+ #tcp_session{socket = Socket}}}
+ %%ssl_tunnel -> % Not called from init/1
+ %% NewState =
+ %% answer_request(Request,
+ %%httpc_response:error(Request,
+ %%Reason),
+ %% State),
+ %% {stop, normal, NewState}
+ %% end
+ end;
+ {error, Reason} ->
+ %% Commented out in wait of ssl support to avoid
+ %% dialyzer warning
+ %% case State#state.status of
+ %% new -> % Called from init/1
+ self() ! {init_error, error_connecting,
+ httpc_response:error(Request, Reason)},
+ {ok, State#state{request = Request}}
+ %% ssl_tunnel -> % Not called from init/1
+ %% NewState =
+ %% answer_request(Request,
+ %% httpc_response:error(Request,
+ %% Reason),
+ %% State),
+ %% {stop, normal, NewState}
+ %%end
+ end.
+
+handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body},
+ State = #state{request = Request}) ->
+
+ case Headers#http_response_h.'content-type' of
+ "multipart/byteranges" ++ _Param ->
+ exit(not_yet_implemented);
+ _ ->
+ start_stream({{Version, StatusCode, ReasonPharse}, Headers},
+ Request),
+ handle_http_body(Body,
+ State#state{status_line = {Version,
+ StatusCode,
+ ReasonPharse},
+ headers = Headers})
+ end;
+handle_http_msg({ChunkedHeaders, Body},
+ State = #state{headers = Headers}) ->
+ NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders),
+ handle_response(State#state{headers = NewHeaders, body = Body});
+handle_http_msg(Body, State = #state{status_line = {_,Code, _}}) ->
+ {NewBody, NewRequest}= stream(Body, State#state.request, Code),
+ handle_response(State#state{body = NewBody, request = NewRequest}).
+
+handle_http_body(<<>>, State = #state{request = #request{method = head}}) ->
+ handle_response(State#state{body = <<>>});
+
+handle_http_body(Body, State = #state{headers = Headers, session = Session,
+ max_body_size = MaxBodySize,
+ status_line = {_,Code, _},
+ request = Request}) ->
+ case Headers#http_response_h.'transfer-encoding' of
+ "chunked" ->
+ case http_chunk:decode(Body, State#state.max_body_size,
+ State#state.max_header_size,
+ {Code, Request}) of
+ {Module, Function, Args} ->
+ http_transport:setopts(socket_type(
+ Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ {noreply, State#state{mfa =
+ {Module, Function, Args}}};
+ {ok, {ChunkedHeaders, NewBody}} ->
+ NewHeaders = http_chunk:handle_headers(Headers,
+ ChunkedHeaders),
+ handle_response(State#state{headers = NewHeaders,
+ body = NewBody})
+ end;
+ Encoding when list(Encoding) ->
+ NewState = answer_request(Request,
+ httpc_response:error(Request,
+ unknown_encoding),
+ State),
+ {stop, normal, NewState};
+ _ ->
+ Length =
+ list_to_integer(Headers#http_response_h.'content-length'),
+ case ((Length =< MaxBodySize) or (MaxBodySize == nolimit)) of
+ true ->
+ case httpc_response:whole_body(Body, Length) of
+ {ok, Body} ->
+ {NewBody, NewRequest}= stream(Body, Request, Code),
+ handle_response(State#state{body = NewBody,
+ request = NewRequest});
+ MFA ->
+ http_transport:setopts(
+ socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ {noreply, State#state{mfa = MFA}}
+ end;
+ false ->
+ NewState =
+ answer_request(Request,
+ httpc_response:error(Request,
+ body_too_big),
+ State),
+ {stop, normal, NewState}
+ end
+ end.
+
+%%% Normaly I do not comment out code, I throw it away. But this might
+%%% actually be used on day if ssl is improved.
+%% handle_response(State = #state{status = ssl_tunnel,
+%% request = Request,
+%% options = Options,
+%% session = #tcp_session{socket = Socket,
+%% scheme = Scheme},
+%% status_line = {_, 200, _}}) ->
+%% %%% Insert code for upgrading the socket if and when ssl supports this.
+%% Address = handle_proxy(Request#request.address, Options#options.proxy),
+%% send_first_request(Address, Request, State);
+%% handle_response(State = #state{status = ssl_tunnel,
+%% request = Request}) ->
+%% NewState = answer_request(Request,
+%% httpc_response:error(Request,
+%% ssl_proxy_tunnel_failed),
+%% State),
+%% {stop, normal, NewState};
+
+handle_response(State = #state{status = new}) ->
+ handle_response(try_to_enable_pipline(State));
+
+handle_response(State = #state{request = Request,
+ status = Status,
+ session = Session,
+ status_line = StatusLine,
+ headers = Headers,
+ body = Body,
+ options = Options}) when Status =/= new ->
+
+ handle_cookies(Headers, Request, Options),
+ case httpc_response:result({StatusLine, Headers, Body}, Request) of
+ %% 100-continue
+ continue ->
+ %% Send request body
+ {_, RequestBody} = Request#request.content,
+ http_transport:send(socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ RequestBody),
+ %% Wait for next response
+ http_transport:setopts(socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ {noreply,
+ State#state{mfa = {httpc_response, parse,
+ [State#state.max_header_size]},
+ status_line = undefined,
+ headers = undefined,
+ body = undefined
+ }};
+ %% Ignore unexpected 100-continue response and receive the
+ %% actual response that the server will send right away.
+ {ignore, Data} ->
+ NewState = State#state{mfa =
+ {httpc_response, parse,
+ [State#state.max_header_size]},
+ status_line = undefined,
+ headers = undefined,
+ body = undefined},
+ handle_info({httpc_handler, dummy, Data}, NewState);
+ %% On a redirect or retry the current request becomes
+ %% obsolete and the manager will create a new request
+ %% with the same id as the current.
+ {redirect, NewRequest, Data}->
+ ok = httpc_manager:redirect_request(NewRequest),
+ handle_pipeline(State#state{request = undefined}, Data);
+ {retry, TimeNewRequest, Data}->
+ ok = httpc_manager:retry_request(TimeNewRequest),
+ handle_pipeline(State#state{request = undefined}, Data);
+ {ok, Msg, Data} ->
+ end_stream(StatusLine, Request),
+ NewState = answer_request(Request, Msg, State),
+ handle_pipeline(NewState, Data);
+ {stop, Msg} ->
+ end_stream(StatusLine, Request),
+ NewState = answer_request(Request, Msg, State),
+ {stop, normal, NewState}
+ end.
+
+handle_cookies(_,_, #options{cookies = disabled}) ->
+ ok;
+%% User wants to verify the cookies before they are stored,
+%% so the user will have to call a store command.
+handle_cookies(_,_, #options{cookies = verify}) ->
+ ok;
+handle_cookies(Headers, Request, #options{cookies = enabled}) ->
+ {Host, _ } = Request#request.address,
+ Cookies = http_cookie:cookies(Headers#http_response_h.other,
+ Request#request.path, Host),
+ httpc_manager:store_cookies(Cookies, Request#request.address).
+
+%% This request could not be pipelined
+handle_pipeline(State = #state{status = close}, _) ->
+ {stop, normal, State};
+
+handle_pipeline(State = #state{status = pipeline, session = Session},
+ Data) ->
+ case queue:out(State#state.pipeline) of
+ {empty, _} ->
+ %% The server may choose too teminate an idle pipeline
+ %% in this case we want to receive the close message
+ %% at once and not when trying to pipline the next
+ %% request.
+ http_transport:setopts(socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ %% If a pipeline that has been idle for some time is not
+ %% closed by the server, the client may want to close it.
+ NewState = activate_pipeline_timeout(State),
+ NewSession = Session#tcp_session{pipeline_length = 0},
+ httpc_manager:insert_session(NewSession),
+ {noreply,
+ NewState#state{request = undefined,
+ mfa = {httpc_response, parse,
+ [NewState#state.max_header_size]},
+ status_line = undefined,
+ headers = undefined,
+ body = undefined
+ }
+ };
+ {{value, NextRequest}, Pipeline} ->
+ case lists:member(NextRequest#request.id,
+ State#state.canceled) of
+ true ->
+ %% See comment for handle_cast({cancel, RequestId})
+ {stop, normal,
+ State#state{request =
+ NextRequest#request{from = answer_sent}}};
+ false ->
+ NewSession =
+ Session#tcp_session{pipeline_length =
+ %% Queue + current
+ queue:len(Pipeline) + 1},
+ httpc_manager:insert_session(NewSession),
+ NewState =
+ State#state{pipeline = Pipeline,
+ request = NextRequest,
+ mfa = {httpc_response, parse,
+ [State#state.max_header_size]},
+ status_line = undefined,
+ headers = undefined,
+ body = undefined},
+ case Data of
+ <<>> ->
+ http_transport:setopts(
+ socket_type(Session#tcp_session.scheme),
+ Session#tcp_session.socket,
+ [{active, once}]),
+ {noreply, NewState};
+ _ ->
+ %% If we already received some bytes of
+ %% the next response
+ handle_info({httpc_handler, dummy, Data},
+ NewState)
+ end
+ end
+ end.
+
+call(Msg, Pid, Timeout) ->
+ gen_server:call(Pid, Msg, Timeout).
+
+cast(Msg, Pid) ->
+ gen_server:cast(Pid, Msg).
+
+activate_request_timeout(State = #state{request = Request}) ->
+ Time = (Request#request.settings)#http_options.timeout,
+ case Time of
+ infinity ->
+ State;
+ _ ->
+ Ref = erlang:send_after(Time, self(),
+ {timeout, Request#request.id}),
+ State#state
+ {timers =
+ #timers{request_timers =
+ [{Request#request.id, Ref}|
+ (State#state.timers)#timers.request_timers]}}
+ end.
+
+activate_pipeline_timeout(State = #state{options =
+ #options{pipeline_timeout =
+ infinity}}) ->
+ State;
+activate_pipeline_timeout(State = #state{options =
+ #options{pipeline_timeout = Time}}) ->
+ Ref = erlang:send_after(Time, self(), timeout_pipeline),
+ State#state{timers = #timers{pipeline_timer = Ref}}.
+
+is_pipeline_capable_server("HTTP/1." ++ N, _) when hd(N) >= $1 ->
+ true;
+is_pipeline_capable_server("HTTP/1.0",
+ #http_response_h{connection = "keep-alive"}) ->
+ true;
+is_pipeline_capable_server(_,_) ->
+ false.
+
+is_keep_alive_connection(Headers, Session) ->
+ (not ((Session#tcp_session.client_close) or
+ httpc_response:is_server_closing(Headers))).
+
+try_to_enable_pipline(State = #state{session = Session,
+ request = #request{method = Method},
+ status_line = {Version, _, _},
+ headers = Headers}) ->
+ case (is_pipeline_capable_server(Version, Headers)) and
+ (is_keep_alive_connection(Headers, Session)) and
+ (httpc_request:is_idempotent(Method)) of
+ true ->
+ httpc_manager:insert_session(Session),
+ State#state{status = pipeline};
+ false ->
+ State#state{status = close}
+ end.
+
+answer_request(Request, Msg, State = #state{timers = Timers}) ->
+ httpc_response:send(Request#request.from, Msg),
+ RequestTimers = Timers#timers.request_timers,
+ TimerRef =
+ http_util:key1search(RequestTimers, Request#request.id, undefined),
+ Timer = {Request#request.id, TimerRef},
+ cancel_timer(TimerRef, {timeout, Request#request.id}),
+ State#state{request = Request#request{from = answer_sent},
+ timers =
+ Timers#timers{request_timers =
+ lists:delete(Timer, RequestTimers)}}.
+cancel_timer(undefined, _) ->
+ ok;
+cancel_timer(Timer, TimeoutMsg) ->
+ erlang:cancel_timer(Timer),
+ receive
+ TimeoutMsg ->
+ ok
+ after 0 ->
+ ok
+ end.
+
+retry_pipline([], _) ->
+ ok;
+
+retry_pipline([Request |PipeLine], State = #state{timers = Timers}) ->
+ NewState =
+ case (catch httpc_manager:retry_request(Request)) of
+ ok ->
+ RequestTimers = Timers#timers.request_timers,
+ Timer = {_, TimerRef} =
+ http_util:key1search(RequestTimers, Request#request.id,
+ {undefined, undefined}),
+ cancel_timer(TimerRef, {timeout, Request#request.id}),
+ State#state{timers = Timers#timers{request_timers =
+ lists:delete(Timer,
+ RequestTimers)}};
+ Error ->
+ answer_request(Request#request.from,
+ httpc_response:error(Request, Error), State)
+ end,
+ retry_pipline(PipeLine, NewState).
+
+%%% Check to see if the given {Host,Port} tuple is in the NoProxyList
+%%% Returns an eventually updated {Host,Port} tuple, with the proxy address
+handle_proxy(HostPort = {Host, _Port}, {Proxy, NoProxy}) ->
+ case Proxy of
+ undefined ->
+ HostPort;
+ Proxy ->
+ case is_no_proxy_dest(Host, NoProxy) of
+ true ->
+ HostPort;
+ false ->
+ Proxy
+ end
+ end.
+
+is_no_proxy_dest(_, []) ->
+ false;
+is_no_proxy_dest(Host, [ "*." ++ NoProxyDomain | NoProxyDests]) ->
+
+ case is_no_proxy_dest_domain(Host, NoProxyDomain) of
+ true ->
+ true;
+ false ->
+ is_no_proxy_dest(Host, NoProxyDests)
+ end;
+
+is_no_proxy_dest(Host, [NoProxyDest | NoProxyDests]) ->
+ IsNoProxyDest = case http_util:is_hostname(NoProxyDest) of
+ true ->
+ fun is_no_proxy_host_name/2;
+ false ->
+ fun is_no_proxy_dest_address/2
+ end,
+
+ case IsNoProxyDest(Host, NoProxyDest) of
+ true ->
+ true;
+ false ->
+ is_no_proxy_dest(Host, NoProxyDests)
+ end.
+
+is_no_proxy_host_name(Host, Host) ->
+ true;
+is_no_proxy_host_name(_,_) ->
+ false.
+
+is_no_proxy_dest_domain(Dest, DomainPart) ->
+ lists:suffix(DomainPart, Dest).
+
+is_no_proxy_dest_address(Dest, Dest) ->
+ true;
+is_no_proxy_dest_address(Dest, AddressPart) ->
+ lists:prefix(AddressPart, Dest).
+
+socket_type(#request{scheme = http}) ->
+ ip_comm;
+socket_type(#request{scheme = https, settings = Settings}) ->
+ {ssl, Settings#http_options.ssl};
+socket_type(http) ->
+ ip_comm;
+socket_type(https) ->
+ {ssl, []}. %% Dummy value ok for ex setops that does not use this value
+
+start_stream(_, #request{stream = none}) ->
+ ok;
+start_stream({{_, 200, _}, Headers}, Request = #request{stream = self}) ->
+ Msg = httpc_response:stream_start(Headers, Request),
+ httpc_response:send(Request#request.from, Msg);
+start_stream(_, _) ->
+ ok.
+
+%% Note the end stream message is handled by httpc_response and will
+%% be sent by answer_request
+end_stream(_, #request{stream = none}) ->
+ ok;
+end_stream(_, #request{stream = self}) ->
+ ok;
+end_stream({_,200,_}, #request{stream = Fd}) ->
+ case file:close(Fd) of
+ ok ->
+ ok;
+ {error, enospc} -> % Could be due to delayed_write
+ file:close(Fd)
+ end;
+end_stream(_, _) ->
+ ok.
+
+handle_verbose(verbose) ->
+ dbg:p(self(), [r]);
+handle_verbose(debug) ->
+ dbg:p(self(), [call]),
+ dbg:tp(?MODULE, [{'_', [], [{return_trace}]}]);
+handle_verbose(trace) ->
+ dbg:p(self(), [call]),
+ dbg:tpl(?MODULE, [{'_', [], [{return_trace}]}]);
+handle_verbose(_) ->
+ ok.
+
+%%% Normaly I do not comment out code, I throw it away. But this might
+%%% actually be used on day if ssl is improved.
+%% send_ssl_tunnel_request(Address, Request = #request{address = {Host, Port}},
+%% State) ->
+%% %% A ssl tunnel request is a special http request that looks like
+%% %% CONNECT host:port HTTP/1.1
+%% SslTunnelRequest = #request{method = connect, scheme = http,
+%% headers =
+%% #http_request_h{
+%% host = Host,
+%% address = Address,
+%% path = Host ++ ":",
+%% pquery = integer_to_list(Port),
+%% other = [{ "Proxy-Connection", "keep-alive"}]},
+%% Ipv6 = (State#state.options)#options.ipv6,
+%% SocketType = socket_type(SslTunnelRequest),
+%% case http_transport:connect(SocketType,
+%% SslTunnelRequest#request.address, Ipv6) of
+%% {ok, Socket} ->
+%% case httpc_request:send(Address, SslTunnelRequest, Socket) of
+%% ok ->
+%% Session = #tcp_session{id =
+%% {SslTunnelRequest#request.address,
+%% self()},
+%% scheme =
+%% SslTunnelRequest#request.scheme,
+%% socket = Socket},
+%% NewState = State#state{mfa =
+%% {httpc_response, parse,
+%% [State#state.max_header_size]},
+%% request = Request,
+%% session = Session},
+%% http_transport:setopts(socket_type(
+%% SslTunnelRequest#request.scheme),
+%% Socket,
+%% [{active, once}]),
+%% {ok, NewState};
+%% {error, Reason} ->
+%% self() ! {init_error, error_sending,
+%% httpc_response:error(Request, Reason)},
+%% {ok, State#state{request = Request,
+%% session = #tcp_session{socket =
+%% Socket}}}
+%% end;
+%% {error, Reason} ->
+%% self() ! {init_error, error_connecting,
+%% httpc_response:error(Request, Reason)},
+%% {ok, State#state{request = Request}}
+%% end.