From 796a41761839395c6e08667a33f17a84893ae637 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Fri, 24 Sep 2010 14:18:56 +0000 Subject: Upgrading ibrowse from version 1.6.2 to 2.0.1. This version fixes a serious issue regarding streaming of chunked HTTP(S) responses. The issue is that the client occasionally gets blocked or receives a timeout (if inactivity_timeout parameter is given to ibrowse). This fixes part of ticket COUCHDB-491. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1000880 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_changes_feed.erl | 2 +- src/couchdb/couch_rep_httpc.erl | 2 +- src/ibrowse/Makefile.am | 2 +- src/ibrowse/ibrowse.app.in | 2 +- src/ibrowse/ibrowse.erl | 78 +++++++++-- src/ibrowse/ibrowse_http_client.erl | 227 +++++++++++++++++++++------------ src/ibrowse/ibrowse_lb.erl | 23 +++- src/ibrowse/ibrowse_test.erl | 51 +++++++- 8 files changed, 281 insertions(+), 106 deletions(-) diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 3f6079e7..7f7d3a38 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -187,7 +187,7 @@ handle_cast(_Msg, State) -> handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) -> handle_headers(list_to_integer(Code), Hdrs, State); -handle_info({ibrowse_async_response, Id, {error,connection_closed}}, +handle_info({ibrowse_async_response, Id, {error, sel_conn_closed}}, #state{reqid=Id}=State) -> handle_retry(State); diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl index a47c26a3..06d4748a 100644 --- a/src/couchdb/couch_rep_httpc.erl +++ b/src/couchdb/couch_rep_httpc.erl @@ -168,7 +168,7 @@ process_response({error, Reason}, Req) -> pause = Pause } = Req, ShortReason = case Reason of - connection_closed -> + sel_conn_closed -> connection_closed; {'EXIT', {noproc, _}} -> noproc; diff --git a/src/ibrowse/Makefile.am b/src/ibrowse/Makefile.am index b5174862..39878f0a 100644 --- a/src/ibrowse/Makefile.am +++ b/src/ibrowse/Makefile.am @@ -10,7 +10,7 @@ ## License for the specific language governing permissions and limitations under ## the License. -ibrowseebindir = $(localerlanglibdir)/ibrowse-1.6.2/ebin +ibrowseebindir = $(localerlanglibdir)/ibrowse-2.0.1/ebin ibrowse_file_collection = \ ibrowse.app.in \ diff --git a/src/ibrowse/ibrowse.app.in b/src/ibrowse/ibrowse.app.in index 208c311b..8fc20663 100644 --- a/src/ibrowse/ibrowse.app.in +++ b/src/ibrowse/ibrowse.app.in @@ -1,6 +1,6 @@ {application, ibrowse, [{description, "HTTP client application"}, - {vsn, "1.6.2"}, + {vsn, "2.0.1"}, {modules, [ ibrowse, ibrowse_http_client, ibrowse_app, diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl index 09d36a36..7f8d8bcf 100644 --- a/src/ibrowse/ibrowse.erl +++ b/src/ibrowse/ibrowse.erl @@ -7,7 +7,7 @@ %%%------------------------------------------------------------------- %% @author Chandrashekhar Mullaparthi %% @copyright 2005-2010 Chandrashekhar Mullaparthi -%% @version 1.6.0 +%% @version 2.0.1 %% @doc The ibrowse application implements an HTTP 1.1 client. This %% module implements the API of the HTTP client. There is one named %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is @@ -236,6 +236,11 @@ send_req(Url, Headers, Method, Body) -> %% caller to get access to the raw status line and raw unparsed %% headers. Not quite sure why someone would want this, but one of my %% users asked for it, so here it is. +%% +%%
  • The preserve_chunked_encoding option enables the caller +%% to receive the raw data stream when the Transfer-Encoding of the server +%% response is Chunked. +%%
  • %% %% %% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response() @@ -266,7 +271,8 @@ send_req(Url, Headers, Method, Body) -> %% {socket_options, Sock_opts} | %% {transfer_encoding, {chunked, ChunkSize}} | %% {headers_as_is, boolean()} | -%% {give_raw_headers, boolean()} +%% {give_raw_headers, boolean()} | +%% {preserve_chunked_encoding,boolean()} %% %% stream_to() = process() | {process(), once} %% process() = pid() | atom() @@ -302,23 +308,45 @@ send_req(Url, Headers, Method, Body, Options, Timeout) -> Options_1 = merge_options(Host, Port, Options), {SSLOptions, IsSSL} = case (Protocol == https) orelse - get_value(is_ssl, Options_1, false) of + get_value(is_ssl, Options_1, false) of false -> {[], false}; true -> {get_value(ssl_options, Options_1, []), true} end, - case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url, + try_routing_request(Lb_pid, Parsed_url, + Max_sessions, + Max_pipeline_size, + {SSLOptions, IsSSL}, + Headers, Method, Body, Options_1, Timeout, 0); + Err -> + {error, {url_parsing_failed, Err}} + end. + +try_routing_request(Lb_pid, Parsed_url, + Max_sessions, + Max_pipeline_size, + {SSLOptions, IsSSL}, + Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 -> + case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url, Max_sessions, Max_pipeline_size, {SSLOptions, IsSSL}) of - {ok, Conn_Pid} -> - do_send_req(Conn_Pid, Parsed_url, Headers, - Method, Body, Options_1, Timeout); - Err -> - Err + {ok, Conn_Pid} -> + case do_send_req(Conn_Pid, Parsed_url, Headers, + Method, Body, Options_1, Timeout) of + {error, sel_conn_closed} -> + try_routing_request(Lb_pid, Parsed_url, + Max_sessions, + Max_pipeline_size, + {SSLOptions, IsSSL}, + Headers, Method, Body, Options_1, Timeout, Try_count + 1); + Res -> + Res end; Err -> - {error, {url_parsing_failed, Err}} - end. + Err + end; +try_routing_request(_, _, _, _, _, _, _, _, _, _, _) -> + {error, retry_later}. merge_options(Host, Port, Options) -> Config_options = get_config_value({options, Host, Port}, []), @@ -337,11 +365,27 @@ get_lb_pid(Url) -> get_max_sessions(Host, Port, Options) -> get_value(max_sessions, Options, - get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)). + get_config_value({max_sessions, Host, Port}, + default_max_sessions())). get_max_pipeline_size(Host, Port, Options) -> get_value(max_pipeline_size, Options, - get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)). + get_config_value({max_pipeline_size, Host, Port}, + default_max_pipeline_size())). + +default_max_sessions() -> + safe_get_env(ibrowse, default_max_sessions, ?DEF_MAX_SESSIONS). + +default_max_pipeline_size() -> + safe_get_env(ibrowse, default_max_pipeline_size, ?DEF_MAX_PIPELINE_SIZE). + +safe_get_env(App, Key, Def_val) -> + case application:get_env(App, Key) of + undefined -> + Def_val; + {ok, Val} -> + Val + end. %% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3 %% for achieving the same effect. @@ -375,6 +419,10 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) -> Options, Timeout) of {'EXIT', {timeout, _}} -> {error, req_timedout}; + {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} -> + {error, sel_conn_closed}; + {error, connection_closed} -> + {error, sel_conn_closed}; {'EXIT', Reason} -> {error, {'EXIT', Reason}}; {ok, St_code, Headers, Body} = Ret when is_binary(Body) -> @@ -684,6 +732,10 @@ handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) -> handle_call(stop, _From, State) -> do_trace("IBROWSE shutting down~n", []), + ets:foldl(fun(#lb_pid{pid = Pid}, Acc) -> + ibrowse_lb:stop(Pid), + Acc + end, [], ibrowse_lb), {stop, normal, ok, State}; handle_call({set_config_value, Key, Val}, _From, State) -> diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index 1633e5b3..16d9b872 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -47,7 +47,8 @@ status_line, raw_headers, is_closing, send_timer, content_length, deleted_crlf = false, transfer_encoding, - chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, + chunk_size, chunk_size_buffer = <<>>, + recvd_chunk_size, interim_reply_sent = false, lb_ets_tid, cur_pipeline_size = 0, prev_req_id }). @@ -57,7 +58,7 @@ req_id, stream_chunk_size, save_response_to_file = false, - tmp_file_name, tmp_file_fd, + tmp_file_name, tmp_file_fd, preserve_chunked_encoding, response_format}). -import(ibrowse_lib, [ @@ -82,8 +83,13 @@ start_link(Args) -> gen_server:start_link(?MODULE, Args, []). stop(Conn_pid) -> - catch gen_server:call(Conn_pid, stop), - ok. + case catch gen_server:call(Conn_pid, stop) of + {'EXIT', {timeout, _}} -> + exit(Conn_pid, kill), + ok; + _ -> + ok + end. send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> gen_server:call( @@ -171,6 +177,7 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> +%% io:format("Recvd data: ~p~n", [Data]), do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> @@ -178,13 +185,14 @@ handle_info({ssl, _Sock, Data}, State) -> handle_info({stream_next, Req_id}, #state{socket = Socket, cur_req = #request{req_id = Req_id}} = State) -> + %% io:format("Client process set {active, once}~n", []), do_setopts(Socket, [{active, once}], State), {noreply, State}; handle_info({stream_next, _Req_id}, State) -> {noreply, State}; -handle_info({tcp_closed, _Sock}, State) -> +handle_info({tcp_closed, _Sock}, State) -> do_trace("TCP connection closed by peer!~n", []), handle_sock_closed(State), {stop, normal, State}; @@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) -> {stop, normal, State}; handle_info({tcp_error, _Sock}, State) -> - io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), + do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), handle_sock_closed(State), {stop, normal, State}; handle_info({ssl_error, _Sock}, State) -> - io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), + do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), handle_sock_closed(State), {stop, normal, State}; @@ -233,7 +241,8 @@ handle_info(Info, State) -> %% Returns: any (ignored by gen_server) %%-------------------------------------------------------------------- terminate(_Reason, State) -> - do_close(State). + do_close(State), + ok. %%-------------------------------------------------------------------- %% Func: code_change/3 @@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = get_header}=State) -> end; handle_sock_data(Data, #state{status = get_body, + socket = Socket, content_length = CL, http_status_code = StatCode, recvd_headers = Headers, @@ -293,6 +303,19 @@ handle_sock_data(Data, #state{status = get_body, fail_pipelined_requests(State, {error, {Reason, {stat_code, StatCode}, Headers}}), {stop, normal, State}; + #state{cur_req = #request{caller_controls_socket = Ccs}, + interim_reply_sent = Irs} = State_1 -> + case Irs of + true -> + active_once(State_1); + false when Ccs == true -> + do_setopts(Socket, [{active, once}], State); + false -> + active_once(State_1) + end, + State_2 = State_1#state{interim_reply_sent = false}, + set_inac_timer(State_2), + {noreply, State_2}; State_1 -> active_once(State_1), set_inac_timer(State_1), @@ -338,17 +361,25 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response(<<>>, State) -> - State; -accumulate_response(Data, #state{reply_buffer = RepBuf, - rep_buf_size = RepBufSize, - streamed_size = Streamed_size, - cur_req = CurReq}=State) -> - #request{stream_to=StreamTo, req_id=ReqId, - stream_chunk_size = Stream_chunk_size, - response_format = Response_format, - caller_controls_socket = Caller_controls_socket} = CurReq, - RepBuf_1 = list_to_binary([RepBuf, Data]), +%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs}, +%% socket = Socket} = State) -> +%% case Ccs of +%% true -> +%% do_setopts(Socket, [{active, once}], State); +%% false -> +%% ok +%% end, +%% State; +accumulate_response(Data, #state{reply_buffer = RepBuf, + rep_buf_size = RepBufSize, + streamed_size = Streamed_size, + cur_req = CurReq}=State) -> + #request{stream_to = StreamTo, + req_id = ReqId, + stream_chunk_size = Stream_chunk_size, + response_format = Response_format, + caller_controls_socket = Caller_controls_socket} = CurReq, + RepBuf_1 = <>, New_data_size = RepBufSize - Streamed_size, case StreamTo of undefined -> @@ -356,15 +387,21 @@ accumulate_response(Data, #state{reply_buffer = RepBuf, _ when Caller_controls_socket == true -> do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1), State#state{reply_buffer = <<>>, + interim_reply_sent = true, streamed_size = Streamed_size + size(RepBuf_1)}; _ when New_data_size >= Stream_chunk_size -> {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size), do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk), - accumulate_response( - Rem_data, - State#state{ - reply_buffer = <<>>, - streamed_size = Streamed_size + Stream_chunk_size}); + State_1 = State#state{ + reply_buffer = <<>>, + interim_reply_sent = true, + streamed_size = Streamed_size + Stream_chunk_size}, + case Rem_data of + <<>> -> + State_1; + _ -> + accumulate_response(Rem_data, State_1) + end; _ -> State#state{reply_buffer = RepBuf_1} end. @@ -498,9 +535,9 @@ do_close(#state{socket = Sock, is_ssl = true, use_proxy = true, proxy_tunnel_setup = Pts - }) when Pts /= done -> gen_tcp:close(Sock); -do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock); -do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock). + }) when Pts /= done -> catch gen_tcp:close(Sock); +do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock); +do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock). active_once(#state{cur_req = #request{caller_controls_socket = true}}) -> ok; @@ -542,25 +579,17 @@ send_req_1(From, end, State_2 = check_ssl_options(Options, State_1), do_trace("Connecting...~n", []), - Start_ts = now(), Conn_timeout = get_value(connect_timeout, Options, Timeout), case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of {ok, Sock} -> - do_trace("Connected!~n", []), - End_ts = now(), - Timeout_1 = case Timeout of - infinity -> - infinity; - _ -> - Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)) - end, + do_trace("Connected! Socket: ~1000.p~n", [Sock]), State_3 = State_2#state{socket = Sock, connect_timeout = Conn_timeout}, - send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3); + send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3); Err -> shutting_down(State_2), do_trace("Error connecting. Reason: ~1000.p~n", [Err]), - gen_server:reply(From, {error, conn_failed}), + gen_server:reply(From, {error, {conn_failed, Err}}), {stop, normal, State_2} end; @@ -580,8 +609,9 @@ send_req_1(From, use_proxy = true, is_ssl = true} = State) -> NewReq = #request{ - method = connect, - options = Options + method = connect, + preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false), + options = Options }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1), @@ -611,13 +641,13 @@ send_req_1(From, Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end; Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end; @@ -666,7 +696,9 @@ send_req_1(From, save_response_to_file = SaveResponseToFile, stream_chunk_size = get_stream_chunk_size(Options), response_format = Resp_format, - from = From}, + from = From, + preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false) + }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), {Req, Body_1} = make_request(Method, @@ -705,13 +737,13 @@ send_req_1(From, Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end; Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end. @@ -768,14 +800,14 @@ http_auth_digest(Username, Password) -> ibrowse_lib:encode_base64(Username ++ [$: | Password]). make_request(Method, Headers, AbsPath, RelPath, Body, Options, - #state{use_proxy = UseProxy}) -> + #state{use_proxy = UseProxy, is_ssl = Is_ssl}) -> HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})), Headers_1 = case get_value(content_length, Headers, false) of false when (Body == []) or - (Body == <<>>) or - is_tuple(Body) or - is_function(Body) -> + (Body == <<>>) or + is_tuple(Body) or + is_function(Body) -> Headers; false when is_binary(Body) -> [{"content-length", integer_to_list(size(Body))} | Headers]; @@ -799,7 +831,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, Headers_3 = cons_headers(Headers_2), Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of true -> - AbsPath; + case Is_ssl of + true -> + RelPath; + false -> + AbsPath + end; false -> RelPath end, @@ -1017,7 +1054,7 @@ upgrade_to_ssl(#state{socket = Socket, send_queued_requests(lists:reverse(Q), State_1); Err -> do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]), - do_error_reply(State, {error, send_failed}), + do_error_reply(State, {error, {send_failed, Err}}), {error, send_failed} end. @@ -1029,12 +1066,12 @@ send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q], case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of {noreply, State_1} -> send_queued_requests(Q, State_1); - _ -> + Err -> do_trace("Error sending queued SSL request: ~n" "URL : ~s~n" "Method : ~p~n" "Headers : ~p~n", [Url, Method, Headers]), - do_error_reply(State, {error, send_failed}), + do_error_reply(State, {error, {send_failed, Err}}), {error, send_failed} end. @@ -1046,11 +1083,12 @@ is_connection_closing(_, _) -> false. %% This clause determines the chunk size when given data from the beginning of the chunk parse_11_response(DataRecvd, #state{transfer_encoding = chunked, - chunk_size = chunk_start, + chunk_size = chunk_start, chunk_size_buffer = Chunk_sz_buf } = State) -> case scan_crlf(Chunk_sz_buf, DataRecvd) of {yes, ChunkHeader, Data_1} -> + State_1 = maybe_accumulate_ce_data(State, <>), ChunkSize = parse_chunk_header(ChunkHeader), %% %% Do we have to preserve the chunk encoding when @@ -1061,10 +1099,10 @@ parse_11_response(DataRecvd, RemLen = size(Data_1), do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]), - parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>, - deleted_crlf = true, - recvd_chunk_size = 0, - chunk_size = ChunkSize}); + parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>, + deleted_crlf = true, + recvd_chunk_size = 0, + chunk_size = ChunkSize}); {no, Data_1} -> State#state{chunk_size_buffer = Data_1} end; @@ -1074,13 +1112,15 @@ parse_11_response(DataRecvd, parse_11_response(DataRecvd, #state{transfer_encoding = chunked, chunk_size = tbd, - chunk_size_buffer = Buf}=State) -> + chunk_size_buffer = Buf + } = State) -> case scan_crlf(Buf, DataRecvd) of {yes, _, NextChunk} -> - State_1 = State#state{chunk_size = chunk_start, - chunk_size_buffer = <<>>, - deleted_crlf = true}, - parse_11_response(NextChunk, State_1); + State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>), + State_2 = State_1#state{chunk_size = chunk_start, + chunk_size_buffer = <<>>, + deleted_crlf = true}, + parse_11_response(NextChunk, State_2); {no, Data_1} -> State#state{chunk_size_buffer = Data_1} end; @@ -1090,9 +1130,10 @@ parse_11_response(DataRecvd, %% received is silently discarded. parse_11_response(DataRecvd, #state{transfer_encoding = chunked, chunk_size = 0, - cur_req = CurReq, - deleted_crlf = DelCrlf, - chunk_size_buffer = Trailer, reqs = Reqs}=State) -> + cur_req = CurReq, + deleted_crlf = DelCrlf, + chunk_size_buffer = Trailer, + reqs = Reqs} = State) -> do_trace("Detected end of chunked transfer...~n", []), DataRecvd_1 = case DelCrlf of false -> @@ -1101,12 +1142,14 @@ parse_11_response(DataRecvd, <<$\r, $\n, DataRecvd/binary>> end, case scan_header(Trailer, DataRecvd_1) of - {yes, _TEHeaders, Rem} -> + {yes, TEHeaders, Rem} -> {_, Reqs_1} = queue:out(Reqs), - State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}), - parse_response(Rem, reset_state(State_1)); + State_1 = maybe_accumulate_ce_data(State, <>), + State_2 = handle_response(CurReq, + State_1#state{reqs = Reqs_1}), + parse_response(Rem, reset_state(State_2)); {no, Rem} -> - State#state{chunk_size_buffer = Rem, deleted_crlf = false} + accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false}) end; %% This clause extracts a chunk, given the size. @@ -1121,7 +1164,7 @@ parse_11_response(DataRecvd, case DataLen >= NeedBytes of true -> {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), - do_trace("Recvd another chunk...~n", []), + do_trace("Recvd another chunk...~p~n", [RemChunk]), do_trace("RemData -> ~p~n", [RemData]), case accumulate_response(RemChunk, State) of {error, Reason} -> @@ -1155,6 +1198,11 @@ parse_11_response(DataRecvd, accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)}) end. +maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) -> + State; +maybe_accumulate_ce_data(State, Data) -> + accumulate_response(Data, State). + handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format, save_response_to_file = SaveResponseToFile, @@ -1177,11 +1225,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, _ -> {file, TmpFilename} end, + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options), Reply = case get_value(give_raw_headers, Options, false) of true -> - {ok, Status_line, Raw_headers, ResponseBody}; + {ok, Status_line, Raw_headers_1, ResponseBody}; false -> - {ok, SCode, RespHeaders, ResponseBody} + {ok, SCode, Resp_headers_1, ResponseBody} end, State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply), cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}), @@ -1192,16 +1241,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, #state{http_status_code = SCode, status_line = Status_line, raw_headers = Raw_headers, - recvd_headers = RespHeaders, + recvd_headers = Resp_headers, reply_buffer = RepBuf, send_timer = ReqTimer} = State) -> Body = RepBuf, %% State_1 = set_cur_request(State), + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options), Reply = case get_value(give_raw_headers, Options, false) of true -> - {ok, Status_line, Raw_headers, Body}; + {ok, Status_line, Raw_headers_1, Body}; false -> - {ok, SCode, RespHeaders, Body} + {ok, SCode, Resp_headers_1, Body} end, State_1 = case get(conn_close) of "close" -> @@ -1227,7 +1277,8 @@ reset_state(State) -> deleted_crlf = false, http_status_code = undefined, chunk_size = undefined, - transfer_encoding = undefined}. + transfer_encoding = undefined + }. set_cur_request(#state{reqs = Reqs} = State) -> case queue:to_list(Reqs) of @@ -1459,15 +1510,29 @@ send_async_headers(_ReqId, undefined, _, _State) -> ok; send_async_headers(ReqId, StreamTo, Give_raw_headers, #state{status_line = Status_line, raw_headers = Raw_headers, - recvd_headers = Headers, http_status_code = StatCode - }) -> + recvd_headers = Headers, http_status_code = StatCode, + cur_req = #request{options = Opts} + }) -> + {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts), case Give_raw_headers of false -> - catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers}; + catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1}; true -> - catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers} + catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1} end. +maybe_add_custom_headers(Headers, Raw_headers, Opts) -> + Custom_headers = get_value(add_custom_headers, Opts, []), + Headers_1 = Headers ++ Custom_headers, + Raw_headers_1 = case Custom_headers of + [_ | _] when is_binary(Raw_headers) -> + Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")), + <>; + _ -> + Raw_headers + end, + {Headers_1, Raw_headers_1}. + format_response_data(Resp_format, Body) -> case Resp_format of list when is_list(Body) -> diff --git a/src/ibrowse/ibrowse_lb.erl b/src/ibrowse/ibrowse_lb.erl index 6bc600be..0e001d48 100644 --- a/src/ibrowse/ibrowse_lb.erl +++ b/src/ibrowse/ibrowse_lb.erl @@ -16,7 +16,8 @@ %% External exports -export([ start_link/1, - spawn_connection/5 + spawn_connection/5, + stop/1 ]). %% gen_server callbacks @@ -85,6 +86,14 @@ spawn_connection(Lb_pid, Url, is_integer(Max_sessions) -> gen_server:call(Lb_pid, {spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}). + +stop(Lb_pid) -> + case catch gen_server:call(Lb_pid, stop) of + {'EXIT', {timeout, _}} -> + exit(Lb_pid, kill); + ok -> + ok + end. %%-------------------------------------------------------------------- %% Function: handle_call/3 %% Description: Handling call messages @@ -120,6 +129,18 @@ handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From, ets:insert(Tid, {{1, Pid}, []}), {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}}; +handle_call(stop, _From, #state{ets_tid = undefined} = State) -> + gen_server:reply(_From, ok), + {stop, normal, State}; + +handle_call(stop, _From, #state{ets_tid = Tid} = State) -> + ets:foldl(fun({{_, Pid}, _}, Acc) -> + ibrowse_http_client:stop(Pid), + Acc + end, [], Tid), + gen_server:reply(_From, ok), + {stop, normal, State}; + handle_call(Request, _From, State) -> Reply = {unknown_request, Request}, {reply, Reply, State}. diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl index 00b0244f..e7d6e59e 100644 --- a/src/ibrowse/ibrowse_test.erl +++ b/src/ibrowse/ibrowse_test.erl @@ -17,6 +17,7 @@ ue_test/1, verify_chunked_streaming/0, verify_chunked_streaming/1, + test_chunked_streaming_once/0, i_do_async_req_list/4, test_stream_once/3, test_stream_once/4 @@ -260,7 +261,20 @@ verify_chunked_streaming(Options) -> io:format("Fetching data with streaming as binary...~n", []), Async_response_bin = do_async_req_list( Url, get, [{response_format, binary} | Options]), - compare_responses(Result_without_streaming, Async_response_list, Async_response_bin). + io:format("Fetching data with streaming as binary, {active, once}...~n", []), + Async_response_bin_once = do_async_req_list( + Url, get, [once, {response_format, binary} | Options]), + compare_responses(Result_without_streaming, Async_response_list, Async_response_bin), + compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once). + +test_chunked_streaming_once() -> + test_chunked_streaming_once([]). + +test_chunked_streaming_once(Options) -> + Url = "http://www.httpwatch.com/httpgallery/chunked/", + io:format("URL: ~s~n", [Url]), + io:format("Fetching data with streaming as binary, {active, once}...~n", []), + do_async_req_list(Url, get, [once, {response_format, binary} | Options]). compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) -> success; @@ -313,31 +327,54 @@ wait_for_resp(Pid) -> Msg -> io:format("Recvd unknown message: ~p~n", [Msg]), wait_for_resp(Pid) - after 10000 -> + after 100000 -> {error, timeout} end. i_do_async_req_list(Parent, Url, Method, Options) -> - Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]), + Options_1 = case lists:member(once, Options) of + true -> + [{stream_to, {self(), once}} | (Options -- [once])]; + false -> + [{stream_to, self()} | Options] + end, + Res = ibrowse:send_req(Url, [], Method, [], Options_1), case Res of {ibrowse_req_id, Req_id} -> - Result = wait_for_async_resp(Req_id, undefined, undefined, []), + Result = wait_for_async_resp(Req_id, Options, undefined, undefined, []), Parent ! {async_result, self(), Result}; Err -> Parent ! {async_result, self(), Err} end. -wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) -> +wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, Body) -> receive {ibrowse_async_headers, Req_id, StatCode, Headers} -> - wait_for_async_resp(Req_id, StatCode, Headers, Body); + %% io:format("Recvd headers...~n", []), + maybe_stream_next(Req_id, Options), + wait_for_async_resp(Req_id, Options, StatCode, Headers, Body); {ibrowse_async_response_end, Req_id} -> + io:format("Recvd end of response.~n", []), Body_1 = list_to_binary(lists:reverse(Body)), {ok, Acc_Stat_code, Acc_Headers, Body_1}; {ibrowse_async_response, Req_id, Data} -> - wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]); + maybe_stream_next(Req_id, Options), + %% io:format("Recvd data...~n", []), + wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, [Data | Body]); + {ibrowse_async_response, Req_id, {error, _} = Err} -> + {ok, Acc_Stat_code, Acc_Headers, Err}; Err -> {ok, Acc_Stat_code, Acc_Headers, Err} + after 10000 -> + {timeout, Acc_Stat_code, Acc_Headers, Body} + end. + +maybe_stream_next(Req_id, Options) -> + case lists:member(once, Options) of + true -> + ibrowse:stream_next(Req_id); + false -> + ok end. execute_req(Url, Method, Options) -> -- cgit v1.2.3