From 7499f6c0547b4ec35d60df390d835615f7de06e6 Mon Sep 17 00:00:00 2001 From: Adam Kocoloski Date: Tue, 18 Aug 2009 14:48:03 +0000 Subject: continuous _changes are now newline-delimited JSON Objects (no commas) git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@805430 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_httpd_db.erl | 77 +++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 27 deletions(-) (limited to 'src/couchdb/couch_httpd_db.erl') diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 8a46349a..32fa2935 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -66,12 +66,17 @@ get_changes_timeout(Req, Resp) -> end. +start_sending_changes(_Resp, "continuous") -> + ok; +start_sending_changes(Resp, _Else) -> + send_chunk(Resp, "{\"results\":[\n"). + handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")), {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, "{\"results\":[\n"), - case couch_httpd:qs_value(Req, "feed", "normal") of - ResponseType when ResponseType == "continuous" orelse ResponseType == "longpoll"-> + ResponseType = couch_httpd:qs_value(Req, "feed", "normal"), + start_sending_changes(Resp, ResponseType), + if ResponseType == "continuous" orelse ResponseType == "longpoll" -> Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName0}) when DbName0 == DbName -> @@ -88,10 +93,10 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> couch_db_update_notifier:stop(Notify), get_rest_db_updated() % clean out any remaining update messages end; - "normal" -> - {ok, {LastSeq, _Prepend}} = - send_changes(Req, Resp, Db, StartSeq, <<"">>), - end_sending_changes(Resp, LastSeq) + true -> + {ok, {LastSeq, _Prepend, _, _, _}} = + send_changes(Req, Resp, Db, StartSeq, <<"">>, "normal"), + end_sending_changes(Resp, LastSeq, ResponseType) end; handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> @@ -112,45 +117,63 @@ get_rest_db_updated() -> after 0 -> updated end. -end_sending_changes(Resp, EndSeq) -> +end_sending_changes(Resp, EndSeq, "continuous") -> + send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]), + end_json_response(Resp); +end_sending_changes(Resp, EndSeq, _Else) -> send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])), end_json_response(Resp). -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) -> - {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend), +keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, + Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) -> + {ok, {EndSeq, Prepend2, _, _, _}} = send_changes(Req, Resp, Db, StartSeq, + Prepend, ResponseType), couch_db:close(Db), if EndSeq > StartSeq, ResponseType == "longpoll" -> - end_sending_changes(Resp, EndSeq); + end_sending_changes(Resp, EndSeq, ResponseType); true -> case wait_db_updated(Timeout, TimeoutFun) of updated -> {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun, ResponseType); stop -> - end_sending_changes(Resp, EndSeq) + end_sending_changes(Resp, EndSeq, ResponseType) end end. -send_changes(Req, Resp, Db, StartSeq, Prepend0) -> +changes_enumerator(DocInfos, {_, _, FilterFun, Resp, "continuous"}) -> + [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, nil, FilterFun, Resp, "continuous"}}; + _ -> + send_chunk(Resp, [?JSON_ENCODE({[{seq,Seq},{id,Id},{changes,Results}]}) + |"\n"]), + {ok, {Seq, nil, FilterFun, Resp, "continuous"}} + end; +changes_enumerator(DocInfos, {_, Prepend, FilterFun, Resp, _}) -> + [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, Prepend, FilterFun, Resp, nil}}; + _ -> + send_chunk(Resp, [Prepend, ?JSON_ENCODE({[{seq,Seq},{id,Id}, + {changes,Results}]})]), + {ok, {Seq, <<",\n">>, FilterFun, Resp, nil}} + end. + +send_changes(Req, Resp, Db, StartSeq, Prepend, ResponseType) -> Style = list_to_existing_atom( couch_httpd:qs_value(Req, "style", "main_only")), {FilterFun, EndFilterFun} = make_filter_funs(Req, Db), try - couch_db:changes_since(Db, Style, StartSeq, - fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) -> - Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], - Results = [Result || Result <- Results0, Result /= null], - case Results of - [] -> - {ok, {Seq, Prepend}}; - _ -> - send_chunk(Resp, - [Prepend, ?JSON_ENCODE({[{seq,Seq}, {id, Id}, - {changes,Results}]})]), - {ok, {Seq, <<",\n">>}} - end - end, {StartSeq, Prepend0}) + couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2, + {StartSeq, Prepend, FilterFun, Resp, ResponseType}) after EndFilterFun() end. -- cgit v1.2.3