diff options
author | Damien F. Katz <damien@apache.org> | 2008-08-05 01:43:40 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2008-08-05 01:43:40 +0000 |
commit | 88ec14c220592c8c0db7869c9961423e9ee97e7c (patch) | |
tree | 67974f234e4a0201302506e3b7c56a73cf909376 /src/couchdb/couch_db.erl | |
parent | b218a0e7d425f7b3660433a17c6558f676524730 (diff) |
Added concurrent open db limit and a LRU cache for closing old databases when limit reached (configurable via MaxDbsOpen var in couch.ini). Refactored db update code in couch_db.erl into couch_db_updater.erl.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@682560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src/couchdb/couch_db.erl')
-rw-r--r-- | src/couchdb/couch_db.erl | 681 |
1 files changed, 74 insertions, 607 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 2f5df3b5..098f3de7 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -13,63 +13,24 @@ -module(couch_db). -behaviour(gen_server). --export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]). --export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). --export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]). --export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). +-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]). +-export([open_ref_counted/2,num_refs/1,monitor/1]). +-export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]). +-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). +-export([get_missing_revs/2]). +-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1]). --export([start_update_loop/2]). +-export([start_link/3]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([start_copy_compact_int/2]). --export([btree_by_id_split/1, - btree_by_id_join/2, - btree_by_id_reduce/2, - btree_by_seq_split/1, - btree_by_seq_join/2, - btree_by_seq_reduce/2]). -include("couch_db.hrl"). --record(db_header, - {write_version = 0, - update_seq = 0, - summary_stream_state = nil, - fulldocinfo_by_id_btree_state = nil, - docinfo_by_seq_btree_state = nil, - local_docs_btree_state = nil, - doc_count=0, - doc_del_count=0 - }). - --record(db, - {main_pid=nil, - update_pid=nil, - compactor_pid=nil, - fd, - header = #db_header{}, - summary_stream, - fulldocinfo_by_id_btree, - docinfo_by_seq_btree, - local_docs_btree, - update_seq, - doc_count, - doc_del_count, - name, - filepath - }). - -% small value used in revision trees to indicate the revision isn't stored --define(REV_MISSING, []). - --define(HEADER_SIG, <<$g, $m, $k, 0>>). - start_link(DbName, Filepath, Options) -> catch start_link0(DbName, Filepath, Options). start_link0(DbName, Filepath, Options) -> - % first delete the old file previous compaction Fd = case couch_file:open(Filepath, Options) of {ok, Fd0} -> @@ -105,33 +66,38 @@ start_link0(DbName, Filepath, Options) -> end, StartResult. -%%% Interface functions %%% -create(Filepath, Options) -> - create(Filepath, Filepath, Options). +create(DbName, Options) -> + couch_server:create(DbName, Options). -create(DbName, Filepath, Options) when is_list(Options) -> - start_link(DbName, Filepath, [create | Options]). +open(DbName, Options) -> + couch_server:open(DbName, Options). -open(DbName, Filepath) -> - start_link(DbName, Filepath, []). +close(#db{fd=Fd}) -> + couch_file:drop_ref(Fd). +open_ref_counted(MainPid, OpeningPid) -> + gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}). -% Compaction still needs work. Right now readers and writers can get an error -% file compaction changeover. This doesn't need to be the case. -start_compact(MainPid) -> - gen_server:cast(MainPid, start_compact). +num_refs(MainPid) -> + gen_server:call(MainPid, num_refs). -delete_doc(MainPid, Id, Revisions) -> +monitor(#db{main_pid=MainPid}) -> + erlang:monitor(process, MainPid). + +start_compact(#db{update_pid=Pid}) -> + gen_server:cast(Pid, start_compact). + +delete_doc(Db, Id, Revisions) -> DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], - {ok, [Result]} = update_docs(MainPid, DeletedDocs, []), + {ok, [Result]} = update_docs(Db, DeletedDocs, []), {ok, Result}. -open_doc(MainPid, IdOrDocInfo) -> - open_doc(MainPid, IdOrDocInfo, []). +open_doc(Db, IdOrDocInfo) -> + open_doc(Db, IdOrDocInfo, []). -open_doc(MainPid, Id, Options) -> - case open_doc_int(get_db(MainPid), Id, Options) of +open_doc(Db, Id, Options) -> + case open_doc_int(Db, Id, Options) of {ok, #doc{deleted=true}=Doc} -> case lists:member(deleted, Options) of true -> @@ -143,13 +109,13 @@ open_doc(MainPid, Id, Options) -> Else end. -open_doc_revs(MainPid, Id, Revs, Options) -> - [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options), +open_doc_revs(Db, Id, Revs, Options) -> + [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options), Result. -get_missing_revs(MainPid, IdRevsList) -> +get_missing_revs(Db, IdRevsList) -> Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - FullDocInfoResults = get_full_doc_infos(MainPid, Ids), + FullDocInfoResults = get_full_doc_infos(Db, Ids), Results = lists:zipwith( fun({Id, Revs}, FullDocInfoResult) -> case FullDocInfoResult of @@ -177,18 +143,12 @@ get_full_doc_info(Db, Id) -> [Result] = get_full_doc_infos(Db, [Id]), Result. - -get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) -> - get_full_doc_infos(get_db(MainPid), Ids); -get_full_doc_infos(#db{}=Db, Ids) -> +get_full_doc_infos(Db, Ids) -> couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). -increment_update_seq(MainPid) -> - gen_server:call(MainPid, increment_update_seq). +increment_update_seq(#db{update_pid=UpdatePid}) -> + gen_server:call(UpdatePid, increment_update_seq). - -get_db_info(MainPid) when is_pid(MainPid) -> - get_db_info(get_db(MainPid)); get_db_info(Db) -> #db{fd=Fd, compactor_pid=Compactor, @@ -205,12 +165,12 @@ get_db_info(Db) -> ], {ok, InfoList}. -update_doc(MainPid, Doc, Options) -> - {ok, [NewRev]} = update_docs(MainPid, [Doc], Options), +update_doc(Db, Doc, Options) -> + {ok, [NewRev]} = update_docs(Db, [Doc], Options), {ok, NewRev}. -update_docs(MainPid, Docs) -> - update_docs(MainPid, Docs, []). +update_docs(Db, Docs) -> + update_docs(Db, Docs, []). % group_alike_docs groups the sorted documents into sublist buckets, by id. % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] @@ -263,7 +223,7 @@ prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocI end end. -update_docs(MainPid, Docs, Options) -> +update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> % go ahead and generate the new revision ids for the documents. Docs2 = lists:map( fun(#doc{id=Id,revs=Revs}=Doc) -> @@ -278,7 +238,6 @@ update_docs(MainPid, Docs, Options) -> NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2], DocBuckets = group_alike_docs(Docs2), Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], - Db = get_db(MainPid), % lookup the doc by id and get the most recent @@ -298,13 +257,14 @@ update_docs(MainPid, Docs, Options) -> % flush unwritten binaries to disk. DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2], - case gen_server:call(MainPid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of + case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of ok -> {ok, NewRevs}; retry -> - Db2 = get_db(MainPid), + Db2 = open_ref_counted(Db#db.main_pid, self()), DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3], % We only retry once - case gen_server:call(MainPid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of + ok = close(Db2), + case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of ok -> {ok, NewRevs}; Else -> throw(Else) end; @@ -312,15 +272,11 @@ update_docs(MainPid, Docs, Options) -> throw(Else) end. -save_docs(MainPid, Docs) -> - save_docs(MainPid, Docs, []). - -save_docs(MainPid, Docs, Options) -> +save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) -> % flush unwritten binaries to disk. - Db = get_db(MainPid), DocBuckets = group_alike_docs(Docs), - DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], - ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}, infinity). + DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets], + ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity). doc_flush_binaries(Doc, Fd) -> @@ -379,125 +335,51 @@ doc_flush_binaries(Doc, Fd) -> Doc#doc{attachments = NewBins}. enum_docs_since_reduce_to_count(Reds) -> - couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds). + couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds). enum_docs_reduce_to_count(Reds) -> - couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds). + couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds). -enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) -> - Db = get_db(MainPid), +enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). -enum_docs_since(MainPid, SinceSeq, InFun, Acc) -> - enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc). +enum_docs_since(Db, SinceSeq, InFun, Acc) -> + enum_docs_since(Db, SinceSeq, fwd, InFun, Acc). -enum_docs(MainPid, StartId, Direction, InFun, InAcc) -> - Db = get_db(MainPid), +enum_docs(Db, StartId, Direction, InFun, InAcc) -> couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc). -enum_docs(MainPid, StartId, InFun, Ctx) -> - enum_docs(MainPid, StartId, fwd, InFun, Ctx). +enum_docs(Db, StartId, InFun, Ctx) -> + enum_docs(Db, StartId, fwd, InFun, Ctx). % server functions -init(InitArgs) -> - spawn_link(couch_db, start_update_loop, [self(), InitArgs]), - receive - {initialized, Db} -> - {ok, Db} - end. - -btree_by_seq_split(DocInfo) -> - #doc_info{ - id = Id, - rev = Rev, - update_seq = Seq, - summary_pointer = Sp, - conflict_revs = Conflicts, - deleted_conflict_revs = DelConflicts, - deleted = Deleted} = DocInfo, - {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}. - -btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) -> - #doc_info{ - id = Id, - rev = Rev, - update_seq = Seq, - summary_pointer = Sp, - conflict_revs = Conflicts, - deleted_conflict_revs = DelConflicts, - deleted = Deleted}. - -btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, - deleted=Deleted, rev_tree=Tree}) -> - {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}. - -btree_by_id_join(Id, {Seq, Deleted, Tree}) -> - #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. - - - -btree_by_id_reduce(reduce, FullDocInfos) -> - % count the number of deleted documents - length([1 || #full_doc_info{deleted=false} <- FullDocInfos]); -btree_by_id_reduce(rereduce, Reds) -> - lists:sum(Reds). - -btree_by_seq_reduce(reduce, DocInfos) -> - % count the number of deleted documents - length(DocInfos); -btree_by_seq_reduce(rereduce, Reds) -> - lists:sum(Reds). - -init_db(DbName, Filepath, Fd, Header) -> - {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), - ok = couch_stream:set_min_buffer(SummaryStream, 10000), - {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, - [{split, fun btree_by_id_split/1}, - {join, fun btree_by_id_join/2}, - {reduce, fun btree_by_id_reduce/2}]), - {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, - [{split, fun btree_by_seq_split/1}, - {join, fun btree_by_seq_join/2}, - {reduce, fun btree_by_seq_reduce/2}]), - {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), - - #db{ - update_pid=self(), - fd=Fd, - header=Header, - summary_stream = SummaryStream, - fulldocinfo_by_id_btree = IdBtree, - docinfo_by_seq_btree = SeqBtree, - local_docs_btree = LocalDocsBtree, - update_seq = Header#db_header.update_seq, - doc_count = Header#db_header.doc_count, - doc_del_count = Header#db_header.doc_del_count, - name = DbName, - filepath=Filepath }. +init({DbName, Filepath, Fd, Options}) -> + {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), + ok = couch_file:add_ref(Fd), + gen_server:call(UpdaterPid, get_db). -close_db(#db{fd=Fd,summary_stream=Ss}) -> - couch_file:close(Fd), - couch_stream:close(Ss). - terminate(_Reason, Db) -> exit(Db#db.update_pid, kill). -handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) -> - Updater ! {From, update_docs, DocActions, Options}, - {noreply, Db}; -handle_call(increment_update_seq, From, #db{update_pid=Updater}=Db) -> - Updater ! {From, increment_update_seq}, - {noreply, Db}; -handle_call(get_db, _From, Db) -> +handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) -> + ok = couch_file:add_ref(Fd, OpenerPid), {reply, {ok, Db}, Db}; -handle_call({db_updated, NewDb}, _From, _OldDb) -> +handle_call(num_refs, _From, #db{fd=Fd}=Db) -> + {reply, couch_file:num_refs(Fd) - 1, Db}; +handle_call({db_updated, #db{fd=NewFd}=NewDb}, _From, #db{fd=OldFd}) -> + case NewFd == OldFd of + true -> ok; + false -> + couch_file:add_ref(NewFd), + couch_file:drop_ref(OldFd) + end, {reply, ok, NewDb}. -handle_cast(start_compact, #db{update_pid=Updater}=Db) -> - Updater ! compact, - {noreply, Db}. +handle_cast(Msg, Db) -> + ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]), + exit({error, Msg}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -508,114 +390,6 @@ handle_info(Msg, Db) -> %%% Internal function %%% - -start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) -> - link(Fd), - - case lists:member(create, Options) of - true -> - % create a new header and writes it to the file - Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header), - % delete any old compaction files that might be hanging around - file:delete(Filepath ++ ".compact"), - file:delete(Filepath ++ ".old"); - false -> - {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG) - end, - - Db = init_db(DbName, Filepath, Fd, Header), - Db2 = Db#db{main_pid=MainPid}, - MainPid ! {initialized, Db2}, - update_loop(Db2). - -update_loop(#db{fd=Fd,name=Name, - filepath=Filepath, - main_pid=MainPid, - update_seq=UpdateSeq}=Db) -> - receive - {OrigFrom, update_docs, DocActions, Options} -> - case (catch update_docs_int(Db, DocActions, Options)) of - {ok, Db2} -> - ok = gen_server:call(MainPid, {db_updated, Db2}), - gen_server:reply(OrigFrom, ok), - couch_db_update_notifier:notify({updated, Name}), - update_loop(Db2); - retry -> - gen_server:reply(OrigFrom, retry), - update_loop(Db); - conflict -> - gen_server:reply(OrigFrom, conflict), - update_loop(Db); - Error -> - exit(Error) % we crashed - end; - compact -> - case Db#db.compactor_pid of - nil -> - ?LOG_INFO("Starting compaction for db \"~s\"", [Name]), - Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]), - Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(MainPid, {db_updated, Db2}), - update_loop(Db2); - _ -> - update_loop(Db) % already started - end; - {compact_done, CompactFilepath} -> - {ok, NewFd} = couch_file:open(CompactFilepath), - {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG), - #db{update_seq=NewSeq}= NewDb = - init_db(Name, CompactFilepath, NewFd, NewHeader), - case Db#db.update_seq == NewSeq of - true -> - NewDb2 = commit_data( - NewDb#db{ - main_pid = Db#db.main_pid, - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count, - filepath = Filepath}), - - ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), - ok = file:rename(Filepath, Filepath ++ ".old"), - ok = file:rename(CompactFilepath, Filepath), - - couch_stream:close(Db#db.summary_stream), - % close file handle async. - % wait 5 secs before closing, allowing readers to finish - unlink(Fd), - spawn_link(fun() -> - receive after 5000 -> ok end, - couch_file:close(Fd), - file:delete(Filepath ++ ".old") - end), - - ok = gen_server:call(MainPid, {db_updated, NewDb2}), - ?LOG_INFO("Compaction for db ~p completed.", [Name]), - update_loop(NewDb2#db{compactor_pid=nil}); - false -> - ?LOG_INFO("Compaction file still behind main file " - "(update seq=~p. compact update seq=~p). Retrying.", - [Db#db.update_seq, NewSeq]), - Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]), - Db2 = Db#db{compactor_pid=Pid}, - couch_file:close(NewFd), - update_loop(Db2) - end; - {OrigFrom, increment_update_seq} -> - Db2 = commit_data(Db#db{update_seq=UpdateSeq+1}), - ok = gen_server:call(MainPid, {db_updated, Db2}), - gen_server:reply(OrigFrom, {ok, UpdateSeq+1}), - couch_db_update_notifier:notify({updated, Name}), - update_loop(Db2); - Else -> - ?LOG_ERROR("Unknown message received in db ~s:~p", [Db#db.name, Else]), - exit({error, Else}) - end. - -get_db(MainPid) -> - {ok, Db} = gen_server:call(MainPid, get_db), - Db. - open_doc_revs_int(Db, IdRevs, Options) -> Ids = [Id || {Id, _Revs} <- IdRevs], LookupResults = get_full_doc_infos(Db, Ids), @@ -711,16 +485,6 @@ doc_meta_info(DocInfo, RevTree, Options) -> end end. -% rev tree functions - -doc_to_tree(Doc) -> - doc_to_tree(Doc, lists:reverse(Doc#doc.revs)). - -doc_to_tree(Doc, [RevId]) -> - [{RevId, Doc, []}]; -doc_to_tree(Doc, [RevId | Rest]) -> - [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. - make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> {BodyData, BinValues} = case SummaryPointer of @@ -737,303 +501,6 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> attachments = BinValues, deleted = Deleted }. - -flush_trees(_Db, [], AccFlushedTrees) -> - {ok, lists:reverse(AccFlushedTrees)}; -flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> - #full_doc_info{rev_tree=Unflushed} = InfoUnflushed, - Flushed = couch_key_tree:map( - fun(_Rev, Value) -> - case Value of - #doc{attachments=Atts,deleted=IsDeleted}=Doc -> - % this node value is actually an unwritten document summary, - % write to disk. - % make sure the Fd in the written bins is the same Fd we are. - Bins = - case Atts of - [] -> []; - [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd -> - % convert bins, removing the FD. - % All bins should have been flushed to disk already. - [{BinName, {BinType, BinSp, BinLen}} - || {BinName, {BinType, {_Fd, BinSp, BinLen}}} - <- Atts]; - _ -> - % BinFd must not equal our Fd. This can happen when a database - % is being updated during a compaction - ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []), - throw(retry) - end, - {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}), - {IsDeleted, NewSummaryPointer}; - _ -> - Value - end - end, Unflushed), - flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). - -merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) -> - {ok, lists:reverse(AccNewInfos), AccSeq}; -merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], - [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) -> - #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo, - UpdatesRevTree = lists:foldl( - fun(NewDoc, AccTree) -> - couch_key_tree:merge(AccTree, doc_to_tree(NewDoc)) - end, - [], NewDocs), - NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree), - if NewRevTree == OldTree -> - % nothing changed - merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq); - true -> - if NoConflicts andalso OldTree /= [] -> - OldConflicts = couch_key_tree:count_leafs(OldTree), - NewConflicts = couch_key_tree:count_leafs(NewRevTree), - if NewConflicts > OldConflicts -> - throw(conflict); - true -> ok - end; - true -> ok - end, - NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, - merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, - [NewInfo|AccNewInfos],AccSeq+1) - end. - -new_index_entries([], DocCount, DelCount, AccById, AccBySeq) -> - {ok, DocCount, DelCount, AccById, AccBySeq}; -new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) -> - #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), - {DocCount2, DelCount2} = - if Deleted -> {DocCount, DelCount + 1}; - true -> {DocCount + 1, DelCount} - end, - new_index_entries(RestInfos, DocCount2, DelCount2, - [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], - [DocInfo|AccBySeq]). - -update_docs_int(Db, DocsList, Options) -> - #db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree, - docinfo_by_seq_btree = DocInfoBySeqBTree, - update_seq = LastSeq, - doc_count = FullDocCount, - doc_del_count = FullDelCount - } = Db, - - % separate out the NonRep documents from the rest of the documents - {DocsList2, NonRepDocs} = lists:foldl( - fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) -> - case Id of - ?LOCAL_DOC_PREFIX ++ _ when Rest==[] -> - % when saving NR (non rep) documents, you can only save a single rev - {DocsListAcc, [Doc | NonRepDocsAcc]}; - Id-> - {[Docs | DocsListAcc], NonRepDocsAcc} - end - end, {[], []}, DocsList), - - Ids = [Id || [#doc{id=Id}|_] <- DocsList2], - - % lookup up the existing documents, if they exist. - OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), - OldDocInfos = lists:zipwith( - fun(_Id, {ok, FullDocInfo}) -> - FullDocInfo; - (Id, not_found) -> - #full_doc_info{id=Id} - end, - Ids, OldDocLookups), - - {OldCount, OldDelCount} = lists:foldl( - fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) -> - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{deleted=false} -> - {OldCountAcc + 1, OldDelCountAcc}; - _ -> - {OldCountAcc, OldDelCountAcc + 1} - end; - (not_found, Acc) -> - Acc - end, {0, 0}, OldDocLookups), - - % Merge the new docs into the revision trees. - NoConflicts = lists:member(new_edits, Options), - {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), - - RemoveSeqs = - [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], - - % All regular documents are now ready to write. - - % Try to write the local documents first, a conflict might be generated - {ok, Db2} = update_local_docs(Db, NonRepDocs), - - % Write out the documents summaries (they are stored in the nodes of the rev trees) - {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), - - {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} = - new_index_entries(FlushedDocInfos, 0, 0, [], []), - - % and the indexes to the documents - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), - - Db3 = Db2#db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree2, - docinfo_by_seq_btree = DocInfoBySeqBTree2, - update_seq = NewSeq, - doc_count = FullDocCount + NewDocsCount - OldCount, - doc_del_count = FullDelCount + NewDelCount - OldDelCount}, - - case lists:member(delay_commit, Options) of - true -> - {ok, Db3}; - false -> - {ok, commit_data(Db3)} - end. - -update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> - Ids = [Id || #doc{id=Id} <- Docs], - OldDocLookups = couch_btree:lookup(Btree, Ids), - BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> - NewRev = - case Revs of - [] -> 0; - [RevStr|_] -> list_to_integer(RevStr) - end, - OldRev = - case OldDocLookup of - {ok, {_, {OldRev0, _}}} -> OldRev0; - not_found -> 0 - end, - case OldRev + 1 == NewRev of - true -> - case Delete of - false -> {update, {Id, {NewRev, Body}}}; - true -> {remove, Id} - end; - false -> - throw(conflict) - end - - end, Docs, OldDocLookups), - - BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], - BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], - - {ok, Btree2} = - couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - - {ok, Db#db{local_docs_btree = Btree2}}. - - - -commit_data(#db{fd=Fd, header=Header} = Db) -> - Header2 = Header#db_header{ - update_seq = Db#db.update_seq, - summary_stream_state = couch_stream:get_state(Db#db.summary_stream), - docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), - fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), - local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count - }, - if Header == Header2 -> - Db; % unchanged. nothing to do - true -> - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), - Db#db{header = Header2} - end. - -copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> - {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), - % copy the bin values - NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> - {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd), - {Name, {Type, NewBinSp, Len}} - end, BinInfos), - % now write the document summary - {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}), - Sp. - -copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> - []; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> - % This is a leaf node, copy it over - NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), - [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> - % inner node, only copy info/data from leaf nodes - [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]. - -copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) -> - Ids = [Id || #doc_info{id=Id} <- InfoBySeq], - LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), - NewFullDocInfos = lists:map( - fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> - Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} - end, LookupResults), - NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos], - {ok, DocInfoBTree} = - couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []), - {ok, FullDocInfoBTree} = - couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), - NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}. - - - -copy_compact_docs(Db, NewDb) -> - EnumBySeqFun = - fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) -> - case couch_util:should_flush() of - true -> - NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])), - {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}}; - false -> - {ok, {AccNewDb, [DocInfo | AccUncopied]}} - end - end, - {ok, {NewDb2, Uncopied}} = - couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}), - - case Uncopied of - [#doc_info{update_seq=LastSeq} | _] -> - commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq}, - lists:reverse(Uncopied))); - [] -> - NewDb2 - end. - -start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> - CompactFile = Filepath ++ ".compact", - ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), - case couch_file:open(CompactFile) of - {ok, Fd} -> - ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]), - {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); - {error, enoent} -> % - {ok, Fd} = couch_file:open(CompactFile, [create]), - Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header) - end, - NewDb = init_db(Name, CompactFile, Fd, Header), - NewDb2 = copy_compact_docs(Db, NewDb), - NewDb3 = - case CopyLocal of - true -> - % suck up all the local docs into memory and write them to the new db - {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), - commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}); - _ -> - NewDb2 - end, - close_db(NewDb3), - Db#db.update_pid ! {compact_done, CompactFile}.
\ No newline at end of file |