diff options
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 77 | ||||
-rw-r--r-- | src/couchdb/couch_rep_changes_feed.erl | 3 |
2 files changed, 53 insertions, 27 deletions
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 8a46349a..32fa2935 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -66,12 +66,17 @@ get_changes_timeout(Req, Resp) -> end. +start_sending_changes(_Resp, "continuous") -> + ok; +start_sending_changes(Resp, _Else) -> + send_chunk(Resp, "{\"results\":[\n"). + handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")), {ok, Resp} = start_json_response(Req, 200), - send_chunk(Resp, "{\"results\":[\n"), - case couch_httpd:qs_value(Req, "feed", "normal") of - ResponseType when ResponseType == "continuous" orelse ResponseType == "longpoll"-> + ResponseType = couch_httpd:qs_value(Req, "feed", "normal"), + start_sending_changes(Resp, ResponseType), + if ResponseType == "continuous" orelse ResponseType == "longpoll" -> Self = self(), {ok, Notify} = couch_db_update_notifier:start_link( fun({_, DbName0}) when DbName0 == DbName -> @@ -88,10 +93,10 @@ handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) -> couch_db_update_notifier:stop(Notify), get_rest_db_updated() % clean out any remaining update messages end; - "normal" -> - {ok, {LastSeq, _Prepend}} = - send_changes(Req, Resp, Db, StartSeq, <<"">>), - end_sending_changes(Resp, LastSeq) + true -> + {ok, {LastSeq, _Prepend, _, _, _}} = + send_changes(Req, Resp, Db, StartSeq, <<"">>, "normal"), + end_sending_changes(Resp, LastSeq, ResponseType) end; handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) -> @@ -112,45 +117,63 @@ get_rest_db_updated() -> after 0 -> updated end. -end_sending_changes(Resp, EndSeq) -> +end_sending_changes(Resp, EndSeq, "continuous") -> + send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]), + end_json_response(Resp); +end_sending_changes(Resp, EndSeq, _Else) -> send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])), end_json_response(Resp). -keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) -> - {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend), +keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, + Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType) -> + {ok, {EndSeq, Prepend2, _, _, _}} = send_changes(Req, Resp, Db, StartSeq, + Prepend, ResponseType), couch_db:close(Db), if EndSeq > StartSeq, ResponseType == "longpoll" -> - end_sending_changes(Resp, EndSeq); + end_sending_changes(Resp, EndSeq, ResponseType); true -> case wait_db_updated(Timeout, TimeoutFun) of updated -> {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]), keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun, ResponseType); stop -> - end_sending_changes(Resp, EndSeq) + end_sending_changes(Resp, EndSeq, ResponseType) end end. -send_changes(Req, Resp, Db, StartSeq, Prepend0) -> +changes_enumerator(DocInfos, {_, _, FilterFun, Resp, "continuous"}) -> + [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, nil, FilterFun, Resp, "continuous"}}; + _ -> + send_chunk(Resp, [?JSON_ENCODE({[{seq,Seq},{id,Id},{changes,Results}]}) + |"\n"]), + {ok, {Seq, nil, FilterFun, Resp, "continuous"}} + end; +changes_enumerator(DocInfos, {_, Prepend, FilterFun, Resp, _}) -> + [#doc_info{id=Id, high_seq=Seq}|_] = DocInfos, + Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], + Results = [Result || Result <- Results0, Result /= null], + case Results of + [] -> + {ok, {Seq, Prepend, FilterFun, Resp, nil}}; + _ -> + send_chunk(Resp, [Prepend, ?JSON_ENCODE({[{seq,Seq},{id,Id}, + {changes,Results}]})]), + {ok, {Seq, <<",\n">>, FilterFun, Resp, nil}} + end. + +send_changes(Req, Resp, Db, StartSeq, Prepend, ResponseType) -> Style = list_to_existing_atom( couch_httpd:qs_value(Req, "style", "main_only")), {FilterFun, EndFilterFun} = make_filter_funs(Req, Db), try - couch_db:changes_since(Db, Style, StartSeq, - fun([#doc_info{id=Id, high_seq=Seq}|_]=DocInfos, {_, Prepend}) -> - Results0 = [FilterFun(DocInfo) || DocInfo <- DocInfos], - Results = [Result || Result <- Results0, Result /= null], - case Results of - [] -> - {ok, {Seq, Prepend}}; - _ -> - send_chunk(Resp, - [Prepend, ?JSON_ENCODE({[{seq,Seq}, {id, Id}, - {changes,Results}]})]), - {ok, {Seq, <<",\n">>}} - end - end, {StartSeq, Prepend0}) + couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2, + {StartSeq, Prepend, FilterFun, Resp, ResponseType}) after EndFilterFun() end. diff --git a/src/couchdb/couch_rep_changes_feed.erl b/src/couchdb/couch_rep_changes_feed.erl index 3f8e20a3..c124af1b 100644 --- a/src/couchdb/couch_rep_changes_feed.erl +++ b/src/couchdb/couch_rep_changes_feed.erl @@ -206,6 +206,9 @@ handle_response(<<"{\"results\":[\n">>, State) -> handle_response(<<"\n],\n\"last_seq\":", LastSeqStr/binary>>, State) -> LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), {noreply, State#state{last_seq = LastSeq}}; +handle_response(<<"{\"last_seq\":", LastSeqStr/binary>>, State) -> + LastSeq = list_to_integer(?b2l(hd(re:split(LastSeqStr, "}")))), + {noreply, State#state{last_seq = LastSeq}}; handle_response(Chunk, #state{partial_chunk=nil} = State) -> #state{ count = Count, |