From ee09a0de9f8356abe24a0ac0f26cdff35f8fa704 Mon Sep 17 00:00:00 2001 From: John Christopher Anderson Date: Fri, 29 Jan 2010 22:43:33 +0000 Subject: Allow storing attachments in compressed form. Closes COUCHDB-583. Thanks Filipe Manana git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@904650 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_stream.erl | 153 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 132 insertions(+), 21 deletions(-) (limited to 'src/couchdb/couch_stream.erl') diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index 2a873e4c..cdbbe552 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -24,7 +24,7 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data --export([open/1, close/1, write/2, foldl/4, foldl/5, +-export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5, old_foldl/5,old_copy_to_new_stream/4]). -export([copy_to_new_stream/3,old_read_term/2]). -export([init/1, terminate/2, handle_call/3]). @@ -39,14 +39,22 @@ buffer_len = 0, max_buffer = 4096, written_len = 0, - md5 + md5, + % md5 of the content without any transformation applied (e.g. compression) + % needed for the attachment upload integrity check (ticket 558) + identity_md5, + identity_len = 0, + zstream }). %%% Interface functions %%% open(Fd) -> - gen_server:start_link(couch_stream, Fd, []). + open(Fd, 0). + +open(Fd, CompressionLevel) -> + gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []). close(Pid) -> gen_server:call(Pid, close, infinity). @@ -82,11 +90,31 @@ foldl(Fd, [Pos|Rest], Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Fun, Fun(Bin, Acc)). +foldl_unzip(Fd, PosList, Fun, Acc) -> + Z = unzip_init(), + Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc), + unzip_end(Z), + Result. + +do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) -> + Acc; +do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) -> + {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), + Bin = zlib:inflate(Z, BinZip), + do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)). + foldl(Fd, PosList, <<>>, Fun, Acc) -> foldl(Fd, PosList, Fun, Acc); foldl(Fd, PosList, Md5, Fun, Acc) -> foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc). +foldl_unzip(Fd, PosList, <<>>, Fun, Acc) -> + foldl_unzip(Fd, PosList, Fun, Acc); +foldl_unzip(Fd, PosList, Md5, Fun, Acc) -> + Z = unzip_init(), + Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc), + unzip_end(Z), + Result. foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> Md5 = erlang:md5_final(Md5Acc), @@ -99,14 +127,62 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). +foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> + Md5 = erlang:md5_final(Md5Acc), + Acc; +foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), + Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)), + Bin = zlib:inflate(Z, BinZip), + Fun(Bin, Acc); +foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), + Bin = zlib:inflate(Z, BinZip), + Md5Acc2 = erlang:md5_update(Md5Acc, BinZip), + foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +zip_init(CompressionLevel) -> + Z = zlib:open(), + % 15 = ?MAX_WBITS (defined in the zlib module) + % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1 + zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default), + Z. + +zip_end(Z, Data) -> + Last = zlib:deflate(Z, Data, finish), + zlib:deflateEnd(Z), + zlib:close(Z), + Last. + +unzip_init() -> + Z = zlib:open(), + zlib:inflateInit(Z, 16 + 15), + Z. + +unzip_end(Z) -> + zlib:inflateEnd(Z), + zlib:close(Z). + write(_Pid, <<>>) -> ok; write(Pid, Bin) -> gen_server:call(Pid, {write, Bin}, infinity). -init(Fd) -> - {ok, #stream{fd=Fd, md5=erlang:md5_init()}}. +init({Fd, CompressionLevel}) -> + Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of + true -> + zip_init(CompressionLevel); + _ -> + undefined + end, + {ok, #stream{ + fd=Fd, + md5=erlang:md5_init(), + identity_md5=erlang:md5_init(), + zstream=Z + } + }. terminate(_Reason, _Stream) -> ok. @@ -120,39 +196,74 @@ handle_call({write, Bin}, _From, Stream) -> buffer_len = BufferLen, buffer_list = Buffer, max_buffer = Max, - md5 = Md5} = Stream, + md5 = Md5, + identity_md5 = IdenMd5, + identity_len = IdenLen, + zstream = Z} = Stream, if BinSize + BufferLen > Max -> WriteBin = lists:reverse(Buffer, [Bin]), - Md5_2 = erlang:md5_update(Md5, WriteBin), - {ok, Pos} = couch_file:append_binary(Fd, WriteBin), + IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin), + WriteBin2 = case Z of + undefined -> + WriteBin; + _ -> + zlib:deflate(Z, WriteBin) + end, + case WriteBin2 of + [] -> + % case where zlib did some internal buffering + WrittenLen2 = WrittenLen, + Md5_2 = Md5, + Written2 = Written; + _ -> + {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), + WrittenLen2 = WrittenLen + iolist_size(WriteBin2), + Md5_2 = erlang:md5_update(Md5, WriteBin2), + Written2 = [Pos|Written] + end, + {reply, ok, Stream#stream{ - written_len=WrittenLen + BufferLen + BinSize, - written_pointers=[Pos|Written], + written_len=WrittenLen2, + written_pointers=Written2, buffer_list=[], buffer_len=0, - md5=Md5_2}}; + md5=Md5_2, + identity_md5=IdenMd5_2, + identity_len=IdenLen + BinSize}}; true -> {reply, ok, Stream#stream{ buffer_list=[Bin|Buffer], - buffer_len=BufferLen + BinSize}} + buffer_len=BufferLen + BinSize, + identity_len=IdenLen + BinSize}} end; handle_call(close, _From, Stream) -> #stream{ fd = Fd, written_len = WrittenLen, written_pointers = Written, - buffer_len = BufferLen, buffer_list = Buffer, - md5 = Md5} = Stream, - - case Buffer of + md5 = Md5, + identity_md5 = IdenMd5, + identity_len = IdenLen, + zstream = Z} = Stream, + + WriteBin = lists:reverse(Buffer), + IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)), + WriteBin2 = case Z of + undefined -> + WriteBin; + _ -> + zip_end(Z, WriteBin) + end, + Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)), + Result = case WriteBin2 of [] -> - Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)}; + {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; _ -> - WriteBin = lists:reverse(Buffer), - Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)), - {ok, Pos} = couch_file:append_binary(Fd, WriteBin), - Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final} + {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), + StreamInfo = lists:reverse(Written, [Pos]), + StreamLen = WrittenLen + iolist_size(WriteBin2), + {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} end, {stop, normal, Result, Stream}. -- cgit v1.2.3