summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_file.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_file.erl')
-rw-r--r--src/couchdb/couch_file.erl228
1 files changed, 162 insertions, 66 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 5904260c..56ebc4f2 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -19,7 +19,8 @@
-record(file, {
fd,
- tail_append_begin=0 % 09 UPGRADE CODE
+ tail_append_begin = 0, % 09 UPGRADE CODE
+ eof = 0
}).
-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
@@ -27,6 +28,7 @@
-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
-export([append_term_md5/2,append_binary_md5/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+-export([delete/2,delete/3,init_delete_dir/1]).
%%----------------------------------------------------------------------
%% Args: Valid Options are [create] and [create,overwrite].
@@ -88,8 +90,8 @@ append_binary(Fd, Bin) ->
append_binary_md5(Fd, Bin) ->
Size = iolist_size(Bin),
- gen_server:call(Fd, {append_bin,
- [<<1:1/integer,Size:31/integer>>, erlang:md5(Bin), Bin]}, infinity).
+ gen_server:call(Fd, {append_bin,
+ [<<1:1/integer,Size:31/integer>>, couch_util:md5(Bin), Bin]}, infinity).
%%----------------------------------------------------------------------
@@ -118,33 +120,18 @@ pread_binary(Fd, Pos) ->
pread_iolist(Fd, Pos) ->
- {ok, LenIolist, NextPos} = read_raw_iolist(Fd, Pos, 4),
- case iolist_to_binary(LenIolist) of
- <<1:1/integer,Len:31/integer>> ->
- {ok, Md5List, ValPos} = read_raw_iolist(Fd, NextPos, 16),
- Md5 = iolist_to_binary(Md5List),
- {ok, IoList, _} = read_raw_iolist(Fd,ValPos,Len),
- case erlang:md5(IoList) of
- Md5 -> ok;
- _ -> throw(file_corruption)
- end,
+ case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
+ {ok, IoList, <<>>} ->
{ok, IoList};
- <<0:1/integer,Len:31/integer>> ->
- {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
- {ok, Iolist}
- end.
-
-
-read_raw_iolist(Fd, Pos, Len) ->
- BlockOffset = Pos rem ?SIZE_BLOCK,
- TotalBytes = calculate_total_read_len(BlockOffset, Len),
- {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
- if HasPrefixes ->
- {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
- true ->
- % 09 UPGRADE CODE
- <<ReturnBin:Len/binary, _/binary>> = RawBin,
- {ok, [ReturnBin], Pos + Len}
+ {ok, IoList, Md5} ->
+ case couch_util:md5(IoList) of
+ Md5 ->
+ {ok, IoList};
+ _ ->
+ exit({file_corruption, <<"file corruption">>})
+ end;
+ Error ->
+ Error
end.
%%----------------------------------------------------------------------
@@ -172,17 +159,49 @@ truncate(Fd, Pos) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
+sync(Filepath) when is_list(Filepath) ->
+ {ok, Fd} = file:open(Filepath, [append, raw]),
+ try file:sync(Fd) after file:close(Fd) end;
sync(Fd) ->
gen_server:call(Fd, sync, infinity).
%%----------------------------------------------------------------------
-%% Purpose: Close the file. Is performed asynchronously.
+%% Purpose: Close the file.
%% Returns: ok
%%----------------------------------------------------------------------
close(Fd) ->
- Result = gen_server:cast(Fd, close),
- catch unlink(Fd),
- Result.
+ couch_util:shutdown_sync(Fd).
+
+
+delete(RootDir, Filepath) ->
+ delete(RootDir, Filepath, true).
+
+
+delete(RootDir, Filepath, Async) ->
+ DelFile = filename:join([RootDir,".delete", ?b2l(couch_uuids:random())]),
+ case file:rename(Filepath, DelFile) of
+ ok ->
+ if (Async) ->
+ spawn(file, delete, [DelFile]),
+ ok;
+ true ->
+ file:delete(DelFile)
+ end;
+ Error ->
+ Error
+ end.
+
+
+init_delete_dir(RootDir) ->
+ Dir = filename:join(RootDir,".delete"),
+ % note: ensure_dir requires an actual filename companent, which is the
+ % reason for "foo".
+ filelib:ensure_dir(filename:join(Dir,"foo")),
+ filelib:fold_files(Dir, ".*", true,
+ fun(Filename, _) ->
+ ok = file:delete(Filename)
+ end, ok).
+
% 09 UPGRADE CODE
old_pread(Fd, Pos, Len) ->
@@ -204,7 +223,7 @@ read_header(Fd) ->
write_header(Fd, Data) ->
Bin = term_to_binary(Data),
- Md5 = erlang:md5(Bin),
+ Md5 = couch_util:md5(Bin),
% now we assemble the final header binary and write to disk
FinalBin = <<Md5/binary, Bin/binary>>,
gen_server:call(Fd, {write_header, FinalBin}, infinity).
@@ -219,10 +238,11 @@ init_status_error(ReturnPid, Ref, Error) ->
% server functions
init({Filepath, Options, ReturnPid, Ref}) ->
+ process_flag(trap_exit, true),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, write, raw, binary]) of
+ case file:open(Filepath, [read, append, raw, binary]) of
{ok, Fd} ->
{ok, Length} = file:position(Fd, eof),
case Length > 0 of
@@ -235,16 +255,14 @@ init({Filepath, Options, ReturnPid, Ref}) ->
{ok, 0} = file:position(Fd, 0),
ok = file:truncate(Fd),
ok = file:sync(Fd),
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
+ maybe_track_open_os_files(Options),
{ok, #file{fd=Fd}};
false ->
ok = file:close(Fd),
init_status_error(ReturnPid, Ref, file_exists)
end;
false ->
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
+ maybe_track_open_os_files(Options),
{ok, #file{fd=Fd}}
end;
Error ->
@@ -254,41 +272,71 @@ init({Filepath, Options, ReturnPid, Ref}) ->
% open in read mode first, so we don't create the file if it doesn't exist.
case file:open(Filepath, [read, raw]) of
{ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ {ok, Fd} = file:open(Filepath, [read, append, raw, binary]),
ok = file:close(Fd_Read),
- couch_stats_collector:track_process_count({couchdb, open_os_files}),
- {ok, #file{fd=Fd}};
+ maybe_track_open_os_files(Options),
+ {ok, Length} = file:position(Fd, eof),
+ {ok, #file{fd=Fd, eof=Length}};
Error ->
init_status_error(ReturnPid, Ref, Error)
end
end.
+maybe_track_open_os_files(FileOptions) ->
+ case lists:member(sys_db, FileOptions) of
+ true ->
+ ok;
+ false ->
+ couch_stats_collector:track_process_count({couchdb, open_os_files})
+ end.
-terminate(_Reason, _Fd) ->
- ok.
+terminate(_Reason, #file{fd = Fd}) ->
+ ok = file:close(Fd).
+handle_call({pread_iolist, Pos}, _From, File) ->
+ {RawData, NextPos} = try
+ % up to 8Kbs of read ahead
+ read_raw_iolist_int(File, Pos, 2 * ?SIZE_BLOCK - (Pos rem ?SIZE_BLOCK))
+ catch
+ _:_ ->
+ read_raw_iolist_int(File, Pos, 4)
+ end,
+ <<Prefix:1/integer, Len:31/integer, RestRawData/binary>> =
+ iolist_to_binary(RawData),
+ case Prefix of
+ 1 ->
+ {Md5, IoList} = extract_md5(
+ maybe_read_more_iolist(RestRawData, 16 + Len, NextPos, File)),
+ {reply, {ok, IoList, Md5}, File};
+ 0 ->
+ IoList = maybe_read_more_iolist(RestRawData, Len, NextPos, File),
+ {reply, {ok, IoList, <<>>}, File}
+ end;
handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
{ok, Bin} = file:pread(Fd, Pos, Bytes),
{reply, {ok, Bin, Pos >= TailAppendBegin}, File};
-handle_call(bytes, _From, #file{fd=Fd}=File) ->
- {reply, file:position(Fd, eof), File};
+handle_call(bytes, _From, #file{eof=Length}=File) ->
+ {reply, {ok, Length}, File};
handle_call(sync, _From, #file{fd=Fd}=File) ->
{reply, file:sync(Fd), File};
handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
{ok, Pos} = file:position(Fd, Pos),
- {reply, file:truncate(Fd), File};
-handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
- {ok, Pos} = file:position(Fd, eof),
+ case file:truncate(Fd) of
+ ok ->
+ {reply, ok, File#file{eof=Pos}};
+ Error ->
+ {reply, Error, File}
+ end;
+handle_call({append_bin, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
- case file:pwrite(Fd, Pos, Blocks) of
+ case file:write(Fd, Blocks) of
ok ->
- {reply, {ok, Pos}, File};
+ {reply, {ok, Pos}, File#file{eof=Pos+iolist_size(Blocks)}};
Error ->
{reply, Error, File}
end;
-handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
- {ok, Pos} = file:position(Fd, eof),
+handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
BinSize = size(Bin),
case Pos rem ?SIZE_BLOCK of
0 ->
@@ -296,16 +344,21 @@ handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
BlockOffset ->
Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
end,
- FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
- {reply, file:pwrite(Fd, Pos, FinalBin), File};
+ FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
+ case file:write(Fd, FinalBin) of
+ ok ->
+ {reply, ok, File#file{eof=Pos+iolist_size(FinalBin)}};
+ Error ->
+ {reply, Error, File}
+ end;
handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
case (catch read_old_header(Fd, Prefix)) of
{ok, Header} ->
- {ok, TailAppendBegin} = file:position(Fd, eof),
+ TailAppendBegin = File#file.eof,
Bin = term_to_binary(Header),
- Md5 = erlang:md5(Bin),
+ Md5 = couch_util:md5(Bin),
% now we assemble the final header binary and write to disk
FinalBin = <<Md5/binary, Bin/binary>>,
{reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
@@ -321,8 +374,7 @@ handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
end;
-handle_call(find_header, _From, #file{fd=Fd}=File) ->
- {ok, Pos} = file:position(Fd, eof),
+handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
{reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
% 09 UPGRADE CODE
@@ -386,7 +438,7 @@ extract_header(Prefix, Bin) ->
case HeaderPrefix of
Prefix ->
% check the integrity signature
- case erlang:md5(TermBin) == Sig of
+ case couch_util:md5(TermBin) == Sig of
true ->
Header = binary_to_term(TermBin),
{ok, Header};
@@ -416,7 +468,7 @@ write_old_header(Fd, Prefix, Data) ->
ok = file:sync(Fd),
% pad out the header with zeros, then take the md5 hash
PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = erlang:md5([TermBin2, PadZeros]),
+ Sig = couch_util:md5([TermBin2, PadZeros]),
% now we assemble the final header binary and write to disk
WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
?HEADER_SIZE = size(WriteBin), % sanity check
@@ -432,6 +484,8 @@ handle_cast(close, Fd) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_info({'EXIT', _, normal}, Fd) ->
+ {noreply, Fd};
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
@@ -447,16 +501,53 @@ find_header(Fd, Block) ->
end.
load_header(Fd, Block) ->
- {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
- {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+ {ok, <<1, HeaderLen:32/integer, RestBlock/binary>>} =
+ file:pread(Fd, Block * ?SIZE_BLOCK, ?SIZE_BLOCK),
TotalBytes = calculate_total_read_len(1, HeaderLen),
- {ok, <<RawBin:TotalBytes/binary>>} =
- file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+ case TotalBytes > byte_size(RestBlock) of
+ false ->
+ <<RawBin:TotalBytes/binary, _/binary>> = RestBlock;
+ true ->
+ {ok, Missing} = file:pread(
+ Fd, (Block * ?SIZE_BLOCK) + 5 + byte_size(RestBlock),
+ TotalBytes - byte_size(RestBlock)),
+ RawBin = <<RestBlock/binary, Missing/binary>>
+ end,
<<Md5Sig:16/binary, HeaderBin/binary>> =
iolist_to_binary(remove_block_prefixes(1, RawBin)),
- Md5Sig = erlang:md5(HeaderBin),
+ Md5Sig = couch_util:md5(HeaderBin),
{ok, HeaderBin}.
+maybe_read_more_iolist(Buffer, DataSize, _, _)
+ when DataSize =< byte_size(Buffer) ->
+ <<Data:DataSize/binary, _/binary>> = Buffer,
+ [Data];
+maybe_read_more_iolist(Buffer, DataSize, NextPos, File) ->
+ {Missing, _} =
+ read_raw_iolist_int(File, NextPos, DataSize - byte_size(Buffer)),
+ [Buffer, Missing].
+
+-spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
+ {Data::iolist(), CurPos::non_neg_integer()}.
+read_raw_iolist_int(Fd, {Pos, _Size}, Len) -> % 0110 UPGRADE CODE
+ read_raw_iolist_int(Fd, Pos, Len);
+read_raw_iolist_int(#file{fd=Fd, tail_append_begin=TAB}, Pos, Len) ->
+ BlockOffset = Pos rem ?SIZE_BLOCK,
+ TotalBytes = calculate_total_read_len(BlockOffset, Len),
+ {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
+ if Pos >= TAB ->
+ {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
+ true ->
+ % 09 UPGRADE CODE
+ <<ReturnBin:Len/binary, _/binary>> = RawBin,
+ {[ReturnBin], Pos + Len}
+ end.
+
+-spec extract_md5(iolist()) -> {binary(), iolist()}.
+extract_md5(FullIoList) ->
+ {Md5List, IoList} = split_iolist(FullIoList, 16, []),
+ {iolist_to_binary(Md5List), IoList}.
+
calculate_total_read_len(0, FinalLen) ->
calculate_total_read_len(1, FinalLen) + 1;
calculate_total_read_len(BlockOffset, FinalLen) ->
@@ -495,6 +586,11 @@ make_blocks(BlockOffset, IoList) ->
IoList
end.
+%% @doc Returns a tuple where the first element contains the leading SplitAt
+%% bytes of the original iolist, and the 2nd element is the tail. If SplitAt
+%% is larger than byte_size(IoList), return the difference.
+-spec split_iolist(IoList::iolist(), SplitAt::non_neg_integer(), Acc::list()) ->
+ {iolist(), iolist()} | non_neg_integer().
split_iolist(List, 0, BeginAcc) ->
{lists:reverse(BeginAcc), List};
split_iolist([], SplitAt, _BeginAcc) ->