summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_stream.erl
diff options
context:
space:
mode:
authorChristopher Lenz <cmlenz@apache.org>2008-03-28 23:32:19 +0000
committerChristopher Lenz <cmlenz@apache.org>2008-03-28 23:32:19 +0000
commit544a38dd45f6a58d34296c6c768afd086eb2ac70 (patch)
treec84cc02340b06aae189cff0dbfaee698f273f1f5 /src/couchdb/couch_stream.erl
parent804cbbe033b8e7a3e8d7058aaf31bdf69ef18ac5 (diff)
Imported trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@642432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r--src/couchdb/couch_stream.erl252
1 files changed, 252 insertions, 0 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
new file mode 100644
index 00000000..d5157b4d
--- /dev/null
+++ b/src/couchdb/couch_stream.erl
@@ -0,0 +1,252 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_stream).
+-behaviour(gen_server).
+
+-export([test/1]).
+-export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
+-export([copy/4]).
+-export([ensure_buffer/2, set_min_buffer/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(FILE_POINTER_BYTES, 8).
+-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
+
+-define(STREAM_OFFSET_BYTES, 4).
+-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
+
+-define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go
+
+-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+
+
+-record(write_stream,
+ {fd = 0,
+ current_pos = 0,
+ bytes_remaining = 0,
+ next_alloc = 0,
+ min_alloc = 16#00010000
+ }).
+
+-record(stream,
+ {
+ pid,
+ fd
+ }).
+
+
+%%% Interface functions %%%
+
+open(Fd) ->
+ open(nil, Fd).
+
+open(nil, Fd) ->
+ open({0,0}, Fd);
+open(State, Fd) ->
+ {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
+ {ok, #stream{pid = Pid, fd = Fd}}.
+
+close(#stream{pid = Pid, fd = _Fd}) ->
+ gen_server:call(Pid, close).
+
+get_state(#stream{pid = Pid, fd = _Fd}) ->
+ gen_server:call(Pid, get_state).
+
+ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
+ gen_server:call(Pid, {ensure_buffer, Bytes}).
+
+set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
+ gen_server:call(Pid, {set_min_buffer, Bytes}).
+
+read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
+ read(Fd, Sp, Num);
+read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+copy(#stream{pid = _Pid, fd = Fd}, Sp, Num, DestStream) ->
+ copy(Fd, Sp, Num, DestStream);
+copy(Fd, Sp, Num, DestStream) ->
+ {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK,
+ fun(Bin, AccPointer) ->
+ {ok, NewPointer} = write(Bin, DestStream),
+ if AccPointer == null -> NewPointer; true -> AccPointer end
+ end,
+ null),
+ {ok, NewSp}.
+
+foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
+ foldl(Fd, Sp, Num, Fun, Acc);
+foldl(Fd, Sp, Num, Fun, Acc) ->
+ {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
+ read_term(Fd, Sp);
+read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
+
+write_term(Stream, Term) ->
+ Bin = term_to_binary(Term),
+ Size = size(Bin),
+ Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
+ write(Stream, Bin2).
+
+write(#stream{}, <<>>) ->
+ {ok, {0,0}};
+write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+ gen_server:call(Pid, {write, Bin}).
+
+
+init({{Pos, BytesRemaining}, Fd}) ->
+ {ok, #write_stream
+ {fd = Fd,
+ current_pos = Pos,
+ bytes_remaining = BytesRemaining
+ }}.
+
+terminate(_Reason, _Stream) ->
+ ok.
+
+handle_call(get_state, _From, Stream) ->
+ #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
+ {reply, {Pos, BytesRemaining}, Stream};
+handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
+ {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
+handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
+ #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
+ case BytesRemainingInCurrentBuffer < BufferSizeRequested of
+ true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
+ false -> NextAlloc = 0 % enough room in current segment
+ end,
+ {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
+handle_call({write, Bin}, _From, Stream) ->
+ % ensure init is called first so we can get a pointer to the begining of the binary
+ {ok, Sp, Stream2} = write_data(Stream, Bin),
+ {reply, {ok, Sp}, Stream2};
+handle_call(close, _From, Stream) ->
+ #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
+ {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+
+handle_cast(_Msg, State) ->
+ {noreply,State}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%% Internal function %%%
+
+stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+ {ok, Acc, Sp};
+stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+ {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
+ = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ Sp = {NextPos, NextOffset},
+ % Check NextPos is past current Pos (this is always true in a stream)
+ % Guards against potential infinite loops caused by corruption.
+ case NextPos > Pos of
+ true -> ok;
+ false -> throw({error, stream_corruption})
+ end,
+ stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ ReadAmount = lists:min([MaxChunk, Num, Offset]),
+ {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+ Sp = {Pos + ReadAmount, Offset - ReadAmount},
+ case Fun(Bin, Acc) of
+ {ok, Acc2} ->
+ stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
+ {stop, Acc2} ->
+ {ok, Acc2, Sp}
+ end.
+
+write_data(Stream, <<>>) ->
+ {ok, {0,0}, Stream};
+write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
+ #write_stream {
+ fd = Fd,
+ current_pos = CurrentPos,
+ next_alloc = NextAlloc,
+ min_alloc = MinAlloc
+ }= Stream,
+
+ NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
+ % no space in the current segment, must alloc a new segment
+ {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+
+ case CurrentPos of
+ 0 ->
+ ok;
+ _ ->
+ ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
+ end,
+ Stream2 = Stream#write_stream{
+ current_pos=NewPos,
+ bytes_remaining=NewSize,
+ next_alloc=0},
+ write_data(Stream2, Bin);
+write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
+ BytesToWrite = lists:min([size(Bin), BytesRemaining]),
+ {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
+ ok = couch_file:pwrite(Fd, Pos, WriteBin),
+ Stream2 = Stream#write_stream{
+ bytes_remaining=BytesRemaining - BytesToWrite,
+ current_pos=Pos + BytesToWrite
+ },
+ {ok, _, Stream3} = write_data(Stream2, Rest),
+ {ok, {Pos, BytesRemaining}, Stream3}.
+
+
+
+%%% Tests %%%
+
+
+test(Term) ->
+ {ok, Fd} = couch_file:open("foo", [write]),
+ {ok, Stream} = open({0,0}, Fd),
+ {ok, Pos} = write_term(Stream, Term),
+ {ok, Pos2} = write_term(Stream, {Term, Term}),
+ close(Stream),
+ couch_file:close(Fd),
+ {ok, Fd2} = couch_file:open("foo", [read, write]),
+ {ok, Stream2} = open({0,0}, Fd2),
+ {ok, Term1} = read_term(Fd2, Pos),
+ io:format("Term1: ~w ~n",[Term1]),
+ {ok, Term2} = read_term(Fd2, Pos2),
+ io:format("Term2: ~w ~n",[Term2]),
+ {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
+ deep_read_test(Fd2, PointerList),
+ close(Stream2),
+ couch_file:close(Fd2).
+
+deep_read_test(_Fd, []) ->
+ ok;
+deep_read_test(Fd, [Pointer | RestPointerList]) ->
+ {ok, _Term} = read_term(Fd, Pointer),
+ deep_read_test(Fd, RestPointerList).
+
+deep_write_test(_Stream, _Term, 0, PointerList) ->
+ {ok, PointerList};
+deep_write_test(Stream, Term, N, PointerList) ->
+ WriteList = lists:duplicate(random:uniform(N), Term),
+ {ok, Pointer} = write_term(Stream, WriteList),
+ deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).