From f4d7191a3c0c37fe7b2ba290ba3e1afda0269863 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Fri, 11 Jun 2010 16:08:05 -0400 Subject: _changes feed via fabric --- src/chttpd_db.erl | 102 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 57 insertions(+), 45 deletions(-) diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl index 3b35220a..10388ff5 100644 --- a/src/chttpd_db.erl +++ b/src/chttpd_db.erl @@ -47,56 +47,69 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method, end. handle_changes_req(#httpd{method='GET'}=Req, Db) -> - MakeCallback = fun(Resp) -> - fun({change, Change, _}, "continuous") -> - send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]); - ({change, Change, Prepend}, _) -> - send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]); - (start, "continuous") -> - ok; - (start, _) -> - send_chunk(Resp, "{\"results\":[\n"); - ({stop, EndSeq}, "continuous") -> - send_chunk( - Resp, - [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"] - ), - end_json_response(Resp); - ({stop, EndSeq}, _) -> - send_chunk( - Resp, - io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]) - ), - end_json_response(Resp); - (timeout, _) -> - send_chunk(Resp, "\n") - end - end, ChangesArgs = parse_changes_query(Req), - ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db), case ChangesArgs#changes_args.feed of "normal" -> - {ok, Info} = couch_db:get_db_info(Db), - CurrentEtag = chttpd:make_etag(Info), - chttpd:etag_respond( - Req, - CurrentEtag, - fun() -> - {ok, Resp} = chttpd:start_json_response( - Req, 200, [{"Etag", CurrentEtag}] - ), - ChangesFun(MakeCallback(Resp)) - end - ); - _ -> + T0 = now(), + {ok, Info} = fabric:get_db_info(Db), + Etag = chttpd:make_etag(Info), + DeltaT = timer:now_diff(now(), T0) / 1000, + couch_stats_collector:record({couchdb, dbinfo}, DeltaT), + chttpd:etag_respond(Req, Etag, fun() -> + {ok, Resp} = chttpd:start_json_response(Req, 200, [{"Etag",Etag}]), + fabric:changes(Db, ChangesArgs, fun changes_callback/2, + {"normal", Resp}) + end); + Feed -> % "longpoll" or "continuous" {ok, Resp} = chttpd:start_json_response(Req, 200), - ChangesFun(MakeCallback(Resp)) + fabric:changes(Db, ChangesArgs, fun changes_callback/2, {Feed, Resp}) end; - handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> send_method_not_allowed(Req, "GET,HEAD"). +% callbacks for continuous feed (newline-delimited JSON Objects) +changes_callback(start, {"continuous", _} = Acc) -> + {ok, Acc}; +changes_callback({change, Change}, {"continuous", Resp} = Acc) -> + send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]), + {ok, Acc}; +changes_callback({stop, EndSeq0}, {"continuous", Resp}) -> + EndSeq = case is_old_couch(Resp) of true -> 0; false -> EndSeq0 end, + send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]), + end_json_response(Resp); + +% callbacks for longpoll and normal (single JSON Object) +changes_callback(start, {_, Resp}) -> + send_chunk(Resp, "{\"results\":[\n"), + {ok, {"", Resp}}; +changes_callback({change, Change}, {Prepend, Resp}) -> + send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]), + {ok, {",\r\n", Resp}}; +changes_callback({stop, EndSeq}, {_, Resp}) -> + case is_old_couch(Resp) of + true -> + send_chunk(Resp, "\n],\n\"last_seq\":0}\n"); + false -> + send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":\"~s\"}\n",[EndSeq])) + end, + end_json_response(Resp); + +changes_callback(timeout, {Prepend, Resp}) -> + send_chunk(Resp, "\n"), + {ok, {Prepend, Resp}}; +changes_callback({error, Reason}, Resp) -> + chttpd:send_chunked_error(Resp, {error, Reason}). + +is_old_couch(Resp) -> + MochiReq = Resp:get(request), + case MochiReq:get_header_value("user-agent") of + undefined -> + false; + UserAgent -> + string:str(UserAgent, "CouchDB/0") > 0 + end. + handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) -> ok = ?COUCH:compact_view_group(DbName, Id), send_json(Req, 202, {[{ok, true}]}); @@ -382,7 +395,7 @@ all_docs_view(Req, Db, Keys) -> {ok, Info} = fabric:get_db_info(Db), Etag = couch_httpd:make_etag(Info), DeltaT = timer:now_diff(now(), T0) / 1000, - couch_stats_collector:record({couchdb, all_docs_etag}, DeltaT), + couch_stats_collector:record({couchdb, dbinfo}, DeltaT), QueryArgs = chttpd_view:parse_view_params(Req, Keys, map), chttpd:etag_respond(Req, Etag, fun() -> {ok, Resp} = chttpd:start_json_response(Req, 200, [{"Etag",Etag}]), @@ -398,8 +411,7 @@ all_docs_callback({row, Row}, {Prepend, Resp}) -> {ok, {",\r\n", Resp}}; all_docs_callback(complete, {_, Resp}) -> send_chunk(Resp, "\r\n]}"), - end_json_response(Resp), - {ok, Resp}; + end_json_response(Resp); all_docs_callback({error, Reason}, Resp) -> chttpd:send_chunked_error(Resp, {error, Reason}). @@ -780,7 +792,7 @@ parse_changes_query(Req) -> {"descending", "true"} -> Args#changes_args{dir=rev}; {"since", _} -> - Args#changes_args{since=list_to_integer(Value)}; + Args#changes_args{since=Value}; {"limit", _} -> Args#changes_args{limit=list_to_integer(Value)}; {"style", _} -> -- cgit v1.2.3