summaryrefslogtreecommitdiff
path: root/src/couchdb/couch_db_updater.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couchdb/couch_db_updater.erl')
-rw-r--r--src/couchdb/couch_db_updater.erl499
1 files changed, 499 insertions, 0 deletions
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
new file mode 100644
index 00000000..f0673af9
--- /dev/null
+++ b/src/couchdb/couch_db_updater.erl
@@ -0,0 +1,499 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater).
+-behaviour(gen_server).
+
+-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(HEADER_SIG, <<$g, $m, $k, 0>>).
+
+init({MainPid, DbName, Filepath, Fd, Options}) ->
+ link(Fd),
+ case lists:member(create, Options) of
+ true ->
+ % create a new header and writes it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ % delete any old compaction files that might be hanging around
+ file:delete(Filepath ++ ".compact"),
+ file:delete(Filepath ++ ".old");
+ false ->
+ {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ end,
+
+ Db = init_db(DbName, Filepath, Fd, Header),
+ {ok, Db#db{main_pid=MainPid}}.
+
+terminate(_Reason, Db) ->
+ close_db(Db).
+
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db};
+handle_call({update_docs, DocActions, Options}, _From, Db) ->
+ try update_docs_int(Db, DocActions, Options) of
+ {ok, Db2} ->
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db2#db.name}),
+ {reply, ok, Db2}
+ catch
+ throw: retry ->
+ {reply, retry, Db};
+ throw: conflict ->
+ {reply, conflict, Db}
+ end;
+handle_call(increment_update_seq, _From, Db) ->
+ Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db#db.name}),
+ {reply, {ok, Db2#db.update_seq}, Db2}.
+
+
+handle_cast(start_compact, Db) ->
+ case Db#db.compactor_pid of
+ nil ->
+ ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end),
+ Db2 = Db#db{compactor_pid=Pid},
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ {noreply, Db2};
+ _ ->
+ % compact currently running, this is a no-op
+ {noreply, Db}
+ end;
+handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
+ {ok, NewFd} = couch_file:open(CompactFilepath),
+ {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ #db{update_seq=NewSeq} = NewDb =
+ init_db(Db#db.name, CompactFilepath, NewFd, NewHeader),
+ case Db#db.update_seq == NewSeq of
+ true ->
+ NewDb2 = commit_data(
+ NewDb#db{
+ main_pid = Db#db.main_pid,
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count,
+ filepath = Filepath}),
+
+ ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+ file:delete(Filepath),
+ ok = file:rename(CompactFilepath, Filepath),
+
+ couch_stream:close(Db#db.summary_stream),
+ couch_file:close_maybe(Db#db.fd),
+ file:delete(Filepath ++ ".old"),
+
+ ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+ ?LOG_INFO("Compaction for db ~p completed.", [Db#db.name]),
+ {noreply, NewDb2#db{compactor_pid=nil}};
+ false ->
+ ?LOG_INFO("Compaction file still behind main file "
+ "(update seq=~p. compact update seq=~p). Retrying.",
+ [Db#db.update_seq, NewSeq]),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end),
+ Db2 = Db#db{compactor_pid=Pid},
+ couch_file:close(NewFd),
+ {noreply, Db2}
+ end.
+
+handle_info(Msg, Db) ->
+ ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
+ exit({error, Msg}).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+btree_by_seq_split(DocInfo) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted} = DocInfo,
+ {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
+
+btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted}.
+
+btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
+ deleted=Deleted, rev_tree=Tree}) ->
+ {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
+
+btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
+ #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
+
+
+
+btree_by_id_reduce(reduce, FullDocInfos) ->
+ % count the number of deleted documents
+ length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
+btree_by_id_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+btree_by_seq_reduce(reduce, DocInfos) ->
+ % count the number of deleted documents
+ length(DocInfos);
+btree_by_seq_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+init_db(DbName, Filepath, Fd, Header) ->
+ {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
+ ok = couch_stream:set_min_buffer(SummaryStream, 10000),
+ {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
+ [{split, fun(X) -> btree_by_id_split(X) end},
+ {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
+ {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+ [{split, fun(X) -> btree_by_seq_split(X) end},
+ {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
+ {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+
+ #db{
+ update_pid=self(),
+ fd=Fd,
+ header=Header,
+ summary_stream = SummaryStream,
+ fulldocinfo_by_id_btree = IdBtree,
+ docinfo_by_seq_btree = SeqBtree,
+ local_docs_btree = LocalDocsBtree,
+ update_seq = Header#db_header.update_seq,
+ doc_count = Header#db_header.doc_count,
+ doc_del_count = Header#db_header.doc_del_count,
+ name = DbName,
+ filepath=Filepath }.
+
+close_db(#db{fd=Fd,summary_stream=Ss}) ->
+ couch_file:close(Fd),
+ couch_stream:close(Ss).
+
+% rev tree functions
+
+doc_to_tree(Doc) ->
+ doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
+
+flush_trees(_Db, [], AccFlushedTrees) ->
+ {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
+ #full_doc_info{rev_tree=Unflushed} = InfoUnflushed,
+ Flushed = couch_key_tree:map(
+ fun(_Rev, Value) ->
+ case Value of
+ #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+ % this node value is actually an unwritten document summary,
+ % write to disk.
+ % make sure the Fd in the written bins is the same Fd we are.
+ Bins =
+ case Atts of
+ [] -> [];
+ [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+ % convert bins, removing the FD.
+ % All bins should have been flushed to disk already.
+ [{BinName, {BinType, BinSp, BinLen}}
+ || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ <- Atts];
+ _ ->
+ % BinFd must not equal our Fd. This can happen when a database
+ % is being switched out during a compaction
+ ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
+ throw(retry)
+ end,
+ {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {IsDeleted, NewSummaryPointer};
+ _ ->
+ Value
+ end
+ end, Unflushed),
+ flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
+
+merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccSeq};
+merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
+ #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
+ UpdatesRevTree = lists:foldl(
+ fun(NewDoc, AccTree) ->
+ couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
+ end,
+ [], NewDocs),
+ NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+ if NewRevTree == OldTree ->
+ % nothing changed
+ merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
+ true ->
+ if NoConflicts andalso OldTree /= [] ->
+ OldConflicts = couch_key_tree:count_leafs(OldTree),
+ NewConflicts = couch_key_tree:count_leafs(NewRevTree),
+ if NewConflicts > OldConflicts ->
+ throw(conflict);
+ true -> ok
+ end;
+ true -> ok
+ end,
+ NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
+ merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo,
+ [NewInfo|AccNewInfos],AccSeq+1)
+ end.
+
+new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
+ {ok, DocCount, DelCount, AccById, AccBySeq};
+new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
+ #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {DocCount2, DelCount2} =
+ if Deleted -> {DocCount, DelCount + 1};
+ true -> {DocCount + 1, DelCount}
+ end,
+ new_index_entries(RestInfos, DocCount2, DelCount2,
+ [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
+ [DocInfo|AccBySeq]).
+
+update_docs_int(Db, DocsList, Options) ->
+ #db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ update_seq = LastSeq,
+ doc_count = FullDocCount,
+ doc_del_count = FullDelCount
+ } = Db,
+
+ % separate out the NonRep documents from the rest of the documents
+ {DocsList2, NonRepDocs} = lists:foldl(
+ fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+ case Id of
+ ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
+ % when saving NR (non rep) documents, you can only save a single rev
+ {DocsListAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Docs | DocsListAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, DocsList),
+
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
+
+ % lookup up the existing documents, if they exist.
+ OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+ OldDocInfos = lists:zipwith(
+ fun(_Id, {ok, FullDocInfo}) ->
+ FullDocInfo;
+ (Id, not_found) ->
+ #full_doc_info{id=Id}
+ end,
+ Ids, OldDocLookups),
+
+ {OldCount, OldDelCount} = lists:foldl(
+ fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{deleted=false} ->
+ {OldCountAcc + 1, OldDelCountAcc};
+ _ ->
+ {OldCountAcc, OldDelCountAcc + 1}
+ end;
+ (not_found, Acc) ->
+ Acc
+ end, {0, 0}, OldDocLookups),
+
+ % Merge the new docs into the revision trees.
+ NoConflicts = lists:member(new_edits, Options),
+ {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
+
+ RemoveSeqs =
+ [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+
+ % All regular documents are now ready to write.
+
+ % Try to write the local documents first, a conflict might be generated
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+ % Write out the documents summaries (they are stored in the nodes of the rev trees)
+ {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
+
+ {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
+ new_index_entries(FlushedDocInfos, 0, 0, [], []),
+
+ % and the indexes to the documents
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
+
+ Db3 = Db2#db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ update_seq = NewSeq,
+ doc_count = FullDocCount + NewDocsCount - OldCount,
+ doc_del_count = FullDelCount + NewDelCount - OldDelCount},
+
+ case lists:member(delay_commit, Options) of
+ true ->
+ {ok, Db3};
+ false ->
+ {ok, commit_data(Db3)}
+ end.
+
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+ Ids = [Id || #doc{id=Id} <- Docs],
+ OldDocLookups = couch_btree:lookup(Btree, Ids),
+ BtreeEntries = lists:zipwith(
+ fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
+ NewRev =
+ case Revs of
+ [] -> 0;
+ [RevStr|_] -> list_to_integer(RevStr)
+ end,
+ OldRev =
+ case OldDocLookup of
+ {ok, {_, {OldRev0, _}}} -> OldRev0;
+ not_found -> 0
+ end,
+ case OldRev + 1 == NewRev of
+ true ->
+ case Delete of
+ false -> {update, {Id, {NewRev, Body}}};
+ true -> {remove, Id}
+ end;
+ false ->
+ throw(conflict)
+ end
+
+ end, Docs, OldDocLookups),
+
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
+
+ {ok, Btree2} =
+ couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+ {ok, Db#db{local_docs_btree = Btree2}}.
+
+
+
+commit_data(#db{fd=Fd, header=Header} = Db) ->
+ Header2 = Header#db_header{
+ update_seq = Db#db.update_seq,
+ summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
+ docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+ fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count
+ },
+ if Header == Header2 ->
+ Db; % unchanged. nothing to do
+ true ->
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
+ Db#db{header = Header2}
+ end.
+
+copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
+ {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+ % copy the bin values
+ NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
+ {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}}
+ end, BinInfos),
+ % now write the document summary
+ {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+ Sp.
+
+copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+ [];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
+ % This is a leaf node, copy it over
+ NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
+ [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ % inner node, only copy info/data from leaf nodes
+ [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
+ Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+ LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+ NewFullDocInfos = lists:map(
+ fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+ Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ end, LookupResults),
+ NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
+ {ok, DocInfoBTree} =
+ couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
+ {ok, FullDocInfoBTree} =
+ couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+ NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
+
+
+
+copy_compact_docs(Db, NewDb) ->
+ EnumBySeqFun =
+ fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
+ case couch_util:should_flush() of
+ true ->
+ NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
+ {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
+ false ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied]}}
+ end
+ end,
+ {ok, {NewDb2, Uncopied}} =
+ couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
+
+ case Uncopied of
+ [#doc_info{update_seq=LastSeq} | _] ->
+ commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
+ lists:reverse(Uncopied)));
+ [] ->
+ NewDb2
+ end.
+
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+ CompactFile = Filepath ++ ".compact",
+ ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
+ case couch_file:open(CompactFile) of
+ {ok, Fd} ->
+ ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
+ {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {error, enoent} ->
+ receive after 1000 -> ok end,
+ {ok, Fd} = couch_file:open(CompactFile, [create]),
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
+ end,
+ NewDb = init_db(Name, CompactFile, Fd, Header),
+ NewDb2 = copy_compact_docs(Db, NewDb),
+ NewDb3 =
+ case CopyLocal of
+ true ->
+ % suck up all the local docs into memory and write them to the new db
+ {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+ commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
+ _ ->
+ NewDb2
+ end,
+ close_db(NewDb3),
+
+ gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}). \ No newline at end of file