summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_httpd_db.erl77
-rw-r--r--src/couchdb/couch_rep_changes_feed.erl3
2 files changed, 53 insertions, 27 deletions
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 8a46349a..32fa2935 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -66,12 +66,17 @@ get_changes_timeout(Req, Resp) ->
end.
+start_sending_changes(_Resp, "continuous") ->
+ ok;
+start_sending_changes(Resp, _Else) ->
+ send_chunk(Resp, "{\"results\":[\n").
+
handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")),
{ok, Resp} = start_json_response(Req, 200),
- send_chunk(Resp, "{\"results\":[\n"),
- case couch_httpd:qs_value(Req, "feed", "normal") of
- ResponseType when ResponseType == "continuous" orelse ResponseType == "longpoll"->
+ ResponseType = couch_httpd:qs_value(Req, "feed", "normal"),
+ start_sending_changes(Resp, ResponseType),
+ if ResponseType == "continuous" orelse ResponseType == "longpoll" ->
Self = self(),
{ok, Notify} = couch_db_update_notifier:start_link(
fun({_, DbName0}) when DbName0 == DbName ->
@@ -88,10 +93,10 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
couch_db_update_notifier:stop(Notify),
get_rest_db_updated() % clean out any remaining update messages
end;
- "normal" ->
- {ok, {LastSeq, _Prepend}} =
- send_changes(Req, Resp, Db, StartSeq, <<"">>),
- end_sending_changes(Resp, LastSeq)
+ true ->
+ {ok, {LastSeq, _Prepend, _, _, _}} =
+ send_changes(Req, Resp, Db, StartSeq, <<"">>, "normal"),
+ end_sending_changes(Resp, LastSeq, ResponseType)
end;
handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
@@ -112,45 +117,63 @@ get_rest_db_updated() ->
after 0 -> updated
end.
-end_sending_changes(Resp, EndSeq) ->
+end_sending_changes(Resp, EndSeq, "continuous") ->
+ send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
+ end_json_response(Resp);
+end_sending_changes(Resp, EndSeq, _Else) ->
send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])),
end_json_response(Resp).
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) ->
- {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend),
+keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp,
+ Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) ->
+ {ok, {EndSeq, Prepend2, _, _, _}} = send_changes(Req, Resp, Db, StartSeq,
+ Prepend, ResponseType),
couch_db:close(Db),
if
EndSeq > StartSeq, ResponseType == "longpoll" ->
- end_sending_changes(Resp, EndSeq);
+ end_sending_changes(Resp, EndSeq, ResponseType);
true ->
case wait_db_updated(Timeout, TimeoutFun) of
updated ->
{ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun, ResponseType);
stop ->
- end_sending_changes(Resp, EndSeq)
+ end_sending_changes(Resp, EndSeq, ResponseType)
end
end.
-send_changes(Req, Resp, Db, StartSeq, Prepend0) ->
+changes_enumerator(DocInfos, {_, _, FilterFun, Resp, "continuous"}) ->
+ [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos,
+ Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
+ Results = [Result || Result <- Results0, Result /= null],
+ case Results of
+ [] ->
+ {ok, {Seq, nil, FilterFun, Resp, "continuous"}};
+ _ ->
+ send_chunk(Resp, [?JSON_ENCODE({[{seq,Seq},{id,Id},{changes,Results}]})
+ |"\n"]),
+ {ok, {Seq, nil, FilterFun, Resp, "continuous"}}
+ end;
+changes_enumerator(DocInfos, {_, Prepend, FilterFun, Resp, _}) ->
+ [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos,
+ Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
+ Results = [Result || Result <- Results0, Result /= null],
+ case Results of
+ [] ->
+ {ok, {Seq, Prepend, FilterFun, Resp, nil}};
+ _ ->
+ send_chunk(Resp, [Prepend, ?JSON_ENCODE({[{seq,Seq},{id,Id},
+ {changes,Results}]})]),
+ {ok, {Seq, <<",\n">>, FilterFun, Resp, nil}}
+ end.
+
+send_changes(Req, Resp, Db, StartSeq, Prepend, ResponseType) ->
Style = list_to_existing_atom(
couch_httpd:qs_value(Req, "style", "main_only")),
{FilterFun, EndFilterFun} = make_filter_funs(Req, Db),
try
- couch_db:changes_since(Db, Style, StartSeq,
- fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) ->
- Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos],
- Results = [Result || Result <- Results0, Result /= null],
- case Results of
- [] ->
- {ok, {Seq, Prepend}};
- _ ->
- send_chunk(Resp,
- [Prepend, ?JSON_ENCODE({[{seq,Seq}, {id, Id},
- {changes,Results}]})]),
- {ok, {Seq, <<",\n">>}}
- end
- end, {StartSeq, Prepend0})
+ couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2,
+ {StartSeq, Prepend, FilterFun, Resp, ResponseType})
after
EndFilterFun()
end.
diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl
index 3f8e20a3..c124af1b 100644
--- a/src/couchdb/couch_rep_changes_feed.erl
+++ b/src/couchdb/couch_rep_changes_feed.erl
@@ -206,6 +206,9 @@ handle_response(<<"{\"results\":[\n">>, State) ->
handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) ->
LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
{noreply, State#state{last_seq = LastSeq}};
+handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) ->
+ LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))),
+ {noreply, State#state{last_seq = LastSeq}};
handle_response(Chunk, #state{partial_chunk=nil} = State) ->
#state{
count = Count,