From 16ccd4c0b8ae4272fa27d32948658b1424a291fc Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Mon, 25 May 2009 19:52:28 +0000 Subject: Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_stream.erl | 316 ++++++++++++++++++------------------------- 1 file changed, 134 insertions(+), 182 deletions(-) (limited to 'src/couchdb/couch_stream.erl') diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index d6f72696..bf9fd3c2 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -13,14 +13,6 @@ -module(couch_stream). -behaviour(gen_server). --export([test/1]). --export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]). --export([copy/4, copy_to_new_stream/4]). --export([ensure_buffer/2, set_min_buffer/2]). --export([init/1, terminate/2, handle_call/3]). --export([handle_cast/2,code_change/3,handle_info/2]). - --include("couch_db.hrl"). -define(FILE_POINTER_BYTES, 8). -define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)). @@ -32,125 +24,111 @@ -define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data +-export([test/0]). +-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]). +-export([copy_to_new_stream/3,old_read_term/2]). +-export([init/1, terminate/2, handle_call/3]). +-export([handle_cast/2,code_change/3,handle_info/2]). --record(write_stream, - {fd = 0, - current_pos = 0, - bytes_remaining = 0, - next_alloc = 0, - min_alloc = 16#00010000 - }). +-include("couch_db.hrl"). -record(stream, - { - pid, - fd + {fd = 0, + written_pointers=[], + buffer_list = [], + buffer_len = 0, + max_buffer = 4096, + written_len = 0 }). %%% Interface functions %%% open(Fd) -> - open(nil, Fd). + gen_server:start_link(couch_stream, Fd, []). -open(nil, Fd) -> - open({0,0}, Fd); -open(State, Fd) -> - {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []), - {ok, #stream{pid = Pid, fd = Fd}}. - -close(#stream{pid = Pid, fd = _Fd}) -> +close(Pid) -> gen_server:call(Pid, close, infinity). -get_state(#stream{pid = Pid, fd = _Fd}) -> - gen_server:call(Pid, get_state, infinity). - -ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> - gen_server:call(Pid, {ensure_buffer, Bytes}). - -set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> - gen_server:call(Pid, {set_min_buffer, Bytes}). - -read(#stream{pid = _Pid, fd = Fd}, Sp, Num) -> - read(Fd, Sp, Num); -read(Fd, Sp, Num) -> - {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []), - Bin = list_to_binary(lists:reverse(RevBin)), - {ok, Bin, Sp2}. - -copy_to_new_stream(Src, Sp, Len, DestFd) -> +copy_to_new_stream(Fd, PosList, DestFd) -> {ok, Dest} = open(DestFd), - ok = set_min_buffer(Dest, 0), - {ok, NewSp} = copy(Src, Sp, Len, Dest), - close(Dest), - {ok, NewSp}. - -copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) -> - copy(Fd, Sp, Len, DestStream); -copy(Fd, Sp, Len, DestStream) -> - {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK, - fun(Bin, AccPointer) -> - {ok, NewPointer} = write(DestStream, Bin), - {ok, if AccPointer == null -> NewPointer; true -> AccPointer end} - end, - null), - {ok, NewSp}. - -foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) -> - foldl(Fd, Sp, Num, Fun, Acc); -foldl(Fd, Sp, Num, Fun, Acc) -> - {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc). - -read_term(#stream{pid = _Pid, fd = Fd}, Sp) -> - read_term(Fd, Sp); -read_term(Fd, Sp) -> - {ok, <>, Sp2} - = read(Fd, Sp, ?STREAM_OFFSET_BYTES), - {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen), - {ok, binary_to_term(Bin)}. + foldl(Fd, PosList, + fun(Bin, _) -> + ok = write(Dest, Bin) + end, ok), + close(Dest). -write_term(Stream, Term) -> - Bin = term_to_binary(Term), - Size = size(Bin), - Bin2 = <>, - write(Stream, Bin2). -write(#stream{}, <<>>) -> - {ok, {0,0}}; -write(#stream{pid = Pid}, Bin) when is_binary(Bin) -> +% 09 UPGRADE CODE +old_copy_to_new_stream(Fd, Pos, Len, DestFd) -> + {ok, Dest} = open(DestFd), + old_foldl(Fd, Pos, Len, + fun(Bin, _) -> + ok = write(Dest, Bin) + end, ok), + close(Dest). + +% 09 UPGRADE CODE +old_foldl(_Fd, null, 0, _Fun, Acc) -> + Acc; +old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)-> + old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc). + +foldl(_Fd, [], _Fun, Acc) -> + Acc; +foldl(Fd, [Pos|Rest], Fun, Acc) -> + {ok, Bin} = couch_file:pread_iolist(Fd, Pos), + foldl(Fd, Rest, Fun, Fun(Bin, Acc)). + +write(_Pid, <<>>) -> + ok; +write(Pid, Bin) -> gen_server:call(Pid, {write, Bin}, infinity). -init({{Pos, BytesRemaining}, Fd}) -> - {ok, #write_stream - {fd = Fd, - current_pos = Pos, - bytes_remaining = BytesRemaining - }}. +init(Fd) -> + {ok, #stream{fd = Fd}}. terminate(_Reason, _Stream) -> ok. -handle_call(get_state, _From, Stream) -> - #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream, - {reply, {Pos, BytesRemaining}, Stream}; -handle_call({set_min_buffer, MinBuffer}, _From, Stream) -> - {reply, ok, Stream#write_stream{min_alloc = MinBuffer}}; -% set next_alloc if we need more room -handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) -> - #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream, - case BytesRemainingInCurrentBuffer < BufferSizeRequested of - true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer; - false -> NextAlloc = 0 % enough room in current segment - end, - {reply, ok, Stream#write_stream{next_alloc = NextAlloc}}; handle_call({write, Bin}, _From, Stream) -> - % ensure init is called first so we can get a pointer to the begining of the binary - {ok, Sp, Stream2} = write_data(Stream, Bin), - {reply, {ok, Sp}, Stream2}; + BinSize = iolist_size(Bin), + #stream{ + fd = Fd, + written_len = WrittenLen, + written_pointers = Written, + buffer_len = BufferLen, + buffer_list = Buffer, + max_buffer = Max} = Stream, + if BinSize + BufferLen > Max -> + {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)), + {reply, ok, Stream#stream{ + written_len=WrittenLen + BufferLen + BinSize, + written_pointers=[Pos|Written], + buffer_list=[], + buffer_len=0}}; + true -> + {reply, ok, Stream#stream{ + buffer_list=[Bin|Buffer], + buffer_len=BufferLen + BinSize}} + end; handle_call(close, _From, Stream) -> - #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream, - {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}. + #stream{ + fd = Fd, + written_len = WrittenLen, + written_pointers = Written, + buffer_len = BufferLen, + buffer_list = Buffer} = Stream, + + case Buffer of + [] -> + Result = {Written, WrittenLen}; + _ -> + {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)), + Result = {[Pos|Written], WrittenLen + BufferLen} + end, + {stop, normal, Result, Stream}. handle_cast(_Msg, State) -> {noreply,State}. @@ -160,14 +138,27 @@ code_change(_OldVsn, State, _Extra) -> handle_info(_Info, State) -> {noreply, State}. + + -%%% Internal function %%% +% 09 UPGRADE CODE +old_read_term(Fd, Sp) -> + {ok, <>, Sp2} + = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES), + {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen), + {ok, binary_to_term(Bin)}. -stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) -> +old_read(Fd, Sp, Num) -> + {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []), + Bin = list_to_binary(lists:reverse(RevBin)), + {ok, Bin, Sp2}. + +% 09 UPGRADE CODE +old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) -> {ok, Acc, Sp}; -stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> +old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> {ok, <>} - = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), + = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), Sp = {NextPos, NextOffset}, % Check NextPos is past current Pos (this is always true in a stream) % Guards against potential infinite loops caused by corruption. @@ -175,86 +166,47 @@ stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) -> true -> ok; false -> throw({error, stream_corruption}) end, - stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc); -stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) -> + old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc); +old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) -> ReadAmount = lists:min([MaxChunk, Num, Offset]), - {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount), + {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount), Sp = {Pos + ReadAmount, Offset - ReadAmount}, - case Fun(Bin, Acc) of - {ok, Acc2} -> - stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2); - {stop, Acc2} -> - {ok, Acc2, Sp} - end. - -write_data(Stream, <<>>) -> - {ok, {0,0}, Stream}; -write_data(#write_stream{bytes_remaining=0} = Stream, Bin) -> - #write_stream { - fd = Fd, - current_pos = CurrentPos, - next_alloc = NextAlloc, - min_alloc = MinAlloc - }= Stream, - - NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]), - % no space in the current segment, must alloc a new segment - {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES), - - case CurrentPos of - 0 -> - ok; - _ -> - ok = couch_file:pwrite(Fd, CurrentPos, <>) - end, - Stream2 = Stream#write_stream{ - current_pos=NewPos, - bytes_remaining=NewSize, - next_alloc=0}, - write_data(Stream2, Bin); -write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) -> - BytesToWrite = lists:min([size(Bin), BytesRemaining]), - {WriteBin, Rest} = split_binary(Bin, BytesToWrite), - ok = couch_file:pwrite(Fd, Pos, WriteBin), - Stream2 = Stream#write_stream{ - bytes_remaining=BytesRemaining - BytesToWrite, - current_pos=Pos + BytesToWrite - }, - {ok, _, Stream3} = write_data(Stream2, Rest), - {ok, {Pos, BytesRemaining}, Stream3}. + old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)). %%% Tests %%% - -test(Term) -> - {ok, Fd} = couch_file:open("foo", [write]), - {ok, Stream} = open({0,0}, Fd), - {ok, Pos} = write_term(Stream, Term), - {ok, Pos2} = write_term(Stream, {Term, Term}), - close(Stream), +read_all(Fd, PosList) -> + iolist_to_binary(foldl(Fd, PosList, + fun(Bin, Acc) -> + [Bin, Acc] + end, [])). + + +test() -> + {ok, Fd} = couch_file:open("foo", [create,overwrite]), + ok = couch_file:write_header(Fd, {howdy, howdy}), + Bin = <<"damienkatz">>, + {ok, Pos} = couch_file:append_binary(Fd, Bin), + {ok, Bin} = couch_file:pread_binary(Fd, Pos), + {ok, {howdy, howdy}} = couch_file:read_header(Fd), + ok = couch_file:write_header(Fd, {foo, foo}), + {ok, {foo, foo}} = couch_file:read_header(Fd), + + {ok, Stream} = open(Fd), + ok = write(Stream, <<"food">>), + ok = write(Stream, <<"foob">>), + {PosList, 8} = close(Stream), + <<"foodfoob">> = read_all(Fd, PosList), + {ok, Stream2} = open(Fd), + OneBits = <<1:(8*10)>>, + ZeroBits = <<0:(8*10)>>, + ok = write(Stream2, OneBits), + ok = write(Stream2, ZeroBits), + {PosList2, 20} = close(Stream2), + AllBits = iolist_to_binary([OneBits,ZeroBits]), + AllBits = read_all(Fd, PosList2), couch_file:close(Fd), - {ok, Fd2} = couch_file:open("foo", [read, write]), - {ok, Stream2} = open({0,0}, Fd2), - {ok, Term1} = read_term(Fd2, Pos), - io:format("Term1: ~w ~n",[Term1]), - {ok, Term2} = read_term(Fd2, Pos2), - io:format("Term2: ~w ~n",[Term2]), - {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []), - deep_read_test(Fd2, PointerList), - close(Stream2), - couch_file:close(Fd2). - -deep_read_test(_Fd, []) -> - ok; -deep_read_test(Fd, [Pointer | RestPointerList]) -> - {ok, _Term} = read_term(Fd, Pointer), - deep_read_test(Fd, RestPointerList). - -deep_write_test(_Stream, _Term, 0, PointerList) -> - {ok, PointerList}; -deep_write_test(Stream, Term, N, PointerList) -> - WriteList = lists:duplicate(random:uniform(N), Term), - {ok, Pointer} = write_term(Stream, WriteList), - deep_write_test(Stream, Term, N-1, [Pointer | PointerList]). + PosList2. + -- cgit v1.2.3