summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_file.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_file.erl')
-rw-r--r--src/couchdb/couch_file.erl421
1 files changed, 280 insertions, 141 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 430aa6b7..c773be98 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -15,10 +15,16 @@
-include("couch_db.hrl").
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
+
+-record(file, {
+ fd,
+ tail_append_begin=0 % 09 UPGRADE CODE
+ }).
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
%%----------------------------------------------------------------------
@@ -52,39 +58,6 @@ open(Filepath, Options) ->
%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bytes is
-%% is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bin is
-%% is the binary to write
-%% Returns: ok
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pwrite(Fd, Pos, Bin) ->
- gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args: Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%% the new segments.
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-expand(Fd, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {expand, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
%% Purpose: To append an Erlang term to the end of the file.
%% Args: Erlang term to serialize and append to the file.
%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -93,7 +66,7 @@ expand(Fd, Bytes) when Bytes > 0 ->
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
- append_binary(Fd, term_to_binary(Term, [compressed])).
+ append_binary(Fd, term_to_binary(Term)).
%%----------------------------------------------------------------------
@@ -105,7 +78,8 @@ append_term(Fd, Term) ->
%%----------------------------------------------------------------------
append_binary(Fd, Bin) ->
- gen_server:call(Fd, {append_bin, Bin}, infinity).
+ Size = iolist_size(Bin),
+ gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity).
%%----------------------------------------------------------------------
@@ -115,10 +89,12 @@ append_binary(Fd, Bin) ->
%% or {error, Reason}.
%%----------------------------------------------------------------------
+
pread_term(Fd, Pos) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, binary_to_term(Bin)}.
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -127,8 +103,26 @@ pread_term(Fd, Pos) ->
%%----------------------------------------------------------------------
pread_binary(Fd, Pos) ->
- gen_server:call(Fd, {pread_bin, Pos}, infinity).
-
+ {ok, L} = pread_iolist(Fd, Pos),
+ {ok, iolist_to_binary(L)}.
+
+pread_iolist(Fd, Pos) ->
+ {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 4),
+ <<Len:32/integer>> = iolist_to_binary(LenIolist),
+ {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+ {ok, Iolist}.
+
+read_raw_iolist(Fd, Pos, Len) ->
+ BlockOffset = Pos rem ?SIZE_BLOCK,
+ TotalBytes = calculate_total_read_len(BlockOffset, Len),
+ {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+ if HasPrefixes ->
+ {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
+ true ->
+ % 09 UPGRADE CODE
+ <<ReturnBin:Len/binary, _/binary>> = RawBin,
+ {ok, [ReturnBin], Pos + Len}
+ end.
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
@@ -167,35 +161,153 @@ close(Fd) ->
catch unlink(Fd),
Result.
+% 09 UPGRADE CODE
+old_pread(Fd, Pos, Len) ->
+ {ok, <<RawBin:Len/binary>>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity),
+ {ok, RawBin}.
-write_header(Fd, Prefix, Data) ->
- TermBin = term_to_binary(Data),
- % the size of all the bytes written to the header, including the md5 signature (16 bytes)
- FilledSize = size(Prefix) + size(TermBin) + 16,
- {TermBin2, FilledSize2} =
- case FilledSize > ?HEADER_SIZE of
+% 09 UPGRADE CODE
+upgrade_old_header(Fd, Sig) ->
+ gen_server:call(Fd, {upgrade_old_header, Sig}, infinity).
+
+
+read_header(Fd) ->
+ case gen_server:call(Fd, find_header, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
+ end.
+
+write_header(Fd, Data) ->
+ Bin = term_to_binary(Data),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ gen_server:call(Fd, {write_header, FinalBin}, infinity).
+
+
+
+
+init_status_error(ReturnPid, Ref, Error) ->
+ ReturnPid ! {Ref, self(), Error},
+ ignore.
+
+% server functions
+
+init({Filepath, Options, ReturnPid, Ref}) ->
+ case lists:member(create, Options) of
true ->
- % too big!
- {ok, Pos} = append_binary(Fd, TermBin),
- PtrBin = term_to_binary({pointer_to_header_data, Pos}),
- {PtrBin, size(Prefix) + size(PtrBin) + 16};
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+ % FYI: We don't differentiate between empty files and non-existant
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ ok = file:sync(Fd),
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, Ref, file_exists)
+ end;
+ false ->
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}}
+ end;
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end;
false ->
- {TermBin, FilledSize}
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ couch_stats_collector:track_process_count({couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end
+ end.
+
+
+terminate(_Reason, _Fd) ->
+ ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
+ {ok, Bin} = file:pread(Fd, Pos, Bytes),
+ {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
+handle_call(bytes, _From, #file{fd=Fd}=File) ->
+ {reply, file:position(Fd, eof), File};
+handle_call(sync, _From, #file{fd=Fd}=File) ->
+ {reply, file:sync(Fd), File};
+handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+ case file:pwrite(Fd, Pos, Blocks) of
+ ok ->
+ {reply, {ok, Pos}, File};
+ Error ->
+ {reply, Error, File}
+ end;
+handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ BinSize = size(Bin),
+ case Pos rem ?SIZE_BLOCK of
+ 0 ->
+ Padding = <<>>;
+ BlockOffset ->
+ Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
end,
- ok = sync(Fd),
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = erlang:md5([TermBin2, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin),
- ok = sync(Fd).
+ FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
+ {reply, file:pwrite(Fd, Pos, FinalBin), File};
+
+
+handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
+ case (catch read_old_header(Fd, Prefix)) of
+ {ok, Header} ->
+ {ok, TailAppendBegin} = file:position(Fd, eof),
+ Bin = term_to_binary(Header),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
+ ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
+ {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
+ _Error ->
+ case (catch read_old_header(Fd, <<"upgraded">>)) of
+ {ok, TailAppendBegin} ->
+ {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
+ _Error2 ->
+ {reply, ok, File}
+ end
+ end;
+
+handle_call(find_header, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+
+% 09 UPGRADE CODE
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
-read_header(Fd, Prefix) ->
- {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+% 09 UPGRADE CODE
+read_old_header(Fd, Prefix) ->
+ {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
<<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
Result =
% read the first header
@@ -238,6 +350,7 @@ read_header(Fd, Prefix) ->
Result
end.
+% 09 UPGRADE CODE
extract_header(Prefix, Bin) ->
SizeOfPrefix = size(Prefix),
SizeOfTermBin = ?HEADER_SIZE -
@@ -260,88 +373,35 @@ extract_header(Prefix, Bin) ->
_ ->
unknown_header_type
end.
+
-
-init_status_error(ReturnPid, Ref, Error) ->
- ReturnPid ! {Ref, self(), Error},
- ignore.
-
-% server functions
-
-init({Filepath, Options, ReturnPid, Ref}) ->
- case lists:member(create, Options) of
+% 09 UPGRADE CODE
+write_old_header(Fd, Prefix, Data) ->
+ TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ {TermBin2, FilledSize2} =
+ case FilledSize > ?HEADER_SIZE of
true ->
- filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, write, raw, binary]) of
- {ok, Fd} ->
- {ok, Length} = file:position(Fd, eof),
- case Length > 0 of
- true ->
- % this means the file already exists and has data.
- % FYI: We don't differentiate between empty files and non-existant
- % files here.
- case lists:member(overwrite, Options) of
- true ->
- {ok, 0} = file:position(Fd, 0),
- ok = file:truncate(Fd),
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd};
- false ->
- ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, file_exists)
- end;
- false ->
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd}
- end;
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end;
+ % too big!
+ {ok, Pos} = append_binary(Fd, TermBin),
+ PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+ {PtrBin, size(Prefix) + size(PtrBin) + 16};
false ->
- % open in read mode first, so we don't create the file if it doesn't exist.
- case file:open(Filepath, [read, raw]) of
- {ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
- ok = file:close(Fd_Read),
- couch_stats_collector:track_process_count({couchdb, open_os_files}),
- {ok, Fd};
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end
- end.
-
-
-terminate(_Reason, _Fd) ->
- ok.
-
-
-handle_call({pread, Pos, Bytes}, _From, Fd) ->
- {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
- {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
-handle_call(bytes, _From, Fd) ->
- {reply, file:position(Fd, eof), Fd};
-handle_call(sync, _From, Fd) ->
- {reply, file:sync(Fd), Fd};
-handle_call({truncate, Pos}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, Pos),
- {reply, file:truncate(Fd), Fd};
-handle_call({append_bin, Bin}, _From, Fd) ->
- Len = size(Bin),
- Bin2 = <<Len:32, Bin/binary>>,
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
- {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
- {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
-
+ {TermBin, FilledSize}
+ end,
+ ok = file:sync(Fd),
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+ Sig = erlang:md5([TermBin2, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = file:pwrite(Fd, 0, DblWriteBin),
+ ok = file:sync(Fd).
+
handle_cast(close, Fd) ->
{stop,normal,Fd}.
@@ -351,3 +411,82 @@ code_change(_OldVsn, State, _Extra) ->
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
+
+
+find_header(_Fd, -1) ->
+ no_valid_header;
+find_header(Fd, Block) ->
+ case (catch load_header(Fd, Block)) of
+ {ok, Bin} ->
+ {ok, Bin};
+ _Error ->
+ find_header(Fd, Block -1)
+ end.
+
+load_header(Fd, Block) ->
+ {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
+ {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+ TotalBytes = calculate_total_read_len(1, HeaderLen),
+ {ok, <<RawBin:TotalBytes/binary>>} =
+ file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+ <<Md5Sig:16/binary, HeaderBin/binary>> =
+ iolist_to_binary(remove_block_prefixes(1, RawBin)),
+ Md5Sig = erlang:md5(HeaderBin),
+ {ok, HeaderBin}.
+
+calculate_total_read_len(0, FinalLen) ->
+ calculate_total_read_len(1, FinalLen) + 1;
+calculate_total_read_len(BlockOffset, FinalLen) ->
+ case ?SIZE_BLOCK - BlockOffset of
+ BlockLeft when BlockLeft >= FinalLen ->
+ FinalLen;
+ BlockLeft ->
+ FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
+ if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0;
+ true -> 1 end
+ end.
+
+remove_block_prefixes(_BlockOffset, <<>>) ->
+ [];
+remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
+ remove_block_prefixes(1, Rest);
+remove_block_prefixes(BlockOffset, Bin) ->
+ BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
+ case size(Bin) of
+ Size when Size > BlockBytesAvailable ->
+ <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
+ [DataBlock | remove_block_prefixes(0, Rest)];
+ _Size ->
+ [Bin]
+ end.
+
+make_blocks(_BlockOffset, []) ->
+ [];
+make_blocks(0, IoList) ->
+ [<<0>> | make_blocks(1, IoList)];
+make_blocks(BlockOffset, IoList) ->
+ case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
+ {Begin, End} ->
+ [Begin | make_blocks(0, End)];
+ _Size ->
+ IoList
+ end.
+
+split_iolist(List, 0, BeginAcc) ->
+ {lists:reverse(BeginAcc), List};
+split_iolist([], SplitAt, _BeginAcc) ->
+ SplitAt;
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > size(Bin) ->
+ split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]);
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
+ <<Begin:SplitAt/binary,End/binary>> = Bin,
+ split_iolist([End | Rest], 0, [Begin | BeginAcc]);
+split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
+ case split_iolist(Sublist, SplitAt, BeginAcc) of
+ {Begin, End} ->
+ {Begin, [End | Rest]};
+ Len ->
+ split_iolist(Rest, SplitAt - Len, [Sublist | BeginAcc])
+ end;
+split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
+ split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).