summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_httpd_db.erl52
-rw-r--r--src/couchdb/couch_ref_counter.erl4
2 files changed, 45 insertions, 11 deletions
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index cb8c205f..cf11d4e1 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -43,6 +43,25 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method,
do_db_req(Req, Handler)
end.
+get_changes_timeout(Req, Resp) ->
+ DefaultTimeout = list_to_integer(
+ couch_config:get("httpd", "changes_timeout", "60000")),
+ case couch_httpd:qs_value(Req, "heartbeat") of
+ undefined ->
+ case couch_httpd:qs_value(Req, "timeout") of
+ undefined ->
+ {DefaultTimeout, fun() -> stop end};
+ TimeoutList ->
+ {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
+ fun() -> stop end}
+ end;
+ "true" ->
+ {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end};
+ TimeoutList ->
+ {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
+ fun() -> send_chunk(Resp, " \n"), ok end}
+ end.
+
handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")),
@@ -57,13 +76,14 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
(_) ->
ok
end),
+ {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp),
couch_stats_collector:track_process_count(Self,
{httpd, clients_requesting_changes}),
try
- keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>)
+ keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, TimeoutFun)
after
couch_db_update_notifier:stop(Notify),
- wait_db_updated(0) % clean out any remaining update messages
+ get_rest_db_updated() % clean out any remaining update messages
end;
"false" ->
@@ -77,18 +97,30 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
send_method_not_allowed(Req, "GET,HEAD").
% waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout) ->
- receive db_updated ->
- wait_db_updated(0)
- after Timeout -> ok
+wait_db_updated(Timeout, TimeoutFun) ->
+ receive db_updated -> get_rest_db_updated()
+ after Timeout ->
+ case TimeoutFun() of
+ ok -> wait_db_updated(Timeout, TimeoutFun);
+ stop -> stop
+ end
+ end.
+
+get_rest_db_updated() ->
+ receive db_updated -> get_rest_db_updated()
+ after 0 -> updated
end.
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend) ->
+keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
{ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend),
couch_db:close(Db),
- wait_db_updated(infinity),
- {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2).
+ case wait_db_updated(Timeout, TimeoutFun) of
+ updated ->
+ {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+ keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun);
+ stop ->
+ send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]))
+ end.
send_changes(Req, Resp, Db, StartSeq, Prepend0) ->
Style = list_to_existing_atom(
diff --git a/src/couchdb/couch_ref_counter.erl b/src/couchdb/couch_ref_counter.erl
index 4c824aa6..0fbec729 100644
--- a/src/couchdb/couch_ref_counter.erl
+++ b/src/couchdb/couch_ref_counter.erl
@@ -75,7 +75,9 @@ handle_cast({drop, Pid}, #srv{referrers=Referrers}=Srv) ->
erlang:demonitor(MonRef, [flush]),
dict:erase(Pid, Referrers);
{ok, {MonRef, Num}} ->
- dict:store(Pid, {MonRef, Num-1}, Referrers)
+ dict:store(Pid, {MonRef, Num-1}, Referrers);
+ error ->
+ Referrers
end,
maybe_close_async(Srv#srv{referrers=Referrers2}).