summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_file.erl
diff options
context:
space:
mode:
authorChristopher Lenz <cmlenz@apache.org>2008-03-28 23:32:19 +0000
committerChristopher Lenz <cmlenz@apache.org>2008-03-28 23:32:19 +0000
commit544a38dd45f6a58d34296c6c768afd086eb2ac70 (patch)
treec84cc02340b06aae189cff0dbfaee698f273f1f5 /src/couchdb/couch_file.erl
parent804cbbe033b8e7a3e8d7058aaf31bdf69ef18ac5 (diff)
Imported trunk.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@642432 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_file.erl')
-rw-r--r--src/couchdb/couch_file.erl323
1 files changed, 323 insertions, 0 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
new file mode 100644
index 00000000..6cbad44a
--- /dev/null
+++ b/src/couchdb/couch_file.erl
@@ -0,0 +1,323 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_file).
+-behaviour(gen_server).
+
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+
+-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
+-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+
+%%----------------------------------------------------------------------
+%% Args: Valid Options are [create] and [create,overwrite].
+%% Files are opened in read/write mode.
+%% Returns: On success, {ok, Fd}
+%% or {error, Reason} if the file could not be opened.
+%%----------------------------------------------------------------------
+
+open(Filepath) ->
+ open(Filepath, []).
+
+open(Filepath, Options) ->
+ case gen_server:start_link(couch_file, {Filepath, Options, self()}, []) of
+ {ok, FdPid} ->
+ % we got back an ok, but that doesn't really mean it was successful.
+ % Instead the true status has been sent back to us as a message.
+ % We do this because if the gen_server doesn't initialize properly,
+ % it generates a crash report that will get logged. This avoids
+ % that mess, because we don't want crash reports generated
+ % every time a file cannot be found.
+ receive
+ {FdPid, ok} ->
+ {ok, FdPid};
+ {FdPid, Error} ->
+ Error
+ end;
+ Error ->
+ Error
+ end.
+
+
+%%----------------------------------------------------------------------
+%% Args: Pos is the offset from the beginning of the file, Bytes is
+%% is the number of bytes to read.
+%% Returns: {ok, Binary} where Binary is a binary data from disk
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread(Fd, Pos, Bytes) when Bytes > 0 ->
+ gen_server:call(Fd, {pread, Pos, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Args: Pos is the offset from the beginning of the file, Bin is
+%% is the binary to write
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pwrite(Fd, Pos, Bin) ->
+ gen_server:call(Fd, {pwrite, Pos, Bin}).
+
+%%----------------------------------------------------------------------
+%% Purpose: To append a segment of zeros to the end of the file.
+%% Args: Bytes is the number of bytes to append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
+%% the new segments.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+expand(Fd, Bytes) when Bytes > 0 ->
+ gen_server:call(Fd, {expand, Bytes}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: To append an Erlang term to the end of the file.
+%% Args: Erlang term to serialize and append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
+%% serialized term. Use pread_term to read the term back.
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+append_term(Fd, Term) ->
+ gen_server:call(Fd, {append_term, Term}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads a term from a file that was written with append_term
+%% Args: Pos, the offset into the file where the term is serialized.
+%% Returns: {ok, Term}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread_term(Fd, Pos) ->
+ gen_server:call(Fd, {pread_term, Pos}).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: The length of a file, in bytes.
+%% Returns: {ok, Bytes}
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+% length in bytes
+bytes(Fd) ->
+ gen_server:call(Fd, bytes).
+
+%%----------------------------------------------------------------------
+%% Purpose: Truncate a file to the number of bytes.
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+truncate(Fd, Pos) ->
+ gen_server:call(Fd, {truncate, Pos}).
+
+%%----------------------------------------------------------------------
+%% Purpose: Ensure all bytes written to the file are flushed to disk.
+%% Returns: ok
+%% or {error, Reason}.
+%%----------------------------------------------------------------------
+
+sync(Fd) ->
+ gen_server:call(Fd, sync).
+
+%%----------------------------------------------------------------------
+%% Purpose: Close the file. Is performed asynchronously.
+%% Returns: ok
+%%----------------------------------------------------------------------
+close(Fd) ->
+ gen_server:cast(Fd, close).
+
+
+write_header(Fd, Prefix, Data) ->
+ % The leading bytes in every db file, the sig and the file version:
+ %the actual header data
+ TermBin = term_to_binary(Data),
+ % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+ FilledSize = size(Prefix) + size(TermBin) + 16,
+ case FilledSize > ?HEADER_SIZE of
+ true ->
+ % too big!
+ {error, error_header_too_large};
+ false ->
+ % pad out the header with zeros, then take the md5 hash
+ PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
+ Sig = erlang:md5([TermBin, PadZeros]),
+ % now we assemble the final header binary and write to disk
+ WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
+ ?HEADER_SIZE = size(WriteBin), % sanity check
+ DblWriteBin = [WriteBin, WriteBin],
+ ok = pwrite(Fd, 0, DblWriteBin)
+ end.
+
+
+read_header(Fd, Prefix) ->
+ {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+ <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
+ % read the first header
+ case extract_header(Prefix, Bin1) of
+ {ok, Header1} ->
+ case extract_header(Prefix, Bin2) of
+ {ok, Header2} ->
+ case Header1 == Header2 of
+ true ->
+ % Everything is completely normal!
+ {ok, Header1};
+ false ->
+ % To get here we must have two different header versions with signatures intact.
+ % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
+ couch_log:info("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
+ {ok, Header1}
+ end;
+ {error, Error} ->
+ % error reading second header. It's ok, but log it.
+ couch_log:info("Secondary header corruption (error: ~p). Using primary header.", [Error]),
+ {ok, Header1}
+ end;
+ {error, Error} ->
+ % error reading primary header
+ case extract_header(Prefix, Bin2) of
+ {ok, Header2} ->
+ % log corrupt primary header. It's ok since the secondary is still good.
+ couch_log:info("Primary header corruption (error: ~p). Using secondary header.", [Error]),
+ {ok, Header2};
+ _ ->
+ % error reading secondary header too
+ % return the error, no need to log anything as the caller will be responsible for dealing with the error.
+ {error, Error}
+ end
+ end.
+
+
+extract_header(Prefix, Bin) ->
+ SizeOfPrefix = size(Prefix),
+ SizeOfTermBin = ?HEADER_SIZE -
+ SizeOfPrefix -
+ 16, % md5 sig
+
+ <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
+
+ % check the header prefix
+ case HeaderPrefix of
+ Prefix ->
+ % check the integrity signature
+ case erlang:md5(TermBin) == Sig of
+ true ->
+ Header = binary_to_term(TermBin),
+ {ok, Header};
+ false ->
+ {error, header_corrupt}
+ end;
+ _ ->
+ {error, unknown_header_type}
+ end.
+
+
+
+init_status_ok(ReturnPid, Fd) ->
+ ReturnPid ! {self(), ok}, % signal back ok
+ {ok, Fd}.
+
+init_status_error(ReturnPid, Error) ->
+ ReturnPid ! {self(), Error}, % signal back error status
+ self() ! self_close, % tell ourself to close async
+ {ok, nil}.
+
+% server functions
+
+init({Filepath, Options, ReturnPid}) ->
+ case lists:member(create, Options) of
+ true ->
+ filelib:ensure_dir(Filepath),
+ case file:open(Filepath, [read, write, raw, binary]) of
+ {ok, Fd} ->
+ {ok, Length} = file:position(Fd, eof),
+ case Length > 0 of
+ true ->
+ % this means the file already exists and has data.
+ % FYI: We don't differentiate between empty files and non-existant
+ % files here.
+ case lists:member(overwrite, Options) of
+ true ->
+ {ok, 0} = file:position(Fd, 0),
+ ok = file:truncate(Fd),
+ init_status_ok(ReturnPid, Fd);
+ false ->
+ ok = file:close(Fd),
+ init_status_error(ReturnPid, {error, file_exists})
+ end;
+ false ->
+ init_status_ok(ReturnPid, Fd)
+ end;
+ Error ->
+ init_status_error(ReturnPid, Error)
+ end;
+ false ->
+ % open in read mode first, so we don't create the file if it doesn't exist.
+ case file:open(Filepath, [read, raw]) of
+ {ok, Fd_Read} ->
+ {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
+ ok = file:close(Fd_Read),
+ init_status_ok(ReturnPid, Fd);
+ Error ->
+ init_status_error(ReturnPid, Error)
+ end
+ end.
+
+
+terminate(_Reason, nil) ->
+ ok;
+terminate(_Reason, Fd) ->
+ file:close(Fd),
+ ok.
+
+
+handle_call({pread, Pos, Bytes}, _From, Fd) ->
+ {reply, file:pread(Fd, Pos, Bytes), Fd};
+handle_call({pwrite, Pos, Bin}, _From, Fd) ->
+ {reply, file:pwrite(Fd, Pos, Bin), Fd};
+handle_call({expand, Num}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
+handle_call(bytes, _From, Fd) ->
+ {reply, file:position(Fd, eof), Fd};
+handle_call(sync, _From, Fd) ->
+ {reply, file:sync(Fd), Fd};
+handle_call({truncate, Pos}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, Pos),
+ {reply, file:truncate(Fd), Fd};
+handle_call({append_term, Term}, _From, Fd) ->
+ Bin = term_to_binary(Term, [compressed]),
+ TermLen = size(Bin),
+ Bin2 = <<TermLen:32, Bin/binary>>,
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
+handle_call({pread_term, Pos}, _From, Fd) ->
+ {ok, <<TermLen:32>>}
+ = file:pread(Fd, Pos, 4),
+ {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
+ {reply, {ok, binary_to_term(Bin)}, Fd}.
+
+
+handle_cast(close, Fd) ->
+ {stop,normal,Fd}. % causes terminate to be called
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_info(self_close, State) ->
+ {stop,normal,State};
+handle_info(_Info, State) ->
+ {noreply, State}.