-rw-r--r--  etc/couchdb/default.ini.tpl.in    |  1
-rw-r--r--  src/couchdb/couch_db.erl          | 41
-rw-r--r--  src/couchdb/couch_httpd.erl       |  8
-rw-r--r--  src/couchdb/couch_httpd_db.erl    | 21
-rw-r--r--  src/couchdb/couch_stream.erl      |  1
-rw-r--r--  src/mochiweb/mochiweb_request.erl | 36
6 files changed, 93 insertions(+), 15 deletions(-)
diff --git a/etc/couchdb/default.ini.tpl.in b/etc/couchdb/default.ini.tpl.in
index 7c6f417a..e94e1437 100644
--- a/etc/couchdb/default.ini.tpl.in
+++ b/etc/couchdb/default.ini.tpl.in
@@ -7,6 +7,7 @@ database_dir = %localstatelibdir%
view_index_dir = %localstatelibdir%
util_driver_dir = %couchprivlibdir%
max_document_size = 4294967296 ; 4 GB
+max_attachment_chunk_size = 4294967296 ; 4 GB
view_timeout = 5000 ; 5 seconds
max_dbs_open = 100
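
The setting ships in default.ini, so it can be tuned per node without rebuilding. A hypothetical local.ini override (the 64 MB value is invented for illustration; the option lives in the [couchdb] section shown above):

    ; local.ini
    [couchdb]
    max_attachment_chunk_size = 67108864 ; 64 MB
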
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index c35bb913..e9612b47 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -24,7 +24,6 @@
-export([start_link/3,make_doc/2,set_admins/2,get_admins/1,ensure_full_commit/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-
-include("couch_db.hrl").
@@ -400,8 +399,14 @@ doc_flush_binaries(Doc, Fd) ->
% written to a different file
SizeAcc + Len;
{_Key, {_Type, Bin}} when is_binary(Bin) ->
+ % we have a new binary to write
SizeAcc + size(Bin);
+ {_Key, {_Type, {Fun, undefined}}} when is_function(Fun) ->
+ % function without a known length:
+ % we'll have to allocate as we go, so add nothing to the estimate
+ SizeAcc;
{_Key, {_Type, {Fun, Len}}} when is_function(Fun) ->
+ % function to yield binary data with known length
SizeAcc + Len
end
end,
@@ -409,7 +414,6 @@ doc_flush_binaries(Doc, Fd) ->
{ok, OutputStream} = couch_stream:open(Fd),
ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize),
-
NewBins = lists:map(
fun({Key, {Type, BinValue}}) ->
NewBinValue =
@@ -436,6 +440,18 @@ doc_flush_binaries(Doc, Fd) ->
Bin when is_binary(Bin) ->
{ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
{Fd, StreamPointer, size(Bin)};
+ {StreamFun, undefined} when is_function(StreamFun) ->
+ % we will throw an error if the client
+ % sends a chunk larger than this size
+ MaxChunkSize = list_to_integer(couch_config:get("couchdb",
+ "max_attachment_chunk_size","4294967296")),
+ WriterFun = make_writer_fun(OutputStream),
+ % StreamFun(MaxChunkSize, WriterFun, InitState)
+ % will call our WriterFun once for each
+ % chunk of the attachment.
+ {ok, {TotalLength, NewStreamPointer}} =
+ StreamFun(MaxChunkSize, WriterFun, {0, nil}),
+ {Fd, NewStreamPointer, TotalLength};
{Fun, Len} when is_function(Fun) ->
{ok, StreamPointer} =
write_streamed_attachment(OutputStream, Fun, Len, nil),
@@ -445,8 +461,27 @@ doc_flush_binaries(Doc, Fd) ->
end, Bins),
{ok, _FinalPos} = couch_stream:close(OutputStream),
-
Doc#doc{attachments = NewBins}.
+
+
+make_writer_fun(Stream) ->
+ % Called once per chunk as WriterFun({Length, Binary}, State),
+ % and once more as WriterFun({0, _Footers}, State) after the last chunk.
+ % Each call returns the new state; the final call instead returns
+ % {ok, {TotalLength, StreamPointer}}.
+ fun
+ ({0, _Footers}, {FinalLen, SpFin}) ->
+ % last block, return the final tuple
+ {ok, {FinalLen, SpFin}};
+ ({Length, Bin}, {Total, nil}) ->
+ % first chunk: remember the StreamPointer where the attachment starts
+ {ok, StreamPointer} = couch_stream:write(Stream, Bin),
+ {Total+Length, StreamPointer};
+ ({Length, Bin}, {Total, SpAcc}) ->
+ % later chunks: write the data, keep the pointer from the first chunk
+ {ok, _Sp} = couch_stream:write(Stream, Bin),
+ {Total+Length, SpAcc}
+ end.
write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
{ok, SpAcc};
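
To make the StreamFun/WriterFun contract concrete, here is a minimal sketch (not part of the patch) of a StreamFun feeding two in-memory chunks through the writer built by make_writer_fun; the chunk contents are invented for illustration:

    WriterFun = make_writer_fun(OutputStream),  % OutputStream: an open couch_stream
    StreamFun = fun(_MaxChunkSize, WFun, InitState) ->
        State1 = WFun({5, <<"hello">>}, InitState),  % first chunk: pointer saved
        State2 = WFun({6, <<" world">>}, State1),    % later chunks: length accumulated
        WFun({0, []}, State2)                        % Length == 0 ends the stream
    end,
    {ok, {11, _StreamPointer}} = StreamFun(MaxChunkSize, WriterFun, {0, nil}).
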
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 6b9079c4..9969177e 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -16,7 +16,7 @@
-export([start_link/0, stop/0, handle_request/3]).
-export([header_value/2,header_value/3,qs_value/2,qs_value/3,qs/1,path/1,absolute_uri/2]).
--export([verify_is_server_admin/1,unquote/1,quote/1,recv/2]).
+-export([verify_is_server_admin/1,unquote/1,quote/1,recv/2,recv_chunked/4]).
-export([parse_form/1,json_body/1,body/1,doc_etag/1, make_etag/1, etag_respond/3]).
-export([primary_header_value/2,partition/1,serve_file/3]).
-export([start_chunked_response/3,send_chunk/2]).
@@ -260,6 +260,12 @@ parse_form(#httpd{mochi_req=MochiReq}) ->
recv(#httpd{mochi_req=MochiReq}, Len) ->
MochiReq:recv(Len).
+recv_chunked(#httpd{mochi_req=MochiReq}, MaxChunkSize, ChunkFun, InitState) ->
+ % ChunkFun is called once for each chunk, as
+ % ChunkFun({Length, Binary}, State);
+ % the final call has Length == 0.
+ MochiReq:recv_body(MaxChunkSize, ChunkFun, InitState).
+
body(#httpd{mochi_req=MochiReq}) ->
% Maximum size of document PUT request body (4GB)
MaxSize = list_to_integer(
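
Other handlers can drive recv_chunked the same way the attachment code does. A hedged sketch, assuming Req is the #httpd{} record for a Transfer-Encoding: chunked request (the 8 MB per-chunk cap is invented):

    Collect = fun
        ({0, _Footers}, Acc) -> iolist_to_binary(lists:reverse(Acc));
        ({_Len, Bin}, Acc)   -> [Bin | Acc]
    end,
    Body = couch_httpd:recv_chunked(Req, 8 * 1024 * 1024, Collect, []).
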
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index ae925123..89f8ce04 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -585,18 +585,31 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
when (Method == 'PUT') or (Method == 'DELETE') ->
- FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1, FileNameParts),"/")),
+ FileName = list_to_binary(mochiweb_util:join(lists:map(fun binary_to_list/1,
+ FileNameParts),"/")),
NewAttachment = case Method of
'DELETE' ->
[];
_ ->
+ % see couch_db:doc_flush_binaries for usage of this structure
[{FileName, {
- list_to_binary(couch_httpd:header_value(Req,"Content-Type")),
+ case couch_httpd:header_value(Req,"Content-Type") of
+ undefined ->
+ % We could throw an error here or guess the type from the FileName.
+ % For now, just fall back to a generic default.
+ <<"application/octet-stream">>;
+ CType ->
+ list_to_binary(CType)
+ end,
case couch_httpd:header_value(Req,"Content-Length") of
undefined ->
- throw({bad_request, "Attachment uploads must be fixed length"});
+ {fun(MaxChunkSize, ChunkFun, InitState) ->
+ couch_httpd:recv_chunked(Req, MaxChunkSize,
+ ChunkFun, InitState)
+ end, undefined};
Length ->
- {fun() -> couch_httpd:recv(Req, 0) end, list_to_integer(Length)}
+ {fun() -> couch_httpd:recv(Req, 0) end,
+ list_to_integer(Length)}
end
}}]
end,
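
With both changes, a client can stream an attachment without declaring its size: omitting Content-Length and sending Transfer-Encoding: chunked selects the {Fun, undefined} branch, and a missing Content-Type now gets a default instead of crashing list_to_binary. A hypothetical raw request (database, document, and rev are invented; chunk sizes are hex):

    PUT /db/doc/attachment.txt?rev=1-2345 HTTP/1.1
    Host: localhost:5984
    Content-Type: text/plain
    Transfer-Encoding: chunked

    5
    hello
    6
     world
    0
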
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index d957268f..d6f72696 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -136,6 +136,7 @@ handle_call(get_state, _From, Stream) ->
{reply, {Pos, BytesRemaining}, Stream};
handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
{reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
+% set next_alloc if we need more room
handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
#write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
case BytesRemainingInCurrentBuffer < BufferSizeRequested of
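
The couch_db.erl hunk above exercises this buffering API end to end; as a minimal sketch of the lifecycle (the 64 KB figure is invented):

    {ok, Stream} = couch_stream:open(Fd),
    ok = couch_stream:ensure_buffer(Stream, 65536),     % grow next_alloc if short on room
    {ok, Sp} = couch_stream:write(Stream, <<"data">>),  % Sp marks where the data begins
    {ok, _FinalPos} = couch_stream:close(Stream).
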
diff --git a/src/mochiweb/mochiweb_request.erl b/src/mochiweb/mochiweb_request.erl
index 311ed507..7fc04f8b 100644
--- a/src/mochiweb/mochiweb_request.erl
+++ b/src/mochiweb/mochiweb_request.erl
@@ -12,7 +12,7 @@
-define(READ_SIZE, 8192).
-export([get_header_value/1, get_primary_header_value/1, get/1, dump/0]).
--export([send/1, recv/1, recv/2, recv_body/0, recv_body/1]).
+-export([send/1, recv/1, recv/2, recv_body/0, recv_body/1, recv_body/3]).
-export([start_response/1, start_response_length/1, start_raw_response/1]).
-export([respond/1, ok/1]).
-export([not_found/0, not_found/1]).
@@ -171,6 +171,9 @@ recv_body() ->
%% @doc Receive the body of the HTTP request (defined by Content-Length).
%% Will receive up to MaxBody bytes.
recv_body(MaxBody) ->
+ recv_body(MaxBody, nil, nil).
+
+recv_body(MaxBody, ChunkFun, ChunkAcc) ->
case get_header_value("expect") of
"100-continue" ->
start_raw_response({100, gb_trees:empty()});
@@ -183,7 +186,15 @@ recv_body(MaxBody) ->
{unknown_transfer_encoding, Unknown} ->
exit({unknown_transfer_encoding, Unknown});
chunked ->
- read_chunked_body(MaxBody, []);
+ case ChunkFun of
+ nil ->
+ read_chunked_body(MaxBody);
+ _StreamFun ->
+ % In this case the MaxBody is actually used to
+ % determine the maximum allowed size of a single
+ % chunk.
+ stream_chunked_body(MaxBody, ChunkFun, ChunkAcc)
+ end;
0 ->
<<>>;
Length when is_integer(Length), Length =< MaxBody ->
@@ -408,15 +419,26 @@ parse_post() ->
Cached
end.
-read_chunked_body(Max, Acc) ->
+read_chunked_body(MaxBufferSize) ->
+ stream_chunked_body(MaxBufferSize, fun
+ ({0, _}, Acc) ->
+ iolist_to_binary(lists:reverse(Acc));
+ ({_Length, Bin}, Acc) ->
+ [Bin | Acc]
+ end, []).
+
+%% @spec stream_chunked_body(integer(), fun(), term()) -> term()
+%% @doc Calls Fun({Length, Binary}, State) once per chunk; the final
+%% call has Length == 0. Also used internally by read_chunked_body.
+stream_chunked_body(MaxChunkSize, Fun, FunState) ->
case read_chunk_length() of
0 ->
- read_chunk(0),
- iolist_to_binary(lists:reverse(Acc));
- Length when Length > Max ->
+ Fun({0, read_chunk(0)}, FunState);
+ Length when Length > MaxChunkSize ->
exit({body_too_large, chunked});
Length ->
- read_chunked_body(Max - Length, [read_chunk(Length) | Acc])
+ NewState = Fun({Length, read_chunk(Length)}, FunState),
+ stream_chunked_body(MaxChunkSize, Fun, NewState)
end.
%% @spec read_chunk_length() -> integer()
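
Since stream_chunked_body threads the fun's return value through as state, a caller can consume a body of any size in constant memory. A hedged sketch of a ChunkFun that only tallies bytes (the names and the 64 KB per-chunk cap are invented; assumes a chunked request):

    CountBytes = fun
        ({0, _Footers}, Total) -> Total;             % final call: return the result
        ({Length, _Bin}, Total) -> Total + Length    % discard the data, keep the tally
    end,
    BodyLength = Req:recv_body(65536, CountBytes, 0).
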