From f3e688373082574d6f469acc282b873658a2321a Mon Sep 17 00:00:00 2001
From: Adam Kocoloski
Date: Thu, 15 Apr 2010 16:34:25 +0000
Subject: refactor att compression to allow more encodings. thanks fdmanana. COUCHDB-710

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@934475 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db.erl         |  38 +++++++---
 src/couchdb/couch_db.hrl         |  10 ++-
 src/couchdb/couch_db_updater.erl |  26 +++++--
 src/couchdb/couch_doc.erl        |  91 ++++++++++++-----------
 src/couchdb/couch_httpd_db.erl   |  24 +++---
 src/couchdb/couch_rep_reader.erl |   4 +-
 src/couchdb/couch_rep_writer.erl |   2 +-
 src/couchdb/couch_stream.erl     | 156 ++++++++++++++++++++-------------------
 8 files changed, 195 insertions(+), 156 deletions(-)

diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 622e0ee9..f4a9e352 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -818,14 +818,14 @@ flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
 % is present in the request, but there is no Content-MD5
 % trailer, we're free to ignore this inconsistency and
 % pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) ->
-    {ok, OutputStream} = case (not AlreadyComp) andalso
+with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
+    {ok, OutputStream} = case (Enc =:= identity) andalso
         couch_util:compressible_att_type(Type) of
     true ->
         CompLevel = list_to_integer(
             couch_config:get("attachments", "compression_level", "0")
         ),
-        couch_stream:open(Fd, CompLevel);
+        couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]);
     _ ->
         couch_stream:open(Fd)
     end,
@@ -841,18 +841,23 @@ with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) ->
     {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
         couch_stream:close(OutputStream),
     check_md5(IdentityMd5, ReqMd5),
-    {AttLen, DiskLen} = case AlreadyComp of
-    true ->
-        {Att#att.att_len, Att#att.disk_len};
-    _ ->
-        {Len, IdentityLen}
+    {AttLen, DiskLen, NewEnc} = case Enc of
+    identity ->
+        case {Md5, IdentityMd5} of
+        {Same, Same} ->
+            {Len, IdentityLen, identity};
+        _ ->
+            {Len, IdentityLen, gzip}
+        end;
+    gzip ->
+        {Att#att.att_len, Att#att.disk_len, Enc}
     end,
     Att#att{
         data={Fd,StreamInfo},
         att_len=AttLen,
         disk_len=DiskLen,
         md5=Md5,
-        comp=(AlreadyComp orelse (IdentityMd5 =/= Md5))
+        encoding=NewEnc
    }.
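Note on the with_stream/3 change above: the stream is opened in gzip mode only when the attachment is still identity-encoded and its MIME type is compressible, and the final encoding is decided after the fact by comparing the two MD5s returned from couch_stream:close/1. A minimal sketch of that decision (the helper name is ours, not part of the patch):

    %% If the encoded MD5 equals the identity MD5, the encoder was a
    %% no-op (e.g. compression_level is 0 or unset), so the attachment
    %% is recorded as identity; otherwise it really is gzipped.
    choose_encoding(Md5, Md5, Len, IdentityLen) ->
        {Len, IdentityLen, identity};
    choose_encoding(_Md5, _IdentityMd5, Len, IdentityLen) ->
        {Len, IdentityLen, gzip}.

This replaces the old boolean comp=(AlreadyComp orelse (IdentityMd5 =/= Md5)) with an explicit encoding atom, which is what makes room for encodings other than gzip.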
@@ -1087,7 +1092,7 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> {ok, {BodyData0, Atts0}} = read_doc(Db, Bp), {BodyData0, lists:map( - fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Comp}) -> + fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> #att{name=Name, type=Type, att_len=AttLen, @@ -1095,7 +1100,18 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> md5=Md5, revpos=RevPos, data={Fd,Sp}, - comp=Comp}; + encoding= + case Enc of + true -> + % 0110 UPGRADE CODE + gzip; + false -> + % 0110 UPGRADE CODE + identity; + _ -> + Enc + end + }; ({Name,Type,Sp,AttLen,RevPos,Md5}) -> #att{name=Name, type=Type, diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 09590bc4..5bc6ebaa 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -102,12 +102,16 @@ name, type, att_len, - disk_len, % length of the attachment in uncompressed form - % differs from at_len when comp =:= true + disk_len, % length of the attachment in its identity form + % (that is, without a content encoding applied to it) + % differs from att_len when encoding /= identity md5= <<>>, revpos=0, data, - comp=false % gzip compression Y/N + encoding=identity % currently supported values are: + % identity, gzip + % additional values to support in the future: + % deflate, compress }). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index fdd79481..ecd7bd65 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -437,9 +437,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db, case Atts of [] -> []; [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd -> - [{N,T,P,AL,DL,R,M,C} + [{N,T,P,AL,DL,R,M,E} || #att{name=N,type=T,data={_,P},md5=M,revpos=R, - att_len=AL,disk_len=DL,comp=C} + att_len=AL,disk_len=DL,encoding=E} <- Atts]; _ -> % BinFd must not equal our Fd. 
This can happen when a database @@ -709,27 +709,37 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) -> % 09 UPGRADE CODE {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd), - {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, false}; + {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, identity}; ({Name, {Type, BinSp, AttLen}}) -> % 09 UPGRADE CODE {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, false}; + {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, identity}; ({Name, Type, BinSp, AttLen, _RevPos, <<>>}) when is_tuple(BinSp) orelse BinSp == null -> % 09 UPGRADE CODE {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd), - {Name, Type, NewBinSp, AttLen, AttLen, AttLen, Md5, false}; + {Name, Type, NewBinSp, AttLen, AttLen, AttLen, Md5, identity}; ({Name, Type, BinSp, AttLen, RevPos, Md5}) -> % 010 UPGRADE CODE {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, false}; - ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Comp}) -> + {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity}; + ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) -> {NewBinSp, AttLen, _, Md5, _IdentityMd5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Comp} + Enc = case Enc1 of + true -> + % 0110 UPGRADE CODE + gzip; + false -> + % 0110 UPGRADE CODE + identity; + _ -> + Enc1 + end, + {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc} end, BinInfos), {BodyData, NewBinInfos}. diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index 71d21dda..87602c23 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -13,7 +13,7 @@ -module(couch_doc). -export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]). --export([att_foldl/3,att_foldl_unzip/3,get_validate_doc_fun/1]). +-export([att_foldl/3,att_foldl_decode/3,get_validate_doc_fun/1]). -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). -export([doc_from_multi_part_stream/2]). @@ -85,14 +85,14 @@ to_json_attachments(Attachments, Options) -> Attachments, RevPos, lists:member(follows, Options), - lists:member(att_gzip_length, Options) + lists:member(att_encoding_info, Options) ). 
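Both make_doc/5 (earlier in this patch) and copy_doc_attachments/4 above carry the same "0110 UPGRADE CODE" branch: 0.10 databases stored the old boolean comp flag in on-disk attachment tuples, while trunk now stores an encoding atom. The normalization both sites perform is equivalent to this sketch (hypothetical helper name, not in the patch):

    %% Map an on-disk encoding field to the new representation.
    upgrade_encoding(true)  -> gzip;      % 0.10 wrote comp = true
    upgrade_encoding(false) -> identity;  % 0.10 wrote comp = false
    upgrade_encoding(Enc) when is_atom(Enc) -> Enc.  % already new-style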
-to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowGzipLen) -> +to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowEncInfo) -> []; -to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) -> +to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) -> AttProps = lists:map( - fun(#att{disk_len=DiskLen, att_len=AttLen, comp=Comp}=Att) -> + fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) -> {Att#att.name, {[ {<<"content_type">>, Att#att.type}, {<<"revpos">>, Att#att.revpos} @@ -101,10 +101,10 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) -> if DataToFollow -> [{<<"length">>, DiskLen}, {<<"follows">>, true}]; true -> - AttData = case Comp of - true -> + AttData = case Enc of + gzip -> zlib:gunzip(att_to_bin(Att)); - _ -> + identity -> att_to_bin(Att) end, [{<<"data">>, base64:encode(AttData)}] @@ -112,11 +112,16 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) -> true -> [{<<"length">>, DiskLen}, {<<"stub">>, true}] end ++ - case {ShowGzipLen, Comp} of - {true, true} -> - [{<<"gzip_length">>, AttLen}]; - _ -> - [] + case {ShowEncInfo, Enc} of + {false, _} -> + []; + {true, identity} -> + []; + {true, _} -> + [ + {<<"encoding">>, couch_util:to_binary(Enc)}, + {<<"encoded_length">>, AttLen} + ] end }} end, Atts), @@ -202,18 +207,20 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> true -> Type = proplists:get_value(<<"content_type">>, BinProps), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), - {AttLen, DiskLen, Comp} = att_lengths(BinProps), - #att{name=Name, data=stub, type=Type, att_len=AttLen, - disk_len=DiskLen, comp=Comp, revpos=RevPos}; + DiskLen = proplists:get_value(<<"length">>, BinProps), + {Enc, EncLen} = att_encoding_info(BinProps), + #att{name=Name, data=stub, type=Type, att_len=EncLen, + disk_len=DiskLen, encoding=Enc, revpos=RevPos}; _ -> Type = proplists:get_value(<<"content_type">>, BinProps, ?DEFAULT_ATTACHMENT_CONTENT_TYPE), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), case proplists:get_value(<<"follows">>, BinProps) of true -> - {AttLen, DiskLen, Comp} = att_lengths(BinProps), - #att{name=Name, data=follows, type=Type, comp=Comp, - att_len=AttLen, disk_len=DiskLen, revpos=RevPos}; + DiskLen = proplists:get_value(<<"length">>, BinProps), + {Enc, EncLen} = att_encoding_info(BinProps), + #att{name=Name, data=follows, type=Type, encoding=Enc, + att_len=EncLen, disk_len=DiskLen, revpos=RevPos}; _ -> Value = proplists:get_value(<<"data">>, BinProps), Bin = base64:decode(Value), @@ -261,14 +268,14 @@ transfer_fields([{<<"_",Name/binary>>, _} | _], _) -> transfer_fields([Field | Rest], #doc{body=Fields}=Doc) -> transfer_fields(Rest, Doc#doc{body=[Field|Fields]}). -att_lengths(BinProps) -> +att_encoding_info(BinProps) -> DiskLen = proplists:get_value(<<"length">>, BinProps), - GzipLen = proplists:get_value(<<"gzip_length">>, BinProps), - case GzipLen of + case proplists:get_value(<<"encoding">>, BinProps) of undefined -> - {DiskLen, DiskLen, false}; - _ -> - {GzipLen, DiskLen, true} + {identity, DiskLen}; + Enc -> + EncodedLen = proplists:get_value(<<"encoded_length">>, BinProps, DiskLen), + {list_to_atom(?b2l(Enc)), EncodedLen} end. to_doc_info(FullDocInfo) -> @@ -308,8 +315,8 @@ att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) -> fold_streamed_data(DataFun, Len, Fun, Acc). 
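With the att_encoding_info option requested, a stub attachment now advertises its stored encoding and encoded size in place of the old gzip_length field, and att_encoding_info/1 above parses the same two fields back out on the way in. An illustrative EJSON term for one attachment entry (field values invented for the example):

    {<<"readme.txt">>, {[
        {<<"content_type">>, <<"text/plain">>},
        {<<"revpos">>, 2},
        {<<"length">>, 1000},            % identity size (disk_len)
        {<<"stub">>, true},
        {<<"encoding">>, <<"gzip">>},
        {<<"encoded_length">>, 470}      % size as stored (att_len)
    ]}}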
-att_foldl_unzip(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> - couch_stream:foldl_unzip(Fd, Sp, Md5, Fun, Acc). +att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) -> + couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc). att_to_bin(#att{data=Bin}) when is_binary(Bin) -> Bin; @@ -377,7 +384,7 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0-> fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc). len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, - SendGzipAtts) -> + SendEncodedAtts) -> 2 + % "--" size(Boundary) + 36 + % "\r\ncontent-type: application/json\r\n\r\n" @@ -388,7 +395,7 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, if RevPos > AttsSinceRevPos -> AccAttsSize + 4 + % "\r\n\r\n" - case SendGzipAtts of + case SendEncodedAtts of true -> Att#att.att_len; _ -> @@ -403,31 +410,29 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, 2. % "--" doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun, - SendGzipAtts) -> + SendEncodedAtts) -> WriteFun([<<"--", Boundary/binary, "\r\ncontent-type: application/json\r\n\r\n">>, JsonBytes, <<"\r\n--", Boundary/binary>>]), - atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts). + atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts). -atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendGzipAtts) -> +atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendEncAtts) -> WriteFun(<<"--">>); atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, - AttsSinceRevPos, SendGzipAtts) when RevPos > AttsSinceRevPos -> + AttsSinceRevPos, SendEncodedAtts) when RevPos > AttsSinceRevPos -> WriteFun(<<"\r\n\r\n">>), - AttFun = case {Att#att.comp, SendGzipAtts} of - {true, false} -> - fun att_foldl_unzip/3; - _ -> - % receiver knows that the attachment is compressed by checking that the - % "gzip_length" field is present in the corresponding JSON attachment - % object found within the JSON doc + AttFun = case SendEncodedAtts of + false -> + fun att_foldl_decode/3; + true -> fun att_foldl/3 end, AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok), WriteFun(<<"\r\n--", Boundary/binary>>), - atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts); -atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts) -> - atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts). + atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts); +atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, + SendEncodedAtts) -> + atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts). 
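att_foldl_decode/3 lets callers consume an attachment in its identity form regardless of how it is stored, and the multipart writer above now picks between the two folds based solely on SendEncodedAtts. A usage sketch (the wrapper function is hypothetical and assumes the attachment's data lives in a couch stream):

    %% Collect an attachment's decoded bytes into a single binary.
    decoded_att_to_bin(Att) ->
        Chunks = couch_doc:att_foldl_decode(
            Att, fun(Chunk, Acc) -> [Chunk | Acc] end, []),
        iolist_to_binary(lists:reverse(Chunks)).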
doc_from_multi_part_stream(ContentType, DataFun) -> diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index 463519a8..1e11e0d3 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -853,26 +853,26 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> case [A || A <- Atts, A#att.name == FileName] of [] -> throw({not_found, "Document is missing attachment"}); - [#att{type=Type, comp=Comp}=Att] -> + [#att{type=Type, encoding=Enc}=Att] -> Etag = couch_httpd:doc_etag(Doc), - ReqAcceptsGzip = lists:member( - "gzip", + ReqAcceptsAttEnc = lists:member( + atom_to_list(Enc), couch_httpd:accepted_encodings(Req) ), Headers = [ {"ETag", Etag}, {"Cache-Control", "must-revalidate"}, {"Content-Type", binary_to_list(Type)} - ] ++ case {Comp, ReqAcceptsGzip} of - {true, true} -> - [{"Content-Encoding", "gzip"}]; + ] ++ case ReqAcceptsAttEnc of + true -> + [{"Content-Encoding", atom_to_list(Enc)}]; _ -> [] end, - AttFun = case {Comp, ReqAcceptsGzip} of - {true, false} -> - fun couch_doc:att_foldl_unzip/3; - _ -> + AttFun = case ReqAcceptsAttEnc of + false -> + fun couch_doc:att_foldl_decode/3; + true -> fun couch_doc:att_foldl/3 end, couch_httpd:etag_respond( @@ -1045,8 +1045,8 @@ parse_doc_query(Req) -> Args#doc_query_args{update_type=replicated_changes}; {"new_edits", "true"} -> Args#doc_query_args{update_type=interactive_edit}; - {"att_gzip_length", "true"} -> - Options = [att_gzip_length | Args#doc_query_args.options], + {"att_encoding_info", "true"} -> + Options = [att_encoding_info | Args#doc_query_args.options], Args#doc_query_args{options=Options}; _Else -> % unknown key value pair, ignore. Args diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl index bd807eab..b01c2ada 100644 --- a/src/couchdb/couch_rep_reader.erl +++ b/src/couchdb/couch_rep_reader.erl @@ -227,7 +227,7 @@ update_sequence_lists(Seq, State) -> open_doc_revs(#http_db{} = DbS, DocId, Revs) -> %% all this logic just splits up revision lists that are too long for %% MochiWeb into multiple requests - BaseQS = [{revs,true}, {latest,true}, {att_gzip_length,true}], + BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}], BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS}, BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs= @@ -252,7 +252,7 @@ open_doc(#http_db{} = DbS, DocId) -> % get latest rev of the doc Req = DbS#http_db{ resource=url_encode(DocId), - qs=[{att_gzip_length, true}] + qs=[{att_encoding_info, true}] }, case couch_rep_httpc:request(Req) of {[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} -> diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl index cf01c576..9577e9ae 100644 --- a/src/couchdb/couch_rep_writer.erl +++ b/src/couchdb/couch_rep_writer.erl @@ -90,7 +90,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) -> JsonBytes = ?JSON_ENCODE( couch_doc:to_json_obj( Doc, - [follows, att_gzip_length, {atts_after_revpos, 0}] + [follows, att_encoding_info, {atts_after_revpos, 0}] ) ), Boundary = couch_uuids:random(), diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index cdbbe552..cc521241 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -24,7 +24,7 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). 
% 1 meg chunks when streaming data --export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5, +-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6, old_foldl/5,old_copy_to_new_stream/4]). -export([copy_to_new_stream/3,old_read_term/2]). -export([init/1, terminate/2, handle_call/3]). @@ -44,17 +44,18 @@ % needed for the attachment upload integrity check (ticket 558) identity_md5, identity_len = 0, - zstream + encoding_fun, + end_encoding_fun }). %%% Interface functions %%% open(Fd) -> - open(Fd, 0). + open(Fd, identity, []). -open(Fd, CompressionLevel) -> - gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []). +open(Fd, Encoding, Options) -> + gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []). close(Pid) -> gen_server:call(Pid, close, infinity). @@ -90,30 +91,22 @@ foldl(Fd, [Pos|Rest], Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Fun, Fun(Bin, Acc)). -foldl_unzip(Fd, PosList, Fun, Acc) -> - Z = unzip_init(), - Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc), - unzip_end(Z), - Result. - -do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) -> - Acc; -do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) -> - {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), - Bin = zlib:inflate(Z, BinZip), - do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)). - foldl(Fd, PosList, <<>>, Fun, Acc) -> foldl(Fd, PosList, Fun, Acc); foldl(Fd, PosList, Md5, Fun, Acc) -> foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc). -foldl_unzip(Fd, PosList, <<>>, Fun, Acc) -> - foldl_unzip(Fd, PosList, Fun, Acc); -foldl_unzip(Fd, PosList, Md5, Fun, Acc) -> - Z = unzip_init(), - Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc), - unzip_end(Z), +foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> + {DecDataFun, DecEndFun} = case Enc of + gzip -> + ungzip_init(); + identity -> + identity_enc_dec_funs() + end, + Result = foldl_decode( + DecDataFun, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc + ), + DecEndFun(), Result. foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> @@ -127,41 +120,60 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). -foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> +foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> Md5 = erlang:md5_final(Md5Acc), Acc; -foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), - Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)), - Bin = zlib:inflate(Z, BinZip), +foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), + Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, EncBin)), + Bin = DecFun(EncBin), Fun(Bin, Acc); -foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), - Bin = zlib:inflate(Z, BinZip), - Md5Acc2 = erlang:md5_update(Md5Acc, BinZip), - foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). +foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), + Bin = DecFun(EncBin), + Md5Acc2 = erlang:md5_update(Md5Acc, EncBin), + foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). 
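foldl_decode/6 no longer hard-codes zlib: it selects a {DecodeFun, EndFun} pair up front, and gzip and identity each supply one. Note that the MD5 check still runs over the encoded bytes, so integrity verification is independent of the decoder. The pair shape also leaves room for the future encodings listed in couch_db.hrl; for example, a raw-deflate decoder (hypothetical, not part of this patch) would follow the same pattern as ungzip_init/0 below:

    undeflate_init() ->
        Z = zlib:open(),
        ok = zlib:inflateInit(Z),  % default zlib wrapper, no gzip header
        {
            fun(Data) -> zlib:inflate(Z, Data) end,
            fun() ->
                ok = zlib:inflateEnd(Z),
                ok = zlib:close(Z)
            end
        }.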
+
+gzip_init(Options) ->
+    case proplists:get_value(compression_level, Options, 0) of
+    Lvl when Lvl >= 1 andalso Lvl =< 9 ->
+        Z = zlib:open(),
+        % 15 = ?MAX_WBITS (defined in the zlib module)
+        % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+        ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
+        {
+            fun(Data) ->
+                zlib:deflate(Z, Data)
+            end,
+            fun() ->
+                Last = zlib:deflate(Z, [], finish),
+                ok = zlib:deflateEnd(Z),
+                ok = zlib:close(Z),
+                Last
+            end
+        };
+    _ ->
+        identity_enc_dec_funs()
+    end.
 
-zip_init(CompressionLevel) ->
-    Z = zlib:open(),
-    % 15 = ?MAX_WBITS (defined in the zlib module)
-    % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
-    zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default),
-    Z.
-
-zip_end(Z, Data) ->
-    Last = zlib:deflate(Z, Data, finish),
-    zlib:deflateEnd(Z),
-    zlib:close(Z),
-    Last.
-
-unzip_init() ->
+ungzip_init() ->
     Z = zlib:open(),
     zlib:inflateInit(Z, 16 + 15),
-    Z.
+    {
+        fun(Data) ->
+            zlib:inflate(Z, Data)
+        end,
+        fun() ->
+            ok = zlib:inflateEnd(Z),
+            ok = zlib:close(Z)
+        end
+    }.
 
-unzip_end(Z) ->
-    zlib:inflateEnd(Z),
-    zlib:close(Z).
+identity_enc_dec_funs() ->
+    {
+        fun(Data) -> Data end,
+        fun() -> [] end
+    }.
 
 write(_Pid, <<>>) ->
     ok;
@@ -169,18 +181,19 @@ write(Pid, Bin) ->
     gen_server:call(Pid, {write, Bin}, infinity).
 
-init({Fd, CompressionLevel}) ->
-    Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of
-    true ->
-        zip_init(CompressionLevel);
-    _ ->
-        undefined
+init({Fd, Encoding, Options}) ->
+    {EncodingFun, EndEncodingFun} = case Encoding of
+    identity ->
+        identity_enc_dec_funs();
+    gzip ->
+        gzip_init(Options)
     end,
     {ok, #stream{
             fd=Fd,
             md5=erlang:md5_init(),
             identity_md5=erlang:md5_init(),
-            zstream=Z
+            encoding_fun=EncodingFun,
+            end_encoding_fun=EndEncodingFun
         }
     }.
 
@@ -199,23 +212,18 @@ handle_call({write, Bin}, _From, Stream) ->
         md5 = Md5,
         identity_md5 = IdenMd5,
         identity_len = IdenLen,
-        zstream = Z} = Stream,
+        encoding_fun = EncodingFun} = Stream,
     if BinSize + BufferLen > Max ->
         WriteBin = lists:reverse(Buffer, [Bin]),
         IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin),
-        WriteBin2 = case Z of
-        undefined ->
-            WriteBin;
-        _ ->
-            zlib:deflate(Z, WriteBin)
-        end,
-        case WriteBin2 of
+        case EncodingFun(WriteBin) of
         [] ->
-            % case where zlib did some internal buffering
+            % case where the encoder did some internal buffering
+            % (zlib does it for example)
             WrittenLen2 = WrittenLen,
             Md5_2 = Md5,
             Written2 = Written;
-        _ ->
+        WriteBin2 ->
             {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
             WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
             Md5_2 = erlang:md5_update(Md5, WriteBin2),
@@ -245,16 +253,12 @@ handle_call(close, _From, Stream) ->
         md5 = Md5,
         identity_md5 = IdenMd5,
         identity_len = IdenLen,
-        zstream = Z} = Stream,
+        encoding_fun = EncodingFun,
+        end_encoding_fun = EndEncodingFun} = Stream,
 
     WriteBin = lists:reverse(Buffer),
     IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)),
-    WriteBin2 = case Z of
-    undefined ->
-        WriteBin;
-    _ ->
-        zip_end(Z, WriteBin)
-    end,
+    WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
     Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)),
     Result = case WriteBin2 of
     [] ->
--
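Taken together, the couch_stream changes replace the special-cased zstream field with encoding_fun/end_encoding_fun closures that are applied at each buffer flush and once more at close. A usage sketch of the new public API (a sketch only; it assumes Fd is an open couch_file descriptor, and the return shapes follow the with_stream/3 caller earlier in the patch):

    {ok, Stream} = couch_stream:open(Fd, gzip, [{compression_level, 8}]),
    ok = couch_stream:write(Stream, <<"some attachment body">>),
    {StreamInfo, AttLen, IdentityLen, Md5, IdentityMd5} =
        couch_stream:close(Stream),
    %% AttLen/Md5 describe the bytes as stored (gzipped here);
    %% IdentityLen/IdentityMd5 describe the original, unencoded form.

Since gzip_init/1 falls back to identity_enc_dec_funs() when compression_level is 0 or out of range, opening a gzip stream with the default configuration degrades cleanly to an identity stream, which is exactly the no-op case with_stream/3 detects by comparing the two MD5s.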