summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2009-08-18 14:48:03 +0000
committerAdam Kocoloski <kocolosk@apache.org>2009-08-18 14:48:03 +0000
commit7499f6c0547b4ec35d60df390d835615f7de06e6 (patch)
treeeda1731ad41f5ab933a7ef7d7b345bb683199421
parent3b9a900cabff50c0c48aaf454dde0a0f1cc3545e (diff)
continuous _changes are now newline-delimited JSON Objects (no commas)
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@805430 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--share/www/script/test/changes.js31
-rw-r--r--src/couchdb/couch_httpd_db.erl77
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl3
3 files changed, 67 insertions, 44 deletions
diff --git a/share/www/script/test/changes.js b/share/www/script/test/changes.js
index edf3ed18..e7721cbb 100644
--- a/share/www/script/test/changes.js
+++ b/share/www/script/test/changes.js
@@ -45,9 +45,9 @@ couchTests.changes = function(debug) {
req = CouchDB.request("GET", "/test_suite_db/_changes?feed=continuous&timeout=10");
- var resp = JSON.parse(req.responseText);
- T(resp.results.length == 1 && resp.last_seq==1)
- T(resp.results[0].changes[0].rev == docFoo._rev)
+ var lines = req.responseText.split("\n");
+ T(JSON.parse(lines[0]).changes[0].rev == docFoo._rev);
+ T(JSON.parse(lines[1]).last_seq == 1);
var xhr;
@@ -67,14 +67,6 @@ couchTests.changes = function(debug) {
T(JSON.parse(req.responseText).ok == true);
}
- var parse_changes_line = function(line) {
- if (line.charAt(line.length-1) == ",") {
- line = line.substring(0, line.length-1);
- }
- return JSON.parse(line);
- }
-
-
xhr.open("GET", "/test_suite_db/_changes?feed=continuous", true);
xhr.send("");
@@ -83,15 +75,13 @@ couchTests.changes = function(debug) {
sleep(100);
var lines = xhr.responseText.split("\n");
-
- T(lines[0]='{"results":[');
-
- var change = parse_changes_line(lines[1]);
+
+ var change = JSON.parse(lines[0]);
T(change.seq == 1)
T(change.id == "foo")
- change = parse_changes_line(lines[2]);
+ change = JSON.parse(lines[1]);
T(change.seq == 2)
T(change.id == "bar")
@@ -103,7 +93,7 @@ couchTests.changes = function(debug) {
sleep(100);
var lines = xhr.responseText.split("\n");
- change = parse_changes_line(lines[3]);
+ change = JSON.parse(lines[2]);
T(change.seq == 3);
T(change.id == "baz");
@@ -148,6 +138,13 @@ couchTests.changes = function(debug) {
var lines = xhr.responseText.split("\n");
+ var parse_changes_line = function(line) {
+ if (line.charAt(line.length-1) == ",") {
+ line = line.substring(0, line.length-1);
+ }
+ return JSON.parse(line);
+ }
+
change = parse_changes_line(lines[1]);
T(change.seq == 4);
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 8a46349a..32fa2935 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -66,12 +66,17 @@ get_changes_timeout(Req, Resp) ->
end.
+start_sending_changes(_Resp, "continuous") ->
+ ok;
+start_sending_changes(Resp, _Else) ->
+ send_chunk(Resp, "{\"results\":[\n").
+
handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")),
{ok, Resp} = start_json_response(Req, 200),
- send_chunk(Resp, "{\"results\":[\n"),
- case couch_httpd:qs_value(Req, "feed", "normal") of
- ResponseType when ResponseType == "continuous" orelse ResponseType == "longpoll"->
+ ResponseType = couch_httpd:qs_value(Req, "feed", "normal"),
+ start_sending_changes(Resp, ResponseType),
+ if ResponseType == "continuous" orelse ResponseType == "longpoll" ->
Self = self(),
{ok, Notify} = couch_db_update_notifier:start_link(
fun({_, DbName0}) when DbName0 == DbName ->
@@ -88,10 +93,10 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
couch_db_update_notifier:stop(Notify),
get_rest_db_updated() % clean out any remaining update messages
end;
- "normal" ->
- {ok, {LastSeq, _Prepend}} =
- send_changes(Req, Resp, Db, StartSeq, <<"">>),
- end_sending_changes(Resp, LastSeq)
+ true ->
+ {ok, {LastSeq, _Prepend, _, _, _}} =
+ send_changes(Req, Resp, Db, StartSeq, <<"">>, "normal"),
+ end_sending_changes(Resp, LastSeq, ResponseType)
end;
handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
@@ -112,45 +117,63 @@ get_rest_db_updated() ->
after 0 -> updated
end.
-end_sending_changes(Resp, EndSeq) ->
+end_sending_changes(Resp, EndSeq, "continuous") ->
+ send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
+ end_json_response(Resp);
+end_sending_changes(Resp, EndSeq, _Else) ->
send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])),
end_json_response(Resp).
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) ->
- {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend),
+keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp,
+ Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) ->
+ {ok, {EndSeq, Prepend2, _, _, _}} = send_changes(Req, Resp, Db, StartSeq,
+ Prepend, ResponseType),
couch_db:close(Db),
if
EndSeq > StartSeq, ResponseType == "longpoll" ->
- end_sending_changes(Resp, EndSeq);
+ end_sending_changes(Resp, EndSeq, ResponseType);
true ->
case wait_db_updated(Timeout, TimeoutFun) of
updated ->
{ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun, ResponseType);
stop ->
- end_sending_changes(Resp, EndSeq)
+ end_sending_changes(Resp, EndSeq, ResponseType)
end
end.
-send_changes(Req, Resp, Db, StartSeq, Prepend0) ->
+changes_enumerator(DocInfos, {_, _, FilterFun, Resp, "continuous"}) ->
+ [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos,
+ Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
+ Results = [Result || Result <- Results0, Result /= null],
+ case Results of
+ [] ->
+ {ok, {Seq, nil, FilterFun, Resp, "continuous"}};
+ _ ->
+ send_chunk(Resp, [?JSON_ENCODE({[{seq,Seq},{id,Id},{changes,Results}]})
+ |"\n"]),
+ {ok, {Seq, nil, FilterFun, Resp, "continuous"}}
+ end;
+changes_enumerator(DocInfos, {_, Prepend, FilterFun, Resp, _}) ->
+ [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos,
+ Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
+ Results = [Result || Result <- Results0, Result /= null],
+ case Results of
+ [] ->
+ {ok, {Seq, Prepend, FilterFun, Resp, nil}};
+ _ ->
+ send_chunk(Resp, [Prepend, ?JSON_ENCODE({[{seq,Seq},{id,Id},
+ {changes,Results}]})]),
+ {ok, {Seq, <<",\n">>, FilterFun, Resp, nil}}
+ end.
+
+send_changes(Req, Resp, Db, StartSeq, Prepend, ResponseType) ->
Style = list_to_existing_atom(
couch_httpd:qs_value(Req, "style", "main_only")),
{FilterFun, EndFilterFun} = make_filter_funs(Req, Db),
try
- couch_db:changes_since(Db, Style, StartSeq,
- fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) ->
- Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
- Results = [Result || Result <- Results0, Result /= null],
- case Results of
- [] ->
- {ok, {Seq, Prepend}};
- _ ->
- send_chunk(Resp,
- [Prepend, ?JSON_ENCODE({[{seq,Seq}, {id, Id},
- {changes,Results}]})]),
- {ok, {Seq, <<",\n">>}}
- end
- end, {StartSeq, Prepend0})
+ couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2,
+ {StartSeq, Prepend, FilterFun, Resp, ResponseType})
after
EndFilterFun()
end.
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 3f8e20a3..c124af1b 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -206,6 +206,9 @@ handle_response(<<"{\"results\":[\n">>, State) ->
handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
{noreply, State#state{last_seq = LastSeq}};
+handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) ->
+ LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
+ {noreply, State#state{last_seq = LastSeq}};
handle_response(Chunk, #state{partial_chunk=nil} = State) ->
#state{
count = Count,