From c43ae32b00ab64e5427b924867a4783ca0edee57 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Wed, 7 Apr 2010 02:25:36 +0000 Subject: changes is less likely to miss updates, and changes test is more robust git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@931407 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_changes.erl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'src/couchdb') diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index 35ea147f..9496c226 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -17,7 +17,7 @@ %% @type Req -> #httpd{} | {json_req, JsonObj()} handle_changes(#changes_args{}=Args1, Req, Db) -> - Args = Args1#changes_args{filter=make_filter_fun(Args1, Req, Db)}, + Args = Args1#changes_args{filter=make_filter_fun(Args1#changes_args.filter, Req, Db)}, StartSeq = case Args#changes_args.dir of rev -> couch_db:get_update_seq(Db); @@ -27,7 +27,6 @@ handle_changes(#changes_args{}=Args1, Req, Db) -> if Args#changes_args.feed == "continuous" orelse Args#changes_args.feed == "longpoll" -> fun(Callback) -> - start_sending_changes(Callback, Args#changes_args.feed), Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName}) when DbName == Db#db.name -> @@ -36,6 +35,7 @@ handle_changes(#changes_args{}=Args1, Req, Db) -> ok end ), + start_sending_changes(Callback, Args#changes_args.feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), couch_stats_collector:track_process_count( Self, @@ -72,7 +72,7 @@ handle_changes(#changes_args{}=Args1, Req, Db) -> end. %% @type Req -> #httpd{} | {json_req, JsonObj()} -make_filter_fun(#changes_args{filter=FilterName}, Req, Db) -> +make_filter_fun(FilterName, Req, Db) -> case [list_to_binary(couch_httpd:unquote(Part)) || Part <- string:tokens(FilterName, "/")] of [] -> @@ -94,6 +94,7 @@ make_filter_fun(#changes_args{filter=FilterName}, Req, Db) -> {ok, Passes} = couch_query_servers:filter_docs( Req, Db, DDoc, FName, Docs ), + % ?LOG_INFO("filtering ~p ~w",[FName, [DI#doc_info.high_seq || DI <- DocInfos]]), [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, Pass <- Passes, Pass == true] @@ -155,20 +156,22 @@ send_changes(Args, Callback, Db, StartSeq, Prepend) -> keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> - #changes_args{ feed = ResponseType, limit = Limit } = Args, + % ?LOG_INFO("send_changes start ~p",[StartSeq]), {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes( Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend ), + % ?LOG_INFO("send_changes last ~p",[EndSeq]), couch_db:close(Db), if Limit > NewLimit, ResponseType == "longpoll" -> end_sending_changes(Callback, EndSeq, ResponseType); true -> case wait_db_updated(Timeout, TimeoutFun) of updated -> + % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of {ok, Db2} -> keep_sending_changes( @@ -184,6 +187,7 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, end_sending_changes(Callback, EndSeq, ResponseType) end; stop -> + % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]), end_sending_changes(Callback, EndSeq, ResponseType) end end. -- cgit v1.2.3