Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--  src/couchdb/couch_db_updater.erl | 148
1 file changed, 96 insertions(+), 52 deletions(-)
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index b1cb9037..31ddbf8c 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -19,18 +19,18 @@
-include("couch_db.hrl").
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
init({MainPid, DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
% create a new header and write it to the file
Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
file:delete(Filepath ++ ".compact");
false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>), % 09 UPGRADE CODE
+ {ok, Header} = couch_file:read_header(Fd)
end,
Db = init_db(DbName, Filepath, Fd, Header),
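
For reference, a sketch of the open-or-create flow above collected in one
place. The helper name read_or_create_header/3 is hypothetical; the
couch_file calls are exactly the ones this hunk uses.

    %% Sketch only - not part of the patch.
    read_or_create_header(Fd, Filepath, Options) ->
        case lists:member(create, Options) of
        true ->
            Header = #db_header{},
            ok = couch_file:write_header(Fd, Header),
            % delete any old compaction files that might be hanging around
            file:delete(Filepath ++ ".compact"),
            {ok, Header};
        false ->
            % 09 UPGRADE CODE: rewrite a signed pre-0.10 header in place
            ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>),
            couch_file:read_header(Fd)
        end.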
@@ -56,7 +56,7 @@ handle_call({update_docs, DocActions, Options}, _From, Db) ->
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call(full_commit, _From, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
@@ -158,7 +158,7 @@ handle_cast(start_compact, Db) ->
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ {ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader),
unlink(NewFd),
@@ -191,7 +191,7 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
end.
handle_info(delayed_commit, Db) ->
- {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+ {noreply, commit_data(Db)}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
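
The delayed_commit handler above no longer resets waiting_delayed_commit
because commit_data/1 now clears it itself (see the commit_data hunk below).
For context, a minimal sketch of how a one-shot delayed commit of this shape
is typically scheduled; the 1000 ms interval and the use of
erlang:send_after/3 are assumptions, not taken from this diff:

    %% Sketch only: schedule at most one pending delayed commit.
    maybe_delay_commit(#db{waiting_delayed_commit=nil}=Db) ->
        Ref = erlang:send_after(1000, self(), delayed_commit), % interval assumed
        Db#db{waiting_delayed_commit=Ref};
    maybe_delay_commit(Db) ->
        Db. % a delayed commit is already pending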
@@ -214,6 +214,7 @@ btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
[#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
{Rev, Seq, Bp} <- DeletedRevInfos]};
btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier by_seq record. It's missing the body pointers
% and individual seq nums for conflicts that are currently in the index,
% meaning the filtered _changes API will not work except on main docs.
@@ -244,6 +245,7 @@ btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
(_RevId, ?REV_MISSING) ->
?REV_MISSING;
(_RevId, {IsDeleted, BodyPointer}) ->
+ % 09 UPGRADE CODE
% this is the 0.9.0 and earlier rev info record. It's missing the seq
% nums, which means couchdb will sometimes reexamine unchanged
% documents with the _changes API.
@@ -280,17 +282,27 @@ less_docid(nil, _) -> true; % nil - special key sorts before all
less_docid({}, _) -> false; % {} - special key sorts after all
less_docid(A, B) -> A < B.
+
init_db(DbName, Filepath, Fd, Header0) ->
- case element(2, Header0) of
- ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
- ?LATEST_DISK_VERSION -> ok;
+ Header1 = simple_upgrade_record(Header0, #db_header{}),
+ Header =
+ case element(2, Header1) of
+ 1 -> Header1#db_header{unused = 0}; % 0.9
+ 2 -> Header1#db_header{unused = 0}; % post 0.9 and pre 0.10
+ ?LATEST_DISK_VERSION -> Header1;
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
- Header = simple_upgrade_record(Header0, #db_header{}),
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
Less = fun less_docid/2,
-
+
+ {ok, FsyncOptions} = couch_util:parse_term(
+ couch_config:get("couchdb", "fsync_options",
+ "[before_header, after_header, on_file_open]")),
+
+ case lists:member(on_file_open, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
[{split, fun(X) -> btree_by_id_split(X) end},
{join, fun(X,Y) -> btree_by_id_join(X,Y) end},
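
The fsync options arrive from the ini config as a string; a small sketch of
what the parse above yields, with shell output illustrative but matching the
default string:

    %% Illustrative shell session:
    %% 1> couch_util:parse_term("[before_header, after_header, on_file_open]").
    %% {ok,[before_header,after_header,on_file_open]}
    %% 2> lists:member(on_file_open, [before_header,after_header,on_file_open]).
    %% true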
@@ -308,7 +320,6 @@ init_db(DbName, Filepath, Fd, Header0) ->
AdminsPtr ->
{ok, Admins} = couch_file:pread_term(Fd, AdminsPtr)
end,
-
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
@@ -319,22 +330,22 @@ init_db(DbName, Filepath, Fd, Header0) ->
fd=Fd,
fd_ref_counter = RefCntr,
header=Header,
- summary_stream = SummaryStream,
fulldocinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
+ committed_update_seq = Header#db_header.update_seq,
update_seq = Header#db_header.update_seq,
name = DbName,
filepath = Filepath,
admins = Admins,
admins_ptr = AdminsPtr,
instance_start_time = StartTime,
- revs_limit = Header#db_header.revs_limit
+ revs_limit = Header#db_header.revs_limit,
+ fsync_options = FsyncOptions
}.
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
- couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -387,7 +398,7 @@ flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
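
With the summary stream gone, document summaries are appended directly to the
main db file. A minimal round-trip sketch using the couch_file calls that
appear in this patch (variable names illustrative):

    %% Sketch: append a term, keep its file offset, read it back later.
    {ok, SummaryPointer} = couch_file:append_term(Fd, {Body, Bins}),
    {ok, {Body, Bins}} = couch_file:pread_term(Fd, SummaryPointer).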
@@ -549,18 +560,18 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
commit_data(Db) ->
commit_data(Db, false).
-
-commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
- Header2 = Header#db_header{
+db_to_header(Db, Header) ->
+ Header#db_header{
update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
admins_ptr = Db#db.admins_ptr,
- revs_limit = Db#db.revs_limit
- },
- if Header == Header2 ->
+ revs_limit = Db#db.revs_limit}.
+
+commit_data(#db{fd=Fd,header=OldHeader,fsync_options=FsyncOptions}=Db, Delay) ->
+ Header = db_to_header(Db, OldHeader),
+ if OldHeader == Header ->
Db;
Delay and (Db#db.waiting_delayed_commit == nil) ->
Db#db{waiting_delayed_commit=
@@ -575,43 +586,75 @@ commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{waiting_delayed_commit=nil,header=Header2}
+ case lists:member(before_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ ok = couch_file:write_header(Fd, Header),
+
+ case lists:member(after_header, FsyncOptions) of
+ true -> ok = couch_file:sync(Fd);
+ _ -> ok
+ end,
+
+ Db#db{waiting_delayed_commit=nil,
+ header=Header,
+ committed_update_seq=Db#db.update_seq}
end.
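
The two membership checks above share a pattern; a hedged refactoring sketch
(the helper name maybe_sync/3 is hypothetical):

    %% Sketch only: centralize the before_header/after_header sync checks.
    maybe_sync(Flag, Fd, FsyncOptions) ->
        case lists:member(Flag, FsyncOptions) of
        true -> ok = couch_file:sync(Fd);
        false -> ok
        end.

commit_data/2 would then call maybe_sync(before_header, Fd, FsyncOptions)
just before couch_file:write_header/2 and maybe_sync(after_header, Fd,
FsyncOptions) just after it.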
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+
+copy_doc_attachments(SrcFd, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
% copy the bin values
- NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}}
+ NewBinInfos = lists:map(
+ fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+ % 09 UPGRADE CODE
+ {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}};
+ ({Name, {Type, BinSp, Len}}) ->
+ {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+ {Name, {Type, NewBinSp, Len}}
end, BinInfos),
- % now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
- Sp.
+ {BodyData, NewBinInfos}.
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree_attachments(_SrcFd, _DestFd, []) ->
[];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree_attachments(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
% root inner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
- [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+ [Tree2] = copy_rev_tree_attachments(SrcFd, DestFd, [Tree]),
+ [{Start, Tree2} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
% This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ DocBody = copy_doc_attachments(SrcFd, Sp, DestFd),
+ [{RevId, {IsDel, DocBody, Seq}, []} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)];
+copy_rev_tree_attachments(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
% inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+ [{RevId, ?REV_MISSING, copy_rev_tree_attachments(SrcFd, DestFd, SubTree)} | copy_rev_tree_attachments(SrcFd, DestFd, RestTree)].
+
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+
+ % write out the attachments
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree_attachments(SrcFd, DestFd, RevTree)}
end, LookupResults),
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
+ % write out the docs
+ % we do this in 2 stages so the docs are written out contiguously, making
+ % view indexing and replication faster.
+ NewFullDocInfos1 = lists:map(
+ fun(#full_doc_info{rev_tree=RevTree}=Info) ->
+ Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+ fun(_Key, {IsDel, DocBody, Seq}) ->
+ {ok, Pos} = couch_file:append_term(DestFd, DocBody),
+ {IsDel, Pos, Seq}
+ end, RevTree)}
+ end, NewFullDocInfos0),
+
+ NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
RemoveSeqs =
case Retry of
@@ -633,7 +676,9 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info
-copy_compact(Db, NewDb, Retry) ->
+copy_compact(Db, NewDb0, Retry) ->
+ FsyncOptions = [Op || Op <- NewDb0#db.fsync_options, Op == before_header],
+ NewDb = NewDb0#db{fsync_options=FsyncOptions},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
@@ -677,15 +722,14 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {ok, Header} = couch_file:read_header(Fd);
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+ ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
- unlink(Fd),
NewDb2 = copy_compact(Db, NewDb, Retry),
gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),