summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_httpd_proxy.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2011-08-15 15:45:15 -0400
committerAdam Kocoloski <adam@cloudant.com>2011-08-15 15:45:15 -0400
commit6ffe1675dd7b004e48891956a6bdbe32899ce80c (patch)
tree57326d9a498481e65bb0db38c66daf10896801f1 /apps/couch/src/couch_httpd_proxy.erl
parent52ff89ff7996e839b9e2f91fd76184d362a8aeb0 (diff)
parentfdd1a5d0bc48b49b0df5c9217beff9574011283c (diff)
Merge branch '11554-merge-couchdb-1.1'
Diffstat (limited to 'apps/couch/src/couch_httpd_proxy.erl')
-rw-r--r--apps/couch/src/couch_httpd_proxy.erl431
1 files changed, 431 insertions, 0 deletions
diff --git a/apps/couch/src/couch_httpd_proxy.erl b/apps/couch/src/couch_httpd_proxy.erl
new file mode 100644
index 00000000..c196f72d
--- /dev/null
+++ b/apps/couch/src/couch_httpd_proxy.erl
@@ -0,0 +1,431 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(couch_httpd_proxy).
+
+-export([handle_proxy_req/2]).
+
+-include("couch_db.hrl").
+-include_lib("ibrowse/include/ibrowse.hrl").
+
+-define(TIMEOUT, infinity).
+-define(PKT_SIZE, 4096).
+
+
+handle_proxy_req(Req, ProxyDest) ->
+
+ %% Bug in Mochiweb?
+ %% Reported here: http://github.com/mochi/mochiweb/issues/issue/16
+ erase(mochiweb_request_body_length),
+
+ Method = get_method(Req),
+ Url = get_url(Req, ProxyDest),
+ Version = get_version(Req),
+ Headers = get_headers(Req),
+ Body = get_body(Req),
+ Options = [
+ {http_vsn, Version},
+ {headers_as_is, true},
+ {response_format, binary},
+ {stream_to, {self(), once}}
+ ],
+ case ibrowse:send_req(Url, Headers, Method, Body, Options, ?TIMEOUT) of
+ {ibrowse_req_id, ReqId} ->
+ stream_response(Req, ProxyDest, ReqId);
+ {error, Reason} ->
+ throw({error, Reason})
+ end.
+
+
+get_method(#httpd{mochi_req=MochiReq}) ->
+ case MochiReq:get(method) of
+ Method when is_atom(Method) ->
+ list_to_atom(string:to_lower(atom_to_list(Method)));
+ Method when is_list(Method) ->
+ list_to_atom(string:to_lower(Method));
+ Method when is_binary(Method) ->
+ list_to_atom(string:to_lower(?b2l(Method)))
+ end.
+
+
+get_url(Req, ProxyDest) when is_binary(ProxyDest) ->
+ get_url(Req, ?b2l(ProxyDest));
+get_url(#httpd{mochi_req=MochiReq}=Req, ProxyDest) ->
+ BaseUrl = case mochiweb_util:partition(ProxyDest, "/") of
+ {[], "/", _} -> couch_httpd:absolute_uri(Req, ProxyDest);
+ _ -> ProxyDest
+ end,
+ ProxyPrefix = "/" ++ ?b2l(hd(Req#httpd.path_parts)),
+ RequestedPath = MochiReq:get(raw_path),
+ case mochiweb_util:partition(RequestedPath, ProxyPrefix) of
+ {[], ProxyPrefix, []} ->
+ BaseUrl;
+ {[], ProxyPrefix, [$/ | DestPath]} ->
+ remove_trailing_slash(BaseUrl) ++ "/" ++ DestPath;
+ {[], ProxyPrefix, DestPath} ->
+ remove_trailing_slash(BaseUrl) ++ "/" ++ DestPath;
+ _Else ->
+ throw({invalid_url_path, {ProxyPrefix, RequestedPath}})
+ end.
+
+get_version(#httpd{mochi_req=MochiReq}) ->
+ MochiReq:get(version).
+
+
+get_headers(#httpd{mochi_req=MochiReq}) ->
+ to_ibrowse_headers(mochiweb_headers:to_list(MochiReq:get(headers)), []).
+
+to_ibrowse_headers([], Acc) ->
+ lists:reverse(Acc);
+to_ibrowse_headers([{K, V} | Rest], Acc) when is_atom(K) ->
+ to_ibrowse_headers([{atom_to_list(K), V} | Rest], Acc);
+to_ibrowse_headers([{K, V} | Rest], Acc) when is_list(K) ->
+ case string:to_lower(K) of
+ "content-length" ->
+ to_ibrowse_headers(Rest, [{content_length, V} | Acc]);
+ % This appears to make ibrowse too smart.
+ %"transfer-encoding" ->
+ % to_ibrowse_headers(Rest, [{transfer_encoding, V} | Acc]);
+ _ ->
+ to_ibrowse_headers(Rest, [{K, V} | Acc])
+ end.
+
+get_body(#httpd{method='GET'}) ->
+ fun() -> eof end;
+get_body(#httpd{method='HEAD'}) ->
+ fun() -> eof end;
+get_body(#httpd{method='DELETE'}) ->
+ fun() -> eof end;
+get_body(#httpd{mochi_req=MochiReq}) ->
+ case MochiReq:get(body_length) of
+ undefined ->
+ <<>>;
+ {unknown_transfer_encoding, Unknown} ->
+ exit({unknown_transfer_encoding, Unknown});
+ chunked ->
+ {fun stream_chunked_body/1, {init, MochiReq, 0}};
+ 0 ->
+ <<>>;
+ Length when is_integer(Length) andalso Length > 0 ->
+ {fun stream_length_body/1, {init, MochiReq, Length}};
+ Length ->
+ exit({invalid_body_length, Length})
+ end.
+
+
+remove_trailing_slash(Url) ->
+ rem_slash(lists:reverse(Url)).
+
+rem_slash([]) ->
+ [];
+rem_slash([$\s | RevUrl]) ->
+ rem_slash(RevUrl);
+rem_slash([$\t | RevUrl]) ->
+ rem_slash(RevUrl);
+rem_slash([$\r | RevUrl]) ->
+ rem_slash(RevUrl);
+rem_slash([$\n | RevUrl]) ->
+ rem_slash(RevUrl);
+rem_slash([$/ | RevUrl]) ->
+ rem_slash(RevUrl);
+rem_slash(RevUrl) ->
+ lists:reverse(RevUrl).
+
+
+stream_chunked_body({init, MReq, 0}) ->
+ % First chunk, do expect-continue dance.
+ init_body_stream(MReq),
+ stream_chunked_body({stream, MReq, 0, [], ?PKT_SIZE});
+stream_chunked_body({stream, MReq, 0, Buf, BRem}) ->
+ % Finished a chunk, get next length. If next length
+ % is 0, its time to try and read trailers.
+ {CRem, Data} = read_chunk_length(MReq),
+ case CRem of
+ 0 ->
+ BodyData = lists:reverse(Buf, Data),
+ {ok, BodyData, {trailers, MReq, [], ?PKT_SIZE}};
+ _ ->
+ stream_chunked_body(
+ {stream, MReq, CRem, [Data | Buf], BRem-size(Data)}
+ )
+ end;
+stream_chunked_body({stream, MReq, CRem, Buf, BRem}) when BRem =< 0 ->
+ % Time to empty our buffers to the upstream socket.
+ BodyData = lists:reverse(Buf),
+ {ok, BodyData, {stream, MReq, CRem, [], ?PKT_SIZE}};
+stream_chunked_body({stream, MReq, CRem, Buf, BRem}) ->
+ % Buffer some more data from the client.
+ Length = lists:min([CRem, BRem]),
+ Socket = MReq:get(socket),
+ NewState = case mochiweb_socket:recv(Socket, Length, ?TIMEOUT) of
+ {ok, Data} when size(Data) == CRem ->
+ case mochiweb_socket:recv(Socket, 2, ?TIMEOUT) of
+ {ok, <<"\r\n">>} ->
+ {stream, MReq, 0, [<<"\r\n">>, Data | Buf], BRem-Length-2};
+ _ ->
+ exit(normal)
+ end;
+ {ok, Data} ->
+ {stream, MReq, CRem-Length, [Data | Buf], BRem-Length};
+ _ ->
+ exit(normal)
+ end,
+ stream_chunked_body(NewState);
+stream_chunked_body({trailers, MReq, Buf, BRem}) when BRem =< 0 ->
+ % Empty our buffers and send data upstream.
+ BodyData = lists:reverse(Buf),
+ {ok, BodyData, {trailers, MReq, [], ?PKT_SIZE}};
+stream_chunked_body({trailers, MReq, Buf, BRem}) ->
+ % Read another trailer into the buffer or stop on an
+ % empty line.
+ Socket = MReq:get(socket),
+ mochiweb_socket:setopts(Socket, [{packet, line}]),
+ case mochiweb_socket:recv(Socket, 0, ?TIMEOUT) of
+ {ok, <<"\r\n">>} ->
+ mochiweb_socket:setopts(Socket, [{packet, raw}]),
+ BodyData = lists:reverse(Buf, <<"\r\n">>),
+ {ok, BodyData, eof};
+ {ok, Footer} ->
+ mochiweb_socket:setopts(Socket, [{packet, raw}]),
+ NewState = {trailers, MReq, [Footer | Buf], BRem-size(Footer)},
+ stream_chunked_body(NewState);
+ _ ->
+ exit(normal)
+ end;
+stream_chunked_body(eof) ->
+ % Tell ibrowse we're done sending data.
+ eof.
+
+
+stream_length_body({init, MochiReq, Length}) ->
+ % Do the expect-continue dance
+ init_body_stream(MochiReq),
+ stream_length_body({stream, MochiReq, Length});
+stream_length_body({stream, _MochiReq, 0}) ->
+ % Finished streaming.
+ eof;
+stream_length_body({stream, MochiReq, Length}) ->
+ BufLen = lists:min([Length, ?PKT_SIZE]),
+ case MochiReq:recv(BufLen) of
+ <<>> -> eof;
+ Bin -> {ok, Bin, {stream, MochiReq, Length-BufLen}}
+ end.
+
+
+init_body_stream(MochiReq) ->
+ Expect = case MochiReq:get_header_value("expect") of
+ undefined ->
+ undefined;
+ Value when is_list(Value) ->
+ string:to_lower(Value)
+ end,
+ case Expect of
+ "100-continue" ->
+ MochiReq:start_raw_response({100, gb_trees:empty()});
+ _Else ->
+ ok
+ end.
+
+
+read_chunk_length(MochiReq) ->
+ Socket = MochiReq:get(socket),
+ mochiweb_socket:setopts(Socket, [{packet, line}]),
+ case mochiweb_socket:recv(Socket, 0, ?TIMEOUT) of
+ {ok, Header} ->
+ mochiweb_socket:setopts(Socket, [{packet, raw}]),
+ Splitter = fun(C) ->
+ C =/= $\r andalso C =/= $\n andalso C =/= $\s
+ end,
+ {Hex, _Rest} = lists:splitwith(Splitter, ?b2l(Header)),
+ {mochihex:to_int(Hex), Header};
+ _ ->
+ exit(normal)
+ end.
+
+
+stream_response(Req, ProxyDest, ReqId) ->
+ receive
+ {ibrowse_async_headers, ReqId, "100", _} ->
+ % ibrowse doesn't handle 100 Continue responses which
+ % means we have to discard them so the proxy client
+ % doesn't get confused.
+ ibrowse:stream_next(ReqId),
+ stream_response(Req, ProxyDest, ReqId);
+ {ibrowse_async_headers, ReqId, Status, Headers} ->
+ {Source, Dest} = get_urls(Req, ProxyDest),
+ FixedHeaders = fix_headers(Source, Dest, Headers, []),
+ case body_length(FixedHeaders) of
+ chunked ->
+ {ok, Resp} = couch_httpd:start_chunked_response(
+ Req, list_to_integer(Status), FixedHeaders
+ ),
+ ibrowse:stream_next(ReqId),
+ stream_chunked_response(Req, ReqId, Resp),
+ {ok, Resp};
+ Length when is_integer(Length) ->
+ {ok, Resp} = couch_httpd:start_response_length(
+ Req, list_to_integer(Status), FixedHeaders, Length
+ ),
+ ibrowse:stream_next(ReqId),
+ stream_length_response(Req, ReqId, Resp),
+ {ok, Resp};
+ _ ->
+ {ok, Resp} = couch_httpd:start_response(
+ Req, list_to_integer(Status), FixedHeaders
+ ),
+ ibrowse:stream_next(ReqId),
+ stream_length_response(Req, ReqId, Resp),
+ % XXX: MochiWeb apparently doesn't look at the
+ % response to see if it must force close the
+ % connection. So we help it out here.
+ erlang:put(mochiweb_request_force_close, true),
+ {ok, Resp}
+ end
+ end.
+
+
+stream_chunked_response(Req, ReqId, Resp) ->
+ receive
+ {ibrowse_async_response, ReqId, {error, Reason}} ->
+ throw({error, Reason});
+ {ibrowse_async_response, ReqId, Chunk} ->
+ couch_httpd:send_chunk(Resp, Chunk),
+ ibrowse:stream_next(ReqId),
+ stream_chunked_response(Req, ReqId, Resp);
+ {ibrowse_async_response_end, ReqId} ->
+ couch_httpd:last_chunk(Resp)
+ end.
+
+
+stream_length_response(Req, ReqId, Resp) ->
+ receive
+ {ibrowse_async_response, ReqId, {error, Reason}} ->
+ throw({error, Reason});
+ {ibrowse_async_response, ReqId, Chunk} ->
+ couch_httpd:send(Resp, Chunk),
+ ibrowse:stream_next(ReqId),
+ stream_length_response(Req, ReqId, Resp);
+ {ibrowse_async_response_end, ReqId} ->
+ ok
+ end.
+
+
+get_urls(Req, ProxyDest) ->
+ SourceUrl = couch_httpd:absolute_uri(Req, "/" ++ hd(Req#httpd.path_parts)),
+ Source = parse_url(?b2l(iolist_to_binary(SourceUrl))),
+ case (catch parse_url(ProxyDest)) of
+ Dest when is_record(Dest, url) ->
+ {Source, Dest};
+ _ ->
+ DestUrl = couch_httpd:absolute_uri(Req, ProxyDest),
+ {Source, parse_url(DestUrl)}
+ end.
+
+
+fix_headers(_, _, [], Acc) ->
+ lists:reverse(Acc);
+fix_headers(Source, Dest, [{K, V} | Rest], Acc) ->
+ Fixed = case string:to_lower(K) of
+ "location" -> rewrite_location(Source, Dest, V);
+ "content-location" -> rewrite_location(Source, Dest, V);
+ "uri" -> rewrite_location(Source, Dest, V);
+ "destination" -> rewrite_location(Source, Dest, V);
+ "set-cookie" -> rewrite_cookie(Source, Dest, V);
+ _ -> V
+ end,
+ fix_headers(Source, Dest, Rest, [{K, Fixed} | Acc]).
+
+
+rewrite_location(Source, #url{host=Host, port=Port, protocol=Proto}, Url) ->
+ case (catch parse_url(Url)) of
+ #url{host=Host, port=Port, protocol=Proto} = Location ->
+ DestLoc = #url{
+ protocol=Source#url.protocol,
+ host=Source#url.host,
+ port=Source#url.port,
+ path=join_url_path(Source#url.path, Location#url.path)
+ },
+ url_to_url(DestLoc);
+ #url{} ->
+ Url;
+ _ ->
+ url_to_url(Source#url{path=join_url_path(Source#url.path, Url)})
+ end.
+
+
+rewrite_cookie(_Source, _Dest, Cookie) ->
+ Cookie.
+
+
+parse_url(Url) when is_binary(Url) ->
+ ibrowse_lib:parse_url(?b2l(Url));
+parse_url(Url) when is_list(Url) ->
+ ibrowse_lib:parse_url(?b2l(iolist_to_binary(Url))).
+
+
+join_url_path(Src, Dst) ->
+ Src2 = case lists:reverse(Src) of
+ "/" ++ RestSrc -> lists:reverse(RestSrc);
+ _ -> Src
+ end,
+ Dst2 = case Dst of
+ "/" ++ RestDst -> RestDst;
+ _ -> Dst
+ end,
+ Src2 ++ "/" ++ Dst2.
+
+
+url_to_url(#url{host=Host, port=Port, path=Path, protocol=Proto} = Url) ->
+ LPort = case {Proto, Port} of
+ {http, 80} -> "";
+ {https, 443} -> "";
+ _ -> ":" ++ integer_to_list(Port)
+ end,
+ LPath = case Path of
+ "/" ++ _RestPath -> Path;
+ _ -> "/" ++ Path
+ end,
+ HostPart = case Url#url.host_type of
+ ipv6_address ->
+ "[" ++ Host ++ "]";
+ _ ->
+ Host
+ end,
+ atom_to_list(Proto) ++ "://" ++ HostPart ++ LPort ++ LPath.
+
+
+body_length(Headers) ->
+ case is_chunked(Headers) of
+ true -> chunked;
+ _ -> content_length(Headers)
+ end.
+
+
+is_chunked([]) ->
+ false;
+is_chunked([{K, V} | Rest]) ->
+ case string:to_lower(K) of
+ "transfer-encoding" ->
+ string:to_lower(V) == "chunked";
+ _ ->
+ is_chunked(Rest)
+ end.
+
+content_length([]) ->
+ undefined;
+content_length([{K, V} | Rest]) ->
+ case string:to_lower(K) of
+ "content-length" ->
+ list_to_integer(V);
+ _ ->
+ content_length(Rest)
+ end.
+