summaryrefslogtreecommitdiff
path: root/src/ibrowse/ibrowse_http_client.erl
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-09-24 14:18:56 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-09-24 14:18:56 +0000
commit796a41761839395c6e08667a33f17a84893ae637 (patch)
tree3a396e29813e897ecd839fc2128cdab0b8e92d24 /src/ibrowse/ibrowse_http_client.erl
parenta56f5e49f01de3aeb631a263a8801d6c2c677d89 (diff)
Upgrading ibrowse from version 1.6.2 to 2.0.1.
This version fixes a serious issue regarding streaming of chunked HTTP(S) responses. The issue is that the client occasionally gets blocked or receives a timeout (if inactivity_timeout parameter is given to ibrowse). This fixes part of ticket COUCHDB-491. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1000880 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/ibrowse/ibrowse_http_client.erl')
-rw-r--r--src/ibrowse/ibrowse_http_client.erl227
1 files changed, 146 insertions, 81 deletions
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 1633e5b3..16d9b872 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -47,7 +47,8 @@
status_line, raw_headers,
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding,
- chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
+ chunk_size, chunk_size_buffer = <<>>,
+ recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}).
@@ -57,7 +58,7 @@
req_id,
stream_chunk_size,
save_response_to_file = false,
- tmp_file_name, tmp_file_fd,
+ tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
response_format}).
-import(ibrowse_lib, [
@@ -82,8 +83,13 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
stop(Conn_pid) ->
- catch gen_server:call(Conn_pid, stop),
- ok.
+ case catch gen_server:call(Conn_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Conn_pid, kill),
+ ok;
+ _ ->
+ ok
+ end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call(
@@ -171,6 +177,7 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
+%% io:format("Recvd data: ~p~n", [Data]),
do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
@@ -178,13 +185,14 @@ handle_info({ssl, _Sock, Data}, State) ->
handle_info({stream_next, Req_id}, #state{socket = Socket,
cur_req = #request{req_id = Req_id}} = State) ->
+ %% io:format("Client process set {active, once}~n", []),
do_setopts(Socket, [{active, once}], State),
{noreply, State};
handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
-handle_info({tcp_closed, _Sock}, State) ->
+handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
@@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) ->
{stop, normal, State};
handle_info({tcp_error, _Sock}, State) ->
- io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
handle_info({ssl_error, _Sock}, State) ->
- io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
@@ -233,7 +241,8 @@ handle_info(Info, State) ->
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- do_close(State).
+ do_close(State),
+ ok.
%%--------------------------------------------------------------------
%% Func: code_change/3
@@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = get_header}=State) ->
end;
handle_sock_data(Data, #state{status = get_body,
+ socket = Socket,
content_length = CL,
http_status_code = StatCode,
recvd_headers = Headers,
@@ -293,6 +303,19 @@ handle_sock_data(Data, #state{status = get_body,
fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
+ #state{cur_req = #request{caller_controls_socket = Ccs},
+ interim_reply_sent = Irs} = State_1 ->
+ case Irs of
+ true ->
+ active_once(State_1);
+ false when Ccs == true ->
+ do_setopts(Socket, [{active, once}], State);
+ false ->
+ active_once(State_1)
+ end,
+ State_2 = State_1#state{interim_reply_sent = false},
+ set_inac_timer(State_2),
+ {noreply, State_2};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
@@ -338,17 +361,25 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf
{error, Reason} ->
{error, {file_write_error, Reason}}
end;
-accumulate_response(<<>>, State) ->
- State;
-accumulate_response(Data, #state{reply_buffer = RepBuf,
- rep_buf_size = RepBufSize,
- streamed_size = Streamed_size,
- cur_req = CurReq}=State) ->
- #request{stream_to=StreamTo, req_id=ReqId,
- stream_chunk_size = Stream_chunk_size,
- response_format = Response_format,
- caller_controls_socket = Caller_controls_socket} = CurReq,
- RepBuf_1 = list_to_binary([RepBuf, Data]),
+%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs},
+%% socket = Socket} = State) ->
+%% case Ccs of
+%% true ->
+%% do_setopts(Socket, [{active, once}], State);
+%% false ->
+%% ok
+%% end,
+%% State;
+accumulate_response(Data, #state{reply_buffer = RepBuf,
+ rep_buf_size = RepBufSize,
+ streamed_size = Streamed_size,
+ cur_req = CurReq}=State) ->
+ #request{stream_to = StreamTo,
+ req_id = ReqId,
+ stream_chunk_size = Stream_chunk_size,
+ response_format = Response_format,
+ caller_controls_socket = Caller_controls_socket} = CurReq,
+ RepBuf_1 = <<RepBuf/binary, Data/binary>>,
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
@@ -356,15 +387,21 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
_ when Caller_controls_socket == true ->
do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
State#state{reply_buffer = <<>>,
+ interim_reply_sent = true,
streamed_size = Streamed_size + size(RepBuf_1)};
_ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
- accumulate_response(
- Rem_data,
- State#state{
- reply_buffer = <<>>,
- streamed_size = Streamed_size + Stream_chunk_size});
+ State_1 = State#state{
+ reply_buffer = <<>>,
+ interim_reply_sent = true,
+ streamed_size = Streamed_size + Stream_chunk_size},
+ case Rem_data of
+ <<>> ->
+ State_1;
+ _ ->
+ accumulate_response(Rem_data, State_1)
+ end;
_ ->
State#state{reply_buffer = RepBuf_1}
end.
@@ -498,9 +535,9 @@ do_close(#state{socket = Sock,
is_ssl = true,
use_proxy = true,
proxy_tunnel_setup = Pts
- }) when Pts /= done -> gen_tcp:close(Sock);
-do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
-do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
+ }) when Pts /= done -> catch gen_tcp:close(Sock);
+do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock).
active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
ok;
@@ -542,25 +579,17 @@ send_req_1(From,
end,
State_2 = check_ssl_options(Options, State_1),
do_trace("Connecting...~n", []),
- Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
- do_trace("Connected!~n", []),
- End_ts = now(),
- Timeout_1 = case Timeout of
- infinity ->
- infinity;
- _ ->
- Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
- end,
+ do_trace("Connected! Socket: ~1000.p~n", [Sock]),
State_3 = State_2#state{socket = Sock,
connect_timeout = Conn_timeout},
- send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+ send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
- gen_server:reply(From, {error, conn_failed}),
+ gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2}
end;
@@ -580,8 +609,9 @@ send_req_1(From,
use_proxy = true,
is_ssl = true} = State) ->
NewReq = #request{
- method = connect,
- options = Options
+ method = connect,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+ options = Options
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -611,13 +641,13 @@ send_req_1(From,
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
@@ -666,7 +696,9 @@ send_req_1(From,
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
- from = From},
+ from = From,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+ },
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
{Req, Body_1} = make_request(Method,
@@ -705,13 +737,13 @@ send_req_1(From,
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end.
@@ -768,14 +800,14 @@ http_auth_digest(Username, Password) ->
ibrowse_lib:encode_base64(Username ++ [$: | Password]).
make_request(Method, Headers, AbsPath, RelPath, Body, Options,
- #state{use_proxy = UseProxy}) ->
+ #state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Headers_1 =
case get_value(content_length, Headers, false) of
false when (Body == []) or
- (Body == <<>>) or
- is_tuple(Body) or
- is_function(Body) ->
+ (Body == <<>>) or
+ is_tuple(Body) or
+ is_function(Body) ->
Headers;
false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers];
@@ -799,7 +831,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options,
Headers_3 = cons_headers(Headers_2),
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
- AbsPath;
+ case Is_ssl of
+ true ->
+ RelPath;
+ false ->
+ AbsPath
+ end;
false ->
RelPath
end,
@@ -1017,7 +1054,7 @@ upgrade_to_ssl(#state{socket = Socket,
send_queued_requests(lists:reverse(Q), State_1);
Err ->
do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1029,12 +1066,12 @@ send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q],
case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
{noreply, State_1} ->
send_queued_requests(Q, State_1);
- _ ->
+ Err ->
do_trace("Error sending queued SSL request: ~n"
"URL : ~s~n"
"Method : ~p~n"
"Headers : ~p~n", [Url, Method, Headers]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1046,11 +1083,12 @@ is_connection_closing(_, _) -> false.
%% This clause determines the chunk size when given data from the beginning of the chunk
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
- chunk_size = chunk_start,
+ chunk_size = chunk_start,
chunk_size_buffer = Chunk_sz_buf
} = State) ->
case scan_crlf(Chunk_sz_buf, DataRecvd) of
{yes, ChunkHeader, Data_1} ->
+ State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
ChunkSize = parse_chunk_header(ChunkHeader),
%%
%% Do we have to preserve the chunk encoding when
@@ -1061,10 +1099,10 @@ parse_11_response(DataRecvd,
RemLen = size(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
[ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
- deleted_crlf = true,
- recvd_chunk_size = 0,
- chunk_size = ChunkSize});
+ parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
+ deleted_crlf = true,
+ recvd_chunk_size = 0,
+ chunk_size = ChunkSize});
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1074,13 +1112,15 @@ parse_11_response(DataRecvd,
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
chunk_size = tbd,
- chunk_size_buffer = Buf}=State) ->
+ chunk_size_buffer = Buf
+ } = State) ->
case scan_crlf(Buf, DataRecvd) of
{yes, _, NextChunk} ->
- State_1 = State#state{chunk_size = chunk_start,
- chunk_size_buffer = <<>>,
- deleted_crlf = true},
- parse_11_response(NextChunk, State_1);
+ State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
+ State_2 = State_1#state{chunk_size = chunk_start,
+ chunk_size_buffer = <<>>,
+ deleted_crlf = true},
+ parse_11_response(NextChunk, State_2);
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1090,9 +1130,10 @@ parse_11_response(DataRecvd,
%% received is silently discarded.
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, chunk_size = 0,
- cur_req = CurReq,
- deleted_crlf = DelCrlf,
- chunk_size_buffer = Trailer, reqs = Reqs}=State) ->
+ cur_req = CurReq,
+ deleted_crlf = DelCrlf,
+ chunk_size_buffer = Trailer,
+ reqs = Reqs} = State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
false ->
@@ -1101,12 +1142,14 @@ parse_11_response(DataRecvd,
<<$\r, $\n, DataRecvd/binary>>
end,
case scan_header(Trailer, DataRecvd_1) of
- {yes, _TEHeaders, Rem} ->
+ {yes, TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
- State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
- parse_response(Rem, reset_state(State_1));
+ State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
+ State_2 = handle_response(CurReq,
+ State_1#state{reqs = Reqs_1}),
+ parse_response(Rem, reset_state(State_2));
{no, Rem} ->
- State#state{chunk_size_buffer = Rem, deleted_crlf = false}
+ accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
end;
%% This clause extracts a chunk, given the size.
@@ -1121,7 +1164,7 @@ parse_11_response(DataRecvd,
case DataLen >= NeedBytes of
true ->
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
- do_trace("Recvd another chunk...~n", []),
+ do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]),
case accumulate_response(RemChunk, State) of
{error, Reason} ->
@@ -1155,6 +1198,11 @@ parse_11_response(DataRecvd,
accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end.
+maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) ->
+ State;
+maybe_accumulate_ce_data(State, Data) ->
+ accumulate_response(Data, State).
+
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
save_response_to_file = SaveResponseToFile,
@@ -1177,11 +1225,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, ResponseBody};
+ {ok, Status_line, Raw_headers_1, ResponseBody};
false ->
- {ok, SCode, RespHeaders, ResponseBody}
+ {ok, SCode, Resp_headers_1, ResponseBody}
end,
State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
@@ -1192,16 +1241,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
- recvd_headers = RespHeaders,
+ recvd_headers = Resp_headers,
reply_buffer = RepBuf,
send_timer = ReqTimer} = State) ->
Body = RepBuf,
%% State_1 = set_cur_request(State),
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, Body};
+ {ok, Status_line, Raw_headers_1, Body};
false ->
- {ok, SCode, RespHeaders, Body}
+ {ok, SCode, Resp_headers_1, Body}
end,
State_1 = case get(conn_close) of
"close" ->
@@ -1227,7 +1277,8 @@ reset_state(State) ->
deleted_crlf = false,
http_status_code = undefined,
chunk_size = undefined,
- transfer_encoding = undefined}.
+ transfer_encoding = undefined
+ }.
set_cur_request(#state{reqs = Reqs} = State) ->
case queue:to_list(Reqs) of
@@ -1459,15 +1510,29 @@ send_async_headers(_ReqId, undefined, _, _State) ->
ok;
send_async_headers(ReqId, StreamTo, Give_raw_headers,
#state{status_line = Status_line, raw_headers = Raw_headers,
- recvd_headers = Headers, http_status_code = StatCode
- }) ->
+ recvd_headers = Headers, http_status_code = StatCode,
+ cur_req = #request{options = Opts}
+ }) ->
+ {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers};
+ catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
true ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers}
+ catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
+maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
+ Custom_headers = get_value(add_custom_headers, Opts, []),
+ Headers_1 = Headers ++ Custom_headers,
+ Raw_headers_1 = case Custom_headers of
+ [_ | _] when is_binary(Raw_headers) ->
+ Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
+ <<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
+ _ ->
+ Raw_headers
+ end,
+ {Headers_1, Raw_headers_1}.
+
format_response_data(Resp_format, Body) ->
case Resp_format of
list when is_list(Body) ->