From 16ccd4c0b8ae4272fa27d32948658b1424a291fc Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Mon, 25 May 2009 19:52:28 +0000 Subject: Merging new tail append storage into trunk. Upgrades are automatic, once opened by this version old versions of CouchDB will not be able to open the files. As a precaution, you should back-up your production databases before upgrading. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_file.erl | 421 ++++++++++++++++++++++++++++++--------------- 1 file changed, 280 insertions(+), 141 deletions(-) (limited to 'src/couchdb/couch_file.erl') diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 430aa6b7..c773be98 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -15,10 +15,16 @@ -include("couch_db.hrl"). --define(HEADER_SIZE, 2048). % size of each segment of the doubly written header +-define(SIZE_BLOCK, 4096). + +-record(file, { + fd, + tail_append_begin=0 % 09 UPGRADE CODE + }). --export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]). --export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]). +-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]). +-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]). +-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). %%---------------------------------------------------------------------- @@ -51,39 +57,6 @@ open(Filepath, Options) -> end. -%%---------------------------------------------------------------------- -%% Args: Pos is the offset from the beginning of the file, Bytes is -%% is the number of bytes to read. -%% Returns: {ok, Binary} where Binary is a binary data from disk -%% or {error, Reason}. -%%---------------------------------------------------------------------- - -pread(Fd, Pos, Bytes) when Bytes > 0 -> - gen_server:call(Fd, {pread, Pos, Bytes}, infinity). - - -%%---------------------------------------------------------------------- -%% Args: Pos is the offset from the beginning of the file, Bin is -%% is the binary to write -%% Returns: ok -%% or {error, Reason}. -%%---------------------------------------------------------------------- - -pwrite(Fd, Pos, Bin) -> - gen_server:call(Fd, {pwrite, Pos, Bin}, infinity). - -%%---------------------------------------------------------------------- -%% Purpose: To append a segment of zeros to the end of the file. -%% Args: Bytes is the number of bytes to append to the file. -%% Returns: {ok, Pos} where Pos is the file offset to the beginning of -%% the new segments. -%% or {error, Reason}. -%%---------------------------------------------------------------------- - -expand(Fd, Bytes) when Bytes > 0 -> - gen_server:call(Fd, {expand, Bytes}, infinity). - - %%---------------------------------------------------------------------- %% Purpose: To append an Erlang term to the end of the file. %% Args: Erlang term to serialize and append to the file. @@ -93,7 +66,7 @@ expand(Fd, Bytes) when Bytes > 0 -> %%---------------------------------------------------------------------- append_term(Fd, Term) -> - append_binary(Fd, term_to_binary(Term, [compressed])). + append_binary(Fd, term_to_binary(Term)). %%---------------------------------------------------------------------- @@ -105,7 +78,8 @@ append_term(Fd, Term) -> %%---------------------------------------------------------------------- append_binary(Fd, Bin) -> - gen_server:call(Fd, {append_bin, Bin}, infinity). + Size = iolist_size(Bin), + gen_server:call(Fd, {append_bin, [<>, Bin]}, infinity). %%---------------------------------------------------------------------- @@ -115,10 +89,12 @@ append_binary(Fd, Bin) -> %% or {error, Reason}. %%---------------------------------------------------------------------- + pread_term(Fd, Pos) -> {ok, Bin} = pread_binary(Fd, Pos), {ok, binary_to_term(Bin)}. + %%---------------------------------------------------------------------- %% Purpose: Reads a binrary from a file that was written with append_binary %% Args: Pos, the offset into the file where the term is serialized. @@ -127,8 +103,26 @@ pread_term(Fd, Pos) -> %%---------------------------------------------------------------------- pread_binary(Fd, Pos) -> - gen_server:call(Fd, {pread_bin, Pos}, infinity). - + {ok, L} = pread_iolist(Fd, Pos), + {ok, iolist_to_binary(L)}. + +pread_iolist(Fd, Pos) -> + {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 4), + <> = iolist_to_binary(LenIolist), + {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len), + {ok, Iolist}. + +read_raw_iolist(Fd, Pos, Len) -> + BlockOffset = Pos rem ?SIZE_BLOCK, + TotalBytes = calculate_total_read_len(BlockOffset, Len), + {ok, <>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity), + if HasPrefixes -> + {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}; + true -> + % 09 UPGRADE CODE + <> = RawBin, + {ok, [ReturnBin], Pos + Len} + end. %%---------------------------------------------------------------------- %% Purpose: The length of a file, in bytes. @@ -167,35 +161,153 @@ close(Fd) -> catch unlink(Fd), Result. +% 09 UPGRADE CODE +old_pread(Fd, Pos, Len) -> + {ok, <>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity), + {ok, RawBin}. -write_header(Fd, Prefix, Data) -> - TermBin = term_to_binary(Data), - % the size of all the bytes written to the header, including the md5 signature (16 bytes) - FilledSize = size(Prefix) + size(TermBin) + 16, - {TermBin2, FilledSize2} = - case FilledSize > ?HEADER_SIZE of +% 09 UPGRADE CODE +upgrade_old_header(Fd, Sig) -> + gen_server:call(Fd, {upgrade_old_header, Sig}, infinity). + + +read_header(Fd) -> + case gen_server:call(Fd, find_header, infinity) of + {ok, Bin} -> + {ok, binary_to_term(Bin)}; + Else -> + Else + end. + +write_header(Fd, Data) -> + Bin = term_to_binary(Data), + Md5 = erlang:md5(Bin), + % now we assemble the final header binary and write to disk + FinalBin = <>, + gen_server:call(Fd, {write_header, FinalBin}, infinity). + + + + +init_status_error(ReturnPid, Ref, Error) -> + ReturnPid ! {Ref, self(), Error}, + ignore. + +% server functions + +init({Filepath, Options, ReturnPid, Ref}) -> + case lists:member(create, Options) of true -> - % too big! - {ok, Pos} = append_binary(Fd, TermBin), - PtrBin = term_to_binary({pointer_to_header_data, Pos}), - {PtrBin, size(Prefix) + size(PtrBin) + 16}; + filelib:ensure_dir(Filepath), + case file:open(Filepath, [read, write, raw, binary]) of + {ok, Fd} -> + {ok, Length} = file:position(Fd, eof), + case Length > 0 of + true -> + % this means the file already exists and has data. + % FYI: We don't differentiate between empty files and non-existant + % files here. + case lists:member(overwrite, Options) of + true -> + {ok, 0} = file:position(Fd, 0), + ok = file:truncate(Fd), + ok = file:sync(Fd), + couch_stats_collector:track_process_count( + {couchdb, open_os_files}), + {ok, #file{fd=Fd}}; + false -> + ok = file:close(Fd), + init_status_error(ReturnPid, Ref, file_exists) + end; + false -> + couch_stats_collector:track_process_count( + {couchdb, open_os_files}), + {ok, #file{fd=Fd}} + end; + Error -> + init_status_error(ReturnPid, Ref, Error) + end; false -> - {TermBin, FilledSize} + % open in read mode first, so we don't create the file if it doesn't exist. + case file:open(Filepath, [read, raw]) of + {ok, Fd_Read} -> + {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), + ok = file:close(Fd_Read), + couch_stats_collector:track_process_count({couchdb, open_os_files}), + {ok, #file{fd=Fd}}; + Error -> + init_status_error(ReturnPid, Ref, Error) + end + end. + + +terminate(_Reason, _Fd) -> + ok. + + +handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) -> + {ok, Bin} = file:pread(Fd, Pos, Bytes), + {reply, {ok, Bin, Pos >= TailAppendBegin}, File}; +handle_call(bytes, _From, #file{fd=Fd}=File) -> + {reply, file:position(Fd, eof), File}; +handle_call(sync, _From, #file{fd=Fd}=File) -> + {reply, file:sync(Fd), File}; +handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, Pos), + {reply, file:truncate(Fd), File}; +handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, eof), + Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin), + case file:pwrite(Fd, Pos, Blocks) of + ok -> + {reply, {ok, Pos}, File}; + Error -> + {reply, Error, File} + end; +handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, eof), + BinSize = size(Bin), + case Pos rem ?SIZE_BLOCK of + 0 -> + Padding = <<>>; + BlockOffset -> + Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>> end, - ok = sync(Fd), - % pad out the header with zeros, then take the md5 hash - PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>, - Sig = erlang:md5([TermBin2, PadZeros]), - % now we assemble the final header binary and write to disk - WriteBin = <>, - ?HEADER_SIZE = size(WriteBin), % sanity check - DblWriteBin = [WriteBin, WriteBin], - ok = pwrite(Fd, 0, DblWriteBin), - ok = sync(Fd). + FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])], + {reply, file:pwrite(Fd, Pos, FinalBin), File}; + + +handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) -> + case (catch read_old_header(Fd, Prefix)) of + {ok, Header} -> + {ok, TailAppendBegin} = file:position(Fd, eof), + Bin = term_to_binary(Header), + Md5 = erlang:md5(Bin), + % now we assemble the final header binary and write to disk + FinalBin = <>, + {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File), + ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin), + {reply, ok, File#file{tail_append_begin=TailAppendBegin}}; + _Error -> + case (catch read_old_header(Fd, <<"upgraded">>)) of + {ok, TailAppendBegin} -> + {reply, ok, File#file{tail_append_begin = TailAppendBegin}}; + _Error2 -> + {reply, ok, File} + end + end; + +handle_call(find_header, _From, #file{fd=Fd}=File) -> + {ok, Pos} = file:position(Fd, eof), + {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}. + +% 09 UPGRADE CODE +-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header -read_header(Fd, Prefix) -> - {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)), +% 09 UPGRADE CODE +read_old_header(Fd, Prefix) -> + {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)), <> = Bin, Result = % read the first header @@ -238,6 +350,7 @@ read_header(Fd, Prefix) -> Result end. +% 09 UPGRADE CODE extract_header(Prefix, Bin) -> SizeOfPrefix = size(Prefix), SizeOfTermBin = ?HEADER_SIZE - @@ -260,88 +373,35 @@ extract_header(Prefix, Bin) -> _ -> unknown_header_type end. + - -init_status_error(ReturnPid, Ref, Error) -> - ReturnPid ! {Ref, self(), Error}, - ignore. - -% server functions - -init({Filepath, Options, ReturnPid, Ref}) -> - case lists:member(create, Options) of +% 09 UPGRADE CODE +write_old_header(Fd, Prefix, Data) -> + TermBin = term_to_binary(Data), + % the size of all the bytes written to the header, including the md5 signature (16 bytes) + FilledSize = size(Prefix) + size(TermBin) + 16, + {TermBin2, FilledSize2} = + case FilledSize > ?HEADER_SIZE of true -> - filelib:ensure_dir(Filepath), - case file:open(Filepath, [read, write, raw, binary]) of - {ok, Fd} -> - {ok, Length} = file:position(Fd, eof), - case Length > 0 of - true -> - % this means the file already exists and has data. - % FYI: We don't differentiate between empty files and non-existant - % files here. - case lists:member(overwrite, Options) of - true -> - {ok, 0} = file:position(Fd, 0), - ok = file:truncate(Fd), - couch_stats_collector:track_process_count( - {couchdb, open_os_files}), - {ok, Fd}; - false -> - ok = file:close(Fd), - init_status_error(ReturnPid, Ref, file_exists) - end; - false -> - couch_stats_collector:track_process_count( - {couchdb, open_os_files}), - {ok, Fd} - end; - Error -> - init_status_error(ReturnPid, Ref, Error) - end; + % too big! + {ok, Pos} = append_binary(Fd, TermBin), + PtrBin = term_to_binary({pointer_to_header_data, Pos}), + {PtrBin, size(Prefix) + size(PtrBin) + 16}; false -> - % open in read mode first, so we don't create the file if it doesn't exist. - case file:open(Filepath, [read, raw]) of - {ok, Fd_Read} -> - {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), - ok = file:close(Fd_Read), - couch_stats_collector:track_process_count({couchdb, open_os_files}), - {ok, Fd}; - Error -> - init_status_error(ReturnPid, Ref, Error) - end - end. - - -terminate(_Reason, _Fd) -> - ok. - - -handle_call({pread, Pos, Bytes}, _From, Fd) -> - {reply, file:pread(Fd, Pos, Bytes), Fd}; -handle_call({pwrite, Pos, Bin}, _From, Fd) -> - {reply, file:pwrite(Fd, Pos, Bin), Fd}; -handle_call({expand, Num}, _From, Fd) -> - {ok, Pos} = file:position(Fd, eof), - {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd}; -handle_call(bytes, _From, Fd) -> - {reply, file:position(Fd, eof), Fd}; -handle_call(sync, _From, Fd) -> - {reply, file:sync(Fd), Fd}; -handle_call({truncate, Pos}, _From, Fd) -> - {ok, Pos} = file:position(Fd, Pos), - {reply, file:truncate(Fd), Fd}; -handle_call({append_bin, Bin}, _From, Fd) -> - Len = size(Bin), - Bin2 = <>, - {ok, Pos} = file:position(Fd, eof), - {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd}; -handle_call({pread_bin, Pos}, _From, Fd) -> - {ok, <>} = file:pread(Fd, Pos, 4), - {ok, Bin} = file:pread(Fd, Pos + 4, TermLen), - {reply, {ok, Bin}, Fd}. - + {TermBin, FilledSize} + end, + ok = file:sync(Fd), + % pad out the header with zeros, then take the md5 hash + PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>, + Sig = erlang:md5([TermBin2, PadZeros]), + % now we assemble the final header binary and write to disk + WriteBin = <>, + ?HEADER_SIZE = size(WriteBin), % sanity check + DblWriteBin = [WriteBin, WriteBin], + ok = file:pwrite(Fd, 0, DblWriteBin), + ok = file:sync(Fd). + handle_cast(close, Fd) -> {stop,normal,Fd}. @@ -351,3 +411,82 @@ code_change(_OldVsn, State, _Extra) -> handle_info({'EXIT', _, Reason}, Fd) -> {stop, Reason, Fd}. + + +find_header(_Fd, -1) -> + no_valid_header; +find_header(Fd, Block) -> + case (catch load_header(Fd, Block)) of + {ok, Bin} -> + {ok, Bin}; + _Error -> + find_header(Fd, Block -1) + end. + +load_header(Fd, Block) -> + {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1), + {ok, <>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4), + TotalBytes = calculate_total_read_len(1, HeaderLen), + {ok, <>} = + file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes), + <> = + iolist_to_binary(remove_block_prefixes(1, RawBin)), + Md5Sig = erlang:md5(HeaderBin), + {ok, HeaderBin}. + +calculate_total_read_len(0, FinalLen) -> + calculate_total_read_len(1, FinalLen) + 1; +calculate_total_read_len(BlockOffset, FinalLen) -> + case ?SIZE_BLOCK - BlockOffset of + BlockLeft when BlockLeft >= FinalLen -> + FinalLen; + BlockLeft -> + FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) + + if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0; + true -> 1 end + end. + +remove_block_prefixes(_BlockOffset, <<>>) -> + []; +remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) -> + remove_block_prefixes(1, Rest); +remove_block_prefixes(BlockOffset, Bin) -> + BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset, + case size(Bin) of + Size when Size > BlockBytesAvailable -> + <> = Bin, + [DataBlock | remove_block_prefixes(0, Rest)]; + _Size -> + [Bin] + end. + +make_blocks(_BlockOffset, []) -> + []; +make_blocks(0, IoList) -> + [<<0>> | make_blocks(1, IoList)]; +make_blocks(BlockOffset, IoList) -> + case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of + {Begin, End} -> + [Begin | make_blocks(0, End)]; + _Size -> + IoList + end. + +split_iolist(List, 0, BeginAcc) -> + {lists:reverse(BeginAcc), List}; +split_iolist([], SplitAt, _BeginAcc) -> + SplitAt; +split_iolist([<> | Rest], SplitAt, BeginAcc) when SplitAt > size(Bin) -> + split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]); +split_iolist([<> | Rest], SplitAt, BeginAcc) -> + <> = Bin, + split_iolist([End | Rest], 0, [Begin | BeginAcc]); +split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) -> + case split_iolist(Sublist, SplitAt, BeginAcc) of + {Begin, End} -> + {Begin, [End | Rest]}; + Len -> + split_iolist(Rest, SplitAt - Len, [Sublist | BeginAcc]) + end; +split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) -> + split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]). -- cgit v1.2.3