diff options
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r-- | src/couchdb/couch_stream.erl | 357 |
1 files changed, 0 insertions, 357 deletions
% diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
% deleted file mode 100644 index 60af1c2b..00000000
% --- a/src/couchdb/couch_stream.erl +++ /dev/null @@ -1,357 +0,0 @@
%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_stream).
-behaviour(gen_server).

% Field widths used by the "09 UPGRADE CODE" readers (old_read_term/2 and
% old_stream_data/6) when decoding the pointer/offset header of an old stream.
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).

-define(STREAM_OFFSET_BYTES, 4).
-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).

-define(HUGE_CHUNK, 1000000000). % Huge chunk size when reading all in one go

-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data

% Public API.
-export([open/1, open/3, close/1, write/2]).
-export([foldl/4, foldl/5, range_foldl/6, foldl_decode/6]).
-export([copy_to_new_stream/3]).
% 09 upgrade helpers.
-export([old_foldl/5, old_copy_to_new_stream/4, old_read_term/2]).
% gen_server callbacks.
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2, code_change/3, handle_info/2]).

-include("couch_db.hrl").

% State of one stream writer process.
-record(stream,
    {fd = 0,                 % file handle handed to couch_file
    written_pointers=[],     % {Pos, Size} of every appended block, newest first
    buffer_list = [],        % chunks not yet appended, newest first
    buffer_len = 0,          % number of bytes held in buffer_list
    max_buffer = 4096,       % buffered size above which a write is flushed
    written_len = 0,         % total size of the (encoded) data appended so far
    md5,                     % running md5 of the (encoded) data appended
    % md5 of the content without any transformation applied (e.g. compression)
    % needed for the attachment upload integrity check (ticket 558)
    identity_md5,
    identity_len = 0,        % total size of the data handed to write/2
    encoding_fun,            % fun(Data) -> encoded iolist
    end_encoding_fun         % fun() -> trailing encoded iolist
    }).


%%% Interface functions %%%

% Starts a stream writer on Fd that stores the data unmodified.
open(Fd) ->
    open(Fd, identity, []).

% Starts a linked stream writer on Fd. Encoding is `identity` or `gzip`;
% Options is forwarded to the encoder setup done in init/1.
open(Fd, Encoding, Options) ->
    gen_server:start_link(?MODULE, {Fd, Encoding, Options}, []).

% Flushes and stops the stream process. The reply is the tuple built by
% handle_call(close, ...):
% {PosList, StreamLen, IdentityLen, Md5, IdentityMd5}.
close(Pid) ->
    gen_server:call(Pid, close, infinity).

% Copies every block listed in PosList from Fd into a fresh identity stream
% on DestFd and returns the result of closing that stream.
copy_to_new_stream(Fd, PosList, DestFd) ->
    {ok, Dest} = open(DestFd),
    CopyChunk = fun(Bin, _) -> ok = write(Dest, Bin) end,
    foldl(Fd, PosList, CopyChunk, ok),
    close(Dest).


% 09 UPGRADE CODE
% Same as copy_to_new_stream/3, but the source is an old-format stream
% addressed by a start pointer and a length.
old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
    {ok, Dest} = open(DestFd),
    CopyChunk = fun(Bin, _) -> ok = write(Dest, Bin) end,
    old_foldl(Fd, Pos, Len, CopyChunk, ok),
    close(Dest).

% 09 UPGRADE CODE
% Folds Fun over Len bytes of an old-format stream, reading at most
% ?DEFAULT_STREAM_CHUNK bytes per call. A `null` pointer with length 0 is
% an empty stream.
old_foldl(_Fd, null, 0, _Fun, Acc) ->
    Acc;
old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer) ->
    {ok, FinalAcc, _EndPointer} =
        old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc),
    FinalAcc.

% foldl/4: folds Fun(Bin, Acc) over the blocks at the given positions,
% without any integrity check.
foldl(_Fd, [], _Fun, Acc) ->
    Acc;
