From 905d6bc13a034a59e7a2c2d8df5975271500da0f Mon Sep 17 00:00:00 2001 From: Filipe David Borba Manana Date: Thu, 9 Sep 2010 18:19:05 +0000 Subject: Refactor changes module to allow for accumulators with the callback (optional, doesn't break public API). git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@995528 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_changes.erl | 128 ++++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 54 deletions(-) (limited to 'src') diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl index 098fac84..78e730b5 100644 --- a/src/couchdb/couch_changes.erl +++ b/src/couchdb/couch_changes.erl @@ -17,17 +17,18 @@ %% @type Req -> #httpd{} | {json_req, JsonObj()} handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> - Args = Args1#changes_args{filter= - make_filter_fun(Args1#changes_args.filter, Style, Req, Db)}, + #changes_args{feed = Feed} = Args = Args1#changes_args{ + filter = make_filter_fun(Args1#changes_args.filter, Style, Req, Db) + }, StartSeq = case Args#changes_args.dir of rev -> couch_db:get_update_seq(Db); fwd -> Args#changes_args.since end, - if Args#changes_args.feed == "continuous" orelse - Args#changes_args.feed == "longpoll" -> - fun(Callback) -> + if Feed == "continuous" orelse Feed == "longpoll" -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName}) when DbName == Db#db.name -> @@ -36,12 +37,13 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> ok end ), - start_sending_changes(Callback, Args#changes_args.feed), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), try keep_sending_changes( Args, Callback, + UserAcc2, Db, StartSeq, <<"">>, @@ -50,24 +52,31 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> ) after couch_db_update_notifier:stop(Notify), - get_rest_db_updated() % clean out any remaining update messages + get_rest_db_updated(ok) % clean out any remaining update messages end end; true -> - fun(Callback) -> - start_sending_changes(Callback, Args#changes_args.feed), - {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} = + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), + {ok, {_, LastSeq, _Prepend, _, _, UserAcc3, _, _, _}} = send_changes( Args#changes_args{feed="normal"}, Callback, + UserAcc2, Db, StartSeq, - <<"">> + <<>> ), - end_sending_changes(Callback, LastSeq, Args#changes_args.feed) + end_sending_changes(Callback, UserAcc3, LastSeq, Feed) end end. +get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> + Pair; +get_callback_acc(Callback) when is_function(Callback, 2) -> + {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}. + %% @type Req -> #httpd{} | {json_req, JsonObj()} make_filter_fun(FilterName, Style, Req, Db) -> case [list_to_binary(couch_httpd:unquote(Part)) @@ -128,21 +137,23 @@ get_changes_timeout(Args, Callback) -> infinity -> {infinity, fun() -> stop end}; _ -> - {lists:min([DefaultTimeout, Timeout]), fun() -> stop end} + {lists:min([DefaultTimeout, Timeout]), + fun(UserAcc) -> {stop, UserAcc} end} end; true -> - {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end}; + {DefaultTimeout, + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; _ -> {lists:min([DefaultTimeout, Heartbeat]), - fun() -> Callback(timeout, ResponseType), ok end} + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} end. -start_sending_changes(_Callback, "continuous") -> - ok; -start_sending_changes(Callback, ResponseType) -> - Callback(start, ResponseType). +start_sending_changes(_Callback, UserAcc, "continuous") -> + UserAcc; +start_sending_changes(Callback, UserAcc, ResponseType) -> + Callback(start, ResponseType, UserAcc). -send_changes(Args, Callback, Db, StartSeq, Prepend) -> +send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> #changes_args{ style = Style, include_docs = IncludeDocs, @@ -157,11 +168,11 @@ send_changes(Args, Callback, Db, StartSeq, Prepend) -> StartSeq, fun changes_enumerator/2, [{dir, Dir}], - {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit, - IncludeDocs} + {Db, StartSeq, Prepend, FilterFun, Callback, UserAcc, ResponseType, + Limit, IncludeDocs} ). -keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, +keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ feed = ResponseType, @@ -169,16 +180,16 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, db_open_options = DbOptions } = Args, % ?LOG_INFO("send_changes start ~p",[StartSeq]), - {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes( - Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend + {ok, {_, EndSeq, Prepend2, _, _, UserAcc2, _, NewLimit, _}} = send_changes( + Args#changes_args{dir=fwd}, Callback, UserAcc, Db, StartSeq, Prepend ), % ?LOG_INFO("send_changes last ~p",[EndSeq]), couch_db:close(Db), if Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Callback, EndSeq, ResponseType); + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); true -> - case wait_db_updated(Timeout, TimeoutFun) of - updated -> + case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of + {updated, UserAcc3} -> % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], case couch_db:open(Db#db.name, DbOptions1) of @@ -186,6 +197,7 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, keep_sending_changes( Args#changes_args{limit=NewLimit}, Callback, + UserAcc3, Db2, EndSeq, Prepend2, @@ -193,19 +205,19 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun ); _Else -> - end_sending_changes(Callback, EndSeq, ResponseType) + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) end; - stop -> + {stop, UserAcc3} -> % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]), - end_sending_changes(Callback, EndSeq, ResponseType) + end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType) end end. -end_sending_changes(Callback, EndSeq, ResponseType) -> - Callback({stop, EndSeq}, ResponseType). +end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> + Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous", - Limit, IncludeDocs}) -> +changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc, + "continuous", Limit, IncludeDocs}) -> #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo, @@ -214,18 +226,18 @@ changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous", Go = if Limit =< 1 -> stop; true -> ok end, case Results of [] -> - {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit, + {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "continuous", Limit, IncludeDocs} }; _ -> ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs), - Callback({change, ChangesRow, <<"">>}, "continuous"), - {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit - 1, - IncludeDocs} + UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "continuous", + Limit - 1, IncludeDocs} } end; -changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType, - Limit, IncludeDocs}) -> +changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, UserAcc, + ResponseType, Limit, IncludeDocs}) -> #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo, @@ -234,14 +246,14 @@ changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType, Go = if Limit =< 1 -> stop; true -> ok end, case Results of [] -> - {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit, - IncludeDocs} + {Go, {Db, Seq, Prepend, FilterFun, Callback, UserAcc, ResponseType, + Limit, IncludeDocs} }; _ -> ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs), - Callback({change, ChangesRow, Prepend}, ResponseType), - {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1, - IncludeDocs} + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, UserAcc2, ResponseType, + Limit - 1, IncludeDocs} } end. @@ -257,16 +269,24 @@ deleted_item(true) -> [{<<"deleted">>, true}]; deleted_item(_) -> []. % waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout, TimeoutFun) -> - receive db_updated -> get_rest_db_updated() +wait_db_updated(Timeout, TimeoutFun, UserAcc) -> + receive + db_updated -> + get_rest_db_updated(UserAcc) after Timeout -> - case TimeoutFun() of - ok -> wait_db_updated(Timeout, TimeoutFun); - stop -> stop + {Go, UserAcc2} = TimeoutFun(UserAcc), + case Go of + ok -> + wait_db_updated(Timeout, TimeoutFun, UserAcc2); + stop -> + {stop, UserAcc2} end end. -get_rest_db_updated() -> - receive db_updated -> get_rest_db_updated() - after 0 -> updated +get_rest_db_updated(UserAcc) -> + receive + db_updated -> + get_rest_db_updated(UserAcc) + after 0 -> + {updated, UserAcc} end. -- cgit v1.2.3