summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-07-18 02:18:38 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-07-18 02:18:38 +0000
commitd04d64e6eedf41f25b39a3366bc3dc4534843c52 (patch)
treeb48244843885d0ecdd9538e2082b6169959b1754
parent7264706c21a2533bc3244d01c168085614d13968 (diff)
listen for local update notifications when continuous=true
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795297 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl30
1 files changed, 27 insertions, 3 deletions
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 35d6de1d..7aa34697 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -1,12 +1,12 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
+% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
@@ -222,6 +222,18 @@ decode_row([$,, $\n | Rest]) ->
decode_row(Row) ->
?JSON_DECODE(Row).
+flush_updated_messages() ->
+ receive updated -> flush_updated_messages()
+ after 0 -> ok
+ end.
+
+local_update_notification(Self, DbName, {updated, DbName}) ->
+ Self ! updated;
+local_update_notification(Self, DbName, {updated, DbName}) ->
+ Self ! deleted;
+local_update_notification(_, _, _) ->
+ ok.
+
maybe_stream_next(false, Count, Id) when Count < ?MIN_BUFFER_SIZE ->
?LOG_DEBUG("~p reqid ~p streaming next chunk", [?MODULE, Id]),
ibrowse:stream_next(Id);
@@ -230,8 +242,11 @@ maybe_stream_next(_Complete, _Count, Id) ->
ok.
send_local_changes_forever(Server, DbName, Since) ->
+ Self = self(),
+ {ok, _} = couch_db_update_notifier:start_link(
+ fun(Msg) -> local_update_notification(Self, DbName, Msg) end),
{ok, NewSeq} = send_local_changes_once(Server, DbName, Since),
- timer:sleep(5000),
+ ok = wait_db_updated(),
send_local_changes_forever(Server, DbName, NewSeq).
send_local_changes_once(Server, DbName, Since) ->
@@ -268,3 +283,12 @@ start_http_request(RawUrl) ->
{ibrowse_req_id, Id} =
ibrowse:send_req_direct(Pid, RawUrl, [], get, [], Opts, infinity),
{Pid, Id}.
+
+wait_db_updated() ->
+ receive deleted ->
+ exit(deleted)
+ after 0 ->
+ receive updated ->
+ flush_updated_messages()
+ end
+ end.