From 53adfad5f7faa980cbce66bcd231594ce80ecbba Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sun, 29 Aug 2010 18:03:05 -0400 Subject: fix heartbeat for _changes feeds --- apps/fabric/src/fabric_view_changes.erl | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) (limited to 'apps/fabric/src') diff --git a/apps/fabric/src/fabric_view_changes.erl b/apps/fabric/src/fabric_view_changes.erl index c7b6b7c4..63ffc02a 100644 --- a/apps/fabric/src/fabric_view_changes.erl +++ b/apps/fabric/src/fabric_view_changes.erl @@ -25,7 +25,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Args = make_changes_args(Options), {ok, Acc} = Callback(start, Acc0), Notifiers = start_update_notifiers(DbName), - {Timeout, TimeoutFun} = couch_changes:get_changes_timeout(Args, Callback), + {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), try keep_sending_changes( DbName, @@ -33,8 +33,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Callback, get_start_seq(DbName, Args), Acc, - Timeout, - TimeoutFun + Timeout ) after stop_update_notifiers(Notifiers), @@ -53,27 +52,32 @@ go(DbName, "normal", Options, Callback, Acc0) -> ), Callback({stop, pack_seqs(Seqs)}, AccOut). -keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, TFun) -> - #changes_args{limit=Limit, feed=Feed} = Args, +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout) -> + #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args, {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn), #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector, LastSeq = pack_seqs(NewSeqs), if Limit > Limit2, Feed == "longpoll" -> Callback({stop, LastSeq}, AccOut); true -> - case couch_changes:wait_db_updated(Timeout, TFun) of + case wait_db_updated(Timeout) of updated -> keep_sending_changes( DbName, Args#changes_args{limit=Limit2}, Callback, LastSeq, - AccIn, - Timeout, - TFun + AccOut, + Timeout ); - stop -> - Callback({stop, LastSeq}, AccOut) + timeout -> + case Heartbeat of undefined -> + Callback({stop, LastSeq}, AccOut); + _ -> + {ok, AccTimeout} = Callback(timeout, AccOut), + keep_sending_changes(DbName, Args#changes_args{limit=Limit2}, + Callback, LastSeq, AccTimeout, Timeout) + end end end. @@ -263,3 +267,7 @@ changes_row(#view_row{key=Seq, id=Id, value=Value}, false) -> find_replacement_shards(#shard{range=Range}, AllShards) -> % TODO make this moar betta -- we might have split or merged the partition [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. + +wait_db_updated(Timeout) -> + receive db_updated -> couch_changes:get_rest_db_updated() + after Timeout -> timeout end. -- cgit v1.2.3