summary | refs | log | tree | commit | diff
path: root/src/couchdb/couch_stream.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r-- src/couchdb/couch_stream.erl 156
1 files changed, 80 insertions, 76 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index cdbbe552..cc521241 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -24,7 +24,7 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
--export([open/1, open/2, close/1, write/2, foldl/4, foldl/5, foldl_unzip/5,
+-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6,
old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
@@ -44,17 +44,18 @@
% needed for the attachment upload integrity check (ticket 558)
identity_md5,
identity_len = 0,
- zstream
+ encoding_fun,
+ end_encoding_fun
}).
%%% Interface functions %%%
%% Opens a stream on Fd with the default settings: delegates to open/3
%% with the `identity' encoding and an empty option list.
open(Fd) ->
- open(Fd, 0).
+ open(Fd, identity, []).
-open(Fd, CompressionLevel) ->
- gen_server:start_link(couch_stream, {Fd, CompressionLevel}, []).
%% Starts a linked couch_stream gen_server whose init/1 receives
%% {Fd, Encoding, Options}; returns the gen_server:start_link/3 result.
%% Encoding is `identity' or `gzip' (the only values init/1 matches on).
%% Options is a proplist; gzip_init/1 reads `compression_level' from it.
+open(Fd, Encoding, Options) ->
+ gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []).
%% Sends a synchronous `close' call to the stream process Pid, waiting
%% without a timeout, and returns whatever the server replies.
close(Pid) ->
gen_server:call(Pid, close, infinity).
@@ -90,30 +91,22 @@ foldl(Fd, [Pos|Rest], Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-foldl_unzip(Fd, PosList, Fun, Acc) ->
- Z = unzip_init(),
- Result = do_foldl_unzip(Z, Fd, PosList, Fun, Acc),
- unzip_end(Z),
- Result.
-
-do_foldl_unzip(_Z, _Fd, [], _Fun, Acc) ->
- Acc;
-do_foldl_unzip(Z, Fd, [Pos|Rest], Fun, Acc) ->
- {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
- Bin = zlib:inflate(Z, BinZip),
- do_foldl_unzip(Z, Fd, Rest, Fun, Fun(Bin, Acc)).
-
%% Folds Fun over the chunks stored at the positions in PosList.
%% When the expected digest is the empty binary, no digest is tracked and
%% the call is forwarded to foldl/4. Otherwise the call is forwarded to
%% foldl/6, seeded with a fresh MD5 context and the expected Md5.
foldl(Fd, PosList, <<>>, Fun, Acc) ->
foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).
-foldl_unzip(Fd, PosList, <<>>, Fun, Acc) ->
- foldl_unzip(Fd, PosList, Fun, Acc);
-foldl_unzip(Fd, PosList, Md5, Fun, Acc) ->
- Z = unzip_init(),
- Result = foldl_unzip(Z, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc),
- unzip_end(Z),
%% Folds Fun over the decoded chunks stored at the positions in PosList.
%%
%% Enc selects the decoder pair: `gzip' uses ungzip_init/0 and `identity'
%% uses identity_enc_dec_funs/0; any other value fails with case_clause.
%% Md5 is the expected digest of the encoded (on-disk) bytes; it is
%% asserted inside foldl_decode/7, so a mismatch fails with badmatch.
%% Returns the final accumulator after calling the decoder's end function.
%%
%% NOTE(review): unlike foldl/5 there is no clause for Md5 =:= <<>>, so an
%% empty digest is checked like any other value.
%% NOTE(review): DecEndFun() only runs when the fold completes; if the fold
%% raises, the decoder set up by ungzip_init/0 is not finalised here.
+foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+ {DecDataFun, DecEndFun} = case Enc of
+ gzip ->
+ ungzip_init();
+ identity ->
+ identity_enc_dec_funs()
+ end,
+ Result = foldl_decode(
+ DecDataFun, Fd, PosList, Md5, erlang:md5_init(), Fun, Acc
+ ),
+ DecEndFun(),
+ Result.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
@@ -127,41 +120,60 @@ foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
-foldl_unzip(_Z, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
%% Worker for foldl_decode/6. For each position it reads the encoded chunk
%% with couch_file:pread_iolist/2, feeds the encoded bytes into the running
%% MD5 context, decodes the chunk with DecFun and applies Fun(Decoded, Acc).
%%
%% The digest is asserted against Md5 (badmatch on mismatch) either on an
%% empty position list or, in the single-position clause, on the last chunk.
+foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
Md5 = erlang:md5_final(Md5Acc),
Acc;
-foldl_unzip(Z, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
- {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
- Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, BinZip)),
- Bin = zlib:inflate(Z, BinZip),
+foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, EncBin)), % checked before the last chunk is decoded or handed to Fun
+ Bin = DecFun(EncBin),
Fun(Bin, Acc);
-foldl_unzip(Z, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
- {ok, BinZip} = couch_file:pread_iolist(Fd, Pos),
- Bin = zlib:inflate(Z, BinZip),
- Md5Acc2 = erlang:md5_update(Md5Acc, BinZip),
- foldl_unzip(Z, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Bin = DecFun(EncBin),
+ Md5Acc2 = erlang:md5_update(Md5Acc, EncBin), % digest covers the encoded bytes, not the decoded ones
+ foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
%% Builds the {EncodeFun, EndFun} pair used to gzip-encode a stream.
%%
%% Options is a proplist; `compression_level' is read from it (default 0).
%% For a level from 1 to 9 a zlib deflate stream is opened and:
%%   EncodeFun(Data) returns the output of zlib:deflate/2 for Data;
%%   EndFun() flushes with `finish', ends and closes the zlib stream, and
%%   returns the final output.
%% For any other level the identity pair from identity_enc_dec_funs/0 is
%% returned, so the data is left unencoded.
+gzip_init(Options) ->
+ case proplists:get_value(compression_level, Options, 0) of
+ Lvl when Lvl >= 1 andalso Lvl =< 9 ->
+ Z = zlib:open(),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
+ {
+ fun(Data) ->
+ zlib:deflate(Z, Data)
+ end,
+ fun() ->
+ Last = zlib:deflate(Z, [], finish),
+ ok = zlib:deflateEnd(Z),
+ ok = zlib:close(Z),
+ Last
+ end
+ };
+ _ ->
+ identity_enc_dec_funs()
+ end.
-zip_init(CompressionLevel) ->
- Z = zlib:open(),
- % 15 = ?MAX_WBITS (defined in the zlib module)
- % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
- zlib:deflateInit(Z, CompressionLevel, deflated, 16 + 15, 8, default),
- Z.
-
-zip_end(Z, Data) ->
- Last = zlib:deflate(Z, Data, finish),
- zlib:deflateEnd(Z),
- zlib:close(Z),
- Last.
-
-unzip_init() ->
%% Opens a zlib inflate stream (window bits 16 + 15, the same value that
%% gzip_init/1 passes to deflateInit) and returns {DecodeFun, EndFun}:
%%   DecodeFun(Data) returns the output of zlib:inflate/2 for Data;
%%   EndFun() ends and closes the zlib stream, asserting `ok' from both.
+ungzip_init() ->
Z = zlib:open(),
zlib:inflateInit(Z, 16 + 15),
- Z.
+ {
+ fun(Data) ->
+ zlib:inflate(Z, Data)
+ end,
+ fun() ->
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z)
+ end
+ }.
-unzip_end(Z) ->
- zlib:inflateEnd(Z),
- zlib:close(Z).
%% Returns the no-op {DataFun, EndFun} pair used for the `identity'
%% encoding, for both encoding and decoding:
%%   DataFun(Data) returns Data unchanged;
%%   EndFun() returns [], i.e. there is no trailing output to append.
+identity_enc_dec_funs() ->
+ {
+ fun(Data) -> Data end,
+ fun() -> [] end
+ }.
write(_Pid, <<>>) ->
ok;
@@ -169,18 +181,19 @@ write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init({Fd, CompressionLevel}) ->
- Z = case CompressionLevel >= 1 andalso CompressionLevel =< 9 of
- true ->
- zip_init(CompressionLevel);
- _ ->
- undefined
%% gen_server init callback. Picks the encoder pair for Encoding
%% (`identity' or `gzip'; any other value fails with case_clause) and
%% stores it in the #stream state together with Fd and two fresh MD5
%% contexts: `md5' and `identity_md5'.
+init({Fd, Encoding, Options}) ->
+ {EncodingFun, EndEncodingFun} = case Encoding of
+ identity ->
+ identity_enc_dec_funs();
+ gzip ->
+ gzip_init(Options)
end,
{ok, #stream{
fd=Fd,
md5=erlang:md5_init(),
identity_md5=erlang:md5_init(),
- zstream=Z
+ encoding_fun=EncodingFun,
+ end_encoding_fun=EndEncodingFun
}
}.
@@ -199,23 +212,18 @@ handle_call({write, Bin}, _From, Stream) ->
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
- zstream = Z} = Stream,
+ encoding_fun = EncodingFun} = Stream,
if BinSize + BufferLen > Max ->
WriteBin = lists:reverse(Buffer, [Bin]),
IdenMd5_2 = erlang:md5_update(IdenMd5, WriteBin),
- WriteBin2 = case Z of
- undefined ->
- WriteBin;
- _ ->
- zlib:deflate(Z, WriteBin)
- end,
- case WriteBin2 of
+ case EncodingFun(WriteBin) of
[] ->
- % case where zlib did some internal buffering
+ % case where the encoder did some internal buffering
+ % (zlib does it for example)
WrittenLen2 = WrittenLen,
Md5_2 = Md5,
Written2 = Written;
- _ ->
+ WriteBin2 ->
{ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
Md5_2 = erlang:md5_update(Md5, WriteBin2),
@@ -245,16 +253,12 @@ handle_call(close, _From, Stream) ->
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
- zstream = Z} = Stream,
+ encoding_fun = EncodingFun,
+ end_encoding_fun = EndEncodingFun} = Stream,
WriteBin = lists:reverse(Buffer),
IdenMd5Final = erlang:md5_final(erlang:md5_update(IdenMd5, WriteBin)),
- WriteBin2 = case Z of
- undefined ->
- WriteBin;
- _ ->
- zip_end(Z, WriteBin)
- end,
+ WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->