diff options
author | John Christopher Anderson <jchris@apache.org> | 2010-01-29 22:43:33 +0000 |
---|---|---|
committer | John Christopher Anderson <jchris@apache.org> | 2010-01-29 22:43:33 +0000 |
commit | ee09a0de9f8356abe24a0ac0f26cdff35f8fa704 (patch) | |
tree | 1cf02264f5ee72216e5add7deda235c1504cf5ec /src/couchdb | |
parent | 5affb01e4ee059ad9b82000625f2bdc989019a16 (diff) |
Allow storing attachments in compressed form. Closes COUCHDB-583. Thanks, Filipe Manana.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@904650 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb')
-rw-r--r-- | src/couchdb/couch_config.erl | 24 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 56 | ||||
-rw-r--r-- | src/couchdb/couch_db.hrl | 7 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 39 | ||||
-rw-r--r-- | src/couchdb/couch_doc.erl | 48 | ||||
-rw-r--r-- | src/couchdb/couch_httpd.erl | 17 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_db.erl | 47 | ||||
-rw-r--r-- | src/couchdb/couch_stream.erl | 153 | ||||
-rw-r--r-- | src/couchdb/couch_util.erl | 23 |
9 files changed, 329 insertions, 85 deletions
diff --git a/src/couchdb/couch_config.erl b/src/couchdb/couch_config.erl index d8473e08..1fe5aa0d 100644 --- a/src/couchdb/couch_config.erl +++ b/src/couchdb/couch_config.erl @@ -194,8 +194,28 @@ parse_ini_file(IniFile) -> {AccSectionName, AccValues}; Line2 -> case re:split(Line2, "\s?=\s?", [{return, list}]) of - [_SingleElement] -> % no "=" found, ignore this line - {AccSectionName, AccValues}; + [Value] -> + MultiLineValuePart = case re:run(Line, "^ \\S", []) of + {match, _} -> + true; + _ -> + false + end, + case {MultiLineValuePart, AccValues} of + {true, [{{_, ValueName}, PrevValue} | AccValuesRest]} -> + % remove comment + case re:split(Value, " ;|\t;", [{return, list}]) of + [[]] -> + % empty line + {AccSectionName, AccValues}; + [LineValue | _Rest] -> + E = {{AccSectionName, ValueName}, + PrevValue ++ " " ++ LineValue}, + {AccSectionName, [E | AccValuesRest]} + end; + _ -> + {AccSectionName, AccValues} + end; [""|_LineValues] -> % line begins with "=", ignore {AccSectionName, AccValues}; [ValueName|LineValues] -> % yeehaw, got a line! 
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 79e00ff8..f4b024ad 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -694,17 +694,17 @@ flush_att(Fd, #att{data={Fd0, _}}=Att) when Fd0 == Fd -> Att; flush_att(Fd, #att{data={OtherFd,StreamPointer}, md5=InMd5}=Att) -> - {NewStreamData, Len, Md5} = + {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} = couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), - check_md5(Md5, InMd5), - Att#att{data={Fd, NewStreamData}, md5=Md5, len=Len}; + check_md5(IdentityMd5, InMd5), + Att#att{data={Fd, NewStreamData}, md5=Md5, att_len=Len, disk_len=Len}; flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) -> with_stream(Fd, Att, fun(OutputStream) -> couch_stream:write(OutputStream, Data) end); -flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) -> +flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) -> with_stream(Fd, Att, fun(OutputStream) -> % Fun(MaxChunkSize, WriterFun) must call WriterFun % once for each chunk of the attachment, @@ -726,9 +726,9 @@ flush_att(Fd, #att{data=Fun,len=undefined}=Att) when is_function(Fun) -> end, ok) end); -flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) -> +flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) -> with_stream(Fd, Att, fun(OutputStream) -> - write_streamed_attachment(OutputStream, Fun, Len) + write_streamed_attachment(OutputStream, Fun, AttLen) end). % From RFC 2616 3.6.1 - Chunked Transfer Coding @@ -741,8 +741,16 @@ flush_att(Fd, #att{data=Fun,len=Len}=Att) when is_function(Fun) -> % is present in the request, but there is no Content-MD5 % trailer, we're free to ignore this inconsistency and % pretend that no Content-MD5 exists. 
-with_stream(Fd, #att{md5=InMd5}=Att, Fun) -> - {ok, OutputStream} = couch_stream:open(Fd), +with_stream(Fd, #att{md5=InMd5,type=Type}=Att, Fun) -> + {ok, OutputStream} = case couch_util:compressible_att_type(Type) of + true -> + CompLevel = list_to_integer( + couch_config:get("attachments", "compression_level", "0") + ), + couch_stream:open(Fd, CompLevel); + _ -> + couch_stream:open(Fd) + end, ReqMd5 = case Fun(OutputStream) of {md5, FooterMd5} -> case InMd5 of @@ -752,9 +760,16 @@ with_stream(Fd, #att{md5=InMd5}=Att, Fun) -> _ -> InMd5 end, - {StreamInfo, Len, Md5} = couch_stream:close(OutputStream), - check_md5(Md5, ReqMd5), - Att#att{data={Fd,StreamInfo},len=Len,md5=Md5}. + {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} = + couch_stream:close(OutputStream), + check_md5(IdentityMd5, ReqMd5), + Att#att{ + data={Fd,StreamInfo}, + att_len=Len, + disk_len=IdentityLen, + md5=Md5, + comp=(IdentityMd5 =/= Md5) + }. write_streamed_attachment(_Stream, _F, 0) -> @@ -983,17 +998,28 @@ make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) -> {ok, {BodyData0, Atts0}} = read_doc(Db, Bp), {BodyData0, lists:map( - fun({Name,Type,Sp,Len,RevPos,Md5}) -> + fun({Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Comp}) -> + #att{name=Name, + type=Type, + att_len=AttLen, + disk_len=DiskLen, + md5=Md5, + revpos=RevPos, + data={Fd,Sp}, + comp=Comp}; + ({Name,Type,Sp,AttLen,RevPos,Md5}) -> #att{name=Name, type=Type, - len=Len, + att_len=AttLen, + disk_len=AttLen, md5=Md5, revpos=RevPos, data={Fd,Sp}}; - ({Name,{Type,Sp,Len}}) -> + ({Name,{Type,Sp,AttLen}}) -> #att{name=Name, type=Type, - len=Len, + att_len=AttLen, + disk_len=AttLen, md5= <<>>, revpos=0, data={Fd,Sp}} diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 17917312..31b66edb 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -99,10 +99,13 @@ { name, type, - len, + att_len, + disk_len, % length of the attachment in uncompressed form + % differs from at_len when comp =:= true md5= <<>>, 
revpos=0, - data + data, + comp=false % gzip compression Y/N }). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 7292221a..723fc11c 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -429,8 +429,9 @@ flush_trees(#db{fd=Fd,header=Header}=Db, case Atts of [] -> []; [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd -> - [{N,T,P,L,R,M} - || #att{name=N,type=T,data={_,P},md5=M,revpos=R,len=L} + [{N,T,P,AL,DL,R,M,C} + || #att{name=N,type=T,data={_,P},md5=M,revpos=R, + att_len=AL,disk_len=DL,comp=C} <- Atts]; _ -> % BinFd must not equal our Fd. This can happen when a database @@ -696,21 +697,31 @@ copy_doc_attachments(#db{fd=SrcFd}=SrcDb, {Pos,_RevId}, SrcSp, DestFd) -> {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcDb, SrcSp), % copy the bin values NewBinInfos = lists:map( - fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null -> + fun({Name, {Type, BinSp, AttLen}}) when is_tuple(BinSp) orelse BinSp == null -> % 09 UPGRADE CODE - {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd), - {Name, Type, NewBinSp, Len, Pos, Md5}; - ({Name, {Type, BinSp, Len}}) -> + {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = + couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd), + {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, false}; + ({Name, {Type, BinSp, AttLen}}) -> % 09 UPGRADE CODE - {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, Type, NewBinSp, Len, Pos, Md5}; - ({Name, Type, BinSp, Len, RevPos, <<>>}) when is_tuple(BinSp) orelse BinSp == null -> + {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = + couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), + {Name, Type, NewBinSp, AttLen, AttLen, Pos, Md5, false}; + ({Name, Type, BinSp, AttLen, _RevPos, <<>>}) when + is_tuple(BinSp) orelse BinSp == null -> % 09 UPGRADE CODE - {NewBinSp, Len, Md5} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, 
DestFd), - {Name, Type, NewBinSp, Len, Len, Md5}; - ({Name, Type, BinSp, Len, RevPos, Md5}) -> - {NewBinSp, Len, Md5} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - {Name, Type, NewBinSp, Len, RevPos, Md5} + {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = + couch_stream:old_copy_to_new_stream(SrcFd, BinSp, AttLen, DestFd), + {Name, Type, NewBinSp, AttLen, AttLen, AttLen, Md5, false}; + ({Name, Type, BinSp, AttLen, RevPos, Md5}) -> + % 010 UPGRADE CODE + {NewBinSp, AttLen, AttLen, Md5, _IdentityMd5} = + couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), + {Name, Type, NewBinSp, AttLen, AttLen, RevPos, Md5, false}; + ({Name, Type, BinSp, AttLen, DiskLen, RevPos, Md5, Comp}) -> + {NewBinSp, AttLen, _, Md5, _IdentityMd5} = + couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), + {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, Md5, Comp} end, BinInfos), {BodyData, NewBinInfos}. diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl index ba5c7450..48ed1530 100644 --- a/src/couchdb/couch_doc.erl +++ b/src/couchdb/couch_doc.erl @@ -13,7 +13,7 @@ -module(couch_doc). -export([to_doc_info/1,to_doc_info_path/1,parse_rev/1,parse_revs/1,rev_to_str/1,revs_to_strs/1]). --export([att_foldl/3,get_validate_doc_fun/1]). +-export([att_foldl/3,att_foldl_unzip/3,get_validate_doc_fun/1]). -export([from_json_obj/1,to_json_obj/2,has_stubs/1, merge_stubs/2]). -export([validate_docid/1]). -export([doc_from_multi_part_stream/2]). 
@@ -87,20 +87,26 @@ to_json_attachments([], _RevPosIncludeAfter, _DataToFollow) -> []; to_json_attachments(Atts, RevPosIncludeAfter, DataToFollow) -> AttProps = lists:map( - fun(#att{len=Len}=Att) -> + fun(#att{disk_len=DiskLen}=Att) -> {Att#att.name, {[ {<<"content_type">>, Att#att.type}, {<<"revpos">>, Att#att.revpos} ] ++ if Att#att.revpos > RevPosIncludeAfter -> if DataToFollow -> - [{<<"length">>, Len}, {<<"follows">>, true}]; + [{<<"length">>, DiskLen}, {<<"follows">>, true}]; true -> + AttData = case Att#att.comp of + true -> + zlib:gunzip(att_to_iolist(Att)); + _ -> + att_to_iolist(Att) + end, [{<<"data">>, - couch_util:encodeBase64(att_to_iolist(Att))}] + couch_util:encodeBase64(AttData)}] end; true -> - [{<<"length">>, Len}, {<<"stub">>, true}] + [{<<"length">>, DiskLen}, {<<"stub">>, true}] end }} end, Atts), @@ -187,21 +193,23 @@ transfer_fields([{<<"_attachments">>, {JsonBins}} | Rest], Doc) -> Type = proplists:get_value(<<"content_type">>, BinProps), Length = proplists:get_value(<<"length">>, BinProps), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), - #att{name=Name, data=stub, type=Type, len=Length, revpos=RevPos}; + #att{name=Name, data=stub, type=Type, att_len=Length, + disk_len=Length, revpos=RevPos}; _ -> Type = proplists:get_value(<<"content_type">>, BinProps, ?DEFAULT_ATTACHMENT_CONTENT_TYPE), RevPos = proplists:get_value(<<"revpos">>, BinProps, 0), case proplists:get_value(<<"follows">>, BinProps) of true -> + Len = proplists:get_value(<<"length">>, BinProps), #att{name=Name, data=follows, type=Type, - len=proplists:get_value(<<"length">>, BinProps), - revpos=RevPos}; + att_len=Len, disk_len=Len, revpos=RevPos}; _ -> Value = proplists:get_value(<<"data">>, BinProps), Bin = couch_util:decodeBase64(Value), - #att{name=Name, data=Bin, type=Type, len=size(Bin), - revpos=RevPos} + LenBin = size(Bin), + #att{name=Name, data=Bin, type=Type, att_len=LenBin, + disk_len=LenBin, revpos=RevPos} end end end, JsonBins), @@ -272,14 +280,16 @@ 
to_doc_info_path(#full_doc_info{id=Id,rev_tree=Tree}) -> att_foldl(#att{data=Bin}, Fun, Acc) when is_binary(Bin) -> Fun(Bin, Acc); -att_foldl(#att{data={Fd,Sp},len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null -> +att_foldl(#att{data={Fd,Sp},att_len=Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null -> % 09 UPGRADE CODE couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc); att_foldl(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> couch_stream:foldl(Fd, Sp, Md5, Fun, Acc); -att_foldl(#att{data=DataFun,len=Len}, Fun, Acc) when is_function(DataFun) -> +att_foldl(#att{data=DataFun,att_len=Len}, Fun, Acc) when is_function(DataFun) -> fold_streamed_data(DataFun, Len, Fun, Acc). +att_foldl_unzip(#att{data={Fd,Sp},md5=Md5}, Fun, Acc) -> + couch_stream:foldl_unzip(Fd, Sp, Md5, Fun, Acc). att_to_iolist(#att{data=Bin}) when is_binary(Bin) -> Bin; @@ -288,7 +298,7 @@ att_to_iolist(#att{data=Iolist}) when is_list(Iolist) -> att_to_iolist(#att{data={Fd,Sp}}=Att) -> lists:reverse(att_foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, [])); -att_to_iolist(#att{data=DataFun, len=Len}) when is_function(DataFun)-> +att_to_iolist(#att{data=DataFun, att_len=Len}) when is_function(DataFun)-> lists:reverse(fold_streamed_data(DataFun, Len, fun(Data, Acc) -> [Data | Acc] end, [])). 
@@ -342,11 +352,11 @@ len_doc_to_multi_part_stream(Boundary,JsonBytes,Atts,AttsSinceRevPos) -> iolist_size(JsonBytes) + 4 + % "\r\n--" size(Boundary) + - + lists:foldl(fun(#att{revpos=RevPos,len=Len}, AccAttsSize) -> + + lists:foldl(fun(#att{revpos=RevPos,disk_len=DiskLen}, AccAttsSize) -> if RevPos > AttsSinceRevPos -> AccAttsSize + 4 + % "\r\n\r\n" - Len + + DiskLen + 4 + % "\r\n--" size(Boundary); true -> @@ -366,7 +376,13 @@ atts_to_mp([], _Boundary, WriteFun, _AttsSinceRevPos) -> atts_to_mp([#att{revpos=RevPos} = Att | RestAtts], Boundary, WriteFun, AttsSinceRevPos) when RevPos > AttsSinceRevPos -> WriteFun(<<"\r\n\r\n">>), - att_foldl(Att, fun(Data, ok) -> WriteFun(Data) end, ok), + AttFun = case Att#att.comp of + true -> + fun att_foldl_unzip/3; + _ -> + fun att_foldl/3 + end, + AttFun(Att, fun(Data, ok) -> WriteFun(Data) end, ok), WriteFun(<<"\r\n--", Boundary/binary>>), atts_to_mp(RestAtts, Boundary, WriteFun, AttsSinceRevPos); atts_to_mp([_ | RestAtts], Boundary, WriteFun, AttsSinceRevPos) -> diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 68478f4d..a36782db 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -25,6 +25,7 @@ -export([start_json_response/2, start_json_response/3, end_json_response/1]). -export([send_response/4,send_method_not_allowed/2,send_error/4, send_redirect/2,send_chunked_error/2]). -export([send_json/2,send_json/3,send_json/4,last_chunk/1,parse_multipart_request/3]). +-export([accepted_encodings/1]). 
start_link() -> % read config and register for configuration changes @@ -193,6 +194,12 @@ handle_request(MochiReq, DefaultFun, throw:{invalid_json, S} -> ?LOG_ERROR("attempted upload of invalid JSON ~s", [S]), send_error(HttpReq, {bad_request, "invalid UTF-8 JSON"}); + throw:unacceptable_encoding -> + ?LOG_ERROR("unsupported encoding method for the response", []), + send_error(HttpReq, {not_acceptable, "unsupported encoding"}); + throw:bad_accept_encoding_value -> + ?LOG_ERROR("received invalid Accept-Encoding header", []), + send_error(HttpReq, bad_request); exit:normal -> exit(normal); throw:Error -> @@ -261,6 +268,16 @@ header_value(#httpd{mochi_req=MochiReq}, Key, Default) -> primary_header_value(#httpd{mochi_req=MochiReq}, Key) -> MochiReq:get_primary_header_value(Key). +accepted_encodings(#httpd{mochi_req=MochiReq}) -> + case MochiReq:accepted_encodings(["gzip", "identity"]) of + bad_accept_encoding_value -> + throw(bad_accept_encoding_value); + [] -> + throw(unacceptable_encoding); + EncList -> + EncList + end. + serve_file(#httpd{mochi_req=MochiReq}=Req, RelativePath, DocumentRoot) -> serve_file(Req, RelativePath, DocumentRoot, []). diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl index dd13cb59..fd143fa1 100644 --- a/src/couchdb/couch_httpd_db.erl +++ b/src/couchdb/couch_httpd_db.erl @@ -21,7 +21,7 @@ -import(couch_httpd, [send_json/2,send_json/3,send_json/4,send_method_not_allowed/2, start_json_response/2,start_json_response/3, - send_chunk/2,end_json_response/1, + send_chunk/2,last_chunk/1,end_json_response/1, start_chunked_response/3, absolute_uri/2, send/2, start_response_length/4]). 
@@ -993,20 +993,37 @@ db_attachment_req(#httpd{method='GET'}=Req, Db, DocId, FileNameParts) -> case [A || A <- Atts, A#att.name == FileName] of [] -> throw({not_found, "Document is missing attachment"}); - [#att{type=Type, len=Len}=Att] -> + [#att{type=Type, comp=Comp}=Att] -> Etag = couch_httpd:doc_etag(Doc), - couch_httpd:etag_respond(Req, Etag, fun() -> - {ok, Resp} = start_response_length(Req, 200, [ - {"ETag", Etag}, - {"Cache-Control", "must-revalidate"}, - {"Content-Type", binary_to_list(Type)} - ], integer_to_list(Len)), - couch_doc:att_foldl( - Att, - fun(BinSegment, _) -> send(Resp, BinSegment) end, - {ok, Resp} % Seed in case of 0 byte attachment. - ) - end) + ReqAcceptsGzip = lists:member( + "gzip", + couch_httpd:accepted_encodings(Req) + ), + Headers = [ + {"ETag", Etag}, + {"Cache-Control", "must-revalidate"}, + {"Content-Type", binary_to_list(Type)} + ] ++ case {Comp, ReqAcceptsGzip} of + {true, true} -> + [{"Content-Encoding", "gzip"}]; + _ -> + [] + end, + AttFun = case {Comp, ReqAcceptsGzip} of + {true, false} -> + fun couch_doc:att_foldl_unzip/3; + _ -> + fun couch_doc:att_foldl/3 + end, + couch_httpd:etag_respond( + Req, + Etag, + fun() -> + {ok, Resp} = start_chunked_response(Req, 200, Headers), + AttFun(Att, fun(Seg, _) -> send_chunk(Resp, Seg) end, ok), + last_chunk(Resp) + end + ) end; @@ -1048,7 +1065,7 @@ db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts) Length -> exit({length_not_integer, Length}) end, - len = case couch_httpd:header_value(Req,"Content-Length") of + att_len = case couch_httpd:header_value(Req,"Content-Length") of undefined -> undefined; Length -> diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index 2a873e4c..cdbbe552 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -24,7 +24,7 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). 
% 1 meg chunks when streaming data --export([open/1, close/1, write/2, foldl/4, foldl/5, +-export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5, old_foldl/5,old_copy_to_new_stream/4]). -export([copy_to_new_stream/3,old_read_term/2]). -export([init/1, terminate/2, handle_call/3]). @@ -39,14 +39,22 @@ buffer_len = 0, max_buffer = 4096, written_len = 0, - md5 + md5, + % md5 of the content without any transformation applied (e.g. compression) + % needed for the attachment upload integrity check (ticket 558) + identity_md5, + identity_len = 0, + zstream }). %%% Interface functions %%% open(Fd) -> - gen_server:start_link(couch_stream, Fd, []). + open(Fd, 0). + +open(Fd, CompressionLevel) -> + gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []). close(Pid) -> gen_server:call(Pid, close, infinity). @@ -82,11 +90,31 @@ foldl(Fd, [Pos|Rest], Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Fun, Fun(Bin, Acc)). +foldl_unzip(Fd, PosList, Fun, Acc) -> + Z = unzip_init(), + Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc), + unzip_end(Z), + Result. + +do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) -> + Acc; +do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) -> + {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), + Bin = zlib:inflate(Z, BinZip), + do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)). + foldl(Fd, PosList, <<>>, Fun, Acc) -> foldl(Fd, PosList, Fun, Acc); foldl(Fd, PosList, Md5, Fun, Acc) -> foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc). +foldl_unzip(Fd, PosList, <<>>, Fun, Acc) -> + foldl_unzip(Fd, PosList, Fun, Acc); +foldl_unzip(Fd, PosList, Md5, Fun, Acc) -> + Z = unzip_init(), + Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc), + unzip_end(Z), + Result. 
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> Md5 = erlang:md5_final(Md5Acc), @@ -99,14 +127,62 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> {ok, Bin} = couch_file:pread_iolist(Fd, Pos), foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). +foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> + Md5 = erlang:md5_final(Md5Acc), + Acc; +foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), + Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)), + Bin = zlib:inflate(Z, BinZip), + Fun(Bin, Acc); +foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, BinZip} = couch_file:pread_iolist(Fd, Pos), + Bin = zlib:inflate(Z, BinZip), + Md5Acc2 = erlang:md5_update(Md5Acc, BinZip), + foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +zip_init(CompressionLevel) -> + Z = zlib:open(), + % 15 = ?MAX_WBITS (defined in the zlib module) + % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1 + zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default), + Z. + +zip_end(Z, Data) -> + Last = zlib:deflate(Z, Data, finish), + zlib:deflateEnd(Z), + zlib:close(Z), + Last. + +unzip_init() -> + Z = zlib:open(), + zlib:inflateInit(Z, 16 + 15), + Z. + +unzip_end(Z) -> + zlib:inflateEnd(Z), + zlib:close(Z). + write(_Pid, <<>>) -> ok; write(Pid, Bin) -> gen_server:call(Pid, {write, Bin}, infinity). -init(Fd) -> - {ok, #stream{fd=Fd, md5=erlang:md5_init()}}. +init({Fd, CompressionLevel}) -> + Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of + true -> + zip_init(CompressionLevel); + _ -> + undefined + end, + {ok, #stream{ + fd=Fd, + md5=erlang:md5_init(), + identity_md5=erlang:md5_init(), + zstream=Z + } + }. terminate(_Reason, _Stream) -> ok. 
@@ -120,39 +196,74 @@ handle_call({write, Bin}, _From, Stream) -> buffer_len = BufferLen, buffer_list = Buffer, max_buffer = Max, - md5 = Md5} = Stream, + md5 = Md5, + identity_md5 = IdenMd5, + identity_len = IdenLen, + zstream = Z} = Stream, if BinSize + BufferLen > Max -> WriteBin = lists:reverse(Buffer, [Bin]), - Md5_2 = erlang:md5_update(Md5, WriteBin), - {ok, Pos} = couch_file:append_binary(Fd, WriteBin), + IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin), + WriteBin2 = case Z of + undefined -> + WriteBin; + _ -> + zlib:deflate(Z, WriteBin) + end, + case WriteBin2 of + [] -> + % case where zlib did some internal buffering + WrittenLen2 = WrittenLen, + Md5_2 = Md5, + Written2 = Written; + _ -> + {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), + WrittenLen2 = WrittenLen + iolist_size(WriteBin2), + Md5_2 = erlang:md5_update(Md5, WriteBin2), + Written2 = [Pos|Written] + end, + {reply, ok, Stream#stream{ - written_len=WrittenLen + BufferLen + BinSize, - written_pointers=[Pos|Written], + written_len=WrittenLen2, + written_pointers=Written2, buffer_list=[], buffer_len=0, - md5=Md5_2}}; + md5=Md5_2, + identity_md5=IdenMd5_2, + identity_len=IdenLen + BinSize}}; true -> {reply, ok, Stream#stream{ buffer_list=[Bin|Buffer], - buffer_len=BufferLen + BinSize}} + buffer_len=BufferLen + BinSize, + identity_len=IdenLen + BinSize}} end; handle_call(close, _From, Stream) -> #stream{ fd = Fd, written_len = WrittenLen, written_pointers = Written, - buffer_len = BufferLen, buffer_list = Buffer, - md5 = Md5} = Stream, - - case Buffer of + md5 = Md5, + identity_md5 = IdenMd5, + identity_len = IdenLen, + zstream = Z} = Stream, + + WriteBin = lists:reverse(Buffer), + IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)), + WriteBin2 = case Z of + undefined -> + WriteBin; + _ -> + zip_end(Z, WriteBin) + end, + Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)), + Result = case WriteBin2 of [] -> - Result = {lists:reverse(Written), WrittenLen, 
erlang:md5_final(Md5)}; + {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; _ -> - WriteBin = lists:reverse(Buffer), - Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)), - {ok, Pos} = couch_file:append_binary(Fd, WriteBin), - Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final} + {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), + StreamInfo = lists:reverse(Written, [Pos]), + StreamLen = WrittenLen + iolist_size(WriteBin2), + {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} end, {stop, normal, Result, Stream}. diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index 45d1d336..ca36a6db 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -22,6 +22,7 @@ -export([to_binary/1, to_integer/1, to_list/1, url_encode/1]). -export([json_encode/1, json_decode/1]). -export([verify/2]). +-export([compressible_att_type/1]). -include("couch_db.hrl"). -include_lib("kernel/include/file.hrl"). @@ -440,3 +441,25 @@ verify(X, Y) when is_list(X) and is_list(Y) -> false end; verify(_X, _Y) -> false. + +compressible_att_type(MimeType) when is_binary(MimeType) -> + compressible_att_type(?b2l(MimeType)); +compressible_att_type(MimeType) -> + TypeExpList = re:split( + couch_config:get("attachments", "compressible_types", ""), + "\\s+", + [{return, list}] + ), + lists:any( + fun(TypeExp) -> + Regexp = "^\\s*" ++ + re:replace(TypeExp, "\\*", ".*", [{return, list}]) ++ "\\s*$", + case re:run(MimeType, Regexp, [caseless]) of + {match, _} -> + true; + _ -> + false + end + end, + [T || T <- TypeExpList, T /= []] + ). |