From 963dd5ee2c59341e1506908e164100d5fa79e10b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 3 Jul 2009 00:58:13 +0000 Subject: upgrade to ibrowse 1.5.0 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@790771 13f79535-47bb-0310-9956-ffa450edef68 --- src/ibrowse/ibrowse_http_client.erl | 402 ++++++++++++++++++++---------------- 1 file changed, 224 insertions(+), 178 deletions(-) (limited to 'src/ibrowse/ibrowse_http_client.erl') diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index 9455bc20..24214ffb 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -6,7 +6,7 @@ %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi %%%------------------------------------------------------------------- -module(ibrowse_http_client). --vsn('$Id: ibrowse_http_client.erl,v 1.18 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse_http_client.erl,v 1.19 2009/07/01 22:43:19 chandrusf Exp $ '). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -42,11 +42,12 @@ use_proxy = false, proxy_auth_digest, ssl_options = [], is_ssl = false, socket, reqs=queue:new(), cur_req, status=idle, http_status_code, - reply_buffer=[], rep_buf_size=0, streamed_size = 0, + reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0, recvd_headers=[], is_closing, send_timer, content_length, - deleted_crlf = false, transfer_encoding, chunk_size, - chunks=[], lb_ets_tid, cur_pipeline_size = 0 + deleted_crlf = false, transfer_encoding, + chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, + lb_ets_tid, cur_pipeline_size = 0 }). -record(request, {url, method, options, from, @@ -57,8 +58,6 @@ response_format}). -import(ibrowse_lib, [ - parse_url/1, - printable_date/0, get_value/2, get_value/3, do_trace/2 @@ -83,15 +82,9 @@ stop(Conn_pid) -> gen_server:call(Conn_pid, stop). send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> - Timeout_1 = case Timeout of - infinity -> - infinity; - _ when is_integer(Timeout) -> - Timeout + 100 - end, gen_server:call( Conn_Pid, - {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout_1). + {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout). %%==================================================================== %% Server functions @@ -170,23 +163,29 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, Reqs = queue:in(NewReq, State#state.reqs), State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}), do_trace("Connecting...~n", []), - Timeout_1 = case Timeout of - infinity -> - infinity; - _ -> - round(Timeout*0.9) - end, - case do_connect(Host_1, Port_1, Options, State_2, Timeout_1) of + Start_ts = now(), + Conn_timeout = get_value(connect_timeout, Options, Timeout), + case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of {ok, Sock} -> + do_trace("Connected!~n", []), + End_ts = now(), Ref = case Timeout of infinity -> undefined; _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) + Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)), + case Rem_time > 0 of + true -> + erlang:send_after(Rem_time, self(), {req_timedout, From}); + false -> + shutting_down(State_2), + do_error_reply(State_2, req_timedout), + exit(normal) + end end, - do_trace("Connected!~n", []), case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of ok -> + do_setopts(Sock, [{active, once}], State_2#state.is_ssl), case StreamTo of undefined -> ok; @@ -197,7 +196,7 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, send_timer = Ref, cur_req = NewReq, status = get_header}), - {noreply, State_3}; + {noreply, State_3, get_inac_timeout(State_3)}; Err -> shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -234,7 +233,7 @@ handle_call({send_req, {Url, Headers, Method, case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of ok -> State_2 = inc_pipeline_counter(State_1), - do_setopts(Sock, [{active, true}], State#state.is_ssl), + do_setopts(Sock, [{active, once}], State#state.is_ssl), case Timeout of infinity -> ok; @@ -254,7 +253,7 @@ handle_call({send_req, {Url, Headers, Method, _ -> gen_server:reply(From, {ibrowse_req_id, ReqId}) end, - {noreply, State_3}; + {noreply, State_3, get_inac_timeout(State_3)}; Err -> shutting_down(State_1), do_trace("Send request failed: Reason: ~p~n", [Err]), @@ -289,7 +288,8 @@ handle_cast(_Msg, State) -> %% {noreply, State, Timeout} | %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_info({tcp, _Sock, Data}, State) -> +handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> + do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> handle_sock_data(Data, State); @@ -305,14 +305,19 @@ handle_info({ssl_closed, _Sock}, State) -> handle_info({req_timedout, From}, State) -> case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of - false -> - {noreply, State}; - {value, _} -> - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State} + false -> + {noreply, State}; + {value, _} -> + shutting_down(State), + do_error_reply(State, req_timedout), + {stop, normal, State} end; +handle_info(timeout, State) -> + shutting_down(State), + do_error_reply(State, req_timedout), + {stop, normal, State}; + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; @@ -365,8 +370,8 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) -> shutting_down(State), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end; handle_sock_data(Data, #state{status=get_body, content_length=CL, @@ -382,8 +387,8 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL, {error, {Reason, {stat_code, StatCode}, Headers}}), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end; _ -> case parse_11_response(Data, State) of @@ -396,20 +401,17 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL, shutting_down(State), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end end. accumulate_response(Data, #state{ - cur_req = #request{save_response_to_file = SaveResponseToFile, + cur_req = #request{save_response_to_file = true, tmp_file_fd = undefined} = CurReq, - http_status_code=[$2 | _]}=State) when SaveResponseToFile /= false -> - TmpFilename = case SaveResponseToFile of - true -> make_tmp_filename(); - F -> F - end, + http_status_code=[$2 | _]}=State) -> + TmpFilename = make_tmp_filename(), case file:open(TmpFilename, [write, delayed_write, raw]) of {ok, Fd} -> accumulate_response(Data, State#state{ @@ -419,30 +421,30 @@ accumulate_response(Data, {error, Reason} -> {error, {file_open_error, Reason}} end; -accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile, +accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true, tmp_file_fd = Fd}, transfer_encoding=chunked, - chunks = Chunks, + reply_buffer = Reply_buf, http_status_code=[$2 | _] - } = State) when SaveResponseToFile /= false -> - case file:write(Fd, [Chunks | Data]) of + } = State) -> + case file:write(Fd, [Reply_buf, Data]) of ok -> - State#state{chunks = []}; + State#state{reply_buffer = <<>>}; {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile, +accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true, tmp_file_fd = Fd}, reply_buffer = RepBuf, http_status_code=[$2 | _] - } = State) when SaveResponseToFile /= false -> - case file:write(Fd, [RepBuf | Data]) of + } = State) -> + case file:write(Fd, [RepBuf, Data]) of ok -> - State#state{reply_buffer = []}; + State#state{reply_buffer = <<>>}; {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response([], State) -> +accumulate_response(<<>>, State) -> State; accumulate_response(Data, #state{reply_buffer = RepBuf, rep_buf_size = RepBufSize, @@ -451,7 +453,7 @@ accumulate_response(Data, #state{reply_buffer = RepBuf, #request{stream_to=StreamTo, req_id=ReqId, stream_chunk_size = Stream_chunk_size, response_format = Response_format} = CurReq, - RepBuf_1 = [Data | RepBuf], + RepBuf_1 = concat_binary([RepBuf, Data]), New_data_size = RepBufSize - Streamed_size, case StreamTo of undefined -> @@ -459,12 +461,12 @@ accumulate_response(Data, #state{reply_buffer = RepBuf, _ when New_data_size < Stream_chunk_size -> State#state{reply_buffer = RepBuf_1}; _ -> - {Stream_chunk, Rem_data} = split_list_at(flatten(lists:reverse(RepBuf_1)), Stream_chunk_size), + {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size), do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk), accumulate_response( Rem_data, State#state{ - reply_buffer = [], + reply_buffer = <<>>, streamed_size = Streamed_size + Stream_chunk_size}) end. @@ -491,11 +493,11 @@ handle_sock_closed(#state{cur_req=undefined} = State) -> %% We check for IsClosing because this the server could have sent a %% Connection-Close header and has closed the socket to indicate end %% of response. There maybe requests pipelined which need a response. -handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC, - is_closing=IsClosing, - cur_req=#request{tmp_file_name=TmpFilename, - tmp_file_fd=Fd} = CurReq, - status=get_body, recvd_headers=Headers}=State) -> +handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC, + is_closing = IsClosing, + cur_req = #request{tmp_file_name=TmpFilename, + tmp_file_fd=Fd} = CurReq, + status = get_body, recvd_headers = Headers}=State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format} = CurReq, case IsClosing of @@ -519,11 +521,11 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC, do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) -> ssl:connect(Host, Port, - [{nodelay, true}, {active, false} | SSLOptions], + [binary, {nodelay, true}, {active, false} | SSLOptions], Timeout); do_connect(Host, Port, _Options, _State, Timeout) -> gen_tcp:connect(Host, Port, - [{nodelay, true}, {active, false}], + [binary, {nodelay, true}, {active, false}], Timeout). do_send(Sock, Req, true) -> ssl:send(Sock, Req); @@ -602,7 +604,7 @@ send_req_1(#url{abspath = AbsPath, io:format("Err: ~p~n", [Err]), Err end, - do_setopts(Sock, [{active, true}], State#state.is_ssl), + do_setopts(Sock, [{active, once}], State#state.is_ssl), SndRes. add_auth_headers(#url{username = User, @@ -758,12 +760,12 @@ chunk_request_body(Body, _ChunkSize, Acc) when list(Body) -> parse_response(_Data, #state{cur_req = undefined}=State) -> State#state{status = idle}; -parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, - cur_req=CurReq}=State) -> +parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, + cur_req = CurReq} = State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, method=Method, response_format = Resp_format} = CurReq, MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity), - case scan_header(Data, Acc) of + case scan_header(Acc, Data) of {yes, Headers, Data_1} -> do_trace("Recvd Header Data -> ~s~n----~n", [Headers]), do_trace("Recvd headers~n--- Headers Begin ---~n~s~n--- Headers End ---~n~n", [Headers]), @@ -779,7 +781,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, ok end, State_1 = State#state{recvd_headers=Headers_1, status=get_body, - reply_buffer = [], + reply_buffer = <<>>, http_status_code=StatCode, is_closing=IsClosing}, put(conn_close, ConnClose), TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")), @@ -818,7 +820,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, send_async_headers(ReqId, StreamTo, StatCode, Headers_1), case parse_11_response(Data_1, State_1#state{transfer_encoding=chunked, chunk_size=chunk_start, - reply_buffer=[], chunks=[]}) of + reply_buffer = <<>>}) of {error, Reason} -> fail_pipelined_requests(State_1, {error, {Reason, @@ -830,7 +832,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, undefined when HttpVsn == "HTTP/1.0"; ConnClose == "close" -> send_async_headers(ReqId, StreamTo, StatCode, Headers_1), - State_1#state{reply_buffer=[Data_1]}; + State_1#state{reply_buffer = Data_1}; undefined -> fail_pipelined_requests(State_1, {error, {content_length_undefined, @@ -842,7 +844,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, send_async_headers(ReqId, StreamTo, StatCode, Headers_1), do_trace("Recvd Content-Length of ~p~n", [V_1]), State_2 = State_1#state{rep_buf_size=0, - reply_buffer=[], + reply_buffer = <<>>, content_length=V_1}, case parse_11_response(Data_1, State_2) of {error, Reason} -> @@ -861,9 +863,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, end end; {no, Acc_1} when MaxHeaderSize == infinity -> - State#state{reply_buffer=Acc_1}; - {no, Acc_1} when length(Acc_1) < MaxHeaderSize -> - State#state{reply_buffer=Acc_1}; + State#state{reply_buffer = Acc_1}; + {no, Acc_1} when size(Acc_1) < MaxHeaderSize -> + State#state{reply_buffer = Acc_1}; {no, _Acc_1} -> fail_pipelined_requests(State, {error, max_headers_size_exceeded}), {error, max_headers_size_exceeded} @@ -878,122 +880,97 @@ is_connection_closing(_, _) -> false. parse_11_response(DataRecvd, #state{transfer_encoding=chunked, chunk_size=chunk_start, - cur_req=CurReq, - reply_buffer=Buf - }=State) -> - case scan_crlf(DataRecvd, Buf) of + chunk_size_buffer = Chunk_sz_buf + } = State) -> + case scan_crlf(Chunk_sz_buf, DataRecvd) of {yes, ChunkHeader, Data_1} -> case parse_chunk_header(ChunkHeader) of {error, Reason} -> {error, Reason}; ChunkSize -> - #request{stream_to=StreamTo, req_id=ReqId, - response_format = Response_format} = CurReq, %% - %% Do we have to preserve the chunk encoding when streaming? + %% Do we have to preserve the chunk encoding when + %% streaming? NO. This should be transparent to the client + %% process. Chunked encoding was only introduced to make + %% it efficient for the server. %% - do_interim_reply(StreamTo, Response_format, - ReqId, {chunk_start, ChunkSize}), - RemLen = length(Data_1), + RemLen = size(Data_1), do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]), - parse_11_response(Data_1, State#state{rep_buf_size=0, - reply_buffer=[], - deleted_crlf=true, - chunk_size=ChunkSize}) + parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>, + deleted_crlf = true, + recvd_chunk_size = 0, + chunk_size = ChunkSize}) end; {no, Data_1} -> - State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)} + State#state{chunk_size_buffer = Data_1} end; -%% This clause is there to remove the CRLF between two chunks +%% This clause is to remove the CRLF between two chunks %% parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, - chunk_size=tbd, - chunks = Chunks, - cur_req=CurReq, - reply_buffer=Buf}=State) -> - case scan_crlf(DataRecvd, Buf) of + #state{transfer_encoding = chunked, + chunk_size = tbd, + chunk_size_buffer = Buf}=State) -> + case scan_crlf(Buf, DataRecvd) of {yes, _, NextChunk} -> - #request{stream_to=StreamTo, req_id=ReqId, - response_format = Response_format} = CurReq, - %% - %% Do we have to preserve the chunk encoding when streaming? - %% - State_1 = State#state{chunk_size=chunk_start, - rep_buf_size=0, - reply_buffer=[], - deleted_crlf=true}, - State_2 = case StreamTo of - undefined -> - State_1#state{chunks = [Buf | Chunks]}; - _ -> - %% Flush out all buffered data as chunk is ending - do_interim_reply(StreamTo, Response_format, ReqId, - lists:reverse([Buf | Chunks])), - do_interim_reply(StreamTo, Response_format, - ReqId, chunk_end), - State_1#state{chunks = [], streamed_size = 0} - end, - parse_11_response(NextChunk, State_2); + State_1 = State#state{chunk_size = chunk_start, + chunk_size_buffer = <<>>, +%% reply_buffer = Buf_1, + deleted_crlf = true}, + parse_11_response(NextChunk, State_1); {no, Data_1} -> - State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)} +%% State#state{reply_buffer = Data_1, rep_buf_size = size(Data_1)} + State#state{chunk_size_buffer = Data_1} end; %% This clause deals with the end of a chunked transfer parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, chunk_size=0, - cur_req=CurReq, + #state{transfer_encoding = chunked, chunk_size = 0, + cur_req = CurReq, deleted_crlf = DelCrlf, - reply_buffer=Trailer, reqs=Reqs}=State) -> + reply_buffer = Trailer, reqs = Reqs}=State) -> do_trace("Detected end of chunked transfer...~n", []), DataRecvd_1 = case DelCrlf of false -> DataRecvd; true -> - [$\r, $\n | DataRecvd] - end, - #request{stream_to=StreamTo, req_id=ReqId, - response_format = Response_format} = CurReq, - case scan_header(DataRecvd_1, Trailer) of + <<$\r, $\n, DataRecvd/binary>> + end, + case scan_header(Trailer, DataRecvd_1) of {yes, _TEHeaders, Rem} -> {_, Reqs_1} = queue:out(Reqs), - %% - %% Do we have to preserve the chunk encoding when streaming? Nope. - %% - do_interim_reply(StreamTo, Response_format, ReqId, chunk_end), - State_1 = handle_response(CurReq, State#state{reqs=Reqs_1}), + State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}), parse_response(Rem, reset_state(State_1)); {no, Rem} -> - State#state{reply_buffer=Rem, rep_buf_size=length(Rem), deleted_crlf=false} + State#state{reply_buffer = Rem, rep_buf_size = size(Rem), deleted_crlf = false} end; %% This clause extracts a chunk, given the size. parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, chunk_size=CSz, - rep_buf_size=RepBufSz}=State) -> - NeedBytes = CSz - RepBufSz, - DataLen = length(DataRecvd), + #state{transfer_encoding = chunked, + chunk_size = CSz, + recvd_chunk_size = Recvd_csz, + rep_buf_size = RepBufSz} = State) -> + NeedBytes = CSz - Recvd_csz, + DataLen = size(DataRecvd), do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]), case DataLen >= NeedBytes of true -> - {RemChunk, RemData} = split_list_at(DataRecvd, NeedBytes), + {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), do_trace("Recvd another chunk...~n", []), do_trace("RemData -> ~p~n", [RemData]), case accumulate_response(RemChunk, State) of {error, Reason} -> do_trace("Error accumulating response --> ~p~n", [Reason]), {error, Reason}; - #state{reply_buffer = NewRepBuf, - chunks = NewChunks} = State_1 -> - State_2 = State_1#state{reply_buffer=[], - chunks = [lists:reverse(NewRepBuf) | NewChunks], - rep_buf_size=0, - chunk_size=tbd}, + #state{} = State_1 -> + State_2 = State_1#state{chunk_size=tbd}, parse_11_response(RemData, State_2) end; false -> - accumulate_response(DataRecvd, State#state{rep_buf_size=(RepBufSz + DataLen)}) + accumulate_response(DataRecvd, + State#state{rep_buf_size = RepBufSz + DataLen, + recvd_chunk_size = Recvd_csz + DataLen}) end; %% This clause to extract the body when Content-Length is specified @@ -1001,10 +978,10 @@ parse_11_response(DataRecvd, #state{content_length=CL, rep_buf_size=RepBufSz, reqs=Reqs}=State) -> NeedBytes = CL - RepBufSz, - DataLen = length(DataRecvd), + DataLen = size(DataRecvd), case DataLen >= NeedBytes of true -> - {RemBody, Rem} = split_list_at(DataRecvd, NeedBytes), + {RemBody, Rem} = split_binary(DataRecvd, NeedBytes), {_, Reqs_1} = queue:out(Reqs), State_1 = accumulate_response(RemBody, State), State_2 = handle_response(State_1#state.cur_req, State_1#state{reqs=Reqs_1}), @@ -1023,15 +1000,8 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, #state{http_status_code = SCode, send_timer = ReqTimer, reply_buffer = RepBuf, - transfer_encoding = TEnc, - chunks = Chunks, recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false -> - Body = case TEnc of - chunked -> - lists:reverse(Chunks); - _ -> - lists:reverse(RepBuf) - end, + Body = RepBuf, State_1 = set_cur_request(State), file:close(Fd), ResponseBody = case TmpFilename of @@ -1047,14 +1017,9 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format}, #state{http_status_code=SCode, recvd_headers=RespHeaders, - reply_buffer=RepBuf, transfer_encoding=TEnc, - chunks=Chunks, send_timer=ReqTimer}=State) -> - Body = case TEnc of - chunked -> - lists:reverse(Chunks); - _ -> - lists:reverse(RepBuf) - end, + reply_buffer = RepBuf, + send_timer=ReqTimer}=State) -> + Body = RepBuf, %% State_1 = set_cur_request(State), State_1 = case get(conn_close) of "close" -> @@ -1070,10 +1035,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, set_cur_request(State_1). reset_state(State) -> - State#state{status=get_header, rep_buf_size=0, streamed_size = 0, - content_length=undefined, - reply_buffer=[], chunks=[], recvd_headers=[], deleted_crlf=false, - http_status_code=undefined, chunk_size=undefined, transfer_encoding=undefined}. + State#state{status = get_header, + rep_buf_size = 0, + streamed_size = 0, + content_length = undefined, + reply_buffer = <<>>, + chunk_size_buffer = <<>>, + recvd_headers = [], + deleted_crlf = false, + http_status_code = undefined, + chunk_size = undefined, + transfer_encoding = undefined}. set_cur_request(#state{reqs = Reqs} = State) -> case queue:to_list(Reqs) of @@ -1084,7 +1056,7 @@ set_cur_request(#state{reqs = Reqs} = State) -> end. parse_headers(Headers) -> - case scan_crlf(Headers, []) of + case scan_crlf(Headers) of {yes, StatusLine, T} -> Headers_1 = parse_headers_1(T), case parse_status_line(StatusLine) of @@ -1107,6 +1079,8 @@ parse_headers(Headers) -> % SP. A recipient MAY replace any linear white space with a single % SP before interpreting the field value or forwarding the message % downstream. +parse_headers_1(B) when is_binary(B) -> + parse_headers_1(binary_to_list(B)); parse_headers_1(String) -> parse_headers_1(String, [], []). @@ -1135,6 +1109,8 @@ parse_headers_1([], L, Acc) -> end, lists:reverse(Acc_1). +parse_status_line(Line) when is_binary(Line) -> + parse_status_line(binary_to_list(Line)); parse_status_line(Line) -> parse_status_line(Line, get_prot_vsn, [], []). parse_status_line([32 | T], get_prot_vsn, ProtVsn, StatCode) -> @@ -1148,6 +1124,8 @@ parse_status_line([H | T], get_status_code, ProtVsn, StatCode) -> parse_status_line([], _, _, _) -> http_09. +parse_header(B) when is_binary(B) -> + parse_header(binary_to_list(B)); parse_header(L) -> parse_header(L, []). parse_header([$: | V], Acc) -> @@ -1157,13 +1135,75 @@ parse_header([H | T], Acc) -> parse_header([], _) -> invalid. -scan_header([$\n|T], [$\r,$\n,$\r|L]) -> {yes, lists:reverse([$\n,$\r| L]), T}; -scan_header([H|T], L) -> scan_header(T, [H|L]); -scan_header([], L) -> {no, L}. +scan_header(Bin) -> + case get_crlf_crlf_pos(Bin, 0) of + {yes, Pos} -> + {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos), + {yes, Headers, Body}; + no -> + {no, Bin} + end. + +scan_header(Bin1, Bin2) when size(Bin1) < 4 -> + scan_header(<>); +scan_header(Bin1, <<>>) -> + scan_header(Bin1); +scan_header(Bin1, Bin2) -> + Bin1_already_scanned_size = size(Bin1) - 4, + <> = Bin1, + Bin_to_scan = <>, + case get_crlf_crlf_pos(Bin_to_scan, 0) of + {yes, Pos} -> + {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), + {yes, <>, Body}; + no -> + {no, <>} + end. + +get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1); +get_crlf_crlf_pos(<<>>, _) -> no. + +scan_crlf(Bin) -> + case get_crlf_pos(Bin) of + {yes, Pos} -> + {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos), + {yes, Prefix, Suffix}; + no -> + {no, Bin} + end. + +scan_crlf(<<>>, Bin2) -> + scan_crlf(Bin2); +scan_crlf(Bin1, Bin2) when size(Bin1) < 2 -> + scan_crlf(<>); +scan_crlf(Bin1, Bin2) -> + scan_crlf_1(size(Bin1) - 2, Bin1, Bin2). + +scan_crlf_1(Bin1_head_size, Bin1, Bin2) -> + <> = Bin1, + Bin3 = <>, + case get_crlf_pos(Bin3) of + {yes, Pos} -> + {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos), + {yes, concat_binary([Bin1_head, Prefix]), Suffix}; + no -> + {no, concat_binary([Bin1, Bin2])} + end. -scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T}; -scan_crlf([H|T], L) -> scan_crlf(T, [H|L]); -scan_crlf([], L) -> {no, L}. +get_crlf_pos(Bin) -> + get_crlf_pos(Bin, 0). + +get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1); +get_crlf_pos(<<>>, _) -> no. + +%% scan_crlf(<<$\n, T/binary>>, [$\r | L]) -> {yes, lists:reverse(L), T}; +%% scan_crlf(<>, L) -> scan_crlf(T, [H|L]); +%% scan_crlf(<<>>, L) -> {no, L}; +%% scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T}; +%% scan_crlf([H|T], L) -> scan_crlf(T, [H|L]); +%% scan_crlf([], L) -> {no, L}. fmt_val(L) when list(L) -> L; fmt_val(I) when integer(I) -> integer_to_list(I); @@ -1221,16 +1261,16 @@ parse_chunk_header([]) -> parse_chunk_header(ChunkHeader) -> parse_chunk_header(ChunkHeader, []). -parse_chunk_header([$; | _], Acc) -> +parse_chunk_header(<<$;, _/binary>>, Acc) -> hexlist_to_integer(lists:reverse(Acc)); -parse_chunk_header([H | T], Acc) -> +parse_chunk_header(<>, Acc) -> case is_whitespace(H) of true -> parse_chunk_header(T, Acc); false -> parse_chunk_header(T, [H | Acc]) end; -parse_chunk_header([], Acc) -> +parse_chunk_header(<<>>, Acc) -> hexlist_to_integer(lists:reverse(Acc)). is_whitespace($\s) -> true; @@ -1249,6 +1289,8 @@ format_response_data(Resp_format, Body) -> case Resp_format of list when is_list(Body) -> flatten(Body); + list when is_binary(Body) -> + binary_to_list(Body); binary when is_list(Body) -> list_to_binary(Body); _ -> @@ -1399,4 +1441,8 @@ get_stream_chunk_size(Options) -> _ -> ?DEFAULT_STREAM_CHUNK_SIZE end. - + +get_inac_timeout(#state{cur_req = #request{options = Opts}}) -> + get_value(inactivity_timeout, Opts, infinity); +get_inac_timeout(#state{cur_req = undefined}) -> + infinity. -- cgit v1.2.3