From 53adfad5f7faa980cbce66bcd231594ce80ecbba Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sun, 29 Aug 2010 18:03:05 -0400 Subject: fix heartbeat for _changes feeds --- apps/couch/src/couch_changes.erl | 3 +-- apps/fabric/src/fabric_view_changes.erl | 30 +++++++++++++++++++----------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index a9c08509..b2848c88 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -14,8 +14,7 @@ -include("couch_db.hrl"). -export([handle_changes/3, get_changes_timeout/2, main_only_filter/1, - all_docs_filter/1, wait_db_updated/2, get_rest_db_updated/0, - make_filter_fun/4]). + all_docs_filter/1, get_rest_db_updated/0, make_filter_fun/4]). %% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any() handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl index c7b6b7c4..63ffc02a 100644 --- a/apps/fabric/src/fabric_view_changes.erl +++ b/apps/fabric/src/fabric_view_changes.erl @@ -25,7 +25,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Args = make_changes_args(Options), {ok, Acc} = Callback(start, Acc0), Notifiers = start_update_notifiers(DbName), - {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), + {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), try keep_sending_changes( DbName, @@ -33,8 +33,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Callback, get_start_seq(DbName, Args), Acc, - Timeout, - TimeoutFun + Timeout ) after stop_update_notifiers(Notifiers), @@ -53,27 +52,32 @@ go(DbName, "normal", Options, Callback, Acc0) -> ), Callback({stop, pack_seqs(Seqs)}, AccOut). -keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> - #changes_args{limit=Limit, feed=Feed} = Args, +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) -> + #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args, {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, LastSeq = pack_seqs(NewSeqs), if Limit > Limit2, Feed == "longpoll" -> Callback({stop, LastSeq}, AccOut); true -> - case couch_changes:wait_db_updated(Timeout, TFun) of + case wait_db_updated(Timeout) of updated -> keep_sending_changes( DbName, Args#changes_args{limit=Limit2}, Callback, LastSeq, - AccIn, - Timeout, - TFun + AccOut, + Timeout ); - stop -> - Callback({stop, LastSeq}, AccOut) + timeout -> + case Heartbeat of undefined -> + Callback({stop, LastSeq}, AccOut); + _ -> + {ok, AccTimeout} = Callback(timeout, AccOut), + keep_sending_changes(DbName, Args#changes_args{limit=Limit2}, + Callback, LastSeq, AccTimeout, Timeout) + end end end. @@ -263,3 +267,7 @@ changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> find_replacement_shards(#shard{range=Range}, AllShards) -> % TODO make this moar betta -- we might have split or merged the partition [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. + +wait_db_updated(Timeout) -> + receive db_updated -> couch_changes:get_rest_db_updated() + after Timeout -> timeout end. -- cgit v1.2.3