summaryrefslogtreecommitdiff
path: root/src/ibrowse/ibrowse_http_client.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-07-03 00:58:13 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-07-03 00:58:13 +0000
commit963dd5ee2c59341e1506908e164100d5fa79e10b (patch)
treefa000912590ae4b45de88fa6157e86d84f318fb6 /src/ibrowse/ibrowse_http_client.erl
parenta2a39e30b51cde4b5df6adf32078bad881ebf34c (diff)
upgrade to ibrowse 1.5.0
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@790771 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/ibrowse/ibrowse_http_client.erl')
-rw-r--r--src/ibrowse/ibrowse_http_client.erl402
1 files changed, 224 insertions, 178 deletions
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 9455bc20..24214ffb 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -6,7 +6,7 @@
%%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <chandrashekhar.mullaparthi@t-mobile.co.uk>
%%%-------------------------------------------------------------------
-module(ibrowse_http_client).
--vsn('$Id: ibrowse_http_client.erl,v 1.18 2008/05/21 15:28:11 chandrusf Exp $ ').
+-vsn('$Id: ibrowse_http_client.erl,v 1.19 2009/07/01 22:43:19 chandrusf Exp $ ').
-behaviour(gen_server).
%%--------------------------------------------------------------------
@@ -42,11 +42,12 @@
use_proxy = false, proxy_auth_digest,
ssl_options = [], is_ssl = false, socket,
reqs=queue:new(), cur_req, status=idle, http_status_code,
- reply_buffer=[], rep_buf_size=0, streamed_size = 0,
+ reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0,
recvd_headers=[],
is_closing, send_timer, content_length,
- deleted_crlf = false, transfer_encoding, chunk_size,
- chunks=[], lb_ets_tid, cur_pipeline_size = 0
+ deleted_crlf = false, transfer_encoding,
+ chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
+ lb_ets_tid, cur_pipeline_size = 0
}).
-record(request, {url, method, options, from,
@@ -57,8 +58,6 @@
response_format}).
-import(ibrowse_lib, [
- parse_url/1,
- printable_date/0,
get_value/2,
get_value/3,
do_trace/2
@@ -83,15 +82,9 @@ stop(Conn_pid) ->
gen_server:call(Conn_pid, stop).
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
- Timeout_1 = case Timeout of
- infinity ->
- infinity;
- _ when is_integer(Timeout) ->
- Timeout + 100
- end,
gen_server:call(
Conn_Pid,
- {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout_1).
+ {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout).
%%====================================================================
%% Server functions
@@ -170,23 +163,29 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
Reqs = queue:in(NewReq, State#state.reqs),
State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
do_trace("Connecting...~n", []),
- Timeout_1 = case Timeout of
- infinity ->
- infinity;
- _ ->
- round(Timeout*0.9)
- end,
- case do_connect(Host_1, Port_1, Options, State_2, Timeout_1) of
+ Start_ts = now(),
+ Conn_timeout = get_value(connect_timeout, Options, Timeout),
+ case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
+ do_trace("Connected!~n", []),
+ End_ts = now(),
Ref = case Timeout of
infinity ->
undefined;
_ ->
- erlang:send_after(Timeout, self(), {req_timedout, From})
+ Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)),
+ case Rem_time > 0 of
+ true ->
+ erlang:send_after(Rem_time, self(), {req_timedout, From});
+ false ->
+ shutting_down(State_2),
+ do_error_reply(State_2, req_timedout),
+ exit(normal)
+ end
end,
- do_trace("Connected!~n", []),
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of
ok ->
+ do_setopts(Sock, [{active, once}], State_2#state.is_ssl),
case StreamTo of
undefined ->
ok;
@@ -197,7 +196,7 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
send_timer = Ref,
cur_req = NewReq,
status = get_header}),
- {noreply, State_3};
+ {noreply, State_3, get_inac_timeout(State_3)};
Err ->
shutting_down(State_2),
do_trace("Send failed... Reason: ~p~n", [Err]),
@@ -234,7 +233,7 @@ handle_call({send_req, {Url, Headers, Method,
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
ok ->
State_2 = inc_pipeline_counter(State_1),
- do_setopts(Sock, [{active, true}], State#state.is_ssl),
+ do_setopts(Sock, [{active, once}], State#state.is_ssl),
case Timeout of
infinity ->
ok;
@@ -254,7 +253,7 @@ handle_call({send_req, {Url, Headers, Method,
_ ->
gen_server:reply(From, {ibrowse_req_id, ReqId})
end,
- {noreply, State_3};
+ {noreply, State_3, get_inac_timeout(State_3)};
Err ->
shutting_down(State_1),
do_trace("Send request failed: Reason: ~p~n", [Err]),
@@ -289,7 +288,8 @@ handle_cast(_Msg, State) ->
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
-handle_info({tcp, _Sock, Data}, State) ->
+handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
+ do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
handle_sock_data(Data, State);
@@ -305,14 +305,19 @@ handle_info({ssl_closed, _Sock}, State) ->
handle_info({req_timedout, From}, State) ->
case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of
- false ->
- {noreply, State};
- {value, _} ->
- shutting_down(State),
- do_error_reply(State, req_timedout),
- {stop, normal, State}
+ false ->
+ {noreply, State};
+ {value, _} ->
+ shutting_down(State),
+ do_error_reply(State, req_timedout),
+ {stop, normal, State}
end;
+handle_info(timeout, State) ->
+ shutting_down(State),
+ do_error_reply(State, req_timedout),
+ {stop, normal, State};
+
handle_info({trace, Bool}, State) ->
put(my_trace_flag, Bool),
{noreply, State};
@@ -365,8 +370,8 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
shutting_down(State),
{stop, normal, State};
State_1 ->
- do_setopts(Sock, [{active, true}], State#state.is_ssl),
- {noreply, State_1}
+ do_setopts(Sock, [{active, once}], State#state.is_ssl),
+ {noreply, State_1, get_inac_timeout(State_1)}
end;
handle_sock_data(Data, #state{status=get_body, content_length=CL,
@@ -382,8 +387,8 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
State_1 ->
- do_setopts(Sock, [{active, true}], State#state.is_ssl),
- {noreply, State_1}
+ do_setopts(Sock, [{active, once}], State#state.is_ssl),
+ {noreply, State_1, get_inac_timeout(State_1)}
end;
_ ->
case parse_11_response(Data, State) of
@@ -396,20 +401,17 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
shutting_down(State),
{stop, normal, State};
State_1 ->
- do_setopts(Sock, [{active, true}], State#state.is_ssl),
- {noreply, State_1}
+ do_setopts(Sock, [{active, once}], State#state.is_ssl),
+ {noreply, State_1, get_inac_timeout(State_1)}
end
end.
accumulate_response(Data,
#state{
- cur_req = #request{save_response_to_file = SaveResponseToFile,
+ cur_req = #request{save_response_to_file = true,
tmp_file_fd = undefined} = CurReq,
- http_status_code=[$2 | _]}=State) when SaveResponseToFile /= false ->
- TmpFilename = case SaveResponseToFile of
- true -> make_tmp_filename();
- F -> F
- end,
+ http_status_code=[$2 | _]}=State) ->
+ TmpFilename = make_tmp_filename(),
case file:open(TmpFilename, [write, delayed_write, raw]) of
{ok, Fd} ->
accumulate_response(Data, State#state{
@@ -419,30 +421,30 @@ accumulate_response(Data,
{error, Reason} ->
{error, {file_open_error, Reason}}
end;
-accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile,
+accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true,
tmp_file_fd = Fd},
transfer_encoding=chunked,
- chunks = Chunks,
+ reply_buffer = Reply_buf,
http_status_code=[$2 | _]
- } = State) when SaveResponseToFile /= false ->
- case file:write(Fd, [Chunks | Data]) of
+ } = State) ->
+ case file:write(Fd, [Reply_buf, Data]) of
ok ->
- State#state{chunks = []};
+ State#state{reply_buffer = <<>>};
{error, Reason} ->
{error, {file_write_error, Reason}}
end;
-accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile,
+accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true,
tmp_file_fd = Fd},
reply_buffer = RepBuf,
http_status_code=[$2 | _]
- } = State) when SaveResponseToFile /= false ->
- case file:write(Fd, [RepBuf | Data]) of
+ } = State) ->
+ case file:write(Fd, [RepBuf, Data]) of
ok ->
- State#state{reply_buffer = []};
+ State#state{reply_buffer = <<>>};
{error, Reason} ->
{error, {file_write_error, Reason}}
end;
-accumulate_response([], State) ->
+accumulate_response(<<>>, State) ->
State;
accumulate_response(Data, #state{reply_buffer = RepBuf,
rep_buf_size = RepBufSize,
@@ -451,7 +453,7 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
#request{stream_to=StreamTo, req_id=ReqId,
stream_chunk_size = Stream_chunk_size,
response_format = Response_format} = CurReq,
- RepBuf_1 = [Data | RepBuf],
+ RepBuf_1 = concat_binary([RepBuf, Data]),
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
@@ -459,12 +461,12 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
_ when New_data_size < Stream_chunk_size ->
State#state{reply_buffer = RepBuf_1};
_ ->
- {Stream_chunk, Rem_data} = split_list_at(flatten(lists:reverse(RepBuf_1)), Stream_chunk_size),
+ {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
accumulate_response(
Rem_data,
State#state{
- reply_buffer = [],
+ reply_buffer = <<>>,
streamed_size = Streamed_size + Stream_chunk_size})
end.
@@ -491,11 +493,11 @@ handle_sock_closed(#state{cur_req=undefined} = State) ->
%% We check for IsClosing because this the server could have sent a
%% Connection-Close header and has closed the socket to indicate end
%% of response. There maybe requests pipelined which need a response.
-handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
- is_closing=IsClosing,
- cur_req=#request{tmp_file_name=TmpFilename,
- tmp_file_fd=Fd} = CurReq,
- status=get_body, recvd_headers=Headers}=State) ->
+handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC,
+ is_closing = IsClosing,
+ cur_req = #request{tmp_file_name=TmpFilename,
+ tmp_file_fd=Fd} = CurReq,
+ status = get_body, recvd_headers = Headers}=State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format} = CurReq,
case IsClosing of
@@ -519,11 +521,11 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) ->
ssl:connect(Host, Port,
- [{nodelay, true}, {active, false} | SSLOptions],
+ [binary, {nodelay, true}, {active, false} | SSLOptions],
Timeout);
do_connect(Host, Port, _Options, _State, Timeout) ->
gen_tcp:connect(Host, Port,
- [{nodelay, true}, {active, false}],
+ [binary, {nodelay, true}, {active, false}],
Timeout).
do_send(Sock, Req, true) -> ssl:send(Sock, Req);
@@ -602,7 +604,7 @@ send_req_1(#url{abspath = AbsPath,
io:format("Err: ~p~n", [Err]),
Err
end,
- do_setopts(Sock, [{active, true}], State#state.is_ssl),
+ do_setopts(Sock, [{active, once}], State#state.is_ssl),
SndRes.
add_auth_headers(#url{username = User,
@@ -758,12 +760,12 @@ chunk_request_body(Body, _ChunkSize, Acc) when list(Body) ->
parse_response(_Data, #state{cur_req = undefined}=State) ->
State#state{status = idle};
-parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
- cur_req=CurReq}=State) ->
+parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
+ cur_req = CurReq} = State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
method=Method, response_format = Resp_format} = CurReq,
MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
- case scan_header(Data, Acc) of
+ case scan_header(Acc, Data) of
{yes, Headers, Data_1} ->
do_trace("Recvd Header Data -> ~s~n----~n", [Headers]),
do_trace("Recvd headers~n--- Headers Begin ---~n~s~n--- Headers End ---~n~n", [Headers]),
@@ -779,7 +781,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
ok
end,
State_1 = State#state{recvd_headers=Headers_1, status=get_body,
- reply_buffer = [],
+ reply_buffer = <<>>,
http_status_code=StatCode, is_closing=IsClosing},
put(conn_close, ConnClose),
TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")),
@@ -818,7 +820,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
case parse_11_response(Data_1, State_1#state{transfer_encoding=chunked,
chunk_size=chunk_start,
- reply_buffer=[], chunks=[]}) of
+ reply_buffer = <<>>}) of
{error, Reason} ->
fail_pipelined_requests(State_1,
{error, {Reason,
@@ -830,7 +832,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
undefined when HttpVsn == "HTTP/1.0";
ConnClose == "close" ->
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
- State_1#state{reply_buffer=[Data_1]};
+ State_1#state{reply_buffer = Data_1};
undefined ->
fail_pipelined_requests(State_1,
{error, {content_length_undefined,
@@ -842,7 +844,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
do_trace("Recvd Content-Length of ~p~n", [V_1]),
State_2 = State_1#state{rep_buf_size=0,
- reply_buffer=[],
+ reply_buffer = <<>>,
content_length=V_1},
case parse_11_response(Data_1, State_2) of
{error, Reason} ->
@@ -861,9 +863,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
end
end;
{no, Acc_1} when MaxHeaderSize == infinity ->
- State#state{reply_buffer=Acc_1};
- {no, Acc_1} when length(Acc_1) < MaxHeaderSize ->
- State#state{reply_buffer=Acc_1};
+ State#state{reply_buffer = Acc_1};
+ {no, Acc_1} when size(Acc_1) < MaxHeaderSize ->
+ State#state{reply_buffer = Acc_1};
{no, _Acc_1} ->
fail_pipelined_requests(State, {error, max_headers_size_exceeded}),
{error, max_headers_size_exceeded}
@@ -878,122 +880,97 @@ is_connection_closing(_, _) -> false.
parse_11_response(DataRecvd,
#state{transfer_encoding=chunked,
chunk_size=chunk_start,
- cur_req=CurReq,
- reply_buffer=Buf
- }=State) ->
- case scan_crlf(DataRecvd, Buf) of
+ chunk_size_buffer = Chunk_sz_buf
+ } = State) ->
+ case scan_crlf(Chunk_sz_buf, DataRecvd) of
{yes, ChunkHeader, Data_1} ->
case parse_chunk_header(ChunkHeader) of
{error, Reason} ->
{error, Reason};
ChunkSize ->
- #request{stream_to=StreamTo, req_id=ReqId,
- response_format = Response_format} = CurReq,
%%
- %% Do we have to preserve the chunk encoding when streaming?
+ %% Do we have to preserve the chunk encoding when
+ %% streaming? NO. This should be transparent to the client
+ %% process. Chunked encoding was only introduced to make
+ %% it efficient for the server.
%%
- do_interim_reply(StreamTo, Response_format,
- ReqId, {chunk_start, ChunkSize}),
- RemLen = length(Data_1),
+ RemLen = size(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{rep_buf_size=0,
- reply_buffer=[],
- deleted_crlf=true,
- chunk_size=ChunkSize})
+ parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
+ deleted_crlf = true,
+ recvd_chunk_size = 0,
+ chunk_size = ChunkSize})
end;
{no, Data_1} ->
- State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)}
+ State#state{chunk_size_buffer = Data_1}
end;
-%% This clause is there to remove the CRLF between two chunks
+%% This clause is to remove the CRLF between two chunks
%%
parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked,
- chunk_size=tbd,
- chunks = Chunks,
- cur_req=CurReq,
- reply_buffer=Buf}=State) ->
- case scan_crlf(DataRecvd, Buf) of
+ #state{transfer_encoding = chunked,
+ chunk_size = tbd,
+ chunk_size_buffer = Buf}=State) ->
+ case scan_crlf(Buf, DataRecvd) of
{yes, _, NextChunk} ->
- #request{stream_to=StreamTo, req_id=ReqId,
- response_format = Response_format} = CurReq,
- %%
- %% Do we have to preserve the chunk encoding when streaming?
- %%
- State_1 = State#state{chunk_size=chunk_start,
- rep_buf_size=0,
- reply_buffer=[],
- deleted_crlf=true},
- State_2 = case StreamTo of
- undefined ->
- State_1#state{chunks = [Buf | Chunks]};
- _ ->
- %% Flush out all buffered data as chunk is ending
- do_interim_reply(StreamTo, Response_format, ReqId,
- lists:reverse([Buf | Chunks])),
- do_interim_reply(StreamTo, Response_format,
- ReqId, chunk_end),
- State_1#state{chunks = [], streamed_size = 0}
- end,
- parse_11_response(NextChunk, State_2);
+ State_1 = State#state{chunk_size = chunk_start,
+ chunk_size_buffer = <<>>,
+%% reply_buffer = Buf_1,
+ deleted_crlf = true},
+ parse_11_response(NextChunk, State_1);
{no, Data_1} ->
- State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)}
+%% State#state{reply_buffer = Data_1, rep_buf_size = size(Data_1)}
+ State#state{chunk_size_buffer = Data_1}
end;
%% This clause deals with the end of a chunked transfer
parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked, chunk_size=0,
- cur_req=CurReq,
+ #state{transfer_encoding = chunked, chunk_size = 0,
+ cur_req = CurReq,
deleted_crlf = DelCrlf,
- reply_buffer=Trailer, reqs=Reqs}=State) ->
+ reply_buffer = Trailer, reqs = Reqs}=State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
false ->
DataRecvd;
true ->
- [$\r, $\n | DataRecvd]
- end,
- #request{stream_to=StreamTo, req_id=ReqId,
- response_format = Response_format} = CurReq,
- case scan_header(DataRecvd_1, Trailer) of
+ <<$\r, $\n, DataRecvd/binary>>
+ end,
+ case scan_header(Trailer, DataRecvd_1) of
{yes, _TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
- %%
- %% Do we have to preserve the chunk encoding when streaming? Nope.
- %%
- do_interim_reply(StreamTo, Response_format, ReqId, chunk_end),
- State_1 = handle_response(CurReq, State#state{reqs=Reqs_1}),
+ State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
parse_response(Rem, reset_state(State_1));
{no, Rem} ->
- State#state{reply_buffer=Rem, rep_buf_size=length(Rem), deleted_crlf=false}
+ State#state{reply_buffer = Rem, rep_buf_size = size(Rem), deleted_crlf = false}
end;
%% This clause extracts a chunk, given the size.
parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked, chunk_size=CSz,
- rep_buf_size=RepBufSz}=State) ->
- NeedBytes = CSz - RepBufSz,
- DataLen = length(DataRecvd),
+ #state{transfer_encoding = chunked,
+ chunk_size = CSz,
+ recvd_chunk_size = Recvd_csz,
+ rep_buf_size = RepBufSz} = State) ->
+ NeedBytes = CSz - Recvd_csz,
+ DataLen = size(DataRecvd),
do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]),
case DataLen >= NeedBytes of
true ->
- {RemChunk, RemData} = split_list_at(DataRecvd, NeedBytes),
+ {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
do_trace("Recvd another chunk...~n", []),
do_trace("RemData -> ~p~n", [RemData]),
case accumulate_response(RemChunk, State) of
{error, Reason} ->
do_trace("Error accumulating response --> ~p~n", [Reason]),
{error, Reason};
- #state{reply_buffer = NewRepBuf,
- chunks = NewChunks} = State_1 ->
- State_2 = State_1#state{reply_buffer=[],
- chunks = [lists:reverse(NewRepBuf) | NewChunks],
- rep_buf_size=0,
- chunk_size=tbd},
+ #state{} = State_1 ->
+ State_2 = State_1#state{chunk_size=tbd},
parse_11_response(RemData, State_2)
end;
false ->
- accumulate_response(DataRecvd, State#state{rep_buf_size=(RepBufSz + DataLen)})
+ accumulate_response(DataRecvd,
+ State#state{rep_buf_size = RepBufSz + DataLen,
+ recvd_chunk_size = Recvd_csz + DataLen})
end;
%% This clause to extract the body when Content-Length is specified
@@ -1001,10 +978,10 @@ parse_11_response(DataRecvd,
#state{content_length=CL, rep_buf_size=RepBufSz,
reqs=Reqs}=State) ->
NeedBytes = CL - RepBufSz,
- DataLen = length(DataRecvd),
+ DataLen = size(DataRecvd),
case DataLen >= NeedBytes of
true ->
- {RemBody, Rem} = split_list_at(DataRecvd, NeedBytes),
+ {RemBody, Rem} = split_binary(DataRecvd, NeedBytes),
{_, Reqs_1} = queue:out(Reqs),
State_1 = accumulate_response(RemBody, State),
State_2 = handle_response(State_1#state.cur_req, State_1#state{reqs=Reqs_1}),
@@ -1023,15 +1000,8 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
#state{http_status_code = SCode,
send_timer = ReqTimer,
reply_buffer = RepBuf,
- transfer_encoding = TEnc,
- chunks = Chunks,
recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false ->
- Body = case TEnc of
- chunked ->
- lists:reverse(Chunks);
- _ ->
- lists:reverse(RepBuf)
- end,
+ Body = RepBuf,
State_1 = set_cur_request(State),
file:close(Fd),
ResponseBody = case TmpFilename of
@@ -1047,14 +1017,9 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format},
#state{http_status_code=SCode, recvd_headers=RespHeaders,
- reply_buffer=RepBuf, transfer_encoding=TEnc,
- chunks=Chunks, send_timer=ReqTimer}=State) ->
- Body = case TEnc of
- chunked ->
- lists:reverse(Chunks);
- _ ->
- lists:reverse(RepBuf)
- end,
+ reply_buffer = RepBuf,
+ send_timer=ReqTimer}=State) ->
+ Body = RepBuf,
%% State_1 = set_cur_request(State),
State_1 = case get(conn_close) of
"close" ->
@@ -1070,10 +1035,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
set_cur_request(State_1).
reset_state(State) ->
- State#state{status=get_header, rep_buf_size=0, streamed_size = 0,
- content_length=undefined,
- reply_buffer=[], chunks=[], recvd_headers=[], deleted_crlf=false,
- http_status_code=undefined, chunk_size=undefined, transfer_encoding=undefined}.
+ State#state{status = get_header,
+ rep_buf_size = 0,
+ streamed_size = 0,
+ content_length = undefined,
+ reply_buffer = <<>>,
+ chunk_size_buffer = <<>>,
+ recvd_headers = [],
+ deleted_crlf = false,
+ http_status_code = undefined,
+ chunk_size = undefined,
+ transfer_encoding = undefined}.
set_cur_request(#state{reqs = Reqs} = State) ->
case queue:to_list(Reqs) of
@@ -1084,7 +1056,7 @@ set_cur_request(#state{reqs = Reqs} = State) ->
end.
parse_headers(Headers) ->
- case scan_crlf(Headers, []) of
+ case scan_crlf(Headers) of
{yes, StatusLine, T} ->
Headers_1 = parse_headers_1(T),
case parse_status_line(StatusLine) of
@@ -1107,6 +1079,8 @@ parse_headers(Headers) ->
% SP. A recipient MAY replace any linear white space with a single
% SP before interpreting the field value or forwarding the message
% downstream.
+parse_headers_1(B) when is_binary(B) ->
+ parse_headers_1(binary_to_list(B));
parse_headers_1(String) ->
parse_headers_1(String, [], []).
@@ -1135,6 +1109,8 @@ parse_headers_1([], L, Acc) ->
end,
lists:reverse(Acc_1).
+parse_status_line(Line) when is_binary(Line) ->
+ parse_status_line(binary_to_list(Line));
parse_status_line(Line) ->
parse_status_line(Line, get_prot_vsn, [], []).
parse_status_line([32 | T], get_prot_vsn, ProtVsn, StatCode) ->
@@ -1148,6 +1124,8 @@ parse_status_line([H | T], get_status_code, ProtVsn, StatCode) ->
parse_status_line([], _, _, _) ->
http_09.
+parse_header(B) when is_binary(B) ->
+ parse_header(binary_to_list(B));
parse_header(L) ->
parse_header(L, []).
parse_header([$: | V], Acc) ->
@@ -1157,13 +1135,75 @@ parse_header([H | T], Acc) ->
parse_header([], _) ->
invalid.
-scan_header([$\n|T], [$\r,$\n,$\r|L]) -> {yes, lists:reverse([$\n,$\r| L]), T};
-scan_header([H|T], L) -> scan_header(T, [H|L]);
-scan_header([], L) -> {no, L}.
+scan_header(Bin) ->
+ case get_crlf_crlf_pos(Bin, 0) of
+ {yes, Pos} ->
+ {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos),
+ {yes, Headers, Body};
+ no ->
+ {no, Bin}
+ end.
+
+scan_header(Bin1, Bin2) when size(Bin1) < 4 ->
+ scan_header(<<Bin1/binary, Bin2/binary>>);
+scan_header(Bin1, <<>>) ->
+ scan_header(Bin1);
+scan_header(Bin1, Bin2) ->
+ Bin1_already_scanned_size = size(Bin1) - 4,
+ <<Headers_prefix:Bin1_already_scanned_size/binary, Rest/binary>> = Bin1,
+ Bin_to_scan = <<Rest/binary, Bin2/binary>>,
+ case get_crlf_crlf_pos(Bin_to_scan, 0) of
+ {yes, Pos} ->
+ {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
+ {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
+ no ->
+ {no, <<Bin1/binary, Bin2/binary>>}
+ end.
+
+get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos};
+get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1);
+get_crlf_crlf_pos(<<>>, _) -> no.
+
+scan_crlf(Bin) ->
+ case get_crlf_pos(Bin) of
+ {yes, Pos} ->
+ {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos),
+ {yes, Prefix, Suffix};
+ no ->
+ {no, Bin}
+ end.
+
+scan_crlf(<<>>, Bin2) ->
+ scan_crlf(Bin2);
+scan_crlf(Bin1, Bin2) when size(Bin1) < 2 ->
+ scan_crlf(<<Bin1/binary, Bin2/binary>>);
+scan_crlf(Bin1, Bin2) ->
+ scan_crlf_1(size(Bin1) - 2, Bin1, Bin2).
+
+scan_crlf_1(Bin1_head_size, Bin1, Bin2) ->
+ <<Bin1_head:Bin1_head_size/binary, Bin1_tail/binary>> = Bin1,
+ Bin3 = <<Bin1_tail/binary, Bin2/binary>>,
+ case get_crlf_pos(Bin3) of
+ {yes, Pos} ->
+ {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos),
+ {yes, concat_binary([Bin1_head, Prefix]), Suffix};
+ no ->
+ {no, concat_binary([Bin1, Bin2])}
+ end.
-scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T};
-scan_crlf([H|T], L) -> scan_crlf(T, [H|L]);
-scan_crlf([], L) -> {no, L}.
+get_crlf_pos(Bin) ->
+ get_crlf_pos(Bin, 0).
+
+get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, Pos};
+get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1);
+get_crlf_pos(<<>>, _) -> no.
+
+%% scan_crlf(<<$\n, T/binary>>, [$\r | L]) -> {yes, lists:reverse(L), T};
+%% scan_crlf(<<H, T/binary>>, L) -> scan_crlf(T, [H|L]);
+%% scan_crlf(<<>>, L) -> {no, L};
+%% scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T};
+%% scan_crlf([H|T], L) -> scan_crlf(T, [H|L]);
+%% scan_crlf([], L) -> {no, L}.
fmt_val(L) when list(L) -> L;
fmt_val(I) when integer(I) -> integer_to_list(I);
@@ -1221,16 +1261,16 @@ parse_chunk_header([]) ->
parse_chunk_header(ChunkHeader) ->
parse_chunk_header(ChunkHeader, []).
-parse_chunk_header([$; | _], Acc) ->
+parse_chunk_header(<<$;, _/binary>>, Acc) ->
hexlist_to_integer(lists:reverse(Acc));
-parse_chunk_header([H | T], Acc) ->
+parse_chunk_header(<<H, T/binary>>, Acc) ->
case is_whitespace(H) of
true ->
parse_chunk_header(T, Acc);
false ->
parse_chunk_header(T, [H | Acc])
end;
-parse_chunk_header([], Acc) ->
+parse_chunk_header(<<>>, Acc) ->
hexlist_to_integer(lists:reverse(Acc)).
is_whitespace($\s) -> true;
@@ -1249,6 +1289,8 @@ format_response_data(Resp_format, Body) ->
case Resp_format of
list when is_list(Body) ->
flatten(Body);
+ list when is_binary(Body) ->
+ binary_to_list(Body);
binary when is_list(Body) ->
list_to_binary(Body);
_ ->
@@ -1399,4 +1441,8 @@ get_stream_chunk_size(Options) ->
_ ->
?DEFAULT_STREAM_CHUNK_SIZE
end.
-
+
+get_inac_timeout(#state{cur_req = #request{options = Opts}}) ->
+ get_value(inactivity_timeout, Opts, infinity);
+get_inac_timeout(#state{cur_req = undefined}) ->
+ infinity.