% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License.  You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_db).
-behaviour(gen_server).

-export([open/2,create/2,create/3,get_doc_info/2]).
-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
-export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
-export([start_update_loop/1]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).

-include("couch_db.hrl").

% On-disk header; holds the persisted state of the summary stream and the
% three btrees, together with the sequence and document counters.
-record(db_header,
    {write_version = 0,
     last_update_seq = 0,
     summary_stream_state = nil,
     docinfo_by_Id_btree_state = nil,
     docinfo_by_seq_btree_state = nil,
     local_docs_btree_state = nil,
     doc_count=0,
     doc_del_count=0
    }).

% In-memory database handle, used as the gen_server state and handed out
% to readers through get_db/1.
-record(db,
    {main_pid,
    update_pid,
    fd,
    header = #db_header{},
    summary_stream,
    docinfo_by_Id_btree,
    docinfo_by_seq_btree,
    local_docs_btree,
    last_update_seq,
    doc_count,
    doc_del_count,
    name
    }).

% Opens Filepath through couch_file and starts the database gen_server on
% the resulting file descriptor. Options is passed unchanged both to
% couch_file:open/2 and to init/1.
% Returns whatever gen_server:start_link/3 returns, {error, not_found} when
% the file does not exist, or the couch_file error as-is.
start_link(DbName, Filepath, Options) ->
    start_db_server(couch_file:open(Filepath, Options), DbName, Options).

% Dispatches on the result of couch_file:open/2.
start_db_server({ok, Fd}, DbName, Options) ->
    StartResult = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
    % init/1 links the server process to Fd; drop any link the calling
    % process holds (presumably set up by couch_file:open/2 - TODO confirm).
    unlink(Fd),
    StartResult;
start_db_server({error, enoent}, _DbName, _Options) ->
    % couldn't find file
    {error, not_found};
start_db_server(OpenError, _DbName, _Options) ->
    OpenError.

%%% Interface functions %%%

% Creates a database whose name is its file path.
create(Filepath, Options) ->
    create(Filepath, Filepath, Options).

% Creates a database; the 'create' flag is prepended to Options so that
% init/1 writes a fresh header.
create(DbName, Filepath, Options) when is_list(Options) ->
    start_link(DbName, Filepath, [create | Options]).

% Opens an existing database file with no extra options.
open(DbName, Filepath) ->
    start_link(DbName, Filepath, []).
% Saves a deletion stub for each revision in Revisions. The result of
% update_docs/3 is matched against a one-element list, so the call only
% succeeds when exactly one revision is given.
delete_doc(MainPid, Id, Revisions) ->
    Tombstones = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
    {ok, [NewRev]} = update_docs(MainPid, Tombstones, [new_edits]),
    {ok, NewRev}.

% Same as open_doc/3 with no options.
open_doc(MainPid, IdOrDocInfo) ->
    open_doc(MainPid, IdOrDocInfo, []).

% Opens a document by id (or by doc info record). A deleted document is
% reported as {not_found, deleted} unless Options contains 'deleted'.
% Any other result of open_doc_int/3 is returned untouched.
open_doc(MainPid, Id, Options) ->
    hide_deleted(open_doc_int(get_db(MainPid), Id, Options), Options).

