diff options
Diffstat (limited to 'apps/couch/src/couch_stream.erl')
-rw-r--r-- | apps/couch/src/couch_stream.erl | 319 |
1 files changed, 319 insertions, 0 deletions
diff --git a/apps/couch/src/couch_stream.erl b/apps/couch/src/couch_stream.erl new file mode 100644 index 00000000..04c17770 --- /dev/null +++ b/apps/couch/src/couch_stream.erl @@ -0,0 +1,319 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_stream). +-behaviour(gen_server). + + +-define(FILE_POINTER_BYTES, 8). +-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)). + +-define(STREAM_OFFSET_BYTES, 4). +-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)). + +-define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go + +-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data + +-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6, + old_foldl/5,old_copy_to_new_stream/4]). +-export([copy_to_new_stream/3,old_read_term/2]). +-export([init/1, terminate/2, handle_call/3]). +-export([handle_cast/2,code_change/3,handle_info/2]). + +-include("couch_db.hrl"). + +-record(stream, + {fd = 0, + written_pointers=[], + buffer_list = [], + buffer_len = 0, + max_buffer = 4096, + written_len = 0, + md5, + % md5 of the content without any transformation applied (e.g. compression) + % needed for the attachment upload integrity check (ticket 558) + identity_md5, + identity_len = 0, + encoding_fun, + end_encoding_fun + }). + + +%%% Interface functions %%% + +open(Fd) -> + open(Fd, identity, []). + +open(Fd, Encoding, Options) -> + gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []). + +close(Pid) -> + gen_server:call(Pid, close, infinity). + +copy_to_new_stream(Fd, PosList, DestFd) -> + {ok, Dest} = open(DestFd), + foldl(Fd, PosList, + fun(Bin, _) -> + ok = write(Dest, Bin) + end, ok), + close(Dest). + + +% 09 UPGRADE CODE +old_copy_to_new_stream(Fd, Pos, Len, DestFd) -> + {ok, Dest} = open(DestFd), + old_foldl(Fd, Pos, Len, + fun(Bin, _) -> + ok = write(Dest, Bin) + end, ok), + close(Dest). + +% 09 UPGRADE CODE +old_foldl(_Fd, null, 0, _Fun, Acc) -> + Acc; +old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)-> + {ok, Acc2, _} = old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc), + Acc2. + +foldl(_Fd, [], _Fun, Acc) -> + Acc; +foldl(Fd, [Pos|Rest], Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + foldl(Fd, Rest, Fun, Fun(Bin, Acc)). + +foldl(Fd, PosList, <<>>, Fun, Acc) -> + foldl(Fd, PosList, Fun, Acc); +foldl(Fd, PosList, Md5, Fun, Acc) -> + foldl(Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc). + +foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> + {DecDataFun, DecEndFun} = case Enc of + gzip -> + ungzip_init(); + identity -> + identity_enc_dec_funs() + end, + Result = foldl_decode( + DecDataFun, Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc + ), + DecEndFun(), + Result. + +foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> + Md5 = couch_util:md5_final(Md5Acc), + Acc; +foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)), + Fun(Bin, Acc); +foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + foldl(Fd, Rest, Md5, couch_util:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). + +foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> + Md5 = couch_util:md5_final(Md5Acc), + Acc; +foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> + {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), + Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)), + Bin = DecFun(EncBin), + Fun(Bin, Acc); +foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> + {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), + Bin = DecFun(EncBin), + Md5Acc2 = couch_util:md5_update(Md5Acc, EncBin), + foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +gzip_init(Options) -> + case couch_util:get_value(compression_level, Options, 0) of + Lvl when Lvl >= 1 andalso Lvl =< 9 -> + Z = zlib:open(), + % 15 = ?MAX_WBITS (defined in the zlib module) + % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1 + ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default), + { + fun(Data) -> + zlib:deflate(Z, Data) + end, + fun() -> + Last = zlib:deflate(Z, [], finish), + ok = zlib:deflateEnd(Z), + ok = zlib:close(Z), + Last + end + }; + _ -> + identity_enc_dec_funs() + end. + +ungzip_init() -> + Z = zlib:open(), + zlib:inflateInit(Z, 16 + 15), + { + fun(Data) -> + zlib:inflate(Z, Data) + end, + fun() -> + ok = zlib:inflateEnd(Z), + ok = zlib:close(Z) + end + }. + +identity_enc_dec_funs() -> + { + fun(Data) -> Data end, + fun() -> [] end + }. + +write(_Pid, <<>>) -> + ok; +write(Pid, Bin) -> + gen_server:call(Pid, {write, Bin}, infinity). + + +init({Fd, Encoding, Options}) -> + {EncodingFun, EndEncodingFun} = case Encoding of + identity -> + identity_enc_dec_funs(); + gzip -> + gzip_init(Options) + end, + {ok, #stream{ + fd=Fd, + md5=couch_util:md5_init(), + identity_md5=couch_util:md5_init(), + encoding_fun=EncodingFun, + end_encoding_fun=EndEncodingFun + } + }. + +terminate(_Reason, _Stream) -> + ok. + +handle_call({write, Bin}, _From, Stream) -> + BinSize = iolist_size(Bin), + #stream{ + fd = Fd, + written_len = WrittenLen, + written_pointers = Written, + buffer_len = BufferLen, + buffer_list = Buffer, + max_buffer = Max, + md5 = Md5, + identity_md5 = IdenMd5, + identity_len = IdenLen, + encoding_fun = EncodingFun} = Stream, + if BinSize + BufferLen > Max -> + WriteBin = lists:reverse(Buffer, [Bin]), + IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin), + case EncodingFun(WriteBin) of + [] -> + % case where the encoder did some internal buffering + % (zlib does it for example) + WrittenLen2 = WrittenLen, + Md5_2 = Md5, + Written2 = Written; + WriteBin2 -> + {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), + WrittenLen2 = WrittenLen + iolist_size(WriteBin2), + Md5_2 = couch_util:md5_update(Md5, WriteBin2), + Written2 = [Pos|Written] + end, + + {reply, ok, Stream#stream{ + written_len=WrittenLen2, + written_pointers=Written2, + buffer_list=[], + buffer_len=0, + md5=Md5_2, + identity_md5=IdenMd5_2, + identity_len=IdenLen + BinSize}}; + true -> + {reply, ok, Stream#stream{ + buffer_list=[Bin|Buffer], + buffer_len=BufferLen + BinSize, + identity_len=IdenLen + BinSize}} + end; +handle_call(close, _From, Stream) -> + #stream{ + fd = Fd, + written_len = WrittenLen, + written_pointers = Written, + buffer_list = Buffer, + md5 = Md5, + identity_md5 = IdenMd5, + identity_len = IdenLen, + encoding_fun = EncodingFun, + end_encoding_fun = EndEncodingFun} = Stream, + + WriteBin = lists:reverse(Buffer), + IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)), + WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(), + Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)), + Result = case WriteBin2 of + [] -> + {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; + _ -> + {ok, Pos} = couch_file:append_binary(Fd, WriteBin2), + StreamInfo = lists:reverse(Written, [Pos]), + StreamLen = WrittenLen + iolist_size(WriteBin2), + {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} + end, + {stop, normal, Result, Stream}. + +handle_cast(_Msg, State) -> + {noreply,State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(_Info, State) -> + {noreply, State}. + + + +% 09 UPGRADE CODE +old_read_term(Fd, Sp) -> + {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2} + = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES), + {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen), + {ok, binary_to_term(Bin)}. + +old_read(Fd, Sp, Num) -> + {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []), + Bin = list_to_binary(lists:reverse(RevBin)), + {ok, Bin, Sp2}. + +% 09 UPGRADE CODE +old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) -> + {ok, Acc, Sp}; +old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> + {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>} + = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), + Sp = {NextPos, NextOffset}, + % Check NextPos is past current Pos (this is always true in a stream) + % Guards against potential infinite loops caused by corruption. + case NextPos > Pos of + true -> ok; + false -> throw({error, stream_corruption}) + end, + old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc); +old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) -> + ReadAmount = lists:min([MaxChunk, Num, Offset]), + {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount), + Sp = {Pos + ReadAmount, Offset - ReadAmount}, + old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)). + + +% Tests moved to tests/etap/050-stream.t + |