summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-29 18:03:05 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-29 18:03:05 -0400
commit53adfad5f7faa980cbce66bcd231594ce80ecbba (patch)
treeffb7a405107a342f6ef1628149a02697e681672c
parentc11a92e53306f54071ed1a561e3a90ce9b2dad04 (diff)
fix heartbeat for _changes feeds
-rw-r--r--apps/couch/src/couch_changes.erl3
-rw-r--r--apps/fabric/src/fabric_view_changes.erl30
2 files changed, 20 insertions, 13 deletions
diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl
index a9c08509..b2848c88 100644
--- a/apps/couch/src/couch_changes.erl
+++ b/apps/couch/src/couch_changes.erl
@@ -14,8 +14,7 @@
-include("couch_db.hrl").
-export([handle_changes/3, get_changes_timeout/2, main_only_filter/1,
- all_docs_filter/1, wait_db_updated/2, get_rest_db_updated/0,
- make_filter_fun/4]).
+ all_docs_filter/1, get_rest_db_updated/0, make_filter_fun/4]).
%% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any()
handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) ->
diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl
index c7b6b7c4..63ffc02a 100644
--- a/apps/fabric/src/fabric_view_changes.erl
+++ b/apps/fabric/src/fabric_view_changes.erl
@@ -25,7 +25,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
Args = make_changes_args(Options),
{ok, Acc} = Callback(start, Acc0),
Notifiers = start_update_notifiers(DbName),
- {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback),
+ {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
try
keep_sending_changes(
DbName,
@@ -33,8 +33,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
Callback,
get_start_seq(DbName, Args),
Acc,
- Timeout,
- TimeoutFun
+ Timeout
)
after
stop_update_notifiers(Notifiers),
@@ -53,27 +52,32 @@ go(DbName, "normal", Options, Callback, Acc0) ->
),
Callback({stop, pack_seqs(Seqs)}, AccOut).
-keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) ->
- #changes_args{limit=Limit, feed=Feed} = Args,
+keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) ->
+ #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
{ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn),
#collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
LastSeq = pack_seqs(NewSeqs),
if Limit > Limit2, Feed == "longpoll" ->
Callback({stop, LastSeq}, AccOut);
true ->
- case couch_changes:wait_db_updated(Timeout, TFun) of
+ case wait_db_updated(Timeout) of
updated ->
keep_sending_changes(
DbName,
Args#changes_args{limit=Limit2},
Callback,
LastSeq,
- AccIn,
- Timeout,
- TFun
+ AccOut,
+ Timeout
);
- stop ->
- Callback({stop, LastSeq}, AccOut)
+ timeout ->
+ case Heartbeat of undefined ->
+ Callback({stop, LastSeq}, AccOut);
+ _ ->
+ {ok, AccTimeout} = Callback(timeout, AccOut),
+ keep_sending_changes(DbName, Args#changes_args{limit=Limit2},
+ Callback, LastSeq, AccTimeout, Timeout)
+ end
end
end.
@@ -263,3 +267,7 @@ changes_row(#view_row{key=Seq, id=Id, value=Value}, false) ->
find_replacement_shards(#shard{range=Range}, AllShards) ->
% TODO make this moar betta -- we might have split or merged the partition
[Shard || Shard <- AllShards, Shard#shard.range =:= Range].
+
+wait_db_updated(Timeout) ->
+ receive db_updated -> couch_changes:get_rest_db_updated()
+ after Timeout -> timeout end.