diff options
author | Adam Kocoloski <adam@cloudant.com> | 2011-08-15 15:45:15 -0400 |
---|---|---|
committer | Adam Kocoloski <adam@cloudant.com> | 2011-08-15 15:45:15 -0400 |
commit | 6ffe1675dd7b004e48891956a6bdbe32899ce80c (patch) | |
tree | 57326d9a498481e65bb0db38c66daf10896801f1 /apps/couch/src/couch_changes.erl | |
parent | 52ff89ff7996e839b9e2f91fd76184d362a8aeb0 (diff) | |
parent | fdd1a5d0bc48b49b0df5c9217beff9574011283c (diff) |
Merge branch '11554-merge-couchdb-1.1'
Diffstat (limited to 'apps/couch/src/couch_changes.erl')
-rw-r--r-- | apps/couch/src/couch_changes.erl | 272 |
1 files changed, 192 insertions, 80 deletions
diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index 196f2fd5..44d0ad46 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -17,17 +17,19 @@ configure_filter/4, filter/2]). %% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any() -handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> - Args = Args1#changes_args{filter=make_filter_fun(Raw, Style, Req, Db)}, +handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> + #changes_args{feed = Feed} = Args = Args1#changes_args{ + filter = make_filter_fun(Args1#changes_args.filter, Style, Req, Db) + }, StartSeq = case Args#changes_args.dir of rev -> couch_db:get_update_seq(Db); fwd -> Args#changes_args.since end, - if Args#changes_args.feed == "continuous" orelse - Args#changes_args.feed == "longpoll" -> - fun(Callback) -> + if Feed == "continuous" orelse Feed == "longpoll" -> + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName}) when DbName == Db#db.name -> @@ -36,12 +38,13 @@ handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> ok end ), - start_sending_changes(Callback, Args#changes_args.feed), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback), try keep_sending_changes( Args, Callback, + UserAcc2, Db, StartSeq, <<"">>, @@ -50,37 +53,52 @@ handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> ) after couch_db_update_notifier:stop(Notify), - get_rest_db_updated() % clean out any remaining update messages + get_rest_db_updated(ok) % clean out any remaining update messages end end; true -> - fun(Callback) -> - start_sending_changes(Callback, Args#changes_args.feed), - {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} = + fun(CallbackAcc) -> + {Callback, UserAcc} = get_callback_acc(CallbackAcc), + UserAcc2 = start_sending_changes(Callback, UserAcc, Feed), + {ok, {_, LastSeq, _Prepend, _, _, UserAcc3, _, _, _, _}} = send_changes( Args#changes_args{feed="normal"}, Callback, + UserAcc2, Db, StartSeq, - <<"">> + <<>> ), - end_sending_changes(Callback, LastSeq, Args#changes_args.feed) + end_sending_changes(Callback, UserAcc3, LastSeq, Feed) end end. +get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) -> + Pair; +get_callback_acc(Callback) when is_function(Callback, 2) -> + {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}. + %% @spec make_filter_fun(string(), main_only|all_docs, #httpd{} | {json_req, %% {[any()]}}, #db{}) -> fun() -make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> - case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of +make_filter_fun([$_ | _] = FilterName, Style, Req, Db) -> + builtin_filter_fun(FilterName, Style, Req, Db); +make_filter_fun(FilterName, Style, Req, Db) -> + os_filter_fun(FilterName, Style, Req, Db). + +os_filter_fun(FilterName, Style, Req, Db) -> + case [list_to_binary(couch_httpd:unquote(Part)) + || Part <- string:tokens(FilterName, "/")] of [] -> - make_filter_fun(nil, Style, Req, Db); + fun(_Db2, #doc_info{revs=Revs}) -> + builtin_results(Style, Revs) + end; [DName, FName] -> DesignId = <<"_design/", DName/binary>>, DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), % validate that the ddoc has the filter fun #doc{body={Props}} = DDoc, couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), - fun(DocInfo) -> + fun(Db2, DocInfo) -> DocInfos = case Style of main_only -> @@ -89,10 +107,10 @@ make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs] end, Docs = [Doc || {ok, Doc} <- [ - couch_db:open_doc(Db, DocInfo2, [deleted, conflicts]) + couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts]) || DocInfo2 <- DocInfos]], {ok, Passes} = couch_query_servers:filter_docs( - Req, Db, DDoc, FName, Docs + Req, Db2, DDoc, FName, Docs ), [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} || {Pass, #doc{revs={RevPos,[RevId|_]}}} @@ -101,9 +119,50 @@ make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) + end. + +builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) -> + filter_docids(couch_util:get_value(<<"doc_ids">>, Props), Style); +builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) -> + {Props} = couch_httpd:json_body_obj(Req), + DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil), + filter_docids(DocIds, Style); +builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) -> + DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), + filter_docids(DocIds, Style); +builtin_filter_fun("_design", Style, _Req, _Db) -> + filter_designdoc(Style); +builtin_filter_fun(_FilterName, _Style, _Req, _Db) -> + throw({bad_request, "unknown builtin filter name"}). + +filter_docids(DocIds, Style) when is_list(DocIds)-> + fun(_Db, #doc_info{id=DocId, revs=Revs}) -> + case lists:member(DocId, DocIds) of + true -> + builtin_results(Style, Revs); + _ -> [] + end end; -make_filter_fun(_, Style, _, _) -> - fun(DI) -> ?MODULE:filter(DI, Style) end. +filter_docids(_, _) -> + throw({bad_request, "`doc_ids` filter parameter is not a list."}). + +filter_designdoc(Style) -> + fun(_Db, #doc_info{id=DocId, revs=Revs}) -> + case DocId of + <<"_design", _/binary>> -> + builtin_results(Style, Revs); + _ -> [] + end + end. + +builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) -> + case Style of + main_only -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; + all_docs -> + [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} + || #rev_info{rev=R} <- Revs] + end. configure_filter(Filter, Style, Req, Db) when is_list(Filter) -> case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of @@ -118,6 +177,17 @@ configure_filter(Filter, Style, Req, Db) when is_list(Filter) -> #doc{body={Props}} = DDoc, couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), {custom, Style, {Db, JsonReq, DDoc, FName}}; + [<<"_doc_ids">>] -> + DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")), + case is_list(DocIds) of + true -> ok; + false -> throw({bad_request, "`doc_ids` filter parameter is not a list."}) + end, + {builtin, Style, {doc_ids, DocIds}}; + [<<"_design">>] -> + {builtin, Style, design}; + [<<"_", _/binary>>] -> + throw({bad_request, "unknown builtin filter name"}); _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) @@ -132,7 +202,11 @@ filter(#doc_info{revs=Revs}, all_docs) -> filter(#doc_info{id=Id, revs=RevInfos}, {custom, main_only, Acc}) -> custom_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); filter(#doc_info{id=Id, revs=RevInfos}, {custom, all_docs, Acc}) -> - custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). + custom_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {builtin, main_only, Acc}) -> + builtin_filter(Id, [(hd(RevInfos))#rev_info.rev], Acc); +filter(#doc_info{id=Id, revs=RevInfos}, {builtin, all_docs, Acc}) -> + builtin_filter(Id, [R || #rev_info{rev=R} <- RevInfos], Acc). custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> {ok, Results} = fabric:open_revs(Db, Id, Revs, [deleted, conflicts]), @@ -144,6 +218,21 @@ custom_filter(Id, Revs, {Db, JsonReq, DDoc, Filter}) -> || {Pass, #doc{revs={RevPos,[RevId|_]}}} <- lists:zip(Passes, Docs), Pass == true]. +builtin_filter(Id, Revs, design) -> + case Id of + <<"_design", _/binary>> -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || Rev <- Revs]; + _ -> + [] + end; +builtin_filter(Id, Revs, {doc_ids, DocIds}) -> + case lists:member(Id, DocIds) of + true -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || Rev <- Revs]; + false -> + [] + end. + get_changes_timeout(Args, Callback) -> #changes_args{ heartbeat = Heartbeat, @@ -157,28 +246,31 @@ get_changes_timeout(Args, Callback) -> undefined -> case Timeout of undefined -> - {DefaultTimeout, fun() -> stop end}; + {DefaultTimeout, fun(UserAcc) -> {stop, UserAcc} end}; infinity -> - {infinity, fun() -> stop end}; + {infinity, fun(UserAcc) -> {stop, UserAcc} end}; _ -> - {lists:min([DefaultTimeout, Timeout]), fun() -> stop end} + {lists:min([DefaultTimeout, Timeout]), + fun(UserAcc) -> {stop, UserAcc} end} end; true -> - {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end}; + {DefaultTimeout, + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}; _ -> {lists:min([DefaultTimeout, Heartbeat]), - fun() -> Callback(timeout, ResponseType), ok end} + fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end} end. -start_sending_changes(_Callback, "continuous") -> - ok; -start_sending_changes(Callback, ResponseType) -> - Callback(start, ResponseType). +start_sending_changes(_Callback, UserAcc, "continuous") -> + UserAcc; +start_sending_changes(Callback, UserAcc, ResponseType) -> + Callback(start, ResponseType, UserAcc). -send_changes(Args, Callback, Db, StartSeq, Prepend) -> +send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) -> #changes_args{ style = Style, include_docs = IncludeDocs, + conflicts = Conflicts, limit = Limit, feed = ResponseType, dir = Dir, @@ -190,33 +282,36 @@ send_changes(Args, Callback, Db, StartSeq, Prepend) -> StartSeq, fun changes_enumerator/2, [{dir, Dir}], - {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit, - IncludeDocs} + {Db, StartSeq, Prepend, FilterFun, Callback, UserAcc, ResponseType, + Limit, IncludeDocs, Conflicts} ). -keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, +keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -> #changes_args{ feed = ResponseType, - limit = Limit + limit = Limit, + db_open_options = DbOptions } = Args, % ?LOG_INFO("send_changes start ~p",[StartSeq]), - {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes( - Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend + {ok, {_, EndSeq, Prepend2, _, _, UserAcc2, _, NewLimit, _, _}} = send_changes( + Args#changes_args{dir=fwd}, Callback, UserAcc, Db, StartSeq, Prepend ), % ?LOG_INFO("send_changes last ~p",[EndSeq]), couch_db:close(Db), if Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Callback, EndSeq, ResponseType); + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType); true -> - case wait_db_updated(Timeout, TimeoutFun) of - updated -> + case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of + {updated, UserAcc3} -> % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]), - case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of + DbOptions1 = [{user_ctx, Db#db.user_ctx} | DbOptions], + case couch_db:open(Db#db.name, DbOptions1) of {ok, Db2} -> keep_sending_changes( Args#changes_args{limit=NewLimit}, Callback, + UserAcc3, Db2, EndSeq, Prepend2, @@ -224,79 +319,96 @@ keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout, TimeoutFun ); _Else -> - end_sending_changes(Callback, EndSeq, ResponseType) + end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType) end; - stop -> + {stop, UserAcc3} -> % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]), - end_sending_changes(Callback, EndSeq, ResponseType) + end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType) end end. -end_sending_changes(Callback, EndSeq, ResponseType) -> - Callback({stop, EndSeq}, ResponseType). +end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) -> + Callback({stop, EndSeq}, ResponseType, UserAcc). -changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous", - Limit, IncludeDocs}) -> +changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc, + "continuous", Limit, IncludeDocs, Conflicts}) -> - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - Results0 = FilterFun(DocInfo), + #doc_info{high_seq = Seq} = DocInfo, + Results0 = FilterFun(Db, DocInfo), Results = [Result || Result <- Results0, Result /= null], Go = if Limit =< 1 -> stop; true -> ok end, case Results of [] -> - {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit, - IncludeDocs} + {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "continuous", Limit, + IncludeDocs, Conflicts} }; _ -> - ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs), - Callback({change, ChangesRow, <<"">>}, "continuous"), - {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit - 1, - IncludeDocs} + ChangesRow = changes_row(Db, Results, DocInfo, IncludeDocs, Conflicts), + UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc), + {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "continuous", + Limit - 1, IncludeDocs, Conflicts} } end; -changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType, - Limit, IncludeDocs}) -> +changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, UserAcc, + ResponseType, Limit, IncludeDocs, Conflicts}) -> - #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} - = DocInfo, - Results0 = FilterFun(DocInfo), + #doc_info{high_seq = Seq} = DocInfo, + Results0 = FilterFun(Db, DocInfo), Results = [Result || Result <- Results0, Result /= null], - Go = if Limit =< 1 -> stop; true -> ok end, + Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end, case Results of [] -> - {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit, - IncludeDocs} + {Go, {Db, Seq, Prepend, FilterFun, Callback, UserAcc, ResponseType, + Limit, IncludeDocs, Conflicts} }; _ -> - ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs), - Callback({change, ChangesRow, Prepend}, ResponseType), - {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1, - IncludeDocs} + ChangesRow = changes_row(Db, Results, DocInfo, IncludeDocs, Conflicts), + UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc), + {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, UserAcc2, ResponseType, + Limit - 1, IncludeDocs, Conflicts} } end. -changes_row(Db, Seq, Id, Del, Results, Rev, true) -> - {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ - deleted_item(Del) ++ couch_httpd_view:doc_member(Db, {Id, Rev})}; -changes_row(_, Seq, Id, Del, Results, _, false) -> +changes_row(Db, Results, DocInfo, IncludeDoc, Conflicts) -> + #doc_info{ + id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] + } = DocInfo, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++ - deleted_item(Del)}. + deleted_item(Del) ++ case IncludeDoc of + true -> + Options = if Conflicts -> [conflicts]; true -> [] end, + couch_httpd_view:doc_member(Db, DocInfo, Options); + false -> + [] + end}. -deleted_item(true) -> [{deleted, true}]; +deleted_item(true) -> [{<<"deleted">>, true}]; deleted_item(_) -> []. % waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout, TimeoutFun) -> - receive db_updated -> get_rest_db_updated() +wait_db_updated(Timeout, TimeoutFun, UserAcc) -> + receive + db_updated -> + get_rest_db_updated(UserAcc) after Timeout -> - case TimeoutFun() of - ok -> wait_db_updated(Timeout, TimeoutFun); - stop -> stop + {Go, UserAcc2} = TimeoutFun(UserAcc), + case Go of + ok -> + wait_db_updated(Timeout, TimeoutFun, UserAcc2); + stop -> + {stop, UserAcc2} end end. +get_rest_db_updated(UserAcc) -> + receive + db_updated -> + get_rest_db_updated(UserAcc) + after 0 -> + {updated, UserAcc} + end. + get_rest_db_updated() -> receive db_updated -> get_rest_db_updated() after 0 -> updated |