From 16ccd4c0b8ae4272fa27d32948658b1424a291fc Mon Sep 17 00:00:00 2001
From: "Damien F. Katz"
Date: Mon, 25 May 2009 19:52:28 +0000
Subject: Merging new tail append storage into trunk. Upgrades are automatic,
 once opened by this version old versions of CouchDB will not be able to open
 the files. As a precaution, you should back-up your production databases
 before upgrading.

git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@778485 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db_updater.erl | 148 +++++++++++++++++++++++++--------------
 1 file changed, 96 insertions(+), 52 deletions(-)

diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index b1cb9037..31ddbf8c 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -19,18 +19,18 @@

 -include("couch_db.hrl").

--define(HEADER_SIG, <<$g, $m, $k, 0>>).

 init({MainPid, DbName, Filepath, Fd, Options}) ->
     case lists:member(create, Options) of
     true ->
         % create a new header and writes it to the file
         Header = #db_header{},
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+        ok = couch_file:write_header(Fd, Header),
         % delete any old compaction files that might be hanging around
         file:delete(Filepath ++ ".compact");
     false ->
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+        ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
+        {ok, Header} = couch_file:read_header(Fd)
     end,

     Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,7 +56,7 @@ handle_call({update_docs, DocActions, Options}, _From, Db) ->
     end;
 handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
     {reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From,  Db) ->
+handle_call(full_commit, _From, Db) ->
     {reply, ok, commit_data(Db)}; % commit the data and return ok
 handle_call(increment_update_seq, _From, Db) ->
     Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
@@ -158,7 +158,7 @@ handle_cast(start_compact, Db) ->
     end;
 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
-    {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+    {ok, NewHeader} = couch_file:read_header(NewFd),
     #db{update_seq=NewSeq} = NewDb =
         init_db(Db#db.name, Filepath, NewFd, NewHeader),
     unlink(NewFd),
@@ -191,7 +191,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     end.

 handle_info(delayed_commit, Db) ->
-    {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+    {noreply, commit_data(Db)}.

 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -214,6 +214,7 @@ btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
         [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
             {Rev, Seq, Bp} <- DeletedRevInfos]};
 btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
+    % 09 UPGRADE CODE
     % this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
     % and individual seq nums for conflicts that are currently in the index,
     % meaning the filtered _changes api will not work except for on main docs.
@@ -244,6 +245,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
             (_RevId, ?REV_MISSING) ->
                 ?REV_MISSING;
             (_RevId, {IsDeleted, BodyPointer}) ->
+                % 09 UPGRADE CODE
                 % this is the 0.9.0 and earlier rev info record. It's missing the seq
                 % nums, which means couchdb will sometimes reexamine unchanged
                 % documents with the _changes API.
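Where the % 09 UPGRADE CODE markers appear, old records are converted as they are read back out of the btrees, so a 0.9 file keeps working until a compaction rewrites the index. As a hedged illustration (not the literal clause from the patch), upgrading an old by_seq value to the new {Id, RevInfos, DeletedRevInfos} shape, with {Rev, Seq, BodyPointer} triples as in the new-format join clause above, amounts to something like:

    % Illustrative only: KeySeq stands in for the per-revision seq numbers the
    % 0.9 index never stored, and nil for the missing conflict body pointers,
    % which is why the filtered _changes API stays limited until compaction.
    upgrade_by_seq_value(KeySeq, {Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
        {Revs, DelRevs} =
        case Deleted of
        false -> {[{Rev, KeySeq, Bp}], []};
        true  -> {[], [{Rev, KeySeq, Bp}]}
        end,
        {Id,
         Revs ++ [{C, KeySeq, nil} || C <- Conflicts],
         DelRevs ++ [{C, KeySeq, nil} || C <- DelConflicts]}.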
@@ -280,17 +282,27 @@ less_docid(nil, _) -> true; % nil - special key sorts before all
 less_docid({}, _) -> false; % {} -> special key sorts after all
 less_docid(A, B) -> A < B.

+
 init_db(DbName, Filepath, Fd, Header0) ->
-    case element(2, Header0) of
-    ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
-    ?LATEST_DISK_VERSION -> ok;
+    Header1 = simple_upgrade_record(Header0, #db_header{}),
+    Header =
+    case element(2, Header1) of
+    1 -> Header1#db_header{unused = 0}; % 0.9
+    2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+    ?LATEST_DISK_VERSION -> Header1;
     _ -> throw({database_disk_version_error, "Incorrect disk header version"})
     end,
-    Header = simple_upgrade_record(Header0, #db_header{}),
-    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
-    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
     Less = fun less_docid/2,
-
+
+    {ok, FsyncOptions} = couch_util:parse_term(
+            couch_config:get("couchdb", "fsync_options",
+                    "[before_header, after_header, on_file_open]")),
+
+    case lists:member(on_file_open, FsyncOptions) of
+    true -> ok = couch_file:sync(Fd);
+    _ -> ok
+    end,
+
     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
         [{split, fun(X) -> btree_by_id_split(X) end},
         {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
@@ -308,7 +320,6 @@ init_db(DbName, Filepath, Fd, Header0) ->
     AdminsPtr ->
         {ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
     end,
-
     % convert start time tuple to microsecs and store as a binary string
     {MegaSecs, Secs, MicroSecs} = now(),
     StartTime = ?l2b(io_lib:format("~p",
@@ -319,22 +330,22 @@ init_db(DbName, Filepath, Fd, Header0) ->
         fd=Fd,
         fd_ref_counter = RefCntr,
         header=Header,
-        summary_stream = SummaryStream,
         fulldocinfo_by_id_btree = IdBtree,
         docinfo_by_seq_btree = SeqBtree,
         local_docs_btree = LocalDocsBtree,
+        committed_update_seq = Header#db_header.update_seq,
         update_seq = Header#db_header.update_seq,
         name = DbName,
         filepath = Filepath,
         admins = Admins,
         admins_ptr = AdminsPtr,
         instance_start_time = StartTime,
-        revs_limit = Header#db_header.revs_limit
+        revs_limit = Header#db_header.revs_limit,
+        fsync_options = FsyncOptions
         }.

-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
-    couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
     couch_ref_counter:drop(RefCntr).
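Note the new fsync_options plumbing in init_db: the ini value arrives as a string, so couch_util:parse_term/1 turns it into a list of atoms before the lists:member/2 checks. A minimal sketch of such a parse using only the standard library (the real helper lives in couch_util.erl and may differ in detail):

    % Sketch: "[before_header, after_header, on_file_open]" -> {ok, [before_header, ...]}
    parse_term(Str) ->
        {ok, Tokens, _EndLine} = erl_scan:string(Str ++ "."),
        erl_parse:parse_term(Tokens).

With the parsed list in hand, the on_file_open check above is a plain membership test, and the same list is carried in the #db record so commit_data can consult it on every commit.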
@@ -387,7 +398,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
                     ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
                     throw(retry)
                 end,
-                {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+                {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
                 {IsDeleted, NewSummaryPointer, UpdateSeq};
             _ ->
                 Value
@@ -549,18 +560,18 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->

 commit_data(Db) ->
     commit_data(Db, false).
-
-commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
-    Header2 = Header#db_header{
+db_to_header(Db, Header) ->
+    Header#db_header{
         update_seq = Db#db.update_seq,
-        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
         docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
         fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
         local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
         admins_ptr = Db#db.admins_ptr,
-        revs_limit = Db#db.revs_limit
-        },
-    if Header == Header2 ->
+        revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
+    Header = db_to_header(Db, OldHeader),
+    if OldHeader == Header ->
         Db;
     Delay and (Db#db.waiting_delayed_commit == nil) ->
         Db#db{waiting_delayed_commit=
@@ -575,43 +586,75 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
             end;
         true -> ok
         end,
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
-        Db#db{waiting_delayed_commit=nil,header=Header2}
+        case lists:member(before_header, FsyncOptions) of
+        true -> ok = couch_file:sync(Fd);
+        _ -> ok
+        end,
+
+        ok = couch_file:write_header(Fd, Header),
+
+        case lists:member(after_header, FsyncOptions) of
+        true -> ok = couch_file:sync(Fd);
+        _ -> ok
+        end,
+
+        Db#db{waiting_delayed_commit=nil,
+            header=Header,
+            committed_update_seq=Db#db.update_seq}
     end.

-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
-    {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
+    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
     % copy the bin values
-    NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
-        {Name, {Type, NewBinSp, Len}}
+    NewBinInfos = lists:map(
+        fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+            % 09 UPGRADE CODE
+            {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+            {Name, {Type, NewBinSp, Len}};
+        ({Name, {Type, BinSp, Len}}) ->
+            {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            {Name, {Type, NewBinSp, Len}}
         end, BinInfos),
-    % now write the document summary
-    {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
-    Sp.
+    {BodyData, NewBinInfos}.

-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
     [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
     % root inner node, only copy info/data from leaf nodes
-    [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
-    [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+    [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
+    [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
     % This is a leaf node, copy it over
-    NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
-    [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+    DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
+    [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
     % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+    [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
+

-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+    % write out the attachments
     NewFullDocInfos0 = lists:map(
         fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
-            Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+            Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
         end, LookupResults),
-    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
+    % write out the docs
+    % we do this in 2 stages so the docs are written out contiguously, making
+    % view indexing and replication faster.
+    NewFullDocInfos1 = lists:map(
+        fun(#full_doc_info{rev_tree=RevTree}=Info) ->
+            Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+                fun(_Key, {IsDel, DocBody, Seq}) ->
+                    {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+                    {IsDel, Pos, Seq}
+                end, RevTree)}
+        end, NewFullDocInfos0),
+
+    NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
     NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
     RemoveSeqs =
     case Retry of
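The two-stage copy above leans on couch_key_tree:map_leafs/2 to rewrite only the leaf values (the doc bodies flushed with couch_file:append_term) while leaving inner nodes untouched. Its actual implementation lives in couch_key_tree.erl; a rough sketch consistent with the tree shape walked by copy_rev_tree_attachments above, [{StartPos, {Key, Value, SubTrees}}] with SubTrees == [] at the leaves, would be:

    % Sketch: apply Fun(Key, Value) at leaves only; recurse through inner nodes.
    map_leafs(Fun, Forest) ->
        [{Pos, map_leafs_tree(Fun, Tree)} || {Pos, Tree} <- Forest].

    map_leafs_tree(Fun, {Key, Value, []}) ->
        {Key, Fun(Key, Value), []};                               % leaf: rewrite value
    map_leafs_tree(Fun, {Key, Value, SubTrees}) ->
        {Key, Value, [map_leafs_tree(Fun, T) || T <- SubTrees]}.  % inner: recurse

Copying attachments in stage one and appending bodies in stage two leaves the bodies contiguous in the compacted file, which is what makes the later sequential reads (view indexing, replication) cheaper.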
@@ -633,7 +676,9 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info


-copy_compact(Db, NewDb, Retry) ->
+copy_compact(Db, NewDb0, Retry) ->
+    FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+    NewDb = NewDb0#db{fsync_options=FsyncOptions},
     TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
     EnumBySeqFun =
     fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
@@ -677,15 +722,14 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
     {ok, Fd} ->
         couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
         Retry = true,
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+        {ok, Header} = couch_file:read_header(Fd);
     {error, enoent} ->
         couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
         {ok, Fd} = couch_file:open(CompactFile, [create]),
         Retry = false,
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+        ok = couch_file:write_header(Fd, Header=#db_header{})
     end,
     NewDb = init_db(Name, CompactFile, Fd, Header),
-    unlink(Fd),

     NewDb2 = copy_compact(Db, NewDb, Retry),
     gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
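Two fsync-related details are worth calling out. copy_compact keeps only before_header from fsync_options: the .compact file is not the live database until compact_done swaps it in, so a crash at worst loses recent copy progress, and compaction resumes from the last committed state in the .compact file (the Retry path above). In commit_data, meanwhile, the ordering carries the durability argument of tail-append storage: data is appended, optionally synced, and only then is the header written, so a header never points at data that might not yet be on disk. A generic sketch of that ordering with plain file-module calls (the real couch_file:write_header/2 is more involved, handling its own header framing, which this sketch omits):

    % Sketch: FsyncOptions is the parsed list from the "fsync_options" ini
    % setting; HeaderBin stands for an already-serialized header.
    commit(Fd, HeaderBin, FsyncOptions) ->
        case lists:member(before_header, FsyncOptions) of
        true -> ok = file:sync(Fd);     % appended doc data must be durable first
        false -> ok
        end,
        {ok, _} = file:position(Fd, eof),
        ok = file:write(Fd, HeaderBin), % the header itself is tail-appended
        case lists:member(after_header, FsyncOptions) of
        true -> ok = file:sync(Fd);     % make the commit point itself durable
        false -> ok
        end,
        ok.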