From 88ec14c220592c8c0db7869c9961423e9ee97e7c Mon Sep 17 00:00:00 2001 From: "Damien F. Katz" Date: Tue, 5 Aug 2008 01:43:40 +0000 Subject: Added concurrent open db limit and a LRU cache for closing old databases when limit reached (configurable via MaxDbsOpen var in couch.ini). Refactored db update code in couch_db.erl into couch_db_updater.erl. git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@682560 13f79535-47bb-0310-9956-ffa450edef68 --- src/couchdb/couch_db.erl | 681 +++++---------------------------------- src/couchdb/couch_db.hrl | 37 +++ src/couchdb/couch_db_updater.erl | 499 ++++++++++++++++++++++++++++ src/couchdb/couch_file.erl | 74 ++++- src/couchdb/couch_httpd.erl | 11 +- src/couchdb/couch_rep.erl | 20 +- src/couchdb/couch_server.erl | 252 +++++++++------ src/couchdb/couch_server_sup.erl | 5 +- src/couchdb/couch_view.erl | 99 +++--- 9 files changed, 910 insertions(+), 768 deletions(-) create mode 100644 src/couchdb/couch_db_updater.erl (limited to 'src') diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index 2f5df3b5..098f3de7 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -13,63 +13,24 @@ -module(couch_db). -behaviour(gen_server). --export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]). --export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]). --export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]). --export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]). +-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]). +-export([open_ref_counted/2,num_refs/1,monitor/1]). +-export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]). +-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). +-export([get_missing_revs/2]). +-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). 
-export([increment_update_seq/1]). --export([start_update_loop/2]). +-export([start_link/3]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). --export([start_copy_compact_int/2]). --export([btree_by_id_split/1, - btree_by_id_join/2, - btree_by_id_reduce/2, - btree_by_seq_split/1, - btree_by_seq_join/2, - btree_by_seq_reduce/2]). -include("couch_db.hrl"). --record(db_header, - {write_version = 0, - update_seq = 0, - summary_stream_state = nil, - fulldocinfo_by_id_btree_state = nil, - docinfo_by_seq_btree_state = nil, - local_docs_btree_state = nil, - doc_count=0, - doc_del_count=0 - }). - --record(db, - {main_pid=nil, - update_pid=nil, - compactor_pid=nil, - fd, - header = #db_header{}, - summary_stream, - fulldocinfo_by_id_btree, - docinfo_by_seq_btree, - local_docs_btree, - update_seq, - doc_count, - doc_del_count, - name, - filepath - }). - -% small value used in revision trees to indicate the revision isn't stored --define(REV_MISSING, []). - --define(HEADER_SIG, <<$g, $m, $k, 0>>). - start_link(DbName, Filepath, Options) -> catch start_link0(DbName, Filepath, Options). start_link0(DbName, Filepath, Options) -> - % first delete the old file previous compaction Fd = case couch_file:open(Filepath, Options) of {ok, Fd0} -> @@ -105,33 +66,38 @@ start_link0(DbName, Filepath, Options) -> end, StartResult. -%%% Interface functions %%% -create(Filepath, Options) -> - create(Filepath, Filepath, Options). +create(DbName, Options) -> + couch_server:create(DbName, Options). -create(DbName, Filepath, Options) when is_list(Options) -> - start_link(DbName, Filepath, [create | Options]). +open(DbName, Options) -> + couch_server:open(DbName, Options). -open(DbName, Filepath) -> - start_link(DbName, Filepath, []). +close(#db{fd=Fd}) -> + couch_file:drop_ref(Fd). +open_ref_counted(MainPid, OpeningPid) -> + gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}). -% Compaction still needs work. 
Right now readers and writers can get an error -% file compaction changeover. This doesn't need to be the case. -start_compact(MainPid) -> - gen_server:cast(MainPid, start_compact). +num_refs(MainPid) -> + gen_server:call(MainPid, num_refs). -delete_doc(MainPid, Id, Revisions) -> +monitor(#db{main_pid=MainPid}) -> + erlang:monitor(process, MainPid). + +start_compact(#db{update_pid=Pid}) -> + gen_server:cast(Pid, start_compact). + +delete_doc(Db, Id, Revisions) -> DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], - {ok, [Result]} = update_docs(MainPid, DeletedDocs, []), + {ok, [Result]} = update_docs(Db, DeletedDocs, []), {ok, Result}. -open_doc(MainPid, IdOrDocInfo) -> - open_doc(MainPid, IdOrDocInfo, []). +open_doc(Db, IdOrDocInfo) -> + open_doc(Db, IdOrDocInfo, []). -open_doc(MainPid, Id, Options) -> - case open_doc_int(get_db(MainPid), Id, Options) of +open_doc(Db, Id, Options) -> + case open_doc_int(Db, Id, Options) of {ok, #doc{deleted=true}=Doc} -> case lists:member(deleted, Options) of true -> @@ -143,13 +109,13 @@ open_doc(MainPid, Id, Options) -> Else end. -open_doc_revs(MainPid, Id, Revs, Options) -> - [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options), +open_doc_revs(Db, Id, Revs, Options) -> + [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options), Result. -get_missing_revs(MainPid, IdRevsList) -> +get_missing_revs(Db, IdRevsList) -> Ids = [Id1 || {Id1, _Revs} <- IdRevsList], - FullDocInfoResults = get_full_doc_infos(MainPid, Ids), + FullDocInfoResults = get_full_doc_infos(Db, Ids), Results = lists:zipwith( fun({Id, Revs}, FullDocInfoResult) -> case FullDocInfoResult of @@ -177,18 +143,12 @@ get_full_doc_info(Db, Id) -> [Result] = get_full_doc_infos(Db, [Id]), Result. - -get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) -> - get_full_doc_infos(get_db(MainPid), Ids); -get_full_doc_infos(#db{}=Db, Ids) -> +get_full_doc_infos(Db, Ids) -> couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). 
-increment_update_seq(MainPid) -> - gen_server:call(MainPid, increment_update_seq). +increment_update_seq(#db{update_pid=UpdatePid}) -> + gen_server:call(UpdatePid, increment_update_seq). - -get_db_info(MainPid) when is_pid(MainPid) -> - get_db_info(get_db(MainPid)); get_db_info(Db) -> #db{fd=Fd, compactor_pid=Compactor, @@ -205,12 +165,12 @@ get_db_info(Db) -> ], {ok, InfoList}. -update_doc(MainPid, Doc, Options) -> - {ok, [NewRev]} = update_docs(MainPid, [Doc], Options), +update_doc(Db, Doc, Options) -> + {ok, [NewRev]} = update_docs(Db, [Doc], Options), {ok, NewRev}. -update_docs(MainPid, Docs) -> - update_docs(MainPid, Docs, []). +update_docs(Db, Docs) -> + update_docs(Db, Docs, []). % group_alike_docs groups the sorted documents into sublist buckets, by id. % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]] @@ -263,7 +223,7 @@ prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocI end end. -update_docs(MainPid, Docs, Options) -> +update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> % go ahead and generate the new revision ids for the documents. Docs2 = lists:map( fun(#doc{id=Id,revs=Revs}=Doc) -> @@ -278,7 +238,6 @@ update_docs(MainPid, Docs, Options) -> NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2], DocBuckets = group_alike_docs(Docs2), Ids = [Id || [#doc{id=Id}|_] <- DocBuckets], - Db = get_db(MainPid), % lookup the doc by id and get the most recent @@ -298,13 +257,14 @@ update_docs(MainPid, Docs, Options) -> % flush unwritten binaries to disk. 
DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2], - case gen_server:call(MainPid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of + case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of ok -> {ok, NewRevs}; retry -> - Db2 = get_db(MainPid), + Db2 = open_ref_counted(Db#db.main_pid, self()), DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3], % We only retry once - case gen_server:call(MainPid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of + ok = close(Db2), + case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of ok -> {ok, NewRevs}; Else -> throw(Else) end; @@ -312,15 +272,11 @@ update_docs(MainPid, Docs, Options) -> throw(Else) end. -save_docs(MainPid, Docs) -> - save_docs(MainPid, Docs, []). - -save_docs(MainPid, Docs, Options) -> +save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) -> % flush unwritten binaries to disk. - Db = get_db(MainPid), DocBuckets = group_alike_docs(Docs), - DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], - ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}, infinity). + DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets], + ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity). doc_flush_binaries(Doc, Fd) -> @@ -379,125 +335,51 @@ doc_flush_binaries(Doc, Fd) -> Doc#doc{attachments = NewBins}. enum_docs_since_reduce_to_count(Reds) -> - couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds). + couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds). enum_docs_reduce_to_count(Reds) -> - couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds). + couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds). 
-enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) -> - Db = get_db(MainPid), +enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). -enum_docs_since(MainPid, SinceSeq, InFun, Acc) -> - enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc). +enum_docs_since(Db, SinceSeq, InFun, Acc) -> + enum_docs_since(Db, SinceSeq, fwd, InFun, Acc). -enum_docs(MainPid, StartId, Direction, InFun, InAcc) -> - Db = get_db(MainPid), +enum_docs(Db, StartId, Direction, InFun, InAcc) -> couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc). -enum_docs(MainPid, StartId, InFun, Ctx) -> - enum_docs(MainPid, StartId, fwd, InFun, Ctx). +enum_docs(Db, StartId, InFun, Ctx) -> + enum_docs(Db, StartId, fwd, InFun, Ctx). % server functions -init(InitArgs) -> - spawn_link(couch_db, start_update_loop, [self(), InitArgs]), - receive - {initialized, Db} -> - {ok, Db} - end. - -btree_by_seq_split(DocInfo) -> - #doc_info{ - id = Id, - rev = Rev, - update_seq = Seq, - summary_pointer = Sp, - conflict_revs = Conflicts, - deleted_conflict_revs = DelConflicts, - deleted = Deleted} = DocInfo, - {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}. - -btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) -> - #doc_info{ - id = Id, - rev = Rev, - update_seq = Seq, - summary_pointer = Sp, - conflict_revs = Conflicts, - deleted_conflict_revs = DelConflicts, - deleted = Deleted}. - -btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, - deleted=Deleted, rev_tree=Tree}) -> - {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}. - -btree_by_id_join(Id, {Seq, Deleted, Tree}) -> - #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}. - - - -btree_by_id_reduce(reduce, FullDocInfos) -> - % count the number of deleted documents - length([1 || #full_doc_info{deleted=false} <- FullDocInfos]); -btree_by_id_reduce(rereduce, Reds) -> - lists:sum(Reds). 
- -btree_by_seq_reduce(reduce, DocInfos) -> - % count the number of deleted documents - length(DocInfos); -btree_by_seq_reduce(rereduce, Reds) -> - lists:sum(Reds). - -init_db(DbName, Filepath, Fd, Header) -> - {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd), - ok = couch_stream:set_min_buffer(SummaryStream, 10000), - {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, - [{split, fun btree_by_id_split/1}, - {join, fun btree_by_id_join/2}, - {reduce, fun btree_by_id_reduce/2}]), - {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, - [{split, fun btree_by_seq_split/1}, - {join, fun btree_by_seq_join/2}, - {reduce, fun btree_by_seq_reduce/2}]), - {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), - - #db{ - update_pid=self(), - fd=Fd, - header=Header, - summary_stream = SummaryStream, - fulldocinfo_by_id_btree = IdBtree, - docinfo_by_seq_btree = SeqBtree, - local_docs_btree = LocalDocsBtree, - update_seq = Header#db_header.update_seq, - doc_count = Header#db_header.doc_count, - doc_del_count = Header#db_header.doc_del_count, - name = DbName, - filepath=Filepath }. +init({DbName, Filepath, Fd, Options}) -> + {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), + ok = couch_file:add_ref(Fd), + gen_server:call(UpdaterPid, get_db). -close_db(#db{fd=Fd,summary_stream=Ss}) -> - couch_file:close(Fd), - couch_stream:close(Ss). - terminate(_Reason, Db) -> exit(Db#db.update_pid, kill). -handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) -> - Updater ! {From, update_docs, DocActions, Options}, - {noreply, Db}; -handle_call(increment_update_seq, From, #db{update_pid=Updater}=Db) -> - Updater ! 
{From, increment_update_seq}, - {noreply, Db}; -handle_call(get_db, _From, Db) -> +handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) -> + ok = couch_file:add_ref(Fd, OpenerPid), {reply, {ok, Db}, Db}; -handle_call({db_updated, NewDb}, _From, _OldDb) -> +handle_call(num_refs, _From, #db{fd=Fd}=Db) -> + {reply, couch_file:num_refs(Fd) - 1, Db}; +handle_call({db_updated, #db{fd=NewFd}=NewDb}, _From, #db{fd=OldFd}) -> + case NewFd == OldFd of + true -> ok; + false -> + couch_file:add_ref(NewFd), + couch_file:drop_ref(OldFd) + end, {reply, ok, NewDb}. -handle_cast(start_compact, #db{update_pid=Updater}=Db) -> - Updater ! compact, - {noreply, Db}. +handle_cast(Msg, Db) -> + ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]), + exit({error, Msg}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -508,114 +390,6 @@ handle_info(Msg, Db) -> %%% Internal function %%% - -start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) -> - link(Fd), - - case lists:member(create, Options) of - true -> - % create a new header and writes it to the file - Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header), - % delete any old compaction files that might be hanging around - file:delete(Filepath ++ ".compact"), - file:delete(Filepath ++ ".old"); - false -> - {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG) - end, - - Db = init_db(DbName, Filepath, Fd, Header), - Db2 = Db#db{main_pid=MainPid}, - MainPid ! {initialized, Db2}, - update_loop(Db2). 
- -update_loop(#db{fd=Fd,name=Name, - filepath=Filepath, - main_pid=MainPid, - update_seq=UpdateSeq}=Db) -> - receive - {OrigFrom, update_docs, DocActions, Options} -> - case (catch update_docs_int(Db, DocActions, Options)) of - {ok, Db2} -> - ok = gen_server:call(MainPid, {db_updated, Db2}), - gen_server:reply(OrigFrom, ok), - couch_db_update_notifier:notify({updated, Name}), - update_loop(Db2); - retry -> - gen_server:reply(OrigFrom, retry), - update_loop(Db); - conflict -> - gen_server:reply(OrigFrom, conflict), - update_loop(Db); - Error -> - exit(Error) % we crashed - end; - compact -> - case Db#db.compactor_pid of - nil -> - ?LOG_INFO("Starting compaction for db \"~s\"", [Name]), - Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]), - Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(MainPid, {db_updated, Db2}), - update_loop(Db2); - _ -> - update_loop(Db) % already started - end; - {compact_done, CompactFilepath} -> - {ok, NewFd} = couch_file:open(CompactFilepath), - {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG), - #db{update_seq=NewSeq}= NewDb = - init_db(Name, CompactFilepath, NewFd, NewHeader), - case Db#db.update_seq == NewSeq of - true -> - NewDb2 = commit_data( - NewDb#db{ - main_pid = Db#db.main_pid, - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count, - filepath = Filepath}), - - ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]), - ok = file:rename(Filepath, Filepath ++ ".old"), - ok = file:rename(CompactFilepath, Filepath), - - couch_stream:close(Db#db.summary_stream), - % close file handle async. 
- % wait 5 secs before closing, allowing readers to finish - unlink(Fd), - spawn_link(fun() -> - receive after 5000 -> ok end, - couch_file:close(Fd), - file:delete(Filepath ++ ".old") - end), - - ok = gen_server:call(MainPid, {db_updated, NewDb2}), - ?LOG_INFO("Compaction for db ~p completed.", [Name]), - update_loop(NewDb2#db{compactor_pid=nil}); - false -> - ?LOG_INFO("Compaction file still behind main file " - "(update seq=~p. compact update seq=~p). Retrying.", - [Db#db.update_seq, NewSeq]), - Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]), - Db2 = Db#db{compactor_pid=Pid}, - couch_file:close(NewFd), - update_loop(Db2) - end; - {OrigFrom, increment_update_seq} -> - Db2 = commit_data(Db#db{update_seq=UpdateSeq+1}), - ok = gen_server:call(MainPid, {db_updated, Db2}), - gen_server:reply(OrigFrom, {ok, UpdateSeq+1}), - couch_db_update_notifier:notify({updated, Name}), - update_loop(Db2); - Else -> - ?LOG_ERROR("Unknown message received in db ~s:~p", [Db#db.name, Else]), - exit({error, Else}) - end. - -get_db(MainPid) -> - {ok, Db} = gen_server:call(MainPid, get_db), - Db. - open_doc_revs_int(Db, IdRevs, Options) -> Ids = [Id || {Id, _Revs} <- IdRevs], LookupResults = get_full_doc_infos(Db, Ids), @@ -711,16 +485,6 @@ doc_meta_info(DocInfo, RevTree, Options) -> end end. -% rev tree functions - -doc_to_tree(Doc) -> - doc_to_tree(Doc, lists:reverse(Doc#doc.revs)). - -doc_to_tree(Doc, [RevId]) -> - [{RevId, Doc, []}]; -doc_to_tree(Doc, [RevId | Rest]) -> - [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}]. - make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> {BodyData, BinValues} = case SummaryPointer of @@ -737,303 +501,6 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) -> attachments = BinValues, deleted = Deleted }. 
- -flush_trees(_Db, [], AccFlushedTrees) -> - {ok, lists:reverse(AccFlushedTrees)}; -flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> - #full_doc_info{rev_tree=Unflushed} = InfoUnflushed, - Flushed = couch_key_tree:map( - fun(_Rev, Value) -> - case Value of - #doc{attachments=Atts,deleted=IsDeleted}=Doc -> - % this node value is actually an unwritten document summary, - % write to disk. - % make sure the Fd in the written bins is the same Fd we are. - Bins = - case Atts of - [] -> []; - [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd -> - % convert bins, removing the FD. - % All bins should have been flushed to disk already. - [{BinName, {BinType, BinSp, BinLen}} - || {BinName, {BinType, {_Fd, BinSp, BinLen}}} - <- Atts]; - _ -> - % BinFd must not equal our Fd. This can happen when a database - % is being updated during a compaction - ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []), - throw(retry) - end, - {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}), - {IsDeleted, NewSummaryPointer}; - _ -> - Value - end - end, Unflushed), - flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). 
- -merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) -> - {ok, lists:reverse(AccNewInfos), AccSeq}; -merge_rev_trees(NoConflicts, [NewDocs|RestDocsList], - [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) -> - #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo, - UpdatesRevTree = lists:foldl( - fun(NewDoc, AccTree) -> - couch_key_tree:merge(AccTree, doc_to_tree(NewDoc)) - end, - [], NewDocs), - NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree), - if NewRevTree == OldTree -> - % nothing changed - merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq); - true -> - if NoConflicts andalso OldTree /= [] -> - OldConflicts = couch_key_tree:count_leafs(OldTree), - NewConflicts = couch_key_tree:count_leafs(NewRevTree), - if NewConflicts > OldConflicts -> - throw(conflict); - true -> ok - end; - true -> ok - end, - NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree}, - merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, - [NewInfo|AccNewInfos],AccSeq+1) - end. - -new_index_entries([], DocCount, DelCount, AccById, AccBySeq) -> - {ok, DocCount, DelCount, AccById, AccBySeq}; -new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) -> - #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo), - {DocCount2, DelCount2} = - if Deleted -> {DocCount, DelCount + 1}; - true -> {DocCount + 1, DelCount} - end, - new_index_entries(RestInfos, DocCount2, DelCount2, - [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], - [DocInfo|AccBySeq]). 
- -update_docs_int(Db, DocsList, Options) -> - #db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree, - docinfo_by_seq_btree = DocInfoBySeqBTree, - update_seq = LastSeq, - doc_count = FullDocCount, - doc_del_count = FullDelCount - } = Db, - - % separate out the NonRep documents from the rest of the documents - {DocsList2, NonRepDocs} = lists:foldl( - fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) -> - case Id of - ?LOCAL_DOC_PREFIX ++ _ when Rest==[] -> - % when saving NR (non rep) documents, you can only save a single rev - {DocsListAcc, [Doc | NonRepDocsAcc]}; - Id-> - {[Docs | DocsListAcc], NonRepDocsAcc} - end - end, {[], []}, DocsList), - - Ids = [Id || [#doc{id=Id}|_] <- DocsList2], - - % lookup up the existing documents, if they exist. - OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), - OldDocInfos = lists:zipwith( - fun(_Id, {ok, FullDocInfo}) -> - FullDocInfo; - (Id, not_found) -> - #full_doc_info{id=Id} - end, - Ids, OldDocLookups), - - {OldCount, OldDelCount} = lists:foldl( - fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) -> - case couch_doc:to_doc_info(FullDocInfo) of - #doc_info{deleted=false} -> - {OldCountAcc + 1, OldDelCountAcc}; - _ -> - {OldCountAcc, OldDelCountAcc + 1} - end; - (not_found, Acc) -> - Acc - end, {0, 0}, OldDocLookups), - - % Merge the new docs into the revision trees. - NoConflicts = lists:member(new_edits, Options), - {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq), - - RemoveSeqs = - [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups], - - % All regular documents are now ready to write. 
- - % Try to write the local documents first, a conflict might be generated - {ok, Db2} = update_local_docs(Db, NonRepDocs), - - % Write out the documents summaries (they are stored in the nodes of the rev trees) - {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []), - - {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} = - new_index_entries(FlushedDocInfos, 0, 0, [], []), - - % and the indexes to the documents - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs), - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []), - - Db3 = Db2#db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree2, - docinfo_by_seq_btree = DocInfoBySeqBTree2, - update_seq = NewSeq, - doc_count = FullDocCount + NewDocsCount - OldCount, - doc_del_count = FullDelCount + NewDelCount - OldDelCount}, - - case lists:member(delay_commit, Options) of - true -> - {ok, Db3}; - false -> - {ok, commit_data(Db3)} - end. - -update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> - Ids = [Id || #doc{id=Id} <- Docs], - OldDocLookups = couch_btree:lookup(Btree, Ids), - BtreeEntries = lists:zipwith( - fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) -> - NewRev = - case Revs of - [] -> 0; - [RevStr|_] -> list_to_integer(RevStr) - end, - OldRev = - case OldDocLookup of - {ok, {_, {OldRev0, _}}} -> OldRev0; - not_found -> 0 - end, - case OldRev + 1 == NewRev of - true -> - case Delete of - false -> {update, {Id, {NewRev, Body}}}; - true -> {remove, Id} - end; - false -> - throw(conflict) - end - - end, Docs, OldDocLookups), - - BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], - BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries], - - {ok, Btree2} = - couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - - {ok, Db#db{local_docs_btree = Btree2}}. 
- - - -commit_data(#db{fd=Fd, header=Header} = Db) -> - Header2 = Header#db_header{ - update_seq = Db#db.update_seq, - summary_stream_state = couch_stream:get_state(Db#db.summary_stream), - docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), - fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), - local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), - doc_count = Db#db.doc_count, - doc_del_count = Db#db.doc_del_count - }, - if Header == Header2 -> - Db; % unchanged. nothing to do - true -> - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2), - Db#db{header = Header2} - end. - -copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) -> - {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp), - % copy the bin values - NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) -> - {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd), - {Name, {Type, NewBinSp, Len}} - end, BinInfos), - % now write the document summary - {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}), - Sp. - -copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) -> - []; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) -> - % This is a leaf node, copy it over - NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream), - [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]; -copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) -> - % inner node, only copy info/data from leaf nodes - [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)]. 
- -copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) -> - Ids = [Id || #doc_info{id=Id} <- InfoBySeq], - LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), - NewFullDocInfos = lists:map( - fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> - Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)} - end, LookupResults), - NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos], - {ok, DocInfoBTree} = - couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []), - {ok, FullDocInfoBTree} = - couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), - NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}. - - - -copy_compact_docs(Db, NewDb) -> - EnumBySeqFun = - fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) -> - case couch_util:should_flush() of - true -> - NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])), - {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}}; - false -> - {ok, {AccNewDb, [DocInfo | AccUncopied]}} - end - end, - {ok, {NewDb2, Uncopied}} = - couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}), - - case Uncopied of - [#doc_info{update_seq=LastSeq} | _] -> - commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq}, - lists:reverse(Uncopied))); - [] -> - NewDb2 - end. 
- -start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) -> - CompactFile = Filepath ++ ".compact", - ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), - case couch_file:open(CompactFile) of - {ok, Fd} -> - ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]), - {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); - {error, enoent} -> % - {ok, Fd} = couch_file:open(CompactFile, [create]), - Header = #db_header{}, - ok = couch_file:write_header(Fd, ?HEADER_SIG, Header) - end, - NewDb = init_db(Name, CompactFile, Fd, Header), - NewDb2 = copy_compact_docs(Db, NewDb), - NewDb3 = - case CopyLocal of - true -> - % suck up all the local docs into memory and write them to the new db - {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs), - commit_data(NewDb2#db{local_docs_btree=NewLocalBtree}); - _ -> - NewDb2 - end, - close_db(NewDb3), - Db#db.update_pid ! {compact_done, CompactFile}. \ No newline at end of file diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl index 9ca5d815..0c274396 100644 --- a/src/couchdb/couch_db.hrl +++ b/src/couchdb/couch_db.hrl @@ -69,3 +69,40 @@ % couch_db:open_doc(Db, Id, Options). meta = [] }). + + + + + +-record(db_header, + {write_version = 0, + update_seq = 0, + summary_stream_state = nil, + fulldocinfo_by_id_btree_state = nil, + docinfo_by_seq_btree_state = nil, + local_docs_btree_state = nil, + doc_count=0, + doc_del_count=0 + }). + +-record(db, + {main_pid=nil, + update_pid=nil, + compactor_pid=nil, + fd, + header = #db_header{}, + summary_stream, + fulldocinfo_by_id_btree, + docinfo_by_seq_btree, + local_docs_btree, + update_seq, + doc_count, + doc_del_count, + name, + filepath + }). + + + +% small value used in revision trees to indicate the revision isn't stored +-define(REV_MISSING, []). 
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl new file mode 100644 index 00000000..f0673af9 --- /dev/null +++ b/src/couchdb/couch_db_updater.erl @@ -0,0 +1,499 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_db_updater). +-behaviour(gen_server). + +-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]). +-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). + +-include("couch_db.hrl"). + +-define(HEADER_SIG, <<$g, $m, $k, 0>>). + +init({MainPid, DbName, Filepath, Fd, Options}) -> + link(Fd), + case lists:member(create, Options) of + true -> + % create a new header and writes it to the file + Header = #db_header{}, + ok = couch_file:write_header(Fd, ?HEADER_SIG, Header), + % delete any old compaction files that might be hanging around + file:delete(Filepath ++ ".compact"), + file:delete(Filepath ++ ".old"); + false -> + {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG) + end, + + Db = init_db(DbName, Filepath, Fd, Header), + {ok, Db#db{main_pid=MainPid}}. + +terminate(_Reason, Db) -> + close_db(Db). 

