summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAdam Kocoloski <kocolosk@apache.org>2010-04-15 16:34:25 +0000
committerAdam Kocoloski <kocolosk@apache.org>2010-04-15 16:34:25 +0000
commitf3e688373082574d6f469acc282b873658a2321a (patch)
tree2fd8e4e84bb277d5deb5109d5fd498d337aad7df /src
parentfc8069eedf13c10b9b61f527964279fa2085009b (diff)
refactor att compression to allow more encodings. thanks fdmanana. COUCHDB-710
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@934475 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_db.erl38
-rw-r--r--src/couchdb/couch_db.hrl10
-rw-r--r--src/couchdb/couch_db_updater.erl26
-rw-r--r--src/couchdb/couch_doc.erl91
-rw-r--r--src/couchdb/couch_httpd_db.erl24
-rw-r--r--src/couchdb/couch_rep_reader.erl4
-rw-r--r--src/couchdb/couch_rep_writer.erl2
-rw-r--r--src/couchdb/couch_stream.erl156
8 files changed, 195 insertions, 156 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 622e0ee9..f4a9e352 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -818,14 +818,14 @@ flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) ->
- {ok, OutputStream} = case (not AlreadyComp) andalso
+with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
+ {ok, OutputStream} = case (Enc =:= identity) andalso
couch_util:compressible_att_type(Type) of
true ->
CompLevel = list_to_integer(
couch_config:get("attachments", "compression_level", "0")
),
- couch_stream:open(Fd, CompLevel);
+ couch_stream:open(Fd, gzip, [{compression_level, CompLevel}]);
_ ->
couch_stream:open(Fd)
end,
@@ -841,18 +841,23 @@ with_stream(Fd, #att{md5=InMd5,type=Type,comp=AlreadyComp}=Att, Fun) ->
{StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
check_md5(IdentityMd5, ReqMd5),
- {AttLen, DiskLen} = case AlreadyComp of
- true ->
- {Att#att.att_len, Att#att.disk_len};
- _ ->
- {Len, IdentityLen}
+ {AttLen, DiskLen, NewEnc} = case Enc of
+ identity ->
+ case {Md5, IdentityMd5} of
+ {Same, Same} ->
+ {Len, IdentityLen, identity};
+ _ ->
+ {Len, IdentityLen, gzip}
+ end;
+ gzip ->
+ {Att#att.att_len, Att#att.disk_len, Enc}
end,
Att#att{
data={Fd,StreamInfo},
att_len=AttLen,
disk_len=DiskLen,
md5=Md5,
- comp=(AlreadyComp orelse (IdentityMd5 =/= Md5))
+ encoding=NewEnc
}.
@@ -1087,7 +1092,7 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
{ok, {BodyData0, Atts0}} = read_doc(Db, Bp),
{BodyData0,
lists:map(
- fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Comp}) ->
+ fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
#att{name=Name,
type=Type,
att_len=AttLen,
@@ -1095,7 +1100,18 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
md5=Md5,
revpos=RevPos,
data={Fd,Sp},
- comp=Comp};
+ encoding=
+ case Enc of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc
+ end
+ };
({Name,Type,Sp,AttLen,RevPos,Md5}) ->
#att{name=Name,
type=Type,
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 09590bc4..5bc6ebaa 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -102,12 +102,16 @@
name,
type,
att_len,
- disk_len, % length of the attachment in uncompressed form
- % differs from at_len when comp =:= true
+ disk_len, % length of the attachment in its identity form
+ % (that is, without a content encoding applied to it)
+ % differs from att_len when encoding /= identity
md5= <<>>,
revpos=0,
data,
- comp=false % gzip compression Y/N
+ encoding=identity % currently supported values are:
+ % identity, gzip
+ % additional values to support in the future:
+ % deflate, compress
}).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index fdd79481..ecd7bd65 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -437,9 +437,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db,
case Atts of
[] -> [];
[#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
- [{N,T,P,AL,DL,R,M,C}
+ [{N,T,P,AL,DL,R,M,E}
|| #att{name=N,type=T,data={_,P},md5=M,revpos=R,
- att_len=AL,disk_len=DL,comp=C}
+ att_len=AL,disk_len=DL,encoding=E}
<- Atts];
_ ->
% BinFd must not equal our Fd. This can happen when a database
@@ -709,27 +709,37 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) ->
% 09 UPGRADE CODE
{NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, false};
+ {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, identity};
({Name, {Type, BinSp, AttLen}}) ->
% 09 UPGRADE CODE
{NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, false};
+ {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, identity};
({Name, Type, BinSp, AttLen, _RevPos, <<>>}) when
is_tuple(BinSp) orelse BinSp == null ->
% 09 UPGRADE CODE
{NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, AttLen, Md5, false};
+ {Name, Type, NewBinSp, AttLen, AttLen, AttLen, Md5, identity};
({Name, Type, BinSp, AttLen, RevPos, Md5}) ->
% 010 UPGRADE CODE
{NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} =
couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, false};
- ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Comp}) ->
+ {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, identity};
+ ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Enc1}) ->
{NewBinSp, AttLen, _, Md5, _IdentityMd5} =
couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
- {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Comp}
+ Enc = case Enc1 of
+ true ->
+ % 0110 UPGRADE CODE
+ gzip;
+ false ->
+ % 0110 UPGRADE CODE
+ identity;
+ _ ->
+ Enc1
+ end,
+ {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Enc}
end, BinInfos),
{BodyData, NewBinInfos}.
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 71d21dda..87602c23 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -13,7 +13,7 @@
-module(couch_doc).
-export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]).
--export([att_foldl/3,att_foldl_unzip/3,get_validate_doc_fun/1]).
+-export([att_foldl/3,att_foldl_decode/3,get_validate_doc_fun/1]).
-export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]).
-export([validate_docid/1]).
-export([doc_from_multi_part_stream/2]).
@@ -85,14 +85,14 @@ to_json_attachments(Attachments, Options) ->
Attachments,
RevPos,
lists:member(follows, Options),
- lists:member(att_gzip_length, Options)
+ lists:member(att_encoding_info, Options)
).
-to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowGzipLen) ->
+to_json_attachments([], _RevPosIncludeAfter, _DataToFollow, _ShowEncInfo) ->
[];
-to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) ->
+to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowEncInfo) ->
AttProps = lists:map(
- fun(#att{disk_len=DiskLen, att_len=AttLen, comp=Comp}=Att) ->
+ fun(#att{disk_len=DiskLen, att_len=AttLen, encoding=Enc}=Att) ->
{Att#att.name, {[
{<<"content_type">>, Att#att.type},
{<<"revpos">>, Att#att.revpos}
@@ -101,10 +101,10 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) ->
if DataToFollow ->
[{<<"length">>, DiskLen}, {<<"follows">>, true}];
true ->
- AttData = case Comp of
- true ->
+ AttData = case Enc of
+ gzip ->
zlib:gunzip(att_to_bin(Att));
- _ ->
+ identity ->
att_to_bin(Att)
end,
[{<<"data">>, base64:encode(AttData)}]
@@ -112,11 +112,16 @@ to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow, ShowGzipLen) ->
true ->
[{<<"length">>, DiskLen}, {<<"stub">>, true}]
end ++
- case {ShowGzipLen, Comp} of
- {true, true} ->
- [{<<"gzip_length">>, AttLen}];
- _ ->
- []
+ case {ShowEncInfo, Enc} of
+ {false, _} ->
+ [];
+ {true, identity} ->
+ [];
+ {true, _} ->
+ [
+ {<<"encoding">>, couch_util:to_binary(Enc)},
+ {<<"encoded_length">>, AttLen}
+ ]
end
}}
end, Atts),
@@ -202,18 +207,20 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) ->
true ->
Type = proplists:get_value(<<"content_type">>, BinProps),
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
- {AttLen, DiskLen, Comp} = att_lengths(BinProps),
- #att{name=Name, data=stub, type=Type, att_len=AttLen,
- disk_len=DiskLen, comp=Comp, revpos=RevPos};
+ DiskLen = proplists:get_value(<<"length">>, BinProps),
+ {Enc, EncLen} = att_encoding_info(BinProps),
+ #att{name=Name, data=stub, type=Type, att_len=EncLen,
+ disk_len=DiskLen, encoding=Enc, revpos=RevPos};
_ ->
Type = proplists:get_value(<<"content_type">>, BinProps,
?DEFAULT_ATTACHMENT_CONTENT_TYPE),
RevPos = proplists:get_value(<<"revpos">>, BinProps, 0),
case proplists:get_value(<<"follows">>, BinProps) of
true ->
- {AttLen, DiskLen, Comp} = att_lengths(BinProps),
- #att{name=Name, data=follows, type=Type, comp=Comp,
- att_len=AttLen, disk_len=DiskLen, revpos=RevPos};
+ DiskLen = proplists:get_value(<<"length">>, BinProps),
+ {Enc, EncLen} = att_encoding_info(BinProps),
+ #att{name=Name, data=follows, type=Type, encoding=Enc,
+ att_len=EncLen, disk_len=DiskLen, revpos=RevPos};
_ ->
Value = proplists:get_value(<<"data">>, BinProps),
Bin = base64:decode(Value),
@@ -261,14 +268,14 @@ transfer_fields([{<<"_",Name/binary>>, _} | _], _) ->
transfer_fields([Field | Rest], #doc{body=Fields}=Doc) ->
transfer_fields(Rest, Doc#doc{body=[Field|Fields]}).
-att_lengths(BinProps) ->
+att_encoding_info(BinProps) ->
DiskLen = proplists:get_value(<<"length">>, BinProps),
- GzipLen = proplists:get_value(<<"gzip_length">>, BinProps),
- case GzipLen of
+ case proplists:get_value(<<"encoding">>, BinProps) of
undefined ->
- {DiskLen, DiskLen, false};
- _ ->
- {GzipLen, DiskLen, true}
+ {identity, DiskLen};
+ Enc ->
+ EncodedLen = proplists:get_value(<<"encoded_length">>, BinProps, DiskLen),
+ {list_to_atom(?b2l(Enc)), EncodedLen}
end.
to_doc_info(FullDocInfo) ->
@@ -308,8 +315,8 @@ att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) ->
fold_streamed_data(DataFun, Len, Fun, Acc).
-att_foldl_unzip(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) ->
- couch_stream:foldl_unzip(Fd, Sp, Md5, Fun, Acc).
+att_foldl_decode(#att{data={Fd,Sp},md5=Md5,encoding=Enc}, Fun, Acc) ->
+ couch_stream:foldl_decode(Fd, Sp, Md5, Enc, Fun, Acc).
att_to_bin(#att{data=Bin}) when is_binary(Bin) ->
Bin;
@@ -377,7 +384,7 @@ fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
- SendGzipAtts) ->
+ SendEncodedAtts) ->
2 + % "--"
size(Boundary) +
36 + % "\r\ncontent-type: application/json\r\n\r\n"
@@ -388,7 +395,7 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
if RevPos > AttsSinceRevPos ->
AccAttsSize +
4 + % "\r\n\r\n"
- case SendGzipAtts of
+ case SendEncodedAtts of
true ->
Att#att.att_len;
_ ->
@@ -403,31 +410,29 @@ len_doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos,
2. % "--"
doc_to_multi_part_stream(Boundary, JsonBytes, Atts, AttsSinceRevPos, WriteFun,
- SendGzipAtts) ->
+ SendEncodedAtts) ->
WriteFun([<<"--", Boundary/binary,
"\r\ncontent-type: application/json\r\n\r\n">>,
JsonBytes, <<"\r\n--", Boundary/binary>>]),
- atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts).
+ atts_to_mp(Atts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts).
-atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendGzipAtts) ->
+atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos, _SendEncAtts) ->
WriteFun(<<"--">>);
atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun,
- AttsSinceRevPos, SendGzipAtts) when RevPos > AttsSinceRevPos ->
+ AttsSinceRevPos, SendEncodedAtts) when RevPos > AttsSinceRevPos ->
WriteFun(<<"\r\n\r\n">>),
- AttFun = case {Att#att.comp, SendGzipAtts} of
- {true, false} ->
- fun att_foldl_unzip/3;
- _ ->
- % receiver knows that the attachment is compressed by checking that the
- % "gzip_length" field is present in the corresponding JSON attachment
- % object found within the JSON doc
+ AttFun = case SendEncodedAtts of
+ false ->
+ fun att_foldl_decode/3;
+ true ->
fun att_foldl/3
end,
AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok),
WriteFun(<<"\r\n--", Boundary/binary>>),
- atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts);
-atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts) ->
- atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendGzipAtts).
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts);
+atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos,
+ SendEncodedAtts) ->
+ atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos, SendEncodedAtts).
doc_from_multi_part_stream(ContentType, DataFun) ->
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 463519a8..1e11e0d3 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -853,26 +853,26 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) ->
case [A || A <- Atts, A#att.name == FileName] of
[] ->
throw({not_found, "Document is missing attachment"});
- [#att{type=Type, comp=Comp}=Att] ->
+ [#att{type=Type, encoding=Enc}=Att] ->
Etag = couch_httpd:doc_etag(Doc),
- ReqAcceptsGzip = lists:member(
- "gzip",
+ ReqAcceptsAttEnc = lists:member(
+ atom_to_list(Enc),
couch_httpd:accepted_encodings(Req)
),
Headers = [
{"ETag", Etag},
{"Cache-Control", "must-revalidate"},
{"Content-Type", binary_to_list(Type)}
- ] ++ case {Comp, ReqAcceptsGzip} of
- {true, true} ->
- [{"Content-Encoding", "gzip"}];
+ ] ++ case ReqAcceptsAttEnc of
+ true ->
+ [{"Content-Encoding", atom_to_list(Enc)}];
_ ->
[]
end,
- AttFun = case {Comp, ReqAcceptsGzip} of
- {true, false} ->
- fun couch_doc:att_foldl_unzip/3;
- _ ->
+ AttFun = case ReqAcceptsAttEnc of
+ false ->
+ fun couch_doc:att_foldl_decode/3;
+ true ->
fun couch_doc:att_foldl/3
end,
couch_httpd:etag_respond(
@@ -1045,8 +1045,8 @@ parse_doc_query(Req) ->
Args#doc_query_args{update_type=replicated_changes};
{"new_edits", "true"} ->
Args#doc_query_args{update_type=interactive_edit};
- {"att_gzip_length", "true"} ->
- Options = [att_gzip_length | Args#doc_query_args.options],
+ {"att_encoding_info", "true"} ->
+ Options = [att_encoding_info | Args#doc_query_args.options],
Args#doc_query_args{options=Options};
_Else -> % unknown key value pair, ignore.
Args
diff --git a/src/couchdb/couch_rep_reader.erl b/src/couchdb/couch_rep_reader.erl
index bd807eab..b01c2ada 100644
--- a/src/couchdb/couch_rep_reader.erl
+++ b/src/couchdb/couch_rep_reader.erl
@@ -227,7 +227,7 @@ update_sequence_lists(Seq, State) ->
open_doc_revs(#http_db{} = DbS, DocId, Revs) ->
%% all this logic just splits up revision lists that are too long for
%% MochiWeb into multiple requests
- BaseQS = [{revs,true}, {latest,true}, {att_gzip_length,true}],
+ BaseQS = [{revs,true}, {latest,true}, {att_encoding_info,true}],
BaseReq = DbS#http_db{resource=url_encode(DocId), qs=BaseQS},
BaseLength = length(couch_rep_httpc:full_url(BaseReq)) + 11, % &open_revs=
@@ -252,7 +252,7 @@ open_doc(#http_db{} = DbS, DocId) ->
% get latest rev of the doc
Req = DbS#http_db{
resource=url_encode(DocId),
- qs=[{att_gzip_length, true}]
+ qs=[{att_encoding_info, true}]
},
case couch_rep_httpc:request(Req) of
{[{<<"error">>,<<"not_found">>}, {<<"reason">>,<<"missing">>}]} ->
diff --git a/src/couchdb/couch_rep_writer.erl b/src/couchdb/couch_rep_writer.erl
index cf01c576..9577e9ae 100644
--- a/src/couchdb/couch_rep_writer.erl
+++ b/src/couchdb/couch_rep_writer.erl
@@ -90,7 +90,7 @@ write_multi_part_doc(#http_db{headers=Headers} = Db, #doc{atts=Atts} = Doc) ->
JsonBytes = ?JSON_ENCODE(
couch_doc:to_json_obj(
Doc,
- [follows, att_gzip_length, {atts_after_revpos, 0}]
+ [follows, att_encoding_info, {atts_after_revpos, 0}]
)
),
Boundary = couch_uuids:random(),
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index cdbbe552..cc521241 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -24,7 +24,7 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
--export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5,
+-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6,
old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
@@ -44,17 +44,18 @@
% needed for the attachment upload integrity check (ticket 558)
identity_md5,
identity_len = 0,
- zstream
+ encoding_fun,
+ end_encoding_fun
}).
%%% Interface functions %%%
open(Fd) ->
- open(Fd, 0).
+ open(Fd, identity, []).
-open(Fd, CompressionLevel) ->
- gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []).
+open(Fd, Encoding, Options) ->
+ gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []).
close(Pid) ->
gen_server:call(Pid, close, infinity).
@@ -90,30 +91,22 @@ foldl(Fd, [Pos|Rest], Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-foldl_unzip(Fd, PosList, Fun, Acc) ->
- Z = unzip_init(),
- Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc),
- unzip_end(Z),
- Result.
-
-do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) ->
- Acc;
-do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) ->
- {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
- Bin = zlib:inflate(Z, BinZip),
- do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)).
-
foldl(Fd, PosList, <<>>, Fun, Acc) ->
foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).
-foldl_unzip(Fd, PosList, <<>>, Fun, Acc) ->
- foldl_unzip(Fd, PosList, Fun, Acc);
-foldl_unzip(Fd, PosList, Md5, Fun, Acc) ->
- Z = unzip_init(),
- Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc),
- unzip_end(Z),
+foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+ {DecDataFun, DecEndFun} = case Enc of
+ gzip ->
+ ungzip_init();
+ identity ->
+ identity_enc_dec_funs()
+ end,
+ Result = foldl_decode(
+ DecDataFun, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc
+ ),
+ DecEndFun(),
Result.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
@@ -127,41 +120,60 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
-foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
Md5 = erlang:md5_final(Md5Acc),
Acc;
-foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
- Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)),
- Bin = zlib:inflate(Z, BinZip),
+foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, EncBin)),
+ Bin = DecFun(EncBin),
Fun(Bin, Acc);
-foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
- Bin = zlib:inflate(Z, BinZip),
- Md5Acc2 = erlang:md5_update(Md5Acc, BinZip),
- foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Bin = DecFun(EncBin),
+ Md5Acc2 = erlang:md5_update(Md5Acc, EncBin),
+ foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+gzip_init(Options) ->
+ case proplists:get_value(compression_level, Options, 0) of
+ Lvl when Lvl >= 1 andalso Lvl =< 9 ->
+ Z = zlib:open(),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
+ {
+ fun(Data) ->
+ zlib:deflate(Z, Data)
+ end,
+ fun() ->
+ Last = zlib:deflate(Z, [], finish),
+ ok = zlib:deflateEnd(Z),
+ ok = zlib:close(Z),
+ Last
+ end
+ };
+ _ ->
+ identity_enc_dec_funs()
+ end.
-zip_init(CompressionLevel) ->
- Z = zlib:open(),
- % 15 = ?MAX_WBITS (defined in the zlib module)
- % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
- zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default),
- Z.
-
-zip_end(Z, Data) ->
- Last = zlib:deflate(Z, Data, finish),
- zlib:deflateEnd(Z),
- zlib:close(Z),
- Last.
-
-unzip_init() ->
+ungzip_init() ->
Z = zlib:open(),
zlib:inflateInit(Z, 16 + 15),
- Z.
+ {
+ fun(Data) ->
+ zlib:inflate(Z, Data)
+ end,
+ fun() ->
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z)
+ end
+ }.
-unzip_end(Z) ->
- zlib:inflateEnd(Z),
- zlib:close(Z).
+identity_enc_dec_funs() ->
+ {
+ fun(Data) -> Data end,
+ fun() -> [] end
+ }.
write(_Pid, <<>>) ->
ok;
@@ -169,18 +181,19 @@ write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init({Fd, CompressionLevel}) ->
- Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of
- true ->
- zip_init(CompressionLevel);
- _ ->
- undefined
+init({Fd, Encoding, Options}) ->
+ {EncodingFun, EndEncodingFun} = case Encoding of
+ identity ->
+ identity_enc_dec_funs();
+ gzip ->
+ gzip_init(Options)
end,
{ok, #stream{
fd=Fd,
md5=erlang:md5_init(),
identity_md5=erlang:md5_init(),
- zstream=Z
+ encoding_fun=EncodingFun,
+ end_encoding_fun=EndEncodingFun
}
}.
@@ -199,23 +212,18 @@ handle_call({write, Bin}, _From, Stream) ->
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
- zstream = Z} = Stream,
+ encoding_fun = EncodingFun} = Stream,
if BinSize + BufferLen > Max ->
WriteBin = lists:reverse(Buffer, [Bin]),
IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin),
- WriteBin2 = case Z of
- undefined ->
- WriteBin;
- _ ->
- zlib:deflate(Z, WriteBin)
- end,
- case WriteBin2 of
+ case EncodingFun(WriteBin) of
[] ->
- % case where zlib did some internal buffering
+ % case where the encoder did some internal buffering
+ % (zlib does it for example)
WrittenLen2 = WrittenLen,
Md5_2 = Md5,
Written2 = Written;
- _ ->
+ WriteBin2 ->
{ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
Md5_2 = erlang:md5_update(Md5, WriteBin2),
@@ -245,16 +253,12 @@ handle_call(close, _From, Stream) ->
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
- zstream = Z} = Stream,
+ encoding_fun = EncodingFun,
+ end_encoding_fun = EndEncodingFun} = Stream,
WriteBin = lists:reverse(Buffer),
IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)),
- WriteBin2 = case Z of
- undefined ->
- WriteBin;
- _ ->
- zip_end(Z, WriteBin)
- end,
+ WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->