author    Damien F. Katz <damien@apache.org>    2009-05-25 19:52:28 +0000
committer Damien F. Katz <damien@apache.org>    2009-05-25 19:52:28 +0000
commit    16ccd4c0b8ae4272fa27d32948658b1424a291fc (patch)
tree      f6d59d017234409436091cc53938b27549d9b54f /src/couchdb/couch_db_updater.erl
parent    4aac0f7c6dcd3f3a29cfe5e1bf2bee84b9bae9d5 (diff)
Merging new tail append storage into trunk. Upgrades are automatic; once a file has been opened by this version, older versions of CouchDB will not be able to open it. As a precaution, back up your production databases before upgrading.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68
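The header API change is visible throughout this patch: the old format stamped every header with a four-byte signature, while the new tail-append format appends headers at the tail of the file and locates the most recent one on open, so the signature argument disappears. Roughly, using only the couch_file calls as they appear in this diff:

    %% 0.9-era calls, signature required:
    ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header0),
    {ok, Header0} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>),

    %% tail-append calls; old files are upgraded in place on first open:
    ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
    {ok, Header1} = couch_file:read_header(Fd)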
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--    src/couchdb/couch_db_updater.erl    148
1 file changed, 96 insertions(+), 52 deletions(-)
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index b1cb9037..31ddbf8c 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -19,18 +19,18 @@
-include("couch_db.hrl").
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
init({MainPid, DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
% create a new header and write it to the file
Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
file:delete(Filepath ++ ".compact");
false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
+ {ok, Header} = couch_file:read_header(Fd)
end,
Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,7 +56,7 @@ handle_call({update_docs, DocActions, Options}, _From, Db) ->
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call(full_commit, _From, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
@@ -158,7 +158,7 @@ handle_cast(start_compact, Db) ->
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ {ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader),
unlink(NewFd),
@@ -191,7 +191,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
end.
handle_info(delayed_commit, Db) ->
- {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+ {noreply, commit_data(Db)}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -214,6 +214,7 @@ btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
[#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
{Rev, Seq, Bp} <- DeletedRevInfos]};
btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
% and individual seq nums for conflicts that are currently in the index,
% meaning the filtered _changes api will not work except for on main docs.
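Side by side, the two by_seq value shapes these clauses accept, taken straight from the function heads above:

    %% current format: a seq number and body pointer per revision
    {Id, RevInfos, DeletedRevInfos}    %% each entry: {Rev, Seq, Bp}
    %% 0.9 and earlier: one body pointer, no per-conflict seq numbers,
    %% which is why filtered _changes only works for the main revisions
    {Id, Rev, Bp, Conflicts, DelConflicts, Deleted}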
@@ -244,6 +245,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
(_RevId, ?REV_MISSING) ->
?REV_MISSING;
(_RevId, {IsDeleted, BodyPointer}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier rev info record. It's missing the seq
% nums, which means couchdb will sometimes reexamine unchanged
% documents with the _changes API.
@@ -280,17 +282,27 @@ less_docid(nil, _) -> true; % nil - special key sorts before all
less_docid({}, _) -> false; % {} -> special key sorts after all
less_docid(A, B) -> A < B.
+
init_db(DbName, Filepath, Fd, Header0) ->
- case element(2, Header0) of
- ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
- ?LATEST_DISK_VERSION -> ok;
+ Header1 = simple_upgrade_record(Header0, #db_header{}),
+ Header =
+ case element(2, Header1) of
+ 1 -> Header1#db_header{unused = 0}; % 0.9
+ 2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+ ?LATEST_DISK_VERSION -> Header1;
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
- Header = simple_upgrade_record(Header0, #db_header{}),
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
Less = fun less_docid/2,
-
+
+ {ok, FsyncOptions} = couch_util:parse_term(
+ couch_config:get("couchdb", "fsync_options",
+ "[before_header, after_header, on_file_open]")),
+
+ case lists:member(on_file_open, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
[{split, fun(X) -> btree_by_id_split(X) end},
{join, fun(X,Y) -> btree_by_id_join(X,Y) end},
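The fsync points are configurable: the string handed to couch_util:parse_term/1 above comes from the server's ini configuration, and the default in the code corresponds to this setting:

    [couchdb]
    fsync_options = [before_header, after_header, on_file_open]

on_file_open is handled here in init_db, while before_header and after_header are handled in commit_data below.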
@@ -308,7 +320,6 @@ init_db(DbName, Filepath, Fd, Header0) ->
AdminsPtr ->
{ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
end,
-
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
@@ -319,22 +330,22 @@ init_db(DbName, Filepath, Fd, Header0) ->
fd=Fd,
fd_ref_counter = RefCntr,
header=Header,
- summary_stream = SummaryStream,
fulldocinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
+ committed_update_seq = Header#db_header.update_seq,
update_seq = Header#db_header.update_seq,
name = DbName,
filepath = Filepath,
admins = Admins,
admins_ptr = AdminsPtr,
instance_start_time = StartTime,
- revs_limit = Header#db_header.revs_limit
+ revs_limit = Header#db_header.revs_limit,
+ fsync_options = FsyncOptions
}.
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
- couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -387,7 +398,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
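With the dedicated summary stream gone, document summaries are appended straight to the database file. couch_file:append_term/2 returns the position the term was written at, and, as the admins-pointer code in init_db suggests, such a position can be read back with couch_file:pread_term/2. A minimal sketch:

    {ok, Pos} = couch_file:append_term(Fd, {Body, Bins}),
    {ok, {Body, Bins}} = couch_file:pread_term(Fd, Pos)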
@@ -549,18 +560,18 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
commit_data(Db) ->
commit_data(Db, false).
-
-commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
- Header2 = Header#db_header{
+db_to_header(Db, Header) ->
+ Header#db_header{
update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr,
- revs_limit = Db#db.revs_limit
- },
- if Header == Header2 ->
+ revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
+ Header = db_to_header(Db, OldHeader),
+ if OldHeader == Header ->
Db;
Delay and (Db#db.waiting_delayed_commit == nil) ->
Db#db{waiting_delayed_commit=
@@ -575,43 +586,75 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{waiting_delayed_commit=nil,header=Header2}
+ case lists:member(before_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ ok = couch_file:write_header(Fd, Header),
+
+ case lists:member(after_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ Db#db{waiting_delayed_commit=nil,
+ header=Header,
+ committed_update_seq=Db#db.update_seq}
end.
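The two syncs bracket the header write for crash safety in an append-only file: the before_header sync guarantees the header can never point at data that has not reached disk, and the after_header sync makes the new commit point itself durable. The repeated membership checks could be factored into a helper; a sketch only (maybe_sync/3 is not part of this patch):

    maybe_sync(Option, Fd, FsyncOptions) ->
        case lists:member(Option, FsyncOptions) of
        true -> ok = couch_file:sync(Fd);
        _ -> ok
        end.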
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+
+copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
% copy the bin values
- NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ NewBinInfos = lists:map(
+ fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+ % 09 UPGRADE CODE
+ {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}};
+ ({Name, {Type, BinSp, Len}}) ->
+ {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, {Type, NewBinSp, Len}}
end, BinInfos),
- % now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
- Sp.
+ {BodyData, NewBinInfos}.
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
[];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
% root inner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
- [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+ [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
+ [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
% This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
+ [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
% inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+ [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
+
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+ % write out the attachments
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
end, LookupResults),
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
+ % write out the docs
+ % we do this in 2 stages so the docs are written out contiguously, making
+ % view indexing and replication faster.
+ NewFullDocInfos1 = lists:map(
+ fun(#full_doc_info{rev_tree=RevTree}=Info) ->
+ Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+ fun(_Key, {IsDel, DocBody, Seq}) ->
+ {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+ {IsDel, Pos, Seq}
+ end, RevTree)}
+ end, NewFullDocInfos0),
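Each leaf value thus passes through three shapes during compaction: the attachment pass substitutes the document body for the old pointer, and this second pass trades the body for its new, contiguous position in the compacted file:

    {IsDel, Sp, Seq}          %% leaf as stored in the source file
    {IsDel, DocBody, Seq}     %% after copy_rev_tree_attachments/3
    {IsDel, Pos, Seq}         %% after couch_file:append_term(DestFd, DocBody)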
+
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
RemoveSeqs =
case Retry of
@@ -633,7 +676,9 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info
-copy_compact(Db, NewDb, Retry) ->
+copy_compact(Db, NewDb0, Retry) ->
+ FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+ NewDb = NewDb0#db{fsync_options=FsyncOptions},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
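Note that copy_compact strips the fsync options down to before_header, presumably so each intermediate checkpoint header still lands after the data it references without paying an extra sync per header; the compacted file only becomes authoritative once compact_done swaps it in. With the default configuration the comprehension evaluates to:

    1> [Op || Op <- [before_header, after_header, on_file_open], Op == before_header].
    [before_header]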
@@ -677,15 +722,14 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {ok, Header} = couch_file:read_header(Fd);
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+ ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- unlink(Fd),
NewDb2 = copy_compact(Db, NewDb, Retry),
gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),