summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-09-24 14:18:56 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-09-24 14:18:56 +0000
commit796a41761839395c6e08667a33f17a84893ae637 (patch)
tree3a396e29813e897ecd839fc2128cdab0b8e92d24
parenta56f5e49f01de3aeb631a263a8801d6c2c677d89 (diff)
Upgrading ibrowse from version 1.6.2 to 2.0.1.
This version fixes a serious issue regarding streaming of chunked HTTP(S) responses. The issue is that the client occasionally gets blocked or receives a timeout (if inactivity_timeout parameter is given to ibrowse). This fixes part of ticket COUCHDB-491. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1000880 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl2
-rw-r--r--src/couchdb/couch_rep_httpc.erl2
-rw-r--r--src/ibrowse/Makefile.am2
-rw-r--r--src/ibrowse/ibrowse.app.in2
-rw-r--r--src/ibrowse/ibrowse.erl78
-rw-r--r--src/ibrowse/ibrowse_http_client.erl227
-rw-r--r--src/ibrowse/ibrowse_lb.erl23
-rw-r--r--src/ibrowse/ibrowse_test.erl51
8 files changed, 281 insertions, 106 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 3f6079e7..7f7d3a38 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -187,7 +187,7 @@ handle_cast(_Msg, State) ->
handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #state{reqid=Id}=State) ->
handle_headers(list_to_integer(Code), Hdrs, State);
-handle_info({ibrowse_async_response, Id, {error,connection_closed}},
+handle_info({ibrowse_async_response, Id, {error, sel_conn_closed}},
#state{reqid=Id}=State) ->
handle_retry(State);
diff --git a/src/couchdb/couch_rep_httpc.erl b/src/couchdb/couch_rep_httpc.erl
index a47c26a3..06d4748a 100644
--- a/src/couchdb/couch_rep_httpc.erl
+++ b/src/couchdb/couch_rep_httpc.erl
@@ -168,7 +168,7 @@ process_response({error, Reason}, Req) ->
pause = Pause
} = Req,
ShortReason = case Reason of
- connection_closed ->
+ sel_conn_closed ->
connection_closed;
{'EXIT', {noproc, _}} ->
noproc;
diff --git a/src/ibrowse/Makefile.am b/src/ibrowse/Makefile.am
index b5174862..39878f0a 100644
--- a/src/ibrowse/Makefile.am
+++ b/src/ibrowse/Makefile.am
@@ -10,7 +10,7 @@
## License for the specific language governing permissions and limitations under
## the License.
-ibrowseebindir = $(localerlanglibdir)/ibrowse-1.6.2/ebin
+ibrowseebindir = $(localerlanglibdir)/ibrowse-2.0.1/ebin
ibrowse_file_collection = \
ibrowse.app.in \
diff --git a/src/ibrowse/ibrowse.app.in b/src/ibrowse/ibrowse.app.in
index 208c311b..8fc20663 100644
--- a/src/ibrowse/ibrowse.app.in
+++ b/src/ibrowse/ibrowse.app.in
@@ -1,6 +1,6 @@
{application, ibrowse,
[{description, "HTTP client application"},
- {vsn, "1.6.2"},
+ {vsn, "2.0.1"},
{modules, [ ibrowse,
ibrowse_http_client,
ibrowse_app,
diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl
index 09d36a36..7f8d8bcf 100644
--- a/src/ibrowse/ibrowse.erl
+++ b/src/ibrowse/ibrowse.erl
@@ -7,7 +7,7 @@
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2010 Chandrashekhar Mullaparthi
-%% @version 1.6.0
+%% @version 2.0.1
%% @doc The ibrowse application implements an HTTP 1.1 client. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@@ -236,6 +236,11 @@ send_req(Url, Headers, Method, Body) ->
%% caller to get access to the raw status line and raw unparsed
%% headers. Not quite sure why someone would want this, but one of my
%% users asked for it, so here it is. </li>
+%%
+%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
+%% to receive the raw data stream when the Transfer-Encoding of the server
+%% response is Chunked.
+%% </li>
%% </ul>
%%
%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
@@ -266,7 +271,8 @@ send_req(Url, Headers, Method, Body) ->
%% {socket_options, Sock_opts} |
%% {transfer_encoding, {chunked, ChunkSize}} |
%% {headers_as_is, boolean()} |
-%% {give_raw_headers, boolean()}
+%% {give_raw_headers, boolean()} |
+%% {preserve_chunked_encoding,boolean()}
%%
%% stream_to() = process() | {process(), once}
%% process() = pid() | atom()
@@ -302,23 +308,45 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Options_1 = merge_options(Host, Port, Options),
{SSLOptions, IsSSL} =
case (Protocol == https) orelse
- get_value(is_ssl, Options_1, false) of
+ get_value(is_ssl, Options_1, false) of
false -> {[], false};
true -> {get_value(ssl_options, Options_1, []), true}
end,
- case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, 0);
+ Err ->
+ {error, {url_parsing_failed, Err}}
+ end.
+
+try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
+ case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL}) of
- {ok, Conn_Pid} ->
- do_send_req(Conn_Pid, Parsed_url, Headers,
- Method, Body, Options_1, Timeout);
- Err ->
- Err
+ {ok, Conn_Pid} ->
+ case do_send_req(Conn_Pid, Parsed_url, Headers,
+ Method, Body, Options_1, Timeout) of
+ {error, sel_conn_closed} ->
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, Try_count + 1);
+ Res ->
+ Res
end;
Err ->
- {error, {url_parsing_failed, Err}}
- end.
+ Err
+ end;
+try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
+ {error, retry_later}.
merge_options(Host, Port, Options) ->
Config_options = get_config_value({options, Host, Port}, []),
@@ -337,11 +365,27 @@ get_lb_pid(Url) ->
get_max_sessions(Host, Port, Options) ->
get_value(max_sessions, Options,
- get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)).
+ get_config_value({max_sessions, Host, Port},
+ default_max_sessions())).
get_max_pipeline_size(Host, Port, Options) ->
get_value(max_pipeline_size, Options,
- get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)).
+ get_config_value({max_pipeline_size, Host, Port},
+ default_max_pipeline_size())).
+
+default_max_sessions() ->
+ safe_get_env(ibrowse, default_max_sessions, ?DEF_MAX_SESSIONS).
+
+default_max_pipeline_size() ->
+ safe_get_env(ibrowse, default_max_pipeline_size, ?DEF_MAX_PIPELINE_SIZE).
+
+safe_get_env(App, Key, Def_val) ->
+ case application:get_env(App, Key) of
+ undefined ->
+ Def_val;
+ {ok, Val} ->
+ Val
+ end.
%% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
%% for achieving the same effect.
@@ -375,6 +419,10 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Options, Timeout) of
{'EXIT', {timeout, _}} ->
{error, req_timedout};
+ {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
+ {error, sel_conn_closed};
+ {error, connection_closed} ->
+ {error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
{ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
@@ -684,6 +732,10 @@ handle_call({get_lb_pid, #url{host = Host, port = Port} = Url}, _From, State) ->
handle_call(stop, _From, State) ->
do_trace("IBROWSE shutting down~n", []),
+ ets:foldl(fun(#lb_pid{pid = Pid}, Acc) ->
+ ibrowse_lb:stop(Pid),
+ Acc
+ end, [], ibrowse_lb),
{stop, normal, ok, State};
handle_call({set_config_value, Key, Val}, _From, State) ->
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 1633e5b3..16d9b872 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -47,7 +47,8 @@
status_line, raw_headers,
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding,
- chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
+ chunk_size, chunk_size_buffer = <<>>,
+ recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}).
@@ -57,7 +58,7 @@
req_id,
stream_chunk_size,
save_response_to_file = false,
- tmp_file_name, tmp_file_fd,
+ tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
response_format}).
-import(ibrowse_lib, [
@@ -82,8 +83,13 @@ start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).
stop(Conn_pid) ->
- catch gen_server:call(Conn_pid, stop),
- ok.
+ case catch gen_server:call(Conn_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Conn_pid, kill),
+ ok;
+ _ ->
+ ok
+ end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call(
@@ -171,6 +177,7 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
+%% io:format("Recvd data: ~p~n", [Data]),
do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
@@ -178,13 +185,14 @@ handle_info({ssl, _Sock, Data}, State) ->
handle_info({stream_next, Req_id}, #state{socket = Socket,
cur_req = #request{req_id = Req_id}} = State) ->
+ %% io:format("Client process set {active, once}~n", []),
do_setopts(Socket, [{active, once}], State),
{noreply, State};
handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
-handle_info({tcp_closed, _Sock}, State) ->
+handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
@@ -194,11 +202,11 @@ handle_info({ssl_closed, _Sock}, State) ->
{stop, normal, State};
handle_info({tcp_error, _Sock}, State) ->
- io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
handle_info({ssl_error, _Sock}, State) ->
- io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
@@ -233,7 +241,8 @@ handle_info(Info, State) ->
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- do_close(State).
+ do_close(State),
+ ok.
%%--------------------------------------------------------------------
%% Func: code_change/3
@@ -269,6 +278,7 @@ handle_sock_data(Data, #state{status = get_header}=State) ->
end;
handle_sock_data(Data, #state{status = get_body,
+ socket = Socket,
content_length = CL,
http_status_code = StatCode,
recvd_headers = Headers,
@@ -293,6 +303,19 @@ handle_sock_data(Data, #state{status = get_body,
fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
+ #state{cur_req = #request{caller_controls_socket = Ccs},
+ interim_reply_sent = Irs} = State_1 ->
+ case Irs of
+ true ->
+ active_once(State_1);
+ false when Ccs == true ->
+ do_setopts(Socket, [{active, once}], State);
+ false ->
+ active_once(State_1)
+ end,
+ State_2 = State_1#state{interim_reply_sent = false},
+ set_inac_timer(State_2),
+ {noreply, State_2};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
@@ -338,17 +361,25 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Srtf
{error, Reason} ->
{error, {file_write_error, Reason}}
end;
-accumulate_response(<<>>, State) ->
- State;
-accumulate_response(Data, #state{reply_buffer = RepBuf,
- rep_buf_size = RepBufSize,
- streamed_size = Streamed_size,
- cur_req = CurReq}=State) ->
- #request{stream_to=StreamTo, req_id=ReqId,
- stream_chunk_size = Stream_chunk_size,
- response_format = Response_format,
- caller_controls_socket = Caller_controls_socket} = CurReq,
- RepBuf_1 = list_to_binary([RepBuf, Data]),
+%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs},
+%% socket = Socket} = State) ->
+%% case Ccs of
+%% true ->
+%% do_setopts(Socket, [{active, once}], State);
+%% false ->
+%% ok
+%% end,
+%% State;
+accumulate_response(Data, #state{reply_buffer = RepBuf,
+ rep_buf_size = RepBufSize,
+ streamed_size = Streamed_size,
+ cur_req = CurReq}=State) ->
+ #request{stream_to = StreamTo,
+ req_id = ReqId,
+ stream_chunk_size = Stream_chunk_size,
+ response_format = Response_format,
+ caller_controls_socket = Caller_controls_socket} = CurReq,
+ RepBuf_1 = <<RepBuf/binary, Data/binary>>,
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
@@ -356,15 +387,21 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
_ when Caller_controls_socket == true ->
do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
State#state{reply_buffer = <<>>,
+ interim_reply_sent = true,
streamed_size = Streamed_size + size(RepBuf_1)};
_ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
- accumulate_response(
- Rem_data,
- State#state{
- reply_buffer = <<>>,
- streamed_size = Streamed_size + Stream_chunk_size});
+ State_1 = State#state{
+ reply_buffer = <<>>,
+ interim_reply_sent = true,
+ streamed_size = Streamed_size + Stream_chunk_size},
+ case Rem_data of
+ <<>> ->
+ State_1;
+ _ ->
+ accumulate_response(Rem_data, State_1)
+ end;
_ ->
State#state{reply_buffer = RepBuf_1}
end.
@@ -498,9 +535,9 @@ do_close(#state{socket = Sock,
is_ssl = true,
use_proxy = true,
proxy_tunnel_setup = Pts
- }) when Pts /= done -> gen_tcp:close(Sock);
-do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
-do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
+ }) when Pts /= done -> catch gen_tcp:close(Sock);
+do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock).
active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
ok;
@@ -542,25 +579,17 @@ send_req_1(From,
end,
State_2 = check_ssl_options(Options, State_1),
do_trace("Connecting...~n", []),
- Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
- do_trace("Connected!~n", []),
- End_ts = now(),
- Timeout_1 = case Timeout of
- infinity ->
- infinity;
- _ ->
- Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
- end,
+ do_trace("Connected! Socket: ~1000.p~n", [Sock]),
State_3 = State_2#state{socket = Sock,
connect_timeout = Conn_timeout},
- send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+ send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
- gen_server:reply(From, {error, conn_failed}),
+ gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2}
end;
@@ -580,8 +609,9 @@ send_req_1(From,
use_proxy = true,
is_ssl = true} = State) ->
NewReq = #request{
- method = connect,
- options = Options
+ method = connect,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+ options = Options
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -611,13 +641,13 @@ send_req_1(From,
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
@@ -666,7 +696,9 @@ send_req_1(From,
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
- from = From},
+ from = From,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+ },
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
{Req, Body_1} = make_request(Method,
@@ -705,13 +737,13 @@ send_req_1(From,
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end.
@@ -768,14 +800,14 @@ http_auth_digest(Username, Password) ->
ibrowse_lib:encode_base64(Username ++ [$: | Password]).
make_request(Method, Headers, AbsPath, RelPath, Body, Options,
- #state{use_proxy = UseProxy}) ->
+ #state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Headers_1 =
case get_value(content_length, Headers, false) of
false when (Body == []) or
- (Body == <<>>) or
- is_tuple(Body) or
- is_function(Body) ->
+ (Body == <<>>) or
+ is_tuple(Body) or
+ is_function(Body) ->
Headers;
false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers];
@@ -799,7 +831,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options,
Headers_3 = cons_headers(Headers_2),
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
- AbsPath;
+ case Is_ssl of
+ true ->
+ RelPath;
+ false ->
+ AbsPath
+ end;
false ->
RelPath
end,
@@ -1017,7 +1054,7 @@ upgrade_to_ssl(#state{socket = Socket,
send_queued_requests(lists:reverse(Q), State_1);
Err ->
do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1029,12 +1066,12 @@ send_queued_requests([{From, Url, Headers, Method, Body, Options, Timeout} | Q],
case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
{noreply, State_1} ->
send_queued_requests(Q, State_1);
- _ ->
+ Err ->
do_trace("Error sending queued SSL request: ~n"
"URL : ~s~n"
"Method : ~p~n"
"Headers : ~p~n", [Url, Method, Headers]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1046,11 +1083,12 @@ is_connection_closing(_, _) -> false.
%% This clause determines the chunk size when given data from the beginning of the chunk
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
- chunk_size = chunk_start,
+ chunk_size = chunk_start,
chunk_size_buffer = Chunk_sz_buf
} = State) ->
case scan_crlf(Chunk_sz_buf, DataRecvd) of
{yes, ChunkHeader, Data_1} ->
+ State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
ChunkSize = parse_chunk_header(ChunkHeader),
%%
%% Do we have to preserve the chunk encoding when
@@ -1061,10 +1099,10 @@ parse_11_response(DataRecvd,
RemLen = size(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
[ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
- deleted_crlf = true,
- recvd_chunk_size = 0,
- chunk_size = ChunkSize});
+ parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
+ deleted_crlf = true,
+ recvd_chunk_size = 0,
+ chunk_size = ChunkSize});
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1074,13 +1112,15 @@ parse_11_response(DataRecvd,
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
chunk_size = tbd,
- chunk_size_buffer = Buf}=State) ->
+ chunk_size_buffer = Buf
+ } = State) ->
case scan_crlf(Buf, DataRecvd) of
{yes, _, NextChunk} ->
- State_1 = State#state{chunk_size = chunk_start,
- chunk_size_buffer = <<>>,
- deleted_crlf = true},
- parse_11_response(NextChunk, State_1);
+ State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
+ State_2 = State_1#state{chunk_size = chunk_start,
+ chunk_size_buffer = <<>>,
+ deleted_crlf = true},
+ parse_11_response(NextChunk, State_2);
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1090,9 +1130,10 @@ parse_11_response(DataRecvd,
%% received is silently discarded.
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, chunk_size = 0,
- cur_req = CurReq,
- deleted_crlf = DelCrlf,
- chunk_size_buffer = Trailer, reqs = Reqs}=State) ->
+ cur_req = CurReq,
+ deleted_crlf = DelCrlf,
+ chunk_size_buffer = Trailer,
+ reqs = Reqs} = State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
false ->
@@ -1101,12 +1142,14 @@ parse_11_response(DataRecvd,
<<$\r, $\n, DataRecvd/binary>>
end,
case scan_header(Trailer, DataRecvd_1) of
- {yes, _TEHeaders, Rem} ->
+ {yes, TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
- State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
- parse_response(Rem, reset_state(State_1));
+ State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
+ State_2 = handle_response(CurReq,
+ State_1#state{reqs = Reqs_1}),
+ parse_response(Rem, reset_state(State_2));
{no, Rem} ->
- State#state{chunk_size_buffer = Rem, deleted_crlf = false}
+ accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
end;
%% This clause extracts a chunk, given the size.
@@ -1121,7 +1164,7 @@ parse_11_response(DataRecvd,
case DataLen >= NeedBytes of
true ->
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
- do_trace("Recvd another chunk...~n", []),
+ do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]),
case accumulate_response(RemChunk, State) of
{error, Reason} ->
@@ -1155,6 +1198,11 @@ parse_11_response(DataRecvd,
accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end.
+maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) ->
+ State;
+maybe_accumulate_ce_data(State, Data) ->
+ accumulate_response(Data, State).
+
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
save_response_to_file = SaveResponseToFile,
@@ -1177,11 +1225,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, ResponseBody};
+ {ok, Status_line, Raw_headers_1, ResponseBody};
false ->
- {ok, SCode, RespHeaders, ResponseBody}
+ {ok, SCode, Resp_headers_1, ResponseBody}
end,
State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
@@ -1192,16 +1241,17 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
- recvd_headers = RespHeaders,
+ recvd_headers = Resp_headers,
reply_buffer = RepBuf,
send_timer = ReqTimer} = State) ->
Body = RepBuf,
%% State_1 = set_cur_request(State),
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, Body};
+ {ok, Status_line, Raw_headers_1, Body};
false ->
- {ok, SCode, RespHeaders, Body}
+ {ok, SCode, Resp_headers_1, Body}
end,
State_1 = case get(conn_close) of
"close" ->
@@ -1227,7 +1277,8 @@ reset_state(State) ->
deleted_crlf = false,
http_status_code = undefined,
chunk_size = undefined,
- transfer_encoding = undefined}.
+ transfer_encoding = undefined
+ }.
set_cur_request(#state{reqs = Reqs} = State) ->
case queue:to_list(Reqs) of
@@ -1459,15 +1510,29 @@ send_async_headers(_ReqId, undefined, _, _State) ->
ok;
send_async_headers(ReqId, StreamTo, Give_raw_headers,
#state{status_line = Status_line, raw_headers = Raw_headers,
- recvd_headers = Headers, http_status_code = StatCode
- }) ->
+ recvd_headers = Headers, http_status_code = StatCode,
+ cur_req = #request{options = Opts}
+ }) ->
+ {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers};
+ catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
true ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers}
+ catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
+maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
+ Custom_headers = get_value(add_custom_headers, Opts, []),
+ Headers_1 = Headers ++ Custom_headers,
+ Raw_headers_1 = case Custom_headers of
+ [_ | _] when is_binary(Raw_headers) ->
+ Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
+ <<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
+ _ ->
+ Raw_headers
+ end,
+ {Headers_1, Raw_headers_1}.
+
format_response_data(Resp_format, Body) ->
case Resp_format of
list when is_list(Body) ->
diff --git a/src/ibrowse/ibrowse_lb.erl b/src/ibrowse/ibrowse_lb.erl
index 6bc600be..0e001d48 100644
--- a/src/ibrowse/ibrowse_lb.erl
+++ b/src/ibrowse/ibrowse_lb.erl
@@ -16,7 +16,8 @@
%% External exports
-export([
start_link/1,
- spawn_connection/5
+ spawn_connection/5,
+ stop/1
]).
%% gen_server callbacks
@@ -85,6 +86,14 @@ spawn_connection(Lb_pid, Url,
is_integer(Max_sessions) ->
gen_server:call(Lb_pid,
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
+
+stop(Lb_pid) ->
+ case catch gen_server:call(Lb_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Lb_pid, kill);
+ ok ->
+ ok
+ end.
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
@@ -120,6 +129,18 @@ handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From,
ets:insert(Tid, {{1, Pid}, []}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
+handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
+handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
+ ets:foldl(fun({{_, Pid}, _}, Acc) ->
+ ibrowse_http_client:stop(Pid),
+ Acc
+ end, [], Tid),
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl
index 00b0244f..e7d6e59e 100644
--- a/src/ibrowse/ibrowse_test.erl
+++ b/src/ibrowse/ibrowse_test.erl
@@ -17,6 +17,7 @@
ue_test/1,
verify_chunked_streaming/0,
verify_chunked_streaming/1,
+ test_chunked_streaming_once/0,
i_do_async_req_list/4,
test_stream_once/3,
test_stream_once/4
@@ -260,7 +261,20 @@ verify_chunked_streaming(Options) ->
io:format("Fetching data with streaming as binary...~n", []),
Async_response_bin = do_async_req_list(
Url, get, [{response_format, binary} | Options]),
- compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+ io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+ Async_response_bin_once = do_async_req_list(
+ Url, get, [once, {response_format, binary} | Options]),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once).
+
+test_chunked_streaming_once() ->
+ test_chunked_streaming_once([]).
+
+test_chunked_streaming_once(Options) ->
+ Url = "http://www.httpwatch.com/httpgallery/chunked/",
+ io:format("URL: ~s~n", [Url]),
+ io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+ do_async_req_list(Url, get, [once, {response_format, binary} | Options]).
compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
success;
@@ -313,31 +327,54 @@ wait_for_resp(Pid) ->
Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid)
- after 10000 ->
+ after 100000 ->
{error, timeout}
end.
i_do_async_req_list(Parent, Url, Method, Options) ->
- Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+ Options_1 = case lists:member(once, Options) of
+ true ->
+ [{stream_to, {self(), once}} | (Options -- [once])];
+ false ->
+ [{stream_to, self()} | Options]
+ end,
+ Res = ibrowse:send_req(Url, [], Method, [], Options_1),
case Res of
{ibrowse_req_id, Req_id} ->
- Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+ Result = wait_for_async_resp(Req_id, Options, undefined, undefined, []),
Parent ! {async_result, self(), Result};
Err ->
Parent ! {async_result, self(), Err}
end.
-wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, Body) ->
receive
{ibrowse_async_headers, Req_id, StatCode, Headers} ->
- wait_for_async_resp(Req_id, StatCode, Headers, Body);
+ %% io:format("Recvd headers...~n", []),
+ maybe_stream_next(Req_id, Options),
+ wait_for_async_resp(Req_id, Options, StatCode, Headers, Body);
{ibrowse_async_response_end, Req_id} ->
+ io:format("Recvd end of response.~n", []),
Body_1 = list_to_binary(lists:reverse(Body)),
{ok, Acc_Stat_code, Acc_Headers, Body_1};
{ibrowse_async_response, Req_id, Data} ->
- wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ maybe_stream_next(Req_id, Options),
+ %% io:format("Recvd data...~n", []),
+ wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ {ibrowse_async_response, Req_id, {error, _} = Err} ->
+ {ok, Acc_Stat_code, Acc_Headers, Err};
Err ->
{ok, Acc_Stat_code, Acc_Headers, Err}
+ after 10000 ->
+ {timeout, Acc_Stat_code, Acc_Headers, Body}
+ end.
+
+maybe_stream_next(Req_id, Options) ->
+ case lists:member(once, Options) of
+ true ->
+ ibrowse:stream_next(Req_id);
+ false ->
+ ok
end.
execute_req(Url, Method, Options) ->