diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_db.erl | 41 | ||||
-rw-r--r-- | src/couchdb/couch_httpd.erl | 8 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 21 | ||||
-rw-r--r-- | src/couchdb/couch_stream.erl | 1 | ||||
-rw-r--r-- | src/mochiweb/mochiweb_request.erl | 36 |
5 files changed, 92 insertions, 15 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index c35bb913..e9612b47 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -24,7 +24,6 @@ -export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). - -include("couch_db.hrl"). @@ -400,8 +399,14 @@ doc_flush_binaries(Doc, Fd) -> % written to a different file SizeAcc + Len; {_Key, {_Type, Bin}} when is_binary(Bin) -> + % we have a new binary to write SizeAcc + size(Bin); + {_Key, {_Type, {Fun, undefined}}} when is_function(Fun) -> + % function without a known length + % we'll have to alloc as we go with this one, for now, nothing + SizeAcc; {_Key, {_Type, {Fun, Len}}} when is_function(Fun) -> + % function to yield binary data with known length SizeAcc + Len end end, @@ -409,7 +414,6 @@ doc_flush_binaries(Doc, Fd) -> {ok, OutputStream} = couch_stream:open(Fd), ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize), - NewBins = lists:map( fun({Key, {Type, BinValue}}) -> NewBinValue = @@ -436,6 +440,18 @@ doc_flush_binaries(Doc, Fd) -> Bin when is_binary(Bin) -> {ok, StreamPointer} = couch_stream:write(OutputStream, Bin), {Fd, StreamPointer, size(Bin)}; + {StreamFun, undefined} when is_function(StreamFun) -> + % we will throw an error if the client + % sends a chunk larger than this size + MaxChunkSize = list_to_integer(couch_config:get("couchdb", + "max_attachment_chunk_size","4294967296")), + WriterFun = make_writer_fun(OutputStream), + % StreamFun(MaxChunkSize, WriterFun) + % will call our WriterFun + % once for each chunk of the attachment. + {ok, {TotalLength, NewStreamPointer}} = + StreamFun(MaxChunkSize, WriterFun, {0, nil}), + {Fd, NewStreamPointer, TotalLength}; {Fun, Len} when is_function(Fun) -> {ok, StreamPointer} = write_streamed_attachment(OutputStream, Fun, Len, nil), @@ -445,8 +461,27 @@ doc_flush_binaries(Doc, Fd) -> end, Bins), {ok, _FinalPos} = couch_stream:close(OutputStream), - Doc#doc{attachments = NewBins}. + + +make_writer_fun(Stream) -> + % WriterFun({Length, Binary}, State) + % WriterFun({0, _Footers}, State) + % Called with Length == 0 on the last time. + % WriterFun returns NewState. + fun + ({0, _Footers}, {FinalLen, SpFin}) -> + % last block, return the final tuple + {ok, {FinalLen, SpFin}}; + ({Length, Bin}, {Total, nil}) -> + % save StreamPointer + {ok, StreamPointer} = couch_stream:write(Stream, Bin), + {Total+Length, StreamPointer}; + ({Length, Bin}, {Total, SpAcc}) -> + % write the Bin to disk + {ok, _Sp} = couch_stream:write(Stream, Bin), + {Total+Length, SpAcc} + end. write_streamed_attachment(_Stream, _F, 0, SpAcc) -> {ok, SpAcc}; diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 6b9079c4..9969177e 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -16,7 +16,7 @@ -export([start_link/0, stop/0, handle_request/3]). -export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]). --export([verify_is_server_admin/1,unquote/1,quote/1,recv/2]). +-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]). -export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]). -export([primary_header_value/2,partition/1,serve_file/3]). -export([start_chunked_response/3,send_chunk/2]). @@ -260,6 +260,12 @@ parse_form(#httpd{mochi_req=MochiReq}) -> recv(#httpd{mochi_req=MochiReq}, Len) -> MochiReq:recv(Len). +recv_chunked(#httpd{mochi_req=MochiReq}, MaxChunkSize, ChunkFun, InitState) -> + % Fun is called once with each chunk + % Fun({Length, Binary}, State) + % called with Length == 0 on the last time. + MochiReq:recv_body(MaxChunkSize, ChunkFun, InitState). + body(#httpd{mochi_req=MochiReq}) -> % Maximum size of document PUT request body (4GB) MaxSize = list_to_integer( diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index ae925123..89f8ce04 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -585,18 +585,31 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) when (Method == 'PUT') or (Method == 'DELETE') -> - FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")), + FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, + FileNameParts),"/")), NewAttachment = case Method of 'DELETE' -> []; _ -> + % see couch_db:doc_flush_binaries for usage of this structure [{FileName, { - list_to_binary(couch_httpd:header_value(Req,"Content-Type")), + case couch_httpd:header_value(Req,"Content-Type") of + undefined -> + % We could throw an error here or guess by the FileName. + % Currently, just giving it a default. + <<"application/octet-stream">>; + CType -> + list_to_binary(CType) + end, case couch_httpd:header_value(Req,"Content-Length") of undefined -> - throw({bad_request, "Attachment uploads must be fixed length"}); + {fun(MaxChunkSize, ChunkFun, InitState) -> + couch_httpd:recv_chunked(Req, MaxChunkSize, + ChunkFun, InitState) + end, undefined}; Length -> - {fun() -> couch_httpd:recv(Req, 0) end, list_to_integer(Length)} + {fun() -> couch_httpd:recv(Req, 0) end, + list_to_integer(Length)} end }}] end, diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index d957268f..d6f72696 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -136,6 +136,7 @@ handle_call(get_state, _From, Stream) -> {reply, {Pos, BytesRemaining}, Stream}; handle_call({set_min_buffer, MinBuffer}, _From, Stream) -> {reply, ok, Stream#write_stream{min_alloc = MinBuffer}}; +% set next_alloc if we need more room handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) -> #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream, case BytesRemainingInCurrentBuffer < BufferSizeRequested of diff --git a/src/mochiweb/mochiweb_request.erl b/src/mochiweb/mochiweb_request.erl index 311ed507..7fc04f8b 100644 --- a/src/mochiweb/mochiweb_request.erl +++ b/src/mochiweb/mochiweb_request.erl @@ -12,7 +12,7 @@ -define(READ_SIZE, 8192). -export([get_header_value/1, get_primary_header_value/1, get/1, dump/0]). --export([send/1, recv/1, recv/2, recv_body/0, recv_body/1]). +-export([send/1, recv/1, recv/2, recv_body/0, recv_body/1, recv_body/3]). -export([start_response/1, start_response_length/1, start_raw_response/1]). -export([respond/1, ok/1]). -export([not_found/0, not_found/1]). @@ -171,6 +171,9 @@ recv_body() -> %% @doc Receive the body of the HTTP request (defined by Content-Length). %% Will receive up to MaxBody bytes. recv_body(MaxBody) -> + recv_body(MaxBody, nil, nil). + +recv_body(MaxBody, ChunkFun, ChunkAcc) -> case get_header_value("expect") of "100-continue" -> start_raw_response({100, gb_trees:empty()}); @@ -183,7 +186,15 @@ recv_body(MaxBody) -> {unknown_transfer_encoding, Unknown} -> exit({unknown_transfer_encoding, Unknown}); chunked -> - read_chunked_body(MaxBody, []); + case ChunkFun of + nil -> + read_chunked_body(MaxBody); + _StreamFun -> + % In this case the MaxBody is actually used to + % determine the maximum allowed size of a single + % chunk. + stream_chunked_body(MaxBody, ChunkFun, ChunkAcc) + end; 0 -> <<>>; Length when is_integer(Length), Length =< MaxBody -> @@ -408,15 +419,26 @@ parse_post() -> Cached end. -read_chunked_body(Max, Acc) -> +read_chunked_body(MaxBufferSize) -> + stream_chunked_body(MaxBufferSize, fun + ({0, _}, Acc) -> + iolist_to_binary(lists:reverse(Acc)); + ({_Length, Bin}, Acc) -> + [Bin | Acc] + end, []). + +%% @spec stream_chunked_body(integer(), fun(), term()) -> term() +%% @doc The function is called for each chunk. +%% Used internally by read_chunked_body. +stream_chunked_body(MaxChunkSize, Fun, FunState) -> case read_chunk_length() of 0 -> - read_chunk(0), - iolist_to_binary(lists:reverse(Acc)); - Length when Length > Max -> + Fun({0, read_chunk(0)}, FunState); + Length when Length > MaxChunkSize -> exit({body_too_large, chunked}); Length -> - read_chunked_body(Max - Length, [read_chunk(Length) | Acc]) + NewState = Fun({Length, read_chunk(Length)}, FunState), + stream_chunked_body(MaxChunkSize, Fun, NewState) end. %% @spec read_chunk_length() -> integer() |