foldl(Fd, [Pos | Remaining], Fun, Acc) ->
    {ok, Block} = couch_file:pread_iolist(Fd, Pos),
    NextAcc = Fun(Block, Acc),
    foldl(Fd, Remaining, Fun, NextAcc).

% foldl/5: like foldl/4, but when Md5 is not the empty binary the digest of
% the data read must match it (see foldl/6).
foldl(Fd, PosList, <<>>, Fun, Acc) ->
    foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
    Md5Ctx = couch_util:md5_init(),
    foldl(Fd, PosList, Md5, Md5Ctx, Fun, Acc).

% Folds Fun over the decoded content of a stream. Enc selects the decoder
% (`gzip` or `identity`); the decoder's end function runs once the fold has
% returned.
foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
    {DecDataFun, DecEndFun} =
        case Enc of
            gzip -> ungzip_init();
            identity -> identity_enc_dec_funs()
        end,
    Md5Ctx = couch_util:md5_init(),
    Result = foldl_decode(DecDataFun, Fd, PosList, Md5, Md5Ctx, Fun, Acc),
    DecEndFun(),
    Result.

% foldl/6: md5-checking fold. Md5Acc is the running digest context; the
% match against Md5 crashes with badmatch if the data read does not hash to
% the expected value. For the last block the digest is checked before Fun
% is applied to it.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = couch_util:md5_final(Md5Acc),
    Acc;
foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
    foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
    {ok, Block} = couch_file:pread_iolist(Fd, Pos),
    FinalCtx = couch_util:md5_update(Md5Acc, Block),
    Md5 = couch_util:md5_final(FinalCtx),
    Fun(Block, Acc);
foldl(Fd, [{Pos, _Size} | Remaining], Md5, Md5Acc, Fun, Acc) ->
    % {Pos, Size} entries are unwrapped to plain positions.
    foldl(Fd, [Pos | Remaining], Md5, Md5Acc, Fun, Acc);
foldl(Fd, [Pos | Remaining], Md5, Md5Acc, Fun, Acc) ->
    {ok, Block} = couch_file:pread_iolist(Fd, Pos),
    NextCtx = couch_util:md5_update(Md5Acc, Block),
    foldl(Fd, Remaining, Md5, NextCtx, Fun, Fun(Block, Acc)).

% Folds Fun(Bin, Acc) over the bytes of a stream that fall inside the range
% [From, To): From is the offset of the first byte wanted, To the offset just
% past the last one. PosList holds {Pos, Size} entries, or bare integer
% positions for old-style attachments.
range_foldl(Fd, PosList, From, To, Fun, Acc) ->
    range_foldl(Fd, PosList, From, To, 0, Fun, Acc).

% Off is the stream offset of the first byte of the block at the head of the
% list.
range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
    % Everything up to To has been delivered.
    Acc;
range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
    % No size is recorded for this entry: read the block to learn it, then
    % retry with a {Pos, Size} entry.
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    range_foldl(Fd, [{Pos, iolist_size(Bin)} | Rest], From, To, Off, Fun, Acc);
range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From >= Off + Size ->
    % The block covers [Off, Off + Size), so it ends at or before From and
    % holds no wanted byte. (`>=`, not `>`: a block ending exactly at From
    % used to be read and handed to Fun as an empty binary.)
    range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    Bin1 = if
        From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
        true ->
            % Trim the bytes before From and the bytes from To onwards.
            PrefixLen = clip(From - Off, 0, Size),
            PostfixLen = clip(Off + Size - To, 0, Size),
            MatchLen = Size - PrefixLen - PostfixLen,
            <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
            Match
    end,
    range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).

% Limits Value to the interval [Lo, Hi].
clip(Value, Lo, Hi) ->
    if
        Value < Lo -> Lo;
        Value > Hi -> Hi;
        true -> Value
    end.

