From 227ac8db0384d3f4ca5bbd479642807442692c85 Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Sun, 31 May 2009 23:43:22 +0000 Subject: Added timeout and heartbeat options to the _changes api git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@780529 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_httpd_db.erl | 52 +++++++++++++++++++++++++++++++-------- src/couchdb/couch_ref_counter.erl | 4 ++- 2 files changed, 45 insertions(+), 11 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index cb8c205f..cf11d4e1 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -43,6 +43,25 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. +get_changes_timeout(Req, Resp) -> + DefaultTimeout = list_to_integer( + couch_config:get("httpd", "changes_timeout", "60000")), + case couch_httpd:qs_value(Req, "heartbeat") of + undefined -> + case couch_httpd:qs_value(Req, "timeout") of + undefined -> + {DefaultTimeout, fun() -> stop end}; + TimeoutList -> + {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]), + fun() -> stop end} + end; + "true" -> + {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end}; + TimeoutList -> + {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]), + fun() -> send_chunk(Resp, " \n"), ok end} + end. + handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")), @@ -57,13 +76,14 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> (_) -> ok end), + {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp), couch_stats_collector:track_process_count(Self, {httpd, clients_requesting_changes}), try - keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>) + keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, TimeoutFun) after couch_db_update_notifier:stop(Notify), - wait_db_updated(0) % clean out any remaining update messages + get_rest_db_updated() % clean out any remaining update messages end; "false" -> @@ -77,18 +97,30 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> send_method_not_allowed(Req, "GET,HEAD"). % waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout) -> - receive db_updated -> - wait_db_updated(0) - after Timeout -> ok +wait_db_updated(Timeout, TimeoutFun) -> + receive db_updated -> get_rest_db_updated() + after Timeout -> + case TimeoutFun() of + ok -> wait_db_updated(Timeout, TimeoutFun); + stop -> stop + end + end. + +get_rest_db_updated() -> + receive db_updated -> get_rest_db_updated() + after 0 -> updated end. -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend) -> +keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend), couch_db:close(Db), - wait_db_updated(infinity), - {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), - keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2). + case wait_db_updated(Timeout, TimeoutFun) of + updated -> + {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), + keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun); + stop -> + send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])) + end. send_changes(Req, Resp, Db, StartSeq, Prepend0) -> Style = list_to_existing_atom( diff --git a/src/couchdb/couch_ref_counter.erl b/src/couchdb/couch_ref_counter.erl index 4c824aa6..0fbec729 100644 --- a/src/couchdb/couch_ref_counter.erl +++ b/src/couchdb/couch_ref_counter.erl @@ -75,7 +75,9 @@ handle_cast({drop, Pid}, #srv{referrers=Referrers}=Srv) -> erlang:demonitor(MonRef, [flush]), dict:erase(Pid, Referrers); {ok, {MonRef, Num}} -> - dict:store(Pid, {MonRef, Num-1}, Referrers) + dict:store(Pid, {MonRef, Num-1}, Referrers); + error -> + Referrers end, maybe_close_async(Srv#srv{referrers=Referrers2}). -- cgit v1.2.3