author    Adam Kocoloski <kocolosk@apache.org>    2009-03-07 18:48:47 +0000
committer Adam Kocoloski <kocolosk@apache.org>    2009-03-07 18:48:47 +0000
commit    f7c2f1f59ef95d4c4976c56c1bbf718f8036ca87 (patch)
tree      00c7c16650d31701746f6b944ae3e4ab070c3823
parent    5b9b9823e091b6e8720d3930785f59c424239daa (diff)
rewrite replicator using OTP behaviours
- only one instance of a given source->target pair runs at a time
- supervisor restarts replications that terminate abnormally
- pull repl. streams attachments directly to disk
- improved memory utilization
- temporarily rollback parallel async doc GETs during pull rep.
- replication updates show up in Futon Status window

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@751305 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--  src/couchdb/Makefile.am                 3
-rw-r--r--  src/couchdb/couch_doc.erl              30
-rw-r--r--  src/couchdb/couch_rep.erl             769
-rw-r--r--  src/couchdb/couch_server_sup.erl       10
-rw-r--r--  src/couchdb/couch_util.erl             21
-rw-r--r--  src/ibrowse/ibrowse.erl                19
-rw-r--r--  src/ibrowse/ibrowse_http_client.erl   298
-rw-r--r--  src/ibrowse/ibrowse_test.erl          109
8 files changed, 810 insertions, 449 deletions
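
The core of the rewrite is couch_rep:replicate/3 (see the couch_rep.erl diff below), which registers each Source -> Target pair as a transient child of the new couch_rep_sup supervisor, so at most one replication per pair runs at a time and abnormal exits are restarted. A usage sketch, not part of the patch, with hypothetical database names:

%% Usage sketch: triggering a replication through the supervised
%% entry point. The source URL and target name are placeholders.
trigger_replication() ->
    Source = <<"http://example.org:5984/sourcedb/">>,
    Target = <<"targetdb">>,
    %% Blocks until this pass completes. A concurrent call for the same
    %% Source -> Target pair attaches to the already-running child
    %% instead of starting a second replication.
    case couch_rep:replicate(Source, Target, []) of
        {ok, {HistoryProps}} ->
            %% session_id, source_last_seq and history, as described in
            %% the @doc for replicate/3
            proplists:get_value(<<"source_last_seq">>, HistoryProps);
        {error, Reason} ->
            {error, Reason}
    end.
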
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 51e4878c..8bea5290 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -64,6 +64,7 @@ source_files = \
couch_query_servers.erl \
couch_ref_counter.erl \
couch_rep.erl \
+ couch_rep_sup.erl \
couch_server.erl \
couch_server_sup.erl \
couch_stats_aggregator.erl \
@@ -104,6 +105,7 @@ compiled_files = \
couch_query_servers.beam \
couch_ref_counter.beam \
couch_rep.beam \
+ couch_rep_sup.beam \
couch_server.beam \
couch_server_sup.beam \
couch_stats_aggregator.beam \
@@ -139,6 +141,7 @@ compiled_files = \
# couch_log.html \
# couch_query_servers.html \
# couch_rep.html \
+# couch_rep_sup.html \
# couch_server.html \
# couch_server_sup.html \
# couch_stream.html \
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 1eb1575a..9860ac0c 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -51,7 +51,13 @@ to_json_revs_info(Meta) ->
to_json_attachment_stubs(Attachments) ->
BinProps = lists:map(
- fun({Name, {Type, BinValue}}) ->
+ fun({Name, {Type, {_RcvFun, Length}}}) ->
+ {Name, {[
+ {<<"stub">>, true},
+ {<<"content_type">>, Type},
+ {<<"length">>, Length}
+ ]}};
+ ({Name, {Type, BinValue}}) ->
{Name, {[
{<<"stub">>, true},
{<<"content_type">>, Type},
@@ -66,7 +72,13 @@ to_json_attachment_stubs(Attachments) ->
to_json_attachments(Attachments) ->
BinProps = lists:map(
- fun({Name, {Type, BinValue}}) ->
+ fun({Name, {Type, {RcvFun, Length}}}) ->
+ Data = read_streamed_attachment(RcvFun, Length, _Acc = []),
+ {Name, {[
+ {<<"content_type">>, Type},
+ {<<"data">>, couch_util:encodeBase64(Data)}
+ ]}};
+ ({Name, {Type, BinValue}}) ->
{Name, {[
{<<"content_type">>, Type},
{<<"data">>, couch_util:encodeBase64(bin_to_binary(BinValue))}
@@ -100,7 +112,9 @@ from_json_obj({Props}) ->
Bins = lists:flatmap(fun({Name, {BinProps}}) ->
case proplists:get_value(<<"stub">>, BinProps) of
true ->
- [{Name, stub}];
+ Type = proplists:get_value(<<"content_type">>, BinProps),
+ Length = proplists:get_value(<<"length">>, BinProps),
+ [{Name, {stub, Type, Length}}];
_ ->
Value = proplists:get_value(<<"data">>, BinProps),
Type = proplists:get_value(<<"content_type">>, BinProps,
@@ -225,7 +239,7 @@ has_stubs(#doc{attachments=Bins}) ->
has_stubs(Bins);
has_stubs([]) ->
false;
-has_stubs([{_Name, stub}|_]) ->
+has_stubs([{_Name, {stub, _, _}}|_]) ->
true;
has_stubs([_Bin|Rest]) ->
has_stubs(Rest).
@@ -233,9 +247,15 @@ has_stubs([_Bin|Rest]) ->
merge_stubs(#doc{attachments=MemBins}=StubsDoc, #doc{attachments=DiskBins}) ->
BinDict = dict:from_list(DiskBins),
MergedBins = lists:map(
- fun({Name, stub}) ->
+ fun({Name, {stub, _, _}}) ->
{Name, dict:fetch(Name, BinDict)};
({Name, Value}) ->
{Name, Value}
end, MemBins),
StubsDoc#doc{attachments= MergedBins}.
+
+read_streamed_attachment(_RcvFun, 0, Acc) ->
+ list_to_binary(lists:reverse(Acc));
+read_streamed_attachment(RcvFun, LenLeft, Acc) ->
+ Bin = RcvFun(),
+ read_streamed_attachment(RcvFun, LenLeft - size(Bin), [Bin|Acc]).
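
With this change a pull-replicated attachment reaches couch_doc as {Type, {RcvFun, Length}}: RcvFun returns the next chunk of the body each time it is called, and read_streamed_attachment/3 keeps calling it until Length bytes have been consumed. A toy illustration of that contract, not part of the patch, using a helper process much like couch_rep's attachment_loop:

%% Illustrative sketch of the {RcvFun, Length} contract expected by
%% read_streamed_attachment/3. Each call to RcvFun must return the
%% next chunk of the attachment body as a binary.
example_streamed_attachment() ->
    Chunks = [<<"hel">>, <<"lo">>],
    Pid = spawn(fun() -> chunk_server(Chunks) end),
    RcvFun = fun() ->
        Pid ! {self(), next},
        receive {Pid, Chunk} -> Chunk end
    end,
    {RcvFun, iolist_size(Chunks)}.

%% Hands out one chunk per request, in order.
chunk_server([Chunk | Rest]) ->
    receive {From, next} -> From ! {self(), Chunk} end,
    chunk_server(Rest);
chunk_server([]) ->
    ok.
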
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index e0c4d470..3df2d821 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -11,170 +11,343 @@
% the License.
-module(couch_rep).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([replicate/3]).
+
+-include_lib("couch_db.hrl").
+
+%% @spec replicate(Source::binary(), Target::binary(), Options::proplist()) ->
+%% {ok, Stats} | {error, Reason}
+%% @doc Triggers a replication. Stats is a JSON Object with the following
+%% keys: session_id (UUID), source_last_seq (integer), and history (array).
+%% Each element of the history is an Object with keys start_time, end_time,
+%% start_last_seq, end_last_seq, missing_checked, missing_found, docs_read,
+%% and docs_written.
+%%
+%% The supervisor will try to restart the replication in case of any error
+%% other than shutdown. Just call this function again to listen for the
+%% result of the retry.
+replicate(Source, Target, Options) ->
+ Id = <<Source/binary, ":", Target/binary>>,
+ Args = [?MODULE, [Source,Target,Options], []],
+
+ Replicator = {Id,
+ {gen_server, start_link, Args},
+ transient,
+ 10000,
+ worker,
+ [?MODULE]
+ },
+
+ Server = case supervisor:start_child(couch_rep_sup, Replicator) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [Id, Pid]),
+ Pid;
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, Id) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [Id, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, Replicator),
+ ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ Pid
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_INFO("replication ~p already running at ~p", [Id, Pid]),
+ Pid
+ end,
+
+ case gen_server:call(Server, get_result, infinity) of
+ retry -> replicate(Source, Target, Options);
+ Else -> Else
+ end.
--include("couch_db.hrl").
+%%=============================================================================
+%% gen_server callbacks
+%%=============================================================================
-record(http_db, {
uri,
headers
}).
--export([replicate/2, replicate/3]).
-
-url_encode(Bin) when is_binary(Bin) ->
- url_encode(binary_to_list(Bin));
-url_encode([H|T]) ->
- if
- H >= $a, $z >= H ->
- [H|url_encode(T)];
- H >= $A, $Z >= H ->
- [H|url_encode(T)];
- H >= $0, $9 >= H ->
- [H|url_encode(T)];
- H == $_; H == $.; H == $-; H == $: ->
- [H|url_encode(T)];
- true ->
- case lists:flatten(io_lib:format("~.16.0B", [H])) of
- [X, Y] ->
- [$%, X, Y | url_encode(T)];
- [X] ->
- [$%, $0, X | url_encode(T)]
- end
- end;
-url_encode([]) ->
- [].
-
-
-replicate(DbNameA, DbNameB) ->
- replicate(DbNameA, DbNameB, []).
+-record(state, {
+ context,
+ current_seq,
+ source,
+ target,
+ stats,
+ enum_pid,
+ docs_buffer = [],
+ listeners = []
+}).
-replicate(Source, Target, Options) ->
- {ok, DbSrc} = open_db(Source,
- proplists:get_value(source_options, Options, [])),
- try
- {ok, DbTgt} = open_db(Target,
- proplists:get_value(target_options, Options, [])),
- try
- replicate2(Source, DbSrc, Target, DbTgt, Options)
- after
- close_db(DbTgt)
- end
- after
- close_db(DbSrc)
- end.
+init([Source, Target, Options]) ->
+ {ok, DbSrc} =
+ open_db(Source, proplists:get_value(source_options, Options, [])),
+ {ok, DbTgt} =
+ open_db(Target, proplists:get_value(target_options, Options, [])),
-replicate2(Source, DbSrc, Target, DbTgt, Options) ->
- {ok, HostName} = inet:gethostname(),
- HostNameBin = list_to_binary(HostName),
- RepRecKey = <<?LOCAL_DOC_PREFIX, HostNameBin/binary,
- ":", Source/binary, ":", Target/binary>>,
-
- ReplicationStartTime = httpd_util:rfc1123_date(),
+ {ok, Host} = inet:gethostname(),
+ HostBin = list_to_binary(Host),
+ DocKey = <<?LOCAL_DOC_PREFIX, HostBin/binary, ":", Source/binary, ":",
+ Target/binary>>,
{ok, InfoSrc} = get_db_info(DbSrc),
{ok, InfoTgt} = get_db_info(DbTgt),
+ ReplicationStartTime = httpd_util:rfc1123_date(),
SrcInstanceStartTime = proplists:get_value(instance_start_time, InfoSrc),
TgtInstanceStartTime = proplists:get_value(instance_start_time, InfoTgt),
case proplists:get_value(full, Options, false)
orelse proplists:get_value("full", Options, false) of
true ->
- RepRecSrc = RepRecTgt = #doc{id=RepRecKey};
+ RepRecSrc = RepRecTgt = #doc{id=DocKey};
false ->
- RepRecSrc =
- case open_doc(DbSrc, RepRecKey, []) of
- {ok, SrcDoc} ->
- ?LOG_DEBUG("Found existing replication record on source", []),
- SrcDoc;
- _ -> #doc{id=RepRecKey}
+ RepRecSrc = case open_doc(DbSrc, DocKey, []) of
+ {ok, SrcDoc} ->
+ ?LOG_DEBUG("Found existing replication record on source", []),
+ SrcDoc;
+ _ -> #doc{id=DocKey}
end,
- RepRecTgt =
- case open_doc(DbTgt, RepRecKey, []) of
- {ok, TgtDoc} ->
- ?LOG_DEBUG("Found existing replication record on target", []),
- TgtDoc;
- _ -> #doc{id=RepRecKey}
+ RepRecTgt = case open_doc(DbTgt, DocKey, []) of
+ {ok, TgtDoc} ->
+ ?LOG_DEBUG("Found existing replication record on target", []),
+ TgtDoc;
+ _ -> #doc{id=DocKey}
end
end,
#doc{body={OldRepHistoryProps}} = RepRecSrc,
#doc{body={OldRepHistoryPropsTrg}} = RepRecTgt,
- SeqNum =
- case OldRepHistoryProps == OldRepHistoryPropsTrg of
- true ->
- % if the records are identical, then we have a valid replication history
- proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
- false ->
- ?LOG_INFO("Replication records differ. "
+ SeqNum = case OldRepHistoryProps == OldRepHistoryPropsTrg of
+ true ->
+ % if the records are identical, then we have a valid replication history
+ proplists:get_value(<<"source_last_seq">>, OldRepHistoryProps, 0);
+ false ->
+ ?LOG_INFO("Replication records differ. "
"Performing full replication instead of incremental.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n", [OldRepHistoryProps, OldRepHistoryPropsTrg]),
+ ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+ [OldRepHistoryProps, OldRepHistoryPropsTrg]),
0
end,
+
+ Context = [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime},
+ {src_record, RepRecSrc},
+ {tgt_record, RepRecTgt}
+ ],
+
+ Stats = ets:new(replication_stats, [set, private]),
+ ets:insert(Stats, {total_revs,0}),
+ ets:insert(Stats, {missing_revs, 0}),
+ ets:insert(Stats, {docs_read, 0}),
+ ets:insert(Stats, {docs_written, 0}),
+
+ couch_task_status:add_task("Replication", <<Source/binary, " -> ",
+ Target/binary>>, "Starting"),
+
+ Parent = self(),
+ Pid = spawn_link(fun() -> enum_docs_since(Parent,DbSrc,DbTgt,{SeqNum,0}) end),
+
+ State = #state{
+ context = Context,
+ current_seq = SeqNum,
+ enum_pid = Pid,
+ source = DbSrc,
+ target = DbTgt,
+ stats = Stats
+ },
+
+ {ok, State}.
+
- {NewSeqNum, Stats} = pull_rep(DbTgt, DbSrc, SeqNum),
+handle_call(get_result, From, #state{listeners=L} = State) ->
+ {noreply, State#state{listeners=[From|L]}};
+
+handle_call({replicate_doc, {Id, Revs}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
+ #state{
+ docs_buffer = Buffer,
+ source = Source,
+ target = Target,
+ stats = Stats
+ } = State,
- case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
- true ->
- % nothing changed, don't record results
- {ok, {OldRepHistoryProps}};
- false ->
- % commit changes to both src and tgt. The src because if changes
- % we replicated are lost, we'll record the a seq number of ahead
- % of what was committed and therefore lose future changes with the
- % same seq nums.
-
- {ok, SrcInstanceStartTime2} = ensure_full_commit(DbSrc),
- {ok, TgtInstanceStartTime2} = ensure_full_commit(DbTgt),
-
- RecordSeqNum =
- if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
- TgtInstanceStartTime2 == TgtInstanceStartTime ->
- NewSeqNum;
+ ets:update_counter(Stats, missing_revs, length(Revs)),
+
+ %% get document(s)
+ {ok, DocResults} = open_doc_revs(Source, Id, Revs, [latest]),
+ Docs = [RevDoc || {ok, RevDoc} <- DocResults],
+ ets:update_counter(Stats, docs_read, length(Docs)),
+
+ %% save them (maybe in a buffer)
+ NewBuffer = case couch_util:should_flush() of
true ->
- ?LOG_INFO("A server has restarted sinced replication start. "
- "Not recording the new sequence number to ensure the "
- "replication is redone and documents reexamined.", []),
- SeqNum
- end,
-
- HistEntries =[
- {
- [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
- {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
- {<<"start_last_seq">>, SeqNum},
- {<<"end_last_seq">>, NewSeqNum} | Stats]}
- | proplists:get_value("history", OldRepHistoryProps, [])],
- % something changed, record results
- NewRepHistory =
- {
- [{<<"session_id">>, couch_util:new_uuid()},
- {<<"source_last_seq">>, RecordSeqNum},
- {<<"history">>, lists:sublist(HistEntries, 50)}]},
-
- {ok, _} = update_doc(DbSrc, RepRecSrc#doc{body=NewRepHistory}, []),
- {ok, _} = update_doc(DbTgt, RepRecTgt#doc{body=NewRepHistory}, []),
- {ok, NewRepHistory}
+ Docs2 = lists:flatten([Docs|Buffer]),
+ ok = update_docs(Target, Docs2, [], false),
+ ets:update_counter(Stats, docs_written, length(Docs2)),
+ [];
+ false ->
+ [Docs | Buffer]
+ end,
+
+ {reply, ok, State#state{docs_buffer=NewBuffer}};
+
+handle_call({fin, {LastSeq, RevsCount}}, {Pid,_}, #state{enum_pid=Pid} = State) ->
+ ets:update_counter(State#state.stats, total_revs, RevsCount),
+ {stop, normal, ok, State#state{current_seq=LastSeq}}.
+
+handle_cast({increment_update_seq, Seq}, State) ->
+ couch_task_status:update("Processed source update #~p", [Seq]),
+ {noreply, State#state{current_seq=Seq}}.
+
+handle_info(_Msg, State) ->
+ {noreply, State}.
+
+terminate(normal, State) ->
+ #state{
+ context = Context,
+ current_seq = Seq,
+ docs_buffer = Buffer,
+ listeners = Listeners,
+ source = Source,
+ target = Target,
+ stats = Stats
+ } = State,
+
+ ok = update_docs(Target, lists:flatten(Buffer), [], false),
+ ets:update_counter(Stats, docs_written, lists:flatlength(Buffer)),
+
+ couch_task_status:update("Finishing"),
+
+ %% format replication history
+ JsonStats = [
+ {<<"missing_checked">>, ets:lookup_element(Stats, total_revs, 2)},
+ {<<"missing_found">>, ets:lookup_element(Stats, missing_revs, 2)},
+ {<<"docs_read">>, ets:lookup_element(Stats, docs_read, 2)},
+ {<<"docs_written">>, ets:lookup_element(Stats, docs_written, 2)}
+ ],
+ ets:delete(Stats),
+ {ok, NewRepHistory} = finalize_response(Source, Target, Context, Seq, JsonStats),
+
+ %% update local documents
+ RepRecSrc = proplists:get_value(src_record, Context),
+ RepRecTgt = proplists:get_value(tgt_record, Context),
+ {ok, _} = update_local_doc(Source, RepRecSrc#doc{body=NewRepHistory}, []),
+ {ok, _} = update_local_doc(Target, RepRecTgt#doc{body=NewRepHistory}, []),
+
+ close_db(Target),
+
+ %% reply to original requester
+ [Original|Rest] = Listeners,
+ gen_server:reply(Original, {ok, NewRepHistory}),
+
+ %% maybe trigger another replication. If this replicator uses a local
+ %% source Db, changes to that Db since we started will not be included in
+ %% this pass.
+ case up_to_date(Source, Seq) of
+ true ->
+ [gen_server:reply(R, {ok, NewRepHistory}) || R <- Rest];
+ false ->
+ [gen_server:reply(R, retry) || R <- Rest]
+ end,
+ close_db(Source);
+terminate(Reason, State) ->
+ #state{
+ listeners = Listeners,
+ source = Source,
+ target = Target,
+ stats = Stats
+ } = State,
+
+ [gen_server:reply(L, {error, Reason}) || L <- Listeners],
+
+ ets:delete(Stats),
+ close_db(Target),
+ close_db(Source).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%=============================================================================
+%% internal functions
+%%=============================================================================
+
+attachment_loop(ReqId) ->
+ couch_util:should_flush(),
+ receive
+ {From, {set_req_id, NewId}} ->
+ From ! {self(), {ok, NewId}},
+ attachment_loop(NewId);
+ {ibrowse_async_headers, ReqId, _Status, _Headers} ->
+ attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, {chunk_start,_}} ->
+ attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, chunk_end} ->
+ attachment_loop(ReqId);
+ {ibrowse_async_response, ReqId, Data} ->
+ receive {From, gimme_data} -> From ! {self(), Data} end,
+ attachment_loop(ReqId);
+ {ibrowse_async_response_end, ReqId} -> ok
end.
-pull_rep(DbTarget, DbSource, SourceSeqNum) ->
- {ok, {NewSeq, Stats}} =
- enum_docs_since(DbSource, DbTarget, SourceSeqNum, {SourceSeqNum, []}),
- {NewSeq, Stats}.
+attachment_stub_converter(DbS, Id, {Name, {stub, Type, Length}}) ->
+ #http_db{uri=DbUrl, headers=Headers} = DbS,
+ % TODO worry about revisions
+ Url = DbUrl ++ url_encode(Id) ++ "/" ++ ?b2l(Name),
+ ?LOG_DEBUG("Attachment URL ~p", [Url]),
+
+ %% start the process that receives attachment data from ibrowse
+ Pid = spawn_link(fun() -> attachment_loop(nil) end),
+
+ %% make the async request
+ Options = [{stream_to, Pid}, {response_format, binary}],
+ {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, Headers, get, [], Options),
+
+ %% tell our receiver about the ReqId it needs to look for
+ Pid ! {self(), {set_req_id, ReqId}},
+ receive {Pid, {ok, ReqId}} -> ok end,
+
+ %% this is the function that goes into the streaming attachment code.
+ %% It gets executed by the replication gen_server, so it can't
+ %% be the one to actually receive the ibrowse data.
+ RcvFun = fun() ->
+ Pid ! {self(), gimme_data},
+ receive {Pid, Data} -> Data end
+ end,
+ {Name, {Type, {RcvFun, Length}}}.
+
+close_db(#http_db{})->
+ ok;
+close_db(Db)->
+ couch_db:close(Db).
do_http_request(Url, Action, Headers) ->
do_http_request(Url, Action, Headers, []).
do_http_request(Url, Action, Headers, JsonBody) ->
- do_http_request(Url, Action, Headers, JsonBody, 10).
+ do_http_request(?b2l(?l2b(Url)), Action, Headers, JsonBody, 10).
do_http_request(Url, Action, _Headers, _JsonBody, 0) ->
- ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~p",
+ ?LOG_ERROR("couch_rep HTTP ~p request failed after 10 retries: ~s",
[Action, Url]);
do_http_request(Url, Action, Headers, JsonBody, Retries) ->
- ?LOG_DEBUG("couch_rep HTTP ~p request: ~p", [Action, Url]),
+ ?LOG_DEBUG("couch_rep HTTP ~p request: ~s", [Action, Url]),
Body =
case JsonBody of
[] ->
@@ -187,9 +360,10 @@ do_http_request(Url, Action, Headers, JsonBody, Retries) ->
_ -> [{transfer_encoding, {chunked, 65535}}]
end ++ [
{content_type, "application/json; charset=utf-8"},
- {max_pipeline_size, 101}
+ {max_pipeline_size, 101},
+ {response_format, binary}
],
- case ibrowse:send_req(Url, Headers, Action, Body, Options) of
+ case ibrowse:send_req(Url, Headers, Action, Body, Options, infinity) of
{ok, Status, ResponseHeaders, ResponseBody} ->
ResponseCode = list_to_integer(Status),
if
@@ -202,139 +376,113 @@ do_http_request(Url, Action, Headers, JsonBody, Retries) ->
ResponseCode >= 400, ResponseCode < 500 ->
?JSON_DECODE(ResponseBody);
ResponseCode == 500 ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~p",
+ ?LOG_INFO("retrying couch_rep HTTP ~p request due to 500 error: ~s",
[Action, Url]),
do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
end;
{error, Reason} ->
- ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~p",
+ ?LOG_INFO("retrying couch_rep HTTP ~p request due to {error, ~p}: ~s",
[Action, Reason, Url]),
do_http_request(Url, Action, Headers, JsonBody, Retries - 1)
end.
-save_docs_buffer(DbTarget, DocsBuffer, []) ->
- receive
- {Src, shutdown} ->
- ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
- Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
- end;
-save_docs_buffer(DbTarget, DocsBuffer, UpdateSequences) ->
- [NextSeq|Rest] = UpdateSequences,
- receive
- {Src, skip, NextSeq} ->
- Src ! got_it,
- save_docs_buffer(DbTarget, DocsBuffer, Rest);
- {Src, docs, {NextSeq, Docs}} ->
- Src ! got_it,
- case couch_util:should_flush() of
- true ->
- ok = update_docs(DbTarget, lists:reverse(Docs++DocsBuffer), [],
- false),
- save_docs_buffer(DbTarget, [], Rest);
- false ->
- save_docs_buffer(DbTarget, Docs++DocsBuffer, Rest)
- end;
- {Src, shutdown} ->
- ?LOG_ERROR("received shutdown while waiting for more update_seqs", []),
- ok = update_docs(DbTarget, lists:reverse(DocsBuffer), [], false),
- Src ! {done, self(), [{<<"docs_written">>, length(DocsBuffer)}]}
+ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
+ {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post,
+ Headers, true),
+ true = proplists:get_value(<<"ok">>, ResultProps),
+ {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
+ensure_full_commit(Db) ->
+ couch_db:ensure_full_commit(Db).
+
+enum_docs_since(Pid, DbSource, DbTarget, {StartSeq, RevsCount}) ->
+ case get_doc_info_list(DbSource, StartSeq) of
+ [] ->
+ gen_server:call(Pid, {fin, {StartSeq, RevsCount}}, infinity);
+ DocInfoList ->
+ % UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
+ SrcRevsList = lists:map(fun(SrcDocInfo) ->
+ #doc_info{id=Id,
+ rev=Rev,
+ conflict_revs=Conflicts,
+ deleted_conflict_revs=DelConflicts
+ } = SrcDocInfo,
+ SrcRevs = [Rev | Conflicts] ++ DelConflicts,
+ {Id, SrcRevs}
+ end, DocInfoList),
+ {ok, MissingRevs} = get_missing_revs(DbTarget, SrcRevsList),
+
+ %% do we need to check for success here?
+ [ gen_server:call(Pid, {replicate_doc, Info}, infinity)
+ || Info <- MissingRevs ],
+
+ #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
+ RevsCount2 = RevsCount + length(SrcRevsList),
+ gen_server:cast(Pid, {increment_update_seq, LastSeq}),
+
+ enum_docs_since(Pid, DbSource, DbTarget, {LastSeq, RevsCount2})
end.
-pmap(F,List) ->
- [wait_result(Worker) || Worker <- [spawn_worker(self(),F,E) || E <- List]].
-
-spawn_worker(Parent, F, E) ->
- erlang:spawn_monitor(fun() -> Parent ! {self(), F(E)} end).
-
-wait_result({Pid,Ref}) ->
- receive
- {'DOWN', Ref, _, _, normal} -> receive {Pid,Result} -> Result end;
- {'DOWN', Ref, _, _, Reason} -> exit(Reason)
-end.
-
-enum_docs_parallel(DbS, DbT, InfoList) ->
- UpdateSeqs = [Seq || {_, Seq, _, _} <- InfoList],
- SaveDocsPid = spawn_link(fun() -> save_docs_buffer(DbT,[],UpdateSeqs) end),
-
- Stats = pmap(fun({Id, Seq, SrcRevs, MissingRevs}) ->
- case MissingRevs of
- [] ->
- SaveDocsPid ! {self(), skip, Seq},
- receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)}];
- _ ->
- {ok, DocResults} = open_doc_revs(DbS, Id, MissingRevs, [latest]),
-
- % only save successful reads
- Docs = [RevDoc || {ok, RevDoc} <- DocResults],
-
- % include update_seq so we save docs in order
- SaveDocsPid ! {self(), docs, {Seq, Docs}},
- receive got_it -> ok end,
- [{missing_checked, length(SrcRevs)},
- {missing_found, length(MissingRevs)},
- {docs_read, length(Docs)}]
- end
- end, InfoList),
+finalize_response(Source, Target, Context, NewSeqNum, Stats) ->
+ [
+ {start_seq, SeqNum},
+ {history, OldRepHistoryProps},
+ {rep_starttime, ReplicationStartTime},
+ {src_starttime, SrcInstanceStartTime},
+ {tgt_starttime, TgtInstanceStartTime}
+ |_] = Context,
- SaveDocsPid ! {self(), shutdown},
-
- {MissingChecked, MissingFound, DocsRead} = lists:foldl(fun(S, {C, F, R}) ->
- C1 = C + proplists:get_value(missing_checked, S, 0),
- F1 = F + proplists:get_value(missing_found, S, 0),
- R1 = R + proplists:get_value(docs_read, S, 0),
- {C1, F1, R1}
- end, {0, 0, 0}, Stats),
-
- receive
- {done, SaveDocsPid, [{<<"docs_written">>, DocsWritten}]} -> ok
- end,
-
- [ {<<"missing_checked">>, MissingChecked},
- {<<"missing_found">>, MissingFound},
- {<<"docs_read">>, DocsRead},
- {<<"docs_written">>, DocsWritten} ].
+ case NewSeqNum == SeqNum andalso OldRepHistoryProps /= [] of
+ true ->
+ % nothing changed, don't record results
+ {ok, {OldRepHistoryProps}};
+ false ->
+ % commit changes to both src and tgt. The src because if changes
+ % we replicated are lost, we'll record the a seq number of ahead
+ % of what was committed and therefore lose future changes with the
+ % same seq nums.
+ {ok, SrcInstanceStartTime2} = ensure_full_commit(Source),
+ {ok, TgtInstanceStartTime2} = ensure_full_commit(Target),
+
+ RecordSeqNum =
+ if SrcInstanceStartTime2 == SrcInstanceStartTime andalso
+ TgtInstanceStartTime2 == TgtInstanceStartTime ->
+ NewSeqNum;
+ true ->
+ ?LOG_INFO("A server has restarted sinced replication start. "
+ "Not recording the new sequence number to ensure the "
+ "replication is redone and documents reexamined.", []),
+ SeqNum
+ end,
+
+ HistEntries =[
+ {
+ [{<<"start_time">>, list_to_binary(ReplicationStartTime)},
+ {<<"end_time">>, list_to_binary(httpd_util:rfc1123_date())},
+ {<<"start_last_seq">>, SeqNum},
+ {<<"end_last_seq">>, NewSeqNum} | Stats]}
+ | proplists:get_value(<<"history">>, OldRepHistoryProps, [])],
+ % something changed, record results
+ NewRepHistory =
+ {
+ [{<<"session_id">>, couch_util:new_uuid()},
+ {<<"source_last_seq">>, RecordSeqNum},
+ {<<"history">>, lists:sublist(HistEntries, 50)}]},
+ {ok, NewRepHistory}
+ end.
fix_url(UrlBin) ->
Url = binary_to_list(UrlBin),
case lists:last(Url) of
- $/ ->
- Url;
- _ ->
- Url ++ "/"
+ $/ -> Url;
+ _ -> Url ++ "/"
end.
-open_http_db(UrlBin, Options) ->
- Headers = proplists:get_value(headers, Options, {[]}),
- {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
-
-open_db(<<"http://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(<<"https://", _/binary>>=Url, Options)->
- open_http_db(Url, Options);
-open_db(DbName, Options)->
- couch_db:open(DbName, Options).
-
-close_db(#http_db{})->
- ok;
-close_db(Db)->
- couch_db:close(Db).
-
get_db_info(#http_db{uri=DbUrl, headers=Headers}) ->
{DbProps} = do_http_request(DbUrl, get, Headers),
{ok, [{list_to_existing_atom(?b2l(K)), V} || {K,V} <- DbProps]};
get_db_info(Db) ->
couch_db:get_db_info(Db).
-
-ensure_full_commit(#http_db{uri=DbUrl, headers=Headers}) ->
- {ResultProps} = do_http_request(DbUrl ++ "_ensure_full_commit", post, Headers, true),
- true = proplists:get_value(<<"ok">>, ResultProps),
- {ok, proplists:get_value(<<"instance_start_time">>, ResultProps)};
-ensure_full_commit(Db) ->
- couch_db:ensure_full_commit(Db).
-
-
get_doc_info_list(#http_db{uri=DbUrl, headers=Headers}, StartSeq) ->
Url = DbUrl ++ "_all_docs_by_seq?limit=100&startkey="
++ integer_to_list(StartSeq),
@@ -361,82 +509,26 @@ get_doc_info_list(DbSource, StartSeq) ->
end, {0, []}),
lists:reverse(DocInfoList).
-enum_docs_since(DbSource, DbTarget, StartSeq, InAcc) ->
- DocInfoList = get_doc_info_list(DbSource, StartSeq),
- case DocInfoList of
- [] ->
- {ok, InAcc};
- _ ->
- UpdateSeqs = [D#doc_info.update_seq || D <- DocInfoList],
- SrcRevsList = lists:map(fun(SrcDocInfo) ->
- #doc_info{id=Id,
- rev=Rev,
- conflict_revs=Conflicts,
- deleted_conflict_revs=DelConflicts
- } = SrcDocInfo,
- SrcRevs = [Rev | Conflicts] ++ DelConflicts,
- {Id, SrcRevs}
- end, DocInfoList),
- {ok, MissingRevsList} = get_missing_revs(DbTarget, SrcRevsList),
- InfoList = lists:map(fun({{Id, SrcRevs}, Seq}) ->
- MissingRevs = proplists:get_value(Id, MissingRevsList, []),
- {Id, Seq, SrcRevs, MissingRevs}
- end, lists:zip(SrcRevsList, UpdateSeqs)),
- Stats = enum_docs_parallel(DbSource, DbTarget, InfoList),
- OldStats = element(2, InAcc),
- TotalStats = [
- {<<"missing_checked">>,
- proplists:get_value(<<"missing_checked">>, OldStats, 0) +
- proplists:get_value(<<"missing_checked">>, Stats, 0)},
- {<<"missing_found">>,
- proplists:get_value(<<"missing_found">>, OldStats, 0) +
- proplists:get_value(<<"missing_found">>, Stats, 0)},
- {<<"docs_read">>,
- proplists:get_value(<<"docs_read">>, OldStats, 0) +
- proplists:get_value(<<"docs_read">>, Stats, 0)},
- {<<"docs_written">>,
- proplists:get_value(<<"docs_written">>, OldStats, 0) +
- proplists:get_value(<<"docs_written">>, Stats, 0)}
- ],
-
- #doc_info{update_seq=LastSeq} = lists:last(DocInfoList),
- enum_docs_since(DbSource, DbTarget, LastSeq, {LastSeq, TotalStats})
- end.
-
get_missing_revs(#http_db{uri=DbUrl, headers=Headers}, DocIdRevsList) ->
{ResponseMembers} = do_http_request(DbUrl ++ "_missing_revs", post, Headers,
{DocIdRevsList}),
- {DocMissingRevsList} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
- {ok, DocMissingRevsList};
+ {MissingRevs} = proplists:get_value(<<"missing_revs">>, ResponseMembers),
+ {ok, MissingRevs};
get_missing_revs(Db, DocId) ->
couch_db:get_missing_revs(Db, DocId).
+open_http_db(UrlBin, Options) ->
+ Headers = proplists:get_value(headers, Options, {[]}),
+ {ok, #http_db{uri=fix_url(UrlBin), headers=Headers}}.
+
+open_db(<<"http://", _/binary>>=Url, Options)->
+ open_http_db(Url, Options);
+open_db(<<"https://", _/binary>>=Url, Options)->
+ open_http_db(Url, Options);
+open_db(DbName, Options)->
+ couch_db:open(DbName, Options).
-update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
- [] = Options,
- Url = DbUrl ++ url_encode(DocId),
- {ResponseMembers} = do_http_request(Url, put, Headers,
- couch_doc:to_json_obj(Doc, [revs,attachments])),
- RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
- {ok, RevId};
-update_doc(Db, Doc, Options) ->
- couch_db:update_doc(Db, Doc, Options).
-
-update_docs(_, [], _, _) ->
- ok;
-update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
- JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
- {Returned} =
- do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
- {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
- true = proplists:get_value(<<"ok">>, Returned),
- ok;
-update_docs(Db, Docs, Options, NewEdits) ->
- couch_db:update_docs(Db, Docs, Options, NewEdits).
-
-
-open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
- [] = Options,
+open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, []) ->
case do_http_request(DbUrl ++ url_encode(DocId), get, Headers) of
{[{<<"error">>, ErrId}, {<<"reason">>, Reason}]} ->
{couch_util:to_existing_atom(ErrId), Reason};
@@ -446,16 +538,8 @@ open_doc(#http_db{uri=DbUrl, headers=Headers}, DocId, Options) ->
open_doc(Db, DocId, Options) ->
couch_db:open_doc(Db, DocId, Options).
-
-open_doc_revs(#http_db{uri=DbUrl, headers=Headers}, DocId, Revs, Options) ->
- QueryOptionStrs =
- lists:map(fun(latest) ->
- % latest is only option right now
- "latest=true"
- end, Options),
-
- BaseUrl = DbUrl ++ url_encode(DocId) ++ "?" ++ couch_util:implode(
- ["revs=true", "attachments=true"] ++ QueryOptionStrs, "&"),
+open_doc_revs(#http_db{uri=DbUrl, headers=Headers} = DbS, DocId, Revs, _Opts) ->
+ BaseUrl = DbUrl ++ url_encode(DocId) ++ "?revs=true&latest=true",
%% MochiWeb expects URLs < 8KB long, so maybe split into multiple requests
MaxN = trunc((8192 - length(BaseUrl))/14),
@@ -477,15 +561,66 @@ open_doc_revs(#http_db{uri=DbUrl, headers=Headers}, DocId, Revs, Options) ->
lists:flatten(?JSON_ENCODE(lists:reverse(Rest))), get, Headers)
end,
- Results =
- lists:map(
- fun({[{<<"missing">>, Rev}]}) ->
- {{not_found, missing}, Rev};
- ({[{<<"ok">>, JsonDoc}]}) ->
- {ok, couch_doc:from_json_obj(JsonDoc)}
- end, JsonResults),
+ Results =
+ lists:map(fun({[{<<"missing">>, Rev}]}) ->
+ {{not_found, missing}, Rev};
+ ({[{<<"ok">>, JsonDoc}]}) ->
+ #doc{id=Id, attachments=Attach} = Doc = couch_doc:from_json_obj(JsonDoc),
+ Attach2 = [attachment_stub_converter(DbS,Id,A) || A <- Attach],
+ {ok, Doc#doc{attachments=Attach2}}
+ end, JsonResults),
{ok, Results};
open_doc_revs(Db, DocId, Revs, Options) ->
couch_db:open_doc_revs(Db, DocId, Revs, Options).
+update_docs(_, [], _, _) ->
+ ok;
+update_docs(#http_db{uri=DbUrl, headers=Headers}, Docs, [], NewEdits) ->
+ JsonDocs = [couch_doc:to_json_obj(Doc, [revs,attachments]) || Doc <- Docs],
+ {Returned} =
+ do_http_request(DbUrl ++ "_bulk_docs", post, Headers,
+ {[{new_edits, NewEdits}, {docs, JsonDocs}]}),
+ true = proplists:get_value(<<"ok">>, Returned),
+ ok;
+update_docs(Db, Docs, Options, NewEdits) ->
+ couch_db:update_docs(Db, Docs, Options, NewEdits).
+
+update_local_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, []) ->
+ Url = DbUrl ++ url_encode(DocId),
+ {ResponseMembers} = do_http_request(Url, put, Headers,
+ couch_doc:to_json_obj(Doc, [revs,attachments])),
+ RevId = proplists:get_value(<<"_rev">>, ResponseMembers),
+ {ok, RevId};
+update_local_doc(Db, Doc, Options) ->
+ couch_db:update_doc(Db, Doc, Options).
+up_to_date(#http_db{}, _Seq) ->
+ true;
+up_to_date(Source, Seq) ->
+ {ok, NewDb} = couch_db:open(Source#db.name, []),
+ T = NewDb#db.update_seq == Seq,
+ couch_db:close(NewDb),
+ T.
+
+url_encode(Bin) when is_binary(Bin) ->
+ url_encode(binary_to_list(Bin));
+url_encode([H|T]) ->
+ if
+ H >= $a, $z >= H ->
+ [H|url_encode(T)];
+ H >= $A, $Z >= H ->
+ [H|url_encode(T)];
+ H >= $0, $9 >= H ->
+ [H|url_encode(T)];
+ H == $_; H == $.; H == $-; H == $: ->
+ [H|url_encode(T)];
+ true ->
+ case lists:flatten(io_lib:format("~.16.0B", [H])) of
+ [X, Y] ->
+ [$%, X, Y | url_encode(T)];
+ [X] ->
+ [$%, $0, X | url_encode(T)]
+ end
+ end;
+url_encode([]) ->
+ [].
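
attachment_stub_converter/3 above spawns an attachment_loop process, points an asynchronous ibrowse request at it with {stream_to, Pid}, and hands the replicator a RcvFun that pulls one chunk at a time, so attachment bodies are streamed to the target rather than buffered whole. A standalone sketch of the same ibrowse streaming protocol, not part of the patch; the URL and file path are placeholders:

%% Sketch of the ibrowse streaming messages consumed by attachment_loop/1.
%% Assumes the ibrowse application is started.
stream_to_file(Url, Path) ->
    {ok, Fd} = file:open(Path, [write, raw]),
    Options = [{stream_to, self()}, {response_format, binary}],
    {ibrowse_req_id, ReqId} = ibrowse:send_req(Url, [], get, [], Options),
    stream_loop(ReqId, Fd).

stream_loop(ReqId, Fd) ->
    receive
        {ibrowse_async_headers, ReqId, _Status, _Headers} ->
            stream_loop(ReqId, Fd);
        {ibrowse_async_response, ReqId, Data} when is_binary(Data) ->
            ok = file:write(Fd, Data),
            stream_loop(ReqId, Fd);
        {ibrowse_async_response, ReqId, _ChunkMarker} ->
            %% {chunk_start, Size} / chunk_end markers carry no body data
            stream_loop(ReqId, Fd);
        {ibrowse_async_response_end, ReqId} ->
            file:close(Fd)
    end.
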
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
index 942de092..15867ad7 100644
--- a/src/couchdb/couch_server_sup.erl
+++ b/src/couchdb/couch_server_sup.erl
@@ -133,6 +133,12 @@ start_primary_services() ->
brutal_kill,
worker,
[couch_log]},
+ {couch_replication_supervisor,
+ {couch_rep_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [couch_rep_sup]},
{couch_task_status,
{couch_task_status, start_link, []},
permanent,
@@ -150,7 +156,9 @@ start_primary_services() ->
permanent,
brutal_kill,
supervisor,
- dynamic}]}).
+ dynamic}
+ ]
+ }).
start_secondary_services() ->
DaemonChildSpecs = [
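
couch_rep_sup itself is added by this commit but its source does not appear in this diff. Since couch_rep:replicate/3 installs a transient worker per Source -> Target pair at runtime, a plausible minimal sketch of the module is simply a one_for_one supervisor with no static children:

%% Hypothetical sketch of couch_rep_sup; the actual module added by
%% this commit is not shown in the diff. Children are installed
%% dynamically by couch_rep:replicate/3 via supervisor:start_child/2.
-module(couch_rep_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, couch_rep_sup}, ?MODULE, []).

init([]) ->
    %% one_for_one, no static child specs
    {ok, {{one_for_one, 3, 10}, []}}.
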
diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl
index d0f0c0b6..21d6eb4c 100644
--- a/src/couchdb/couch_util.erl
+++ b/src/couchdb/couch_util.erl
@@ -184,19 +184,18 @@ should_flush() ->
should_flush(?FLUSH_MAX_MEM).
should_flush(MemThreshHold) ->
- case process_info(self(), memory) of
- {memory, Mem} when Mem > 2*MemThreshHold ->
+ {memory, ProcMem} = process_info(self(), memory),
+ BinMem = lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(self(), binary))),
+ if ProcMem+BinMem > 2*MemThreshHold ->
garbage_collect(),
- case process_info(self(), memory) of
- {memory, Mem} when Mem > MemThreshHold ->
+ {memory, ProcMem2} = process_info(self(), memory),
+ BinMem2 = lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
+ 0, element(2,process_info(self(), binary))),
+ if ProcMem2+BinMem2 > MemThreshHold ->
true;
- _ ->
- false
- end;
- _ ->
- false
- end.
-
+ true -> false end;
+ true -> false end.
%%% Purpose : Base 64 encoding and decoding.
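
The old should_flush/1 looked only at process_info(self(), memory), which misses large reference-counted binaries kept off the process heap; exactly the kind of data a replicator shuttling documents and attachments accumulates. The new version also sums the binary sizes reported by process_info(self(), binary). The accounting idea in isolation, as a sketch rather than part of the patch:

%% Sketch: bytes reported for the process itself plus the sizes of the
%% off-heap refc binaries it references.
process_footprint(Pid) ->
    {memory, ProcMem} = process_info(Pid, memory),
    {binary, BinInfo} = process_info(Pid, binary),
    BinMem = lists:sum([Size || {_Id, Size, _RefCount} <- BinInfo]),
    ProcMem + BinMem.
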
diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl
index 4e6404ad..3390e58a 100644
--- a/src/ibrowse/ibrowse.erl
+++ b/src/ibrowse/ibrowse.erl
@@ -192,6 +192,8 @@ send_req(Url, Headers, Method, Body) ->
%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
%% optionList() = [option()]
%% option() = {max_sessions, integer()} |
+%% {response_format,response_format()}|
+%% {stream_chunk_size, integer()} |
%% {max_pipeline_size, integer()} |
%% {trace, boolean()} |
%% {is_ssl, boolean()} |
@@ -219,7 +221,7 @@ send_req(Url, Headers, Method, Body) ->
%% ChunkSize = integer()
%% srtf() = boolean() | filename()
%% filename() = string()
-%%
+%% response_format() = list | binary
send_req(Url, Headers, Method, Body, Options) ->
send_req(Url, Headers, Method, Body, Options, 30000).
@@ -230,7 +232,8 @@ send_req(Url, Headers, Method, Body, Options) ->
send_req(Url, Headers, Method, Body, Options, Timeout) ->
case catch parse_url(Url) of
#url{host = Host,
- port = Port} = Parsed_url ->
+ port = Port,
+ protocol = Protocol} = Parsed_url ->
Lb_pid = case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
get_lb_pid(Parsed_url);
@@ -241,9 +244,10 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Max_pipeline_size = get_max_pipeline_size(Host, Port, Options),
Options_1 = merge_options(Host, Port, Options),
{SSLOptions, IsSSL} =
- case get_value(is_ssl, Options_1, false) of
+ case (Protocol == https) orelse
+ get_value(is_ssl, Options_1, false) of
false -> {[], false};
- true -> {get_value(ssl_options, Options_1), true}
+ true -> {get_value(ssl_options, Options_1, []), true}
end,
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
@@ -316,6 +320,13 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
{error, req_timedout};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
+ {ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
+ case get_value(response_format, Options, list) of
+ list ->
+ {ok, St_code, Headers, binary_to_list(Body)};
+ binary ->
+ Ret
+ end;
Ret ->
Ret
end.
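
The ibrowse changes add a response_format option (list | binary), a stream_chunk_size option for batching streamed data, and automatic SSL for https URLs. A usage sketch of the extended send_req/6, not part of the patch; the URL is a placeholder:

%% Example use of the options added in this commit. With response_format
%% set to binary, bodies (and streamed chunks) arrive as binaries.
fetch_binary(Url) ->
    Options = [{response_format, binary},
               {stream_chunk_size, 65536},   %% only relevant with stream_to
               {max_pipeline_size, 101}],
    case ibrowse:send_req(Url, [], get, [], Options, infinity) of
        {ok, "200", _Headers, Body} when is_binary(Body) ->
            {ok, Body};
        Error ->
            Error
    end.
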
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 9a0e4d3b..9455bc20 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -38,19 +38,23 @@
-include("ibrowse.hrl").
--record(state, {host, port,
+-record(state, {host, port,
use_proxy = false, proxy_auth_digest,
- ssl_options = [], is_ssl = false, socket,
- reqs=queue:new(), cur_req, status=idle, http_status_code,
- reply_buffer=[], rep_buf_size=0, recvd_headers=[],
+ ssl_options = [], is_ssl = false, socket,
+ reqs=queue:new(), cur_req, status=idle, http_status_code,
+ reply_buffer=[], rep_buf_size=0, streamed_size = 0,
+ recvd_headers=[],
is_closing, send_timer, content_length,
- deleted_crlf = false, transfer_encoding, chunk_size,
- chunks=[], lb_ets_tid, cur_pipeline_size = 0}).
+ deleted_crlf = false, transfer_encoding, chunk_size,
+ chunks=[], lb_ets_tid, cur_pipeline_size = 0
+ }).
-record(request, {url, method, options, from,
stream_to, req_id,
- save_response_to_file = false,
- tmp_file_name, tmp_file_fd}).
+ stream_chunk_size,
+ save_response_to_file = false,
+ tmp_file_name, tmp_file_fd,
+ response_format}).
-import(ibrowse_lib, [
parse_url/1,
@@ -60,6 +64,8 @@
do_trace/2
]).
+-define(DEFAULT_STREAM_CHUNK_SIZE, 1024*1024).
+
%%====================================================================
%% External functions
%%====================================================================
@@ -127,15 +133,16 @@ init({Host, Port}) ->
%%--------------------------------------------------------------------
%% Received a request when the remote server has already sent us a
%% Connection: Close header
-handle_call({send_req, _},
+handle_call({send_req, _},
_From,
#state{is_closing=true}=State) ->
{reply, {error, connection_closing}, State};
-handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
+handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
From,
#state{socket=undefined,
host=Host, port=Port}=State) ->
+ Resp_format = get_value(response_format, Options, list),
{Host_1, Port_1, State_1} =
case get_value(proxy_host, Options, false) of
false ->
@@ -151,12 +158,14 @@ handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
StreamTo = get_value(stream_to, Options, undefined),
ReqId = make_req_id(),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
- NewReq = #request{url=Url,
+ NewReq = #request{url=Url,
method=Method,
stream_to=StreamTo,
- options=Options,
+ options=Options,
req_id=ReqId,
save_response_to_file = SaveResponseToFile,
+ stream_chunk_size = get_stream_chunk_size(Options),
+ response_format = Resp_format,
from=From},
Reqs = queue:in(NewReq, State#state.reqs),
State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
@@ -208,15 +217,18 @@ handle_call({send_req, {Url, Headers, Method,
From,
#state{socket=Sock, status=Status, reqs=Reqs}=State) ->
do_trace("Recvd request in connected state. Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]),
+ Resp_format = get_value(response_format, Options, list),
StreamTo = get_value(stream_to, Options, undefined),
SaveResponseToFile = get_value(save_response_to_file, Options, false),
ReqId = make_req_id(),
- NewReq = #request{url=Url,
+ NewReq = #request{url=Url,
stream_to=StreamTo,
method=Method,
- options=Options,
+ options=Options,
req_id=ReqId,
save_response_to_file = SaveResponseToFile,
+ stream_chunk_size = get_stream_chunk_size(Options),
+ response_format = Resp_format,
from=From},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
@@ -359,14 +371,14 @@ handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
handle_sock_data(Data, #state{status=get_body, content_length=CL,
http_status_code = StatCode,
- recvd_headers=Headers,
+ recvd_headers=Headers,
chunk_size=CSz, socket=Sock}=State) ->
case (CL == undefined) and (CSz == undefined) of
true ->
case accumulate_response(Data, State) of
{error, Reason} ->
shutting_down(State),
- fail_pipelined_requests(State,
+ fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
State_1 ->
@@ -377,7 +389,7 @@ handle_sock_data(Data, #state{status=get_body, content_length=CL,
case parse_11_response(Data, State) of
{error, Reason} ->
shutting_down(State),
- fail_pipelined_requests(State,
+ fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
stop ->
@@ -433,14 +445,27 @@ accumulate_response(Data, #state{cur_req = #request{save_response_to_file = Save
accumulate_response([], State) ->
State;
accumulate_response(Data, #state{reply_buffer = RepBuf,
+ rep_buf_size = RepBufSize,
+ streamed_size = Streamed_size,
cur_req = CurReq}=State) ->
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ stream_chunk_size = Stream_chunk_size,
+ response_format = Response_format} = CurReq,
+ RepBuf_1 = [Data | RepBuf],
+ New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
- State#state{reply_buffer = [Data | RepBuf]};
+ State#state{reply_buffer = RepBuf_1};
+ _ when New_data_size < Stream_chunk_size ->
+ State#state{reply_buffer = RepBuf_1};
_ ->
- do_interim_reply(StreamTo, ReqId, Data),
- State
+ {Stream_chunk, Rem_data} = split_list_at(flatten(lists:reverse(RepBuf_1)), Stream_chunk_size),
+ do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
+ accumulate_response(
+ Rem_data,
+ State#state{
+ reply_buffer = [],
+ streamed_size = Streamed_size + Stream_chunk_size})
end.
make_tmp_filename() ->
@@ -463,7 +488,7 @@ handle_sock_closed(#state{status=get_header}=State) ->
handle_sock_closed(#state{cur_req=undefined} = State) ->
shutting_down(State);
-%% We check for IsClosing because this the server could have sent a
+%% We check for IsClosing because this the server could have sent a
%% Connection-Close header and has closed the socket to indicate end
%% of response. There maybe requests pipelined which need a response.
handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
@@ -471,18 +496,18 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
cur_req=#request{tmp_file_name=TmpFilename,
tmp_file_fd=Fd} = CurReq,
status=get_body, recvd_headers=Headers}=State) ->
- #request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format} = CurReq,
case IsClosing of
true ->
{_, Reqs_1} = queue:out(Reqs),
case TmpFilename of
undefined ->
- do_reply(State, From, StreamTo, ReqId,
- {ok, SC, Headers,
- lists:flatten(lists:reverse(Buf))});
+ do_reply(State, From, StreamTo, ReqId, Resp_format,
+ {ok, SC, Headers, lists:reverse(Buf)});
_ ->
file:close(Fd),
- do_reply(State, From, StreamTo, ReqId,
+ do_reply(State, From, StreamTo, ReqId, Resp_format,
{ok, SC, Headers, {file, TmpFilename}})
end,
do_error_reply(State#state{reqs = Reqs_1}, connection_closed),
@@ -493,9 +518,13 @@ handle_sock_closed(#state{reply_buffer=Buf, reqs=Reqs, http_status_code=SC,
end.
do_connect(Host, Port, _Options, #state{is_ssl=true, ssl_options=SSLOptions}, Timeout) ->
- ssl:connect(Host, Port, [{nodelay, true}, {active, false} | SSLOptions], Timeout);
+ ssl:connect(Host, Port,
+ [{nodelay, true}, {active, false} | SSLOptions],
+ Timeout);
do_connect(Host, Port, _Options, _State, Timeout) ->
- gen_tcp:connect(Host, Port, [{nodelay, true}, {active, false}], Timeout).
+ gen_tcp:connect(Host, Port,
+ [{nodelay, true}, {active, false}],
+ Timeout).
do_send(Sock, Req, true) -> ssl:send(Sock, Req);
do_send(Sock, Req, false) -> gen_tcp:send(Sock, Req).
@@ -542,7 +571,7 @@ check_ssl_options(Options, State) ->
send_req_1(#url{abspath = AbsPath,
host = Host,
- port = Port,
+ port = Port,
path = RelPath} = Url,
Headers, Method, Body, Options, Sock, State) ->
Headers_1 = add_auth_headers(Url, Options, Headers, State),
@@ -555,10 +584,10 @@ send_req_1(#url{abspath = AbsPath,
{value, {_, Host_h_val}} ->
Host_h_val
end,
- {Req, Body_1} = make_request(Method,
+ {Req, Body_1} = make_request(Method,
[{"Host", HostHeaderValue} | Headers_1],
AbsPath, RelPath, Body, Options, State#state.use_proxy),
- case get(my_trace_flag) of
+ case get(my_trace_flag) of
true ->
%%Avoid the binary operations if trace is not on...
NReq = binary_to_list(list_to_binary(Req)),
@@ -569,7 +598,7 @@ send_req_1(#url{abspath = AbsPath,
end,
SndRes = case do_send(Sock, Req, State#state.is_ssl) of
ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
- Err ->
+ Err ->
io:format("Err: ~p~n", [Err]),
Err
end,
@@ -577,9 +606,9 @@ send_req_1(#url{abspath = AbsPath,
SndRes.
add_auth_headers(#url{username = User,
- password = UPw},
+ password = UPw},
Options,
- Headers,
+ Headers,
#state{use_proxy = UseProxy,
proxy_auth_digest = ProxyAuthDigest}) ->
Headers_1 = case User of
@@ -601,7 +630,7 @@ add_auth_headers(#url{username = User,
true ->
[{"Proxy-Authorization", ["Basic ", ProxyAuthDigest]} | Headers_1]
end.
-
+
http_auth_digest([], []) ->
[];
http_auth_digest(Username, Password) ->
@@ -617,7 +646,7 @@ encode_base64([A,B,C|Ls]) ->
encode_base64_do(A,B,C, Ls).
encode_base64_do(A,B,C, Rest) ->
BB = (A bsl 16) bor (B bsl 8) bor C,
- [e(BB bsr 18), e((BB bsr 12) band 63),
+ [e(BB bsr 18), e((BB bsr 12) band 63),
e((BB bsr 6) band 63), e(BB band 63)|encode_base64(Rest)].
e(X) when X >= 0, X < 26 -> X+65;
@@ -643,12 +672,12 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) ->
_ ->
Headers
end,
- {Headers_2, Body_1} =
+ {Headers_2, Body_1} =
case get_value(transfer_encoding, Options, false) of
false ->
{Headers_1, Body};
{chunked, ChunkSize} ->
- {[{X, Y} || {X, Y} <- Headers_1,
+ {[{X, Y} || {X, Y} <- Headers_1,
X /= "Content-Length",
X /= "content-length",
X /= content_length] ++
@@ -659,7 +688,7 @@ make_request(Method, Headers, AbsPath, RelPath, Body, Options, UseProxy) ->
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
AbsPath;
- false ->
+ false ->
RelPath
end,
{[method(Method), " ", Uri, " ", HttpVsn, crnl(), Headers_3, crnl()], Body_1}.
@@ -732,7 +761,7 @@ parse_response(_Data, #state{cur_req = undefined}=State) ->
parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
cur_req=CurReq}=State) ->
#request{from=From, stream_to=StreamTo, req_id=ReqId,
- method=Method} = CurReq,
+ method=Method, response_format = Resp_format} = CurReq,
MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
case scan_header(Data, Acc) of
{yes, Headers, Data_1} ->
@@ -749,7 +778,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
false ->
ok
end,
- State_1 = State#state{recvd_headers=Headers_1, status=get_body,
+ State_1 = State#state{recvd_headers=Headers_1, status=get_body,
+ reply_buffer = [],
http_status_code=StatCode, is_closing=IsClosing},
put(conn_close, ConnClose),
TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")),
@@ -757,7 +787,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
_ when Method == head ->
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
- State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
+ State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
+ {ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
@@ -776,7 +807,8 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
%% RFC2616 - Sec 4.4
{_, Reqs_1} = queue:out(Reqs),
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
- State_1_1 = do_reply(State_1, From, StreamTo, ReqId, {ok, StatCode, Headers_1, []}),
+ State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
+ {ok, StatCode, Headers_1, []}),
cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
State_2 = reset_state(State_1_1),
State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
@@ -788,7 +820,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
chunk_size=chunk_start,
reply_buffer=[], chunks=[]}) of
{error, Reason} ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {Reason,
{stat_code, StatCode}, Headers_1}}),
{error, Reason};
@@ -800,7 +832,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
State_1#state{reply_buffer=[Data_1]};
undefined ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {content_length_undefined,
{stat_code, StatCode}, Headers}}),
{error, content_length_undefined};
@@ -814,7 +846,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
content_length=V_1},
case parse_11_response(Data_1, State_2) of
{error, Reason} ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {Reason,
{stat_code, StatCode}, Headers_1}}),
{error, Reason};
@@ -822,7 +854,7 @@ parse_response(Data, #state{reply_buffer=Acc, reqs=Reqs,
State_3
end;
_ ->
- fail_pipelined_requests(State_1,
+ fail_pipelined_requests(State_1,
{error, {content_length_undefined,
{stat_code, StatCode}, Headers}}),
{error, content_length_undefined}
@@ -843,25 +875,28 @@ is_connection_closing("HTTP/1.0", "false") -> true;
is_connection_closing(_, _) -> false.
%% This clause determines the chunk size when given data from the beginning of the chunk
-parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked,
+parse_11_response(DataRecvd,
+ #state{transfer_encoding=chunked,
chunk_size=chunk_start,
cur_req=CurReq,
- reply_buffer=Buf}=State) ->
+ reply_buffer=Buf
+ }=State) ->
case scan_crlf(DataRecvd, Buf) of
{yes, ChunkHeader, Data_1} ->
case parse_chunk_header(ChunkHeader) of
{error, Reason} ->
{error, Reason};
ChunkSize ->
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ response_format = Response_format} = CurReq,
%%
%% Do we have to preserve the chunk encoding when streaming?
%%
- do_interim_reply(StreamTo, ReqId, {chunk_start, ChunkSize}),
+ do_interim_reply(StreamTo, Response_format,
+ ReqId, {chunk_start, ChunkSize}),
RemLen = length(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n", [ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{rep_buf_size=0,
+ parse_11_response(Data_1, State#state{rep_buf_size=0,
reply_buffer=[],
deleted_crlf=true,
chunk_size=ChunkSize})
@@ -871,29 +906,34 @@ parse_11_response(DataRecvd,
end;
%% This clause is there to remove the CRLF between two chunks
-%%
-parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked,
+%%
+parse_11_response(DataRecvd,
+ #state{transfer_encoding=chunked,
chunk_size=tbd,
chunks = Chunks,
cur_req=CurReq,
reply_buffer=Buf}=State) ->
case scan_crlf(DataRecvd, Buf) of
{yes, _, NextChunk} ->
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ response_format = Response_format} = CurReq,
%%
%% Do we have to preserve the chunk encoding when streaming?
%%
State_1 = State#state{chunk_size=chunk_start,
- rep_buf_size=0,
+ rep_buf_size=0,
reply_buffer=[],
deleted_crlf=true},
State_2 = case StreamTo of
undefined ->
State_1#state{chunks = [Buf | Chunks]};
- _ ->
- do_interim_reply(StreamTo, ReqId, chunk_end),
- State_1
+ _ ->
+ %% Flush out all buffered data as chunk is ending
+ do_interim_reply(StreamTo, Response_format, ReqId,
+ lists:reverse([Buf | Chunks])),
+ do_interim_reply(StreamTo, Response_format,
+ ReqId, chunk_end),
+ State_1#state{chunks = [], streamed_size = 0}
end,
parse_11_response(NextChunk, State_2);
{no, Data_1} ->
@@ -901,26 +941,27 @@ parse_11_response(DataRecvd,
end;
%% This clause deals with the end of a chunked transfer
-parse_11_response(DataRecvd,
- #state{transfer_encoding=chunked, chunk_size=0,
+parse_11_response(DataRecvd,
+ #state{transfer_encoding=chunked, chunk_size=0,
cur_req=CurReq,
deleted_crlf = DelCrlf,
reply_buffer=Trailer, reqs=Reqs}=State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
- false ->
+ false ->
DataRecvd;
true ->
[$\r, $\n | DataRecvd]
end,
- #request{stream_to=StreamTo, req_id=ReqId} = CurReq,
+ #request{stream_to=StreamTo, req_id=ReqId,
+ response_format = Response_format} = CurReq,
case scan_header(DataRecvd_1, Trailer) of
{yes, _TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
%%
- %% Do we have to preserve the chunk encoding when streaming?
+ %% Do we have to preserve the chunk encoding when streaming? Nope.
%%
- do_interim_reply(StreamTo, ReqId, chunk_end),
+ do_interim_reply(StreamTo, Response_format, ReqId, chunk_end),
State_1 = handle_response(CurReq, State#state{reqs=Reqs_1}),
parse_response(Rem, reset_state(State_1));
{no, Rem} ->
@@ -928,7 +969,7 @@ parse_11_response(DataRecvd,
end;
%% This clause extracts a chunk, given the size.
-parse_11_response(DataRecvd,
+parse_11_response(DataRecvd,
#state{transfer_encoding=chunked, chunk_size=CSz,
rep_buf_size=RepBufSz}=State) ->
NeedBytes = CSz - RepBufSz,
@@ -952,12 +993,12 @@ parse_11_response(DataRecvd,
parse_11_response(RemData, State_2)
end;
false ->
- accumulate_response(DataRecvd, State#state{rep_buf_size=RepBufSz + DataLen})
+ accumulate_response(DataRecvd, State#state{rep_buf_size=(RepBufSz + DataLen)})
end;
%% This clause to extract the body when Content-Length is specified
-parse_11_response(DataRecvd,
- #state{content_length=CL, rep_buf_size=RepBufSz,
+parse_11_response(DataRecvd,
+ #state{content_length=CL, rep_buf_size=RepBufSz,
reqs=Reqs}=State) ->
NeedBytes = CL - RepBufSz,
DataLen = length(DataRecvd),
@@ -970,11 +1011,12 @@ parse_11_response(DataRecvd,
State_3 = reset_state(State_2),
parse_response(Rem, State_3);
false ->
- accumulate_response(DataRecvd, State#state{rep_buf_size=RepBufSz+DataLen})
+ accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end.
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
- save_response_to_file = SaveResponseToFile,
+ response_format = Resp_format,
+ save_response_to_file = SaveResponseToFile,
tmp_file_name = TmpFilename,
tmp_file_fd = Fd
},
@@ -986,9 +1028,9 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false ->
Body = case TEnc of
chunked ->
- lists:flatten(lists:reverse(Chunks));
+ lists:reverse(Chunks);
_ ->
- lists:flatten(lists:reverse(RepBuf))
+ lists:reverse(RepBuf)
end,
State_1 = set_cur_request(State),
file:close(Fd),
@@ -998,32 +1040,38 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
_ ->
{file, TmpFilename}
end,
- State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, ResponseBody}),
+ State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
+ {ok, SCode, RespHeaders, ResponseBody}),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
State_2;
-handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId},
+handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format},
#state{http_status_code=SCode, recvd_headers=RespHeaders,
reply_buffer=RepBuf, transfer_encoding=TEnc,
chunks=Chunks, send_timer=ReqTimer}=State) ->
Body = case TEnc of
chunked ->
- lists:flatten(lists:reverse(Chunks));
+ lists:reverse(Chunks);
_ ->
- lists:flatten(lists:reverse(RepBuf))
+ lists:reverse(RepBuf)
end,
- State_1 = set_cur_request(State),
- case get(conn_close) of
+%% State_1 = set_cur_request(State),
+ State_1 = case get(conn_close) of
"close" ->
- do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
+ do_reply(State, From, StreamTo, ReqId, Resp_format,
+ {ok, SCode, RespHeaders, Body}),
exit(normal);
_ ->
- State_2 = do_reply(State_1, From, StreamTo, ReqId, {ok, SCode, RespHeaders, Body}),
+ State_1_1 = do_reply(State, From, StreamTo, ReqId, Resp_format,
+ {ok, SCode, RespHeaders, Body}),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
- State_2
- end.
+ State_1_1
+ end,
+ set_cur_request(State_1).
reset_state(State) ->
- State#state{status=get_header, rep_buf_size=0,content_length=undefined,
+ State#state{status=get_header, rep_buf_size=0, streamed_size = 0,
+ content_length=undefined,
reply_buffer=[], chunks=[], recvd_headers=[], deleted_crlf=false,
http_status_code=undefined, chunk_size=undefined, transfer_encoding=undefined}.
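The two handle_response clauses above now keep the accumulated reply as a deep list (an iolist) and leave the final conversion to format_response_data/2, instead of flattening on every response. A rough sketch of the effect, on made-up chunk data:

    Chunks = ["third ", "second ", "first "],             %% accumulated newest-first
    Body   = lists:reverse(Chunks),                        %% ["first ","second ","third "]
    %% only one conversion happens, at delivery time:
    "first second third "     = lists:flatten(Body),       %% response_format = list
    <<"first second third ">> = list_to_binary(Body).      %% response_format = binary

Deferring the flatten keeps appends to the buffer cheap while a large body is still arriving.
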
@@ -1063,18 +1111,18 @@ parse_headers_1(String) ->
parse_headers_1(String, [], []).
parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32;
- H == $\t ->
+ H == $\t ->
parse_headers_1(lists:dropwhile(fun(X) ->
is_whitespace(X)
end, T), [32 | L], Acc);
-parse_headers_1([$\n|T], [$\r | L], Acc) ->
+parse_headers_1([$\n|T], [$\r | L], Acc) ->
case parse_header(lists:reverse(L)) of
invalid ->
parse_headers_1(T, [], Acc);
NewHeader ->
parse_headers_1(T, [], [NewHeader | Acc])
end;
-parse_headers_1([H|T], L, Acc) ->
+parse_headers_1([H|T], L, Acc) ->
parse_headers_1(T, [H|L], Acc);
parse_headers_1([], [], Acc) ->
lists:reverse(Acc);
@@ -1185,7 +1233,7 @@ parse_chunk_header([H | T], Acc) ->
parse_chunk_header([], Acc) ->
hexlist_to_integer(lists:reverse(Acc)).
-is_whitespace(32) -> true;
+is_whitespace($\s) -> true;
is_whitespace($\r) -> true;
is_whitespace($\n) -> true;
is_whitespace($\t) -> true;
@@ -1197,36 +1245,62 @@ send_async_headers(_ReqId, undefined, _StatCode, _Headers) ->
send_async_headers(ReqId, StreamTo, StatCode, Headers) ->
catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers}.
-do_reply(State, From, undefined, _, Msg) ->
+format_response_data(Resp_format, Body) ->
+ case Resp_format of
+ list when is_list(Body) ->
+ flatten(Body);
+ binary when is_list(Body) ->
+ list_to_binary(Body);
+ _ ->
+ %% This is to cater for sending messages such as
+ %% {chunk_start, _}, chunk_end etc
+ Body
+ end.
+
+do_reply(State, From, undefined, _, Resp_format, {ok, St_code, Headers, Body}) ->
+ Msg_1 = {ok, St_code, Headers, format_response_data(Resp_format, Body)},
+ gen_server:reply(From, Msg_1),
+ dec_pipeline_counter(State);
+do_reply(State, From, undefined, _, _, Msg) ->
gen_server:reply(From, Msg),
dec_pipeline_counter(State);
-do_reply(State, _From, StreamTo, ReqId, {ok, _, _, _}) ->
+do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
State_1 = dec_pipeline_counter(State),
+ case Body of
+ [] ->
+ ok;
+ _ ->
+ Body_1 = format_response_data(Resp_format, Body),
+ catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
+ end,
catch StreamTo ! {ibrowse_async_response_end, ReqId},
State_1;
-do_reply(State, _From, StreamTo, ReqId, Msg) ->
+do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
State_1 = dec_pipeline_counter(State),
- catch StreamTo ! {ibrowse_async_response, ReqId, Msg},
+ Msg_1 = format_response_data(Resp_format, Msg),
+ catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1},
State_1.
-do_interim_reply(undefined, _ReqId, _Msg) ->
+do_interim_reply(undefined, _, _ReqId, _Msg) ->
ok;
-do_interim_reply(StreamTo, ReqId, Msg) ->
- catch StreamTo ! {ibrowse_async_response, ReqId, Msg}.
+do_interim_reply(StreamTo, Response_format, ReqId, Msg) ->
+ Msg_1 = format_response_data(Response_format, Msg),
+ catch StreamTo ! {ibrowse_async_response, ReqId, Msg_1}.
do_error_reply(#state{reqs = Reqs} = State, Err) ->
ReqList = queue:to_list(Reqs),
- lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId}) ->
- do_reply(State, From, StreamTo, ReqId, {error, Err})
+ lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format}) ->
+ do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err})
end, ReqList).
fail_pipelined_requests(#state{reqs = Reqs, cur_req = CurReq} = State, Reply) ->
{_, Reqs_1} = queue:out(Reqs),
- #request{from=From, stream_to=StreamTo, req_id=ReqId} = CurReq,
- do_reply(State, From, StreamTo, ReqId, Reply),
+ #request{from=From, stream_to=StreamTo, req_id=ReqId,
+ response_format = Resp_format} = CurReq,
+ do_reply(State, From, StreamTo, ReqId, Resp_format, Reply),
do_error_reply(State#state{reqs = Reqs_1}, previous_request_failed).
-
split_list_at(List, N) ->
split_list_at(List, N, []).
split_list_at([], _, Acc) ->
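format_response_data/2 above only reshapes actual body data; control terms such as the chunk markers fall through unchanged. Expected behaviour of those clauses on small, made-up inputs:

    "abcd"     = format_response_data(list,   ["ab", "cd"]),  %% flattened to a string
    <<"abcd">> = format_response_data(binary, ["ab", "cd"]),  %% single binary
    chunk_end  = format_response_data(binary, chunk_end).     %% markers pass through
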
@@ -1271,7 +1345,7 @@ cancel_timer(Ref) -> erlang:cancel_timer(Ref).
cancel_timer(Ref, {eat_message, Msg}) ->
cancel_timer(Ref),
- receive
+ receive
Msg ->
ok
after 0 ->
@@ -1310,3 +1384,19 @@ dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
ets:delete(Tid, {Pipe_sz, self()}),
ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
State#state{cur_pipeline_size = Pipe_sz - 1}.
+
+flatten([H | _] = L) when is_integer(H) ->
+ L;
+flatten([H | _] = L) when is_list(H) ->
+ lists:flatten(L);
+flatten([]) ->
+ [].
+
+get_stream_chunk_size(Options) ->
+ case lists:keysearch(stream_chunk_size, 1, Options) of
+ {value, {_, V}} when V > 0 ->
+ V;
+ _ ->
+ ?DEFAULT_STREAM_CHUNK_SIZE
+ end.
+
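Taken together, the ibrowse_http_client changes above add per-request response_format and stream_chunk_size handling. A minimal usage sketch, assuming those option names and an arbitrary example URL (any reachable HTTP server would do):

    %% stream the body to the calling process in 8 KB pieces, delivered as binaries
    {ibrowse_req_id, Req_id} =
        ibrowse:send_req("http://example.org/", [], get, [],
                         [{stream_to, self()},
                          {stream_chunk_size, 8192},
                          {response_format, binary}]),
    receive
        {ibrowse_async_headers, Req_id, Status, Headers} ->
            io:format("~s ~p~n", [Status, Headers])
    end.
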
diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl
index b4429c9b..de8865ff 100644
--- a/src/ibrowse/ibrowse_test.erl
+++ b/src/ibrowse/ibrowse_test.erl
@@ -14,7 +14,10 @@
drv_ue_test/0,
drv_ue_test/1,
ue_test/0,
- ue_test/1
+ ue_test/1,
+ verify_chunked_streaming/0,
+ verify_chunked_streaming/1,
+ i_do_async_req_list/4
]).
-import(ibrowse_lib, [printable_date/0]).
@@ -88,7 +91,7 @@ do_wait() ->
do_wait()
end
end.
-
+
do_send_req(Url, NumReqs) ->
do_send_req_1(Url, NumReqs).
@@ -149,7 +152,7 @@ dump_errors(Key, Iod) ->
-define(TEST_LIST, [{"http://intranet/messenger", get},
{"http://www.google.co.uk", get},
{"http://www.google.com", get},
- {"http://www.google.com", options},
+ {"http://www.google.com", options},
{"http://www.sun.com", get},
{"http://www.oracle.com", get},
{"http://www.bbc.co.uk", get},
@@ -172,7 +175,8 @@ dump_errors(Key, Iod) ->
{"http://jigsaw.w3.org/HTTP/400/toolong/", get},
{"http://jigsaw.w3.org/HTTP/300/", get},
{"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
- {"http://jigsaw.w3.org/HTTP/CL/", get}
+ {"http://jigsaw.w3.org/HTTP/CL/", get},
+ {"http://www.httpwatch.com/httpgallery/chunked/", get}
]).
unit_tests() ->
@@ -185,13 +189,104 @@ unit_tests(Options) ->
execute_req(Url, Method, X_Opts ++ Options)
end, ?TEST_LIST).
-execute_req(Url, Method) ->
- execute_req(Url, Method, []).
+verify_chunked_streaming() ->
+ verify_chunked_streaming([]).
+
+verify_chunked_streaming(Options) ->
+ Url = "http://www.httpwatch.com/httpgallery/chunked/",
+ io:format("URL: ~s~n", [Url]),
+ io:format("Fetching data without streaming...~n", []),
+ Result_without_streaming = ibrowse:send_req(
+ Url, [], get, [],
+ [{response_format, binary} | Options]),
+ io:format("Fetching data with streaming as list...~n", []),
+ Async_response_list = do_async_req_list(
+ Url, get, [{response_format, list}]),
+ io:format("Fetching data with streaming as binary...~n", []),
+ Async_response_bin = do_async_req_list(
+ Url, get, [{response_format, binary}]),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+
+compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
+ success;
+compare_responses({ok, St_code, _, Body_1}, {ok, St_code, _, Body_2}, {ok, St_code, _, Body_3}) ->
+ case Body_1 of
+ Body_2 ->
+ io:format("Body_1 and Body_2 match~n", []);
+ Body_3 ->
+ io:format("Body_1 and Body_3 match~n", []);
+ _ when Body_2 == Body_3 ->
+ io:format("Body_2 and Body_3 match~n", []);
+ _ ->
+ io:format("All three bodies are different!~n", [])
+ end,
+ fail_bodies_mismatch;
+compare_responses(R1, R2, R3) ->
+ io:format("R1 -> ~p~n", [R1]),
+ io:format("R2 -> ~p~n", [R2]),
+ io:format("R3 -> ~p~n", [R3]),
+ fail.
+
+do_async_req_list(Url) ->
+ do_async_req_list(Url, get).
+
+do_async_req_list(Url, Method) ->
+ do_async_req_list(Url, Method, [{stream_to, self()},
+ {stream_chunk_size, 1000}]).
+
+do_async_req_list(Url, Method, Options) ->
+ {Pid,_} = erlang:spawn_monitor(?MODULE, i_do_async_req_list,
+ [self(), Url, Method,
+ Options ++ [{stream_chunk_size, 1000}]]),
+ io:format("Spawned process ~p~n", [Pid]),
+ wait_for_resp(Pid).
+
+wait_for_resp(Pid) ->
+ receive
+ {async_result, Pid, Res} ->
+ Res;
+ {'DOWN', _, _, Pid, Reason} ->
+ {'EXIT', Reason};
+ {'DOWN', _, _, _, _} ->
+ wait_for_resp(Pid);
+ Msg ->
+ io:format("Recvd unknown message: ~p~n", [Msg]),
+ wait_for_resp(Pid)
+ after 10000 ->
+ {error, timeout}
+ end.
+
+i_do_async_req_list(Parent, Url, Method, Options) ->
+ Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+ case Res of
+ {ibrowse_req_id, Req_id} ->
+ Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+ Parent ! {async_result, self(), Result};
+ Err ->
+ Parent ! {async_result, self(), Err}
+ end.
+
+wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+ receive
+ {ibrowse_async_headers, Req_id, StatCode, Headers} ->
+ wait_for_async_resp(Req_id, StatCode, Headers, Body);
+ {ibrowse_async_response, Req_id, {chunk_start, _}} ->
+ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body);
+ {ibrowse_async_response, Req_id, chunk_end} ->
+ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body);
+ {ibrowse_async_response_end, Req_id} ->
+ Body_1 = list_to_binary(lists:reverse(Body)),
+ {ok, Acc_Stat_code, Acc_Headers, Body_1};
+ {ibrowse_async_response, Req_id, Data} ->
+ wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ Err ->
+ {ok, Acc_Stat_code, Acc_Headers, Err}
+ end.
execute_req(Url, Method, Options) ->
io:format("~s, ~p: ", [Url, Method]),
Result = (catch ibrowse:send_req(Url, [], Method, [], Options)),
- case Result of
+ case Result of
{ok, SCode, _H, _B} ->
io:format("Status code: ~p~n", [SCode]);
Err ->