summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_stream.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-11 15:22:33 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-11 17:39:37 -0400
commit81bdbed444df2cbcf3cdb32f7d4a74019de06454 (patch)
treeeade7d0d9bb4cac01b55fd8642adfe0f7da35161 /apps/couch/src/couch_stream.erl
parentcc1910f73fbd20c5ffc94bd61e7701d7f5e4c92a (diff)
reorganize couch .erl and driver code into rebar layout
Diffstat (limited to 'apps/couch/src/couch_stream.erl')
-rw-r--r--apps/couch/src/couch_stream.erl319
1 files changed, 319 insertions, 0 deletions
diff --git a/apps/couch/src/couch_stream.erl b/apps/couch/src/couch_stream.erl
new file mode 100644
index 00000000..04c17770
--- /dev/null
+++ b/apps/couch/src/couch_stream.erl
@@ -0,0 +1,319 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stream).
+-behaviour(gen_server).
+
+
+-define(FILE_POINTER_BYTES, 8).
+-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
+
+-define(STREAM_OFFSET_BYTES, 4).
+-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
+
+-define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go
+
+-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+
+-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6,
+ old_foldl/5,old_copy_to_new_stream/4]).
+-export([copy_to_new_stream/3,old_read_term/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-record(stream,
+ {fd = 0,
+ written_pointers=[],
+ buffer_list = [],
+ buffer_len = 0,
+ max_buffer = 4096,
+ written_len = 0,
+ md5,
+ % md5 of the content without any transformation applied (e.g. compression)
+ % needed for the attachment upload integrity check (ticket 558)
+ identity_md5,
+ identity_len = 0,
+ encoding_fun,
+ end_encoding_fun
+ }).
+
+
+%%% Interface functions %%%
+
+open(Fd) ->
+ open(Fd, identity, []).
+
+open(Fd, Encoding, Options) ->
+ gen_server:start_link(couch_stream, {Fd, Encoding, Options}, []).
+
+close(Pid) ->
+ gen_server:call(Pid, close, infinity).
+
+copy_to_new_stream(Fd, PosList, DestFd) ->
+ {ok, Dest} = open(DestFd),
+ foldl(Fd, PosList,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
+
+
+% 09 UPGRADE CODE
+old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
+ {ok, Dest} = open(DestFd),
+ old_foldl(Fd, Pos, Len,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
+
+% 09 UPGRADE CODE
+old_foldl(_Fd, null, 0, _Fun, Acc) ->
+ Acc;
+old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
+ {ok, Acc2, _} = old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc),
+ Acc2.
+
+foldl(_Fd, [], _Fun, Acc) ->
+ Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+
+foldl(Fd, PosList, <<>>, Fun, Acc) ->
+ foldl(Fd, PosList, Fun, Acc);
+foldl(Fd, PosList, Md5, Fun, Acc) ->
+ foldl(Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc).
+
+foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
+ {DecDataFun, DecEndFun} = case Enc of
+ gzip ->
+ ungzip_init();
+ identity ->
+ identity_enc_dec_funs()
+ end,
+ Result = foldl_decode(
+ DecDataFun, Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc
+ ),
+ DecEndFun(),
+ Result.
+
+foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = couch_util:md5_final(Md5Acc),
+ Acc;
+foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)),
+ Fun(Bin, Acc);
+foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Md5, couch_util:md5_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)).
+
+foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
+ Md5 = couch_util:md5_final(Md5Acc),
+ Acc;
+foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)),
+ Bin = DecFun(EncBin),
+ Fun(Bin, Acc);
+foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
+ {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
+ Bin = DecFun(EncBin),
+ Md5Acc2 = couch_util:md5_update(Md5Acc, EncBin),
+ foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
+
+gzip_init(Options) ->
+ case couch_util:get_value(compression_level, Options, 0) of
+ Lvl when Lvl >= 1 andalso Lvl =< 9 ->
+ Z = zlib:open(),
+ % 15 = ?MAX_WBITS (defined in the zlib module)
+ % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
+ ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
+ {
+ fun(Data) ->
+ zlib:deflate(Z, Data)
+ end,
+ fun() ->
+ Last = zlib:deflate(Z, [], finish),
+ ok = zlib:deflateEnd(Z),
+ ok = zlib:close(Z),
+ Last
+ end
+ };
+ _ ->
+ identity_enc_dec_funs()
+ end.
+
+ungzip_init() ->
+ Z = zlib:open(),
+ zlib:inflateInit(Z, 16 + 15),
+ {
+ fun(Data) ->
+ zlib:inflate(Z, Data)
+ end,
+ fun() ->
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z)
+ end
+ }.
+
+identity_enc_dec_funs() ->
+ {
+ fun(Data) -> Data end,
+ fun() -> [] end
+ }.
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
+ gen_server:call(Pid, {write, Bin}, infinity).
+
+
+init({Fd, Encoding, Options}) ->
+ {EncodingFun, EndEncodingFun} = case Encoding of
+ identity ->
+ identity_enc_dec_funs();
+ gzip ->
+ gzip_init(Options)
+ end,
+ {ok, #stream{
+ fd=Fd,
+ md5=couch_util:md5_init(),
+ identity_md5=couch_util:md5_init(),
+ encoding_fun=EncodingFun,
+ end_encoding_fun=EndEncodingFun
+ }
+ }.
+
+terminate(_Reason, _Stream) ->
+ ok.
+
+handle_call({write, Bin}, _From, Stream) ->
+ BinSize = iolist_size(Bin),
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer,
+ max_buffer = Max,
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ encoding_fun = EncodingFun} = Stream,
+ if BinSize + BufferLen > Max ->
+ WriteBin = lists:reverse(Buffer, [Bin]),
+ IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
+ case EncodingFun(WriteBin) of
+ [] ->
+ % case where the encoder did some internal buffering
+ % (zlib does it for example)
+ WrittenLen2 = WrittenLen,
+ Md5_2 = Md5,
+ Written2 = Written;
+ WriteBin2 ->
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
+ WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
+ Md5_2 = couch_util:md5_update(Md5, WriteBin2),
+ Written2 = [Pos|Written]
+ end,
+
+ {reply, ok, Stream#stream{
+ written_len=WrittenLen2,
+ written_pointers=Written2,
+ buffer_list=[],
+ buffer_len=0,
+ md5=Md5_2,
+ identity_md5=IdenMd5_2,
+ identity_len=IdenLen + BinSize}};
+ true ->
+ {reply, ok, Stream#stream{
+ buffer_list=[Bin|Buffer],
+ buffer_len=BufferLen + BinSize,
+ identity_len=IdenLen + BinSize}}
+ end;
+handle_call(close, _From, Stream) ->
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_list = Buffer,
+ md5 = Md5,
+ identity_md5 = IdenMd5,
+ identity_len = IdenLen,
+ encoding_fun = EncodingFun,
+ end_encoding_fun = EndEncodingFun} = Stream,
+
+ WriteBin = lists:reverse(Buffer),
+ IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
+ WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
+ Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
+ Result = case WriteBin2 of
+ [] ->
+ {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
+ _ ->
+ {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
+ StreamInfo = lists:reverse(Written, [Pos]),
+ StreamLen = WrittenLen + iolist_size(WriteBin2),
+ {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
+ end,
+ {stop, normal, Result, Stream}.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+
+
+% 09 UPGRADE CODE
+old_read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
+
+old_read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+% 09 UPGRADE CODE
+old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+ {ok, Acc, Sp};
+old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+ {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
+ = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ Sp = {NextPos, NextOffset},
+ % Check NextPos is past current Pos (this is always true in a stream)
+ % Guards against potential infinite loops caused by corruption.
+ case NextPos > Pos of
+ true -> ok;
+ false -> throw({error, stream_corruption})
+ end,
+ old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ ReadAmount = lists:min([MaxChunk, Num, Offset]),
+ {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
+ Sp = {Pos + ReadAmount, Offset - ReadAmount},
+ old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).
+
+
+% Tests moved to tests/etap/050-stream.t
+