summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_stream.erl
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-05-25 19:52:28 +0000
committerDamien F. Katz <damien@apache.org>2009-05-25 19:52:28 +0000
commit16ccd4c0b8ae4272fa27d32948658b1424a291fc (patch)
treef6d59d017234409436091cc53938b27549d9b54f /src/couchdb/couch_stream.erl
parent4aac0f7c6dcd3f3a29cfe5e1bf2bee84b9bae9d5 (diff)
Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_stream.erl')
-rw-r--r--src/couchdb/couch_stream.erl316
1 files changed, 134 insertions, 182 deletions
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index d6f72696..bf9fd3c2 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -13,14 +13,6 @@
-module(couch_stream).
-behaviour(gen_server).
--export([test/1]).
--export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4, copy_to_new_stream/4]).
--export([ensure_buffer/2, set_min_buffer/2]).
--export([init/1, terminate/2, handle_call/3]).
--export([handle_cast/2,code_change/3,handle_info/2]).
-
--include("couch_db.hrl").
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
@@ -32,125 +24,111 @@
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+-export([test/0]).
+-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([copy_to_new_stream/3,old_read_term/2]).
+-export([init/1, terminate/2, handle_call/3]).
+-export([handle_cast/2,code_change/3,handle_info/2]).
--record(write_stream,
- {fd = 0,
- current_pos = 0,
- bytes_remaining = 0,
- next_alloc = 0,
- min_alloc = 16#00010000
- }).
+-include("couch_db.hrl").
-record(stream,
- {
- pid,
- fd
+ {fd = 0,
+ written_pointers=[],
+ buffer_list = [],
+ buffer_len = 0,
+ max_buffer = 4096,
+ written_len = 0
}).
%%% Interface functions %%%
open(Fd) ->
- open(nil, Fd).
+ gen_server:start_link(couch_stream, Fd, []).
-open(nil, Fd) ->
- open({0,0}, Fd);
-open(State, Fd) ->
- {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
- {ok, #stream{pid = Pid, fd = Fd}}.
-
-close(#stream{pid = Pid, fd = _Fd}) ->
+close(Pid) ->
gen_server:call(Pid, close, infinity).
-get_state(#stream{pid = Pid, fd = _Fd}) ->
- gen_server:call(Pid, get_state, infinity).
-
-ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {ensure_buffer, Bytes}).
-
-set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {set_min_buffer, Bytes}).
-
-read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
- read(Fd, Sp, Num);
-read(Fd, Sp, Num) ->
- {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
- Bin = list_to_binary(lists:reverse(RevBin)),
- {ok, Bin, Sp2}.
-
-copy_to_new_stream(Src, Sp, Len, DestFd) ->
+copy_to_new_stream(Fd, PosList, DestFd) ->
{ok, Dest} = open(DestFd),
- ok = set_min_buffer(Dest, 0),
- {ok, NewSp} = copy(Src, Sp, Len, Dest),
- close(Dest),
- {ok, NewSp}.
-
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
- copy(Fd, Sp, Len, DestStream);
-copy(Fd, Sp, Len, DestStream) ->
- {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
- fun(Bin, AccPointer) ->
- {ok, NewPointer} = write(DestStream, Bin),
- {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
- end,
- null),
- {ok, NewSp}.
-
-foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
- foldl(Fd, Sp, Num, Fun, Acc);
-foldl(Fd, Sp, Num, Fun, Acc) ->
- {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
-
-read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
- read_term(Fd, Sp);
-read_term(Fd, Sp) ->
- {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
- = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
- {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
- {ok, binary_to_term(Bin)}.
+ foldl(Fd, PosList,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
-write_term(Stream, Term) ->
- Bin = term_to_binary(Term),
- Size = size(Bin),
- Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
- write(Stream, Bin2).
-write(#stream{}, <<>>) ->
- {ok, {0,0}};
-write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+% 09 UPGRADE CODE
+old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
+ {ok, Dest} = open(DestFd),
+ old_foldl(Fd, Pos, Len,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
+
+% 09 UPGRADE CODE
+old_foldl(_Fd, null, 0, _Fun, Acc) ->
+ Acc;
+old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
+ old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
+
+foldl(_Fd, [], _Fun, Acc) ->
+ Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
+
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
-init({{Pos, BytesRemaining}, Fd}) ->
- {ok, #write_stream
- {fd = Fd,
- current_pos = Pos,
- bytes_remaining = BytesRemaining
- }}.
+init(Fd) ->
+ {ok, #stream{fd = Fd}}.
terminate(_Reason, _Stream) ->
ok.
-handle_call(get_state, _From, Stream) ->
- #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
- {reply, {Pos, BytesRemaining}, Stream};
-handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
- {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
-% set next_alloc if we need more room
-handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
- #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
- case BytesRemainingInCurrentBuffer < BufferSizeRequested of
- true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
- false -> NextAlloc = 0 % enough room in current segment
- end,
- {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
handle_call({write, Bin}, _From, Stream) ->
- % ensure init is called first so we can get a pointer to the begining of the binary
- {ok, Sp, Stream2} = write_data(Stream, Bin),
- {reply, {ok, Sp}, Stream2};
+ BinSize = iolist_size(Bin),
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer,
+ max_buffer = Max} = Stream,
+ if BinSize + BufferLen > Max ->
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)),
+ {reply, ok, Stream#stream{
+ written_len=WrittenLen + BufferLen + BinSize,
+ written_pointers=[Pos|Written],
+ buffer_list=[],
+ buffer_len=0}};
+ true ->
+ {reply, ok, Stream#stream{
+ buffer_list=[Bin|Buffer],
+ buffer_len=BufferLen + BinSize}}
+ end;
handle_call(close, _From, Stream) ->
- #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
- {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer} = Stream,
+
+ case Buffer of
+ [] ->
+ Result = {Written, WrittenLen};
+ _ ->
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
+ Result = {[Pos|Written], WrittenLen + BufferLen}
+ end,
+ {stop, normal, Result, Stream}.
handle_cast(_Msg, State) ->
{noreply,State}.
@@ -160,14 +138,27 @@ code_change(_OldVsn, State, _Extra) ->
handle_info(_Info, State) ->
{noreply, State}.
+
+
-%%% Internal function %%%
+% 09 UPGRADE CODE
+old_read_term(Fd, Sp) ->
+ {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+ = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+ {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
+ {ok, binary_to_term(Bin)}.
-stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+old_read(Fd, Sp, Num) ->
+ {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
+ Bin = list_to_binary(lists:reverse(RevBin)),
+ {ok, Bin, Sp2}.
+
+% 09 UPGRADE CODE
+old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
{ok, Acc, Sp};
-stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
{ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
- = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+ = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
Sp = {NextPos, NextOffset},
% Check NextPos is past current Pos (this is always true in a stream)
% Guards against potential infinite loops caused by corruption.
@@ -175,86 +166,47 @@ stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
true -> ok;
false -> throw({error, stream_corruption})
end,
- stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
-stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+ old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
ReadAmount = lists:min([MaxChunk, Num, Offset]),
- {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
+ {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
Sp = {Pos + ReadAmount, Offset - ReadAmount},
- case Fun(Bin, Acc) of
- {ok, Acc2} ->
- stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
- {stop, Acc2} ->
- {ok, Acc2, Sp}
- end.
-
-write_data(Stream, <<>>) ->
- {ok, {0,0}, Stream};
-write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
- #write_stream {
- fd = Fd,
- current_pos = CurrentPos,
- next_alloc = NextAlloc,
- min_alloc = MinAlloc
- }= Stream,
-
- NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
- % no space in the current segment, must alloc a new segment
- {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-
- case CurrentPos of
- 0 ->
- ok;
- _ ->
- ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
- end,
- Stream2 = Stream#write_stream{
- current_pos=NewPos,
- bytes_remaining=NewSize,
- next_alloc=0},
- write_data(Stream2, Bin);
-write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
- BytesToWrite = lists:min([size(Bin), BytesRemaining]),
- {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
- ok = couch_file:pwrite(Fd, Pos, WriteBin),
- Stream2 = Stream#write_stream{
- bytes_remaining=BytesRemaining - BytesToWrite,
- current_pos=Pos + BytesToWrite
- },
- {ok, _, Stream3} = write_data(Stream2, Rest),
- {ok, {Pos, BytesRemaining}, Stream3}.
+ old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).
%%% Tests %%%
-
-test(Term) ->
- {ok, Fd} = couch_file:open("foo", [write]),
- {ok, Stream} = open({0,0}, Fd),
- {ok, Pos} = write_term(Stream, Term),
- {ok, Pos2} = write_term(Stream, {Term, Term}),
- close(Stream),
+read_all(Fd, PosList) ->
+ iolist_to_binary(foldl(Fd, PosList,
+ fun(Bin, Acc) ->
+ [Bin, Acc]
+ end, [])).
+
+
+test() ->
+ {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+ ok = couch_file:write_header(Fd, {howdy, howdy}),
+ Bin = <<"damienkatz">>,
+ {ok, Pos} = couch_file:append_binary(Fd, Bin),
+ {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+ {ok, {howdy, howdy}} = couch_file:read_header(Fd),
+ ok = couch_file:write_header(Fd, {foo, foo}),
+ {ok, {foo, foo}} = couch_file:read_header(Fd),
+
+ {ok, Stream} = open(Fd),
+ ok = write(Stream, <<"food">>),
+ ok = write(Stream, <<"foob">>),
+ {PosList, 8} = close(Stream),
+ <<"foodfoob">> = read_all(Fd, PosList),
+ {ok, Stream2} = open(Fd),
+ OneBits = <<1:(8*10)>>,
+ ZeroBits = <<0:(8*10)>>,
+ ok = write(Stream2, OneBits),
+ ok = write(Stream2, ZeroBits),
+ {PosList2, 20} = close(Stream2),
+ AllBits = iolist_to_binary([OneBits,ZeroBits]),
+ AllBits = read_all(Fd, PosList2),
couch_file:close(Fd),
- {ok, Fd2} = couch_file:open("foo", [read, write]),
- {ok, Stream2} = open({0,0}, Fd2),
- {ok, Term1} = read_term(Fd2, Pos),
- io:format("Term1: ~w ~n",[Term1]),
- {ok, Term2} = read_term(Fd2, Pos2),
- io:format("Term2: ~w ~n",[Term2]),
- {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
- deep_read_test(Fd2, PointerList),
- close(Stream2),
- couch_file:close(Fd2).
-
-deep_read_test(_Fd, []) ->
- ok;
-deep_read_test(Fd, [Pointer | RestPointerList]) ->
- {ok, _Term} = read_term(Fd, Pointer),
- deep_read_test(Fd, RestPointerList).
-
-deep_write_test(_Stream, _Term, 0, PointerList) ->
- {ok, PointerList};
-deep_write_test(Stream, Term, N, PointerList) ->
- WriteList = lists:duplicate(random:uniform(N), Term),
- {ok, Pointer} = write_term(Stream, WriteList),
- deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).
+ PosList2.
+