summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_file.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-11 15:22:33 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-11 17:39:37 -0400
commit81bdbed444df2cbcf3cdb32f7d4a74019de06454 (patch)
treeeade7d0d9bb4cac01b55fd8642adfe0f7da35161 /src/couchdb/couch_file.erl
parentcc1910f73fbd20c5ffc94bd61e7701d7f5e4c92a (diff)
reorganize couch .erl and driver code into rebar layout
Diffstat (limited to 'src/couchdb/couch_file.erl')
-rw-r--r--src/couchdb/couch_file.erl588
1 files changed, 0 insertions, 588 deletions
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
deleted file mode 100644
index 0a891712..00000000
--- a/src/couchdb/couch_file.erl
+++ /dev/null
@@ -1,588 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_file).
--behaviour(gen_server).
-
--include("couch_db.hrl").
-
--define(SIZE_BLOCK, 4096).
-
--record(file, {
- fd,
- tail_append_begin = 0, % 09 UPGRADE CODE
- eof = 0
- }).
-
--export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
--export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
--export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
--export([append_term_md5/2,append_binary_md5/2]).
--export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
--export([delete/2,delete/3,init_delete_dir/1]).
-
-%%----------------------------------------------------------------------
-%% Args: Valid Options are [create] and [create,overwrite].
-%% Files are opened in read/write mode.
-%% Returns: On success, {ok, Fd}
-%% or {error, Reason} if the file could not be opened.
-%%----------------------------------------------------------------------
-
-open(Filepath) ->
- open(Filepath, []).
-
-open(Filepath, Options) ->
- case gen_server:start_link(couch_file,
- {Filepath, Options, self(), Ref = make_ref()}, []) of
- {ok, Fd} ->
- {ok, Fd};
- ignore ->
- % get the error
- receive
- {Ref, Pid, Error} ->
- case process_info(self(), trap_exit) of
- {trap_exit, true} -> receive {'EXIT', Pid, _} -> ok end;
- {trap_exit, false} -> ok
- end,
- Error
- end;
- Error ->
- Error
- end.
-
-
-%%----------------------------------------------------------------------
-%% Purpose: To append an Erlang term to the end of the file.
-%% Args: Erlang term to serialize and append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
-%% serialized term. Use pread_term to read the term back.
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-append_term(Fd, Term) ->
- append_binary(Fd, term_to_binary(Term)).
-
-append_term_md5(Fd, Term) ->
- append_binary_md5(Fd, term_to_binary(Term)).
-
-
-%%----------------------------------------------------------------------
-%% Purpose: To append an Erlang binary to the end of the file.
-%% Args: Erlang term to serialize and append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
-%% serialized term. Use pread_term to read the term back.
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-append_binary(Fd, Bin) ->
- Size = iolist_size(Bin),
- gen_server:call(Fd, {append_bin,
- [<<0:1/integer,Size:31/integer>>, Bin]}, infinity).
-
-append_binary_md5(Fd, Bin) ->
- Size = iolist_size(Bin),
- gen_server:call(Fd, {append_bin,
- [<<1:1/integer,Size:31/integer>>, couch_util:md5(Bin), Bin]}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Purpose: Reads a term from a file that was written with append_term
-%% Args: Pos, the offset into the file where the term is serialized.
-%% Returns: {ok, Term}
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-
-pread_term(Fd, Pos) ->
- {ok, Bin} = pread_binary(Fd, Pos),
- {ok, binary_to_term(Bin)}.
-
-
-%%----------------------------------------------------------------------
-%% Purpose: Reads a binrary from a file that was written with append_binary
-%% Args: Pos, the offset into the file where the term is serialized.
-%% Returns: {ok, Term}
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread_binary(Fd, Pos) ->
- {ok, L} = pread_iolist(Fd, Pos),
- {ok, iolist_to_binary(L)}.
-
-
-pread_iolist(Fd, Pos) ->
- gen_server:call(Fd, {pread_iolist, Pos}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: The length of a file, in bytes.
-%% Returns: {ok, Bytes}
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-% length in bytes
-bytes(Fd) ->
- gen_server:call(Fd, bytes, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: Truncate a file to the number of bytes.
-%% Returns: ok
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-truncate(Fd, Pos) ->
- gen_server:call(Fd, {truncate, Pos}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: Ensure all bytes written to the file are flushed to disk.
-%% Returns: ok
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-sync(Filepath) when is_list(Filepath) ->
- {ok, Fd} = file:open(Filepath, [append, raw]),
- try file:sync(Fd) after file:close(Fd) end;
-sync(Fd) ->
- gen_server:call(Fd, sync, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: Close the file.
-%% Returns: ok
-%%----------------------------------------------------------------------
-close(Fd) ->
- MRef = erlang:monitor(process, Fd),
- try
- catch unlink(Fd),
- catch exit(Fd, shutdown),
- receive
- {'DOWN', MRef, _, _, _} ->
- ok
- end
- after
- erlang:demonitor(MRef, [flush])
- end.
-
-
-delete(RootDir, Filepath) ->
- delete(RootDir, Filepath, true).
-
-
-delete(RootDir, Filepath, Async) ->
- DelFile = filename:join([RootDir,".delete", ?b2l(couch_uuids:random())]),
- case file:rename(Filepath, DelFile) of
- ok ->
- if (Async) ->
- spawn(file, delete, [DelFile]),
- ok;
- true ->
- file:delete(DelFile)
- end;
- Error ->
- Error
- end.
-
-
-init_delete_dir(RootDir) ->
- Dir = filename:join(RootDir,".delete"),
- % note: ensure_dir requires an actual filename companent, which is the
- % reason for "foo".
- filelib:ensure_dir(filename:join(Dir,"foo")),
- filelib:fold_files(Dir, ".*", true,
- fun(Filename, _) ->
- ok = file:delete(Filename)
- end, ok).
-
-
-% 09 UPGRADE CODE
-old_pread(Fd, Pos, Len) ->
- {ok, <<RawBin:Len/binary>>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity),
- {ok, RawBin}.
-
-% 09 UPGRADE CODE
-upgrade_old_header(Fd, Sig) ->
- gen_server:call(Fd, {upgrade_old_header, Sig}, infinity).
-
-
-read_header(Fd) ->
- case gen_server:call(Fd, find_header, infinity) of
- {ok, Bin} ->
- {ok, binary_to_term(Bin)};
- Else ->
- Else
- end.
-
-write_header(Fd, Data) ->
- Bin = term_to_binary(Data),
- Md5 = couch_util:md5(Bin),
- % now we assemble the final header binary and write to disk
- FinalBin = <<Md5/binary, Bin/binary>>,
- gen_server:call(Fd, {write_header, FinalBin}, infinity).
-
-
-
-
-init_status_error(ReturnPid, Ref, Error) ->
- ReturnPid ! {Ref, self(), Error},
- ignore.
-
-% server functions
-
-init({Filepath, Options, ReturnPid, Ref}) ->
- process_flag(trap_exit, true),
- case lists:member(create, Options) of
- true ->
- filelib:ensure_dir(Filepath),
- case file:open(Filepath, [read, append, raw, binary]) of
- {ok, Fd} ->
- {ok, Length} = file:position(Fd, eof),
- case Length > 0 of
- true ->
- % this means the file already exists and has data.
- % FYI: We don't differentiate between empty files and non-existant
- % files here.
- case lists:member(overwrite, Options) of
- true ->
- {ok, 0} = file:position(Fd, 0),
- ok = file:truncate(Fd),
- ok = file:sync(Fd),
- maybe_track_open_os_files(Options),
- {ok, #file{fd=Fd}};
- false ->
- ok = file:close(Fd),
- init_status_error(ReturnPid, Ref, file_exists)
- end;
- false ->
- maybe_track_open_os_files(Options),
- {ok, #file{fd=Fd}}
- end;
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end;
- false ->
- % open in read mode first, so we don't create the file if it doesn't exist.
- case file:open(Filepath, [read, raw]) of
- {ok, Fd_Read} ->
- {ok, Fd} = file:open(Filepath, [read, append, raw, binary]),
- ok = file:close(Fd_Read),
- maybe_track_open_os_files(Options),
- {ok, Length} = file:position(Fd, eof),
- {ok, #file{fd=Fd, eof=Length}};
- Error ->
- init_status_error(ReturnPid, Ref, Error)
- end
- end.
-
-maybe_track_open_os_files(FileOptions) ->
- case lists:member(sys_db, FileOptions) of
- true ->
- ok;
- false ->
- couch_stats_collector:track_process_count({couchdb, open_os_files})
- end.
-
-terminate(_Reason, _Fd) ->
- ok.
-
-
-handle_call({pread_iolist, Pos}, _From, File) ->
- {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
- case iolist_to_binary(LenIolist) of
- <<1:1/integer,Len:31/integer>> -> % an MD5-prefixed term
- {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
- {Md5, IoList} = extract_md5(Md5AndIoList),
- case couch_util:md5(IoList) of
- Md5 ->
- {reply, {ok, IoList}, File};
- _ ->
- {stop, file_corruption, {error,file_corruption}, File}
- end;
- <<0:1/integer,Len:31/integer>> ->
- {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
- {reply, {ok, Iolist}, File}
- end;
-handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
- {ok, Bin} = file:pread(Fd, Pos, Bytes),
- {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
-handle_call(bytes, _From, #file{eof=Length}=File) ->
- {reply, {ok, Length}, File};
-handle_call(sync, _From, #file{fd=Fd}=File) ->
- {reply, file:sync(Fd), File};
-handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
- {ok, Pos} = file:position(Fd, Pos),
- case file:truncate(Fd) of
- ok ->
- {reply, ok, File#file{eof=Pos}};
- Error ->
- {reply, Error, File}
- end;
-handle_call({append_bin, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
- Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
- case file:write(Fd, Blocks) of
- ok ->
- {reply, {ok, Pos}, File#file{eof=Pos+iolist_size(Blocks)}};
- Error ->
- {reply, Error, File}
- end;
-handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
- BinSize = size(Bin),
- case Pos rem ?SIZE_BLOCK of
- 0 ->
- Padding = <<>>;
- BlockOffset ->
- Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
- end,
- FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
- case file:write(Fd, FinalBin) of
- ok ->
- {reply, ok, File#file{eof=Pos+iolist_size(FinalBin)}};
- Error ->
- {reply, Error, File}
- end;
-
-
-handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
- case (catch read_old_header(Fd, Prefix)) of
- {ok, Header} ->
- TailAppendBegin = File#file.eof,
- Bin = term_to_binary(Header),
- Md5 = couch_util:md5(Bin),
- % now we assemble the final header binary and write to disk
- FinalBin = <<Md5/binary, Bin/binary>>,
- {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
- ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
- {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
- _Error ->
- case (catch read_old_header(Fd, <<"upgraded">>)) of
- {ok, TailAppendBegin} ->
- {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
- _Error2 ->
- {reply, ok, File}
- end
- end;
-
-
-handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
- {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
-
-% 09 UPGRADE CODE
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
-
-% 09 UPGRADE CODE
-read_old_header(Fd, Prefix) ->
- {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
- <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
- Result =
- % read the first header
- case extract_header(Prefix, Bin1) of
- {ok, Header1} ->
- case extract_header(Prefix, Bin2) of
- {ok, Header2} ->
- case Header1 == Header2 of
- true ->
- % Everything is completely normal!
- {ok, Header1};
- false ->
- % To get here we must have two different header versions with signatures intact.
- % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
- ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
- {ok, Header1}
- end;
- Error ->
- % error reading second header. It's ok, but log it.
- ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
- {ok, Header1}
- end;
- Error ->
- % error reading primary header
- case extract_header(Prefix, Bin2) of
- {ok, Header2} ->
- % log corrupt primary header. It's ok since the secondary is still good.
- ?LOG_INFO("Primary header corruption (error: ~p). Using secondary header.", [Error]),
- {ok, Header2};
- _ ->
- % error reading secondary header too
- % return the error, no need to log anything as the caller will be responsible for dealing with the error.
- Error
- end
- end,
- case Result of
- {ok, {pointer_to_header_data, Ptr}} ->
- pread_term(Fd, Ptr);
- _ ->
- Result
- end.
-
-% 09 UPGRADE CODE
-extract_header(Prefix, Bin) ->
- SizeOfPrefix = size(Prefix),
- SizeOfTermBin = ?HEADER_SIZE -
- SizeOfPrefix -
- 16, % md5 sig
-
- <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
-
- % check the header prefix
- case HeaderPrefix of
- Prefix ->
- % check the integrity signature
- case couch_util:md5(TermBin) == Sig of
- true ->
- Header = binary_to_term(TermBin),
- {ok, Header};
- false ->
- header_corrupt
- end;
- _ ->
- unknown_header_type
- end.
-
-
-% 09 UPGRADE CODE
-write_old_header(Fd, Prefix, Data) ->
- TermBin = term_to_binary(Data),
- % the size of all the bytes written to the header, including the md5 signature (16 bytes)
- FilledSize = byte_size(Prefix) + byte_size(TermBin) + 16,
- {TermBin2, FilledSize2} =
- case FilledSize > ?HEADER_SIZE of
- true ->
- % too big!
- {ok, Pos} = append_binary(Fd, TermBin),
- PtrBin = term_to_binary({pointer_to_header_data, Pos}),
- {PtrBin, byte_size(Prefix) + byte_size(PtrBin) + 16};
- false ->
- {TermBin, FilledSize}
- end,
- ok = file:sync(Fd),
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = couch_util:md5([TermBin2, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = file:pwrite(Fd, 0, DblWriteBin),
- ok = file:sync(Fd).
-
-
-handle_cast(close, Fd) ->
- {stop,normal,Fd}.
-
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_info({'EXIT', _, normal}, Fd) ->
- {noreply, Fd};
-handle_info({'EXIT', _, Reason}, Fd) ->
- {stop, Reason, Fd}.
-
-
-find_header(_Fd, -1) ->
- no_valid_header;
-find_header(Fd, Block) ->
- case (catch load_header(Fd, Block)) of
- {ok, Bin} ->
- {ok, Bin};
- _Error ->
- find_header(Fd, Block -1)
- end.
-
-load_header(Fd, Block) ->
- {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
- {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
- TotalBytes = calculate_total_read_len(1, HeaderLen),
- {ok, <<RawBin:TotalBytes/binary>>} =
- file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
- <<Md5Sig:16/binary, HeaderBin/binary>> =
- iolist_to_binary(remove_block_prefixes(1, RawBin)),
- Md5Sig = couch_util:md5(HeaderBin),
- {ok, HeaderBin}.
-
--spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
- {Data::iolist(), CurPos::non_neg_integer()}.
-read_raw_iolist_int(#file{fd=Fd, tail_append_begin=TAB}, Pos, Len) ->
- BlockOffset = Pos rem ?SIZE_BLOCK,
- TotalBytes = calculate_total_read_len(BlockOffset, Len),
- {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, Pos, TotalBytes),
- if Pos >= TAB ->
- {remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
- true ->
- % 09 UPGRADE CODE
- <<ReturnBin:Len/binary, _/binary>> = RawBin,
- {[ReturnBin], Pos + Len}
- end.
-
--spec extract_md5(iolist()) -> {binary(), iolist()}.
-extract_md5(FullIoList) ->
- {Md5List, IoList} = split_iolist(FullIoList, 16, []),
- {iolist_to_binary(Md5List), IoList}.
-
-calculate_total_read_len(0, FinalLen) ->
- calculate_total_read_len(1, FinalLen) + 1;
-calculate_total_read_len(BlockOffset, FinalLen) ->
- case ?SIZE_BLOCK - BlockOffset of
- BlockLeft when BlockLeft >= FinalLen ->
- FinalLen;
- BlockLeft ->
- FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
- if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) =:= 0 -> 0;
- true -> 1 end
- end.
-
-remove_block_prefixes(_BlockOffset, <<>>) ->
- [];
-remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
- remove_block_prefixes(1, Rest);
-remove_block_prefixes(BlockOffset, Bin) ->
- BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
- case size(Bin) of
- Size when Size > BlockBytesAvailable ->
- <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
- [DataBlock | remove_block_prefixes(0, Rest)];
- _Size ->
- [Bin]
- end.
-
-make_blocks(_BlockOffset, []) ->
- [];
-make_blocks(0, IoList) ->
- [<<0>> | make_blocks(1, IoList)];
-make_blocks(BlockOffset, IoList) ->
- case split_iolist(IoList, (?SIZE_BLOCK - BlockOffset), []) of
- {Begin, End} ->
- [Begin | make_blocks(0, End)];
- _SplitRemaining ->
- IoList
- end.
-
-%% @doc Returns a tuple where the first element contains the leading SplitAt
-%% bytes of the original iolist, and the 2nd element is the tail. If SplitAt
-%% is larger than byte_size(IoList), return the difference.
--spec split_iolist(IoList::iolist(), SplitAt::non_neg_integer(), Acc::list()) ->
- {iolist(), iolist()} | non_neg_integer().
-split_iolist(List, 0, BeginAcc) ->
- {lists:reverse(BeginAcc), List};
-split_iolist([], SplitAt, _BeginAcc) ->
- SplitAt;
-split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) when SplitAt > byte_size(Bin) ->
- split_iolist(Rest, SplitAt - byte_size(Bin), [Bin | BeginAcc]);
-split_iolist([<<Bin/binary>> | Rest], SplitAt, BeginAcc) ->
- <<Begin:SplitAt/binary,End/binary>> = Bin,
- split_iolist([End | Rest], 0, [Begin | BeginAcc]);
-split_iolist([Sublist| Rest], SplitAt, BeginAcc) when is_list(Sublist) ->
- case split_iolist(Sublist, SplitAt, BeginAcc) of
- {Begin, End} ->
- {Begin, [End | Rest]};
- SplitRemaining ->
- split_iolist(Rest, SplitAt - (SplitAt - SplitRemaining), [Sublist | BeginAcc])
- end;
-split_iolist([Byte | Rest], SplitAt, BeginAcc) when is_integer(Byte) ->
- split_iolist(Rest, SplitAt - 1, [Byte | BeginAcc]).