From d04d64e6eedf41f25b39a3366bc3dc4534843c52 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Sat, 18 Jul 2009 02:18:38 +0000 Subject: listen for local update notifications when continuous=true git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795297 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_rep_changes_feed.erl | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 35d6de1d..7aa34697 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -1,12 +1,12 @@ % Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of +% use this file except in compliance with the License. You may obtain a copy of % the License at % % http://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, software % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the % License for the specific language governing permissions and limitations under % the License. @@ -222,6 +222,18 @@ decode_row([$,, $\n | Rest]) -> decode_row(Row) -> ?JSON_DECODE(Row). +flush_updated_messages() -> + receive updated -> flush_updated_messages() + after 0 -> ok + end. + +local_update_notification(Self, DbName, {updated, DbName}) -> + Self ! updated; +local_update_notification(Self, DbName, {updated, DbName}) -> + Self ! deleted; +local_update_notification(_, _, _) -> + ok. + maybe_stream_next(false, Count, Id) when Count < ?MIN_BUFFER_SIZE -> ?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]), ibrowse:stream_next(Id); @@ -230,8 +242,11 @@ maybe_stream_next(_Complete, _Count, Id) -> ok. send_local_changes_forever(Server, DbName, Since) -> + Self = self(), + {ok, _} = couch_db_update_notifier:start_link( + fun(Msg) -> local_update_notification(Self, DbName, Msg) end), {ok, NewSeq} = send_local_changes_once(Server, DbName, Since), - timer:sleep(5000), + ok = wait_db_updated(), send_local_changes_forever(Server, DbName, NewSeq). send_local_changes_once(Server, DbName, Since) -> @@ -268,3 +283,12 @@ start_http_request(RawUrl) -> {ibrowse_req_id, Id} = ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity), {Pid, Id}. + +wait_db_updated() -> + receive deleted -> + exit(deleted) + after 0 -> + receive updated -> + flush_updated_messages() + end + end. -- cgit v1.2.3