diff options
-rw-r--r-- | share/www/script/test/changes.js | 31 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 77 | ||||
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 3 |
3 files changed, 67 insertions, 44 deletions
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js index edf3ed18..e7721cbb 100644 --- a/share/www/script/test/changes.js +++ b/share/www/script/test/changes.js @@ -45,9 +45,9 @@ couchTests.changes = function(debug) { req = CouchDB.request("GET", "/test_suite_db/_changes?feed=continuous&timeout=10"); - var resp = JSON.parse(req.responseText); - T(resp.results.length == 1 && resp.last_seq==1) - T(resp.results[0].changes[0].rev == docFoo._rev) + var lines = req.responseText.split("\n"); + T(JSON.parse(lines[0]).changes[0].rev == docFoo._rev); + T(JSON.parse(lines[1]).last_seq == 1); var xhr; @@ -67,14 +67,6 @@ couchTests.changes = function(debug) { T(JSON.parse(req.responseText).ok == true); } - var parse_changes_line = function(line) { - if (line.charAt(line.length-1) == ",") { - line = line.substring(0, line.length-1); - } - return JSON.parse(line); - } - - xhr.open("GET", "/test_suite_db/_changes?feed=continuous", true); xhr.send(""); @@ -83,15 +75,13 @@ couchTests.changes = function(debug) { sleep(100); var lines = xhr.responseText.split("\n"); - - T(lines[0]='{"results":['); - - var change = parse_changes_line(lines[1]); + + var change = JSON.parse(lines[0]); T(change.seq == 1) T(change.id == "foo") - change = parse_changes_line(lines[2]); + change = JSON.parse(lines[1]); T(change.seq == 2) T(change.id == "bar") @@ -103,7 +93,7 @@ couchTests.changes = function(debug) { sleep(100); var lines = xhr.responseText.split("\n"); - change = parse_changes_line(lines[3]); + change = JSON.parse(lines[2]); T(change.seq == 3); T(change.id == "baz"); @@ -148,6 +138,13 @@ couchTests.changes = function(debug) { var lines = xhr.responseText.split("\n"); + var parse_changes_line = function(line) { + if (line.charAt(line.length-1) == ",") { + line = line.substring(0, line.length-1); + } + return JSON.parse(line); + } + change = parse_changes_line(lines[1]); T(change.seq == 4); diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 8a46349a..32fa2935 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -66,12 +66,17 @@ get_changes_timeout(Req, Resp) -> end. +start_sending_changes(_Resp, "continuous") -> + ok; +start_sending_changes(Resp, _Else) -> + send_chunk(Resp, "{\"results\":[\n"). + handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")), {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, "{\"results\":[\n"), - case couch_httpd:qs_value(Req, "feed", "normal") of - ResponseType when ResponseType == "continuous" orelse ResponseType == "longpoll"-> + ResponseType = couch_httpd:qs_value(Req, "feed", "normal"), + start_sending_changes(Resp, ResponseType), + if ResponseType == "continuous" orelse ResponseType == "longpoll" -> Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName0}) when DbName0 == DbName -> @@ -88,10 +93,10 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> couch_db_update_notifier:stop(Notify), get_rest_db_updated() % clean out any remaining update messages end; - "normal" -> - {ok, {LastSeq, _Prepend}} = - send_changes(Req, Resp, Db, StartSeq, <<"">>), - end_sending_changes(Resp, LastSeq) + true -> + {ok, {LastSeq, _Prepend, _, _, _}} = + send_changes(Req, Resp, Db, StartSeq, <<"">>, "normal"), + end_sending_changes(Resp, LastSeq, ResponseType) end; handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> @@ -112,45 +117,63 @@ get_rest_db_updated() -> after 0 -> updated end. -end_sending_changes(Resp, EndSeq) -> +end_sending_changes(Resp, EndSeq, "continuous") -> + send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]), + end_json_response(Resp); +end_sending_changes(Resp, EndSeq, _Else) -> send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])), end_json_response(Resp). -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) -> - {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend), +keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, + Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) -> + {ok, {EndSeq, Prepend2, _, _, _}} = send_changes(Req, Resp, Db, StartSeq, + Prepend, ResponseType), couch_db:close(Db), if EndSeq > StartSeq, ResponseType == "longpoll" -> - end_sending_changes(Resp, EndSeq); + end_sending_changes(Resp, EndSeq, ResponseType); true -> case wait_db_updated(Timeout, TimeoutFun) of updated -> {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun, ResponseType); stop -> - end_sending_changes(Resp, EndSeq) + end_sending_changes(Resp, EndSeq, ResponseType) end end. -send_changes(Req, Resp, Db, StartSeq, Prepend0) -> +changes_enumerator(DocInfos, {_, _, FilterFun, Resp, "continuous"}) -> + [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, nil, FilterFun, Resp, "continuous"}}; + _ -> + send_chunk(Resp, [?JSON_ENCODE({[{seq,Seq},{id,Id},{changes,Results}]}) + |"\n"]), + {ok, {Seq, nil, FilterFun, Resp, "continuous"}} + end; +changes_enumerator(DocInfos, {_, Prepend, FilterFun, Resp, _}) -> + [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, Prepend, FilterFun, Resp, nil}}; + _ -> + send_chunk(Resp, [Prepend, ?JSON_ENCODE({[{seq,Seq},{id,Id}, + {changes,Results}]})]), + {ok, {Seq, <<",\n">>, FilterFun, Resp, nil}} + end. + +send_changes(Req, Resp, Db, StartSeq, Prepend, ResponseType) -> Style = list_to_existing_atom( couch_httpd:qs_value(Req, "style", "main_only")), {FilterFun, EndFilterFun} = make_filter_funs(Req, Db), try - couch_db:changes_since(Db, Style, StartSeq, - fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) -> - Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], - Results = [Result || Result <- Results0, Result /= null], - case Results of - [] -> - {ok, {Seq, Prepend}}; - _ -> - send_chunk(Resp, - [Prepend, ?JSON_ENCODE({[{seq,Seq}, {id, Id}, - {changes,Results}]})]), - {ok, {Seq, <<",\n">>}} - end - end, {StartSeq, Prepend0}) + couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2, + {StartSeq, Prepend, FilterFun, Resp, ResponseType}) after EndFilterFun() end. diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 3f8e20a3..c124af1b 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -206,6 +206,9 @@ handle_response(<<"{\"results\":[\n">>, State) -> handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), {noreply, State#state{last_seq = LastSeq}}; +handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) -> + LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), + {noreply, State#state{last_seq = LastSeq}}; handle_response(Chunk, #state{partial_chunk=nil} = State) -> #state{ count = Count, |