summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_httpd_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_httpd_db.erl')
-rw-r--r--src/couchdb/couch_httpd_db.erl265
1 files changed, 77 insertions, 188 deletions
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 9ad34752..1028e857 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -55,200 +55,62 @@ handle_request(#httpd{path_parts=[DbName|RestParts],method=Method,
do_db_req(Req, Handler)
end.
-get_changes_timeout(Req, Resp) ->
- DefaultTimeout = list_to_integer(
- couch_config:get("httpd", "changes_timeout", "60000")),
- case couch_httpd:qs_value(Req, "heartbeat") of
- undefined ->
- case couch_httpd:qs_value(Req, "timeout") of
- undefined ->
- {DefaultTimeout, fun() -> stop end};
- TimeoutList ->
- {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
- fun() -> stop end}
- end;
- "true" ->
- {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end};
- TimeoutList ->
- {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
- fun() -> send_chunk(Resp, "\n"), ok end}
- end.
-
-
-start_sending_changes(_Resp, "continuous") ->
- ok;
-start_sending_changes(Resp, _Else) ->
- send_chunk(Resp, "{\"results\":[\n").
-
-handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
- FilterFun = make_filter_fun(Req, Db),
- {ok, Info} = couch_db:get_db_info(Db),
- Seq = proplists:get_value(update_seq, Info),
- {Dir, StartSeq} = case couch_httpd:qs_value(Req, "descending", "false") of
- "false" ->
- {fwd, list_to_integer(couch_httpd:qs_value(Req, "since", "0"))};
- "true" ->
- {rev, Seq};
- _Bad -> throw({bad_request, "descending must be true or false"})
+handle_changes_req(#httpd{method='GET'}=Req, Db) ->
+ MakeCallback = fun(Resp) ->
+ fun({change, Change, _}, "continuous") ->
+ send_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]);
+ ({change, Change, Prepend}, _) ->
+ send_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]);
+ (start, "continuous") ->
+ ok;
+ (start, _) ->
+ send_chunk(Resp, "{\"results\":[\n");
+ ({stop, EndSeq}, "continuous") ->
+ send_chunk(
+ Resp,
+ [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]
+ ),
+ end_json_response(Resp);
+ ({stop, EndSeq}, _) ->
+ send_chunk(
+ Resp,
+ io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])
+ ),
+ end_json_response(Resp);
+ (timeout, _) ->
+ send_chunk(Resp, "\n")
+ end
end,
- Limit = list_to_integer(couch_httpd:qs_value(Req, "limit", "1000000000000000")),
- ResponseType = couch_httpd:qs_value(Req, "feed", "normal"),
- if ResponseType == "continuous" orelse ResponseType == "longpoll" ->
- {ok, Resp} = start_json_response(Req, 200),
- start_sending_changes(Resp, ResponseType),
-
- Self = self(),
- {ok, Notify} = couch_db_update_notifier:start_link(
- fun({_, DbName0}) when DbName0 == DbName ->
- Self ! db_updated;
- (_) ->
- ok
- end),
- {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp),
- couch_stats_collector:track_process_count(Self,
- {httpd, clients_requesting_changes}),
- try
- keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout,
- TimeoutFun, ResponseType, Limit, FilterFun)
- after
- couch_db_update_notifier:stop(Notify),
- get_rest_db_updated() % clean out any remaining update messages
- end;
- true ->
+ ChangesArgs = parse_changes_query(Req),
+ ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+ WrapperFun = case ChangesArgs#changes_args.feed of
+ "normal" ->
+ {ok, Info} = couch_db:get_db_info(Db),
CurrentEtag = couch_httpd:make_etag(Info),
- couch_httpd:etag_respond(Req, CurrentEtag, fun() ->
- % send the etag
- {ok, Resp} = start_json_response(Req, 200, [{"Etag", CurrentEtag}]),
- start_sending_changes(Resp, ResponseType),
- {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
- send_changes(Req, Resp, Db, Dir, StartSeq, <<"">>, "normal",
- Limit, FilterFun),
- end_sending_changes(Resp, LastSeq, ResponseType)
- end)
- end;
+ fun(FeedChangesFun) ->
+ couch_httpd:etag_respond(
+ Req,
+ CurrentEtag,
+ fun() ->
+ {ok, Resp} = couch_httpd:start_json_response(
+ Req, 200, [{"Etag", CurrentEtag}]
+ ),
+ FeedChangesFun(MakeCallback(Resp))
+ end
+ )
+ end;
+ _ ->
+ % "longpoll" or "continuous"
+ {ok, Resp} = couch_httpd:start_json_response(Req, 200),
+ fun(FeedChangesFun) ->
+ FeedChangesFun(MakeCallback(Resp))
+ end
+ end,
+ WrapperFun(ChangesFun);
handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
send_method_not_allowed(Req, "GET,HEAD").
-% waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout, TimeoutFun) ->
- receive db_updated -> get_rest_db_updated()
- after Timeout ->
- case TimeoutFun() of
- ok -> wait_db_updated(Timeout, TimeoutFun);
- stop -> stop
- end
- end.
-
-get_rest_db_updated() ->
- receive db_updated -> get_rest_db_updated()
- after 0 -> updated
- end.
-
-end_sending_changes(Resp, EndSeq, "continuous") ->
- send_chunk(Resp, [?JSON_ENCODE({[{<<"last_seq">>, EndSeq}]}) | "\n"]),
- end_json_response(Resp);
-end_sending_changes(Resp, EndSeq, _Else) ->
- send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq])),
- end_json_response(Resp).
-
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp,
- Db, StartSeq, Prepend, Timeout, TimeoutFun, ResponseType, Limit, Filter) ->
- {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(Req, Resp, Db, fwd, StartSeq,
- Prepend, ResponseType, Limit, Filter),
- couch_db:close(Db),
- if
- Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Resp, EndSeq, ResponseType);
- true ->
- case wait_db_updated(Timeout, TimeoutFun) of
- updated ->
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db2} ->
- keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout,
- TimeoutFun, ResponseType, NewLimit, Filter);
- _Else ->
- end_sending_changes(Resp, EndSeq, ResponseType)
- end;
- stop ->
- end_sending_changes(Resp, EndSeq, ResponseType)
- end
- end.
-
-changes_enumerator(DocInfos, {Db, _, _, FilterFun, Resp, "continuous", Limit, IncludeDocs}) ->
- [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos,
- Results0 = FilterFun(DocInfos),
- Results = [Result || Result <- Results0, Result /= null],
- Go = if Limit =< 1 -> stop; true -> ok end,
- case Results of
- [] ->
- {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit, IncludeDocs}};
- _ ->
- send_chunk(Resp, [?JSON_ENCODE(changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))
- |"\n"]),
- {Go, {Db, Seq, nil, FilterFun, Resp, "continuous", Limit-1, IncludeDocs}}
- end;
-changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Resp, _, Limit, IncludeDocs}) ->
- [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_] = DocInfos,
- Results0 = FilterFun(DocInfos),
- Results = [Result || Result <- Results0, Result /= null],
- Go = if Limit =< 1 -> stop; true -> ok end,
- case Results of
- [] ->
- {Go, {Db, Seq, Prepend, FilterFun, Resp, nil, Limit, IncludeDocs}};
- _ ->
- send_chunk(Resp, [Prepend, ?JSON_ENCODE(
- changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs))]),
- {Go, {Db, Seq, <<",\n">>, FilterFun, Resp, nil, Limit-1, IncludeDocs}}
- end.
-
-changes_row(Db, Seq, Id, Del, Results, Rev, true) ->
- {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del) ++
- couch_httpd_view:doc_member(Db, {Id, Rev})};
-changes_row(_, Seq, Id, Del, Results, _, false) ->
- {[{seq,Seq},{id,Id},{changes,Results}] ++ deleted_item(Del)}.
-
-deleted_item(true) -> [{deleted,true}];
-deleted_item(_) -> [].
-
-send_changes(Req, Resp, Db, Dir, StartSeq, Prepend, ResponseType, Limit, FilterFun) ->
- Style = list_to_existing_atom(
- couch_httpd:qs_value(Req, "style", "main_only")),
- IncludeDocs = list_to_existing_atom(
- couch_httpd:qs_value(Req, "include_docs", "false")),
- couch_db:changes_since(Db, Style, StartSeq, fun changes_enumerator/2,
- [{dir, Dir}], {Db, StartSeq, Prepend, FilterFun, Resp, ResponseType, Limit, IncludeDocs}).
-
-make_filter_fun(Req, Db) ->
- Filter = couch_httpd:qs_value(Req, "filter", ""),
- case [list_to_binary(couch_httpd:unquote(Part))
- || Part <- string:tokens(Filter, "/")] of
- [] ->
- fun(DocInfos) ->
- % doing this as a batch is more efficient for external filters
- [{[{rev, couch_doc:rev_to_str(Rev)}]} ||
- #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
- end;
- [DName, FName] ->
- DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
- % validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
- couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
- fun(DocInfos) ->
- Docs = [Doc || {ok, Doc} <- [
- {ok, _Doc} = couch_db:open_doc(Db, DInfo, [deleted, conflicts])
- || DInfo <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
- [{[{rev, couch_doc:rev_to_str(Rev)}]}
- || #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos,
- Pass <- Passes, Pass == true]
- end;
- _Else ->
- throw({bad_request,
- "filter parameter must be of the form `designname/filtername`"})
- end.
-
handle_compact_req(#httpd{method='POST',path_parts=[DbName,_,Id|_]}=Req, _Db) ->
ok = couch_view_compactor:start_compact(DbName, Id),
send_json(Req, 202, {[{ok, true}]});
@@ -1188,6 +1050,33 @@ parse_doc_query(Req) ->
end
end, #doc_query_args{}, couch_httpd:qs(Req)).
+parse_changes_query(Req) ->
+ lists:foldl(fun({Key, Value}, Args) ->
+ case {Key, Value} of
+ {"feed", _} ->
+ Args#changes_args{feed=Value};
+ {"descending", "true"} ->
+ Args#changes_args{dir=rev};
+ {"since", _} ->
+ Args#changes_args{since=list_to_integer(Value)};
+ {"limit", _} ->
+ Args#changes_args{limit=list_to_integer(Value)};
+ {"style", _} ->
+ Args#changes_args{style=list_to_existing_atom(Value)};
+ {"heartbeat", "true"} ->
+ Args#changes_args{heartbeat=true};
+ {"heartbeat", _} ->
+ Args#changes_args{heartbeat=list_to_integer(Value)};
+ {"timeout", _} ->
+ Args#changes_args{timeout=list_to_integer(Value)};
+ {"include_docs", "true"} ->
+ Args#changes_args{include_docs=true};
+ {"filter", _} ->
+ Args#changes_args{filter=Value};
+ _Else -> % unknown key value pair, ignore.
+ Args
+ end
+ end, #changes_args{}, couch_httpd:qs(Req)).
extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));