% foldl_decode/7: md5-checking fold over decoded data. Each block is read
% from Fd, the digest is updated with the *encoded* bytes as read, and Fun
% receives DecFun's output. The match against Md5 crashes with badmatch if
% the digest differs; for the last block it is checked before decoding.
foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = couch_util:md5_final(Md5Acc),
    Acc;
foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
    foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
    {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
    Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)),
    Bin = DecFun(EncBin),
    Fun(Bin, Acc);
foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
    foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
    {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
    Bin = DecFun(EncBin),
    Md5Acc2 = couch_util:md5_update(Md5Acc, EncBin),
    foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).

% Returns {EncodeFun, EndFun} for gzip compression at the level given by the
% `compression_level` option. Levels outside 1..9 (including the default 0)
% yield the identity pair instead.
gzip_init(Options) ->
    case couch_util:get_value(compression_level, Options, 0) of
    Lvl when Lvl >= 1 andalso Lvl =< 9 ->
        Z = zlib:open(),
        % 15 = ?MAX_WBITS (defined in the zlib module)
        % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
        ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
        {
            fun(Data) ->
                zlib:deflate(Z, Data)
            end,
            fun() ->
                % Flush what zlib still buffers, then release the stream.
                Last = zlib:deflate(Z, [], finish),
                ok = zlib:deflateEnd(Z),
                ok = zlib:close(Z),
                Last
            end
        };
    _ ->
        identity_enc_dec_funs()
    end.

% Returns {DecodeFun, EndFun} for gunzipping. EndFun ends the inflate
% session and closes the zlib stream.
ungzip_init() ->
    Z = zlib:open(),
    % Assert success, as gzip_init/1 does for deflateInit, so a failed
    % initialisation surfaces here rather than on the first inflate call.
    ok = zlib:inflateInit(Z, 16 + 15),
    {
        fun(Data) ->
            zlib:inflate(Z, Data)
        end,
        fun() ->
            ok = zlib:inflateEnd(Z),
            ok = zlib:close(Z)
        end
    }.

% Pass-through pair: the data fun returns its input, the end fun returns [].
identity_enc_dec_funs() ->
    {
        fun(Data) -> Data end,
        fun() -> [] end
    }.

% Sends Bin to the stream process; an empty binary is a no-op that never
% reaches the process.
write(_Pid, <<>>) ->
    ok;
write(Pid, Bin) ->
    gen_server:call(Pid, {write, Bin}, infinity).


% gen_server callback: picks the encoder pair for Encoding and builds the
% initial state with two fresh md5 contexts.
init({Fd, Encoding, Options}) ->
    {EncodingFun, EndEncodingFun} =
        case Encoding of
            identity -> identity_enc_dec_funs();
            gzip -> gzip_init(Options)
        end,
    State = #stream{
        fd = Fd,
        md5 = couch_util:md5_init(),
        identity_md5 = couch_util:md5_init(),
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun
    },
    {ok, State}.

terminate(_Reason, _Stream) ->
    ok.

% {write, Bin}: buffers Bin, or, once the buffered size would exceed
% max_buffer, encodes and appends the buffer plus Bin as one block.
handle_call({write, Bin}, _From, Stream) ->
    BinSize = iolist_size(Bin),
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_len = BufferLen,
        buffer_list = Buffer,
        max_buffer = Max,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding_fun = EncodingFun} = Stream,
    case BinSize + BufferLen > Max of
        true ->
            % The buffer is kept newest-first; restore write order and put
            % the incoming chunk last.
            WriteBin = lists:reverse(Buffer, [Bin]),
            IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
            {WrittenLen2, Md5_2, Written2} =
                case EncodingFun(WriteBin) of
                    [] ->
                        % case where the encoder did some internal buffering
                        % (zlib does it for example)
                        {WrittenLen, Md5, Written};
                    Encoded ->
                        {ok, Pos} = couch_file:append_binary(Fd, Encoded),
                        {WrittenLen + iolist_size(Encoded),
                         couch_util:md5_update(Md5, Encoded),
                         [{Pos, iolist_size(Encoded)} | Written]}
                end,
            Flushed = Stream#stream{
                written_len = WrittenLen2,
                written_pointers = Written2,
                buffer_list = [],
                buffer_len = 0,
                md5 = Md5_2,
                identity_md5 = IdenMd5_2,
                identity_len = IdenLen + BinSize},
            {reply, ok, Flushed};
        false ->
            Buffered = Stream#stream{
                buffer_list = [Bin | Buffer],
                buffer_len = BufferLen + BinSize,
                identity_len = IdenLen + BinSize},
            {reply, ok, Buffered}
    end;
