summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_stream.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r--src/couchdb/couch_stream.erl167
1 files changed, 141 insertions, 26 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 2a873e4c..04c17770 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -24,7 +24,7 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
--export([open/1, close/1, write/2, foldl/4, foldl/5,
+-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6,
old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
-export([init/1, terminate/2, handle_call/3]).
@@ -39,14 +39,23 @@
buffer_len = 0,
max_buffer = 4096,
written_len = 0,
- md5
+ md5,
+ % md5 of the content without any transformation applied (e.g. compression)
+ % needed for the attachment upload integrity check (ticket 558)
+ identity_md5,
+ identity_len = 0,
+ encoding_fun,
+ end_encoding_fun
}).
%%% Interface functions %%%
open(Fd) ->
- gen_server:start_link(couch_stream, Fd, []).
+ open(Fd, identity, []).
+
+open(Fd, Encoding, Options) ->
+ gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []).
close(Pid) ->
gen_server:call(Pid, close, infinity).
@@ -85,19 +94,86 @@ foldl(Fd, [Pos|Rest], Fun, Acc) ->
foldl(Fd, PosList, <<>>, Fun, Acc) ->
foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
- foldl(Fd, PosList, Md5, erlang:md5_init(), Fun, Acc).
-
+ foldl(Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc).
+
+foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+ {DecDataFun, DecEndFun} = case Enc of
+ gzip ->
+ ungzip_init();
+ identity ->
+ identity_enc_dec_funs()
+ end,
+ Result = foldl_decode(
+ DecDataFun, Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc
+ ),
+ DecEndFun(),
+ Result.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
- Md5 = erlang:md5_final(Md5Acc),
+ Md5 = couch_util:md5_final(Md5Acc),
Acc;
foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- Md5 = erlang:md5_final(erlang:md5_update(Md5Acc, Bin)),
+ Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)),
Fun(Bin, Acc);
foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
- foldl(Fd, Rest, Md5, erlang:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+ foldl(Fd, Rest, Md5, couch_util:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+
+foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = couch_util:md5_final(Md5Acc),
+ Acc;
+foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)),
+ Bin = DecFun(EncBin),
+ Fun(Bin, Acc);
+foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Bin = DecFun(EncBin),
+ Md5Acc2 = couch_util:md5_update(Md5Acc, EncBin),
+ foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+gzip_init(Options) ->
+ case couch_util:get_value(compression_level, Options, 0) of
+ Lvl when Lvl >= 1 andalso Lvl =< 9 ->
+ Z = zlib:open(),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
+ {
+ fun(Data) ->
+ zlib:deflate(Z, Data)
+ end,
+ fun() ->
+ Last = zlib:deflate(Z, [], finish),
+ ok = zlib:deflateEnd(Z),
+ ok = zlib:close(Z),
+ Last
+ end
+ };
+ _ ->
+ identity_enc_dec_funs()
+ end.
+
+ungzip_init() ->
+ Z = zlib:open(),
+ zlib:inflateInit(Z, 16 + 15),
+ {
+ fun(Data) ->
+ zlib:inflate(Z, Data)
+ end,
+ fun() ->
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z)
+ end
+ }.
+
+identity_enc_dec_funs() ->
+ {
+ fun(Data) -> Data end,
+ fun() -> [] end
+ }.
write(_Pid, <<>>) ->
ok;
@@ -105,8 +181,21 @@ write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init(Fd) ->
- {ok, #stream{fd=Fd, md5=erlang:md5_init()}}.
+init({Fd, Encoding, Options}) ->
+ {EncodingFun, EndEncodingFun} = case Encoding of
+ identity ->
+ identity_enc_dec_funs();
+ gzip ->
+ gzip_init(Options)
+ end,
+ {ok, #stream{
+ fd=Fd,
+ md5=couch_util:md5_init(),
+ identity_md5=couch_util:md5_init(),
+ encoding_fun=EncodingFun,
+ end_encoding_fun=EndEncodingFun
+ }
+ }.
terminate(_Reason, _Stream) ->
ok.
@@ -120,39 +209,65 @@ handle_call({write, Bin}, _From, Stream) ->
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
- md5 = Md5} = Stream,
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ encoding_fun = EncodingFun} = Stream,
if BinSize + BufferLen > Max ->
WriteBin = lists:reverse(Buffer, [Bin]),
- Md5_2 = erlang:md5_update(Md5, WriteBin),
- {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
+ IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
+ case EncodingFun(WriteBin) of
+ [] ->
+ % case where the encoder did some internal buffering
+ % (zlib does it for example)
+ WrittenLen2 = WrittenLen,
+ Md5_2 = Md5,
+ Written2 = Written;
+ WriteBin2 ->
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
+ WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
+ Md5_2 = couch_util:md5_update(Md5, WriteBin2),
+ Written2 = [Pos|Written]
+ end,
+
{reply, ok, Stream#stream{
- written_len=WrittenLen + BufferLen + BinSize,
- written_pointers=[Pos|Written],
+ written_len=WrittenLen2,
+ written_pointers=Written2,
buffer_list=[],
buffer_len=0,
- md5=Md5_2}};
+ md5=Md5_2,
+ identity_md5=IdenMd5_2,
+ identity_len=IdenLen + BinSize}};
true ->
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
- buffer_len=BufferLen + BinSize}}
+ buffer_len=BufferLen + BinSize,
+ identity_len=IdenLen + BinSize}}
end;
handle_call(close, _From, Stream) ->
#stream{
fd = Fd,
written_len = WrittenLen,
written_pointers = Written,
- buffer_len = BufferLen,
buffer_list = Buffer,
- md5 = Md5} = Stream,
-
- case Buffer of
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ encoding_fun = EncodingFun,
+ end_encoding_fun = EndEncodingFun} = Stream,
+
+ WriteBin = lists:reverse(Buffer),
+ IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
+ WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
+ Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
+ Result = case WriteBin2 of
[] ->
- Result = {lists:reverse(Written), WrittenLen, erlang:md5_final(Md5)};
+ {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
- WriteBin = lists:reverse(Buffer),
- Md5Final = erlang:md5_final(erlang:md5_update(Md5, WriteBin)),
- {ok, Pos} = couch_file:append_binary(Fd, WriteBin),
- Result = {lists:reverse(Written, [Pos]), WrittenLen + BufferLen, Md5Final}
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
+ StreamInfo = lists:reverse(Written, [Pos]),
+ StreamLen = WrittenLen + iolist_size(WriteBin2),
+ {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
{stop, normal, Result, Stream}.