From 796a41761839395c6e08667a33f17a84893ae637 Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Fri, 24 Sep 2010 14:18:56 +0000 Subject: Upgrading ibrowse from version 1.6.2 to 2.0.1. This version fixes a serious issue regarding streaming of chunked HTTP(S) responses. The issue is that the client occasionally gets blocked or receives a timeout (if inactivity_timeout parameter is given to ibrowse). This fixes part of ticket COUCHDB-491. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1000880 13f79535-47bb-0310-9956-ffa450edef68 --- src/ibrowse/ibrowse_http_client.erl | 227 +++++++++++++++++++++++------------- 1 file changed, 146 insertions(+), 81 deletions(-) (limited to 'src/ibrowse/ibrowse_http_client.erl') diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl index 1633e5b3..16d9b872 100644 --- a/src/ibrowse/ibrowse_http_client.erl +++ b/src/ibrowse/ibrowse_http_client.erl @@ -47,7 +47,8 @@ status_line, raw_headers, is_closing, send_timer, content_length, deleted_crlf = false, transfer_encoding, - chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size, + chunk_size, chunk_size_buffer = <<>>, + recvd_chunk_size, interim_reply_sent = false, lb_ets_tid, cur_pipeline_size = 0, prev_req_id }). @@ -57,7 +58,7 @@ req_id, stream_chunk_size, save_response_to_file = false, - tmp_file_name, tmp_file_fd, + tmp_file_name, tmp_file_fd, preserve_chunked_encoding, response_format}). -import(ibrowse_lib, [ @@ -82,8 +83,13 @@ start_link(Args) -> gen_server:start_link(?MODULE, Args, []). stop(Conn_pid) -> - catch gen_server:call(Conn_pid, stop), - ok. + case catch gen_server:call(Conn_pid, stop) of + {'EXIT', {timeout, _}} -> + exit(Conn_pid, kill), + ok; + _ -> + ok + end. send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) -> gen_server:call( @@ -171,6 +177,7 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} (terminate/2 is called) %%-------------------------------------------------------------------- handle_info({tcp, _Sock, Data}, #state{status = Status} = State) -> +%% io:format("Recvd data: ~p~n", [Data]), do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]), handle_sock_data(Data, State); handle_info({ssl, _Sock, Data}, State) -> @@ -178,13 +185,14 @@ handle_info({ssl, _Sock, Data}, State) -> handle_info({stream_next, Req_id}, #state{socket = Socket, cur_req = #request{req_id = Req_id}} = State) -> + %% io:format("Client process set {active, once}~n", []), do_setopts(Socket, [{active, once}], State), {noreply, State}; handle_info({stream_next, _Req_id}, State) -> {noreply, State}; -handle_info({tcp_closed, _Sock}, State) -> +handle_info({tcp_closed, _Sock}, State) -> do_trace("TCP connection closed by peer!~n", []), handle_sock_closed(State), {stop, normal, State}; @@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) -> {stop, normal, State}; handle_info({tcp_error, _Sock}, State) -> - io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), + do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), handle_sock_closed(State), {stop, normal, State}; handle_info({ssl_error, _Sock}, State) -> - io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), + do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]), handle_sock_closed(State), {stop, normal, State}; @@ -233,7 +241,8 @@ handle_info(Info, State) -> %% Returns: any (ignored by gen_server) %%-------------------------------------------------------------------- terminate(_Reason, State) -> - do_close(State). + do_close(State), + ok. %%-------------------------------------------------------------------- %% Func: code_change/3 @@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = get_header}=State) -> end; handle_sock_data(Data, #state{status = get_body, + socket = Socket, content_length = CL, http_status_code = StatCode, recvd_headers = Headers, @@ -293,6 +303,19 @@ handle_sock_data(Data, #state{status = get_body, fail_pipelined_requests(State, {error, {Reason, {stat_code, StatCode}, Headers}}), {stop, normal, State}; + #state{cur_req = #request{caller_controls_socket = Ccs}, + interim_reply_sent = Irs} = State_1 -> + case Irs of + true -> + active_once(State_1); + false when Ccs == true -> + do_setopts(Socket, [{active, once}], State); + false -> + active_once(State_1) + end, + State_2 = State_1#state{interim_reply_sent = false}, + set_inac_timer(State_2), + {noreply, State_2}; State_1 -> active_once(State_1), set_inac_timer(State_1), @@ -338,17 +361,25 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf {error, Reason} -> {error, {file_write_error, Reason}} end; -accumulate_response(<<>>, State) -> - State; -accumulate_response(Data, #state{reply_buffer = RepBuf, - rep_buf_size = RepBufSize, - streamed_size = Streamed_size, - cur_req = CurReq}=State) -> - #request{stream_to=StreamTo, req_id=ReqId, - stream_chunk_size = Stream_chunk_size, - response_format = Response_format, - caller_controls_socket = Caller_controls_socket} = CurReq, - RepBuf_1 = list_to_binary([RepBuf, Data]), +%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs}, +%% socket = Socket} = State) -> +%% case Ccs of +%% true -> +%% do_setopts(Socket, [{active, once}], State); +%% false -> +%% ok +%% end, +%% State; +accumulate_response(Data, #state{reply_buffer = RepBuf, + rep_buf_size = RepBufSize, + streamed_size = Streamed_size, + cur_req = CurReq}=State) -> + #request{stream_to = StreamTo, + req_id = ReqId, + stream_chunk_size = Stream_chunk_size, + response_format = Response_format, + caller_controls_socket = Caller_controls_socket} = CurReq, + RepBuf_1 = <>, New_data_size = RepBufSize - Streamed_size, case StreamTo of undefined -> @@ -356,15 +387,21 @@ accumulate_response(Data, #state{reply_buffer = RepBuf, _ when Caller_controls_socket == true -> do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1), State#state{reply_buffer = <<>>, + interim_reply_sent = true, streamed_size = Streamed_size + size(RepBuf_1)}; _ when New_data_size >= Stream_chunk_size -> {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size), do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk), - accumulate_response( - Rem_data, - State#state{ - reply_buffer = <<>>, - streamed_size = Streamed_size + Stream_chunk_size}); + State_1 = State#state{ + reply_buffer = <<>>, + interim_reply_sent = true, + streamed_size = Streamed_size + Stream_chunk_size}, + case Rem_data of + <<>> -> + State_1; + _ -> + accumulate_response(Rem_data, State_1) + end; _ -> State#state{reply_buffer = RepBuf_1} end. @@ -498,9 +535,9 @@ do_close(#state{socket = Sock, is_ssl = true, use_proxy = true, proxy_tunnel_setup = Pts - }) when Pts /= done -> gen_tcp:close(Sock); -do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock); -do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock). + }) when Pts /= done -> catch gen_tcp:close(Sock); +do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock); +do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock). active_once(#state{cur_req = #request{caller_controls_socket = true}}) -> ok; @@ -542,25 +579,17 @@ send_req_1(From, end, State_2 = check_ssl_options(Options, State_1), do_trace("Connecting...~n", []), - Start_ts = now(), Conn_timeout = get_value(connect_timeout, Options, Timeout), case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of {ok, Sock} -> - do_trace("Connected!~n", []), - End_ts = now(), - Timeout_1 = case Timeout of - infinity -> - infinity; - _ -> - Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)) - end, + do_trace("Connected! Socket: ~1000.p~n", [Sock]), State_3 = State_2#state{socket = Sock, connect_timeout = Conn_timeout}, - send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3); + send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3); Err -> shutting_down(State_2), do_trace("Error connecting. Reason: ~1000.p~n", [Err]), - gen_server:reply(From, {error, conn_failed}), + gen_server:reply(From, {error, {conn_failed, Err}}), {stop, normal, State_2} end; @@ -580,8 +609,9 @@ send_req_1(From, use_proxy = true, is_ssl = true} = State) -> NewReq = #request{ - method = connect, - options = Options + method = connect, + preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false), + options = Options }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1), @@ -611,13 +641,13 @@ send_req_1(From, Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end; Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end; @@ -666,7 +696,9 @@ send_req_1(From, save_response_to_file = SaveResponseToFile, stream_chunk_size = get_stream_chunk_size(Options), response_format = Resp_format, - from = From}, + from = From, + preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false) + }, State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)}, Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1), {Req, Body_1} = make_request(Method, @@ -705,13 +737,13 @@ send_req_1(From, Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end; Err -> shutting_down(State_1), do_trace("Send failed... Reason: ~p~n", [Err]), - gen_server:reply(From, {error, send_failed}), + gen_server:reply(From, {error, {send_failed, Err}}), {stop, normal, State_1} end. @@ -768,14 +800,14 @@ http_auth_digest(Username, Password) -> ibrowse_lib:encode_base64(Username ++ [$: | Password]). make_request(Method, Headers, AbsPath, RelPath, Body, Options, - #state{use_proxy = UseProxy}) -> + #state{use_proxy = UseProxy, is_ssl = Is_ssl}) -> HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})), Headers_1 = case get_value(content_length, Headers, false) of false when (Body == []) or - (Body == <<>>) or - is_tuple(Body) or - is_function(Body) -> + (Body == <<>>) or + is_tuple(Body) or + is_function(Body) -> Headers; false when is_binary(Body) -> [{"content-length", integer_to_list(size(Body))} | Headers]; @@ -799,7 +831,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, Headers_3 = cons_headers(Headers_2), Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of true -> - AbsPath; + case Is_ssl of + true -> + RelPath; + false -> + AbsPath + end; false -> RelPath end, @@ -1017,7 +1054,7 @@ upgrade_to_ssl(#state{socket = Socket, send_queued_requests(lists:reverse(Q), State_1); Err -> do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]), - do_error_reply(State, {error, send_failed}), + do_error_reply(State, {error, {send_failed, Err}}), {error, send_failed} end. @@ -1029,12 +1066,12 @@ send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q], case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of {noreply, State_1} -> send_queued_requests(Q, State_1); - _ -> + Err -> do_trace("Error sending queued SSL request: ~n" "URL : ~s~n" "Method : ~p~n" "Headers : ~p~n", [Url, Method, Headers]), - do_error_reply(State, {error, send_failed}), + do_error_reply(State, {error, {send_failed, Err}}), {error, send_failed} end. @@ -1046,11 +1083,12 @@ is_connection_closing(_, _) -> false. %% This clause determines the chunk size when given data from the beginning of the chunk parse_11_response(DataRecvd, #state{transfer_encoding = chunked, - chunk_size = chunk_start, + chunk_size = chunk_start, chunk_size_buffer = Chunk_sz_buf } = State) -> case scan_crlf(Chunk_sz_buf, DataRecvd) of {yes, ChunkHeader, Data_1} -> + State_1 = maybe_accumulate_ce_data(State, <>), ChunkSize = parse_chunk_header(ChunkHeader), %% %% Do we have to preserve the chunk encoding when @@ -1061,10 +1099,10 @@ parse_11_response(DataRecvd, RemLen = size(Data_1), do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]), - parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>, - deleted_crlf = true, - recvd_chunk_size = 0, - chunk_size = ChunkSize}); + parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>, + deleted_crlf = true, + recvd_chunk_size = 0, + chunk_size = ChunkSize}); {no, Data_1} -> State#state{chunk_size_buffer = Data_1} end; @@ -1074,13 +1112,15 @@ parse_11_response(DataRecvd, parse_11_response(DataRecvd, #state{transfer_encoding = chunked, chunk_size = tbd, - chunk_size_buffer = Buf}=State) -> + chunk_size_buffer = Buf + } = State) -> case scan_crlf(Buf, DataRecvd) of {yes, _, NextChunk} -> - State_1 = State#state{chunk_size = chunk_start, - chunk_size_buffer = <<>>, - deleted_crlf = true}, - parse_11_response(NextChunk, State_1); + State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>), + State_2 = State_1#state{chunk_size = chunk_start, + chunk_size_buffer = <<>>, + deleted_crlf = true}, + parse_11_response(NextChunk, State_2); {no, Data_1} -> State#state{chunk_size_buffer = Data_1} end; @@ -1090,9 +1130,10 @@ parse_11_response(DataRecvd, %% received is silently discarded. parse_11_response(DataRecvd, #state{transfer_encoding = chunked, chunk_size = 0, - cur_req = CurReq, - deleted_crlf = DelCrlf, - chunk_size_buffer = Trailer, reqs = Reqs}=State) -> + cur_req = CurReq, + deleted_crlf = DelCrlf, + chunk_size_buffer = Trailer, + reqs = Reqs} = State) -> do_trace("Detected end of chunked transfer...~n", []), DataRecvd_1 = case DelCrlf of false -> @@ -1101,12 +1142,14 @@ parse_11_response(DataRecvd, <<$\r, $\n, DataRecvd/binary>> end, case scan_header(Trailer, DataRecvd_1) of - {yes, _TEHeaders, Rem} -> + {yes, TEHeaders, Rem} -> {_, Reqs_1} = queue:out(Reqs), - State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}), - parse_response(Rem, reset_state(State_1)); + State_1 = maybe_accumulate_ce_data(State, <>), + State_2 = handle_response(CurReq, + State_1#state{reqs = Reqs_1}), + parse_response(Rem, reset_state(State_2)); {no, Rem} -> - State#state{chunk_size_buffer = Rem, deleted_crlf = false} + accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false}) end; %% This clause extracts a chunk, given the size. @@ -1121,7 +1164,7 @@ parse_11_response(DataRecvd, case DataLen >= NeedBytes of true -> {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes), - do_trace("Recvd another chunk...~n", []), + do_trace("Recvd another chunk...~p~n", [RemChunk]), do_trace("RemData -> ~p~n", [RemData]), case accumulate_response(RemChunk, State) of {error, Reason} -> @@ -1155,6 +1198,11 @@ parse_11_response(DataRecvd, accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)}) end. +maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) -> + State; +maybe_accumulate_ce_data(State, Data) -> + accumulate_response(Data, State). + handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, response_format = Resp_format, save_response_to_file = SaveResponseToFile, @@ -1177,11 +1225,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, _ -> {file, TmpFilename} end, + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options), Reply = case get_value(give_raw_headers, Options, false) of true -> - {ok, Status_line, Raw_headers, ResponseBody}; + {ok, Status_line, Raw_headers_1, ResponseBody}; false -> - {ok, SCode, RespHeaders, ResponseBody} + {ok, SCode, Resp_headers_1, ResponseBody} end, State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply), cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}), @@ -1192,16 +1241,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId, #state{http_status_code = SCode, status_line = Status_line, raw_headers = Raw_headers, - recvd_headers = RespHeaders, + recvd_headers = Resp_headers, reply_buffer = RepBuf, send_timer = ReqTimer} = State) -> Body = RepBuf, %% State_1 = set_cur_request(State), + {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options), Reply = case get_value(give_raw_headers, Options, false) of true -> - {ok, Status_line, Raw_headers, Body}; + {ok, Status_line, Raw_headers_1, Body}; false -> - {ok, SCode, RespHeaders, Body} + {ok, SCode, Resp_headers_1, Body} end, State_1 = case get(conn_close) of "close" -> @@ -1227,7 +1277,8 @@ reset_state(State) -> deleted_crlf = false, http_status_code = undefined, chunk_size = undefined, - transfer_encoding = undefined}. + transfer_encoding = undefined + }. set_cur_request(#state{reqs = Reqs} = State) -> case queue:to_list(Reqs) of @@ -1459,15 +1510,29 @@ send_async_headers(_ReqId, undefined, _, _State) -> ok; send_async_headers(ReqId, StreamTo, Give_raw_headers, #state{status_line = Status_line, raw_headers = Raw_headers, - recvd_headers = Headers, http_status_code = StatCode - }) -> + recvd_headers = Headers, http_status_code = StatCode, + cur_req = #request{options = Opts} + }) -> + {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts), case Give_raw_headers of false -> - catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers}; + catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1}; true -> - catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers} + catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1} end. +maybe_add_custom_headers(Headers, Raw_headers, Opts) -> + Custom_headers = get_value(add_custom_headers, Opts, []), + Headers_1 = Headers ++ Custom_headers, + Raw_headers_1 = case Custom_headers of + [_ | _] when is_binary(Raw_headers) -> + Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")), + <>; + _ -> + Raw_headers + end, + {Headers_1, Raw_headers_1}. + format_response_data(Resp_format, Body) -> case Resp_format of list when is_list(Body) -> -- cgit v1.2.3