summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_stream.erl
diff options
context:
space:
mode:
author John Christopher Anderson <jchris@apache.org> 2010-01-29 22:43:33 +0000
committer John Christopher Anderson <jchris@apache.org> 2010-01-29 22:43:33 +0000
commit ee09a0de9f8356abe24a0ac0f26cdff35f8fa704 (patch)
tree 1cf02264f5ee72216e5add7deda235c1504cf5ec /src/couchdb/couch_stream.erl
parent 5affb01e4ee059ad9b82000625f2bdc989019a16 (diff)
Allow storing attachments in compressed form. Closes COUCHDB-583. Thanks Filipe Manana
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@904650 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r-- src/couchdb/couch_stream.erl 153
1 files changed, 132 insertions, 21 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 2a873e4c..cdbbe552 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -24,7 +24,7 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
--export([open/1, close/1, write/2, foldl/4, foldl/5,
+-export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5,
old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
@@ -39,14 +39,22 @@
buffer_len = 0,
max_buffer = 4096,
written_len = 0,
- md5
+ md5,
+ % md5 of the content without any transformation applied (e.g. compression)
+ % needed for the attachment upload integrity check (ticket 558)
+ identity_md5,
+ identity_len = 0,
+ zstream
}).
%%% Interface functions %%%
open(Fd) ->
- gen_server:start_link(couch_stream, Fd, []).
+ open(Fd, 0). % level 0 is outside the 1..9 range init/1 accepts, so no zlib stream is created
+
+open(Fd, CompressionLevel) -> % start a stream process for Fd with the given compression level
+ gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []). % init/1 receives the {Fd, Level} tuple
close(Pid) ->
gen_server:call(Pid, close, infinity).
@@ -82,11 +90,31 @@ foldl(Fd, [Pos|Rest], Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+foldl_unzip(Fd, PosList, Fun, Acc) -> % fold Fun over the inflated chunks stored at PosList; no checksum check
+ Z = unzip_init(), % fresh inflate stream used for this fold only
+ Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc),
+ unzip_end(Z), % NOTE(review): not reached if the fold raises, so Z is not closed here - consider try ... after
+ Result.
+
+do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) -> % no positions left: the accumulator is the result
+ Acc;
+do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) ->
+ {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), % read the stored chunk at Pos; crashes on anything but {ok, _}
+ Bin = zlib:inflate(Z, BinZip), % NOTE(review): inflate returns an iolist that may be empty - confirm Fun accepts that
+ do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)).
+
foldl(Fd, PosList, <<>>, Fun, Acc) ->
foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).
+foldl_unzip(Fd, PosList, <<>>, Fun, Acc) -> % empty Md5: fold without verifying a digest
+ foldl_unzip(Fd, PosList, Fun, Acc);
+foldl_unzip(Fd, PosList, Md5, Fun, Acc) -> % otherwise Md5 is matched against the digest of the chunks as read from Fd
+ Z = unzip_init(),
+ Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc),
+ unzip_end(Z), % NOTE(review): skipped on an Md5 badmatch or a crash in Fun, so Z is not closed here
+ Result.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
Md5 = erlang:md5_final(Md5Acc),
@@ -99,14 +127,62 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> % reached only when the position list is empty on entry
+ Md5 = erlang:md5_final(Md5Acc), % asserts the expected digest; badmatch on mismatch
+ Acc;
+foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> % last chunk: the digest is checked before Fun sees the data
+ {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)), % digest covers the bytes as read, before inflating
+ Bin = zlib:inflate(Z, BinZip),
+ Fun(Bin, Acc);
+foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> % earlier chunks: Fun runs before the digest can be verified
+ {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
+ Bin = zlib:inflate(Z, BinZip),
+ Md5Acc2 = erlang:md5_update(Md5Acc, BinZip), % keep accumulating over the stored bytes
+ foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+zip_init(CompressionLevel) -> % open a zlib stream and initialise it for deflate at CompressionLevel; returns the stream
+ Z = zlib:open(),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default), % args after Z: level, method, window bits, mem level, strategy
+ Z.
+
+zip_end(Z, Data) -> % push the final Data through Z, tear the stream down, return the last compressed output
+ Last = zlib:deflate(Z, Data, finish), % 'finish' flushes whatever zlib still holds internally
+ zlib:deflateEnd(Z),
+ zlib:close(Z), % Z must not be used after this call
+ Last.
+
+unzip_init() -> % open a zlib stream initialised for inflate; returns the stream
+ Z = zlib:open(),
+ zlib:inflateInit(Z, 16 + 15), % same window-bits value that zip_init/1 passes to deflateInit
+ Z.
+
+unzip_end(Z) -> % end inflation on Z and close the stream
+ zlib:inflateEnd(Z), % NOTE(review): zlib documents a data_error here if the end of stream was not reached - confirm callers always read every chunk
+ zlib:close(Z).
+
write(_Pid, <<>>) ->
ok;
write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init(Fd) ->
- {ok, #stream{fd=Fd, md5=erlang:md5_init()}}.
+init({Fd, CompressionLevel}) -> % gen_server init: the argument is the tuple built by open/2
+ Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of % compress only for levels 1..9
+ true ->
+ zip_init(CompressionLevel);
+ _ ->
+ undefined % any other level: writes go to the file untransformed
+ end,
+ {ok, #stream{
+ fd=Fd,
+ md5=erlang:md5_init(), % digest context for the bytes as written to the file
+ identity_md5=erlang:md5_init(), % digest context for the data as supplied by the caller
+ zstream=Z
+ }
+ }.
terminate(_Reason, _Stream) ->
ok.
@@ -120,39 +196,74 @@ handle_call({write, Bin}, _From, Stream) ->
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
- md5 = Md5} = Stream,
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ zstream = Z} = Stream,
if BinSize + BufferLen > Max ->
WriteBin = lists:reverse(Buffer, [Bin]),
- Md5_2 = erlang:md5_update(Md5, WriteBin),
- {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
+ IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin),
+ WriteBin2 = case Z of
+ undefined ->
+ WriteBin;
+ _ ->
+ zlib:deflate(Z, WriteBin)
+ end,
+ case WriteBin2 of
+ [] ->
+ % case where zlib did some internal buffering
+ WrittenLen2 = WrittenLen,
+ Md5_2 = Md5,
+ Written2 = Written;
+ _ ->
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
+ WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
+ Md5_2 = erlang:md5_update(Md5, WriteBin2),
+ Written2 = [Pos|Written]
+ end,
+
{reply, ok, Stream#stream{
- written_len=WrittenLen + BufferLen + BinSize,
- written_pointers=[Pos|Written],
+ written_len=WrittenLen2,
+ written_pointers=Written2,
buffer_list=[],
buffer_len=0,
- md5=Md5_2}};
+ md5=Md5_2,
+ identity_md5=IdenMd5_2,
+ identity_len=IdenLen + BinSize}};
true ->
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
- buffer_len=BufferLen + BinSize}}
+ buffer_len=BufferLen + BinSize,
+ identity_len=IdenLen + BinSize}}
end;
handle_call(close, _From, Stream) -> % flush what is still buffered, reply with the stream summary, stop the process
#stream{
fd = Fd,
written_len = WrittenLen,
written_pointers = Written,
- buffer_len = BufferLen,
buffer_list = Buffer,
- md5 = Md5} = Stream,
-
- case Buffer of
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ zstream = Z} = Stream,
+
+ WriteBin = lists:reverse(Buffer), % buffered chunks back in the order they were written
+ IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)), % digest of the data before any compression
+ WriteBin2 = case Z of
+ undefined ->
+ WriteBin;
+ _ ->
+ zip_end(Z, WriteBin) % final deflate with 'finish'; zip_end/2 also closes the zlib stream
+ end,
+ Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)), % digest of the bytes as they go to the file
+ Result = case WriteBin2 of
[] ->
- Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)};
+ {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; % nothing left to append
_ ->
- WriteBin = lists:reverse(Buffer),
- Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)),
- {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
- Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final}
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
+ StreamInfo = lists:reverse(Written, [Pos]), % positions in write order, the new one last
+ StreamLen = WrittenLen + iolist_size(WriteBin2), % length counts bytes as stored, not as supplied
+ {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} % reply is now a 5-tuple (the removed lines returned a 3-tuple)
end,
{stop, normal, Result, Stream}.