% close: encodes and appends whatever is still buffered plus the encoder's
% trailing output, replies with
% {PosList, StreamLen, IdentityLen, Md5, IdentityMd5} and stops the process.
handle_call(close, _From, Stream) ->
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_list = Buffer,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun} = Stream,

    WriteBin = lists:reverse(Buffer),
    IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
    % Encode the remaining data first, then collect the encoder's trailer.
    Encoded = EncodingFun(WriteBin),
    Trailer = EndEncodingFun(),
    WriteBin2 = Encoded ++ Trailer,
    Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
    {StreamInfo, StreamLen} =
        case WriteBin2 of
            [] ->
                % Nothing left to append.
                {lists:reverse(Written), WrittenLen};
            _ ->
                {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
                {lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
                 WrittenLen + iolist_size(WriteBin2)}
        end,
    Result = {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final},
    {stop, normal, Result, Stream}.

handle_cast(_Msg, State) ->
    {noreply,State}.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

handle_info(_Info, State) ->
    {noreply, State}.



% 09 UPGRADE CODE
% Reads a length-prefixed term from an old-format stream: a
% ?STREAM_OFFSET_BYTES-wide length followed by that many bytes of
% external-term-format data.
% NOTE(review): binary_to_term/1 is applied to bytes read from the file;
% confirm the file contents are trusted.
old_read_term(Fd, Sp) ->
    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, AfterLen} =
        old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
    {ok, TermBin, _AfterTerm} = old_read(Fd, AfterLen, TermLen),
    {ok, binary_to_term(TermBin)}.

% Reads exactly Num bytes starting at stream pointer Sp and returns them as
% one binary together with the pointer that follows them.
old_read(Fd, Sp, Num) ->
    Collect = fun(Chunk, Chunks) -> [Chunk | Chunks] end,
    {ok, RevChunks, NextSp} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, Collect, []),
    {ok, list_to_binary(lists:reverse(RevChunks)), NextSp}.

% 09 UPGRADE CODE
% Folds Fun over Num bytes of an old-format stream. A stream pointer is
% {Pos, Offset}; an Offset of 0 means the next pointer has to be read from
% the file at Pos before any data can be read.
old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
    {ok, Acc, Sp};
old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
    HeaderSize = ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES,
    {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>} =
        couch_file:old_pread(Fd, Pos, HeaderSize),
    % Check NextPos is past current Pos (this is always true in a stream)
    % Guards against potential infinite loops caused by corruption.
    NextPos > Pos orelse throw({error, stream_corruption}),
    old_stream_data(Fd, {NextPos, NextOffset}, Num, MaxChunk, Fun, Acc);
old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
    % Read no more than the chunk limit, the bytes still wanted, or the
    % Offset part of the pointer, whichever is smallest.
    ReadAmount = lists:min([MaxChunk, Num, Offset]),
    {ok, Chunk} = couch_file:old_pread(Fd, Pos, ReadAmount),
    NextSp = {Pos + ReadAmount, Offset - ReadAmount},
    old_stream_data(Fd, NextSp, Num - ReadAmount, MaxChunk, Fun, Fun(Chunk, Acc)).


% Tests moved to tests/etap/050-stream.t