summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-07-03 15:56:51 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-07-03 15:56:51 +0000
commita2ab68be599153add0fb8b3049f71afbae1a89a1 (patch)
treeb385afb7d056b259aee87a8b7b4456ff172ff1fb
parent2de233d67c05b32b1de6f2af18fefc56d8aad704 (diff)
ibrowse now allows user to control socket. Thanks again Chandru
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@790953 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/ibrowse/ibrowse.erl37
-rw-r--r--src/ibrowse/ibrowse_http_client.erl441
-rw-r--r--src/ibrowse/ibrowse_test.erl46
3 files changed, 289 insertions, 235 deletions
diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl
index 1b0daadd..0d3478b3 100644
--- a/src/ibrowse/ibrowse.erl
+++ b/src/ibrowse/ibrowse.erl
@@ -89,6 +89,7 @@
send_req_direct/5,
send_req_direct/6,
send_req_direct/7,
+ stream_next/1,
set_max_sessions/3,
set_max_pipeline_size/3,
set_dest/3,
@@ -150,7 +151,8 @@ stop() ->
%% respHeader() = {headerName(), headerValue()}
%% headerName() = string()
%% headerValue() = string()
-%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {error, Reason}
+%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
+%% req_id = term()
%% ResponseBody = string() | {file, Filename}
%% Reason = term()
send_req(Url, Headers, Method) ->
@@ -425,7 +427,20 @@ send_req_direct(Conn_pid, Url, Headers, Method, Body, Options, Timeout) ->
Err ->
{error, {url_parsing_failed, Err}}
end.
-
+
+%% @doc Tell ibrowse to stream the next chunk of data to the
+%% caller. Should be used in conjunction with the
+%% <code>stream_to</code> option
+%% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
+stream_next(Req_id) ->
+ case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
+ [] ->
+ {error, unknown_req_id};
+ [{_, Pid}] ->
+ catch Pid ! {stream_next, Req_id},
+ ok
+ end.
+
%% @doc Turn tracing on for the ibrowse process
trace_on() ->
ibrowse ! {trace, true}.
@@ -522,6 +537,7 @@ init(_) ->
put(ibrowse_trace_token, "ibrowse"),
ets:new(ibrowse_lb, [named_table, public, {keypos, 2}]),
ets:new(ibrowse_conf, [named_table, protected, {keypos, 2}]),
+ ets:new(ibrowse_stream, [named_table, public]),
import_config(),
{ok, #state{}}.
@@ -539,9 +555,9 @@ import_config(Filename) ->
{ok, Terms} ->
ets:delete_all_objects(ibrowse_conf),
Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options})
- when list(Host), integer(Port),
- integer(MaxSess), MaxSess > 0,
- integer(MaxPipe), MaxPipe > 0, list(Options) ->
+ when is_list(Host), is_integer(Port),
+ is_integer(MaxSess), MaxSess > 0,
+ is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
I = [{{max_sessions, Host, Port}, MaxSess},
{{max_pipeline_size, Host, Port}, MaxPipe},
{{options, Host, Port}, Options}],
@@ -641,13 +657,6 @@ handle_info(all_trace_off, State) ->
true ->
catch Pid ! {trace, false}
end;
- (#client_conn{key = {H, P, Pid}}, _) ->
- case lists:member({H, P}, Trace_on_dests) of
- false ->
- ok;
- true ->
- catch Pid ! {trace, false}
- end;
(_, Acc) ->
Acc
end,
@@ -664,10 +673,6 @@ handle_info({trace, Bool, Host, Port}, State) ->
when H == Host,
P == Port ->
catch Pid ! {trace, Bool};
- (#client_conn{key = {H, P, Pid}}, _)
- when H == Host,
- P == Port ->
- catch Pid ! {trace, Bool};
(_, Acc) ->
Acc
end,
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 24214ffb..3cacf391 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -47,11 +47,12 @@
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding,
chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
- lb_ets_tid, cur_pipeline_size = 0
+ lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}).
-record(request, {url, method, options, from,
- stream_to, req_id,
+ stream_to, caller_controls_socket = false,
+ req_id,
stream_chunk_size,
save_response_to_file = false,
tmp_file_name, tmp_file_fd,
@@ -126,144 +127,15 @@ init({Host, Port}) ->
%%--------------------------------------------------------------------
%% Received a request when the remote server has already sent us a
%% Connection: Close header
-handle_call({send_req, _},
- _From,
- #state{is_closing=true}=State) ->
+handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
{reply, {error, connection_closing}, State};
handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
- From,
- #state{socket=undefined,
- host=Host, port=Port}=State) ->
- Resp_format = get_value(response_format, Options, list),
- {Host_1, Port_1, State_1} =
- case get_value(proxy_host, Options, false) of
- false ->
- {Host, Port, State};
- PHost ->
- ProxyUser = get_value(proxy_user, Options, []),
- ProxyPassword = get_value(proxy_password, Options, []),
- Digest = http_auth_digest(ProxyUser, ProxyPassword),
- {PHost, get_value(proxy_port, Options, 80),
- State#state{use_proxy = true,
- proxy_auth_digest = Digest}}
- end,
- StreamTo = get_value(stream_to, Options, undefined),
- ReqId = make_req_id(),
- SaveResponseToFile = get_value(save_response_to_file, Options, false),
- NewReq = #request{url=Url,
- method=Method,
- stream_to=StreamTo,
- options=Options,
- req_id=ReqId,
- save_response_to_file = SaveResponseToFile,
- stream_chunk_size = get_stream_chunk_size(Options),
- response_format = Resp_format,
- from=From},
- Reqs = queue:in(NewReq, State#state.reqs),
- State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
- do_trace("Connecting...~n", []),
- Start_ts = now(),
- Conn_timeout = get_value(connect_timeout, Options, Timeout),
- case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
- {ok, Sock} ->
- do_trace("Connected!~n", []),
- End_ts = now(),
- Ref = case Timeout of
- infinity ->
- undefined;
- _ ->
- Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)),
- case Rem_time > 0 of
- true ->
- erlang:send_after(Rem_time, self(), {req_timedout, From});
- false ->
- shutting_down(State_2),
- do_error_reply(State_2, req_timedout),
- exit(normal)
- end
- end,
- case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of
- ok ->
- do_setopts(Sock, [{active, once}], State_2#state.is_ssl),
- case StreamTo of
- undefined ->
- ok;
- _ ->
- gen_server:reply(From, {ibrowse_req_id, ReqId})
- end,
- State_3 = inc_pipeline_counter(State_2#state{socket = Sock,
- send_timer = Ref,
- cur_req = NewReq,
- status = get_header}),
- {noreply, State_3, get_inac_timeout(State_3)};
- Err ->
- shutting_down(State_2),
- do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
- {stop, normal, State_2}
- end;
- Err ->
- shutting_down(State_2),
- do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
- gen_server:reply(From, {error, conn_failed}),
- {stop, normal, State_2}
- end;
-
-%% Request which is to be pipelined
-handle_call({send_req, {Url, Headers, Method,
- Body, Options, Timeout}},
- From,
- #state{socket=Sock, status=Status, reqs=Reqs}=State) ->
- do_trace("Recvd request in connected state. Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]),
- Resp_format = get_value(response_format, Options, list),
- StreamTo = get_value(stream_to, Options, undefined),
- SaveResponseToFile = get_value(save_response_to_file, Options, false),
- ReqId = make_req_id(),
- NewReq = #request{url=Url,
- stream_to=StreamTo,
- method=Method,
- options=Options,
- req_id=ReqId,
- save_response_to_file = SaveResponseToFile,
- stream_chunk_size = get_stream_chunk_size(Options),
- response_format = Resp_format,
- from=From},
- State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
- case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
- ok ->
- State_2 = inc_pipeline_counter(State_1),
- do_setopts(Sock, [{active, once}], State#state.is_ssl),
- case Timeout of
- infinity ->
- ok;
- _ ->
- erlang:send_after(Timeout, self(), {req_timedout, From})
- end,
- State_3 = case Status of
- idle ->
- State_2#state{status = get_header,
- cur_req = NewReq};
- _ ->
- State_2
- end,
- case StreamTo of
- undefined ->
- ok;
- _ ->
- gen_server:reply(From, {ibrowse_req_id, ReqId})
- end,
- {noreply, State_3, get_inac_timeout(State_3)};
- Err ->
- shutting_down(State_1),
- do_trace("Send request failed: Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
- do_error_reply(State, send_failed),
- {stop, normal, State_1}
- end;
+ From, State) ->
+ send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
-handle_call(stop, _From, #state{socket = Socket, is_ssl = Is_ssl} = State) ->
- do_close(Socket, Is_ssl),
+handle_call(stop, _From, State) ->
+ do_close(State),
do_error_reply(State, closing_on_request),
{stop, normal, State};
@@ -294,6 +166,15 @@ handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
handle_info({ssl, _Sock, Data}, State) ->
handle_sock_data(Data, State);
+handle_info({stream_next, Req_id}, #state{socket = Socket,
+ is_ssl = Is_ssl,
+ cur_req = #request{req_id = Req_id}} = State) ->
+ do_setopts(Socket, [{active, once}], Is_ssl),
+ {noreply, State};
+
+handle_info({stream_next, _Req_id}, State) ->
+ {noreply, State};
+
handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
@@ -332,12 +213,7 @@ handle_info(Info, State) ->
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- case State#state.socket of
- undefined ->
- ok;
- Sock ->
- do_close(Sock, State#state.is_ssl)
- end.
+ do_close(State).
%%--------------------------------------------------------------------
%% Func: code_change/3
@@ -358,10 +234,10 @@ handle_sock_data(Data, #state{status=idle}=State) ->
do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
shutting_down(State),
do_error_reply(State, data_in_status_idle),
- do_close(State#state.socket, State#state.is_ssl),
+ do_close(State),
{stop, normal, State};
-handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
+handle_sock_data(Data, #state{status = get_header}=State) ->
case parse_response(Data, State) of
{error, _Reason} ->
shutting_down(State),
@@ -370,14 +246,15 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
shutting_down(State),
{stop, normal, State};
State_1 ->
- do_setopts(Sock, [{active, once}], State#state.is_ssl),
+ active_once(State_1),
{noreply, State_1, get_inac_timeout(State_1)}
end;
-handle_sock_data(Data, #state{status=get_body, content_length=CL,
+handle_sock_data(Data, #state{status = get_body,
+ content_length = CL,
http_status_code = StatCode,
- recvd_headers=Headers,
- chunk_size=CSz, socket=Sock}=State) ->
+ recvd_headers = Headers,
+ chunk_size = CSz} = State) ->
case (CL == undefined) and (CSz == undefined) of
true ->
case accumulate_response(Data, State) of
@@ -387,7 +264,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
State_1 ->
- do_setopts(Sock, [{active, once}], State#state.is_ssl),
+ active_once(State_1),
{noreply, State_1, get_inac_timeout(State_1)}
end;
_ ->
@@ -401,7 +278,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
shutting_down(State),
{stop, normal, State};
State_1 ->
- do_setopts(Sock, [{active, once}], State#state.is_ssl),
+ active_once(State_1),
{noreply, State_1, get_inac_timeout(State_1)}
end
end.
@@ -452,22 +329,27 @@ accumulate_response(Data, #state{reply_buffer = RepBuf,
cur_req = CurReq}=State) ->
#request{stream_to=StreamTo, req_id=ReqId,
stream_chunk_size = Stream_chunk_size,
- response_format = Response_format} = CurReq,
+ response_format = Response_format,
+ caller_controls_socket = Caller_controls_socket} = CurReq,
RepBuf_1 = concat_binary([RepBuf, Data]),
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
State#state{reply_buffer = RepBuf_1};
- _ when New_data_size < Stream_chunk_size ->
- State#state{reply_buffer = RepBuf_1};
- _ ->
+ _ when Caller_controls_socket == true ->
+ do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
+ State#state{reply_buffer = <<>>,
+ streamed_size = Streamed_size + size(RepBuf_1)};
+ _ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
accumulate_response(
Rem_data,
State#state{
reply_buffer = <<>>,
- streamed_size = Streamed_size + Stream_chunk_size})
+ streamed_size = Streamed_size + Stream_chunk_size});
+ _ ->
+ State#state{reply_buffer = RepBuf_1}
end.
make_tmp_filename() ->
@@ -528,37 +410,45 @@ do_connect(Host, Port, _Options, _State, Timeout) ->
[binary, {nodelay, true}, {active, false}],
Timeout).
-do_send(Sock, Req, true) -> ssl:send(Sock, Req);
-do_send(Sock, Req, false) -> gen_tcp:send(Sock, Req).
+do_send(Req, #state{socket = Sock, is_ssl = true}) -> ssl:send(Sock, Req);
+do_send(Req, #state{socket = Sock, is_ssl = false}) -> gen_tcp:send(Sock, Req).
%% @spec do_send_body(Sock::socket_descriptor(), Source::source_descriptor(), IsSSL::boolean()) -> ok | error()
%% source_descriptor() = fun_arity_0 |
%% {fun_arity_0} |
%% {fun_arity_1, term()}
%% error() = term()
-do_send_body(Sock, Source, IsSSL) when is_function(Source) ->
- do_send_body(Sock, {Source}, IsSSL);
-do_send_body(Sock, {Source}, IsSSL) when is_function(Source) ->
- do_send_body1(Sock, Source, IsSSL, Source());
-do_send_body(Sock, {Source, State}, IsSSL) when is_function(Source) ->
- do_send_body1(Sock, Source, IsSSL, Source(State));
-do_send_body(Sock, Body, IsSSL) ->
- do_send(Sock, Body, IsSSL).
-
-do_send_body1(Sock, Source, IsSSL, Resp) ->
+do_send_body(Source, State) when is_function(Source) ->
+ do_send_body({Source}, State);
+do_send_body({Source}, State) when is_function(Source) ->
+ do_send_body1(Source, Source(), State);
+do_send_body({Source, Source_state}, State) when is_function(Source) ->
+ do_send_body1(Source, Source(Source_state), State);
+do_send_body(Body, State) ->
+ do_send(Body, State).
+
+do_send_body1(Source, Resp, State) ->
case Resp of
{ok, Data} ->
- do_send(Sock, Data, IsSSL),
- do_send_body(Sock, {Source}, IsSSL);
- {ok, Data, NewState} ->
- do_send(Sock, Data, IsSSL),
- do_send_body(Sock, {Source, NewState}, IsSSL);
- eof -> ok;
- Err -> Err
+ do_send(Data, State),
+ do_send_body({Source}, State);
+ {ok, Data, New_source_state} ->
+ do_send(Data, State),
+ do_send_body({Source, New_source_state}, State);
+ eof ->
+ ok;
+ Err ->
+ Err
end.
-do_close(Sock, true) -> ssl:close(Sock);
-do_close(Sock, false) -> gen_tcp:close(Sock).
+do_close(#state{socket = undefined}) -> ok;
+do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
+
+active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
+ ok;
+active_once(#state{socket = Socket, is_ssl = Is_ssl}) ->
+ do_setopts(Socket, [{active, once}], Is_ssl).
do_setopts(Sock, Opts, true) -> ssl:setopts(Sock, Opts);
do_setopts(Sock, Opts, false) -> inet:setopts(Sock, Opts).
@@ -571,11 +461,81 @@ check_ssl_options(Options, State) ->
State#state{is_ssl=true, ssl_options=get_value(ssl_options, Options)}
end.
-send_req_1(#url{abspath = AbsPath,
- host = Host,
- port = Port,
- path = RelPath} = Url,
- Headers, Method, Body, Options, Sock, State) ->
+send_req_1(From,
+ #url{host = Host,
+ port = Port} = Url,
+ Headers, Method, Body, Options, Timeout,
+ #state{socket = undefined} = State) ->
+ {Host_1, Port_1, State_1} =
+ case get_value(proxy_host, Options, false) of
+ false ->
+ {Host, Port, State};
+ PHost ->
+ ProxyUser = get_value(proxy_user, Options, []),
+ ProxyPassword = get_value(proxy_password, Options, []),
+ Digest = http_auth_digest(ProxyUser, ProxyPassword),
+ {PHost, get_value(proxy_port, Options, 80),
+ State#state{use_proxy = true,
+ proxy_auth_digest = Digest}}
+ end,
+ State_2 = check_ssl_options(Options, State_1),
+ do_trace("Connecting...~n", []),
+ Start_ts = now(),
+ Conn_timeout = get_value(connect_timeout, Options, Timeout),
+ case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
+ {ok, Sock} ->
+ do_trace("Connected!~n", []),
+ End_ts = now(),
+ Timeout_1 = case Timeout of
+ infinity ->
+ infinity;
+ _ ->
+ Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
+ end,
+ State_3 = State_2#state{socket = Sock},
+ send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+ Err ->
+ shutting_down(State_2),
+ do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
+ gen_server:reply(From, {error, conn_failed}),
+ {stop, normal, State_2}
+ end;
+send_req_1(From,
+ #url{abspath = AbsPath,
+ host = Host,
+ port = Port,
+ path = RelPath} = Url,
+ Headers, Method, Body, Options, Timeout,
+ #state{status = Status} = State) ->
+ ReqId = make_req_id(),
+ Resp_format = get_value(response_format, Options, list),
+ {StreamTo, Caller_controls_socket} =
+ case get_value(stream_to, Options, undefined) of
+ {Caller, once} when is_pid(Caller) or
+ is_atom(Caller) ->
+ Async_pid_rec = {{req_id_pid, ReqId}, self()},
+ true = ets:insert(ibrowse_stream, Async_pid_rec),
+ {Caller, true};
+ undefined ->
+ {undefined, false};
+ Caller when is_pid(Caller) or
+ is_atom(Caller) ->
+ {Caller, false};
+ Stream_to_inv ->
+ exit({invalid_option, {stream_to, Stream_to_inv}})
+ end,
+ SaveResponseToFile = get_value(save_response_to_file, Options, false),
+ NewReq = #request{url = Url,
+ method = Method,
+ stream_to = StreamTo,
+ caller_controls_socket = Caller_controls_socket,
+ options = Options,
+ req_id = ReqId,
+ save_response_to_file = SaveResponseToFile,
+ stream_chunk_size = get_stream_chunk_size(Options),
+ response_format = Resp_format,
+ from = From},
+ State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = add_auth_headers(Url, Options, Headers, State),
HostHeaderValue = case lists:keysearch(host_header, 1, Options) of
false ->
@@ -598,14 +558,45 @@ send_req_1(#url{abspath = AbsPath,
"--- Request End ---~n", [NReq]);
_ -> ok
end,
- SndRes = case do_send(Sock, Req, State#state.is_ssl) of
- ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
- Err ->
- io:format("Err: ~p~n", [Err]),
- Err
- end,
- do_setopts(Sock, [{active, once}], State#state.is_ssl),
- SndRes.
+ case do_send(Req, State) of
+ ok ->
+ case do_send_body(Body_1, State) of
+ ok ->
+ State_2 = inc_pipeline_counter(State_1),
+ active_once(State_1),
+ Ref = case Timeout of
+ infinity ->
+ undefined;
+ _ ->
+ erlang:send_after(Timeout, self(), {req_timedout, From})
+ end,
+ State_3 = case Status of
+ idle ->
+ State_2#state{status = get_header,
+ cur_req = NewReq,
+ send_timer = Ref};
+ _ ->
+ State_2#state{send_timer = Ref}
+ end,
+ case StreamTo of
+ undefined ->
+ ok;
+ _ ->
+ gen_server:reply(From, {ibrowse_req_id, ReqId})
+ end,
+ {noreply, State_3, get_inac_timeout(State_3)};
+ Err ->
+ shutting_down(State_1),
+ do_trace("Send failed... Reason: ~p~n", [Err]),
+ gen_server:reply(From, {error, send_failed}),
+ {stop, normal, State_1}
+ end;
+ Err ->
+ shutting_down(State_1),
+ do_trace("Send failed... Reason: ~p~n", [Err]),
+ gen_server:reply(From, {error, send_failed}),
+ {stop, normal, State_1}
+ end.
add_auth_headers(#url{username = User,
password = UPw},
@@ -719,9 +710,9 @@ encode_headers(L) ->
encode_headers(L, []).
encode_headers([{http_vsn, _Val} | T], Acc) ->
encode_headers(T, Acc);
-encode_headers([{Name,Val} | T], Acc) when list(Name) ->
+encode_headers([{Name,Val} | T], Acc) when is_list(Name) ->
encode_headers(T, [[Name, ": ", fmt_val(Val), crnl()] | Acc]);
-encode_headers([{Name,Val} | T], Acc) when atom(Name) ->
+encode_headers([{Name,Val} | T], Acc) when is_atom(Name) ->
encode_headers(T, [[atom_to_list(Name), ": ", fmt_val(Val), crnl()] | Acc]);
encode_headers([], Acc) ->
lists:reverse(Acc).
@@ -732,25 +723,25 @@ chunk_request_body(Body, ChunkSize) ->
chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk | Acc]);
-chunk_request_body(Body, ChunkSize, Acc) when binary(Body),
+chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
size(Body) >= ChunkSize ->
<<ChunkBody:ChunkSize/binary, Rest/binary>> = Body,
Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
ChunkBody, "\r\n"],
chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
-chunk_request_body(Body, _ChunkSize, Acc) when binary(Body) ->
+chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
BodySize = size(Body),
Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
Body, "\r\n"],
LastChunk = "0\r\n",
lists:reverse(["\r\n", LastChunk, Chunk | Acc]);
-chunk_request_body(Body, ChunkSize, Acc) when list(Body),
+chunk_request_body(Body, ChunkSize, Acc) when is_list(Body),
length(Body) >= ChunkSize ->
{ChunkBody, Rest} = split_list_at(Body, ChunkSize),
Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
ChunkBody, "\r\n"],
chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
-chunk_request_body(Body, _ChunkSize, Acc) when list(Body) ->
+chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
BodySize = length(Body),
Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
Body, "\r\n"],
@@ -840,7 +831,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
{error, content_length_undefined};
V ->
case catch list_to_integer(V) of
- V_1 when integer(V_1), V_1 >= 0 ->
+ V_1 when is_integer(V_1), V_1 >= 0 ->
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
do_trace("Recvd Content-Length of ~p~n", [V_1]),
State_2 = State_1#state{rep_buf_size=0,
@@ -1058,17 +1049,20 @@ set_cur_request(#state{reqs = Reqs} = State) ->
parse_headers(Headers) ->
case scan_crlf(Headers) of
{yes, StatusLine, T} ->
- Headers_1 = parse_headers_1(T),
- case parse_status_line(StatusLine) of
- {ok, HttpVsn, StatCode, _Msg} ->
- put(http_prot_vsn, HttpVsn),
- {HttpVsn, StatCode, Headers_1};
- _ -> %% A HTTP 0.9 response?
- put(http_prot_vsn, "HTTP/0.9"),
- {"HTTP/0.9", undefined, Headers}
- end;
- _ ->
- {error, no_status_line}
+ parse_headers(StatusLine, T);
+ {no, StatusLine} ->
+ parse_headers(StatusLine, <<>>)
+ end.
+
+parse_headers(StatusLine, Headers) ->
+ Headers_1 = parse_headers_1(Headers),
+ case parse_status_line(StatusLine) of
+ {ok, HttpVsn, StatCode, _Msg} ->
+ put(http_prot_vsn, HttpVsn),
+ {HttpVsn, StatCode, Headers_1};
+ _ -> %% A HTTP 0.9 response?
+ put(http_prot_vsn, "HTTP/0.9"),
+ {"HTTP/0.9", undefined, Headers}
end.
% From RFC 2616
@@ -1079,10 +1073,10 @@ parse_headers(Headers) ->
% SP. A recipient MAY replace any linear white space with a single
% SP before interpreting the field value or forwarding the message
% downstream.
-parse_headers_1(B) when is_binary(B) ->
- parse_headers_1(binary_to_list(B));
-parse_headers_1(String) ->
- parse_headers_1(String, [], []).
+ parse_headers_1(B) when is_binary(B) ->
+ parse_headers_1(binary_to_list(B));
+ parse_headers_1(String) ->
+ parse_headers_1(String, [], []).
parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32;
H == $\t ->
@@ -1205,10 +1199,10 @@ get_crlf_pos(<<>>, _) -> no.
%% scan_crlf([H|T], L) -> scan_crlf(T, [H|L]);
%% scan_crlf([], L) -> {no, L}.
-fmt_val(L) when list(L) -> L;
-fmt_val(I) when integer(I) -> integer_to_list(I);
-fmt_val(A) when atom(A) -> atom_to_list(A);
-fmt_val(Term) -> io_lib:format("~p", [Term]).
+fmt_val(L) when is_list(L) -> L;
+fmt_val(I) when is_integer(I) -> integer_to_list(I);
+fmt_val(A) when is_atom(A) -> atom_to_list(A);
+fmt_val(Term) -> io_lib:format("~p", [Term]).
crnl() -> "\r\n".
@@ -1306,7 +1300,8 @@ do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) -
do_reply(State, From, undefined, _, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
-do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
+do_reply(#state{prev_req_id = Prev_req_id} = State,
+ _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
State_1 = dec_pipeline_counter(State),
case Body of
[] ->
@@ -1316,7 +1311,18 @@ do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
end,
catch StreamTo ! {ibrowse_async_response_end, ReqId},
- State_1;
+ %% We don't want to delete the Req-id to Pid mapping straightaway
+ %% as the client may send a stream_next message just while we are
+ %% sending back this ibrowse_async_response_end message. If we
+ %% deleted this mapping straightaway, the caller will see a
+ %% {error, unknown_req_id} when it calls ibrowse:stream_next/1. To
+ %% get around this, we store the req id, and clear it after the
+ %% next request. If there are wierd combinations of stream,
+ %% stream_once and sync requests on the same connection, it will
+ %% take a while for the req_id-pid mapping to get cleared, but it
+ %% should do no harm.
+ ets:delete(ibrowse_stream, {req_id_pid, Prev_req_id}),
+ State_1#state{prev_req_id = ReqId};
do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
State_1 = dec_pipeline_counter(State),
Msg_1 = format_response_data(Resp_format, Msg),
@@ -1333,6 +1339,7 @@ do_error_reply(#state{reqs = Reqs} = State, Err) ->
ReqList = queue:to_list(Reqs),
lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format}) ->
+ ets:delete(ibrowse_stream, {req_id_pid, ReqId}),
do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err})
end, ReqList).
diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl
index f3559b51..ad3e8126 100644
--- a/src/ibrowse/ibrowse_test.erl
+++ b/src/ibrowse/ibrowse_test.erl
@@ -18,9 +18,50 @@
ue_test/1,
verify_chunked_streaming/0,
verify_chunked_streaming/1,
- i_do_async_req_list/4
+ i_do_async_req_list/4,
+ test_stream_once/3,
+ test_stream_once/4
]).
+test_stream_once(Url, Method, Options) ->
+ test_stream_once(Url, Method, Options, 5000).
+
+test_stream_once(Url, Method, Options, Timeout) ->
+ case ibrowse:send_req(Url, [], Method, [], [{stream_to, {self(), once}} | Options], Timeout) of
+ {ibrowse_req_id, Req_id} ->
+ case ibrowse:stream_next(Req_id) of
+ ok ->
+ test_stream_once(Req_id);
+ Err ->
+ Err
+ end;
+ Err ->
+ Err
+ end.
+
+test_stream_once(Req_id) ->
+ receive
+ {ibrowse_async_headers, Req_id, StatCode, Headers} ->
+ io:format("Recvd headers~n~p~n", [{ibrowse_async_headers, Req_id, StatCode, Headers}]),
+ case ibrowse:stream_next(Req_id) of
+ ok ->
+ test_stream_once(Req_id);
+ Err ->
+ Err
+ end;
+ {ibrowse_async_response, Req_id, {error, Err}} ->
+ io:format("Recvd error: ~p~n", [Err]);
+ {ibrowse_async_response, Req_id, Body_1} ->
+ io:format("Recvd body part: ~n~p~n", [{ibrowse_async_response, Req_id, Body_1}]),
+ case ibrowse:stream_next(Req_id) of
+ ok ->
+ test_stream_once(Req_id);
+ Err ->
+ Err
+ end;
+ {ibrowse_async_response_end, Req_id} ->
+ ok
+ end.
%% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to
%% tweak settings before running the load test. The defaults are 10 and 10.
load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url),
@@ -182,7 +223,8 @@ unit_tests() ->
unit_tests([]).
unit_tests(Options) ->
- {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options]),
+ Options_1 = Options ++ [{connect_timeout, 5000}],
+ {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
receive
{done, Pid} ->
ok;