summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_changes.erl
diff options
context:
space:
mode:
authorFilipe David Borba Manana <fdmanana@apache.org>2010-09-09 18:19:05 +0000
committerFilipe David Borba Manana <fdmanana@apache.org>2010-09-09 18:19:05 +0000
commit905d6bc13a034a59e7a2c2d8df5975271500da0f (patch)
tree3974545e55790bff673e33b15558133e720b59a7 /src/couchdb/couch_changes.erl
parent2b9d22d8ca64c2017443318a9d1c03fb202d273f (diff)
Refactor changes module to allow for accumulators with the callback (optional, doesn't break public API).
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@995528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_changes.erl')
-rw-r--r--src/couchdb/couch_changes.erl128
1 files changed, 74 insertions, 54 deletions
diff --git a/src/couchdb/couch_changes.erl b/src/couchdb/couch_changes.erl
index 098fac84..78e730b5 100644
--- a/src/couchdb/couch_changes.erl
+++ b/src/couchdb/couch_changes.erl
@@ -17,17 +17,18 @@
%% @type Req -> #httpd{} | {json_req, JsonObj()}
handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
- Args = Args1#changes_args{filter=
- make_filter_fun(Args1#changes_args.filter, Style, Req, Db)},
+ #changes_args{feed = Feed} = Args = Args1#changes_args{
+ filter = make_filter_fun(Args1#changes_args.filter, Style, Req, Db)
+ },
StartSeq = case Args#changes_args.dir of
rev ->
couch_db:get_update_seq(Db);
fwd ->
Args#changes_args.since
end,
- if Args#changes_args.feed == "continuous" orelse
- Args#changes_args.feed == "longpoll" ->
- fun(Callback) ->
+ if Feed == "continuous" orelse Feed == "longpoll" ->
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
Self = self(),
{ok, Notify} = couch_db_update_notifier:start_link(
fun({_, DbName}) when DbName == Db#db.name ->
@@ -36,12 +37,13 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
ok
end
),
- start_sending_changes(Callback, Args#changes_args.feed),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
try
keep_sending_changes(
Args,
Callback,
+ UserAcc2,
Db,
StartSeq,
<<"">>,
@@ -50,24 +52,31 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
)
after
couch_db_update_notifier:stop(Notify),
- get_rest_db_updated() % clean out any remaining update messages
+ get_rest_db_updated(ok) % clean out any remaining update messages
end
end;
true ->
- fun(Callback) ->
- start_sending_changes(Callback, Args#changes_args.feed),
- {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
+ fun(CallbackAcc) ->
+ {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+ UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ {ok, {_, LastSeq, _Prepend, _, _, UserAcc3, _, _, _}} =
send_changes(
Args#changes_args{feed="normal"},
Callback,
+ UserAcc2,
Db,
StartSeq,
- <<"">>
+ <<>>
),
- end_sending_changes(Callback, LastSeq, Args#changes_args.feed)
+ end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
end
end.
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+ Pair;
+get_callback_acc(Callback) when is_function(Callback, 2) ->
+ {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+
%% @type Req -> #httpd{} | {json_req, JsonObj()}
make_filter_fun(FilterName, Style, Req, Db) ->
case [list_to_binary(couch_httpd:unquote(Part))
@@ -128,21 +137,23 @@ get_changes_timeout(Args, Callback) ->
infinity ->
{infinity, fun() -> stop end};
_ ->
- {lists:min([DefaultTimeout, Timeout]), fun() -> stop end}
+ {lists:min([DefaultTimeout, Timeout]),
+ fun(UserAcc) -> {stop, UserAcc} end}
end;
true ->
- {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end};
+ {DefaultTimeout,
+ fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
_ ->
{lists:min([DefaultTimeout, Heartbeat]),
- fun() -> Callback(timeout, ResponseType), ok end}
+ fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
end.
-start_sending_changes(_Callback, "continuous") ->
- ok;
-start_sending_changes(Callback, ResponseType) ->
- Callback(start, ResponseType).
+start_sending_changes(_Callback, UserAcc, "continuous") ->
+ UserAcc;
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+ Callback(start, ResponseType, UserAcc).
-send_changes(Args, Callback, Db, StartSeq, Prepend) ->
+send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) ->
#changes_args{
style = Style,
include_docs = IncludeDocs,
@@ -157,11 +168,11 @@ send_changes(Args, Callback, Db, StartSeq, Prepend) ->
StartSeq,
fun changes_enumerator/2,
[{dir, Dir}],
- {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit,
- IncludeDocs}
+ {Db, StartSeq, Prepend, FilterFun, Callback, UserAcc, ResponseType,
+ Limit, IncludeDocs}
).
-keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
+keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
TimeoutFun) ->
#changes_args{
feed = ResponseType,
@@ -169,16 +180,16 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
db_open_options = DbOptions
} = Args,
% ?LOG_INFO("send_changes start ~p",[StartSeq]),
- {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
- Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend
+ {ok, {_, EndSeq, Prepend2, _, _, UserAcc2, _, NewLimit, _}} = send_changes(
+ Args#changes_args{dir=fwd}, Callback, UserAcc, Db, StartSeq, Prepend
),
% ?LOG_INFO("send_changes last ~p",[EndSeq]),
couch_db:close(Db),
if Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Callback, EndSeq, ResponseType);
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
true ->
- case wait_db_updated(Timeout, TimeoutFun) of
- updated ->
+ case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+ {updated, UserAcc3} ->
% ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions],
case couch_db:open(Db#db.name, DbOptions1) of
@@ -186,6 +197,7 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
keep_sending_changes(
Args#changes_args{limit=NewLimit},
Callback,
+ UserAcc3,
Db2,
EndSeq,
Prepend2,
@@ -193,19 +205,19 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
TimeoutFun
);
_Else ->
- end_sending_changes(Callback, EndSeq, ResponseType)
+ end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
end;
- stop ->
+ {stop, UserAcc3} ->
% ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]),
- end_sending_changes(Callback, EndSeq, ResponseType)
+ end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType)
end
end.
-end_sending_changes(Callback, EndSeq, ResponseType) ->
- Callback({stop, EndSeq}, ResponseType).
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+ Callback({stop, EndSeq}, ResponseType, UserAcc).
-changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
- Limit, IncludeDocs}) ->
+changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc,
+ "continuous", Limit, IncludeDocs}) ->
#doc_info{id=Id, high_seq=Seq,
revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
@@ -214,18 +226,18 @@ changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
Go = if Limit =< 1 -> stop; true -> ok end,
case Results of
[] ->
- {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit,
+ {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "continuous", Limit,
IncludeDocs}
};
_ ->
ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
- Callback({change, ChangesRow, <<"">>}, "continuous"),
- {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit - 1,
- IncludeDocs}
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
+ {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "continuous",
+ Limit - 1, IncludeDocs}
}
end;
-changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType,
- Limit, IncludeDocs}) ->
+changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, UserAcc,
+ ResponseType, Limit, IncludeDocs}) ->
#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
= DocInfo,
@@ -234,14 +246,14 @@ changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType,
Go = if Limit =< 1 -> stop; true -> ok end,
case Results of
[] ->
- {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit,
- IncludeDocs}
+ {Go, {Db, Seq, Prepend, FilterFun, Callback, UserAcc, ResponseType,
+ Limit, IncludeDocs}
};
_ ->
ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
- Callback({change, ChangesRow, Prepend}, ResponseType),
- {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1,
- IncludeDocs}
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, UserAcc2, ResponseType,
+ Limit - 1, IncludeDocs}
}
end.
@@ -257,16 +269,24 @@ deleted_item(true) -> [{<<"deleted">>, true}];
deleted_item(_) -> [].
% waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout, TimeoutFun) ->
- receive db_updated -> get_rest_db_updated()
+wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
+ receive
+ db_updated ->
+ get_rest_db_updated(UserAcc)
after Timeout ->
- case TimeoutFun() of
- ok -> wait_db_updated(Timeout, TimeoutFun);
- stop -> stop
+ {Go, UserAcc2} = TimeoutFun(UserAcc),
+ case Go of
+ ok ->
+ wait_db_updated(Timeout, TimeoutFun, UserAcc2);
+ stop ->
+ {stop, UserAcc2}
end
end.
-get_rest_db_updated() ->
- receive db_updated -> get_rest_db_updated()
- after 0 -> updated
+get_rest_db_updated(UserAcc) ->
+ receive
+ db_updated ->
+ get_rest_db_updated(UserAcc)
+ after 0 ->
+ {updated, UserAcc}
end.