From 2bdd75901fba402068d08e316a3ac32249307e27 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Fri, 26 Feb 2010 01:11:02 +0000 Subject: fdmananas patch for filtered replication via COUCHDB-673 git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@916518 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_httpd_db.erl | 265 ++++++++++++----------------------------- 1 file changed, 77 insertions(+), 188 deletions(-) (limited to 'src/couchdb/couch_httpd_db.erl') diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 9ad34752..1028e857 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -55,200 +55,62 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, do_db_req(Req, Handler) end. -get_changes_timeout(Req, Resp) -> - DefaultTimeout = list_to_integer( - couch_config:get("httpd", "changes_timeout", "60000")), - case couch_httpd:qs_value(Req, "heartbeat") of - undefined -> - case couch_httpd:qs_value(Req, "timeout") of - undefined -> - {DefaultTimeout, fun() -> stop end}; - TimeoutList -> - {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]), - fun() -> stop end} - end; - "true" -> - {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end}; - TimeoutList -> - {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]), - fun() -> send_chunk(Resp, "\n"), ok end} - end. - - -start_sending_changes(_Resp, "continuous") -> - ok; -start_sending_changes(Resp, _Else) -> - send_chunk(Resp, "{\"results\":[\n"). - -handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> - FilterFun = make_filter_fun(Req, Db), - {ok, Info} = couch_db:get_db_info(Db), - Seq = proplists:get_value(update_seq, Info), - {Dir, StartSeq} = case couch_httpd:qs_value(Req, "descending", "false") of - "false" -> - {fwd, list_to_integer(couch_httpd:qs_value(Req, "since", "0"))}; - "true" -> - {rev, Seq}; - _Bad -> throw({bad_request, "descending must be true or false"}) +handle_changes_req(#httpd{method='GET'}=Req, Db) -> + MakeCallback = fun(Resp) -> + fun({change, Change, _}, "continuous") -> + send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); + ({change, Change, Prepend}, _) -> + send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); + (start, "continuous") -> + ok; + (start, _) -> + send_chunk(Resp, "{\"results\":[\n"); + ({stop, EndSeq}, "continuous") -> + send_chunk( + Resp, + [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] + ), + end_json_response(Resp); + ({stop, EndSeq}, _) -> + send_chunk( + Resp, + io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]) + ), + end_json_response(Resp); + (timeout, _) -> + send_chunk(Resp, "\n") + end end, - Limit = list_to_integer(couch_httpd:qs_value(Req, "limit", "1000000000000000")), - ResponseType = couch_httpd:qs_value(Req, "feed", "normal"), - if ResponseType == "continuous" orelse ResponseType == "longpoll" -> - {ok, Resp} = start_json_response(Req, 200), - start_sending_changes(Resp, ResponseType), - - Self = self(), - {ok, Notify} = couch_db_update_notifier:start_link( - fun({_, DbName0}) when DbName0 == DbName -> - Self ! db_updated; - (_) -> - ok - end), - {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp), - couch_stats_collector:track_process_count(Self, - {httpd, clients_requesting_changes}), - try - keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, - TimeoutFun, ResponseType, Limit, FilterFun) - after - couch_db_update_notifier:stop(Notify), - get_rest_db_updated() % clean out any remaining update messages - end; - true -> + ChangesArgs = parse_changes_query(Req), + ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), + WrapperFun = case ChangesArgs#changes_args.feed of + "normal" -> + {ok, Info} = couch_db:get_db_info(Db), CurrentEtag = couch_httpd:make_etag(Info), - couch_httpd:etag_respond(Req, CurrentEtag, fun() -> - % send the etag - {ok, Resp} = start_json_response(Req, 200, [{"Etag", CurrentEtag}]), - start_sending_changes(Resp, ResponseType), - {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} = - send_changes(Req, Resp, Db, Dir, StartSeq, <<"">>, "normal", - Limit, FilterFun), - end_sending_changes(Resp, LastSeq, ResponseType) - end) - end; + fun(FeedChangesFun) -> + couch_httpd:etag_respond( + Req, + CurrentEtag, + fun() -> + {ok, Resp} = couch_httpd:start_json_response( + Req, 200, [{"Etag", CurrentEtag}] + ), + FeedChangesFun(MakeCallback(Resp)) + end + ) + end; + _ -> + % "longpoll" or "continuous" + {ok, Resp} = couch_httpd:start_json_response(Req, 200), + fun(FeedChangesFun) -> + FeedChangesFun(MakeCallback(Resp)) + end + end, + WrapperFun(ChangesFun); handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> send_method_not_allowed(Req, "GET,HEAD"). -% waits for a db_updated msg, if there are multiple msgs, collects them. -wait_db_updated(Timeout, TimeoutFun) -> - receive db_updated -> get_rest_db_updated() - after Timeout -> - case TimeoutFun() of - ok -> wait_db_updated(Timeout, TimeoutFun); - stop -> stop - end - end. - -get_rest_db_updated() -> - receive db_updated -> get_rest_db_updated() - after 0 -> updated - end. - -end_sending_changes(Resp, EndSeq, "continuous") -> - send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]), - end_json_response(Resp); -end_sending_changes(Resp, EndSeq, _Else) -> - send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])), - end_json_response(Resp). - -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, - Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType, Limit, Filter) -> - {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(Req, Resp, Db, fwd, StartSeq, - Prepend, ResponseType, Limit, Filter), - couch_db:close(Db), - if - Limit > NewLimit, ResponseType == "longpoll" -> - end_sending_changes(Resp, EndSeq, ResponseType); - true -> - case wait_db_updated(Timeout, TimeoutFun) of - updated -> - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db2} -> - keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, - TimeoutFun, ResponseType, NewLimit, Filter); - _Else -> - end_sending_changes(Resp, EndSeq, ResponseType) - end; - stop -> - end_sending_changes(Resp, EndSeq, ResponseType) - end - end. - -changes_enumerator(DocInfos, {Db, _, _, FilterFun, Resp, "continuous", Limit, IncludeDocs}) -> - [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos, - Results0 = FilterFun(DocInfos), - Results = [Result || Result <- Results0, Result /= null], - Go = if Limit =< 1 -> stop; true -> ok end, - case Results of - [] -> - {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit, IncludeDocs}}; - _ -> - send_chunk(Resp, [?JSON_ENCODE(changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs)) - |"\n"]), - {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit-1, IncludeDocs}} - end; -changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Resp, _, Limit, IncludeDocs}) -> - [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos, - Results0 = FilterFun(DocInfos), - Results = [Result || Result <- Results0, Result /= null], - Go = if Limit =< 1 -> stop; true -> ok end, - case Results of - [] -> - {Go, {Db, Seq, Prepend, FilterFun, Resp, nil, Limit, IncludeDocs}}; - _ -> - send_chunk(Resp, [Prepend, ?JSON_ENCODE( - changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))]), - {Go, {Db, Seq, <<",\n">>, FilterFun, Resp, nil, Limit-1, IncludeDocs}} - end. - -changes_row(Db, Seq, Id, Del, Results, Rev, true) -> - {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del) ++ - couch_httpd_view:doc_member(Db, {Id, Rev})}; -changes_row(_, Seq, Id, Del, Results, _, false) -> - {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del)}. - -deleted_item(true) -> [{deleted,true}]; -deleted_item(_) -> []. - -send_changes(Req, Resp, Db, Dir, StartSeq, Prepend, ResponseType, Limit, FilterFun) -> - Style = list_to_existing_atom( - couch_httpd:qs_value(Req, "style", "main_only")), - IncludeDocs = list_to_existing_atom( - couch_httpd:qs_value(Req, "include_docs", "false")), - couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2, - [{dir, Dir}], {Db, StartSeq, Prepend, FilterFun, Resp, ResponseType, Limit, IncludeDocs}). - -make_filter_fun(Req, Db) -> - Filter = couch_httpd:qs_value(Req, "filter", ""), - case [list_to_binary(couch_httpd:unquote(Part)) - || Part <- string:tokens(Filter, "/")] of - [] -> - fun(DocInfos) -> - % doing this as a batch is more efficient for external filters - [{[{rev, couch_doc:rev_to_str(Rev)}]} || - #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos] - end; - [DName, FName] -> - DesignId = <<"_design/", DName/binary>>, - DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), - % validate that the ddoc has the filter fun - #doc{body={Props}} = DDoc, - couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]), - fun(DocInfos) -> - Docs = [Doc || {ok, Doc} <- [ - {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts]) - || DInfo <- DocInfos]], - {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs), - [{[{rev, couch_doc:rev_to_str(Rev)}]} - || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos, - Pass <- Passes, Pass == true] - end; - _Else -> - throw({bad_request, - "filter parameter must be of the form `designname/filtername`"}) - end. - handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) -> ok = couch_view_compactor:start_compact(DbName, Id), send_json(Req, 202, {[{ok, true}]}); @@ -1188,6 +1050,33 @@ parse_doc_query(Req) -> end end, #doc_query_args{}, couch_httpd:qs(Req)). +parse_changes_query(Req) -> + lists:foldl(fun({Key, Value}, Args) -> + case {Key, Value} of + {"feed", _} -> + Args#changes_args{feed=Value}; + {"descending", "true"} -> + Args#changes_args{dir=rev}; + {"since", _} -> + Args#changes_args{since=list_to_integer(Value)}; + {"limit", _} -> + Args#changes_args{limit=list_to_integer(Value)}; + {"style", _} -> + Args#changes_args{style=list_to_existing_atom(Value)}; + {"heartbeat", "true"} -> + Args#changes_args{heartbeat=true}; + {"heartbeat", _} -> + Args#changes_args{heartbeat=list_to_integer(Value)}; + {"timeout", _} -> + Args#changes_args{timeout=list_to_integer(Value)}; + {"include_docs", "true"} -> + Args#changes_args{include_docs=true}; + {"filter", _} -> + Args#changes_args{filter=Value}; + _Else -> % unknown key value pair, ignore. + Args + end + end, #changes_args{}, couch_httpd:qs(Req)). extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)-> extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev)); -- cgit v1.2.3