summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-07-24 16:38:53 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-07-24 16:38:53 +0000
commitac3d047a6be45316681d9c6e507537231d1e0e0a (patch)
treef18976f0040ed32bf2772dc3cbc5d376da2f54ec
parent7b400c260c7c93bb24ff0e13dc2575af8751f35a (diff)
reassemble split chunks in changes feed, add all() export
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@797553 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl123
-rwxr-xr-xtest/etap/110-replication-changes-feed.t43
2 files changed, 138 insertions, 28 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index deea2d8e..3de178a5 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start/2, start_link/2, next/1, stop/1]).
+-export([start/2, start_link/2, all/1, next/1, stop/1]).
-define(MIN_BUFFER_SIZE, 100).
@@ -23,10 +23,12 @@
-include("../ibrowse/ibrowse.hrl").
-record (remote, {
- conn = nil,
- reqid = nil,
+ conn,
+ reqid,
+ last_seq,
complete = false,
count = 0,
+ partial_chunk = nil,
reply_to = nil,
rows = queue:new()
}).
@@ -46,6 +48,11 @@ start(Url, Options) ->
start_link(Url, Options) ->
gen_server:start_link(?MODULE, [Url, Options], []).
+%% @doc does not block
+all(Server) ->
+ gen_server:call(Server, all_changes).
+
+%% @doc returns the next change from the feed, blocking if necessary
next(Server) ->
gen_server:call(Server, next_change, infinity).
@@ -57,7 +64,7 @@ init([{remote, Url}, Options]) ->
Continuous = proplists:get_value(continuous, Options, false),
{Pid, ReqId} = start_http_request(lists:concat([Url, "/_changes",
"?style=all_docs", "&since=", Since, "&continuous=", Continuous])),
- {ok, #remote{conn=Pid, reqid=ReqId}};
+ {ok, #remote{conn=Pid, last_seq=Since, reqid=ReqId}};
init([{local, DbName}, Options]) when is_list(DbName) ->
init([{local, ?l2b(DbName)}, Options]);
@@ -91,6 +98,30 @@ handle_call({add, Row}, From, #local{}=State) ->
} = State,
{noreply, State#local{count=Count+1, changes_from=From, rows=queue:in(Row,Rows)}};
+handle_call(all_changes, _From, #local{complete=Complete, count=Count}=State)
+ when Complete =:= false; Count > 0 ->
+ #local{
+ changes_from = ChangesFrom,
+ rows = Rows
+ } = State,
+ if Count < ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
+ gen_server:reply(ChangesFrom, ok);
+ true -> ok end,
+ {reply, queue:to_list(Rows), State#local{count=0, rows=queue:new()}};
+handle_call(all_changes, _From, #local{}=State) ->
+ {stop, normal, complete, State};
+
+handle_call(all_changes, _From, #remote{complete=Complete, count=Count}=State)
+ when Complete =:= false; Count > 0 ->
+ #remote{
+ reqid = Id,
+ rows = Rows
+ } = State,
+ ok = maybe_stream_next(Complete, 0, Id),
+ {reply, queue:to_list(Rows), State#remote{count=0, rows=queue:new()}};
+handle_call(all_changes, _From, #remote{}=State) ->
+ {stop, normal, complete, State};
+
handle_call(next_change, From, #local{count=0}=State) ->
if State#local.complete ->
{stop, normal, complete, State};
@@ -158,28 +189,11 @@ handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #remote{reqid=Id}=State) ->
[Code,Hdrs]),
{stop, {error, list_to_integer(Code)}, State};
+handle_info({ibrowse_async_response, _, {error, Reason}}, State) ->
+ {stop, {error, Reason}, State};
handle_info({ibrowse_async_response, Id, Msg}, #remote{reqid=Id} = State) ->
?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
- #remote{
- complete = Complete,
- count = Count,
- rows = Rows
- } = State,
- try
- Row = decode_row(Msg),
- case State of
- #remote{reply_to=nil} ->
- {noreply, State#remote{count=Count+1, rows = queue:in(Row, Rows)}};
- #remote{count=0, reply_to=From}->
- gen_server:reply(From, Row),
- {noreply, State#remote{reply_to=nil}}
- end
- catch
- throw:{invalid_json, Msg} ->
- ?LOG_DEBUG("got invalid_json ~p", [Msg]),
- ok = maybe_stream_next(Complete, Count, Id),
- {noreply, State}
- end;
+ {noreply, process_response(Msg, Id, State)};
handle_info({ibrowse_async_response_end, Id}, #remote{reqid=Id} = State) ->
?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#remote.reply_to]),
@@ -217,7 +231,63 @@ code_change(_OldVsn, State, _Extra) ->
%internal funs
-decode_row([$,, $\n | Rest]) ->
+process_response(<<"{\"results\":[\n">>, Id, State) ->
+ #remote{
+ complete = Complete,
+ count = Count
+ } = State,
+ ok = maybe_stream_next(Complete, Count, Id),
+ State;
+process_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, _, State) ->
+ LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
+ State#remote{last_seq = LastSeq};
+process_response(Chunk, Id, #remote{partial_chunk=nil} = State) ->
+ #remote{
+ complete = Complete,
+ count = Count,
+ rows = Rows
+ } = State,
+ try
+ Row = decode_row(Chunk),
+ ok = maybe_stream_next(Complete, Count+1, Id),
+ case State of
+ #remote{reply_to=nil} ->
+ State#remote{count=Count+1, rows = queue:in(Row, Rows)};
+ #remote{count=0, reply_to=From}->
+ gen_server:reply(From, Row),
+ State#remote{reply_to=nil}
+ end
+ catch
+ throw:{invalid_json, Bad} ->
+ ?LOG_DEBUG("got invalid_json ~p", [Bad]),
+ ok = maybe_stream_next(Complete, Count, Id),
+ State#remote{partial_chunk = Bad}
+ end;
+process_response(Chunk, Id, State) ->
+ #remote{
+ complete = Complete,
+ count = Count,
+ partial_chunk = Partial,
+ rows = Rows
+ } = State,
+ try
+ Row = decode_row(<<Partial/binary, Chunk/binary>>),
+ ok = maybe_stream_next(Complete, Count+1, Id),
+ case State of
+ #remote{reply_to=nil} ->
+ State#remote{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+ #remote{count=0, reply_to=From}->
+ gen_server:reply(From, Row),
+ State#remote{reply_to=nil, partial_chunk=nil}
+ end
+ catch
+ throw:{invalid_json, Bad} ->
+ ?LOG_DEBUG("got invalid_json ~p", [Bad]),
+ ok = maybe_stream_next(Complete, Count, Id),
+ State#remote{partial_chunk = Bad}
+ end.
+
+decode_row(<<",\n", Rest/binary>>) ->
decode_row(Rest);
decode_row(Row) ->
?JSON_DECODE(Row).
@@ -278,7 +348,8 @@ start_http_request(RawUrl) ->
{ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
Opts = [
{stream_to, {self(), once}},
- {inactivity_timeout, 30000}
+ {inactivity_timeout, 30000},
+ {response_format, binary}
],
{ibrowse_req_id, Id} =
ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
diff --git a/test/etap/110-replication-changes-feed.t b/test/etap/110-replication-changes-feed.t
index 9b5dec85..a3b1b8a7 100755
--- a/test/etap/110-replication-changes-feed.t
+++ b/test/etap/110-replication-changes-feed.t
@@ -22,7 +22,7 @@ main(_) ->
code:add_pathz("src/ibrowse"),
code:add_pathz("src/mochiweb"),
- etap:plan(12),
+ etap:plan(17),
case (catch test()) of
ok ->
etap:end_tests();
@@ -48,6 +48,7 @@ test() ->
couch_server:delete(<<"etap-test-db">>, []),
{ok, Db2} = couch_db:create(<<"etap-test-db">>, []),
test_all(remote),
+ test_remote_only(),
couch_db:close(Db2),
couch_server:delete(<<"etap-test-db">>, []),
@@ -59,7 +60,11 @@ test_all(Type) ->
test_since_parameter(Type),
test_continuous_parameter(Type),
test_conflicts(Type),
- test_deleted_conflicts(Type).
+ test_deleted_conflicts(Type),
+ test_non_blocking_call(Type).
+
+test_remote_only() ->
+ test_chunk_reassembly(remote).
test_unchanged_db(Type) ->
{ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)}, []),
@@ -154,6 +159,40 @@ test_deleted_conflicts(Type) ->
io_lib:format("(~p) deleted conflict revisions show up in feed", [Type])
).
+test_non_blocking_call(Type) ->
+ Since = get_update_seq(),
+ {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
+ [{since, Since}, {continuous, true}]),
+ etap:is(
+ couch_rep_changes_feed:all(Pid),
+ [],
+ io_lib:format("(~p) all() returns empty list if no changes available",
+ [Type])
+ ),
+ Expect1 = generate_change(),
+ Expect2 = generate_change(),
+ timer:sleep(100),
+ etap:is(
+ couch_rep_changes_feed:all(Pid),
+ [Expect1, Expect2],
+ io_lib:format("(~p) all() returns full list of outstanding changes",
+ [Type])
+ ),
+ ok = couch_rep_changes_feed:stop(Pid).
+
+test_chunk_reassembly(Type) ->
+ Since = get_update_seq(),
+ Expect = [generate_change() || _I <- lists:seq(1,30)],
+ {ok, Pid} = couch_rep_changes_feed:start({Type, get_dbname(Type)},
+ [{since, Since}]),
+ timer:sleep(100),
+ etap:is(
+ couch_rep_changes_feed:all(Pid),
+ Expect,
+ io_lib:format("(~p) reassembles chunks split across TCP frames",
+ [Type])
+ ).
+
generate_change() ->
generate_change(couch_util:new_uuid()).