% gen_server call handler for the updater process.
%
%   get_db                        -> {ok, Db}: the updater's current #db record.
%   {update_docs, Docs, Options}  -> ok | retry | conflict. On success the new
%                                    #db is pushed to the main db process and
%                                    update listeners are notified.
%   increment_update_seq          -> {ok, NewSeq}: bumps the update sequence by
%                                    one, commits, and publishes the new #db.
handle_call(get_db, _From, Db) ->
    {reply, {ok, Db}, Db};
handle_call({update_docs, DocActions, Options}, _From, Db) ->
    % only the update itself is protected; retry/conflict are the two throws
    % update_docs_int/3 uses to report an expected failure.
    try update_docs_int(Db, DocActions, Options) of
        {ok, Db2} ->
            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
            couch_db_update_notifier:notify({updated, Db2#db.name}),
            {reply, ok, Db2}
    catch
        throw:retry ->
            {reply, retry, Db};
        throw:conflict ->
            {reply, conflict, Db}
    end;
handle_call(increment_update_seq, _From, Db) ->
    Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
    ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
    couch_db_update_notifier:notify({updated, Db#db.name}),
    {reply, {ok, Db2#db.update_seq}, Db2}.


% gen_server cast handler for the updater process.
%
%   start_compact                 starts a linked compactor process unless one
%                                 is already recorded in #db.compactor_pid.
%   {compact_done, CompactFile}   sent by the compactor when it has finished a
%                                 pass. If the compacted file has caught up with
%                                 the live update_seq the files are swapped,
%                                 otherwise another compaction pass is started.
handle_cast(start_compact, Db) ->
    case Db#db.compactor_pid of
        nil ->
            ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
            Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end),
            Db2 = Db#db{compactor_pid=Pid},
            ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
            {noreply, Db2};
        _ ->
            % compact currently running, this is a no-op
            {noreply, Db}
    end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
    {ok, NewFd} = couch_file:open(CompactFilepath),
    {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
    #db{update_seq=NewSeq} = NewDb =
        init_db(Db#db.name, CompactFilepath, NewFd, NewHeader),
    case Db#db.update_seq == NewSeq of
        true ->
            % the compacted file is up to date: carry over the live counters and
            % identity, commit, then move the compacted file into place.
            NewDb2 = commit_data(
                NewDb#db{
                    main_pid = Db#db.main_pid,
                    doc_count = Db#db.doc_count,
                    doc_del_count = Db#db.doc_del_count,
                    filepath = Filepath}),

            ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
            file:delete(Filepath),
            ok = file:rename(CompactFilepath, Filepath),

            % release the resources of the file that was just replaced
            couch_stream:close(Db#db.summary_stream),
            couch_file:close_maybe(Db#db.fd),
            file:delete(Filepath ++ ".old"),

            ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
            ?LOG_INFO("Compaction for db ~p completed.", [Db#db.name]),
            {noreply, NewDb2#db{compactor_pid=nil}};
        false ->
            ?LOG_INFO("Compaction file still behind main file "
                "(update seq=~p. compact update seq=~p). Retrying.",
                [Db#db.update_seq, NewSeq]),
            % NewDb is discarded on this path, so release everything init_db/4
            % opened on the compaction file, not just the file handle. The
            % summary stream was previously left open here on every retry.
            % NOTE(review): assumes couch_stream:close/1 is safe on a stream
            % that has had no writes, as close_db/1 already relies on.
            couch_stream:close(NewDb#db.summary_stream),
            couch_file:close(NewFd),
            Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end),
            {noreply, Db#db{compactor_pid=Pid}}
    end.

% The updater expects no raw messages; anything arriving here is logged and
% terminates the process.
handle_info(Msg, Db) ->
    ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
    exit({error, Msg}).

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Converts a #doc_info into the {Key, Value} pair stored in the by-sequence
% btree; the key is the update sequence.
btree_by_seq_split(DocInfo) ->
    #doc_info{
        id = Id,
        rev = Rev,
        update_seq = Seq,
        summary_pointer = Sp,
        conflict_revs = Conflicts,
        deleted_conflict_revs = DelConflicts,
        deleted = Deleted} = DocInfo,
    {Seq, {Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.

% Inverse of btree_by_seq_split/1.
btree_by_seq_join(Seq, {Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
    #doc_info{
        id = Id,
        rev = Rev,
        update_seq = Seq,
        summary_pointer = Sp,
        conflict_revs = Conflicts,
        deleted_conflict_revs = DelConflicts,
        deleted = Deleted}.

% Converts a #full_doc_info into the {Key, Value} pair stored in the by-id
% btree; the deleted flag is stored as 1 (deleted) or 0.
btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
        deleted=Deleted, rev_tree=Tree}) ->
    {Id, {Seq, case Deleted of true -> 1; false -> 0 end, Tree}}.

% Inverse of btree_by_id_split/1.
btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
    #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.



% Reduce function for the by-id btree.
btree_by_id_reduce(reduce, FullDocInfos) ->
    % count the documents that are NOT deleted
    length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
btree_by_id_reduce(rereduce, Reds) ->
    lists:sum(Reds).

% Reduce function for the by-sequence btree.
btree_by_seq_reduce(reduce, DocInfos) ->
    % count every entry, deleted or not
    length(DocInfos);
btree_by_seq_reduce(rereduce, Reds) ->
    lists:sum(Reds).

% Builds the #db record for the file behind Fd from a #db_header: opens the
% summary stream and the three btrees (by id, by sequence, local docs) at the
% states recorded in the header and copies the header's counters.
% The calling process is recorded as the update_pid.
init_db(DbName, Filepath, Fd, Header) ->
    #db_header{
        summary_stream_state = StreamState,
        fulldocinfo_by_id_btree_state = ByIdState,
        docinfo_by_seq_btree_state = BySeqState,
        local_docs_btree_state = LocalState,
        update_seq = UpdateSeq,
        doc_count = DocCount,
        doc_del_count = DelCount
    } = Header,
    {ok, Stream} = couch_stream:open(StreamState, Fd),
    ok = couch_stream:set_min_buffer(Stream, 10000),
    ByIdOptions = [
        {split, fun btree_by_id_split/1},
        {join, fun btree_by_id_join/2},
        {reduce, fun btree_by_id_reduce/2}],
    {ok, ByIdBtree} = couch_btree:open(ByIdState, Fd, ByIdOptions),
    BySeqOptions = [
        {split, fun btree_by_seq_split/1},
        {join, fun btree_by_seq_join/2},
        {reduce, fun btree_by_seq_reduce/2}],
    {ok, BySeqBtree} = couch_btree:open(BySeqState, Fd, BySeqOptions),
    {ok, LocalBtree} = couch_btree:open(LocalState, Fd),
    #db{
        update_pid = self(),
        fd = Fd,
        header = Header,
        summary_stream = Stream,
        fulldocinfo_by_id_btree = ByIdBtree,
        docinfo_by_seq_btree = BySeqBtree,
        local_docs_btree = LocalBtree,
        update_seq = UpdateSeq,
        doc_count = DocCount,
        doc_del_count = DelCount,
        name = DbName,
        filepath = Filepath}.

% Closes the file and then the summary stream of a #db.
close_db(Db) ->
    couch_file:close(Db#db.fd),
    couch_stream:close(Db#db.summary_stream).

% rev tree functions

% Turns a document into a single-branch revision tree. The document's revs
% list is walked in reverse; every revision but the last becomes a
% ?REV_MISSING node and the last one carries the document itself.
doc_to_tree(Doc) ->
    doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).

doc_to_tree(Doc, [LeafRev]) ->
    [{LeafRev, Doc, []}];
doc_to_tree(Doc, [ParentRev | Descendants]) ->
    [{ParentRev, ?REV_MISSING, doc_to_tree(Doc, Descendants)}].

% Writes every not-yet-written document held in the rev trees of the given
% #full_doc_info list to the db's summary stream, replacing each #doc node
% value with {IsDeleted, SummaryPointer}. Returns {ok, Infos} with the flushed
% infos appended (in order) after the already-accumulated ones.
% Throws retry when a document's attachments were written through another fd.
flush_trees(Db, UnflushedInfos, AccFlushedTrees) ->
    Flushed = lists:foldl(
        fun(Info, Acc) -> [flush_doc_info(Db, Info) | Acc] end,
        AccFlushedTrees, UnflushedInfos),
    {ok, lists:reverse(Flushed)}.

% Flushes the rev tree of one #full_doc_info.
flush_doc_info(Db, #full_doc_info{rev_tree=Unflushed} = Info) ->
    Flushed = couch_key_tree:map(
        fun(_Rev, Value) -> flush_tree_value(Db, Value) end,
        Unflushed),
    Info#full_doc_info{rev_tree=Flushed}.

% A #doc node value is an unwritten document summary: write it to disk.
% Any other node value is already flushed (or missing) and is kept as is.
flush_tree_value(#db{fd=Fd} = Db, #doc{attachments=Atts, deleted=IsDeleted} = Doc) ->
    Bins = attachments_to_bins(Fd, Atts),
    {ok, SummaryPointer} =
        couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
    {IsDeleted, SummaryPointer};
flush_tree_value(_Db, Value) ->
    Value.

% Strips the fd from the attachment entries. Only the first attachment's fd is
% compared with ours; all bins should have been flushed to disk already.
attachments_to_bins(_Fd, []) ->
    [];
attachments_to_bins(Fd, [{_Name, {_Type, {BinFd, _Sp, _Len}}} | _] = Atts)
        when BinFd == Fd ->
    [{BinName, {BinType, BinSp, BinLen}}
        || {BinName, {BinType, {_BinFd, BinSp, BinLen}}} <- Atts];
attachments_to_bins(_Fd, _Atts) ->
    % BinFd must not equal our Fd. This can happen when a database
    % is being switched out during a compaction
    ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
    throw(retry).

% Merges each group of new documents into the matching old #full_doc_info.
% Documents whose tree is unchanged by the merge are dropped from the result;
% every changed document gets the next update sequence number.
% Returns {ok, ChangedInfos, LastSeqUsed}. Throws conflict when NoConflicts is
% true and the merge adds leafs to an existing (non-empty) tree.
merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
    {ok, lists:reverse(AccNewInfos), AccSeq};
merge_rev_trees(NoConflicts, [NewDocs | RestDocsList],
        [OldDocInfo | RestOldInfo], AccNewInfos, AccSeq) ->
    #full_doc_info{id=Id, rev_tree=OldTree} = OldDocInfo,
    IncomingTree = lists:foldl(
        fun(NewDoc, TreeAcc) ->
            couch_key_tree:merge(TreeAcc, doc_to_tree(NewDoc))
        end,
        [], NewDocs),
    MergedTree = couch_key_tree:merge(OldTree, IncomingTree),
    case MergedTree == OldTree of
        true ->
            % nothing changed
            merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo,
                AccNewInfos, AccSeq);
        false ->
            ok = reject_new_conflicts(NoConflicts, OldTree, MergedTree),
            NextSeq = AccSeq + 1,
            UpdatedInfo = #full_doc_info{id=Id, update_seq=NextSeq,
                rev_tree=MergedTree},
            merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo,
                [UpdatedInfo | AccNewInfos], NextSeq)
    end.

% Throws conflict if conflict checking is on, the document already existed and
% the merged tree has more leafs than the old one. Returns ok otherwise.
reject_new_conflicts(true, OldTree, MergedTree) when OldTree /= [] ->
    LeafsBefore = couch_key_tree:count_leafs(OldTree),
    LeafsAfter = couch_key_tree:count_leafs(MergedTree),
    case LeafsAfter > LeafsBefore of
        true -> throw(conflict);
        false -> ok
    end;
reject_new_conflicts(_NoConflicts, _OldTree, _MergedTree) ->
    ok.

% Builds the index entries for a list of flushed #full_doc_info records.
% Returns {ok, DocCount, DelCount, ById, BySeq} where the counts are the
% incoming counts plus the number of non-deleted / deleted documents seen,
% ById holds the infos with their deleted flag set and BySeq the matching
% #doc_info records (both accumulated in reverse input order).
new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
    {ok, DocCount, DelCount, AccById, AccBySeq};
new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
    #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
    {DocCount2, DelCount2} =
        if Deleted -> {DocCount, DelCount + 1};
        true -> {DocCount + 1, DelCount}
        end,
    new_index_entries(RestInfos, DocCount2, DelCount2,
        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
        [DocInfo|AccBySeq]).

% Applies a batch of document updates to Db and returns {ok, NewDb}.
%
% DocsList is a list of non-empty lists of #doc records, one inner list per
% document id. Options may contain:
%   new_edits     - throw conflict if an update adds a leaf to an existing doc
%   delay_commit  - do not write the header after updating the btrees
%
% Throws conflict (from merge_rev_trees/5 or update_local_docs/2) or retry
% (from flush_trees/3).
update_docs_int(Db, DocsList, Options) ->
    #db{
        fulldocinfo_by_id_btree = DocInfoByIdBTree,
        docinfo_by_seq_btree = DocInfoBySeqBTree,
        update_seq = LastSeq,
        doc_count = FullDocCount,
        doc_del_count = FullDelCount
        } = Db,

    % separate out the NonRep documents from the rest of the documents
    {DocsList2, NonRepDocs} = lists:foldl(
        fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
            case Id of
                ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
                    % when saving NR (non rep) documents, you can only save a single rev
                    {DocsListAcc, [Doc | NonRepDocsAcc]};
                _ ->
                    {[Docs | DocsListAcc], NonRepDocsAcc}
            end
        end, {[], []}, DocsList),

    Ids = [Id || [#doc{id=Id}|_] <- DocsList2],

    % look up the existing documents, if they exist.
    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
    OldDocInfos = lists:zipwith(
        fun(_Id, {ok, FullDocInfo}) ->
            FullDocInfo;
        (Id, not_found) ->
            #full_doc_info{id=Id}
        end,
        Ids, OldDocLookups),

    % Merge the new docs into the revision trees. Documents whose tree is not
    % changed by the merge are left out of NewDocInfos.
    NoConflicts = lists:member(new_edits, Options),
    {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),

    % Only documents that actually changed are re-indexed below, so only their
    % old entries may be removed from the by-seq index and subtracted from the
    % counters. Using every looked-up document here would drop the by-seq entry
    % (and decrement the counts) of any document whose update was a no-op.
    ChangedIds = sets:from_list([Id || #full_doc_info{id=Id} <- NewDocInfos]),
    ReplacedInfos = [Info || {ok, #full_doc_info{id=Id}=Info} <- OldDocLookups,
        sets:is_element(Id, ChangedIds)],

    {OldCount, OldDelCount} = lists:foldl(
        fun(FullDocInfo, {OldCountAcc, OldDelCountAcc}) ->
            case couch_doc:to_doc_info(FullDocInfo) of
                #doc_info{deleted=false} ->
                    {OldCountAcc + 1, OldDelCountAcc};
                _ ->
                    {OldCountAcc, OldDelCountAcc + 1}
            end
        end, {0, 0}, ReplacedInfos),

    RemoveSeqs = [OldSeq || #full_doc_info{update_seq=OldSeq} <- ReplacedInfos],

    % All regular documents are now ready to write.

    % Try to write the local documents first, a conflict might be generated
    {ok, Db2} = update_local_docs(Db, NonRepDocs),

    % Write out the documents summaries (they are stored in the nodes of the rev trees)
    {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),

    {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
        new_index_entries(FlushedDocInfos, 0, 0, [], []),

    % and the indexes to the documents
    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),

    Db3 = Db2#db{
        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
        docinfo_by_seq_btree = DocInfoBySeqBTree2,
        update_seq = NewSeq,
        doc_count = FullDocCount + NewDocsCount - OldCount,
        doc_del_count = FullDelCount + NewDelCount - OldDelCount},

    case lists:member(delay_commit, Options) of
        true ->
            {ok, Db3};
        false ->
            {ok, commit_data(Db3)}
    end.

% Applies updates to local (non-replicated) documents and returns
% {ok, Db} with the new local docs btree.
% A document's first rev (0 when it has none) must be exactly one greater than
% the stored revision (0 when the document is not stored); otherwise conflict
% is thrown. Deleted documents are removed from the btree, all others are
% stored as {Id, {Rev, Body}}.
update_local_docs(#db{local_docs_btree=LocalBtree} = Db, Docs) ->
    DocIds = [DocId || #doc{id=DocId} <- Docs],
    Lookups = couch_btree:lookup(LocalBtree, DocIds),
    Changes = lists:zipwith(fun local_doc_change/2, Docs, Lookups),
    Removals = [DocId || {remove, DocId} <- Changes],
    Updates = [Entry || {update, Entry} <- Changes],
    {ok, LocalBtree2} = couch_btree:add_remove(LocalBtree, Updates, Removals),
    {ok, Db#db{local_docs_btree = LocalBtree2}}.

% Decides what to do with one local document given its btree lookup result:
% {update, {Id, {Rev, Body}}} or {remove, Id}. Throws conflict on a revision
% mismatch.
local_doc_change(#doc{id=Id, deleted=Delete, revs=Revs, body=Body}, Lookup) ->
    NewRev =
        case Revs of
            [] -> 0;
            [RevStr | _] -> list_to_integer(RevStr)
        end,
    StoredRev =
        case Lookup of
            {ok, {_, {Rev, _}}} -> Rev;
            not_found -> 0
        end,
    case StoredRev + 1 == NewRev of
        false ->
            throw(conflict);
        true ->
            case Delete of
                false -> {update, {Id, {NewRev, Body}}};
                true -> {remove, Id}
            end
    end.



% Writes a new header to the db file if anything recorded in the header
% (update seq, stream state, btree states, document counts) differs from the
% header currently held in the #db. Returns the #db, carrying the new header
% when one was written.
commit_data(#db{fd=Fd, header=OldHeader} = Db) ->
    StreamState = couch_stream:get_state(Db#db.summary_stream),
    BySeqState = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
    ByIdState = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
    LocalState = couch_btree:get_state(Db#db.local_docs_btree),
    NewHeader = OldHeader#db_header{
        update_seq = Db#db.update_seq,
        summary_stream_state = StreamState,
        docinfo_by_seq_btree_state = BySeqState,
        fulldocinfo_by_id_btree_state = ByIdState,
        local_docs_btree_state = LocalState,
        doc_count = Db#db.doc_count,
        doc_del_count = Db#db.doc_del_count
        },
    case OldHeader == NewHeader of
        true ->
            % unchanged. nothing to do
            Db;
        false ->
            ok = couch_file:write_header(Fd, ?HEADER_SIG, NewHeader),
            Db#db{header = NewHeader}
    end.

% Copies one document summary, and every attachment it lists, from the source
% file to the destination file/stream. Returns the summary's new stream
% pointer.
copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
    {ok, {Body, Bins}} = couch_stream:read_term(SrcFd, SrcSp),
    % copy the bin values
    CopyBin =
        fun({Name, {Type, BinSp, Len}}) ->
            {ok, CopiedSp} =
                couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
            {Name, {Type, CopiedSp, Len}}
        end,
    CopiedBins = lists:map(CopyBin, Bins),
    % now write the document summary
    {ok, NewSp} = couch_stream:write_term(DestStream, {Body, CopiedBins}),
    NewSp.

% Copies a revision tree to the destination. Only leaf nodes that hold an
% {IsDeleted, SummaryPointer} value have their document copied; every other
% node is written as ?REV_MISSING with its subtree copied recursively.
copy_rev_tree(SrcFd, DestFd, DestStream, Tree) ->
    lists:map(
        fun(Node) -> copy_rev_node(SrcFd, DestFd, DestStream, Node) end,
        Tree).

copy_rev_node(SrcFd, DestFd, DestStream, {RevId, {IsDel, Sp}, []}) ->
    % This is a leaf node, copy it over
    {RevId, {IsDel, copy_raw_doc(SrcFd, Sp, DestFd, DestStream)}, []};
copy_rev_node(SrcFd, DestFd, DestStream, {RevId, _Value, SubTree}) ->
    % inner node, only copy info/data from leaf nodes
    {RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)}.

% Copies the documents named by the given #doc_info list from Db into NewDb:
% looks each one up in Db's by-id btree, copies its rev tree, and adds the
% resulting entries to NewDb's by-id and by-seq btrees. Returns the updated
% NewDb. Every id is expected to be found in Db.
copy_docs(#db{fd=SrcFd} = Db, #db{fd=DestFd, summary_stream=DestStream} = NewDb, InfoBySeq) ->
    DocIds = [DocId || #doc_info{id=DocId} <- InfoBySeq],
    Lookups = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, DocIds),
    CopyInfo =
        fun({ok, #full_doc_info{rev_tree=RevTree} = Info}) ->
            CopiedTree = copy_rev_tree(SrcFd, DestFd, DestStream, RevTree),
            Info#full_doc_info{rev_tree=CopiedTree}
        end,
    CopiedFullInfos = lists:map(CopyInfo, Lookups),
    CopiedDocInfos = [couch_doc:to_doc_info(Info) || Info <- CopiedFullInfos],
    {ok, BySeqBtree} =
        couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, CopiedDocInfos, []),
    {ok, ByIdBtree} =
        couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, CopiedFullInfos, []),
    NewDb#db{fulldocinfo_by_id_btree=ByIdBtree, docinfo_by_seq_btree=BySeqBtree}.



% Copies into NewDb every document of Db whose update sequence is greater than
% NewDb's. Documents are collected while walking Db's by-seq btree and copied
% in batches: whenever couch_util:should_flush() says so the pending batch is
% copied and committed, and whatever is still pending at the end is copied and
% committed last. Returns the resulting NewDb.
copy_compact_docs(Db, NewDb) ->
    % Pending lists are kept newest-first; copy them in sequence order.
    CopyBatch =
        fun(Target, PendingNewestFirst) ->
            copy_docs(Db, Target, lists:reverse(PendingNewestFirst))
        end,
    Visit =
        fun(#doc_info{update_seq=Seq} = DocInfo, _Offset, {Target, Pending}) ->
            Pending2 = [DocInfo | Pending],
            case couch_util:should_flush() of
                true ->
                    Copied = CopyBatch(Target, Pending2),
                    {ok, {commit_data(Copied#db{update_seq=Seq}), []}};
                false ->
                    {ok, {Target, Pending2}}
            end
        end,
    StartSeq = NewDb#db.update_seq + 1,
    {ok, {Target2, Leftover}} =
        couch_btree:foldl(Db#db.docinfo_by_seq_btree, StartSeq, Visit, {NewDb, []}),
    case Leftover of
        [] ->
            Target2;
        [#doc_info{update_seq=LastSeq} | _] ->
            commit_data(CopyBatch(Target2#db{update_seq=LastSeq}, Leftover))
    end.

% Body of the compactor process. Opens the "<Filepath>.compact" file (creating
% it with an empty header if it does not exist), copies the documents that are
% missing from it, optionally copies all local docs, closes the compaction db
% and tells the updater process that a pass is done.
start_copy_compact_int(#db{name=Name, filepath=Filepath} = Db, CopyLocal) ->
    CompactFile = Filepath ++ ".compact",
    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
    {Fd, Header} =
        case couch_file:open(CompactFile) of
            {ok, ExistingFd} ->
                ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
                {ok, ExistingHeader} =
                    couch_file:read_header(ExistingFd, ?HEADER_SIG),
                {ExistingFd, ExistingHeader};
            {error, enoent} ->
                % pause for a second before creating the file
                receive after 1000 -> ok end,
                {ok, FreshFd} = couch_file:open(CompactFile, [create]),
                FreshHeader = #db_header{},
                ok = couch_file:write_header(FreshFd, ?HEADER_SIG, FreshHeader),
                {FreshFd, FreshHeader}
        end,
    CompactDb = copy_compact_docs(Db, init_db(Name, CompactFile, Fd, Header)),
    FinalDb =
        case CopyLocal of
            true ->
                % suck up all the local docs into memory and write them to the new db
                Collect = fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end,
                {ok, LocalDocs} =
                    couch_btree:foldl(Db#db.local_docs_btree, Collect, []),
                {ok, LocalBtree} =
                    couch_btree:add(CompactDb#db.local_docs_btree, LocalDocs),
                commit_data(CompactDb#db{local_docs_btree=LocalBtree});
            _ ->
                CompactDb
        end,
    close_db(FinalDb),

    gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
\ No newline at end of file diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index b48c9bf3..c04ac33a 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -20,6 +20,7 @@ -export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]). -export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). +-export([close_maybe/1,drop_ref/1,drop_ref/2,add_ref/1,add_ref/2,num_refs/1]). %%---------------------------------------------------------------------- %% Args: Valid Options are [create] and [create,overwrite]. @@ -164,7 +165,25 @@ sync(Fd) -> %%---------------------------------------------------------------------- close(Fd) -> gen_server:cast(Fd, close). + +close_maybe(Fd) -> + gen_server:cast(Fd, {close_maybe, self()}). + +drop_ref(Fd) -> + drop_ref(Fd, self()). + +drop_ref(Fd, Pid) -> + gen_server:cast(Fd, {drop_ref, Pid}). + + +add_ref(Fd) -> + add_ref(Fd, self()). +add_ref(Fd, Pid) -> + gen_server:call(Fd, {add_ref, Pid}). + +num_refs(Fd) -> + gen_server:call(Fd, num_refs). write_header(Fd, Prefix, Data) -> TermBin = term_to_binary(Data), @@ -267,7 +286,7 @@ init_status_ok(ReturnPid, Fd) -> init_status_error(ReturnPid, Error) -> ReturnPid ! {self(), Error}, % signal back error status - self() ! self_close, % tell ourself to close async + gen_server:cast(self(), close), % tell ourself to close async {ok, nil}. % server functions @@ -342,16 +361,57 @@ handle_call({pread_bin, Pos}, _From, Fd) -> {ok, <>} = file:pread(Fd, Pos, 4), {ok, Bin} = file:pread(Fd, Pos + 4, TermLen), - {reply, {ok, Bin}, Fd}. + {reply, {ok, Bin}, Fd}; +handle_call({add_ref, Pid},_From, Fd) -> + undefined = put(Pid, erlang:monitor(process, Pid)), + {reply, ok, Fd}; +handle_call(num_refs, _From, Fd) -> + {monitors, Monitors} = process_info(self(), monitors), + {reply, length(Monitors), Fd}. 
+ handle_cast(close, Fd) -> - {stop,normal,Fd}. % causes terminate to be called + {stop,normal,Fd}; +handle_cast({close_maybe, Pid}, Fd) -> + catch unlink(Pid), + maybe_close_async(Fd); +handle_cast({drop_ref, Pid}, Fd) -> + % don't check return of demonitor. The process could haved crashed causing + % the {'DOWN', ...} message to be sent and the process unmonitored. + erlang:demonitor(erase(Pid), [flush]), + maybe_close_async(Fd). + code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_info(self_close, State) -> - {stop,normal,State}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) -> + MonitorRef = erase(Pid), + maybe_close_async(Fd); +handle_info(Info, Fd) -> + exit({error, {Info, Fd}}). + + + +should_close(Fd) -> + case process_info(self(), links) of + {links, [Fd]} -> + % no linkers left (except our fd). What about monitors? + case process_info(self(), monitors) of + {monitors, []} -> + true; + _ -> + false + end; + {links, Links} when length(Links) > 1 -> + false + end. + +maybe_close_async(Fd) -> + case should_close(Fd) of + true -> + {stop,normal,Fd}; + false -> + {noreply,Fd} + end. 
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl index 88271390..d1a4fa90 100644 --- a/src/couchdb/couch_httpd.erl +++ b/src/couchdb/couch_httpd.erl @@ -156,7 +156,8 @@ handle_db_request(Req, Method, {Path}) -> handle_db_request(Req, 'PUT', {DbName, []}) -> case couch_server:create(DbName, []) of - {ok, _Db} -> + {ok, Db} -> + couch_db:close(Db), send_json(Req, 201, {obj, [{ok, true}]}); {error, database_already_exists} -> Msg = io_lib:format("Database ~p already exists.", [DbName]), @@ -167,9 +168,13 @@ handle_db_request(Req, 'PUT', {DbName, []}) -> end; handle_db_request(Req, Method, {DbName, Rest}) -> - case couch_server:open(DbName) of + case couch_db:open(DbName, []) of {ok, Db} -> - handle_db_request(Req, Method, {DbName, Db, Rest}); + try + handle_db_request(Req, Method, {DbName, Db, Rest}) + after + couch_db:close(Db) + end; Error -> throw(Error) end; diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl index b2d46beb..f7aaa67c 100644 --- a/src/couchdb/couch_rep.erl +++ b/src/couchdb/couch_rep.erl @@ -43,7 +43,18 @@ replicate(DbNameA, DbNameB) -> replicate(Source, Target, Options) -> {ok, DbSrc} = open_db(Source), - {ok, DbTgt} = open_db(Target), + try + {ok, DbTgt} = open_db(Target), + try + replicate2(Source, DbSrc, Target, DbTgt, Options) + after + close_db(DbTgt) + end + after + close_db(DbSrc) + end. + +replicate2(Source, DbSrc, Target, DbTgt, Options) -> {ok, HostName} = inet:gethostname(), RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target, @@ -237,7 +248,12 @@ open_db("http" ++ DbName)-> {ok, "http" ++ DbName ++ "/"} end; open_db(DbName)-> - couch_server:open(DbName). + couch_db:open(DbName, []). + +close_db("http" ++ _)-> + ok; +close_db(DbName)-> + couch_db:close(DbName). 
enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) -> diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index bb3617b2..86cdb2f8 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -15,7 +15,7 @@ -behaviour(application). -export([start/0,start/1,start/2,stop/0,stop/1]). --export([open/1,create/2,delete/1,all_databases/0,get_version/0]). +-export([open/2,create/2,delete/1,all_databases/0,get_version/0]). -export([init/1, handle_call/3,sup_start_link/2]). -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). -export([dev_start/0,remote_restart/0]). @@ -25,7 +25,9 @@ -record(server,{ root_dir = [], dbname_regexp, - options=[] + remote_restart=[], + max_dbs_open=100, + current_dbs_open=0 }). start() -> @@ -64,33 +66,41 @@ get_version() -> sup_start_link(RootDir, Options) -> gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []). -open(Filename) -> - gen_server:call(couch_server, {open, Filename}). +open(DbName, Options) -> + gen_server:call(couch_server, {open, DbName, Options}). -create(Filename, Options) -> - gen_server:call(couch_server, {create, Filename, Options}). +create(DbName, Options) -> + gen_server:call(couch_server, {create, DbName, Options}). -delete(Filename) -> - gen_server:call(couch_server, {delete, Filename}). +delete(DbName) -> + gen_server:call(couch_server, {delete, DbName}). remote_restart() -> gen_server:call(couch_server, remote_restart). -init({RootDir, Options}) -> - {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"), - {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}. - -check_filename(#server{dbname_regexp=RegExp}, Filename) -> - case regexp:match(Filename, RegExp) of +check_dbname(#server{dbname_regexp=RegExp}, DbName) -> + case regexp:match(DbName, RegExp) of nomatch -> {error, illegal_database_name}; _Match -> ok end. 
-get_full_filename(Server, Filename) -> - filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]). +get_full_filename(Server, DbName) -> + filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]). +init({RootDir, Options}) -> + {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"), + ets:new(couch_dbs_by_name, [set, private, named_table]), + ets:new(couch_dbs_by_pid, [set, private, named_table]), + ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]), + process_flag(trap_exit, true), + MaxDbsOpen = proplists:get_value(max_dbs_open, Options), + RemoteRestart = proplists:get_value(remote_restart, Options), + {ok, #server{root_dir=RootDir, + dbname_regexp=RegExp, + max_dbs_open=MaxDbsOpen, + remote_restart=RemoteRestart}}. terminate(_Reason, _Server) -> ok. @@ -109,107 +119,141 @@ all_databases() -> {ok, Filenames}. +maybe_close_lru_db(#server{current_dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server) + when NumOpen < MaxOpen -> + {ok, Server}; +maybe_close_lru_db(#server{current_dbs_open=NumOpen}=Server) -> + % must free up the lru db. + case try_close_lru(now()) of + ok -> {ok, Server#server{current_dbs_open=NumOpen-1}}; + Error -> Error + end. + +try_close_lru(StartTime) -> + LruTime = ets:first(couch_dbs_by_lru), + if LruTime > StartTime -> + % this means we've looped through all our opened dbs and found them + % all in use. + {error, all_dbs_active}; + true -> + [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime), + [{_, {MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName), + case couch_db:num_refs(MainPid) of + 0 -> + exit(MainPid, kill), + receive {'EXIT', MainPid, _Reason} -> ok end, + true = ets:delete(couch_dbs_by_lru, LruTime), + true = ets:delete(couch_dbs_by_name, DbName), + true = ets:delete(couch_dbs_by_pid, MainPid), + ok; + _NumRefs -> + % this still has referrers. Go ahead and give it a current lru time + % and try the next one in the table. 
+ NewLruTime = now(), + true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, NewLruTime}}), + true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), + true = ets:delete(couch_dbs_by_lru, LruTime), + true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}), + try_close_lru(StartTime) + end + end. + + handle_call(get_root, _From, #server{root_dir=Root}=Server) -> {reply, {ok, Root}, Server}; -handle_call({open, Filename}, From, Server) -> - case check_filename(Server, Filename) of - {error, Error} -> - {reply, {error, Error}, Server}; - ok -> - Filepath = get_full_filename(Server, Filename), - Result = supervisor:start_child(couch_server_sup, - {Filename, - {couch_db, open, [Filename, Filepath]}, - transient , - infinity, - supervisor, - [couch_db]}), - case Result of - {ok, Db} -> - {reply, {ok, Db}, Server}; - {error, already_present} -> - ok = supervisor:delete_child(couch_server_sup, Filename), - % call self recursively - handle_call({open, Filename}, From, Server); - {error, {already_started, Db}} -> - {reply, {ok, Db}, Server}; - {error, {not_found, _}} -> - {reply, not_found, Server}; - {error, {Error, _}} -> - {reply, {error, Error}, Server} - end +handle_call({open, DbName, Options}, {FromPid,_}, Server) -> + Filepath = get_full_filename(Server, DbName), + LruTime = now(), + case ets:lookup(couch_dbs_by_name, DbName) of + [] -> + case maybe_close_lru_db(Server) of + {ok, Server2} -> + case couch_db:start_link(DbName, Filepath, Options) of + {ok, MainPid} -> + true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}), + true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), + true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), + DbsOpen = Server2#server.current_dbs_open + 1, + {reply, + couch_db:open_ref_counted(MainPid, FromPid), + Server2#server{current_dbs_open=DbsOpen}}; + CloseError -> + {reply, CloseError, Server2} + end; + Error -> + {reply, Error, Server} + end; + [{_, {MainPid, PrevLruTime}}] -> + true = 
ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}), + true = ets:delete(couch_dbs_by_lru, PrevLruTime), + true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), + {reply, couch_db:open_ref_counted(MainPid, FromPid), Server} end; -handle_call({create, Filename, Options}, _From, Server) -> - case check_filename(Server, Filename) of - {error, Error} -> - {reply, {error, Error}, Server}; +handle_call({create, DbName, Options}, {FromPid,_}, Server) -> + case check_dbname(Server, DbName) of ok -> - Filepath = get_full_filename(Server, Filename), - ChildSpec = {Filename, - {couch_db, create, [Filename, Filepath, Options]}, - transient, - infinity, - supervisor, - [couch_db]}, - Result = - case supervisor:delete_child(couch_server_sup, Filename) of - ok -> - sup_start_child(couch_server_sup, ChildSpec); - {error, not_found} -> - sup_start_child(couch_server_sup, ChildSpec); - {error, running} -> - % a server process for this database already started. Maybe kill it - case lists:member(overwrite, Options) of - true -> - supervisor:terminate_child(couch_server_sup, Filename), - ok = supervisor:delete_child(couch_server_sup, Filename), - sup_start_child(couch_server_sup, ChildSpec); - false -> - {error, database_already_exists} - end - end, - case Result of - {ok, _Db} -> couch_db_update_notifier:notify({created, Filename}); - _ -> ok - end, - {reply, Result, Server} + Filepath = get_full_filename(Server, DbName), + + case ets:lookup(couch_dbs_by_name, DbName) of + [] -> + case couch_db:start_link(DbName, Filepath, [create|Options]) of + {ok, MainPid} -> + LruTime = now(), + true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}), + true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), + true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), + DbsOpen = Server#server.current_dbs_open + 1, + {reply, + couch_db:open_ref_counted(MainPid, FromPid), + Server#server{current_dbs_open=DbsOpen}}; + Error -> + {reply, Error, Server} + end; + [_AlreadyRunningDb] 
-> + {reply, {error, file_exists}, Server} + end; + Error -> + {reply, Error, Server} end; -handle_call({delete, Filename}, _From, Server) -> - FullFilepath = get_full_filename(Server, Filename), - supervisor:terminate_child(couch_server_sup, Filename), - supervisor:delete_child(couch_server_sup, Filename), +handle_call({delete, DbName}, _From, Server) -> + FullFilepath = get_full_filename(Server, DbName), + Server2 = + case ets:lookup(couch_dbs_by_name, DbName) of + [] -> Server; + [{_, {Pid, LruTime}}] -> + exit(Pid, kill), + receive {'EXIT', Pid, _Reason} -> ok end, + true = ets:delete(couch_dbs_by_name, DbName), + true = ets:delete(couch_dbs_by_pid, Pid), + true = ets:delete(couch_dbs_by_lru, LruTime), + DbsOpen = Server#server.current_dbs_open - 1, + Server#server{current_dbs_open=DbsOpen} + end, case file:delete(FullFilepath) of ok -> - couch_db_update_notifier:notify({deleted, Filename}), - {reply, ok, Server}; + couch_db_update_notifier:notify({deleted, DbName}), + {reply, ok, Server2}; {error, enoent} -> - {reply, not_found, Server}; + {reply, not_found, Server2}; Else -> - {reply, Else, Server} + {reply, Else, Server2} end; -handle_call(remote_restart, _From, #server{options=Options}=Server) -> - case proplists:get_value(remote_restart, Options) of - true -> - exit(self(), restart); - _ -> - ok - end, +handle_call(remote_restart, _From, #server{remote_restart=false}=Server) -> + {reply, ok, Server}; +handle_call(remote_restart, _From, #server{remote_restart=true}=Server) -> + exit(couch_server_sup, restart), {reply, ok, Server}. -% this function is just to strip out the child spec error stuff if hit -sup_start_child(couch_server_sup, ChildSpec) -> - case supervisor:start_child(couch_server_sup, ChildSpec) of - {error, {Error, _ChildInfo}} -> - {error, Error}; - Else -> - Else - end. - -handle_cast(_Msg, State) -> - {noreply,State}. +handle_cast(Msg, _Server) -> + exit({unknown_cast_message, Msg}). code_change(_OldVsn, State, _Extra) -> {ok, State}. 
-handle_info(_Info, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, _Reason}, Server) -> + [{Pid, DbName}] = ets:lookup(couch_dbs_by_pid, Pid), + true = ets:delete(couch_dbs_by_pid, Pid), + true = ets:delete(couch_dbs_by_name, DbName), + {noreply, Server}; +handle_info(Info, _Server) -> + exit({unknown_message, Info}). diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl index 7a628eb9..5d4099bf 100644 --- a/src/couchdb/couch_server_sup.erl +++ b/src/couchdb/couch_server_sup.erl @@ -74,8 +74,9 @@ start_server(InputIniFilename) -> UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""), UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini), FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""), - RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")), - ServerOptions = [{remote_restart, RemoteRestart}], + RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "false")), + MaxDbsOpen = proplists:get_value({"Couch", "MaxDbsOpen"}, Ini, 100), + ServerOptions = [{remote_restart, RemoteRestart}, {max_dbs_open, MaxDbsOpen}], QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini], ChildProcesses = diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index 53cc4cda..a29a809c 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -57,22 +57,22 @@ get_updater(DbName, GroupId) -> Pid. get_updated_group(Pid) -> - Mref = erlang:monitor(process, Pid), + Mref = erlang:monitor(process, Pid), receive - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) + {'DOWN', Mref, _, _, Reason} -> + throw(Reason) after 0 -> - Pid ! 
{self(), get_updated}, - receive - {Pid, Response} -> - erlang:demonitor(Mref), - receive - {'DOWN', Mref, _, _, _} -> ok - after 0 -> ok - end, - Response; - {'DOWN', Mref, _, _, Reason} -> - throw(Reason) + Pid ! {self(), get_updated}, + receive + {Pid, Response} -> + erlang:demonitor(Mref), + receive + {'DOWN', Mref, _, _, _} -> ok + after 0 -> ok + end, + Response; + {'DOWN', Mref, _, _, Reason} -> + throw(Reason) end end. @@ -216,10 +216,7 @@ init(RootDir) -> {ok, #server{root_dir=RootDir}}. terminate(_Reason, _) -> - catch ets:delete(couch_views_by_name), - catch ets:delete(couch_views_by_updater), - catch ets:delete(couch_views_by_db), - catch ets:delete(couch_views_temp_fd_by_db). + ok. handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) -> @@ -317,7 +314,7 @@ code_change(_OldVsn, State, _Extra) -> start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> NotifyPids = get_notify_pids(1000), - case couch_server:open(DbName) of + case couch_db:open(DbName, []) of {ok, Db} -> View = #view{map_names=["_temp"], id_num=0, @@ -331,16 +328,20 @@ start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) -> def_lang=Lang, id_btree=nil}, Group2 = init_group(Db, Fd, Group,nil), - temp_update_loop(Group2, NotifyPids); + couch_db:monitor(Db), + couch_db:close(Db), + temp_update_loop(DbName, Group2, NotifyPids); Else -> exit(Else) end. -temp_update_loop(Group, NotifyPids) -> - {ok, Group2} = update_group(Group), +temp_update_loop(DbName, Group, NotifyPids) -> + {ok, Db} = couch_db:open(DbName, []), + {ok, Group2} = update_group(Group#group{db=Db}), + couch_db:close(Db), [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids], garbage_collect(), - temp_update_loop(Group2, get_notify_pids(10000)). + temp_update_loop(DbName, Group2, get_notify_pids(10000)). 
reset_group(#group{views=Views}=Group) -> @@ -355,21 +356,21 @@ start_update_loop(RootDir, DbName, GroupId) -> start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> {Db, Group} = - case (catch couch_server:open(DbName)) of + case (catch couch_db:open(DbName, [])) of {ok, Db0} -> case (catch couch_db:open_doc(Db0, GroupId)) of {ok, Doc} -> {Db0, design_doc_to_view_group(Doc)}; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end; - Else -> - delete_index_file(RootDir, DbName, GroupId), - exit(Else) - end, - FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", - Group2 = + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end; + Else -> + delete_index_file(RootDir, DbName, GroupId), + exit(Else) + end, + FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view", + Group2 = case couch_file:open(FileName) of {ok, Fd} -> Sig = Group#group.sig, @@ -386,7 +387,8 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) -> Error -> throw(Error) end end, - + couch_db:monitor(Db), + couch_db:close(Db), update_loop(RootDir, DbName, GroupId, Group2, NotifyPids). reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> @@ -396,14 +398,22 @@ reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> init_group(Db, Fd, reset_group(Group), nil). update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) -> - try update_group(Group) of - {ok, Group2} -> + {ok, Db}= couch_db:open(DbName, []), + Result = + try + update_group(Group#group{db=Db}) + catch + throw: restart -> restart + after + couch_db:close(Db) + end, + case Result of + {ok, Group2} -> HeaderData = {Sig, get_index_header_data(Group2)}, ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData), [Pid ! 
{self(), {ok, Group2}} || Pid <- NotifyPids], garbage_collect(), - update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)) - catch + update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000)); restart -> couch_file:close(Group#group.fd), start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids()) @@ -414,20 +424,23 @@ get_notify_pids(Wait) -> receive {Pid, get_updated} -> [Pid | get_notify_pids()]; + {'DOWN', _MonitorRef, _Type, _Pid, _Info} -> + ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []), + exit(normal); Else -> ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]), exit({error, Else}) after Wait -> exit(wait_timeout) - end. + end. % then keep getting all available and return. get_notify_pids() -> receive {Pid, get_updated} -> [Pid | get_notify_pids()] - after 0 -> - [] - end. + after 0 -> + [] + end. update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) -> ViewEmptyKVs = [{View, []} || View <- Views], -- cgit v1.2.3