% Filters deleted documents out of an open_doc_int/3 result unless the
% caller explicitly asked for them.
hide_deleted({ok, #doc{deleted=true}} = Found, Options) ->
    case lists:member(deleted, Options) of
    true ->
        Found;
    false ->
        {not_found, deleted}
    end;
hide_deleted(Other, _Options) ->
    Other.

% Opens specific revisions (or 'all') of a document; see open_doc_revs_int/4.
open_doc_revs(MainPid, Id, Revs, Options) ->
    open_doc_revs_int(get_db(MainPid), Id, Revs, Options).

% For every {Id, Revs} pair, returns {Id, RevsNotInTheDatabase}. When the
% document does not exist at all, every requested rev is reported missing.
get_missing_revs(MainPid, IdRevsList) ->
    WantedIds = [WantedId || {WantedId, _Revs} <- IdRevsList],
    Lookups = get_full_doc_infos(MainPid, WantedIds),
    MissingFor =
        fun({DocId, DocRevs}, Lookup) ->
            case Lookup of
            {ok, #full_doc_info{rev_tree=RevTree}} ->
                {DocId, couch_key_tree:find_missing(RevTree, DocRevs)};
            not_found ->
                {DocId, DocRevs}
            end
        end,
    {ok, lists:zipwith(MissingFor, IdRevsList, Lookups)}.

% Returns {ok, DocInfo} for the document, or the lookup result unchanged
% when it is not {ok, _}.
get_doc_info(Db, Id) ->
    case get_full_doc_info(Db, Id) of
    {ok, FullDocInfo} ->
        {ok, couch_doc:to_doc_info(FullDocInfo)};
    NotFound ->
        NotFound
    end.

%   returns {ok, DocInfo} or not_found
get_full_doc_info(Db, Id) ->
    [Lookup] = get_full_doc_infos(Db, [Id]),
    Lookup.

% Looks up several ids in the by-id btree. Accepts either the server pid
% or an already fetched #db{} handle.
get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
    get_full_doc_infos(get_db(MainPid), Ids);
get_full_doc_infos(#db{docinfo_by_Id_btree=IdBtree}, Ids) ->
    couch_btree:lookup(IdBtree, Ids).

% Returns {ok, PropList} with the document count, deleted document count
% and last update sequence of the database.
get_db_info(MainPid) when is_pid(MainPid) ->
    get_db_info(get_db(MainPid));
get_db_info(#db{}=Db) ->
    {ok, [
        {doc_count, Db#db.doc_count},
        {doc_del_count, Db#db.doc_del_count},
        {last_update_seq, Db#db.last_update_seq}
        ]}.

% Saves a single document and returns {ok, NewRev}.
update_doc(MainPid, Doc, Options) ->
    {ok, [AssignedRev]} = update_docs(MainPid, [Doc], Options),
    {ok, AssignedRev}.

% Same as update_docs/3 with no options.
update_docs(MainPid, Docs) ->
    update_docs(MainPid, Docs, []).

% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
    ById = fun(#doc{id=IdA}, #doc{id=IdB}) -> IdA < IdB end,
    group_alike_docs(lists:sort(ById, Docs), []).

% Accumulating worker: the newest bucket is kept at the head of the
% accumulator, and a document joins it when its id equals the bucket's id.
group_alike_docs([], Grouped) ->
    lists:reverse(Grouped);
group_alike_docs([#doc{id=Id}=Doc | Pending],
        [[#doc{id=CurrentId} | _]=Current | Done]) when Id == CurrentId ->
    % add to existing bucket
    group_alike_docs(Pending, [[Doc | Current] | Done]);
group_alike_docs([Doc | Pending], Grouped) ->
    % add to new bucket
    group_alike_docs(Pending, [[Doc] | Grouped]).

% Rewrites the revision list of a document about to be saved so that it
% extends the on-disk revision path of the leaf it is based on.
%   LeafRevsDict maps each leaf rev to {Deleted, SummaryPointer, RevPath}.
% Throws 'conflict' when the previous rev is not a current leaf, or when no
% previous rev is given and the existing document is not deleted.
prepare_doc_for_new_edit(Db, #doc{id=Id, revs=[NewRev, PrevRev | _]}=Doc,
        _OldFullDocInfo, LeafRevsDict) ->
    case dict:find(PrevRev, LeafRevsDict) of
    {ok, {Deleted, Sp, DiskRevs}} ->
        Doc2 =
            case couch_doc:has_stubs(Doc) of
            true ->
                % resolve stubs against the revision stored on disk
                DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
                couch_doc:merge_stubs(Doc, DiskDoc);
            false ->
                Doc
            end,
        Doc2#doc{revs=[NewRev | DiskRevs]};
    error ->
        throw(conflict)
    end;
prepare_doc_for_new_edit(_Db, #doc{revs=[NewRev]}=Doc, OldFullDocInfo,
        LeafRevsDict) ->
    % new doc, and we have existing revs.
    case couch_doc:to_doc_info(OldFullDocInfo) of
    #doc_info{deleted=true, rev=OldRev} ->
        % existing doc is a deleton
        % allow this new doc to be a later revision.
        {_Deleted, _Sp, OldRevs} = dict:fetch(OldRev, LeafRevsDict),
        Doc#doc{revs=[NewRev | OldRevs]};
    _ ->
        throw(conflict)
    end.
% Saves a list of documents as new edits.
% Each document gets a new revision id, is checked against the revisions
% already stored, has its attachments flushed to the database file and is
% then handed to the server for writing.
% Returns {ok, NewRevs} in the order of Docs; throws 'conflict' (or whatever
% non-ok value the server replies with).
update_docs(MainPid, Docs, Options) ->
    StampedDocs = lists:map(fun stamp_new_rev/1, Docs),
    Buckets = group_alike_docs(StampedDocs),
    BucketIds = [BucketId || [#doc{id=BucketId}|_] <- Buckets],
    Db = get_db(MainPid),

    % first things first, lookup the doc by id and get the most recent
    ExistingDocs = get_full_doc_infos(Db, BucketIds),

    PrepareBucket =
        fun(Bucket, not_found) ->
            % no existing revs, make sure no old revision is specified.
            lists:foreach(
                fun(#doc{revs=[_NewRev, _OldRev | _]}) -> throw(conflict);
                   (_) -> ok
                end,
                Bucket),
            Bucket;
        (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
            Leafs = couch_key_tree:get_all_leafs(OldRevTree),
            LeafRevsDict = dict:from_list(
                [{LeafRev, {Deleted, Sp, RevPath}} ||
                    {LeafRev, {Deleted, Sp}, RevPath} <- Leafs]),
            [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict)
                || Doc <- Bucket]
        end,
    PreparedBuckets = lists:zipwith(PrepareBucket, Buckets, ExistingDocs),

    % flush unwritten binaries to disk.
    FlushedBuckets = flush_bucket_binaries(PreparedBuckets, Db#db.fd),

    case gen_server:call(MainPid, {update_docs, FlushedBuckets, Options}) of
    ok ->
        % return back the new rev ids, in the same order input.
        {ok, [NewRev || #doc{revs=[NewRev|_]} <- StampedDocs]};
    Failure ->
        throw(Failure)
    end.

% Assigns the revision id for a new edit. Local documents use an integer
% counter (previous rev + 1, starting from 1) and keep no history; all
% other documents get a random rev prepended to their existing rev list.
stamp_new_rev(#doc{id=?LOCAL_DOC_PREFIX ++ _, revs=Revs}=Doc) ->
    PrevRev =
        case Revs of
        [] -> 0;
        [PrevRevStr|_] -> list_to_integer(PrevRevStr)
        end,
    Doc#doc{revs=[integer_to_list(PrevRev + 1)]};
stamp_new_rev(#doc{revs=Revs}=Doc) ->
    Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}.

% Writes the attachments of every document in every bucket to Fd.
flush_bucket_binaries(Buckets, Fd) ->
    [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- Buckets].

% Same as save_docs/3 with no options.
save_docs(MainPid, Docs) ->
    save_docs(MainPid, Docs, []).

% Saves documents exactly as given: no new revision ids are assigned and
% no check against existing revisions is made here. Any non-ok reply from
% the server fails the match.
save_docs(MainPid, Docs, Options) ->
    % flush unwritten binaries to disk.
    Db = get_db(MainPid),
    FlushedBuckets = flush_bucket_binaries(group_alike_docs(Docs), Db#db.fd),
    ok = gen_server:call(MainPid, {update_docs, FlushedBuckets, Options}).
% Makes sure every attachment of Doc is stored in the file Fd.
% Attachments are {Key, {Type, Value}} where Value is either a binary still
% in memory or {FileFd, StreamPointer, Len} for data already in a file.
% Returns Doc with every attachment value rewritten to {Fd, Pointer, Len}.
% NOTE(review): a zero-length binary matches no clause of
% flush_bin_value/3 and raises case_clause - confirm this is intended.
doc_flush_binaries(Doc, Fd) ->
    Bins = Doc#doc.attachments,
    % calc size of binaries to write out
    PreAllocSize = lists:foldl(
        fun(BinEntry, Total) -> Total + unflushed_size(BinEntry, Fd) end,
        0, Bins),

    {ok, OutputStream} = couch_stream:open(Fd),
    ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize),

    NewBins = [{Key, {Type, flush_bin_value(BinValue, Fd, OutputStream)}}
        || {Key, {Type, BinValue}} <- Bins],

    {ok, _FinalPos} = couch_stream:close(OutputStream),

    Doc#doc{attachments = NewBins}.

% Number of bytes that still have to be written to Fd for one attachment.
unflushed_size(BinEntry, Fd) ->
    case BinEntry of
    {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd ->
        % already written to our file, nothing to write
        0;
    {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} ->
        % written to a different file
        Len;
    {_Key, {_Type, Bin}} when is_binary(Bin) ->
        size(Bin)
    end.

% Writes one attachment value through OutputStream when needed and returns
% its {Fd, StreamPointer, Len} location.
flush_bin_value(BinValue, Fd, OutputStream) ->
    case BinValue of
    {Fd0, StreamPointer, Len} when Fd0 == Fd ->
        % already written to our file, nothing to write
        {Fd, StreamPointer, Len};
    {OtherFd, StreamPointer, Len} ->
        % written to a different file (or a closed file
        % instance, which will cause an error)
        CopyChunk =
            fun(Chunk, {FirstPointer, Copied}) ->
                {ok, Pointer} = couch_stream:write(OutputStream, Chunk),
                Begin =
                    case Copied of
                    0 ->
                        % this was the first write, record the pointer
                        Pointer;
                    _ ->
                        FirstPointer
                    end,
                {ok, {Begin, Copied + size(Chunk)}}
            end,
        % Len is already bound: the match also asserts that exactly Len
        % bytes were copied.
        {ok, {NewStreamPointer, Len}, _EndSp} =
            couch_stream:foldl(OtherFd, StreamPointer, Len, CopyChunk, {{0,0}, 0}),
        {Fd, NewStreamPointer, Len};
    Bin when is_binary(Bin), size(Bin) > 0 ->
        {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
        {Fd, StreamPointer, size(Bin)}
    end.

% Folds InFun over the by-sequence index, starting at the first entry after
% SinceSeq, in the given Direction.
enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
    #db{docinfo_by_seq_btree=SeqBtree} = get_db(MainPid),
    couch_btree:fold(SeqBtree, SinceSeq + 1, Direction, InFun, Ctx).

% Forward variant of enum_docs_since/5.
enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
    enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).

% Folds InFun over the by-id index, starting at StartId, in the given
% Direction.
enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
    #db{docinfo_by_Id_btree=IdBtree} = get_db(MainPid),
    couch_btree:fold(IdBtree, StartId, Direction, InFun, InAcc).
% Forward variant of enum_docs/5.
enum_docs(MainPid, StartId, InFun, Ctx) ->
    enum_docs(MainPid, StartId, fwd, InFun, Ctx).

% Asks the database server to exit and waits until it is down.
% NOTE(review): an exit signal with reason 'normal' is ignored by a process
% that does not trap exits, and nothing in this module sets trap_exit for
% the server process - confirm that the receive below cannot block forever.
close(MainPid) ->
    MonitorRef = erlang:monitor(process, MainPid),
    unlink(MainPid),
    exit(MainPid, normal),
    receive
    {'DOWN', MonitorRef, process, MainPid, _Reason} ->
        ok
    end.


% server functions

% gen_server callback. Links to the file process, then either writes a
% fresh header (when Options contains 'create') or reads the stored one,
% and builds the server state from it.
init({DbName, Fd, Options}) ->
    link(Fd),
    HeaderSig = <<$g, $m, $k, 0>>,
    Header =
        case lists:member(create, Options) of
        true ->
            % create a new header and writes it to the file
            NewHeader = #db_header{},
            ok = couch_file:write_header(Fd, HeaderSig, NewHeader),
            ok = couch_file:sync(Fd),
            NewHeader;
        false ->
            {ok, StoredHeader} = couch_file:read_header(Fd, HeaderSig),
            StoredHeader
        end,
    init_main(DbName, Fd, Header).

% Splits a #doc_info{} into the {Key, Value} pair stored in the
% by-sequence btree; the update sequence is the key.
btree_by_seq_split(#doc_info{
        id = Id,
        rev = Rev,
        update_seq = Seq,
        summary_pointer = Sp,
        conflict_revs = Conflicts,
        deleted_conflict_revs = DelConflicts,
        deleted = Deleted}) ->
    {Seq, {Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.

% Inverse of btree_by_seq_split/1.
btree_by_seq_join(Seq, {Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
    #doc_info{
        id = Id,
        rev = Rev,
        update_seq = Seq,
        summary_pointer = Sp,
        conflict_revs = Conflicts,
        deleted_conflict_revs = DelConflicts,
        deleted = Deleted}.

% Splits a #full_doc_info{} into the {Key, Value} pair stored in the by-id
% btree; the document id is the key.
btree_by_name_split(#full_doc_info{}=FullDocInfo) ->
    {FullDocInfo#full_doc_info.id,
        {FullDocInfo#full_doc_info.update_seq, FullDocInfo#full_doc_info.rev_tree}}.

% Inverse of btree_by_name_split/1.
btree_by_name_join(Id, {Seq, Tree}) ->
    #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
% Builds the server state from a header: opens the summary stream and the
% three btrees on Fd, then spawns the linked update loop process.
% Returns {ok, #db{}} as expected from gen_server init.
init_main(DbName, Fd, Header) ->
    #db_header{
        summary_stream_state = StreamState,
        docinfo_by_Id_btree_state = IdBtreeState,
        docinfo_by_seq_btree_state = SeqBtreeState,
        local_docs_btree_state = LocalBtreeState
        } = Header,
    {ok, SummaryStream} = couch_stream:open(StreamState, Fd),
    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd,
        [{split, fun btree_by_name_split/1},
        {join, fun btree_by_name_join/2}]),
    {ok, SeqBtree} = couch_btree:open(SeqBtreeState, Fd,
        [{split, fun btree_by_seq_split/1},
        {join, fun btree_by_seq_join/2}]),
    {ok, LocalDocsBtree} = couch_btree:open(LocalBtreeState, Fd),

    Db = #db{
        main_pid = self(),
        fd = Fd,
        header = Header,
        summary_stream = SummaryStream,
        docinfo_by_Id_btree = IdBtree,
        docinfo_by_seq_btree = SeqBtree,
        local_docs_btree = LocalDocsBtree,
        last_update_seq = Header#db_header.last_update_seq,
        doc_count = Header#db_header.doc_count,
        doc_del_count = Header#db_header.doc_del_count,
        name = DbName
        },

    % all writes are serialized through this linked process
    UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),

    {ok, Db#db{update_pid=UpdatePid}}.

% gen_server callback: tells the update loop to stop and closes the file.
terminate(_Reason, #db{update_pid=UpdatePid, fd=Fd}) ->
    UpdatePid ! close,
    couch_file:close(Fd).

% gen_server callback.
%   {update_docs, ...} is forwarded to the update loop, which replies to
%                      the caller itself (hence noreply here).
%   get_db             returns the current #db{} handle.
%   {db_updated, Db}   sent by the update loop; replaces the server state.
handle_call({update_docs, DocActions, Options}, From, Db) ->
    Db#db.update_pid ! {From, update_docs, DocActions, Options},
    {noreply, Db};
handle_call(get_db, _From, Db) ->
    {reply, {ok, Db}, Db};
handle_call({db_updated, UpdatedDb}, _From, _StaleDb) ->
    {reply, ok, UpdatedDb}.

% gen_server callback; only the placeholder message 'foo' is accepted.
handle_cast(foo, State) ->
    {noreply, State}.

%%% Internal function %%%

% Entry point of the update process spawned by init_main/3; records its
% own pid in the handle before entering the loop.
start_update_loop(Db) ->
    UpdaterDb = Db#db{update_pid=self()},
    update_loop(UpdaterDb).
% Receive loop of the update process. Handles one update request at a
% time and exits normally on 'close'.
update_loop(Db) ->
    receive
    {OrigFrom, update_docs, DocActions, Options} ->
        Outcome = (catch update_docs_int(Db, DocActions, Options)),
        update_loop(finish_update(Outcome, OrigFrom, Db));
    close ->
        % terminate loop
        exit(normal)
    end.

% Acts on the outcome of one update attempt and returns the handle to
% continue the loop with.
finish_update({ok, Db2}, OrigFrom, _Db) ->
    % publish the new handle to the server before answering the client
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    gen_server:reply(OrigFrom, ok),
    couch_db_update_notifier:notify({updated, Db2#db.name}),
    Db2;
finish_update(conflict, OrigFrom, Db) ->
    gen_server:reply(OrigFrom, conflict),
    Db;
finish_update(Error, _OrigFrom, _Db) ->
    exit(Error) % we crashed
    .

% Fetches the current #db{} handle from the server.
get_db(MainPid) ->
    {ok, Db} = gen_server:call(MainPid, get_db),
    Db.

% Opens the requested revisions of a document.
%   Revs is a list of revision ids or the atom 'all' (every leaf).
%   With the 'latest' option the leafs descending from Revs are returned.
% Returns {ok, Results}; each result is {ok, Doc} or
% {{not_found, missing}, Rev}.
open_doc_revs_int(Db, Id, Revs, Options) ->
    case get_full_doc_info(Db, Id) of
    {ok, #full_doc_info{rev_tree=RevTree}} ->
        {FoundRevs, MissingRevs} = find_revs(RevTree, Revs, Options),
        Found = [found_rev_result(Db, Id, FoundRev) || FoundRev <- FoundRevs],
        Missing = [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
        {ok, Found ++ Missing};
    not_found when Revs == all ->
        {ok, []};
    not_found ->
        {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
    end.

% Selects revisions from the tree; returns {FoundRevs, MissingRevs}.
find_revs(RevTree, all, _Options) ->
    {couch_key_tree:get_all_leafs(RevTree), []};
find_revs(RevTree, Revs, Options) ->
    case lists:member(latest, Options) of
    true ->
        couch_key_tree:get_key_leafs(RevTree, Revs);
    false ->
        couch_key_tree:get(RevTree, Revs)
    end.

% Turns one revision found in the tree into a result entry.
found_rev_result(Db, Id, {Rev, Value, FoundRevPath}) ->
    case Value of
    0 ->
        % we have the rev in our list but know nothing about it
        {{not_found, missing}, Rev};
    {IsDeleted, SummaryPtr} ->
        {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
    end.
% Opens a document given a local document id, a #doc_info{}, a
% #full_doc_info{} or a plain document id.
% Returns {ok, Doc}. A missing local document yields the value
% {not_found, missing}; a missing regular document THROWS
% {not_found, missing}.
open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) ->
    case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
    [{ok, {_, {Rev, BodyData}}}] ->
        {ok, #doc{id=Id, revs=[integer_to_list(Rev)], body=BodyData}};
    [not_found] ->
        {not_found, missing}
    end;
open_doc_int(Db, #doc_info{}=DocInfo, Options) ->
    #doc_info{id=Id, rev=Rev, deleted=IsDeleted, summary_pointer=Sp} = DocInfo,
    Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]),
    % no rev tree is available here, so an empty one is passed on
    Meta = doc_meta_info(DocInfo, [], Options),
    {ok, Doc#doc{meta=Meta}};
open_doc_int(Db, #full_doc_info{id=Id, rev_tree=RevTree}=FullDocInfo, Options) ->
    DocInfo = couch_doc:to_doc_info(FullDocInfo),
    #doc_info{deleted=IsDeleted, rev=Rev, summary_pointer=Sp} = DocInfo,
    {[{_Rev, _Value, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
    Doc = make_doc(Db, Id, IsDeleted, Sp, RevPath),
    Meta = doc_meta_info(DocInfo, RevTree, Options),
    {ok, Doc#doc{meta=Meta}};
open_doc_int(Db, Id, Options) ->
    case get_full_doc_info(Db, Id) of
    {ok, FullDocInfo} ->
        open_doc_int(Db, FullDocInfo, Options);
    not_found ->
        throw({not_found, missing})
    end.

% Builds the meta list of a document according to the 'revs_info',
% 'conflicts' and 'deleted_conflicts' options. Conflict entries are only
% added when the corresponding list is not empty.
doc_meta_info(DocInfo, RevTree, Options) ->
    RevsInfo =
        case lists:member(revs_info, Options) of
        false ->
            [];
        true ->
            {[RevPath], []} =
                couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]),
            [{revs_info, [{Rev, Deleted} || {Rev, {Deleted, _Sp0}} <- RevPath]}]
        end,
    Conflicts =
        optional_meta(conflicts, DocInfo#doc_info.conflict_revs, Options),
    DeletedConflicts =
        optional_meta(deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs, Options),
    RevsInfo ++ Conflicts ++ DeletedConflicts.

% Returns [{Key, Revs}] when Key was requested in Options and Revs is not
% empty, [] otherwise.
optional_meta(Key, Revs, Options) ->
    case lists:member(Key, Options) of
    false ->
        [];
    true ->
        case Revs of
        [] -> [];
        _ -> [{Key, Revs}]
        end
    end.

% rev tree functions

% Converts a document into a single-path revision tree: the oldest rev is
% the root, and the document itself is stored at the newest rev (the leaf).
doc_to_tree(Doc) ->
    OldestFirst = lists:reverse(Doc#doc.revs),
    doc_to_tree(Doc, OldestFirst).

doc_to_tree(Doc, [LeafRev]) ->
    [{LeafRev, Doc, []}];
doc_to_tree(Doc, [AncestorRev | NewerRevs]) ->
    [{AncestorRev, [], doc_to_tree(Doc, NewerRevs)}].
% Builds a #doc{} for one revision. The body and attachment list are read
% from the summary stream; a 'nil' summary pointer gives an empty body and
% no attachments. Attachment locations are tagged with the database file
% descriptor.
make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
    {BodyData, BinValues} = read_summary(Db, SummaryPointer),
    #doc{
        id = Id,
        revs = RevisionPath,
        body = BodyData,
        attachments = BinValues,
        deleted = Deleted
        }.

% Reads {Body, Attachments} for a summary pointer.
read_summary(_Db, nil) ->
    {[], []};
read_summary(Db, SummaryPointer) ->
    {ok, {BodyData, StoredBins}} =
        couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
    Bins = [{Name, {Type, {Db#db.fd, Sp, Len}}}
        || {Name, {Type, Sp, Len}} <- StoredBins],
    {BodyData, Bins}.

% Writes every not-yet-written document summary found in the given revision
% trees to the summary stream and replaces it in the tree by
% {IsDeleted, SummaryPointer}. Returns {ok, FlushedTrees} in input order.
flush_trees(_Db, [], AccFlushedTrees) ->
    {ok, lists:reverse(AccFlushedTrees)};
flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
    FlushNode =
        fun(_Rev, #doc{attachments=Atts, deleted=IsDeleted}=Doc) ->
            % this node value is actually an unwritten document summary,
            % write to disk.
            % convert bins, removing the FD.
            % All bins should have been flushed to disk already.
            Bins = [{BinName, {BinType, BinSp, BinLen}}
                || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
            {ok, NewSummaryPointer} =
                couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
            {IsDeleted, NewSummaryPointer};
        (_Rev, Value) ->
            Value
        end,
    Flushed = couch_key_tree:map(FlushNode, Unflushed),
    flush_trees(Db, RestUnflushed, [Flushed | AccFlushed]).

% Merges each bucket of new documents into the matching existing revision
% tree ([] when the document does not exist yet).
%   NoConflicts - when true, an update of an EXISTING document that would
%                 increase its number of leaf revisions throws 'conflict'.
% Returns {ok, NewTrees} in input order.
merge_rev_trees(_NoConflicts, [], [], AccNewTrees) ->
    {ok, lists:reverse(AccNewTrees)};
merge_rev_trees(NoConflicts, [NewDocs | RestDocsList],
        [OldTree | RestOldTrees], AccNewTrees) ->
    UpdatesRevTree = lists:foldl(
        fun(NewDoc, AccTree) ->
            couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
        end,
        [], NewDocs),
    NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
    ok = check_new_conflicts(NoConflicts, OldTree, NewRevTree),
    merge_rev_trees(NoConflicts, RestDocsList, RestOldTrees,
        [NewRevTree | AccNewTrees]).

% Enforces the no_conflicts option for one document.
% The check applies only when an old tree exists. The previous code tested
% OldTree == [], which compared against a leaf count of 0: every new
% document was rejected and existing documents were never checked.
check_new_conflicts(true, OldTree, NewRevTree) when OldTree /= [] ->
    OldLeafCount = couch_key_tree:count_leafs(OldTree),
    NewLeafCount = couch_key_tree:count_leafs(NewRevTree),
    if NewLeafCount > OldLeafCount ->
        throw(conflict);
    true ->
        ok
    end;
check_new_conflicts(_NoConflicts, _OldTree, _NewRevTree) ->
    ok.
% Builds the new by-id and by-seq index entries for the updated documents.
% Every document gets the next sequence number after Seq0, and is counted
% as deleted or live according to its resulting doc info.
% Returns {ok, LastSeq, DocCount, DelCount, ByIdEntries, BySeqEntries};
% both entry lists come out in reverse input order.
new_index_entries([], [], Seq, DocCount, DelCount, AccById, AccBySeq) ->
    {ok, Seq, DocCount, DelCount, AccById, AccBySeq};
new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0,
        DocCount, DelCount, AccById, AccBySeq) ->
    Seq = Seq0 + 1,
    FullDocInfo = #full_doc_info{id=Id, update_seq=Seq, rev_tree=RevTree},
    DocInfo = couch_doc:to_doc_info(FullDocInfo),
    #doc_info{deleted=Deleted} = DocInfo,
    {DocCount2, DelCount2} =
        case Deleted of
        true -> {DocCount, DelCount + 1};
        _ -> {DocCount + 1, DelCount}
        end,
    new_index_entries(RestIds, RestTrees, Seq, DocCount2, DelCount2,
        [FullDocInfo|AccById], [DocInfo|AccBySeq]).

% Performs one write transaction inside the update process.
%   DocsList is a list of buckets (documents grouped by id).
% Returns {ok, NewDb}; throws 'conflict' from the merge or from the local
% document update. With the 'delay_commit' option the header is not
% written.
update_docs_int(Db, DocsList, Options) ->
    #db{
        docinfo_by_Id_btree = IdBtree,
        docinfo_by_seq_btree = SeqBtree,
        last_update_seq = LastSeq,
        doc_count = FullDocCount,
        doc_del_count = FullDelCount
        } = Db,

    % separate out the NonRep documents from the rest of the documents
    {Buckets, NonRepDocs} =
        lists:foldl(fun split_non_rep_bucket/2, {[], []}, DocsList),

    Ids = [Id || [#doc{id=Id}|_] <- Buckets],

    % lookup up the existing documents, if they exist.
    OldDocLookups = couch_btree:lookup(IdBtree, Ids),
    OldDocTrees = lists:map(
        fun({ok, #full_doc_info{rev_tree=OldRevTree}}) ->
            OldRevTree;
        (not_found) ->
            []
        end,
        OldDocLookups),

    % how many of the touched documents were live / deleted before
    {OldCount, OldDelCount} = lists:foldl(
        fun({ok, FullDocInfo}, {LiveAcc, DeadAcc}) ->
            case couch_doc:to_doc_info(FullDocInfo) of
            #doc_info{deleted=false} ->
                {LiveAcc + 1, DeadAcc};
            _ ->
                {LiveAcc, DeadAcc + 1}
            end;
        (not_found, Acc) ->
            Acc
        end,
        {0, 0}, OldDocLookups),

    % Merge the new docs into the revision trees.
    NoConflicts = lists:member(no_conflicts, Options),
    {ok, NewRevTrees} = merge_rev_trees(NoConflicts, Buckets, OldDocTrees, []),

    % the old by-seq entries of the touched documents are replaced
    RemoveSeqs = [OldSeq
        || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],

    % All regular documents are now ready to write.

    % Try to write the local documents first, a conflict might be generated
    {ok, Db2} = update_local_docs(Db, NonRepDocs),

    % Write out the documents summaries (they are stored in the nodes of the rev trees)
    {ok, FlushedRevTrees} = flush_trees(Db2, NewRevTrees, []),

    {ok, NewSeq, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
        new_index_entries(Ids, FlushedRevTrees, LastSeq, 0, 0, [], []),

    % and the indexes to the documents
    {ok, SeqBtree2} = couch_btree:add_remove(SeqBtree, InfoBySeq, RemoveSeqs),
    {ok, IdBtree2} = couch_btree:add_remove(IdBtree, InfoById, []),

    Db3 = Db2#db{
        docinfo_by_Id_btree = IdBtree2,
        docinfo_by_seq_btree = SeqBtree2,
        last_update_seq = NewSeq,
        doc_count = FullDocCount + NewDocsCount - OldCount,
        doc_del_count = FullDelCount + NewDelCount - OldDelCount
        },

    case lists:member(delay_commit, Options) of
    true ->
        {ok, Db3};
    false ->
        commit_outstanding(Db3)
    end.

% Fold step sorting one bucket into {RegularBuckets, NonRepDocs}.
split_non_rep_bucket([#doc{id=?LOCAL_DOC_PREFIX ++ _}=Doc],
        {BucketsAcc, NonRepDocsAcc}) ->
    % when saving NR (non rep) documents, you can only save a single rev
    {BucketsAcc, [Doc | NonRepDocsAcc]};
split_non_rep_bucket([#doc{} | _]=Bucket, {BucketsAcc, NonRepDocsAcc}) ->
    {[Bucket | BucketsAcc], NonRepDocsAcc}.
% Applies updates and deletions of local (non replicated) documents to the
% local docs btree. Each document must be based on the revision currently
% stored (0 when there is none); otherwise 'conflict' is thrown.
% Returns {ok, Db} with the new btree.
update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
    Ids = [Id || #doc{id=Id} <- Docs],
    OldDocLookups = couch_btree:lookup(Btree, Ids),
    Actions = lists:zipwith(fun local_doc_action/2, Docs, OldDocLookups),

    Removals = [Id || {remove, Id} <- Actions],
    Updates = [Entry || {update, Entry} <- Actions],

    {ok, Btree2} = couch_btree:add_remove(Btree, Updates, Removals),

    {ok, Db#db{local_docs_btree = Btree2}}.

% Decides what to do with one local document given its btree lookup:
% {update, {Id, {NewRev, Body}}} or {remove, Id}.
local_doc_action(#doc{id=Id, deleted=Delete, revs=Revs, body=Body}, OldDocLookup) ->
    % the doc carries its NEW rev, so the rev it was based on is one less
    BasedOnRev =
        case Revs of
        [] -> 0;
        [RevStr|_] -> list_to_integer(RevStr) - 1
        end,
    OldRev =
        case OldDocLookup of
        {ok, {_, {StoredRev, _}}} -> StoredRev;
        not_found -> 0
        end,
    case OldRev == BasedOnRev of
    true ->
        case Delete of
        false -> {update, {Id, {OldRev + 1, Body}}};
        true -> {remove, Id}
        end;
    false ->
        throw(conflict)
    end.

% Makes all outstanding writes durable: syncs the file, writes a header
% describing the current stream/btree states and counters, then syncs
% again. Returns {ok, Db} carrying the new header.
commit_outstanding(#db{fd=Fd, header=Header} = Db) ->
    ok = couch_file:sync(Fd), % commit outstanding data
    StreamState = couch_stream:get_state(Db#db.summary_stream),
    SeqBtreeState = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
    IdBtreeState = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
    LocalBtreeState = couch_btree:get_state(Db#db.local_docs_btree),
    Header2 = Header#db_header{
        last_update_seq = Db#db.last_update_seq,
        summary_stream_state = StreamState,
        docinfo_by_seq_btree_state = SeqBtreeState,
        docinfo_by_Id_btree_state = IdBtreeState,
        local_docs_btree_state = LocalBtreeState,
        doc_count = Db#db.doc_count,
        doc_del_count = Db#db.doc_del_count
        },
    ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
    ok = couch_file:sync(Fd), % commit header to disk
    {ok, Db#db{header = Header2}}.

% gen_server callback; no state conversion is needed.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

% gen_server callback; unexpected messages are dropped.
handle_info(_Info, State) ->
    {noreply, State}.