summaryrefslogtreecommitdiff
path: root/src/couchdb
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb')
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl123
1 files changed, 97 insertions, 26 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index deea2d8e..3de178a5 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([start/2, start_link/2, next/1, stop/1]).
+-export([start/2, start_link/2, all/1, next/1, stop/1]).
-define(MIN_BUFFER_SIZE, 100).
@@ -23,10 +23,12 @@
-include("../ibrowse/ibrowse.hrl").
-record (remote, {
- conn = nil,
- reqid = nil,
+ conn,
+ reqid,
+ last_seq,
complete = false,
count = 0,
+ partial_chunk = nil,
reply_to = nil,
rows = queue:new()
}).
@@ -46,6 +48,11 @@ start(Url, Options) ->
start_link(Url, Options) ->
gen_server:start_link(?MODULE, [Url, Options], []).
+%% @doc does not block
+all(Server) ->
+ gen_server:call(Server, all_changes).
+
+%% @doc returns the next change from the feed, blocking if necessary
next(Server) ->
gen_server:call(Server, next_change, infinity).
@@ -57,7 +64,7 @@ init([{remote, Url}, Options]) ->
Continuous = proplists:get_value(continuous, Options, false),
{Pid, ReqId} = start_http_request(lists:concat([Url, "/_changes",
"?style=all_docs", "&since=", Since, "&continuous=", Continuous])),
- {ok, #remote{conn=Pid, reqid=ReqId}};
+ {ok, #remote{conn=Pid, last_seq=Since, reqid=ReqId}};
init([{local, DbName}, Options]) when is_list(DbName) ->
init([{local, ?l2b(DbName)}, Options]);
@@ -91,6 +98,30 @@ handle_call({add, Row}, From, #local{}=State) ->
} = State,
{noreply, State#local{count=Count+1, changes_from=From, rows=queue:in(Row,Rows)}};
+handle_call(all_changes, _From, #local{complete=Complete, count=Count}=State)
+ when Complete =:= false; Count > 0 ->
+ #local{
+ changes_from = ChangesFrom,
+ rows = Rows
+ } = State,
+ if Count < ?MIN_BUFFER_SIZE, ChangesFrom =/= nil ->
+ gen_server:reply(ChangesFrom, ok);
+ true -> ok end,
+ {reply, queue:to_list(Rows), State#local{count=0, rows=queue:new()}};
+handle_call(all_changes, _From, #local{}=State) ->
+ {stop, normal, complete, State};
+
+handle_call(all_changes, _From, #remote{complete=Complete, count=Count}=State)
+ when Complete =:= false; Count > 0 ->
+ #remote{
+ reqid = Id,
+ rows = Rows
+ } = State,
+ ok = maybe_stream_next(Complete, 0, Id),
+ {reply, queue:to_list(Rows), State#remote{count=0, rows=queue:new()}};
+handle_call(all_changes, _From, #remote{}=State) ->
+ {stop, normal, complete, State};
+
handle_call(next_change, From, #local{count=0}=State) ->
if State#local.complete ->
{stop, normal, complete, State};
@@ -158,28 +189,11 @@ handle_info({ibrowse_async_headers, Id, Code, Hdrs}, #remote{reqid=Id}=State) ->
[Code,Hdrs]),
{stop, {error, list_to_integer(Code)}, State};
+handle_info({ibrowse_async_response, _, {error, Reason}}, State) ->
+ {stop, {error, Reason}, State};
handle_info({ibrowse_async_response, Id, Msg}, #remote{reqid=Id} = State) ->
?LOG_DEBUG("~p reqid ~p ibrowse_async_response ~p", [?MODULE, Id, Msg]),
- #remote{
- complete = Complete,
- count = Count,
- rows = Rows
- } = State,
- try
- Row = decode_row(Msg),
- case State of
- #remote{reply_to=nil} ->
- {noreply, State#remote{count=Count+1, rows = queue:in(Row, Rows)}};
- #remote{count=0, reply_to=From}->
- gen_server:reply(From, Row),
- {noreply, State#remote{reply_to=nil}}
- end
- catch
- throw:{invalid_json, Msg} ->
- ?LOG_DEBUG("got invalid_json ~p", [Msg]),
- ok = maybe_stream_next(Complete, Count, Id),
- {noreply, State}
- end;
+ {noreply, process_response(Msg, Id, State)};
handle_info({ibrowse_async_response_end, Id}, #remote{reqid=Id} = State) ->
?LOG_DEBUG("got ibrowse_async_response_end ~p", [State#remote.reply_to]),
@@ -217,7 +231,63 @@ code_change(_OldVsn, State, _Extra) ->
%internal funs
-decode_row([$,, $\n | Rest]) ->
+process_response(<<"{\"results\":[\n">>, Id, State) ->
+ #remote{
+ complete = Complete,
+ count = Count
+ } = State,
+ ok = maybe_stream_next(Complete, Count, Id),
+ State;
+process_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, _, State) ->
+ LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
+ State#remote{last_seq = LastSeq};
+process_response(Chunk, Id, #remote{partial_chunk=nil} = State) ->
+ #remote{
+ complete = Complete,
+ count = Count,
+ rows = Rows
+ } = State,
+ try
+ Row = decode_row(Chunk),
+ ok = maybe_stream_next(Complete, Count+1, Id),
+ case State of
+ #remote{reply_to=nil} ->
+ State#remote{count=Count+1, rows = queue:in(Row, Rows)};
+ #remote{count=0, reply_to=From}->
+ gen_server:reply(From, Row),
+ State#remote{reply_to=nil}
+ end
+ catch
+ throw:{invalid_json, Bad} ->
+ ?LOG_DEBUG("got invalid_json ~p", [Bad]),
+ ok = maybe_stream_next(Complete, Count, Id),
+ State#remote{partial_chunk = Bad}
+ end;
+process_response(Chunk, Id, State) ->
+ #remote{
+ complete = Complete,
+ count = Count,
+ partial_chunk = Partial,
+ rows = Rows
+ } = State,
+ try
+ Row = decode_row(<<Partial/binary, Chunk/binary>>),
+ ok = maybe_stream_next(Complete, Count+1, Id),
+ case State of
+ #remote{reply_to=nil} ->
+ State#remote{count=Count+1, partial_chunk=nil, rows=queue:in(Row,Rows)};
+ #remote{count=0, reply_to=From}->
+ gen_server:reply(From, Row),
+ State#remote{reply_to=nil, partial_chunk=nil}
+ end
+ catch
+ throw:{invalid_json, Bad} ->
+ ?LOG_DEBUG("got invalid_json ~p", [Bad]),
+ ok = maybe_stream_next(Complete, Count, Id),
+ State#remote{partial_chunk = Bad}
+ end.
+
+decode_row(<<",\n", Rest/binary>>) ->
decode_row(Rest);
decode_row(Row) ->
?JSON_DECODE(Row).
@@ -278,7 +348,8 @@ start_http_request(RawUrl) ->
{ok, Pid} = ibrowse:spawn_link_worker_process(Url#url.host, Url#url.port),
Opts = [
{stream_to, {self(), once}},
- {inactivity_timeout, 30000}
+ {inactivity_timeout, 30000},
+ {response_format, binary}
],
{ibrowse_req_id, Id} =
ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),