summaryrefslogtreecommitdiff
path: root/src/ibrowse
diff options
context:
space:
mode:
author: Adam Kocoloski <kocolosk@apache.org> 2009-03-07 18:48:47 +0000
committer: Adam Kocoloski <kocolosk@apache.org> 2009-03-07 18:48:47 +0000
commit: f7c2f1f59ef95d4c4976c56c1bbf718f8036ca87 (patch)
tree: 00c7c16650d31701746f6b944ae3e4ab070c3823 /src/ibrowse
parent: 5b9b9823e091b6e8720d3930785f59c424239daa (diff)
rewrite replicator using OTP behaviours
- only one instance of given source->target runs at a time
- supervisor restarts replications that terminate abnormally
- pull repl. streams attachments directly to disk
- improved memory utilization
- temporarily rollback parallel async doc GETs during pull rep.
- replication updates show up in Futon Status window

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@751305 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/ibrowse')
-rw-r--r-- src/ibrowse/ibrowse.erl 19
-rw-r--r-- src/ibrowse/ibrowse_http_client.erl 298
-rw-r--r-- src/ibrowse/ibrowse_test.erl 109
3 files changed, 311 insertions, 115 deletions
diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl
index 4e6404ad..3390e58a 100644
--- a/src/ibrowse/ibrowse.erl
+++ b/src/ibrowse/ibrowse.erl
@@ -192,6 +192,8 @@ send_req(Url, Headers, Method, Body) ->
%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
%% optionList() = [option()]
%% option() = {max_sessions, integer()} |
+%% {response_format,response_format()}|
+%% {stream_chunk_size, integer()} |
%% {max_pipeline_size, integer()} |
%% {trace, boolean()} |
%% {is_ssl, boolean()} |
@@ -219,7 +221,7 @@ send_req(Url, Headers, Method, Body) ->
%% ChunkSize = integer()
%% srtf() = boolean() | filename()
%% filename() = string()
-%%
+%% response_format() = list | binary
send_req(Url, Headers, Method, Body, Options) ->
send_req(Url, Headers, Method, Body, Options, 30000).
@@ -230,7 +232,8 @@ send_req(Url, Headers, Method, Body, Options) ->
send_req(Url, Headers, Method, Body, Options, Timeout) ->
case catch parse_url(Url) of
#url{host = Host,
- port = Port} = Parsed_url ->
+ port = Port,
+ protocol = Protocol} = Parsed_url ->
Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
get_lb_pid(Parsed_url);
@@ -241,9 +244,10 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Max_pipeline_size = get_max_pipeline_size(Host, Port, Options),
Options_1 = merge_options(Host, Port, Options),
{SSLOptions, IsSSL} =
- case get_value(is_ssl, Options_1, false) of
+ case (Protocol == https) orelse
+ get_value(is_ssl, Options_1, false) of
false -> {[], false};
- true -> {get_value(ssl_options, Options_1), true}
+ true -> {get_value(ssl_options, Options_1, []), true}
end,
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
@@ -316,6 +320,13 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
{error, req_timedout};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
+ {ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
+ case get_value(response_format, Options, list) of
+ list ->
+ {ok, St_code, Headers, binary_to_list(Body)};
+ binary ->
+ Ret
+ end;
Ret ->
Ret
end.
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 9a0e4d3b..9455bc20 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -38,19 +38,23 @@
-include("ibrowse.hrl").
--record(state, {host, port,
+-record(state, {host, port,
use_proxy = false, proxy_auth_digest,
- ssl_options = [], is_ssl = false, socket,
- reqs=queue:new(), cur_req, status=idle, http_status_code,
- reply_buffer=[], rep_buf_size=0, recvd_headers=[],
+ ssl_options = [], is_ssl = false, socket,
+ reqs=queue:new(), cur_req, status=idle, http_status_code,
+ reply_buffer=[], rep_buf_size=0, streamed_size = 0,
+ recvd_headers=[],
is_closing, send_timer, content_length,
- deleted_crlf = false, transfer_encoding, chunk_size,
- chunks=[], lb_ets_tid, cur_pipeline_size = 0}).
+ deleted_crlf = false, transfer_encoding, chunk_size,
+ chunks=[], lb_ets_tid, cur_pipeline_size = 0
+ }).
-record(request, {url, method, options, from,
stream_to, req_id,
- save_response_to_file = false,
- tmp_file_name, tmp_file_fd}).
+ stream_chunk_size,
+ save_response_to_file = false,
+ tmp_file_name, tmp_file_fd,
+ response_format}).
-import(ibrowse_lib, [
parse_url/1,
@@ -60,6 +64,8 @@
do_trace/2
]).
+-define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
+
%%====================================================================
%% External functions
%%====================================================================
@@ -127,15 +133,16 @@ init({Host, Port}) ->
%%--------------------------------------------------------------------
%% Received a request when the remote server has already sent us a
%% Connection: Close header
-handle_call({send_req, _},
+handle_call({send_req, _},
_From,
#state{is_closing=true}=State) ->
{reply, {error, connection_closing}, State};
-handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
+handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From,
#state{socket=undefined,
host=Host, port=Port}=State) ->
+ Resp_format = get_value(response_format, Options, list),
{Host_1, Port_1, State_1} =
case get_value(proxy_host, Options, false) of
false ->
@@ -151,12 +158,14 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
StreamTo = get_value(stream_to, Options, undefined),
ReqId = make_req_id(),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
- NewReq = #request{url=Url,
+ NewReq = #request{url=Url,
method=Method,
stream_to=StreamTo,
- options=Options,
+ options=Options,
req_id=ReqId,
save_response_to_file = SaveResponseToFile,
+ stream_chunk_size = get_stream_chunk_size(Options),
+ response_format = Resp_format,
from=From},
Reqs = queue:in(NewReq, State#state.reqs),
State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
@@ -208,15 +217,18 @@ handle_call({send_req, {Url, Headers, Method,
From,
#state{socket=Sock, status=Status, reqs=Reqs}=State) ->
do_trace("Recvd request in connected state. Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]),
+ Resp_format = get_value(response_format, Options, list),
StreamTo = get_value(stream_to, Options, undefined),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
ReqId = make_req_id(),
- NewReq = #request{url=Url,
+ NewReq = #request{url=Url,
stream_to=StreamTo,
method=Method,
- options=Options,
+ options=Options,
req_id=ReqId,
save_response_to_file = SaveResponseToFile,
+ stream_chunk_size = get_stream_chunk_size(Options),
+ response_format = Resp_format,
from=From},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
@@ -359,14 +371,14 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
handle_sock_data(Data, #state{status=get_body, content_length=CL,
http_status_code = StatCode,
- recvd_headers=Headers,
+ recvd_headers=Headers,
chunk_size=CSz, socket=Sock}=State) ->
case (CL == undefined) and (CSz == undefined) of
true ->
case accumulate_response(Data, State) of
{error, Reason} ->
shutting_down(State),
- fail_pipelined_requests(State,
+ fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
State_1 ->
@@ -377,7 +389,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
case parse_11_response(Data, State) of
{error, Reason} ->
shutting_down(State),
- fail_pipelined_requests(State,
+ fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
stop ->
@@ -433,14 +445,27 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Save
accumulate_response([], State) ->
State;
accumulate_response(Data, #state{reply_buffer = RepBuf,
+ rep_buf_size = RepBufSize,
+ streamed_size = Streamed_size,
cur_req = CurReq}=State) ->
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ stream_chunk_size = Stream_chunk_size,
+ response_format = Response_format} = CurReq,
+ RepBuf_1 = [Data | RepBuf],
+ New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
- State#state{reply_buffer = [Data | RepBuf]};
+ State#state{reply_buffer = RepBuf_1};
+ _ when New_data_size < Stream_chunk_size ->
+ State#state{reply_buffer = RepBuf_1};
_ ->
- do_interim_reply(StreamTo, ReqId, Data),
- State
+ {Stream_chunk, Rem_data} = split_list_at(flatten(lists:reverse(RepBuf_1)), Stream_chunk_size),
+ do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
+ accumulate_response(
+ Rem_data,
+ State#state{
+ reply_buffer = [],
+ streamed_size = Streamed_size + Stream_chunk_size})
end.
make_tmp_filename() ->
@@ -463,7 +488,7 @@ handle_sock_closed(#state{status=get_header}=State) ->
handle_sock_closed(#state{cur_req=undefined} = State) ->
shutting_down(State);
-%% We check for IsClosing because this the server could have sent a
+%% We check for IsClosing because this the server could have sent a
%% Connection-Close header and has closed the socket to indicate end
%% of response. There maybe requests pipelined which need a response.
handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
@@ -471,18 +496,18 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
cur_req=#request{tmp_file_name=TmpFilename,
tmp_file_fd=Fd} = CurReq,
status=get_body, recvd_headers=Headers}=State) ->
- #request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format} = CurReq,
case IsClosing of
true ->
{_, Reqs_1} = queue:out(Reqs),
case TmpFilename of
undefined ->
- do_reply(State, From, StreamTo, ReqId,
- {ok, SC, Headers,
- lists:flatten(lists:reverse(Buf))});
+ do_reply(State, From, StreamTo, ReqId, Resp_format,
+ {ok, SC, Headers, lists:reverse(Buf)});
_ ->
file:close(Fd),
- do_reply(State, From, StreamTo, ReqId,
+ do_reply(State, From, StreamTo, ReqId, Resp_format,
{ok, SC, Headers, {file, TmpFilename}})
end,
do_error_reply(State#state{reqs = Reqs_1}, connection_closed),
@@ -493,9 +518,13 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
end.
do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) ->
- ssl:connect(Host, Port, [{nodelay, true}, {active, false} | SSLOptions], Timeout);
+ ssl:connect(Host, Port,
+ [{nodelay, true}, {active, false} | SSLOptions],
+ Timeout);
do_connect(Host, Port, _Options, _State, Timeout) ->
- gen_tcp:connect(Host, Port, [{nodelay, true}, {active, false}], Timeout).
+ gen_tcp:connect(Host, Port,
+ [{nodelay, true}, {active, false}],
+ Timeout).
do_send(Sock, Req, true) -> ssl:send(Sock, Req);
do_send(Sock, Req, false) -> gen_tcp:send(Sock, Req).
@@ -542,7 +571,7 @@ check_ssl_options(Options, State) ->
send_req_1(#url{abspath = AbsPath,
host = Host,
- port = Port,
+ port = Port,
path = RelPath} = Url,
Headers, Method, Body, Options, Sock, State) ->
Headers_1 = add_auth_headers(Url, Options, Headers, State),
@@ -555,10 +584,10 @@ send_req_1(#url{abspath = AbsPath,
{value, {_, Host_h_val}} ->
Host_h_val
end,
- {Req, Body_1} = make_request(Method,
+ {Req, Body_1} = make_request(Method,
[{"Host", HostHeaderValue} | Headers_1],
AbsPath, RelPath, Body, Options, State#state.use_proxy),
- case get(my_trace_flag) of
+ case get(my_trace_flag) of
true ->
%%Avoid the binary operations if trace is not on...
NReq = binary_to_list(list_to_binary(Req)),
@@ -569,7 +598,7 @@ send_req_1(#url{abspath = AbsPath,
end,
SndRes = case do_send(Sock, Req, State#state.is_ssl) of
ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
- Err ->
+ Err ->
io:format("Err: ~p~n", [Err]),
Err
end,
@@ -577,9 +606,9 @@ send_req_1(#url{abspath = AbsPath,
SndRes.
add_auth_headers(#url{username = User,
- password = UPw},
+ password = UPw},
Options,
- Headers,
+ Headers,
#state{use_proxy = UseProxy,
proxy_auth_digest = ProxyAuthDigest}) ->
Headers_1 = case User of
@@ -601,7 +630,7 @@ add_auth_headers(#url{username = User,
true ->
[{"Proxy-Authorization", ["Basic ", ProxyAuthDigest]} | Headers_1]
end.
-
+
http_auth_digest([], []) ->
[];
http_auth_digest(Username, Password) ->
@@ -617,7 +646,7 @@ encode_base64([A,B,C|Ls]) ->
encode_base64_do(A,B,C, Ls).
encode_base64_do(A,B,C, Rest) ->
BB = (A bsl 16) bor (B bsl 8) bor C,
- [e(BB bsr 18), e((BB bsr 12) band 63),
+ [e(BB bsr 18), e((BB bsr 12) band 63),
e((BB bsr 6) band 63), e(BB band 63)|encode_base64(Rest)].
e(X) when X >= 0, X < 26 -> X+65;
@@ -643,12 +672,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) ->
_ ->
Headers
end,
- {Headers_2, Body_1} =
+ {Headers_2, Body_1} =
case get_value(transfer_encoding, Options, false) of
false ->
{Headers_1, Body};
{chunked, ChunkSize} ->
- {[{X, Y} || {X, Y} <- Headers_1,
+ {[{X, Y} || {X, Y} <- Headers_1,
X /= "Content-Length",
X /= "content-length",
X /= content_length] ++
@@ -659,7 +688,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) ->
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
AbsPath;
- false ->
+ false ->
RelPath
end,
{[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}.
@@ -732,7 +761,7 @@ parse_response(_Data, #state{cur_req = undefined}=State) ->
parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
cur_req=CurReq}=State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
- method=Method} = CurReq,
+ method=Method, response_format = Resp_format} = CurReq,
MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
case scan_header(Data, Acc) of
{yes, Headers, Data_1} ->
@@ -749,7 +778,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
false ->
ok
end,
- State_1 = State#state{recvd_headers=Headers_1, status=get_body,
+ State_1 = State#state{recvd_headers=Headers_1, status=get_body,
+ reply_buffer = [],
http_status_code=StatCode, is_closing=IsClosing},
put(conn_close, ConnClose),
TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")),
@@ -757,7 +787,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
_ when Method == head ->
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
- State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
+ State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
+ {ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
@@ -776,7 +807,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
%% RFC2616 - Sec 4.4
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
- State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
+ State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
+ {ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
@@ -788,7 +820,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
chunk_size=chunk_start,
reply_buffer=[], chunks=[]}) of
{error, Reason} ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {Reason,
{stat_code, StatCode}, Headers_1}}),
{error, Reason};
@@ -800,7 +832,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
State_1#state{reply_buffer=[Data_1]};
undefined ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {content_length_undefined,
{stat_code, StatCode}, Headers}}),
{error, content_length_undefined};
@@ -814,7 +846,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
content_length=V_1},
case parse_11_response(Data_1, State_2) of
{error, Reason} ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {Reason,
{stat_code, StatCode}, Headers_1}}),
{error, Reason};
@@ -822,7 +854,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
State_3
end;
_ ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {content_length_undefined,
{stat_code, StatCode}, Headers}}),
{error, content_length_undefined}
@@ -843,25 +875,28 @@ is_connection_closing("HTTP/1.0", "false") -> true;
is_connection_closing(_, _) -> false.
%% This clause determines the chunk size when given data from the beginning of the chunk
-parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked,
+parse_11_response(DataRecvd,
+ #state{transfer_encoding=chunked,
chunk_size=chunk_start,
cur_req=CurReq,
- reply_buffer=Buf}=State) ->
+ reply_buffer=Buf
+ }=State) ->
case scan_crlf(DataRecvd, Buf) of
{yes, ChunkHeader, Data_1} ->
case parse_chunk_header(ChunkHeader) of
{error, Reason} ->
{error, Reason};
ChunkSize ->
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ response_format = Response_format} = CurReq,
%%
%% Do we have to preserve the chunk encoding when streaming?
%%
- do_interim_reply(StreamTo, ReqId, {chunk_start, ChunkSize}),
+ do_interim_reply(StreamTo, Response_format,
+ ReqId, {chunk_start, ChunkSize}),
RemLen = length(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{rep_buf_size=0,
+ parse_11_response(Data_1, State#state{rep_buf_size=0,
reply_buffer=[],
deleted_crlf=true,
chunk_size=ChunkSize})
@@ -871,29 +906,34 @@ parse_11_response(DataRecvd,
end;
%% This clause is there to remove the CRLF between two chunks
-%%
-parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked,
+%%
+parse_11_response(DataRecvd,
+ #state{transfer_encoding=chunked,
chunk_size=tbd,
chunks = Chunks,
cur_req=CurReq,
reply_buffer=Buf}=State) ->
case scan_crlf(DataRecvd, Buf) of
{yes, _, NextChunk} ->
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ response_format = Response_format} = CurReq,
%%
%% Do we have to preserve the chunk encoding when streaming?
%%
State_1 = State#state{chunk_size=chunk_start,
- rep_buf_size=0,
+ rep_buf_size=0,
reply_buffer=[],
deleted_crlf=true},
State_2 = case StreamTo of
undefined ->
State_1#state{chunks = [Buf | Chunks]};
- _ ->
- do_interim_reply(StreamTo, ReqId, chunk_end),
- State_1
+ _ ->
+ %% Flush out all buffered data as chunk is ending
+ do_interim_reply(StreamTo, Response_format, ReqId,
+ lists:reverse([Buf | Chunks])),
+ do_interim_reply(StreamTo, Response_format,
+ ReqId, chunk_end),
+ State_1#state{chunks = [], streamed_size = 0}
end,
parse_11_response(NextChunk, State_2);
{no, Data_1} ->
@@ -901,26 +941,27 @@ parse_11_response(DataRecvd,
end;
%% This clause deals with the end of a chunked transfer
-parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked, chunk_size=0,
+parse_11_response(DataRecvd,
+ #state{transfer_encoding=chunked, chunk_size=0,
cur_req=CurReq,
deleted_crlf = DelCrlf,
reply_buffer=Trailer, reqs=Reqs}=State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
- false ->
+ false ->
DataRecvd;
true ->
[$\r, $\n | DataRecvd]
end,
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ response_format = Response_format} = CurReq,
case scan_header(DataRecvd_1, Trailer) of
{yes, _TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
%%
- %% Do we have to preserve the chunk encoding when streaming?
+ %% Do we have to preserve the chunk encoding when streaming? Nope.
%%
- do_interim_reply(StreamTo, ReqId, chunk_end),
+ do_interim_reply(StreamTo, Response_format, ReqId, chunk_end),
State_1 = handle_response(CurReq, State#state{reqs=Reqs_1}),
parse_response(Rem, reset_state(State_1));
{no, Rem} ->
@@ -928,7 +969,7 @@ parse_11_response(DataRecvd,
end;
%% This clause extracts a chunk, given the size.
-parse_11_response(DataRecvd,
+parse_11_response(DataRecvd,
#state{transfer_encoding=chunked, chunk_size=CSz,
rep_buf_size=RepBufSz}=State) ->
NeedBytes = CSz - RepBufSz,
@@ -952,12 +993,12 @@ parse_11_response(DataRecvd,
parse_11_response(RemData, State_2)
end;
false ->
- accumulate_response(DataRecvd, State#state{rep_buf_size=RepBufSz + DataLen})
+ accumulate_response(DataRecvd, State#state{rep_buf_size=(RepBufSz + DataLen)})
end;
%% This clause to extract the body when Content-Length is specified
-parse_11_response(DataRecvd,
- #state{content_length=CL, rep_buf_size=RepBufSz,
+parse_11_response(DataRecvd,
+ #state{content_length=CL, rep_buf_size=RepBufSz,
reqs=Reqs}=State) ->
NeedBytes = CL - RepBufSz,
DataLen = length(DataRecvd),
@@ -970,11 +1011,12 @@ parse_11_response(DataRecvd,
State_3 = reset_state(State_2),
parse_response(Rem, State_3);
false ->
- accumulate_response(DataRecvd, State#state{rep_buf_size=RepBufSz+DataLen})
+ accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end.
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
- save_response_to_file = SaveResponseToFile,
+ response_format = Resp_format,
+ save_response_to_file = SaveResponseToFile,
tmp_file_name = TmpFilename,
tmp_file_fd = Fd
},
@@ -986,9 +1028,9 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false ->
Body = case TEnc of
chunked ->
- lists:flatten(lists:reverse(Chunks));
+ lists:reverse(Chunks);
_ ->
- lists:flatten(lists:reverse(RepBuf))
+ lists:reverse(RepBuf)
end,
State_1 = set_cur_request(State),
file:close(Fd),
@@ -998,32 +1040,38 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
- State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, ResponseBody}),
+ State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
+ {ok, SCode, RespHeaders, ResponseBody}),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
State_2;
-handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId},
+handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format},
#state{http_status_code=SCode, recvd_headers=RespHeaders,
reply_buffer=RepBuf, transfer_encoding=TEnc,
chunks=Chunks, send_timer=ReqTimer}=State) ->
Body = case TEnc of
chunked ->
- lists:flatten(lists:reverse(Chunks));
+ lists:reverse(Chunks);
_ ->
- lists:flatten(lists:reverse(RepBuf))
+ lists:reverse(RepBuf)
end,
- State_1 = set_cur_request(State),
- case get(conn_close) of
+%% State_1 = set_cur_request(State),
+ State_1 = case get(conn_close) of
"close" ->
- do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
+ do_reply(State, From, StreamTo, ReqId, Resp_format,
+ {ok, SCode, RespHeaders, Body}),
exit(normal);
_ ->
- State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
+ State_1_1 = do_reply(State, From, StreamTo, ReqId, Resp_format,
+ {ok, SCode, RespHeaders, Body}),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
- State_2
- end.
+ State_1_1
+ end,
+ set_cur_request(State_1).
reset_state(State) ->
- State#state{status=get_header, rep_buf_size=0,content_length=undefined,
+ State#state{status=get_header, rep_buf_size=0, streamed_size = 0,
+ content_length=undefined,
reply_buffer=[], chunks=[], recvd_headers=[], deleted_crlf=false,
http_status_code=undefined, chunk_size=undefined, transfer_encoding=undefined}.
@@ -1063,18 +1111,18 @@ parse_headers_1(String) ->
parse_headers_1(String, [], []).
parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32;
- H == $\t ->
+ H == $\t ->
parse_headers_1(lists:dropwhile(fun(X) ->
is_whitespace(X)
end, T), [32 | L], Acc);
-parse_headers_1([$\n|T], [$\r | L], Acc) ->
+parse_headers_1([$\n|T], [$\r | L], Acc) ->
case parse_header(lists:reverse(L)) of
invalid ->
parse_headers_1(T, [], Acc);
NewHeader ->
parse_headers_1(T, [], [NewHeader | Acc])
end;
-parse_headers_1([H|T], L, Acc) ->
+parse_headers_1([H|T], L, Acc) ->
parse_headers_1(T, [H|L], Acc);
parse_headers_1([], [], Acc) ->
lists:reverse(Acc);
@@ -1185,7 +1233,7 @@ parse_chunk_header([H | T], Acc) ->
parse_chunk_header([], Acc) ->
hexlist_to_integer(lists:reverse(Acc)).
-is_whitespace(32) -> true;
+is_whitespace($\s) -> true;
is_whitespace($\r) -> true;
is_whitespace($\n) -> true;
is_whitespace($\t) -> true;
@@ -1197,36 +1245,62 @@ send_async_headers(_ReqId, undefined, _StatCode, _Headers) ->
send_async_headers(ReqId, StreamTo, StatCode, Headers) ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers}.
-do_reply(State, From, undefined, _, Msg) ->
+format_response_data(Resp_format, Body) ->
+ case Resp_format of
+ list when is_list(Body) ->
+ flatten(Body);
+ binary when is_list(Body) ->
+ list_to_binary(Body);
+ _ ->
+ %% This is to cater for sending messages such as
+ %% {chunk_start, _}, chunk_end etc
+ Body
+ end.
+
+do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
+ Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
+ gen_server:reply(From, Msg_1),
+ dec_pipeline_counter(State);
+do_reply(State, From, undefined, _, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
-do_reply(State, _From, StreamTo, ReqId, {ok, _, _, _}) ->
+do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
State_1 = dec_pipeline_counter(State),
+ case Body of
+ [] ->
+ ok;
+ _ ->
+ Body_1 = format_response_data(Resp_format, Body),
+ catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
+ end,
catch StreamTo ! {ibrowse_async_response_end, ReqId},
State_1;
-do_reply(State, _From, StreamTo, ReqId, Msg) ->
+do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
State_1 = dec_pipeline_counter(State),
- catch StreamTo ! {ibrowse_async_response, ReqId, Msg},
+ Msg_1 = format_response_data(Resp_format, Msg),
+ catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
State_1.
-do_interim_reply(undefined, _ReqId, _Msg) ->
+do_interim_reply(undefined, _, _ReqId, _Msg) ->
ok;
-do_interim_reply(StreamTo, ReqId, Msg) ->
- catch StreamTo ! {ibrowse_async_response, ReqId, Msg}.
+do_interim_reply(StreamTo, Response_format, ReqId, Msg) ->
+ Msg_1 = format_response_data(Response_format, Msg),
+ catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}.
do_error_reply(#state{reqs = Reqs} = State, Err) ->
ReqList = queue:to_list(Reqs),
- lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId}) ->
- do_reply(State, From, StreamTo, ReqId, {error, Err})
+ lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format}) ->
+ do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err})
end, ReqList).
fail_pipelined_requests(#state{reqs = Reqs, cur_req = CurReq} = State, Reply) ->
{_, Reqs_1} = queue:out(Reqs),
- #request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq,
- do_reply(State, From, StreamTo, ReqId, Reply),
+ #request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format} = CurReq,
+ do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
do_error_reply(State#state{reqs = Reqs_1}, previous_request_failed).
-
split_list_at(List, N) ->
split_list_at(List, N, []).
split_list_at([], _, Acc) ->
@@ -1271,7 +1345,7 @@ cancel_timer(Ref) -> erlang:cancel_timer(Ref).
cancel_timer(Ref, {eat_message, Msg}) ->
cancel_timer(Ref),
- receive
+ receive
Msg ->
ok
after 0 ->
@@ -1310,3 +1384,19 @@ dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
ets:delete(Tid, {Pipe_sz, self()}),
ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
State#state{cur_pipeline_size = Pipe_sz - 1}.
+
+flatten([H | _] = L) when is_integer(H) ->
+ L;
+flatten([H | _] = L) when is_list(H) ->
+ lists:flatten(L);
+flatten([]) ->
+ [].
+
+get_stream_chunk_size(Options) ->
+ case lists:keysearch(stream_chunk_size, 1, Options) of
+ {value, {_, V}} when V > 0 ->
+ V;
+ _ ->
+ ?DEFAULT_STREAM_CHUNK_SIZE
+ end.
+
diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl
index b4429c9b..de8865ff 100644
--- a/src/ibrowse/ibrowse_test.erl
+++ b/src/ibrowse/ibrowse_test.erl
@@ -14,7 +14,10 @@
drv_ue_test/0,
drv_ue_test/1,
ue_test/0,
- ue_test/1
+ ue_test/1,
+ verify_chunked_streaming/0,
+ verify_chunked_streaming/1,
+ i_do_async_req_list/4
]).
-import(ibrowse_lib, [printable_date/0]).
@@ -88,7 +91,7 @@ do_wait() ->
do_wait()
end
end.
-
+
do_send_req(Url, NumReqs) ->
do_send_req_1(Url, NumReqs).
@@ -149,7 +152,7 @@ dump_errors(Key, Iod) ->
-define(TEST_LIST, [{"http://intranet/messenger", get},
{"http://www.google.co.uk", get},
{"http://www.google.com", get},
- {"http://www.google.com", options},
+ {"http://www.google.com", options},
{"http://www.sun.com", get},
{"http://www.oracle.com", get},
{"http://www.bbc.co.uk", get},
@@ -172,7 +175,8 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/400/toolong/", get},
{"http://jigsaw.w3.org/HTTP/300/", get},
{"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
- {"http://jigsaw.w3.org/HTTP/CL/", get}
+ {"http://jigsaw.w3.org/HTTP/CL/", get},
+ {"http://www.httpwatch.com/httpgallery/chunked/", get}
]).
unit_tests() ->
@@ -185,13 +189,104 @@ unit_tests(Options) ->
execute_req(Url, Method, X_Opts ++ Options)
end, ?TEST_LIST).
-execute_req(Url, Method) ->
- execute_req(Url, Method, []).
+verify_chunked_streaming() ->
+ verify_chunked_streaming([]).
+
+verify_chunked_streaming(Options) ->
+ Url = "http://www.httpwatch.com/httpgallery/chunked/",
+ io:format("URL: ~s~n", [Url]),
+ io:format("Fetching data without streaming...~n", []),
+ Result_without_streaming = ibrowse:send_req(
+ Url, [], get, [],
+ [{response_format, binary} | Options]),
+ io:format("Fetching data with streaming as list...~n", []),
+ Async_response_list = do_async_req_list(
+ Url, get, [{response_format, list}]),
+ io:format("Fetching data with streaming as binary...~n", []),
+ Async_response_bin = do_async_req_list(
+ Url, get, [{response_format, binary}]),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+
+compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
+ success;
+compare_responses({ok, St_code, _, Body_1}, {ok, St_code, _, Body_2}, {ok, St_code, _, Body_3}) ->
+ case Body_1 of
+ Body_2 ->
+ io:format("Body_1 and Body_2 match~n", []);
+ Body_3 ->
+ io:format("Body_1 and Body_3 match~n", []);
+ _ when Body_2 == Body_3 ->
+ io:format("Body_2 and Body_3 match~n", []);
+ _ ->
+ io:format("All three bodies are different!~n", [])
+ end,
+ fail_bodies_mismatch;
+compare_responses(R1, R2, R3) ->
+ io:format("R1 -> ~p~n", [R1]),
+ io:format("R2 -> ~p~n", [R2]),
+ io:format("R3 -> ~p~n", [R3]),
+ fail.
+
+do_async_req_list(Url) ->
+ do_async_req_list(Url, get).
+
+do_async_req_list(Url, Method) ->
+ do_async_req_list(Url, Method, [{stream_to, self()},
+ {stream_chunk_size, 1000}]).
+
+do_async_req_list(Url, Method, Options) ->
+ {Pid,_} = erlang:spawn_monitor(?MODULE, i_do_async_req_list,
+ [self(), Url, Method,
+ Options ++ [{stream_chunk_size, 1000}]]),
+ io:format("Spawned process ~p~n", [Pid]),
+ wait_for_resp(Pid).
+
+wait_for_resp(Pid) ->
+ receive
+ {async_result, Pid, Res} ->
+ Res;
+ {'DOWN', _, _, Pid, Reason} ->
+ {'EXIT', Reason};
+ {'DOWN', _, _, _, _} ->
+ wait_for_resp(Pid);
+ Msg ->
+ io:format("Recvd unknown message: ~p~n", [Msg]),
+ wait_for_resp(Pid)
+ after 10000 ->
+ {error, timeout}
+ end.
+
+i_do_async_req_list(Parent, Url, Method, Options) ->
+ Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+ case Res of
+ {ibrowse_req_id, Req_id} ->
+ Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+ Parent ! {async_result, self(), Result};
+ Err ->
+ Parent ! {async_result, self(), Err}
+ end.
+
+wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+ receive
+ {ibrowse_async_headers, Req_id, StatCode, Headers} ->
+ wait_for_async_resp(Req_id, StatCode, Headers, Body);
+ {ibrowse_async_response, Req_id, {chunk_start, _}} ->
+ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body);
+ {ibrowse_async_response, Req_id, chunk_end} ->
+ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body);
+ {ibrowse_async_response_end, Req_id} ->
+ Body_1 = list_to_binary(lists:reverse(Body)),
+ {ok, Acc_Stat_code, Acc_Headers, Body_1};
+ {ibrowse_async_response, Req_id, Data} ->
+ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ Err ->
+ {ok, Acc_Stat_code, Acc_Headers, Err}
+ end.
execute_req(Url, Method, Options) ->
io:format("~s, ~p: ", [Url, Method]),
Result = (catch ibrowse:send_req(Url, [], Method, [], Options)),
- case Result of
+ case Result of
{ok, SCode, _H, _B} ->
io:format("Status code: ~p~n", [SCode]);
Err ->