diff options
Diffstat (limited to 'src/couchdb/couch_file.erl')
| -rw-r--r-- | src/couchdb/couch_file.erl | 228 |
1 files changed, 162 insertions, 66 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 5904260c..56ebc4f2 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -19,7 +19,8 @@ -record(file, { fd, - tail_append_begin=0 % 09 UPGRADE CODE + tail_append_begin = 0, % 09 UPGRADE CODE + eof = 0 }). -export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]). @@ -27,6 +28,7 @@ -export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]). -export([append_term_md5/2,append_binary_md5/2]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). +-export([delete/2,delete/3,init_delete_dir/1]). %%---------------------------------------------------------------------- %% Args: Valid Options are [create] and [create,overwrite]. @@ -88,8 +90,8 @@ append_binary(Fd, Bin) -> append_binary_md5(Fd, Bin) -> Size = iolist_size(Bin), - gen_server:call(Fd, {append_bin, - [<<1:1/integer,Size:31/integer>>, erlang:md5(Bin), Bin]}, infinity). + gen_server:call(Fd, {append_bin, + [<<1:1/integer,Size:31/integer>>, couch_util:md5(Bin), Bin]}, infinity). %%---------------------------------------------------------------------- @@ -118,33 +120,18 @@ pread_binary(Fd, Pos) -> pread_iolist(Fd, Pos) -> - {ok, LenIolist, NextPos} = read_raw_iolist(Fd, Pos, 4), - case iolist_to_binary(LenIolist) of - <<1:1/integer,Len:31/integer>> -> - {ok, Md5List, ValPos} = read_raw_iolist(Fd, NextPos, 16), - Md5 = iolist_to_binary(Md5List), - {ok, IoList, _} = read_raw_iolist(Fd,ValPos,Len), - case erlang:md5(IoList) of - Md5 -> ok; - _ -> throw(file_corruption) - end, + case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of + {ok, IoList, <<>>} -> {ok, IoList}; - <<0:1/integer,Len:31/integer>> -> - {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len), - {ok, Iolist} - end. - - -read_raw_iolist(Fd, Pos, Len) -> - BlockOffset = Pos rem ?SIZE_BLOCK, - TotalBytes = calculate_total_read_len(BlockOffset, Len), - {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity), - if HasPrefixes -> - {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}; - true -> - % 09 UPGRADE CODE - <<ReturnBin:Len/binary, _/binary>> = RawBin, - {ok, [ReturnBin], Pos + Len} + {ok, IoList, Md5} -> + case couch_util:md5(IoList) of + Md5 -> + {ok, IoList}; + _ -> + exit({file_corruption, <<"file corruption">>}) + end; + Error -> + Error end. %%---------------------------------------------------------------------- @@ -172,17 +159,49 @@ truncate(Fd, Pos) -> %% or {error, Reason}. %%---------------------------------------------------------------------- +sync(Filepath) when is_list(Filepath) -> + {ok, Fd} = file:open(Filepath, [append, raw]), + try file:sync(Fd) after file:close(Fd) end; sync(Fd) -> gen_server:call(Fd, sync, infinity). %%---------------------------------------------------------------------- -%% Purpose: Close the file. Is performed asynchronously. +%% Purpose: Close the file. %% Returns: ok %%---------------------------------------------------------------------- close(Fd) -> - Result = gen_server:cast(Fd, close), - catch unlink(Fd), - Result. + couch_util:shutdown_sync(Fd). + + +delete(RootDir, Filepath) -> + delete(RootDir, Filepath, true). + + +delete(RootDir, Filepath, Async) -> + DelFile = filename:join([RootDir,".delete", ?b2l(couch_uuids:random())]), + case file:rename(Filepath, DelFile) of + ok -> + if (Async) -> + spawn(file, delete, [DelFile]), + ok; + true -> + file:delete(DelFile) + end; + Error -> + Error + end. + + +init_delete_dir(RootDir) -> + Dir = filename:join(RootDir,".delete"), + % note: ensure_dir requires an actual filename companent, which is the + % reason for "foo". + filelib:ensure_dir(filename:join(Dir,"foo")), + filelib:fold_files(Dir, ".*", true, + fun(Filename, _) -> + ok = file:delete(Filename) + end, ok). + % 09 UPGRADE CODE old_pread(Fd, Pos, Len) -> @@ -204,7 +223,7 @@ read_header(Fd) -> write_header(Fd, Data) -> Bin = term_to_binary(Data), - Md5 = erlang:md5(Bin), + Md5 = couch_util:md5(Bin), % now we assemble the final header binary and write to disk FinalBin = <<Md5/binary, Bin/binary>>, gen_server:call(Fd, {write_header, FinalBin}, infinity). @@ -219,10 +238,11 @@ init_status_error(ReturnPid, Ref, Error) -> % server functions init({Filepath, Options, ReturnPid, Ref}) -> + process_flag(trap_exit, true), case lists:member(create, Options) of true -> filelib:ensure_dir(Filepath), - case file:open(Filepath, [read, write, raw, binary]) of + case file:open(Filepath, [read, append, raw, binary]) of {ok, Fd} -> {ok, Length} = file:position(Fd, eof), case Length > 0 of @@ -235,16 +255,14 @@ init({Filepath, Options, ReturnPid, Ref}) -> {ok, 0} = file:position(Fd, 0), ok = file:truncate(Fd), ok = file:sync(Fd), - couch_stats_collector:track_process_count( - {couchdb, open_os_files}), + maybe_track_open_os_files(Options), {ok, #file{fd=Fd}}; false -> ok = file:close(Fd), init_status_error(ReturnPid, Ref, file_exists) end; false -> - couch_stats_collector:track_process_count( - {couchdb, open_os_files}), + maybe_track_open_os_files(Options), {ok, #file{fd=Fd}} end; Error -> @@ -254,41 +272,71 @@ init({Filepath, Options, ReturnPid, Ref}) -> % open in read mode first, so we don't create the file if it doesn't exist. case file:open(Filepath, [read, raw]) of {ok, Fd_Read} -> - {ok, Fd} = file:open(Filepath, [read, write, raw, binary]), + {ok, Fd} = file:open(Filepath, [read, append, raw, binary]), ok = file:close(Fd_Read), - couch_stats_collector:track_process_count({couchdb, open_os_files}), - {ok, #file{fd=Fd}}; + maybe_track_open_os_files(Options), + {ok, Length} = file:position(Fd, eof), + {ok, #file{fd=Fd, eof=Length}}; Error -> init_status_error(ReturnPid, Ref, Error) end end. +maybe_track_open_os_files(FileOptions) -> + case lists:member(sys_db, FileOptions) of + true -> + ok; + false -> + couch_stats_collector:track_process_count({couchdb, open_os_files}) + end. -terminate(_Reason, _Fd) -> - ok. +terminate(_Reason, #file{fd = Fd}) -> + ok = file:close(Fd). +handle_call({pread_iolist, Pos}, _From, File) -> + {RawData, NextPos} = try + % up to 8Kbs of read ahead + read_raw_iolist_int(File, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK)) + catch + _:_ -> + read_raw_iolist_int(File, Pos, 4) + end, + <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> = + iolist_to_binary(RawData), + case Prefix of + 1 -> + {Md5, IoList} = extract_md5( + maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, File)), + {reply, {ok, IoList, Md5}, File}; + 0 -> + IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, File), + {reply, {ok, IoList, <<>>}, File} + end; handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) -> {ok, Bin} = file:pread(Fd, Pos, Bytes), {reply, {ok, Bin, Pos >= TailAppendBegin}, File}; -handle_call(bytes, _From, #file{fd=Fd}=File) -> - {reply, file:position(Fd, eof), File}; +handle_call(bytes, _From, #file{eof=Length}=File) -> + {reply, {ok, Length}, File}; handle_call(sync, _From, #file{fd=Fd}=File) -> {reply, file:sync(Fd), File}; handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) -> {ok, Pos} = file:position(Fd, Pos), - {reply, file:truncate(Fd), File}; -handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) -> - {ok, Pos} = file:position(Fd, eof), + case file:truncate(Fd) of + ok -> + {reply, ok, File#file{eof=Pos}}; + Error -> + {reply, Error, File} + end; +handle_call({append_bin, Bin}, _From, #file{fd=Fd, eof=Pos}=File) -> Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin), - case file:pwrite(Fd, Pos, Blocks) of + case file:write(Fd, Blocks) of ok -> - {reply, {ok, Pos}, File}; + {reply, {ok, Pos}, File#file{eof=Pos+iolist_size(Blocks)}}; Error -> {reply, Error, File} end; -handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) -> - {ok, Pos} = file:position(Fd, eof), +handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) -> BinSize = size(Bin), case Pos rem ?SIZE_BLOCK of 0 -> @@ -296,16 +344,21 @@ handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) -> BlockOffset -> Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>> end, - FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])], - {reply, file:pwrite(Fd, Pos, FinalBin), File}; + FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])], + case file:write(Fd, FinalBin) of + ok -> + {reply, ok, File#file{eof=Pos+iolist_size(FinalBin)}}; + Error -> + {reply, Error, File} + end; handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) -> case (catch read_old_header(Fd, Prefix)) of {ok, Header} -> - {ok, TailAppendBegin} = file:position(Fd, eof), + TailAppendBegin = File#file.eof, Bin = term_to_binary(Header), - Md5 = erlang:md5(Bin), + Md5 = couch_util:md5(Bin), % now we assemble the final header binary and write to disk FinalBin = <<Md5/binary, Bin/binary>>, {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File), @@ -321,8 +374,7 @@ handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) -> end; -handle_call(find_header, _From, #file{fd=Fd}=File) -> - {ok, Pos} = file:position(Fd, eof), +handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) -> {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}. % 09 UPGRADE CODE @@ -386,7 +438,7 @@ extract_header(Prefix, Bin) -> case HeaderPrefix of Prefix -> % check the integrity signature - case erlang:md5(TermBin) == Sig of + case couch_util:md5(TermBin) == Sig of true -> Header = binary_to_term(TermBin), {ok, Header}; @@ -416,7 +468,7 @@ write_old_header(Fd, Prefix, Data) -> ok = file:sync(Fd), % pad out the header with zeros, then take the md5 hash PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>, - Sig = erlang:md5([TermBin2, PadZeros]), + Sig = couch_util:md5([TermBin2, PadZeros]), % now we assemble the final header binary and write to disk WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>, ?HEADER_SIZE = size(WriteBin), % sanity check @@ -432,6 +484,8 @@ handle_cast(close, Fd) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_info({'EXIT', _, normal}, Fd) -> + {noreply, Fd}; handle_info({'EXIT', _, Reason}, Fd) -> {stop, Reason, Fd}. @@ -447,16 +501,53 @@ find_header(Fd, Block) -> end. load_header(Fd, Block) -> - {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1), - {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4), + {ok, <<1, HeaderLen:32/integer, RestBlock/binary>>} = + file:pread(Fd, Block * ?SIZE_BLOCK, ?SIZE_BLOCK), TotalBytes = calculate_total_read_len(1, HeaderLen), - {ok, <<RawBin:TotalBytes/binary>>} = - file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes), + case TotalBytes > byte_size(RestBlock) of + false -> + <<RawBin:TotalBytes/binary, _/binary>> = RestBlock; + true -> + {ok, Missing} = file:pread( + Fd, (Block * ?SIZE_BLOCK) + 5 + byte_size(RestBlock), + TotalBytes - byte_size(RestBlock)), + RawBin = <<RestBlock/binary, Missing/binary>> + end, <<Md5Sig:16/binary, HeaderBin/binary>> = iolist_to_binary(remove_block_prefixes(1, RawBin)), - Md5Sig = erlang:md5(HeaderBin), + Md5Sig = couch_util:md5(HeaderBin), {ok, HeaderBin}. +maybe_read_more_iolist(Buffer, DataSize, _, _) + when DataSize =< byte_size(Buffer) -> + <<Data:DataSize/binary, _/binary>> = Buffer, + [Data]; +maybe_read_more_iolist(Buffer, DataSize, NextPos, File) -> + {Missing, _} = + read_raw_iolist_int(File, NextPos, DataSize - byte_size(Buffer)), + [Buffer, Missing]. + +-spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) -> + {Data::iolist(), CurPos::non_neg_integer()}. +read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE + read_raw_iolist_int(Fd, Pos, Len); +read_raw_iolist_int(#file{fd=Fd, tail_append_begin=TAB}, Pos, Len) -> + BlockOffset = Pos rem ?SIZE_BLOCK, + TotalBytes = calculate_total_read_len(BlockOffset, Len), + {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes), + if Pos >= TAB -> + {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}; + true -> + % 09 UPGRADE CODE + <<ReturnBin:Len/binary, _/binary>> = RawBin, + {[ReturnBin], Pos + Len} + end. + +-spec extract_md5(iolist()) -> {binary(), iolist()}. +extract_md5(FullIoList) -> + {Md5List, IoList} = split_iolist(FullIoList, 16, []), + {iolist_to_binary(Md5List), IoList}. + calculate_total_read_len(0, FinalLen) -> calculate_total_read_len(1, FinalLen) + 1; calculate_total_read_len(BlockOffset, FinalLen) -> @@ -495,6 +586,11 @@ make_blocks(BlockOffset, IoList) -> IoList end. +%% @doc Returns a tuple where the first element contains the leading SplitAt +%% bytes of the original iolist, and the 2nd element is the tail. If SplitAt +%% is larger than byte_size(IoList), return the difference. +-spec split_iolist(IoList::iolist(), SplitAt::non_neg_integer(), Acc::list()) -> + {iolist(), iolist()} | non_neg_integer(). split_iolist(List, 0, BeginAcc) -> {lists:reverse(BeginAcc), List}; split_iolist([], SplitAt, _BeginAcc) -> |
