path: root/src/couchdb/couch_file.erl
diff options
authorDamien F. Katz <>2009-05-25 19:52:28 +0000
committerDamien F. Katz <>2009-05-25 19:52:28 +0000
commit16ccd4c0b8ae4272fa27d32948658b1424a291fc (patch)
treef6d59d017234409436091cc53938b27549d9b54f /src/couchdb/couch_file.erl
parent4aac0f7c6dcd3f3a29cfe5e1bf2bee84b9bae9d5 (diff)
Merging new tail append storage into trunk. Upgrades are automatic; once a file has been opened by this version, older versions of CouchDB will not be able to open it. As a precaution, you should back up your production databases before upgrading.
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_file.erl')
1 files changed, 280 insertions, 141 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index 430aa6b7..c773be98 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -15,10 +15,16 @@
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
+-record(file, { % gen_server state kept for one open file
+ fd, % raw file handle obtained from file:open/2 in init/1
+ tail_append_begin=0 % 09 UPGRADE CODE: offset recorded by the upgrade_old_header call; preads at or past it are reported as block-prefixed
+ }).
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
@@ -52,39 +58,6 @@ open(Filepath, Options) ->
-%% Args: Pos is the offset from the beginning of the file, Bytes is
-%% is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%% or {error, Reason}.
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-%% Args: Pos is the offset from the beginning of the file, Bin is
-%% is the binary to write
-%% Returns: ok
-%% or {error, Reason}.
-pwrite(Fd, Pos, Bin) ->
- gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args: Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%% the new segments.
-%% or {error, Reason}.
-expand(Fd, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {expand, Bytes}, infinity).
%% Purpose: To append an Erlang term to the end of the file.
%% Args: Erlang term to serialize and append to the file.
%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -93,7 +66,7 @@ expand(Fd, Bytes) when Bytes > 0 ->
append_term(Fd, Term) ->
- append_binary(Fd, term_to_binary(Term, [compressed])).
+ append_binary(Fd, term_to_binary(Term)).
@@ -105,7 +78,8 @@ append_term(Fd, Term) ->
append_binary(Fd, Bin) ->
- gen_server:call(Fd, {append_bin, Bin}, infinity).
+ Size = iolist_size(Bin),
+ gen_server:call(Fd, {append_bin, [<<Size:32/integer>>, Bin]}, infinity).
@@ -115,10 +89,12 @@ append_binary(Fd, Bin) ->
%% or {error, Reason}.
pread_term(Fd, Pos) ->
{ok, Bin} = pread_binary(Fd, Pos),
{ok, binary_to_term(Bin)}.
%% Purpose: Reads a binary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -127,8 +103,26 @@ pread_term(Fd, Pos) ->
pread_binary(Fd, Pos) ->
- gen_server:call(Fd, {pread_bin, Pos}, infinity).
+ {ok, L} = pread_iolist(Fd, Pos),
+ {ok, iolist_to_binary(L)}.
+%% Purpose: Reads data that was written with append_binary, as an iolist.
+%% Args: Pos, the file offset at which the 32 bit length field starts.
+%% Returns: {ok, Iolist} holding the payload that follows the length field.
+%%  Crashes with badmatch if either underlying read does not succeed.
+pread_iolist(Fd, Pos) ->
+    % the first four payload bytes carry the size of the data that follows
+    {ok, SizeChunks, DataPos} = read_raw_iolist(Fd, Pos, 4),
+    <<DataLen:32/integer>> = iolist_to_binary(SizeChunks),
+    {ok, DataChunks, _AfterData} = read_raw_iolist(Fd, DataPos, DataLen),
+    {ok, DataChunks}.
+%% Purpose: Reads Len payload bytes starting at file offset Pos.
+%% Args: Pos is the file offset, Len the number of payload bytes wanted.
+%% Returns: {ok, Iolist, NextPos} where NextPos is the offset just past
+%%  the bytes consumed from the file.
+read_raw_iolist(Fd, Pos, Len) ->
+    OffsetInBlock = Pos rem ?SIZE_BLOCK,
+    % reserve room for the prefix bytes in case this region carries them
+    ReadLen = calculate_total_read_len(OffsetInBlock, Len),
+    Reply = gen_server:call(Fd, {pread, Pos, ReadLen}, infinity),
+    {ok, <<Raw:ReadLen/binary>>, HasPrefixes} = Reply,
+    case HasPrefixes of
+        true ->
+            % block-prefixed region: drop the prefix bytes
+            {ok, remove_block_prefixes(OffsetInBlock, Raw), Pos + ReadLen};
+        _ ->
+            % region without prefixes: only the first Len bytes are payload
+            <<Payload:Len/binary, _/binary>> = Raw,
+            {ok, [Payload], Pos + Len}
+    end.
%% Purpose: The length of a file, in bytes.
@@ -167,35 +161,153 @@ close(Fd) ->
catch unlink(Fd),
+%% Purpose: Reads exactly Len raw bytes at Pos from a region that the
+%%  server reports as not block-prefixed.
+%% Returns: {ok, Binary}. Crashes with badmatch if the server flags the
+%%  region as block-prefixed or returns fewer than Len bytes.
+old_pread(Fd, Pos, Len) ->
+    Reply = gen_server:call(Fd, {pread, Pos, Len}, infinity),
+    {ok, <<Data:Len/binary>>, false} = Reply,
+    {ok, Data}.
-write_header(Fd, Prefix, Data) ->
- TermBin = term_to_binary(Data),
- % the size of all the bytes written to the header, including the md5 signature (16 bytes)
- FilledSize = size(Prefix) + size(TermBin) + 16,
- {TermBin2, FilledSize2} =
- case FilledSize > ?HEADER_SIZE of
+%% Purpose: Asks the file server to run its old-header upgrade step.
+%% Args: Sig is the prefix passed on to the old header reader.
+%% Returns: the server's reply.
+upgrade_old_header(Fd, Sig) ->
+    Request = {upgrade_old_header, Sig},
+    gen_server:call(Fd, Request, infinity).
+%% Purpose: Fetches the header located by the file server and decodes it.
+%% Returns: {ok, Term} when the server found a header binary,
+%%  otherwise the server's reply unchanged (e.g. no_valid_header).
+read_header(Fd) ->
+    Reply = gen_server:call(Fd, find_header, infinity),
+    case Reply of
+        {ok, HeaderBin} ->
+            {ok, binary_to_term(HeaderBin)};
+        NotFound ->
+            NotFound
+    end.
+%% Purpose: Serializes Data and hands it to the file server as a header.
+%% Args: Data is the Erlang term to store as the header.
+%% Returns: the server's reply to the write_header request.
+write_header(Fd, Data) ->
+    Payload = term_to_binary(Data),
+    % the md5 of the serialized term goes in front of it so that readers
+    % can verify the header
+    Signed = <<(erlang:md5(Payload))/binary, Payload/binary>>,
+    gen_server:call(Fd, {write_header, Signed}, infinity).
+%% Purpose: Reports an init failure to the process waiting on the open.
+%% Args: ReturnPid receives {Ref, self(), Error}.
+%% Returns: ignore, which init/1 passes back as its result.
+init_status_error(ReturnPid, Ref, Error) ->
+    Notice = {Ref, self(), Error},
+    ReturnPid ! Notice,
+    ignore.
+% server functions
+init({Filepath, Options, ReturnPid, Ref}) ->
+ case lists:member(create, Options) of
true ->
- % too big!
- {ok, Pos} = append_binary(Fd, TermBin),
- PtrBin = term_to_binary({pointer_to_header_data, Pos}),
- {PtrBin, size(Prefix) + size(PtrBin) + 16};
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+ % FYI: We don't differentiate between empty files and non-existant
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ ok = file:sync(Fd),
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, Ref, file_exists)
+ end;
+ false ->
+ couch_stats_collector:track_process_count(
+ {couchdb, open_os_files}),
+ {ok, #file{fd=Fd}}
+ end;
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end;
false ->
- {TermBin, FilledSize}
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ couch_stats_collector:track_process_count({couchdb, open_os_files}),
+ {ok, #file{fd=Fd}};
+ Error ->
+ init_status_error(ReturnPid, Ref, Error)
+ end
+ end.
+%% gen_server callback: both arguments are ignored and no explicit
+%% cleanup is performed here.
+terminate(_Reason, _State) ->
+    ok.
+handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
+ {ok, Bin} = file:pread(Fd, Pos, Bytes),
+ {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
+handle_call(bytes, _From, #file{fd=Fd}=File) ->
+ {reply, file:position(Fd, eof), File};
+handle_call(sync, _From, #file{fd=Fd}=File) ->
+ {reply, file:sync(Fd), File};
+handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+ case file:pwrite(Fd, Pos, Blocks) of
+ ok ->
+ {reply, {ok, Pos}, File};
+ Error ->
+ {reply, Error, File}
+ end;
+handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ BinSize = size(Bin),
+ case Pos rem ?SIZE_BLOCK of
+ 0 ->
+ Padding = <<>>;
+ BlockOffset ->
+ Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
- ok = sync(Fd),
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = erlang:md5([TermBin2, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin),
- ok = sync(Fd).
+ FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
+ {reply, file:pwrite(Fd, Pos, FinalBin), File};
+handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
+ case (catch read_old_header(Fd, Prefix)) of
+ {ok, Header} ->
+ {ok, TailAppendBegin} = file:position(Fd, eof),
+ Bin = term_to_binary(Header),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
+ ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
+ {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
+ _Error ->
+ case (catch read_old_header(Fd, <<"upgraded">>)) of
+ {ok, TailAppendBegin} ->
+ {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
+ _Error2 ->
+ {reply, ok, File}
+ end
+ end;
+handle_call(find_header, _From, #file{fd=Fd}=File) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
-read_header(Fd, Prefix) ->
- {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+read_old_header(Fd, Prefix) ->
+ {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
<<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
Result =
% read the first header
@@ -238,6 +350,7 @@ read_header(Fd, Prefix) ->
extract_header(Prefix, Bin) ->
SizeOfPrefix = size(Prefix),
SizeOfTermBin = ?HEADER_SIZE -
@@ -260,88 +373,35 @@ extract_header(Prefix, Bin) ->
_ ->
-init_status_error(ReturnPid, Ref, Error) ->
- ReturnPid ! {Ref, self(), Error},
- ignore.
-% server functions
-init({Filepath, Options, ReturnPid, Ref}) ->
- case lists:member(create, Options) of
+write_old_header(Fd, Prefix, Data) ->
+ TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ {TermBin2, FilledSize2} =
+ case FilledSize > ?HEADER_SIZE of
true ->
- filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, write, raw, binary]) of
- {ok, Fd} ->
- {ok, Length} = file:position(Fd, eof),
- case Length > 0 of
- true ->
- % this means the file already exists and has data.
- % FYI: We don't differentiate between empty files and non-existant
- % files here.
- case lists:member(overwrite, Options) of
- true ->
- {ok, 0} = file:position(Fd, 0),
- ok = file:truncate(Fd),
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd};
- false ->
- ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, file_exists)
- end;
- false ->
- couch_stats_collector:track_process_count(
- {couchdb, open_os_files}),
- {ok, Fd}
- end;
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end;
+ % too big!
+ {ok, Pos} = append_binary(Fd, TermBin),
+ PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+ {PtrBin, size(Prefix) + size(PtrBin) + 16};
false ->
- % open in read mode first, so we don't create the file if it doesn't exist.
- case file:open(Filepath, [read, raw]) of
- {ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
- ok = file:close(Fd_Read),
- couch_stats_collector:track_process_count({couchdb, open_os_files}),
- {ok, Fd};
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end
- end.
-terminate(_Reason, _Fd) ->
- ok.
-handle_call({pread, Pos, Bytes}, _From, Fd) ->
- {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
- {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
-handle_call(bytes, _From, Fd) ->
- {reply, file:position(Fd, eof), Fd};
-handle_call(sync, _From, Fd) ->
- {reply, file:sync(Fd), Fd};
-handle_call({truncate, Pos}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, Pos),
- {reply, file:truncate(Fd), Fd};
-handle_call({append_bin, Bin}, _From, Fd) ->
- Len = size(Bin),
- Bin2 = <<Len:32, Bin/binary>>,
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
- {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
- {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
+ {TermBin, FilledSize}
+ end,
+ ok = file:sync(Fd),
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+ Sig = erlang:md5([TermBin2, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = file:pwrite(Fd, 0, DblWriteBin),
+ ok = file:sync(Fd).
handle_cast(close, Fd) ->
@@ -351,3 +411,82 @@ code_change(_OldVsn, State, _Extra) ->
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
+%% Purpose: Scans backwards, one block at a time, for a loadable header.
+%% Args: Block is the index of the block to try first.
+%% Returns: {ok, HeaderBin} for the first block that loads successfully,
+%%  or no_valid_header once block 0 has been tried without success.
+find_header(_Fd, -1) ->
+    no_valid_header;
+find_header(Fd, Block) ->
+    try load_header(Fd, Block) of
+        {ok, HeaderBin} ->
+            {ok, HeaderBin};
+        _NotAHeader ->
+            find_header(Fd, Block - 1)
+    catch
+        % any failure while loading just means "no header in this block"
+        _Class:_Reason ->
+            find_header(Fd, Block - 1)
+    end.
+%% Purpose: Loads and verifies the header stored at the start of Block.
+%% Args: Block is the index of the block expected to begin with a header.
+%% Returns: {ok, HeaderBin}. Crashes with badmatch when the block does not
+%%  start with the byte 1, when a read comes back short, or when the stored
+%%  md5 does not match the header bytes.
+load_header(Fd, Block) ->
+    BlockPos = Block * ?SIZE_BLOCK,
+    % a header block starts with the byte 1 followed by a 32 bit length
+    {ok, <<1>>} = file:pread(Fd, BlockPos, 1),
+    {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, BlockPos + 1, 4),
+    % NOTE(review): the payload starts at byte 5 of the block but is
+    % de-blocked as if it started at offset 1; this mirrors the
+    % make_blocks(1, ...) call in the write_header handler, so any change
+    % here has to be made on the writing side as well - confirm.
+    RawLen = calculate_total_read_len(1, HeaderLen),
+    {ok, <<Raw:RawLen/binary>>} = file:pread(Fd, BlockPos + 5, RawLen),
+    Stripped = iolist_to_binary(remove_block_prefixes(1, Raw)),
+    <<Md5Sig:16/binary, HeaderBin/binary>> = Stripped,
+    Md5Sig = erlang:md5(HeaderBin),
+    {ok, HeaderBin}.
+%% Purpose: Computes how many raw bytes must be read to obtain FinalLen
+%%  payload bytes, counting one prefix byte for every block start crossed.
+%% Args: BlockOffset is the position of the read within its block,
+%%  FinalLen the number of payload bytes wanted.
+%% Returns: the raw byte count.
+calculate_total_read_len(0, FinalLen) ->
+    % starting exactly on a block boundary: that block's prefix comes first
+    1 + calculate_total_read_len(1, FinalLen);
+calculate_total_read_len(BlockOffset, FinalLen)
+        when FinalLen =< ?SIZE_BLOCK - BlockOffset ->
+    % everything fits in what is left of the current block
+    FinalLen;
+calculate_total_read_len(BlockOffset, FinalLen) ->
+    Overflow = FinalLen - (?SIZE_BLOCK - BlockOffset),
+    PayloadPerBlock = ?SIZE_BLOCK - 1,
+    FullBlocks = Overflow div PayloadPerBlock,
+    Partial =
+        case Overflow rem PayloadPerBlock of
+            0 -> 0;
+            _ -> 1
+        end,
+    FinalLen + FullBlocks + Partial.
+%% Purpose: Strips the one byte block prefixes out of raw file data.
+%% Args: BlockOffset is the position within its block of the first byte
+%%  of Bin; the second argument is the raw binary as read from the file.
+%% Returns: a list of binaries holding only the payload bytes.
+remove_block_prefixes(_BlockOffset, <<>>) ->
+    [];
+remove_block_prefixes(0, <<_Prefix, Payload/binary>>) ->
+    % positioned on a block boundary: skip the prefix byte
+    remove_block_prefixes(1, Payload);
+remove_block_prefixes(BlockOffset, Bin) ->
+    Room = ?SIZE_BLOCK - BlockOffset,
+    case size(Bin) > Room of
+        true ->
+            % data continues into the next block, which starts with a prefix
+            <<Chunk:Room/binary, Tail/binary>> = Bin,
+            [Chunk | remove_block_prefixes(0, Tail)];
+        false ->
+            [Bin]
+    end.
+%% Purpose: Inserts a <<0>> prefix byte into an iolist wherever the data
+%%  reaches the start of a block.
+%% Args: BlockOffset is the position within its block at which the first
+%%  byte of IoList will be written.
+%% Returns: an iolist ready to be written at that position.
+make_blocks(_BlockOffset, []) ->
+    [];
+make_blocks(0, IoList) ->
+    [<<0>> | make_blocks(1, IoList)];
+make_blocks(BlockOffset, IoList) ->
+    Room = ?SIZE_BLOCK - BlockOffset,
+    case split_iolist(IoList, Room, []) of
+        {ThisBlock, NextBlocks} ->
+            [ThisBlock | make_blocks(0, NextBlocks)];
+        _BytesShort ->
+            % the data ends before the block does; nothing more to prefix
+            IoList
+    end.
+%% Purpose: Splits an iolist after its first SplitAt bytes.
+%% Args: List is the iolist to split, SplitAt the number of leading bytes
+%%  wanted, BeginAcc the reversed accumulator of pieces already taken
+%%  (callers pass []).
+%% Returns: {Begin, End} where Begin holds exactly the first SplitAt bytes
+%%  and End the remainder, or, when List is shorter than SplitAt bytes,
+%%  the integer number of bytes that were still missing.
+split_iolist(List, 0, BeginAcc) ->
+    {lists:reverse(BeginAcc), List};
+split_iolist([], SplitAt, _BeginAcc) ->
+    SplitAt;
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > size(Bin) ->
+    split_iolist(Rest, SplitAt - size(Bin), [Bin | BeginAcc]);
+split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
+    <<Begin:SplitAt/binary,End/binary>> = Bin,
+    split_iolist([End | Rest], 0, [Begin | BeginAcc]);
+split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
+    case split_iolist(Sublist, SplitAt, BeginAcc) of
+    {Begin, End} ->
+        {Begin, [End | Rest]};
+    Remaining ->
+        % The sublist was shorter than SplitAt; the recursive call returns
+        % how many bytes are still missing, so that is what must be taken
+        % from Rest (not SplitAt minus that value, which is the number of
+        % bytes already consumed).
+        split_iolist(Rest, Remaining, [Sublist | BeginAcc])
+    end;
+split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
+    split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).