diff options
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 499 |
1 files changed, 499 insertions, 0 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl new file mode 100644 index 00000000..f0673af9 --- /dev/null +++ b/src/couchdb/couch_db_updater.erl @@ -0,0 +1,499 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_db_updater). +-behaviour(gen_server). + +-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]). +-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). + +-include("couch_db.hrl"). + +-define(HEADER_SIG, <<$g, $m, $k, 0>>). + +init({MainPid, DbName, Filepath, Fd, Options}) -> + link(Fd), + case lists:member(create, Options) of + true -> + % create a new header and writes it to the file + Header = #db_header{}, + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header), + % delete any old compaction files that might be hanging around + file:delete(Filepath ++ ".compact"), + file:delete(Filepath ++ ".old"); + false -> + {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG) + end, + + Db = init_db(DbName, Filepath, Fd, Header), + {ok, Db#db{main_pid=MainPid}}. + +terminate(_Reason, Db) -> + close_db(Db). + +handle_call(get_db, _From, Db) -> + {reply, {ok, Db}, Db}; +handle_call({update_docs, DocActions, Options}, _From, Db) -> + try update_docs_int(Db, DocActions, Options) of + {ok, Db2} -> + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + couch_db_update_notifier:notify({updated, Db2#db.name}), + {reply, ok, Db2} + catch + throw: retry -> + {reply, retry, Db}; + throw: conflict -> + {reply, conflict, Db} + end; +handle_call(increment_update_seq, _From, Db) -> + Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), + ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + couch_db_update_notifier:notify({updated, Db#db.name}), + {reply, {ok, Db2#db.update_seq}, Db2}. + + +handle_cast(start_compact, Db) -> + case Db#db.compactor_pid of + nil -> + ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]), + Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end), + Db2 = Db#db{compactor_pid=Pid}, + ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + {noreply, Db2}; + _ -> + % compact currently running, this is a no-op + {noreply, Db} + end; +handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> + {ok, NewFd} = couch_file:open(CompactFilepath), + {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG), + #db{update_seq=NewSeq} = NewDb = + init_db(Db#db.name, CompactFilepath, NewFd, NewHeader), + case Db#db.update_seq == NewSeq of + true -> + NewDb2 = commit_data( + NewDb#db{ + main_pid = Db#db.main_pid, + doc_count = Db#db.doc_count, + doc_del_count = Db#db.doc_del_count, + filepath = Filepath}), + + ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), + file:delete(Filepath), + ok = file:rename(CompactFilepath, Filepath), + + couch_stream:close(Db#db.summary_stream), + couch_file:close_maybe(Db#db.fd), + file:delete(Filepath ++ ".old"), + + ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}), + ?LOG_INFO("Compaction for db ~p completed.", [Db#db.name]), + {noreply, NewDb2#db{compactor_pid=nil}}; + false -> + ?LOG_INFO("Compaction file still behind main file " + "(update seq=~p. compact update seq=~p). Retrying.", + [Db#db.update_seq, NewSeq]), + Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end), + Db2 = Db#db{compactor_pid=Pid}, + couch_file:close(NewFd), + {noreply, Db2} + end. + +handle_info(Msg, Db) -> + ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]), + exit({error, Msg}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +btree_by_seq_split(DocInfo) -> + #doc_info{ + id = Id, + rev = Rev, + update_seq = Seq, + summary_pointer = Sp, + conflict_revs = Conflicts, + deleted_conflict_revs = DelConflicts, + deleted = Deleted} = DocInfo, + {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}. + +btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) -> + #doc_info{ + id = Id, + rev = Rev, + update_seq = Seq, + summary_pointer = Sp, + conflict_revs = Conflicts, + deleted_conflict_revs = DelConflicts, + deleted = Deleted}. + +btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, + deleted=Deleted, rev_tree=Tree}) -> + {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}. + +btree_by_id_join(Id, {Seq, Deleted, Tree}) -> + #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. + + + +btree_by_id_reduce(reduce, FullDocInfos) -> + % count the number of deleted documents + length([1 || #full_doc_info{deleted=false} <- FullDocInfos]); +btree_by_id_reduce(rereduce, Reds) -> + lists:sum(Reds). + +btree_by_seq_reduce(reduce, DocInfos) -> + % count the number of deleted documents + length(DocInfos); +btree_by_seq_reduce(rereduce, Reds) -> + lists:sum(Reds). + +init_db(DbName, Filepath, Fd, Header) -> + {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), + ok = couch_stream:set_min_buffer(SummaryStream, 10000), + {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, + [{split, fun(X) -> btree_by_id_split(X) end}, + {join, fun(X,Y) -> btree_by_id_join(X,Y) end}, + {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]), + {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, + [{split, fun(X) -> btree_by_seq_split(X) end}, + {join, fun(X,Y) -> btree_by_seq_join(X,Y) end}, + {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]), + {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), + + #db{ + update_pid=self(), + fd=Fd, + header=Header, + summary_stream = SummaryStream, + fulldocinfo_by_id_btree = IdBtree, + docinfo_by_seq_btree = SeqBtree, + local_docs_btree = LocalDocsBtree, + update_seq = Header#db_header.update_seq, + doc_count = Header#db_header.doc_count, + doc_del_count = Header#db_header.doc_del_count, + name = DbName, + filepath=Filepath }. + +close_db(#db{fd=Fd,summary_stream=Ss}) -> + couch_file:close(Fd), + couch_stream:close(Ss). + +% rev tree functions + +doc_to_tree(Doc) -> + doc_to_tree(Doc, lists:reverse(Doc#doc.revs)). + +doc_to_tree(Doc, [RevId]) -> + [{RevId, Doc, []}]; +doc_to_tree(Doc, [RevId | Rest]) -> + [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. + +flush_trees(_Db, [], AccFlushedTrees) -> + {ok, lists:reverse(AccFlushedTrees)}; +flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> + #full_doc_info{rev_tree=Unflushed} = InfoUnflushed, + Flushed = couch_key_tree:map( + fun(_Rev, Value) -> + case Value of + #doc{attachments=Atts,deleted=IsDeleted}=Doc -> + % this node value is actually an unwritten document summary, + % write to disk. + % make sure the Fd in the written bins is the same Fd we are. + Bins = + case Atts of + [] -> []; + [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd -> + % convert bins, removing the FD. + % All bins should have been flushed to disk already. + [{BinName, {BinType, BinSp, BinLen}} + || {BinName, {BinType, {_Fd, BinSp, BinLen}}} + <- Atts]; + _ -> + % BinFd must not equal our Fd. This can happen when a database + % is being switched out during a compaction + ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []), + throw(retry) + end, + {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}), + {IsDeleted, NewSummaryPointer}; + _ -> + Value + end + end, Unflushed), + flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). + +merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) -> + {ok, lists:reverse(AccNewInfos), AccSeq}; +merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], + [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) -> + #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo, + UpdatesRevTree = lists:foldl( + fun(NewDoc, AccTree) -> + couch_key_tree:merge(AccTree, doc_to_tree(NewDoc)) + end, + [], NewDocs), + NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree), + if NewRevTree == OldTree -> + % nothing changed + merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq); + true -> + if NoConflicts andalso OldTree /= [] -> + OldConflicts = couch_key_tree:count_leafs(OldTree), + NewConflicts = couch_key_tree:count_leafs(NewRevTree), + if NewConflicts > OldConflicts -> + throw(conflict); + true -> ok + end; + true -> ok + end, + NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, + merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, + [NewInfo|AccNewInfos],AccSeq+1) + end. + +new_index_entries([], DocCount, DelCount, AccById, AccBySeq) -> + {ok, DocCount, DelCount, AccById, AccBySeq}; +new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) -> + #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), + {DocCount2, DelCount2} = + if Deleted -> {DocCount, DelCount + 1}; + true -> {DocCount + 1, DelCount} + end, + new_index_entries(RestInfos, DocCount2, DelCount2, + [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], + [DocInfo|AccBySeq]). + +update_docs_int(Db, DocsList, Options) -> + #db{ + fulldocinfo_by_id_btree = DocInfoByIdBTree, + docinfo_by_seq_btree = DocInfoBySeqBTree, + update_seq = LastSeq, + doc_count = FullDocCount, + doc_del_count = FullDelCount + } = Db, + + % separate out the NonRep documents from the rest of the documents + {DocsList2, NonRepDocs} = lists:foldl( + fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) -> + case Id of + ?LOCAL_DOC_PREFIX ++ _ when Rest==[] -> + % when saving NR (non rep) documents, you can only save a single rev + {DocsListAcc, [Doc | NonRepDocsAcc]}; + Id-> + {[Docs | DocsListAcc], NonRepDocsAcc} + end + end, {[], []}, DocsList), + + Ids = [Id || [#doc{id=Id}|_] <- DocsList2], + + % lookup up the existing documents, if they exist. + OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), + OldDocInfos = lists:zipwith( + fun(_Id, {ok, FullDocInfo}) -> + FullDocInfo; + (Id, not_found) -> + #full_doc_info{id=Id} + end, + Ids, OldDocLookups), + + {OldCount, OldDelCount} = lists:foldl( + fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) -> + case couch_doc:to_doc_info(FullDocInfo) of + #doc_info{deleted=false} -> + {OldCountAcc + 1, OldDelCountAcc}; + _ -> + {OldCountAcc, OldDelCountAcc + 1} + end; + (not_found, Acc) -> + Acc + end, {0, 0}, OldDocLookups), + + % Merge the new docs into the revision trees. + NoConflicts = lists:member(new_edits, Options), + {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), + + RemoveSeqs = + [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], + + % All regular documents are now ready to write. + + % Try to write the local documents first, a conflict might be generated + {ok, Db2} = update_local_docs(Db, NonRepDocs), + + % Write out the documents summaries (they are stored in the nodes of the rev trees) + {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), + + {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} = + new_index_entries(FlushedDocInfos, 0, 0, [], []), + + % and the indexes to the documents + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), + + Db3 = Db2#db{ + fulldocinfo_by_id_btree = DocInfoByIdBTree2, + docinfo_by_seq_btree = DocInfoBySeqBTree2, + update_seq = NewSeq, + doc_count = FullDocCount + NewDocsCount - OldCount, + doc_del_count = FullDelCount + NewDelCount - OldDelCount}, + + case lists:member(delay_commit, Options) of + true -> + {ok, Db3}; + false -> + {ok, commit_data(Db3)} + end. + +update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> + Ids = [Id || #doc{id=Id} <- Docs], + OldDocLookups = couch_btree:lookup(Btree, Ids), + BtreeEntries = lists:zipwith( + fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> + NewRev = + case Revs of + [] -> 0; + [RevStr|_] -> list_to_integer(RevStr) + end, + OldRev = + case OldDocLookup of + {ok, {_, {OldRev0, _}}} -> OldRev0; + not_found -> 0 + end, + case OldRev + 1 == NewRev of + true -> + case Delete of + false -> {update, {Id, {NewRev, Body}}}; + true -> {remove, Id} + end; + false -> + throw(conflict) + end + + end, Docs, OldDocLookups), + + BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], + BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], + + {ok, Btree2} = + couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), + + {ok, Db#db{local_docs_btree = Btree2}}. + + + +commit_data(#db{fd=Fd, header=Header} = Db) -> + Header2 = Header#db_header{ + update_seq = Db#db.update_seq, + summary_stream_state = couch_stream:get_state(Db#db.summary_stream), + docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), + fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), + local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), + doc_count = Db#db.doc_count, + doc_del_count = Db#db.doc_del_count + }, + if Header == Header2 -> + Db; % unchanged. nothing to do + true -> + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), + Db#db{header = Header2} + end. + +copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> + {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), + % copy the bin values + NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> + {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd), + {Name, {Type, NewBinSp, Len}} + end, BinInfos), + % now write the document summary + {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}), + Sp. + +copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> + []; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> + % This is a leaf node, copy it over + NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), + [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; +copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> + % inner node, only copy info/data from leaf nodes + [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]. + +copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) -> + Ids = [Id || #doc_info{id=Id} <- InfoBySeq], + LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), + NewFullDocInfos = lists:map( + fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> + Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} + end, LookupResults), + NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos], + {ok, DocInfoBTree} = + couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []), + {ok, FullDocInfoBTree} = + couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), + NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}. + + + +copy_compact_docs(Db, NewDb) -> + EnumBySeqFun = + fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) -> + case couch_util:should_flush() of + true -> + NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])), + {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}}; + false -> + {ok, {AccNewDb, [DocInfo | AccUncopied]}} + end + end, + {ok, {NewDb2, Uncopied}} = + couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}), + + case Uncopied of + [#doc_info{update_seq=LastSeq} | _] -> + commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq}, + lists:reverse(Uncopied))); + [] -> + NewDb2 + end. + +start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> + CompactFile = Filepath ++ ".compact", + ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), + case couch_file:open(CompactFile) of + {ok, Fd} -> + ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]), + {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); + {error, enoent} -> + receive after 1000 -> ok end, + {ok, Fd} = couch_file:open(CompactFile, [create]), + Header = #db_header{}, + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header) + end, + NewDb = init_db(Name, CompactFile, Fd, Header), + NewDb2 = copy_compact_docs(Db, NewDb), + NewDb3 = + case CopyLocal of + true -> + % suck up all the local docs into memory and write them to the new db + {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), + {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), + commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}); + _ -> + NewDb2 + end, + close_db(NewDb3), + + gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
\ No newline at end of file |