From c2cde95cddcfee01213d8984e5076e2b0e28e55c Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Mon, 20 Jul 2009 18:46:30 +0000 Subject: Optimize filtered _changes to use an OS process only when actively filtering changes with the continuous=true option. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@795953 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_httpd_db.erl | 88 +++++++++++++++++++++--------------------- 1 file changed, 44 insertions(+), 44 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 9d7dda6b..5d915d30 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -66,38 +66,32 @@ get_changes_timeout(Req, Resp) -> handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")), - Filter = couch_httpd:qs_value(Req, "filter", nil), - {ok, FilterFun, EndFilterFun} = make_filter_funs(Req, Db, Filter), - try - {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, "{\"results\":[\n"), - case couch_httpd:qs_value(Req, "continuous", "false") of - "true" -> - Self = self(), - {ok, Notify} = couch_db_update_notifier:start_link( - fun({_, DbName0}) when DbName0 == DbName -> - Self ! db_updated; - (_) -> - ok - end), - {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp), - couch_stats_collector:track_process_count(Self, - {httpd, clients_requesting_changes}), - try - keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, TimeoutFun, FilterFun) - after - couch_db_update_notifier:stop(Notify), - get_rest_db_updated() % clean out any remaining update messages - end; + {ok, Resp} = start_json_response(Req, 200), + send_chunk(Resp, "{\"results\":[\n"), + case couch_httpd:qs_value(Req, "continuous", "false") of + "true" -> + Self = self(), + {ok, Notify} = couch_db_update_notifier:start_link( + fun({_, DbName0}) when DbName0 == DbName -> + Self ! db_updated; + (_) -> + ok + end), + {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp), + couch_stats_collector:track_process_count(Self, + {httpd, clients_requesting_changes}), + try + keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, TimeoutFun) + after + couch_db_update_notifier:stop(Notify), + get_rest_db_updated() % clean out any remaining update messages + end; - "false" -> - {ok, {LastSeq, _Prepend}} = - send_changes(Req, Resp, Db, StartSeq, <<"">>, FilterFun), - send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [LastSeq])), - send_chunk(Resp, "") - end - after - EndFilterFun() + "false" -> + {ok, {LastSeq, _Prepend}} = + send_changes(Req, Resp, Db, StartSeq, <<"">>), + send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [LastSeq])), + send_chunk(Resp, "") end; handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> @@ -118,22 +112,24 @@ get_rest_db_updated() -> after 0 -> updated end. -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun, FilterFun) -> - {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend, FilterFun), +keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> + {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend), couch_db:close(Db), case wait_db_updated(Timeout, TimeoutFun) of updated -> {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), - keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun, FilterFun); + keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun); stop -> send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])), send_chunk(Resp, "") end. -send_changes(Req, Resp, Db, StartSeq, Prepend0, FilterFun) -> +send_changes(Req, Resp, Db, StartSeq, Prepend0) -> Style = list_to_existing_atom( couch_httpd:qs_value(Req, "style", "main_only")), - couch_db:changes_since(Db, Style, StartSeq, + {FilterFun, EndFilterFun} = make_filter_funs(Req, Db), + try + couch_db:changes_since(Db, Style, StartSeq, fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) -> Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], Results = [Result || Result <- Results0, Result /= null], @@ -146,16 +142,20 @@ send_changes(Req, Resp, Db, StartSeq, Prepend0, FilterFun) -> {changes,Results}]})]), {ok, {Seq, <<",\n">>}} end - end, {StartSeq, Prepend0}). + end, {StartSeq, Prepend0}) + after + EndFilterFun() + end. -make_filter_funs(_Req, _Db, nil) -> - {ok, fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> - {[{rev, couch_doc:rev_to_str(Rev)}]} - end, - fun() -> ok end}; -make_filter_funs(Req, Db, Filter) -> +make_filter_funs(Req, Db) -> + Filter = couch_httpd:qs_value(Req, "filter", ""), case [list_to_binary(couch_httpd:unquote(Part)) || Part <- string:tokens(Filter, "/")] of + [] -> + {fun(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> + {[{rev, couch_doc:rev_to_str(Rev)}]} + end, + fun() -> ok end}; [DName, FName] -> DesignId = <<"_design/", DName/binary>>, #doc{body={Props}} = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), @@ -175,7 +175,7 @@ make_filter_funs(Req, Db, Filter) -> EndFilterFun = fun() -> couch_query_servers:end_filter(Pid) end, - {ok, FilterFun, EndFilterFun}; + {FilterFun, EndFilterFun}; _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) -- cgit v1.2.3