diff options
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r-- | src/couchdb/couch_stream.erl | 156 |
1 files changed, 80 insertions, 76 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index cdbbe552..cc521241 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -24,7 +24,7 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data --export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5, +-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6, old_foldl/5,old_copy_to_new_stream/4]). -export([copy_to_new_stream/3,old_read_term/2]). -export([init/1, terminate/2, handle_call/3]). @@ -44,17 +44,18 @@ % needed for the attachment upload integrity check (ticket 558) identity_md5, identity_len = 0, - zstream + encoding_fun, + end_encoding_fun }). %%% Interface functions %%% open(Fd) -> - open(Fd, 0). + open(Fd, identity, []). -open(Fd, CompressionLevel) -> - gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []). +open(Fd, Encoding, Options) -> + gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []). close(Pid) -> gen_server:call(Pid, close, infinity). @@ -90,30 +91,22 @@ foldl(Fd, [Pos|Rest], Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Fun, Fun(Bin, Acc)). -foldl_unzip(Fd, PosList, Fun, Acc) -> - Z = unzip_init(), - Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc), - unzip_end(Z), - Result. - -do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) -> - Acc; -do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) -> - {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), - Bin = zlib:inflate(Z, BinZip), - do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)). - foldl(Fd, PosList, <<>>, Fun, Acc) -> foldl(Fd, PosList, Fun, Acc); foldl(Fd, PosList, Md5, Fun, Acc) -> foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc). -foldl_unzip(Fd, PosList, <<>>, Fun, Acc) -> - foldl_unzip(Fd, PosList, Fun, Acc); -foldl_unzip(Fd, PosList, Md5, Fun, Acc) -> - Z = unzip_init(), - Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc), - unzip_end(Z), +foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> + {DecDataFun, DecEndFun} = case Enc of + gzip -> + ungzip_init(); + identity -> + identity_enc_dec_funs() + end, + Result = foldl_decode( + DecDataFun, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc + ), + DecEndFun(), Result. foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> @@ -127,41 +120,60 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). -foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> +foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> Md5 = erlang:md5_final(Md5Acc), Acc; -foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), - Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)), - Bin = zlib:inflate(Z, BinZip), +foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), + Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, EncBin)), + Bin = DecFun(EncBin), Fun(Bin, Acc); -foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), - Bin = zlib:inflate(Z, BinZip), - Md5Acc2 = erlang:md5_update(Md5Acc, BinZip), - foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). +foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), + Bin = DecFun(EncBin), + Md5Acc2 = erlang:md5_update(Md5Acc, EncBin), + foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +gzip_init(Options) -> + case proplists:get_value(compression_level, Options, 0) of + Lvl when Lvl >= 1 andalso Lvl =< 9 -> + Z = zlib:open(), + % 15 = ?MAX_WBITS (defined in the zlib module) + % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1 + ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default), + { + fun(Data) -> + zlib:deflate(Z, Data) + end, + fun() -> + Last = zlib:deflate(Z, [], finish), + ok = zlib:deflateEnd(Z), + ok = zlib:close(Z), + Last + end + }; + _ -> + identity_enc_dec_funs() + end. -zip_init(CompressionLevel) -> - Z = zlib:open(), - % 15 = ?MAX_WBITS (defined in the zlib module) - % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1 - zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default), - Z. - -zip_end(Z, Data) -> - Last = zlib:deflate(Z, Data, finish), - zlib:deflateEnd(Z), - zlib:close(Z), - Last. - -unzip_init() -> +ungzip_init() -> Z = zlib:open(), zlib:inflateInit(Z, 16 + 15), - Z. + { + fun(Data) -> + zlib:inflate(Z, Data) + end, + fun() -> + ok = zlib:inflateEnd(Z), + ok = zlib:close(Z) + end + }. -unzip_end(Z) -> - zlib:inflateEnd(Z), - zlib:close(Z). +identity_enc_dec_funs() -> + { + fun(Data) -> Data end, + fun() -> [] end + }. write(_Pid, <<>>) -> ok; @@ -169,18 +181,19 @@ write(Pid, Bin) -> gen_server:call(Pid, {write, Bin}, infinity). -init({Fd, CompressionLevel}) -> - Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of - true -> - zip_init(CompressionLevel); - _ -> - undefined +init({Fd, Encoding, Options}) -> + {EncodingFun, EndEncodingFun} = case Encoding of + identity -> + identity_enc_dec_funs(); + gzip -> + gzip_init(Options) end, {ok, #stream{ fd=Fd, md5=erlang:md5_init(), identity_md5=erlang:md5_init(), - zstream=Z + encoding_fun=EncodingFun, + end_encoding_fun=EndEncodingFun } }. @@ -199,23 +212,18 @@ handle_call({write, Bin}, _From, Stream) -> md5 = Md5, identity_md5 = IdenMd5, identity_len = IdenLen, - zstream = Z} = Stream, + encoding_fun = EncodingFun} = Stream, if BinSize + BufferLen > Max -> WriteBin = lists:reverse(Buffer, [Bin]), IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin), - WriteBin2 = case Z of - undefined -> - WriteBin; - _ -> - zlib:deflate(Z, WriteBin) - end, - case WriteBin2 of + case EncodingFun(WriteBin) of [] -> - % case where zlib did some internal buffering + % case where the encoder did some internal buffering + % (zlib does it for example) WrittenLen2 = WrittenLen, Md5_2 = Md5, Written2 = Written; - _ -> + WriteBin2 -> {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), WrittenLen2 = WrittenLen + iolist_size(WriteBin2), Md5_2 = erlang:md5_update(Md5, WriteBin2), @@ -245,16 +253,12 @@ handle_call(close, _From, Stream) -> md5 = Md5, identity_md5 = IdenMd5, identity_len = IdenLen, - zstream = Z} = Stream, + encoding_fun = EncodingFun, + end_encoding_fun = EndEncodingFun} = Stream, WriteBin = lists:reverse(Buffer), IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)), - WriteBin2 = case Z of - undefined -> - WriteBin; - _ -> - zip_end(Z, WriteBin) - end, + WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(), Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)), Result = case WriteBin2 of [] -> |