From 544a38dd45f6a58d34296c6c768afd086eb2ac70 Mon Sep 17 00:00:00 2001
From: Christopher Lenz
Date: Fri, 28 Mar 2008 23:32:19 +0000
Subject: Imported trunk.

git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@642432 13f79535-47bb-0310-9956-ffa450edef68
---
 src/couchdb/couch_db.erl | 757 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 757 insertions(+)
 create mode 100644 src/couchdb/couch_db.erl

(limited to 'src/couchdb/couch_db.erl')

diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
new file mode 100644
index 00000000..e567d27b
--- /dev/null
+++ b/src/couchdb/couch_db.erl
@@ -0,0 +1,757 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db).
+-behaviour(gen_server).
+
+-export([open/2,create/2,create/3,get_doc_info/2]).
+-export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
+-export([delete_doc/3,open_doc/2,open_doc/3,close/1,enum_docs_since/4,enum_docs_since/5]).
+-export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([start_update_loop/1]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-record(db_header,
+    {write_version = 0,
+     last_update_seq = 0,
+     summary_stream_state = nil,
+     docinfo_by_Id_btree_state = nil,
+     docinfo_by_seq_btree_state = nil,
+     local_docs_btree_state = nil,
+     doc_count=0,
+     doc_del_count=0
+    }).
+
+-record(db,
+    {main_pid,
+     update_pid,
+     fd,
+     header = #db_header{},
+     summary_stream,
+     docinfo_by_Id_btree,
+     docinfo_by_seq_btree,
+     local_docs_btree,
+     last_update_seq,
+     doc_count,
+     doc_del_count,
+     name
+    }).
+
+start_link(DbName, Filepath, Options) ->
+    case couch_file:open(Filepath, Options) of
+    {ok, Fd} ->
+        Result = gen_server:start_link(couch_db, {DbName, Fd, Options}, []),
+        unlink(Fd),
+        Result;
+    {error, enoent} ->
+        % couldn't find file
+        {error, not_found};
+    Else ->
+        Else
+    end.
+
+%%% Interface functions %%%
+
+create(Filepath, Options) ->
+    create(Filepath, Filepath, Options).
+
+create(DbName, Filepath, Options) when is_list(Options) ->
+    start_link(DbName, Filepath, [create | Options]).
+
+open(DbName, Filepath) ->
+    start_link(DbName, Filepath, []).
+
+delete_doc(MainPid, Id, Revisions) ->
+    DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
+    {ok, [Result]} = update_docs(MainPid, DeletedDocs, [new_edits]),
+    {ok, Result}.
+
+open_doc(MainPid, IdOrDocInfo) ->
+    open_doc(MainPid, IdOrDocInfo, []).
+
+open_doc(MainPid, Id, Options) ->
+    case open_doc_int(get_db(MainPid), Id, Options) of
+    {ok, #doc{deleted=true}=Doc} ->
+        case lists:member(deleted, Options) of
+        true ->
+            {ok, Doc};
+        false ->
+            {not_found, deleted}
+        end;
+    Else ->
+        Else
+    end.
+
+open_doc_revs(MainPid, Id, Revs, Options) ->
+    open_doc_revs_int(get_db(MainPid), Id, Revs, Options).
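+
+% Illustrative usage sketch of the exported API above. The file path, document
+% id and body term are hypothetical, and the #doc record defaults (revs and
+% attachments as []) are assumed from couch_db.hrl, which is not part of this
+% file; only the calls themselves come from this module.
+example_create_and_read() ->
+    {ok, Db} = couch_db:create("example_db", "/tmp/example_db.couch", []),
+    Doc = #doc{id="example_doc", body={obj, [{"value", 1}]}},
+    {ok, _NewRev} = couch_db:update_doc(Db, Doc, []),
+    {ok, _Fetched} = couch_db:open_doc(Db, "example_doc", []),
+    couch_db:close(Db).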
+
+get_missing_revs(MainPid, IdRevsList) ->
+    Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
+    FullDocInfoResults = get_full_doc_infos(MainPid, Ids),
+    Results = lists:zipwith(
+        fun({Id, Revs}, FullDocInfoResult) ->
+            case FullDocInfoResult of
+            {ok, #full_doc_info{rev_tree=RevisionTree}} ->
+                {Id, couch_key_tree:find_missing(RevisionTree, Revs)};
+            not_found ->
+                {Id, Revs}
+            end
+        end,
+        IdRevsList, FullDocInfoResults),
+    {ok, Results}.
+
+get_doc_info(Db, Id) ->
+    case get_full_doc_info(Db, Id) of
+    {ok, DocInfo} ->
+        {ok, couch_doc:to_doc_info(DocInfo)};
+    Else ->
+        Else
+    end.
+
+% returns {ok, DocInfo} or not_found
+get_full_doc_info(Db, Id) ->
+    [Result] = get_full_doc_infos(Db, [Id]),
+    Result.
+
+get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
+    get_full_doc_infos(get_db(MainPid), Ids);
+get_full_doc_infos(#db{}=Db, Ids) ->
+    couch_btree:lookup(Db#db.docinfo_by_Id_btree, Ids).
+
+get_db_info(MainPid) when is_pid(MainPid) ->
+    get_db_info(get_db(MainPid));
+get_db_info(#db{doc_count=Count, doc_del_count=DelCount, last_update_seq=SeqNum}) ->
+    InfoList = [
+        {doc_count, Count},
+        {doc_del_count, DelCount},
+        {last_update_seq, SeqNum}
+        ],
+    {ok, InfoList}.
+
+update_doc(MainPid, Doc, Options) ->
+    {ok, [NewRev]} = update_docs(MainPid, [Doc], Options),
+    {ok, NewRev}.
+
+update_docs(MainPid, Docs) ->
+    update_docs(MainPid, Docs, []).
+
+% group_alike_docs groups the sorted documents into sublist buckets, by id.
+% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
+group_alike_docs(Docs) ->
+    Sorted = lists:sort(fun(#doc{id=A},#doc{id=B})-> A < B end, Docs),
+    group_alike_docs(Sorted, []).
+
+group_alike_docs([], Buckets) ->
+    lists:reverse(Buckets);
+group_alike_docs([Doc|Rest], []) ->
+    group_alike_docs(Rest, [[Doc]]);
+group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
+    [#doc{id=BucketId}|_] = Bucket,
+    case Doc#doc.id == BucketId of
+    true ->
+        % add to existing bucket
+        group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
+    false ->
+        % add to new bucket
+        group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
+    end.
+
+prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocInfo, LeafRevsDict) ->
+    case PrevRevs of
+    [PrevRev|_] ->
+        case dict:find(PrevRev, LeafRevsDict) of
+        {ok, {Deleted, Sp, DiskRevs}} ->
+            case couch_doc:has_stubs(Doc) of
+            true ->
+                DiskDoc = make_doc(Db, Id, Deleted, Sp, DiskRevs),
+                Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
+                Doc2#doc{revs=[NewRev|DiskRevs]};
+            false ->
+                Doc#doc{revs=[NewRev|DiskRevs]}
+            end;
+        error ->
+            throw(conflict)
+        end;
+    [] ->
+        % new doc, and we have existing revs.
+        OldDocInfo = couch_doc:to_doc_info(OldFullDocInfo),
+        if OldDocInfo#doc_info.deleted ->
+            % existing doc is a deletion;
+            % allow this new doc to be a later revision.
+            {_Deleted, _Sp, Revs} = dict:fetch(OldDocInfo#doc_info.rev, LeafRevsDict),
+            Doc#doc{revs=[NewRev|Revs]};
+        true ->
+            throw(conflict)
+        end
+    end.
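+
+% Illustrative sketch of rev-diffing with get_missing_revs/2 above, as a
+% replication-style caller might use it. The ids and revision strings are made
+% up; Db is a pid returned by open/2 or create/3.
+example_missing_revs(Db) ->
+    IdRevsList = [{"docA", ["111", "222"]}, {"doc-unknown", ["333"]}],
+    {ok, Missing} = couch_db:get_missing_revs(Db, IdRevsList),
+    % Each entry is {Id, RevsNotInTheDb}; ids with no entry in the by-id
+    % btree come back with all of the asked-for revs.
+    Missing.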
+
+update_docs(MainPid, Docs, Options) ->
+    Docs2 = lists:map(
+        fun(#doc{id=Id,revs=Revs}=Doc) ->
+            case Id of
+            ?LOCAL_DOC_PREFIX ++ _ ->
+                Rev = case Revs of [] -> 0; [Rev0|_] -> list_to_integer(Rev0) end,
+                Doc#doc{revs=[integer_to_list(Rev + 1)]};
+            _ ->
+                Doc#doc{revs=[integer_to_list(couch_util:rand32()) | Revs]}
+            end
+        end, Docs),
+    DocBuckets = group_alike_docs(Docs2),
+    Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
+    Db = get_db(MainPid),
+
+    % first things first, look up the docs by id and get the most recent revision info
+    ExistingDocs = get_full_doc_infos(Db, Ids),
+
+    DocBuckets2 = lists:zipwith(
+        fun(Bucket, not_found) ->
+            % no existing revs, make sure no old revision is specified.
+            [throw(conflict) || #doc{revs=[_NewRev, _OldRev | _]} <- Bucket],
+            Bucket;
+        (Bucket, {ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}) ->
+            Leafs = couch_key_tree:get_all_leafs(OldRevTree),
+            LeafRevsDict = dict:from_list([{Rev, {Deleted, Sp, Revs}} || {Rev, {Deleted, Sp}, Revs} <- Leafs]),
+            [prepare_doc_for_new_edit(Db, Doc, OldFullDocInfo, LeafRevsDict) || Doc <- Bucket]
+        end,
+        DocBuckets, ExistingDocs),
+
+    % flush unwritten binaries to disk.
+    DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
+
+    case gen_server:call(MainPid, {update_docs, DocBuckets3, Options}) of
+    ok ->
+        % return the new rev ids, in the same order as the input docs.
+        {ok, [NewRev || #doc{revs=[NewRev|_]} <- Docs2]};
+    Else ->
+        throw(Else)
+    end.
+
+save_docs(MainPid, Docs) ->
+    save_docs(MainPid, Docs, []).
+
+save_docs(MainPid, Docs, Options) ->
+    % flush unwritten binaries to disk.
+    Db = get_db(MainPid),
+    DocBuckets = group_alike_docs(Docs),
+    DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+    ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}).
+
+doc_flush_binaries(Doc, Fd) ->
+    % calc size of binaries to write out
+    Bins = Doc#doc.attachments,
+    PreAllocSize = lists:foldl(
+        fun(BinValue, SizeAcc) ->
+            case BinValue of
+            {_Key, {_Type, {Fd0, _StreamPointer, _Len}}} when Fd0 == Fd ->
+                % already written to our file, nothing to write
+                SizeAcc;
+            {_Key, {_Type, {_OtherFd, _StreamPointer, Len}}} ->
+                % written to a different file
+                SizeAcc + Len;
+            {_Key, {_Type, Bin}} when is_binary(Bin) ->
+                SizeAcc + size(Bin)
+            end
+        end,
+        0, Bins),
+
+    {ok, OutputStream} = couch_stream:open(Fd),
+    ok = couch_stream:ensure_buffer(OutputStream, PreAllocSize),
+
+    NewBins = lists:map(
+        fun({Key, {Type, BinValue}}) ->
+            NewBinValue =
+            case BinValue of
+            {Fd0, StreamPointer, Len} when Fd0 == Fd ->
+                % already written to our file, nothing to write
+                {Fd, StreamPointer, Len};
+            {OtherFd, StreamPointer, Len} ->
+                % written to a different file (or a closed file
+                % instance, which will cause an error)
+                {ok, {NewStreamPointer, Len}, _EndSp} =
+                couch_stream:foldl(OtherFd, StreamPointer, Len,
+                    fun(Bin, {BeginPointer, SizeAcc}) ->
+                        {ok, Pointer} = couch_stream:write(OutputStream, Bin),
+                        case SizeAcc of
+                        0 -> % this was the first write, record the pointer
+                            {ok, {Pointer, size(Bin)}};
+                        _ ->
+                            {ok, {BeginPointer, SizeAcc + size(Bin)}}
+                        end
+                    end,
+                    {{0,0}, 0}),
+                {Fd, NewStreamPointer, Len};
+            Bin when is_binary(Bin), size(Bin) > 0 ->
+                {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
+                {Fd, StreamPointer, size(Bin)}
+            end,
+            {Key, {Type, NewBinValue}}
+        end, Bins),
+
+    {ok, _FinalPos} = couch_stream:close(OutputStream),
+
+    Doc#doc{attachments = NewBins}.
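+
+% Illustrative sketch of a bulk update through update_docs/3 above. The call
+% throws conflict when an edit is not based on a current leaf revision, so a
+% caller that prefers a return value can catch it; the ids and bodies here are
+% hypothetical.
+example_bulk_update(Db, Body1, Body2) ->
+    Docs = [#doc{id="bulk1", body=Body1}, #doc{id="bulk2", body=Body2}],
+    case catch couch_db:update_docs(Db, Docs, []) of
+    {ok, NewRevs} ->
+        {ok, NewRevs};
+    conflict ->
+        {error, conflict}
+    end.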
+
+enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
+    Db = get_db(MainPid),
+    couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
+
+enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
+    enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).
+
+enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
+    Db = get_db(MainPid),
+    couch_btree:fold(Db#db.docinfo_by_Id_btree, StartId, Direction, InFun, InAcc).
+
+enum_docs(MainPid, StartId, InFun, Ctx) ->
+    enum_docs(MainPid, StartId, fwd, InFun, Ctx).
+
+close(MainPid) ->
+    Ref = erlang:monitor(process, MainPid),
+    unlink(MainPid),
+    exit(MainPid, normal),
+    receive
+    {'DOWN', Ref, process, MainPid, _Reason} ->
+        ok
+    end.
+
+% server functions
+
+init({DbName, Fd, Options}) ->
+    link(Fd),
+    case lists:member(create, Options) of
+    true ->
+        % create a new header and write it to the file
+        Header = #db_header{},
+        ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header),
+        ok = couch_file:sync(Fd),
+        init_main(DbName, Fd, Header);
+    false ->
+        {ok, Header} = couch_file:read_header(Fd, <<$g, $m, $k, 0>>),
+        init_main(DbName, Fd, Header)
+    end.
+
+btree_by_seq_split(DocInfo) ->
+    #doc_info{
+        id = Id,
+        rev = Rev,
+        update_seq = Seq,
+        summary_pointer = Sp,
+        conflict_revs = Conflicts,
+        deleted_conflict_revs = DelConflicts,
+        deleted = Deleted} = DocInfo,
+    {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
+
+btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
+    #doc_info{
+        id = Id,
+        rev = Rev,
+        update_seq = Seq,
+        summary_pointer = Sp,
+        conflict_revs = Conflicts,
+        deleted_conflict_revs = DelConflicts,
+        deleted = Deleted}.
+
+btree_by_name_split(#full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}) ->
+    {Id, {Seq, Tree}}.
+
+btree_by_name_join(Id, {Seq, Tree}) ->
+    #full_doc_info{id=Id, update_seq=Seq, rev_tree=Tree}.
+
+init_main(DbName, Fd, Header) ->
+    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
+    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
+    {ok, IdBtree} = couch_btree:open(Header#db_header.docinfo_by_Id_btree_state, Fd,
+        [{split, fun(V) -> btree_by_name_split(V) end},
+         {join, fun(K,V) -> btree_by_name_join(K,V) end}] ),
+    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+        [{split, fun(V) -> btree_by_seq_split(V) end},
+         {join, fun(K,V) -> btree_by_seq_join(K,V) end}] ),
+    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+
+    Db = #db{
+        main_pid=self(),
+        fd=Fd,
+        header=Header,
+        summary_stream = SummaryStream,
+        docinfo_by_Id_btree = IdBtree,
+        docinfo_by_seq_btree = SeqBtree,
+        local_docs_btree = LocalDocsBtree,
+        last_update_seq = Header#db_header.last_update_seq,
+        doc_count = Header#db_header.doc_count,
+        doc_del_count = Header#db_header.doc_del_count,
+        name = DbName
+        },
+
+    UpdatePid = spawn_link(couch_db, start_update_loop, [Db]),
+
+    {ok, Db#db{update_pid=UpdatePid}}.
+
+terminate(_Reason, Db) ->
+    Db#db.update_pid ! close,
+    couch_file:close(Db#db.fd).
+
+handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
+    Updater ! {From, update_docs, DocActions, Options},
+    {noreply, Db};
+handle_call(get_db, _From, Db) ->
+    {reply, {ok, Db}, Db};
+handle_call({db_updated, NewDb}, _From, _OldDb) ->
+    {reply, ok, NewDb}.
+
+handle_cast(foo, Main) ->
+    {noreply, Main}.
+
+%%% Internal functions %%%
+
+start_update_loop(Db) ->
+    update_loop(Db#db{update_pid=self()}).
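+
+% Illustrative round trip through the by-name split/join pair above: the by-id
+% btree stores each #full_doc_info as {Id, {UpdateSeq, RevTree}}, and join is
+% the exact inverse of split. The id, sequence and (empty) rev tree are made up.
+example_by_name_round_trip() ->
+    FullInfo = #full_doc_info{id="some_doc", update_seq=7, rev_tree=[]},
+    {Id, {Seq, Tree}} = btree_by_name_split(FullInfo),
+    FullInfo = btree_by_name_join(Id, {Seq, Tree}).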
+
+update_loop(Db) ->
+    receive
+    {OrigFrom, update_docs, DocActions, Options} ->
+        case (catch update_docs_int(Db, DocActions, Options)) of
+        {ok, Db2} ->
+            ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+            gen_server:reply(OrigFrom, ok),
+            couch_db_update_notifier:notify({updated, Db2#db.name}),
+            update_loop(Db2);
+        conflict ->
+            gen_server:reply(OrigFrom, conflict),
+            update_loop(Db);
+        Error ->
+            exit(Error) % we crashed
+        end;
+    close ->
+        % terminate loop
+        exit(normal)
+    end.
+
+get_db(MainPid) ->
+    {ok, Db} = gen_server:call(MainPid, get_db),
+    Db.
+
+open_doc_revs_int(Db, Id, Revs, Options) ->
+    case get_full_doc_info(Db, Id) of
+    {ok, #full_doc_info{rev_tree=RevTree}} ->
+        {FoundRevs, MissingRevs} =
+        case Revs of
+        all ->
+            {couch_key_tree:get_all_leafs(RevTree), []};
+        _ ->
+            case lists:member(latest, Options) of
+            true ->
+                couch_key_tree:get_key_leafs(RevTree, Revs);
+            false ->
+                couch_key_tree:get(RevTree, Revs)
+            end
+        end,
+        FoundResults =
+        lists:map(fun({Rev, Value, FoundRevPath}) ->
+            case Value of
+            0 ->
+                % we have the rev in our list but know nothing about it
+                {{not_found, missing}, Rev};
+            {IsDeleted, SummaryPtr} ->
+                {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
+            end
+        end, FoundRevs),
+        Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
+        {ok, Results};
+    not_found when Revs == all ->
+        {ok, []};
+    not_found ->
+        {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
+    end.
+
+open_doc_int(Db, ?LOCAL_DOC_PREFIX ++ _ = Id, _Options) ->
+    case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+    [{ok, {_, {Rev, BodyData}}}] ->
+        {ok, #doc{id=Id, revs=[integer_to_list(Rev)], body=BodyData}};
+    [not_found] ->
+        {not_found, missing}
+    end;
+open_doc_int(Db, #doc_info{id=Id,rev=Rev,deleted=IsDeleted,summary_pointer=Sp}=DocInfo, Options) ->
+    Doc = make_doc(Db, Id, IsDeleted, Sp, [Rev]),
+    {ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}};
+open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
+    #doc_info{deleted=IsDeleted,rev=Rev, summary_pointer=Sp} = DocInfo =
+        couch_doc:to_doc_info(FullDocInfo),
+    {[{_Rev,_Value, Revs}], []} = couch_key_tree:get(RevTree, [Rev]),
+    Doc = make_doc(Db, Id, IsDeleted, Sp, Revs),
+    {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}};
+open_doc_int(Db, Id, Options) ->
+    case get_full_doc_info(Db, Id) of
+    {ok, FullDocInfo} ->
+        open_doc_int(Db, FullDocInfo, Options);
+    not_found ->
+        throw({not_found, missing})
+    end.
+
+doc_meta_info(DocInfo, RevTree, Options) ->
+    case lists:member(revs_info, Options) of
+    false -> [];
+    true ->
+        {[RevPath],[]} =
+            couch_key_tree:get_full_key_paths(RevTree, [DocInfo#doc_info.rev]),
+        [{revs_info, [{Rev, Deleted} || {Rev, {Deleted, _Sp0}} <- RevPath]}]
+    end ++
+    case lists:member(conflicts, Options) of
+    false -> [];
+    true ->
+        case DocInfo#doc_info.conflict_revs of
+        [] -> [];
+        _ -> [{conflicts, DocInfo#doc_info.conflict_revs}]
+        end
+    end ++
+    case lists:member(deleted_conflicts, Options) of
+    false -> [];
+    true ->
+        case DocInfo#doc_info.deleted_conflict_revs of
+        [] -> [];
+        _ -> [{deleted_conflicts, DocInfo#doc_info.deleted_conflict_revs}]
+        end
+    end.
+
+% rev tree functions
+
+doc_to_tree(Doc) ->
+    doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+    [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+    [{RevId, [], doc_to_tree(Doc, Rest)}].
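+
+% Illustrative expansion of doc_to_tree/1 above: a revision path, newest rev
+% first, becomes a single-branch tree with the document at the leaf. The rev
+% ids are made up.
+example_doc_to_tree() ->
+    Doc = #doc{id="some_doc", revs=["333", "222", "111"]},
+    [{"111", [], [{"222", [], [{"333", Doc, []}]}]}] = doc_to_tree(Doc),
+    ok.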
+
+make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
+    {BodyData, BinValues} =
+    case SummaryPointer of
+    nil ->
+        {[], []};
+    _ ->
+        {ok, {BodyData0, BinValues0}} = couch_stream:read_term(Db#db.summary_stream, SummaryPointer),
+        {BodyData0, [{Name, {Type, {Db#db.fd, Sp, Len}}} || {Name, {Type, Sp, Len}} <- BinValues0]}
+    end,
+    #doc{
+        id = Id,
+        revs = RevisionPath,
+        body = BodyData,
+        attachments = BinValues,
+        deleted = Deleted
+        }.
+
+flush_trees(_Db, [], AccFlushedTrees) ->
+    {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(Db, [Unflushed | RestUnflushed], AccFlushed) ->
+    Flushed = couch_key_tree:map(
+        fun(_Rev, Value) ->
+            case Value of
+            #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+                % this node value is actually an unwritten document summary,
+                % write to disk.
+
+                % convert bins, removing the FD.
+                % All bins should have been flushed to disk already.
+                Bins = [{BinName, {BinType, BinSp, BinLen}} || {BinName, {BinType, {_Fd, BinSp, BinLen}}} <- Atts],
+                {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+                {IsDeleted, NewSummaryPointer};
+            _ ->
+                Value
+            end
+        end, Unflushed),
+    flush_trees(Db, RestUnflushed, [Flushed | AccFlushed]).
+
+merge_rev_trees(_NoConflicts, [], [], AccNewTrees) ->
+    {ok, lists:reverse(AccNewTrees)};
+merge_rev_trees(NoConflicts, [NewDocs | RestDocsList],
+        [OldTree | RestOldTrees], AccNewTrees) ->
+    UpdatesRevTree = lists:foldl(
+        fun(NewDoc, AccTree) ->
+            couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
+        end,
+        [], NewDocs),
+    NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+    if NoConflicts andalso OldTree == [] ->
+        OldConflicts = couch_key_tree:count_leafs(OldTree),
+        NewConflicts = couch_key_tree:count_leafs(NewRevTree),
+        if NewConflicts > OldConflicts ->
+            throw(conflict);
+        true -> ok
+        end;
+    true -> ok
+    end,
+    merge_rev_trees(NoConflicts, RestDocsList, RestOldTrees, [NewRevTree | AccNewTrees]).
+
+new_index_entries([], [], Seq, DocCount, DelCount, AccById, AccBySeq) ->
+    {ok, Seq, DocCount, DelCount, AccById, AccBySeq};
+new_index_entries([Id|RestIds], [RevTree|RestTrees], Seq0, DocCount, DelCount, AccById, AccBySeq) ->
+    Seq = Seq0 + 1,
+    FullDocInfo = #full_doc_info{id=Id, update_seq=Seq, rev_tree=RevTree},
+    #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
+    {DocCount2, DelCount2} =
+    if Deleted -> {DocCount, DelCount + 1};
+    true -> {DocCount + 1, DelCount}
+    end,
+    new_index_entries(RestIds, RestTrees, Seq, DocCount2, DelCount2, [FullDocInfo|AccById], [DocInfo|AccBySeq]).
+
+update_docs_int(Db, DocsList, Options) ->
+    #db{
+        docinfo_by_Id_btree = DocInfoByIdBTree,
+        docinfo_by_seq_btree = DocInfoBySeqBTree,
+        last_update_seq = LastSeq,
+        doc_count = FullDocCount,
+        doc_del_count = FullDelCount
+        } = Db,
+
+    % separate out the NonRep documents from the rest of the documents
+    {DocsList2, NonRepDocs} = lists:foldl(
+        fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+            case Id of
+            ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
+                % when saving NR (non rep) documents, you can only save a single rev
+                {DocsListAcc, [Doc | NonRepDocsAcc]};
+            Id->
+                {[Docs | DocsListAcc], NonRepDocsAcc}
+            end
+        end, {[], []}, DocsList),
+
+    Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
+
+    % look up the existing documents, if they exist.
+    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+    OldDocTrees = lists:map(
+        fun({ok, #full_doc_info{rev_tree=OldRevTree}}) ->
+            OldRevTree;
+        (not_found) ->
+            []
+        end,
+        OldDocLookups),
+
+    {OldCount, OldDelCount} = lists:foldl(
+        fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
+            case couch_doc:to_doc_info(FullDocInfo) of
+            #doc_info{deleted=false} ->
+                {OldCountAcc + 1, OldDelCountAcc};
+            _ ->
+                {OldCountAcc , OldDelCountAcc + 1}
+            end;
+        (not_found, Acc) ->
+            Acc
+        end, {0, 0}, OldDocLookups),
+
+    % Merge the new docs into the revision trees.
+    NoConflicts = lists:member(no_conflicts, Options),
+    {ok, NewRevTrees} = merge_rev_trees(NoConflicts, DocsList2, OldDocTrees, []),
+
+    RemoveSeqs = [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+
+    % All regular documents are now ready to write.
+
+    % Try to write the local documents first; a conflict might be generated.
+    {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+    % Write out the document summaries (they are stored in the nodes of the rev trees)
+    {ok, FlushedRevTrees} = flush_trees(Db2, NewRevTrees, []),
+
+    {ok, NewSeq, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
+        new_index_entries(Ids, FlushedRevTrees, LastSeq, 0, 0, [], []),
+
+    % and write the indexes pointing to the documents
+    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
+    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
+
+    Db3 = Db2#db{
+        docinfo_by_Id_btree = DocInfoByIdBTree2,
+        docinfo_by_seq_btree = DocInfoBySeqBTree2,
+        last_update_seq = NewSeq,
+        doc_count = FullDocCount + NewDocsCount - OldCount,
+        doc_del_count = FullDelCount + NewDelCount - OldDelCount
+        },
+
+    case lists:member(delay_commit, Options) of
+    true ->
+        {ok, Db3};
+    false ->
+        commit_outstanding(Db3)
+    end.
+
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+    Ids = [Id || #doc{id=Id} <- Docs],
+    OldDocLookups = couch_btree:lookup(Btree, Ids),
+    BtreeEntries = lists:zipwith(
+        fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
+            BasedOnRev =
+            case Revs of
+            [] -> 0;
+            [RevStr|_] -> list_to_integer(RevStr) - 1
+            end,
+            OldRev =
+            case OldDocLookup of
+            {ok, {_, {OldRev0, _}}} -> OldRev0;
+            not_found -> 0
+            end,
+            case OldRev == BasedOnRev of
+            true ->
+                case Delete of
+                false -> {update, {Id, {OldRev+1, Body}}};
+                true -> {remove, Id}
+                end;
+            false ->
+                throw(conflict)
+            end
+
+        end, Docs, OldDocLookups),
+
+    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+    BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
+
+    {ok, Btree2} =
+        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+    {ok, Db#db{local_docs_btree = Btree2}}.
+
+commit_outstanding(#db{fd=Fd, header=Header} = Db) ->
+    ok = couch_file:sync(Fd), % commit outstanding data
+    Header2 = Header#db_header{
+        last_update_seq = Db#db.last_update_seq,
+        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
+        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+        docinfo_by_Id_btree_state = couch_btree:get_state(Db#db.docinfo_by_Id_btree),
+        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+        doc_count = Db#db.doc_count,
+        doc_del_count = Db#db.doc_del_count
+        },
+    ok = couch_file:write_header(Fd, <<$g, $m, $k, 0>>, Header2),
+    ok = couch_file:sync(Fd), % commit header to disk
+    Db2 = Db#db{
+        header = Header2
+        },
+    {ok, Db2}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
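+
+% Illustrative sketch of the local (non-replicated) document path handled by
+% update_local_docs/2 above: integer revisions kept in local_docs_btree, no
+% rev tree. The id suffix and body term are hypothetical, and ?LOCAL_DOC_PREFIX
+% is defined in couch_db.hrl, which is not part of this file.
+example_local_doc(Db) ->
+    Doc = #doc{id=?LOCAL_DOC_PREFIX ++ "checkpoint", body={obj, [{"seq", 42}]}},
+    {ok, Rev1} = couch_db:update_doc(Db, Doc, []),
+    % a later edit must be based on the current rev, or conflict is thrown
+    {ok, _Rev2} = couch_db:update_doc(Db, Doc#doc{revs=[Rev1]}, []),
+    ok.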
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
-- 
cgit v1.2.3