From 963dd5ee2c59341e1506908e164100d5fa79e10b Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 3 Jul 2009 00:58:13 +0000 Subject: upgrade to ibrowse 1.5.0 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@790771 13f79535-47bb-0310-9956-ffa450edef68 --- src/ibrowse/ibrowse.app | 2 +- src/ibrowse/ibrowse.erl | 93 +++++++-- src/ibrowse/ibrowse_http_client.erl | 402 ++++++++++++++++++++---------------- src/ibrowse/ibrowse_lb.erl | 9 +- src/ibrowse/ibrowse_test.erl | 44 ++-- 5 files changed, 333 insertions(+), 217 deletions(-) (limited to 'src') diff --git a/src/ibrowse/ibrowse.app b/src/ibrowse/ibrowse.app index 960c0794..5e4621d3 100644 --- a/src/ibrowse/ibrowse.app +++ b/src/ibrowse/ibrowse.app @@ -1,6 +1,6 @@ {application, ibrowse, [{description, "HTTP client application"}, - {vsn, "1.4.1"}, + {vsn, "1.5.0"}, {modules, [ ibrowse, ibrowse_http_client, ibrowse_app, diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl index 3390e58a..1b0daadd 100644 --- a/src/ibrowse/ibrowse.erl +++ b/src/ibrowse/ibrowse.erl @@ -6,8 +6,8 @@ %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi %%%------------------------------------------------------------------- %% @author Chandrashekhar Mullaparthi -%% @copyright 2005-2008 Chandrashekhar Mullaparthi -%% @version 1.4 +%% @copyright 2005-2009 Chandrashekhar Mullaparthi +%% @version 1.5.0 %% @doc The ibrowse application implements an HTTP 1.1 client. This %% module implements the API of the HTTP client. There is one named %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is @@ -57,7 +57,7 @@ %% driver isn't actually used.

-module(ibrowse). --vsn('$Id: ibrowse.erl,v 1.7 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse.erl,v 1.8 2009/07/01 22:43:19 chandrusf Exp $ '). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -96,6 +96,7 @@ trace_off/0, trace_on/2, trace_off/2, + all_trace_off/0, show_dest_status/2 ]). @@ -105,8 +106,6 @@ -import(ibrowse_lib, [ parse_url/1, - printable_date/0, - get_value/2, get_value/3, do_trace/2 ]). @@ -114,6 +113,7 @@ -record(state, {trace = false}). -include("ibrowse.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). -define(DEF_MAX_SESSIONS,10). -define(DEF_MAX_PIPELINE_SIZE,10). @@ -170,7 +170,7 @@ send_req(Url, Headers, Method, Body) -> %% For a description of SSL Options, look in the ssl manpage. If the %% HTTP Version to use is not specified, the default is 1.1. %%
-%%

The host_header is useful in the case where ibrowse is +%%

The host_header option is useful in the case where ibrowse is %% connecting to a component such as stunnel which then sets up a %% secure connection to a webserver. In this case, the URL supplied to @@ -188,11 +188,39 @@ send_req(Url, Headers, Method, Body) -> %%

  • Whenever an error occurs in the processing of a request, ibrowse will return as much %% information as it has, such as HTTP Status Code and HTTP Headers. When this happens, the response %% is of the form {error, {Reason, {stat_code, StatusCode}, HTTP_headers}}
  • +%% +%%
  • The inactivity_timeout option is useful when +%% dealing with large response bodies and/or slow links. In these +%% cases, it might be hard to estimate how long a request will take to +%% complete. In such cases, the client might want to timeout if no +%% data has been received on the link for a certain time interval.
  • +%% +%%
  • +%% The connect_timeout option is to specify how long the +%% client process should wait for connection establishment. This is +%% useful in scenarios where connections to servers are usually setup +%% very fast, but responses might take much longer compared to +%% connection setup. In such cases, it is better for the calling +%% process to timeout faster if there is a problem (DNS lookup +%% delays/failures, network routing issues, etc). The total timeout +%% value specified for the request will enforced. To illustrate using +%% an example: +%% +%% ibrowse:send_req("http://www.example.com/cgi-bin/request", [], get, [], [{connect_timeout, 100}], 1000). +%% +%% In the above invocation, if the connection isn't established within +%% 100 milliseconds, the request will fail with +%% {error, conn_failed}.
    +%% If connection setup succeeds, the total time allowed for the +%% request to complete will be 1000 milliseconds minus the time taken +%% for connection setup. +%%
  • %% +%% %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response() %% optionList() = [option()] %% option() = {max_sessions, integer()} | -%% {response_format,response_format()}| +%% {response_format,response_format()}| %% {stream_chunk_size, integer()} | %% {max_pipeline_size, integer()} | %% {trace, boolean()} | @@ -212,8 +240,10 @@ send_req(Url, Headers, Method, Body) -> %% {stream_to, process()} | %% {http_vsn, {MajorVsn, MinorVsn}} | %% {host_header, string()} | +%% {inactivity_timeout, integer()} | +%% {connect_timeout, integer()} | %% {transfer_encoding, {chunked, ChunkSize}} -%% +%% %% process() = pid() | atom() %% username() = string() %% password() = string() @@ -314,7 +344,7 @@ set_max_pipeline_size(Host, Port, Max) when is_integer(Max), Max > 0 -> do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> case catch ibrowse_http_client:send_req(Conn_Pid, Parsed_url, - Headers, Method, Body, + Headers, Method, ensure_bin(Body), Options, Timeout) of {'EXIT', {timeout, _}} -> {error, req_timedout}; @@ -331,6 +361,11 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> Ret end. +ensure_bin(L) when is_list(L) -> + list_to_binary(L); +ensure_bin(B) when is_binary(B) -> + B. + %% @doc Creates a HTTP client process to the specified Host:Port which %% is not part of the load balancing pool. This is useful in cases %% where some requests to a webserver might take a long time whereas @@ -400,17 +435,25 @@ trace_off() -> %% @doc Turn tracing on for all connections to the specified HTTP %% server. Host is whatever is specified as the domain name in the URL -%% @spec trace_on(Host, Port) -> term() +%% @spec trace_on(Host, Port) -> ok %% Host = string() %% Port = integer() trace_on(Host, Port) -> - ibrowse ! {trace, true, Host, Port}. + ibrowse ! {trace, true, Host, Port}, + ok. %% @doc Turn tracing OFF for all connections to the specified HTTP %% server. -%% @spec trace_off(Host, Port) -> term() +%% @spec trace_off(Host, Port) -> ok trace_off(Host, Port) -> - ibrowse ! {trace, false, Host, Port}. + ibrowse ! {trace, false, Host, Port}, + ok. + +%% @doc Turn Off ALL tracing +%% @spec all_trace_off() -> ok +all_trace_off() -> + ibrowse ! all_trace_off, + ok. %% @doc Shows some internal information about load balancing to a %% specified Host:Port. Info about workers spawned using @@ -588,6 +631,30 @@ handle_cast(_Msg, State) -> %% {noreply, State, Timeout} | %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- +handle_info(all_trace_off, State) -> + Mspec = [{{ibrowse_conf,{trace,'$1','$2'},true},[],[{{'$1','$2'}}]}], + Trace_on_dests = ets:select(ibrowse_conf, Mspec), + Fun = fun(#lb_pid{host_port = {H, P}, pid = Pid}, _) -> + case lists:member({H, P}, Trace_on_dests) of + false -> + ok; + true -> + catch Pid ! {trace, false} + end; + (#client_conn{key = {H, P, Pid}}, _) -> + case lists:member({H, P}, Trace_on_dests) of + false -> + ok; + true -> + catch Pid ! {trace, false} + end; + (_, Acc) -> + Acc + end, + ets:foldl(Fun, undefined, ibrowse_lb), + ets:select_delete(ibrowse_conf, [{{ibrowse_conf,{trace,'$1','$2'},true},[],['true']}]), + {noreply, State}; + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index 9455bc20..24214ffb 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -6,7 +6,7 @@ %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi %%%------------------------------------------------------------------- -module(ibrowse_http_client). --vsn('$Id: ibrowse_http_client.erl,v 1.18 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse_http_client.erl,v 1.19 2009/07/01 22:43:19 chandrusf Exp $ '). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -42,11 +42,12 @@ use_proxy = false, proxy_auth_digest, ssl_options = [], is_ssl = false, socket, reqs=queue:new(), cur_req, status=idle, http_status_code, - reply_buffer=[], rep_buf_size=0, streamed_size = 0, + reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0, recvd_headers=[], is_closing, send_timer, content_length, - deleted_crlf = false, transfer_encoding, chunk_size, - chunks=[], lb_ets_tid, cur_pipeline_size = 0 + deleted_crlf = false, transfer_encoding, + chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, + lb_ets_tid, cur_pipeline_size = 0 }). -record(request, {url, method, options, from, @@ -57,8 +58,6 @@ response_format}). -import(ibrowse_lib, [ - parse_url/1, - printable_date/0, get_value/2, get_value/3, do_trace/2 @@ -83,15 +82,9 @@ stop(Conn_pid) -> gen_server:call(Conn_pid, stop). send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> - Timeout_1 = case Timeout of - infinity -> - infinity; - _ when is_integer(Timeout) -> - Timeout + 100 - end, gen_server:call( Conn_Pid, - {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout_1). + {send_req, {Url, Headers, Method, Body, Options, Timeout}}, Timeout). %%==================================================================== %% Server functions @@ -170,23 +163,29 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, Reqs = queue:in(NewReq, State#state.reqs), State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}), do_trace("Connecting...~n", []), - Timeout_1 = case Timeout of - infinity -> - infinity; - _ -> - round(Timeout*0.9) - end, - case do_connect(Host_1, Port_1, Options, State_2, Timeout_1) of + Start_ts = now(), + Conn_timeout = get_value(connect_timeout, Options, Timeout), + case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of {ok, Sock} -> + do_trace("Connected!~n", []), + End_ts = now(), Ref = case Timeout of infinity -> undefined; _ -> - erlang:send_after(Timeout, self(), {req_timedout, From}) + Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)), + case Rem_time > 0 of + true -> + erlang:send_after(Rem_time, self(), {req_timedout, From}); + false -> + shutting_down(State_2), + do_error_reply(State_2, req_timedout), + exit(normal) + end end, - do_trace("Connected!~n", []), case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of ok -> + do_setopts(Sock, [{active, once}], State_2#state.is_ssl), case StreamTo of undefined -> ok; @@ -197,7 +196,7 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}}, send_timer = Ref, cur_req = NewReq, status = get_header}), - {noreply, State_3}; + {noreply, State_3, get_inac_timeout(State_3)}; Err -> shutting_down(State_2), do_trace("Send failed... Reason: ~p~n", [Err]), @@ -234,7 +233,7 @@ handle_call({send_req, {Url, Headers, Method, case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of ok -> State_2 = inc_pipeline_counter(State_1), - do_setopts(Sock, [{active, true}], State#state.is_ssl), + do_setopts(Sock, [{active, once}], State#state.is_ssl), case Timeout of infinity -> ok; @@ -254,7 +253,7 @@ handle_call({send_req, {Url, Headers, Method, _ -> gen_server:reply(From, {ibrowse_req_id, ReqId}) end, - {noreply, State_3}; + {noreply, State_3, get_inac_timeout(State_3)}; Err -> shutting_down(State_1), do_trace("Send request failed: Reason: ~p~n", [Err]), @@ -289,7 +288,8 @@ handle_cast(_Msg, State) -> %% {noreply, State, Timeout} | %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- -handle_info({tcp, _Sock, Data}, State) -> +handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> + do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> handle_sock_data(Data, State); @@ -305,14 +305,19 @@ handle_info({ssl_closed, _Sock}, State) -> handle_info({req_timedout, From}, State) -> case lists:keysearch(From, #request.from, queue:to_list(State#state.reqs)) of - false -> - {noreply, State}; - {value, _} -> - shutting_down(State), - do_error_reply(State, req_timedout), - {stop, normal, State} + false -> + {noreply, State}; + {value, _} -> + shutting_down(State), + do_error_reply(State, req_timedout), + {stop, normal, State} end; +handle_info(timeout, State) -> + shutting_down(State), + do_error_reply(State, req_timedout), + {stop, normal, State}; + handle_info({trace, Bool}, State) -> put(my_trace_flag, Bool), {noreply, State}; @@ -365,8 +370,8 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) -> shutting_down(State), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end; handle_sock_data(Data, #state{status=get_body, content_length=CL, @@ -382,8 +387,8 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL, {error, {Reason, {stat_code, StatCode}, Headers}}), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end; _ -> case parse_11_response(Data, State) of @@ -396,20 +401,17 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL, shutting_down(State), {stop, normal, State}; State_1 -> - do_setopts(Sock, [{active, true}], State#state.is_ssl), - {noreply, State_1} + do_setopts(Sock, [{active, once}], State#state.is_ssl), + {noreply, State_1, get_inac_timeout(State_1)} end end. accumulate_response(Data, #state{ - cur_req = #request{save_response_to_file = SaveResponseToFile, + cur_req = #request{save_response_to_file = true, tmp_file_fd = undefined} = CurReq, - http_status_code=[$2 | _]}=State) when SaveResponseToFile /= false -> - TmpFilename = case SaveResponseToFile of - true -> make_tmp_filename(); - F -> F - end, + http_status_code=[$2 | _]}=State) -> + TmpFilename = make_tmp_filename(), case file:open(TmpFilename, [write, delayed_write, raw]) of {ok, Fd} -> accumulate_response(Data, State#state{ @@ -419,30 +421,30 @@ accumulate_response(Data, {error, Reason} -> {error, {file_open_error, Reason}} end; -accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile, +accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true, tmp_file_fd = Fd}, transfer_encoding=chunked, - chunks = Chunks, + reply_buffer = Reply_buf, http_status_code=[$2 | _] - } = State) when SaveResponseToFile /= false -> - case file:write(Fd, [Chunks | Data]) of + } = State) -> + case file:write(Fd, [Reply_buf, Data]) of ok -> - State#state{chunks = []}; + State#state{reply_buffer = <<>>}; {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response(Data, #state{cur_req = #request{save_response_to_file = SaveResponseToFile, +accumulate_response(Data, #state{cur_req = #request{save_response_to_file = true, tmp_file_fd = Fd}, reply_buffer = RepBuf, http_status_code=[$2 | _] - } = State) when SaveResponseToFile /= false -> - case file:write(Fd, [RepBuf | Data]) of + } = State) -> + case file:write(Fd, [RepBuf, Data]) of ok -> - State#state{reply_buffer = []}; + State#state{reply_buffer = <<>>}; {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response([], State) -> +accumulate_response(<<>>, State) -> State; accumulate_response(Data, #state{reply_buffer = RepBuf, rep_buf_size = RepBufSize, @@ -451,7 +453,7 @@ accumulate_response(Data, #state{reply_buffer = RepBuf, #request{stream_to=StreamTo, req_id=ReqId, stream_chunk_size = Stream_chunk_size, response_format = Response_format} = CurReq, - RepBuf_1 = [Data | RepBuf], + RepBuf_1 = concat_binary([RepBuf, Data]), New_data_size = RepBufSize - Streamed_size, case StreamTo of undefined -> @@ -459,12 +461,12 @@ accumulate_response(Data, #state{reply_buffer = RepBuf, _ when New_data_size < Stream_chunk_size -> State#state{reply_buffer = RepBuf_1}; _ -> - {Stream_chunk, Rem_data} = split_list_at(flatten(lists:reverse(RepBuf_1)), Stream_chunk_size), + {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size), do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk), accumulate_response( Rem_data, State#state{ - reply_buffer = [], + reply_buffer = <<>>, streamed_size = Streamed_size + Stream_chunk_size}) end. @@ -491,11 +493,11 @@ handle_sock_closed(#state{cur_req=undefined} = State) -> %% We check for IsClosing because this the server could have sent a %% Connection-Close header and has closed the socket to indicate end %% of response. There maybe requests pipelined which need a response. -handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC, - is_closing=IsClosing, - cur_req=#request{tmp_file_name=TmpFilename, - tmp_file_fd=Fd} = CurReq, - status=get_body, recvd_headers=Headers}=State) -> +handle_sock_closed(#state{reply_buffer = Buf, reqs = Reqs, http_status_code = SC, + is_closing = IsClosing, + cur_req = #request{tmp_file_name=TmpFilename, + tmp_file_fd=Fd} = CurReq, + status = get_body, recvd_headers = Headers}=State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format} = CurReq, case IsClosing of @@ -519,11 +521,11 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC, do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) -> ssl:connect(Host, Port, - [{nodelay, true}, {active, false} | SSLOptions], + [binary, {nodelay, true}, {active, false} | SSLOptions], Timeout); do_connect(Host, Port, _Options, _State, Timeout) -> gen_tcp:connect(Host, Port, - [{nodelay, true}, {active, false}], + [binary, {nodelay, true}, {active, false}], Timeout). do_send(Sock, Req, true) -> ssl:send(Sock, Req); @@ -602,7 +604,7 @@ send_req_1(#url{abspath = AbsPath, io:format("Err: ~p~n", [Err]), Err end, - do_setopts(Sock, [{active, true}], State#state.is_ssl), + do_setopts(Sock, [{active, once}], State#state.is_ssl), SndRes. add_auth_headers(#url{username = User, @@ -758,12 +760,12 @@ chunk_request_body(Body, _ChunkSize, Acc) when list(Body) -> parse_response(_Data, #state{cur_req = undefined}=State) -> State#state{status = idle}; -parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, - cur_req=CurReq}=State) -> +parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs, + cur_req = CurReq} = State) -> #request{from=From, stream_to=StreamTo, req_id=ReqId, method=Method, response_format = Resp_format} = CurReq, MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity), - case scan_header(Data, Acc) of + case scan_header(Acc, Data) of {yes, Headers, Data_1} -> do_trace("Recvd Header Data -> ~s~n----~n", [Headers]), do_trace("Recvd headers~n--- Headers Begin ---~n~s~n--- Headers End ---~n~n", [Headers]), @@ -779,7 +781,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, ok end, State_1 = State#state{recvd_headers=Headers_1, status=get_body, - reply_buffer = [], + reply_buffer = <<>>, http_status_code=StatCode, is_closing=IsClosing}, put(conn_close, ConnClose), TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")), @@ -818,7 +820,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, send_async_headers(ReqId, StreamTo, StatCode, Headers_1), case parse_11_response(Data_1, State_1#state{transfer_encoding=chunked, chunk_size=chunk_start, - reply_buffer=[], chunks=[]}) of + reply_buffer = <<>>}) of {error, Reason} -> fail_pipelined_requests(State_1, {error, {Reason, @@ -830,7 +832,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, undefined when HttpVsn == "HTTP/1.0"; ConnClose == "close" -> send_async_headers(ReqId, StreamTo, StatCode, Headers_1), - State_1#state{reply_buffer=[Data_1]}; + State_1#state{reply_buffer = Data_1}; undefined -> fail_pipelined_requests(State_1, {error, {content_length_undefined, @@ -842,7 +844,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, send_async_headers(ReqId, StreamTo, StatCode, Headers_1), do_trace("Recvd Content-Length of ~p~n", [V_1]), State_2 = State_1#state{rep_buf_size=0, - reply_buffer=[], + reply_buffer = <<>>, content_length=V_1}, case parse_11_response(Data_1, State_2) of {error, Reason} -> @@ -861,9 +863,9 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs, end end; {no, Acc_1} when MaxHeaderSize == infinity -> - State#state{reply_buffer=Acc_1}; - {no, Acc_1} when length(Acc_1) < MaxHeaderSize -> - State#state{reply_buffer=Acc_1}; + State#state{reply_buffer = Acc_1}; + {no, Acc_1} when size(Acc_1) < MaxHeaderSize -> + State#state{reply_buffer = Acc_1}; {no, _Acc_1} -> fail_pipelined_requests(State, {error, max_headers_size_exceeded}), {error, max_headers_size_exceeded} @@ -878,122 +880,97 @@ is_connection_closing(_, _) -> false. parse_11_response(DataRecvd, #state{transfer_encoding=chunked, chunk_size=chunk_start, - cur_req=CurReq, - reply_buffer=Buf - }=State) -> - case scan_crlf(DataRecvd, Buf) of + chunk_size_buffer = Chunk_sz_buf + } = State) -> + case scan_crlf(Chunk_sz_buf, DataRecvd) of {yes, ChunkHeader, Data_1} -> case parse_chunk_header(ChunkHeader) of {error, Reason} -> {error, Reason}; ChunkSize -> - #request{stream_to=StreamTo, req_id=ReqId, - response_format = Response_format} = CurReq, %% - %% Do we have to preserve the chunk encoding when streaming? + %% Do we have to preserve the chunk encoding when + %% streaming? NO. This should be transparent to the client + %% process. Chunked encoding was only introduced to make + %% it efficient for the server. %% - do_interim_reply(StreamTo, Response_format, - ReqId, {chunk_start, ChunkSize}), - RemLen = length(Data_1), + RemLen = size(Data_1), do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]), - parse_11_response(Data_1, State#state{rep_buf_size=0, - reply_buffer=[], - deleted_crlf=true, - chunk_size=ChunkSize}) + parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>, + deleted_crlf = true, + recvd_chunk_size = 0, + chunk_size = ChunkSize}) end; {no, Data_1} -> - State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)} + State#state{chunk_size_buffer = Data_1} end; -%% This clause is there to remove the CRLF between two chunks +%% This clause is to remove the CRLF between two chunks %% parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, - chunk_size=tbd, - chunks = Chunks, - cur_req=CurReq, - reply_buffer=Buf}=State) -> - case scan_crlf(DataRecvd, Buf) of + #state{transfer_encoding = chunked, + chunk_size = tbd, + chunk_size_buffer = Buf}=State) -> + case scan_crlf(Buf, DataRecvd) of {yes, _, NextChunk} -> - #request{stream_to=StreamTo, req_id=ReqId, - response_format = Response_format} = CurReq, - %% - %% Do we have to preserve the chunk encoding when streaming? - %% - State_1 = State#state{chunk_size=chunk_start, - rep_buf_size=0, - reply_buffer=[], - deleted_crlf=true}, - State_2 = case StreamTo of - undefined -> - State_1#state{chunks = [Buf | Chunks]}; - _ -> - %% Flush out all buffered data as chunk is ending - do_interim_reply(StreamTo, Response_format, ReqId, - lists:reverse([Buf | Chunks])), - do_interim_reply(StreamTo, Response_format, - ReqId, chunk_end), - State_1#state{chunks = [], streamed_size = 0} - end, - parse_11_response(NextChunk, State_2); + State_1 = State#state{chunk_size = chunk_start, + chunk_size_buffer = <<>>, +%% reply_buffer = Buf_1, + deleted_crlf = true}, + parse_11_response(NextChunk, State_1); {no, Data_1} -> - State#state{reply_buffer=Data_1, rep_buf_size=length(Data_1)} +%% State#state{reply_buffer = Data_1, rep_buf_size = size(Data_1)} + State#state{chunk_size_buffer = Data_1} end; %% This clause deals with the end of a chunked transfer parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, chunk_size=0, - cur_req=CurReq, + #state{transfer_encoding = chunked, chunk_size = 0, + cur_req = CurReq, deleted_crlf = DelCrlf, - reply_buffer=Trailer, reqs=Reqs}=State) -> + reply_buffer = Trailer, reqs = Reqs}=State) -> do_trace("Detected end of chunked transfer...~n", []), DataRecvd_1 = case DelCrlf of false -> DataRecvd; true -> - [$\r, $\n | DataRecvd] - end, - #request{stream_to=StreamTo, req_id=ReqId, - response_format = Response_format} = CurReq, - case scan_header(DataRecvd_1, Trailer) of + <<$\r, $\n, DataRecvd/binary>> + end, + case scan_header(Trailer, DataRecvd_1) of {yes, _TEHeaders, Rem} -> {_, Reqs_1} = queue:out(Reqs), - %% - %% Do we have to preserve the chunk encoding when streaming? Nope. - %% - do_interim_reply(StreamTo, Response_format, ReqId, chunk_end), - State_1 = handle_response(CurReq, State#state{reqs=Reqs_1}), + State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}), parse_response(Rem, reset_state(State_1)); {no, Rem} -> - State#state{reply_buffer=Rem, rep_buf_size=length(Rem), deleted_crlf=false} + State#state{reply_buffer = Rem, rep_buf_size = size(Rem), deleted_crlf = false} end; %% This clause extracts a chunk, given the size. parse_11_response(DataRecvd, - #state{transfer_encoding=chunked, chunk_size=CSz, - rep_buf_size=RepBufSz}=State) -> - NeedBytes = CSz - RepBufSz, - DataLen = length(DataRecvd), + #state{transfer_encoding = chunked, + chunk_size = CSz, + recvd_chunk_size = Recvd_csz, + rep_buf_size = RepBufSz} = State) -> + NeedBytes = CSz - Recvd_csz, + DataLen = size(DataRecvd), do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]), case DataLen >= NeedBytes of true -> - {RemChunk, RemData} = split_list_at(DataRecvd, NeedBytes), + {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), do_trace("Recvd another chunk...~n", []), do_trace("RemData -> ~p~n", [RemData]), case accumulate_response(RemChunk, State) of {error, Reason} -> do_trace("Error accumulating response --> ~p~n", [Reason]), {error, Reason}; - #state{reply_buffer = NewRepBuf, - chunks = NewChunks} = State_1 -> - State_2 = State_1#state{reply_buffer=[], - chunks = [lists:reverse(NewRepBuf) | NewChunks], - rep_buf_size=0, - chunk_size=tbd}, + #state{} = State_1 -> + State_2 = State_1#state{chunk_size=tbd}, parse_11_response(RemData, State_2) end; false -> - accumulate_response(DataRecvd, State#state{rep_buf_size=(RepBufSz + DataLen)}) + accumulate_response(DataRecvd, + State#state{rep_buf_size = RepBufSz + DataLen, + recvd_chunk_size = Recvd_csz + DataLen}) end; %% This clause to extract the body when Content-Length is specified @@ -1001,10 +978,10 @@ parse_11_response(DataRecvd, #state{content_length=CL, rep_buf_size=RepBufSz, reqs=Reqs}=State) -> NeedBytes = CL - RepBufSz, - DataLen = length(DataRecvd), + DataLen = size(DataRecvd), case DataLen >= NeedBytes of true -> - {RemBody, Rem} = split_list_at(DataRecvd, NeedBytes), + {RemBody, Rem} = split_binary(DataRecvd, NeedBytes), {_, Reqs_1} = queue:out(Reqs), State_1 = accumulate_response(RemBody, State), State_2 = handle_response(State_1#state.cur_req, State_1#state{reqs=Reqs_1}), @@ -1023,15 +1000,8 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, #state{http_status_code = SCode, send_timer = ReqTimer, reply_buffer = RepBuf, - transfer_encoding = TEnc, - chunks = Chunks, recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false -> - Body = case TEnc of - chunked -> - lists:reverse(Chunks); - _ -> - lists:reverse(RepBuf) - end, + Body = RepBuf, State_1 = set_cur_request(State), file:close(Fd), ResponseBody = case TmpFilename of @@ -1047,14 +1017,9 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format}, #state{http_status_code=SCode, recvd_headers=RespHeaders, - reply_buffer=RepBuf, transfer_encoding=TEnc, - chunks=Chunks, send_timer=ReqTimer}=State) -> - Body = case TEnc of - chunked -> - lists:reverse(Chunks); - _ -> - lists:reverse(RepBuf) - end, + reply_buffer = RepBuf, + send_timer=ReqTimer}=State) -> + Body = RepBuf, %% State_1 = set_cur_request(State), State_1 = case get(conn_close) of "close" -> @@ -1070,10 +1035,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, set_cur_request(State_1). reset_state(State) -> - State#state{status=get_header, rep_buf_size=0, streamed_size = 0, - content_length=undefined, - reply_buffer=[], chunks=[], recvd_headers=[], deleted_crlf=false, - http_status_code=undefined, chunk_size=undefined, transfer_encoding=undefined}. + State#state{status = get_header, + rep_buf_size = 0, + streamed_size = 0, + content_length = undefined, + reply_buffer = <<>>, + chunk_size_buffer = <<>>, + recvd_headers = [], + deleted_crlf = false, + http_status_code = undefined, + chunk_size = undefined, + transfer_encoding = undefined}. set_cur_request(#state{reqs = Reqs} = State) -> case queue:to_list(Reqs) of @@ -1084,7 +1056,7 @@ set_cur_request(#state{reqs = Reqs} = State) -> end. parse_headers(Headers) -> - case scan_crlf(Headers, []) of + case scan_crlf(Headers) of {yes, StatusLine, T} -> Headers_1 = parse_headers_1(T), case parse_status_line(StatusLine) of @@ -1107,6 +1079,8 @@ parse_headers(Headers) -> % SP. A recipient MAY replace any linear white space with a single % SP before interpreting the field value or forwarding the message % downstream. +parse_headers_1(B) when is_binary(B) -> + parse_headers_1(binary_to_list(B)); parse_headers_1(String) -> parse_headers_1(String, [], []). @@ -1135,6 +1109,8 @@ parse_headers_1([], L, Acc) -> end, lists:reverse(Acc_1). +parse_status_line(Line) when is_binary(Line) -> + parse_status_line(binary_to_list(Line)); parse_status_line(Line) -> parse_status_line(Line, get_prot_vsn, [], []). parse_status_line([32 | T], get_prot_vsn, ProtVsn, StatCode) -> @@ -1148,6 +1124,8 @@ parse_status_line([H | T], get_status_code, ProtVsn, StatCode) -> parse_status_line([], _, _, _) -> http_09. +parse_header(B) when is_binary(B) -> + parse_header(binary_to_list(B)); parse_header(L) -> parse_header(L, []). parse_header([$: | V], Acc) -> @@ -1157,13 +1135,75 @@ parse_header([H | T], Acc) -> parse_header([], _) -> invalid. -scan_header([$\n|T], [$\r,$\n,$\r|L]) -> {yes, lists:reverse([$\n,$\r| L]), T}; -scan_header([H|T], L) -> scan_header(T, [H|L]); -scan_header([], L) -> {no, L}. +scan_header(Bin) -> + case get_crlf_crlf_pos(Bin, 0) of + {yes, Pos} -> + {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos), + {yes, Headers, Body}; + no -> + {no, Bin} + end. + +scan_header(Bin1, Bin2) when size(Bin1) < 4 -> + scan_header(<>); +scan_header(Bin1, <<>>) -> + scan_header(Bin1); +scan_header(Bin1, Bin2) -> + Bin1_already_scanned_size = size(Bin1) - 4, + <> = Bin1, + Bin_to_scan = <>, + case get_crlf_crlf_pos(Bin_to_scan, 0) of + {yes, Pos} -> + {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos), + {yes, <>, Body}; + no -> + {no, <>} + end. + +get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_crlf_pos(Rest, Pos + 1); +get_crlf_crlf_pos(<<>>, _) -> no. + +scan_crlf(Bin) -> + case get_crlf_pos(Bin) of + {yes, Pos} -> + {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos), + {yes, Prefix, Suffix}; + no -> + {no, Bin} + end. + +scan_crlf(<<>>, Bin2) -> + scan_crlf(Bin2); +scan_crlf(Bin1, Bin2) when size(Bin1) < 2 -> + scan_crlf(<>); +scan_crlf(Bin1, Bin2) -> + scan_crlf_1(size(Bin1) - 2, Bin1, Bin2). + +scan_crlf_1(Bin1_head_size, Bin1, Bin2) -> + <> = Bin1, + Bin3 = <>, + case get_crlf_pos(Bin3) of + {yes, Pos} -> + {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos), + {yes, concat_binary([Bin1_head, Prefix]), Suffix}; + no -> + {no, concat_binary([Bin1, Bin2])} + end. -scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T}; -scan_crlf([H|T], L) -> scan_crlf(T, [H|L]); -scan_crlf([], L) -> {no, L}. +get_crlf_pos(Bin) -> + get_crlf_pos(Bin, 0). + +get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, Pos}; +get_crlf_pos(<<_, Rest/binary>>, Pos) -> get_crlf_pos(Rest, Pos + 1); +get_crlf_pos(<<>>, _) -> no. + +%% scan_crlf(<<$\n, T/binary>>, [$\r | L]) -> {yes, lists:reverse(L), T}; +%% scan_crlf(<>, L) -> scan_crlf(T, [H|L]); +%% scan_crlf(<<>>, L) -> {no, L}; +%% scan_crlf([$\n|T], [$\r | L]) -> {yes, lists:reverse(L), T}; +%% scan_crlf([H|T], L) -> scan_crlf(T, [H|L]); +%% scan_crlf([], L) -> {no, L}. fmt_val(L) when list(L) -> L; fmt_val(I) when integer(I) -> integer_to_list(I); @@ -1221,16 +1261,16 @@ parse_chunk_header([]) -> parse_chunk_header(ChunkHeader) -> parse_chunk_header(ChunkHeader, []). -parse_chunk_header([$; | _], Acc) -> +parse_chunk_header(<<$;, _/binary>>, Acc) -> hexlist_to_integer(lists:reverse(Acc)); -parse_chunk_header([H | T], Acc) -> +parse_chunk_header(<>, Acc) -> case is_whitespace(H) of true -> parse_chunk_header(T, Acc); false -> parse_chunk_header(T, [H | Acc]) end; -parse_chunk_header([], Acc) -> +parse_chunk_header(<<>>, Acc) -> hexlist_to_integer(lists:reverse(Acc)). is_whitespace($\s) -> true; @@ -1249,6 +1289,8 @@ format_response_data(Resp_format, Body) -> case Resp_format of list when is_list(Body) -> flatten(Body); + list when is_binary(Body) -> + binary_to_list(Body); binary when is_list(Body) -> list_to_binary(Body); _ -> @@ -1399,4 +1441,8 @@ get_stream_chunk_size(Options) -> _ -> ?DEFAULT_STREAM_CHUNK_SIZE end. - + +get_inac_timeout(#state{cur_req = #request{options = Opts}}) -> + get_value(inactivity_timeout, Opts, infinity); +get_inac_timeout(#state{cur_req = undefined}) -> + infinity. diff --git a/src/ibrowse/ibrowse_lb.erl b/src/ibrowse/ibrowse_lb.erl index 03dc4e02..9212ccd7 100644 --- a/src/ibrowse/ibrowse_lb.erl +++ b/src/ibrowse/ibrowse_lb.erl @@ -7,7 +7,7 @@ %%%------------------------------------------------------------------- -module(ibrowse_lb). --vsn('$Id: ibrowse_lb.erl,v 1.1 2008/03/27 01:36:21 chandrusf Exp $ '). +-vsn('$Id: ibrowse_lb.erl,v 1.2 2009/07/01 22:43:19 chandrusf Exp $ '). -author(chandru). -behaviour(gen_server). %%-------------------------------------------------------------------- @@ -39,13 +39,6 @@ max_pipeline_size, num_cur_sessions = 0}). --import(ibrowse_lib, [ - parse_url/1, - printable_date/0, - get_value/3 - ]). - - -include("ibrowse.hrl"). %%==================================================================== diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl index de8865ff..f3559b51 100644 --- a/src/ibrowse/ibrowse_test.erl +++ b/src/ibrowse/ibrowse_test.erl @@ -4,13 +4,14 @@ %%% Created : 14 Oct 2003 by Chandrashekhar Mullaparthi -module(ibrowse_test). --vsn('$Id: ibrowse_test.erl,v 1.3 2008/05/21 15:28:11 chandrusf Exp $ '). +-vsn('$Id: ibrowse_test.erl,v 1.4 2009/07/01 22:43:19 chandrusf Exp $ '). -export([ load_test/3, send_reqs_1/3, do_send_req/2, unit_tests/0, unit_tests/1, + unit_tests_1/2, drv_ue_test/0, drv_ue_test/1, ue_test/0, @@ -20,8 +21,6 @@ i_do_async_req_list/4 ]). --import(ibrowse_lib, [printable_date/0]). - %% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to %% tweak settings before running the load test. The defaults are 10 and 10. load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url), @@ -49,7 +48,7 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) -> log_msg("End time : ~1000.p~n", [calendar:now_to_local_time(End_time)]), Elapsed_time_secs = trunc(timer:now_diff(End_time, Start_time) / 1000000), log_msg("Elapsed : ~p~n", [Elapsed_time_secs]), - log_msg("Reqs/sec : ~p~n", [(NumWorkers*NumReqsPerWorker) / Elapsed_time_secs]), + log_msg("Reqs/sec : ~p~n", [round(trunc((NumWorkers*NumReqsPerWorker) / Elapsed_time_secs))]), dump_errors(). init_results() -> @@ -183,11 +182,23 @@ unit_tests() -> unit_tests([]). unit_tests(Options) -> + {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options]), + receive + {done, Pid} -> + ok; + {'DOWN', Ref, _, _, Info} -> + io:format("Test process crashed: ~p~n", [Info]) + after 60000 -> + io:format("Timed out waiting for tests to complete~n", []) + end. + +unit_tests_1(Parent, Options) -> lists:foreach(fun({Url, Method}) -> execute_req(Url, Method, Options); ({Url, Method, X_Opts}) -> execute_req(Url, Method, X_Opts ++ Options) - end, ?TEST_LIST). + end, ?TEST_LIST), + Parent ! {done, self()}. verify_chunked_streaming() -> verify_chunked_streaming([]). @@ -201,10 +212,10 @@ verify_chunked_streaming(Options) -> [{response_format, binary} | Options]), io:format("Fetching data with streaming as list...~n", []), Async_response_list = do_async_req_list( - Url, get, [{response_format, list}]), + Url, get, [{response_format, list} | Options]), io:format("Fetching data with streaming as binary...~n", []), Async_response_bin = do_async_req_list( - Url, get, [{response_format, binary}]), + Url, get, [{response_format, binary} | Options]), compare_responses(Result_without_streaming, Async_response_list, Async_response_bin). compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) -> @@ -220,6 +231,9 @@ compare_responses({ok, St_code, _, Body_1}, {ok, St_code, _, Body_2}, {ok, St_co _ -> io:format("All three bodies are different!~n", []) end, + io:format("Body_1 -> ~p~n", [Body_1]), + io:format("Body_2 -> ~p~n", [Body_2]), + io:format("Body_3 -> ~p~n", [Body_3]), fail_bodies_mismatch; compare_responses(R1, R2, R3) -> io:format("R1 -> ~p~n", [R1]), @@ -227,12 +241,12 @@ compare_responses(R1, R2, R3) -> io:format("R3 -> ~p~n", [R3]), fail. -do_async_req_list(Url) -> - do_async_req_list(Url, get). +%% do_async_req_list(Url) -> +%% do_async_req_list(Url, get). -do_async_req_list(Url, Method) -> - do_async_req_list(Url, Method, [{stream_to, self()}, - {stream_chunk_size, 1000}]). +%% do_async_req_list(Url, Method) -> +%% do_async_req_list(Url, Method, [{stream_to, self()}, +%% {stream_chunk_size, 1000}]). do_async_req_list(Url, Method, Options) -> {Pid,_} = erlang:spawn_monitor(?MODULE, i_do_async_req_list, @@ -270,10 +284,6 @@ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) -> receive {ibrowse_async_headers, Req_id, StatCode, Headers} -> wait_for_async_resp(Req_id, StatCode, Headers, Body); - {ibrowse_async_response, Req_id, {chunk_start, _}} -> - wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body); - {ibrowse_async_response, Req_id, chunk_end} -> - wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body); {ibrowse_async_response_end, Req_id} -> Body_1 = list_to_binary(lists:reverse(Body)), {ok, Acc_Stat_code, Acc_Headers, Body_1}; @@ -284,7 +294,7 @@ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) -> end. execute_req(Url, Method, Options) -> - io:format("~s, ~p: ", [Url, Method]), + io:format("~7.7w, ~50.50s: ", [Method, Url]), Result = (catch ibrowse:send_req(Url, [], Method, [], Options)), case Result of {ok, SCode, _H, _B} -> -- cgit v1.2.3