summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2008-08-05 01:43:40 +0000
committerDamien F. Katz <damien@apache.org>2008-08-05 01:43:40 +0000
commit88ec14c220592c8c0db7869c9961423e9ee97e7c (patch)
tree67974f234e4a0201302506e3b7c56a73cf909376 /src
parentb218a0e7d425f7b3660433a17c6558f676524730 (diff)
Added concurrent open db limit and a LRU cache for closing old databases when limit reached (configurable via MaxDbsOpen var in couch.ini). Refactored db update code in couch_db.erl into couch_db_updater.erl.
git-svn-id: https://svn.apache.org/repos/asf/incubator/couchdb/trunk@682560 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/couch_db.erl681
-rw-r--r--src/couchdb/couch_db.hrl37
-rw-r--r--src/couchdb/couch_db_updater.erl499
-rw-r--r--src/couchdb/couch_file.erl74
-rw-r--r--src/couchdb/couch_httpd.erl11
-rw-r--r--src/couchdb/couch_rep.erl20
-rw-r--r--src/couchdb/couch_server.erl252
-rw-r--r--src/couchdb/couch_server_sup.erl5
-rw-r--r--src/couchdb/couch_view.erl99
9 files changed, 910 insertions, 768 deletions
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 2f5df3b5..098f3de7 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -13,63 +13,24 @@
-module(couch_db).
-behaviour(gen_server).
--export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]).
--export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
--export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
--export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
+-export([open_ref_counted/2,num_refs/1,monitor/1]).
+-export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]).
+-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([get_missing_revs/2]).
+-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1]).
--export([start_update_loop/2]).
+-export([start_link/3]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([start_copy_compact_int/2]).
--export([btree_by_id_split/1,
- btree_by_id_join/2,
- btree_by_id_reduce/2,
- btree_by_seq_split/1,
- btree_by_seq_join/2,
- btree_by_seq_reduce/2]).
-include("couch_db.hrl").
--record(db_header,
- {write_version = 0,
- update_seq = 0,
- summary_stream_state = nil,
- fulldocinfo_by_id_btree_state = nil,
- docinfo_by_seq_btree_state = nil,
- local_docs_btree_state = nil,
- doc_count=0,
- doc_del_count=0
- }).
-
--record(db,
- {main_pid=nil,
- update_pid=nil,
- compactor_pid=nil,
- fd,
- header = #db_header{},
- summary_stream,
- fulldocinfo_by_id_btree,
- docinfo_by_seq_btree,
- local_docs_btree,
- update_seq,
- doc_count,
- doc_del_count,
- name,
- filepath
- }).
-
-% small value used in revision trees to indicate the revision isn't stored
--define(REV_MISSING, []).
-
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
-
start_link(DbName, Filepath, Options) ->
catch start_link0(DbName, Filepath, Options).
start_link0(DbName, Filepath, Options) ->
- % first delete the old file previous compaction
Fd =
case couch_file:open(Filepath, Options) of
{ok, Fd0} ->
@@ -105,33 +66,38 @@ start_link0(DbName, Filepath, Options) ->
end,
StartResult.
-%%% Interface functions %%%
-create(Filepath, Options) ->
- create(Filepath, Filepath, Options).
+create(DbName, Options) ->
+ couch_server:create(DbName, Options).
-create(DbName, Filepath, Options) when is_list(Options) ->
- start_link(DbName, Filepath, [create | Options]).
+open(DbName, Options) ->
+ couch_server:open(DbName, Options).
-open(DbName, Filepath) ->
- start_link(DbName, Filepath, []).
+close(#db{fd=Fd}) ->
+ couch_file:drop_ref(Fd).
+open_ref_counted(MainPid, OpeningPid) ->
+ gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}).
-% Compaction still needs work. Right now readers and writers can get an error
-% file compaction changeover. This doesn't need to be the case.
-start_compact(MainPid) ->
- gen_server:cast(MainPid, start_compact).
+num_refs(MainPid) ->
+ gen_server:call(MainPid, num_refs).
-delete_doc(MainPid, Id, Revisions) ->
+monitor(#db{main_pid=MainPid}) ->
+ erlang:monitor(process, MainPid).
+
+start_compact(#db{update_pid=Pid}) ->
+ gen_server:cast(Pid, start_compact).
+
+delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
- {ok, [Result]} = update_docs(MainPid, DeletedDocs, []),
+ {ok, [Result]} = update_docs(Db, DeletedDocs, []),
{ok, Result}.
-open_doc(MainPid, IdOrDocInfo) ->
- open_doc(MainPid, IdOrDocInfo, []).
+open_doc(Db, IdOrDocInfo) ->
+ open_doc(Db, IdOrDocInfo, []).
-open_doc(MainPid, Id, Options) ->
- case open_doc_int(get_db(MainPid), Id, Options) of
+open_doc(Db, Id, Options) ->
+ case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
@@ -143,13 +109,13 @@ open_doc(MainPid, Id, Options) ->
Else
end.
-open_doc_revs(MainPid, Id, Revs, Options) ->
- [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options),
+open_doc_revs(Db, Id, Revs, Options) ->
+ [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
Result.
-get_missing_revs(MainPid, IdRevsList) ->
+get_missing_revs(Db, IdRevsList) ->
Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
- FullDocInfoResults = get_full_doc_infos(MainPid, Ids),
+ FullDocInfoResults = get_full_doc_infos(Db, Ids),
Results = lists:zipwith(
fun({Id, Revs}, FullDocInfoResult) ->
case FullDocInfoResult of
@@ -177,18 +143,12 @@ get_full_doc_info(Db, Id) ->
[Result] = get_full_doc_infos(Db, [Id]),
Result.
-
-get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
- get_full_doc_infos(get_db(MainPid), Ids);
-get_full_doc_infos(#db{}=Db, Ids) ->
+get_full_doc_infos(Db, Ids) ->
couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
-increment_update_seq(MainPid) ->
- gen_server:call(MainPid, increment_update_seq).
+increment_update_seq(#db{update_pid=UpdatePid}) ->
+ gen_server:call(UpdatePid, increment_update_seq).
-
-get_db_info(MainPid) when is_pid(MainPid) ->
- get_db_info(get_db(MainPid));
get_db_info(Db) ->
#db{fd=Fd,
compactor_pid=Compactor,
@@ -205,12 +165,12 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-update_doc(MainPid, Doc, Options) ->
- {ok, [NewRev]} = update_docs(MainPid, [Doc], Options),
+update_doc(Db, Doc, Options) ->
+ {ok, [NewRev]} = update_docs(Db, [Doc], Options),
{ok, NewRev}.
-update_docs(MainPid, Docs) ->
- update_docs(MainPid, Docs, []).
+update_docs(Db, Docs) ->
+ update_docs(Db, Docs, []).
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
@@ -263,7 +223,7 @@ prepare_doc_for_new_edit(Db, #doc{id=Id,revs=[NewRev|PrevRevs]}=Doc, OldFullDocI
end
end.
-update_docs(MainPid, Docs, Options) ->
+update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
% go ahead and generate the new revision ids for the documents.
Docs2 = lists:map(
fun(#doc{id=Id,revs=Revs}=Doc) ->
@@ -278,7 +238,6 @@ update_docs(MainPid, Docs, Options) ->
NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
DocBuckets = group_alike_docs(Docs2),
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
- Db = get_db(MainPid),
% lookup the doc by id and get the most recent
@@ -298,13 +257,14 @@ update_docs(MainPid, Docs, Options) ->
% flush unwritten binaries to disk.
DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
- case gen_server:call(MainPid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
ok -> {ok, NewRevs};
retry ->
- Db2 = get_db(MainPid),
+ Db2 = open_ref_counted(Db#db.main_pid, self()),
DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
% We only retry once
- case gen_server:call(MainPid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
+ ok = close(Db2),
+ case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
ok -> {ok, NewRevs};
Else -> throw(Else)
end;
@@ -312,15 +272,11 @@ update_docs(MainPid, Docs, Options) ->
throw(Else)
end.
-save_docs(MainPid, Docs) ->
- save_docs(MainPid, Docs, []).
-
-save_docs(MainPid, Docs, Options) ->
+save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) ->
% flush unwritten binaries to disk.
- Db = get_db(MainPid),
DocBuckets = group_alike_docs(Docs),
- DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
- ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}, infinity).
+ DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity).
doc_flush_binaries(Doc, Fd) ->
@@ -379,125 +335,51 @@ doc_flush_binaries(Doc, Fd) ->
Doc#doc{attachments = NewBins}.
enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds).
+ couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds).
enum_docs_reduce_to_count(Reds) ->
- couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds).
+ couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds).
-enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
- Db = get_db(MainPid),
+enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) ->
couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
-enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
- enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).
+enum_docs_since(Db, SinceSeq, InFun, Acc) ->
+ enum_docs_since(Db, SinceSeq, fwd, InFun, Acc).
-enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
- Db = get_db(MainPid),
+enum_docs(Db, StartId, Direction, InFun, InAcc) ->
couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc).
-enum_docs(MainPid, StartId, InFun, Ctx) ->
- enum_docs(MainPid, StartId, fwd, InFun, Ctx).
+enum_docs(Db, StartId, InFun, Ctx) ->
+ enum_docs(Db, StartId, fwd, InFun, Ctx).
% server functions
-init(InitArgs) ->
- spawn_link(couch_db, start_update_loop, [self(), InitArgs]),
- receive
- {initialized, Db} ->
- {ok, Db}
- end.
-
-btree_by_seq_split(DocInfo) ->
- #doc_info{
- id = Id,
- rev = Rev,
- update_seq = Seq,
- summary_pointer = Sp,
- conflict_revs = Conflicts,
- deleted_conflict_revs = DelConflicts,
- deleted = Deleted} = DocInfo,
- {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
-
-btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
- #doc_info{
- id = Id,
- rev = Rev,
- update_seq = Seq,
- summary_pointer = Sp,
- conflict_revs = Conflicts,
- deleted_conflict_revs = DelConflicts,
- deleted = Deleted}.
-
-btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
- deleted=Deleted, rev_tree=Tree}) ->
- {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
-
-btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
- #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
-
-
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
- % count the number of deleted documents
- length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
-btree_by_id_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-btree_by_seq_reduce(reduce, DocInfos) ->
- % count the number of deleted documents
- length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
- lists:sum(Reds).
-
-init_db(DbName, Filepath, Fd, Header) ->
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
- {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
- [{split, fun btree_by_id_split/1},
- {join, fun btree_by_id_join/2},
- {reduce, fun btree_by_id_reduce/2}]),
- {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
- [{split, fun btree_by_seq_split/1},
- {join, fun btree_by_seq_join/2},
- {reduce, fun btree_by_seq_reduce/2}]),
- {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
-
- #db{
- update_pid=self(),
- fd=Fd,
- header=Header,
- summary_stream = SummaryStream,
- fulldocinfo_by_id_btree = IdBtree,
- docinfo_by_seq_btree = SeqBtree,
- local_docs_btree = LocalDocsBtree,
- update_seq = Header#db_header.update_seq,
- doc_count = Header#db_header.doc_count,
- doc_del_count = Header#db_header.doc_del_count,
- name = DbName,
- filepath=Filepath }.
+init({DbName, Filepath, Fd, Options}) ->
+ {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
+ ok = couch_file:add_ref(Fd),
+ gen_server:call(UpdaterPid, get_db).
-close_db(#db{fd=Fd,summary_stream=Ss}) ->
- couch_file:close(Fd),
- couch_stream:close(Ss).
-
terminate(_Reason, Db) ->
exit(Db#db.update_pid, kill).
-handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
- Updater ! {From, update_docs, DocActions, Options},
- {noreply, Db};
-handle_call(increment_update_seq, From, #db{update_pid=Updater}=Db) ->
- Updater ! {From, increment_update_seq},
- {noreply, Db};
-handle_call(get_db, _From, Db) ->
+handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) ->
+ ok = couch_file:add_ref(Fd, OpenerPid),
{reply, {ok, Db}, Db};
-handle_call({db_updated, NewDb}, _From, _OldDb) ->
+handle_call(num_refs, _From, #db{fd=Fd}=Db) ->
+ {reply, couch_file:num_refs(Fd) - 1, Db};
+handle_call({db_updated, #db{fd=NewFd}=NewDb}, _From, #db{fd=OldFd}) ->
+ case NewFd == OldFd of
+ true -> ok;
+ false ->
+ couch_file:add_ref(NewFd),
+ couch_file:drop_ref(OldFd)
+ end,
{reply, ok, NewDb}.
-handle_cast(start_compact, #db{update_pid=Updater}=Db) ->
- Updater ! compact,
- {noreply, Db}.
+handle_cast(Msg, Db) ->
+ ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
+ exit({error, Msg}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -508,114 +390,6 @@ handle_info(Msg, Db) ->
%%% Internal function %%%
-
-start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
- link(Fd),
-
- case lists:member(create, Options) of
- true ->
- % create a new header and writes it to the file
- Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
- % delete any old compaction files that might be hanging around
- file:delete(Filepath ++ ".compact"),
- file:delete(Filepath ++ ".old");
- false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
- end,
-
- Db = init_db(DbName, Filepath, Fd, Header),
- Db2 = Db#db{main_pid=MainPid},
- MainPid ! {initialized, Db2},
- update_loop(Db2).
-
-update_loop(#db{fd=Fd,name=Name,
- filepath=Filepath,
- main_pid=MainPid,
- update_seq=UpdateSeq}=Db) ->
- receive
- {OrigFrom, update_docs, DocActions, Options} ->
- case (catch update_docs_int(Db, DocActions, Options)) of
- {ok, Db2} ->
- ok = gen_server:call(MainPid, {db_updated, Db2}),
- gen_server:reply(OrigFrom, ok),
- couch_db_update_notifier:notify({updated, Name}),
- update_loop(Db2);
- retry ->
- gen_server:reply(OrigFrom, retry),
- update_loop(Db);
- conflict ->
- gen_server:reply(OrigFrom, conflict),
- update_loop(Db);
- Error ->
- exit(Error) % we crashed
- end;
- compact ->
- case Db#db.compactor_pid of
- nil ->
- ?LOG_INFO("Starting compaction for db \"~s\"", [Name]),
- Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]),
- Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(MainPid, {db_updated, Db2}),
- update_loop(Db2);
- _ ->
- update_loop(Db) % already started
- end;
- {compact_done, CompactFilepath} ->
- {ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
- #db{update_seq=NewSeq}= NewDb =
- init_db(Name, CompactFilepath, NewFd, NewHeader),
- case Db#db.update_seq == NewSeq of
- true ->
- NewDb2 = commit_data(
- NewDb#db{
- main_pid = Db#db.main_pid,
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count,
- filepath = Filepath}),
-
- ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
- ok = file:rename(Filepath, Filepath ++ ".old"),
- ok = file:rename(CompactFilepath, Filepath),
-
- couch_stream:close(Db#db.summary_stream),
- % close file handle async.
- % wait 5 secs before closing, allowing readers to finish
- unlink(Fd),
- spawn_link(fun() ->
- receive after 5000 -> ok end,
- couch_file:close(Fd),
- file:delete(Filepath ++ ".old")
- end),
-
- ok = gen_server:call(MainPid, {db_updated, NewDb2}),
- ?LOG_INFO("Compaction for db ~p completed.", [Name]),
- update_loop(NewDb2#db{compactor_pid=nil});
- false ->
- ?LOG_INFO("Compaction file still behind main file "
- "(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
- Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]),
- Db2 = Db#db{compactor_pid=Pid},
- couch_file:close(NewFd),
- update_loop(Db2)
- end;
- {OrigFrom, increment_update_seq} ->
- Db2 = commit_data(Db#db{update_seq=UpdateSeq+1}),
- ok = gen_server:call(MainPid, {db_updated, Db2}),
- gen_server:reply(OrigFrom, {ok, UpdateSeq+1}),
- couch_db_update_notifier:notify({updated, Name}),
- update_loop(Db2);
- Else ->
- ?LOG_ERROR("Unknown message received in db ~s:~p", [Db#db.name, Else]),
- exit({error, Else})
- end.
-
-get_db(MainPid) ->
- {ok, Db} = gen_server:call(MainPid, get_db),
- Db.
-
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
@@ -711,16 +485,6 @@ doc_meta_info(DocInfo, RevTree, Options) ->
end
end.
-% rev tree functions
-
-doc_to_tree(Doc) ->
- doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
-
-doc_to_tree(Doc, [RevId]) ->
- [{RevId, Doc, []}];
-doc_to_tree(Doc, [RevId | Rest]) ->
- [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
-
make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
{BodyData, BinValues} =
case SummaryPointer of
@@ -737,303 +501,6 @@ make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
attachments = BinValues,
deleted = Deleted
}.
-
-flush_trees(_Db, [], AccFlushedTrees) ->
- {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
- #full_doc_info{rev_tree=Unflushed} = InfoUnflushed,
- Flushed = couch_key_tree:map(
- fun(_Rev, Value) ->
- case Value of
- #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
- % this node value is actually an unwritten document summary,
- % write to disk.
- % make sure the Fd in the written bins is the same Fd we are.
- Bins =
- case Atts of
- [] -> [];
- [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
- % convert bins, removing the FD.
- % All bins should have been flushed to disk already.
- [{BinName, {BinType, BinSp, BinLen}}
- || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
- <- Atts];
- _ ->
- % BinFd must not equal our Fd. This can happen when a database
- % is being updated during a compaction
- ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
- throw(retry)
- end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
- {IsDeleted, NewSummaryPointer};
- _ ->
- Value
- end
- end, Unflushed),
- flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-
-merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
- {ok, lists:reverse(AccNewInfos), AccSeq};
-merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
- [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
- #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
- UpdatesRevTree = lists:foldl(
- fun(NewDoc, AccTree) ->
- couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
- end,
- [], NewDocs),
- NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
- if NewRevTree == OldTree ->
- % nothing changed
- merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
- true ->
- if NoConflicts andalso OldTree /= [] ->
- OldConflicts = couch_key_tree:count_leafs(OldTree),
- NewConflicts = couch_key_tree:count_leafs(NewRevTree),
- if NewConflicts > OldConflicts ->
- throw(conflict);
- true -> ok
- end;
- true -> ok
- end,
- NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
- merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo,
- [NewInfo|AccNewInfos],AccSeq+1)
- end.
-
-new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
- {ok, DocCount, DelCount, AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
- #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
- {DocCount2, DelCount2} =
- if Deleted -> {DocCount, DelCount + 1};
- true -> {DocCount + 1, DelCount}
- end,
- new_index_entries(RestInfos, DocCount2, DelCount2,
- [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
- [DocInfo|AccBySeq]).
-
-update_docs_int(Db, DocsList, Options) ->
- #db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
- update_seq = LastSeq,
- doc_count = FullDocCount,
- doc_del_count = FullDelCount
- } = Db,
-
- % separate out the NonRep documents from the rest of the documents
- {DocsList2, NonRepDocs} = lists:foldl(
- fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
- case Id of
- ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
- % when saving NR (non rep) documents, you can only save a single rev
- {DocsListAcc, [Doc | NonRepDocsAcc]};
- Id->
- {[Docs | DocsListAcc], NonRepDocsAcc}
- end
- end, {[], []}, DocsList),
-
- Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
-
- % lookup up the existing documents, if they exist.
- OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
- OldDocInfos = lists:zipwith(
- fun(_Id, {ok, FullDocInfo}) ->
- FullDocInfo;
- (Id, not_found) ->
- #full_doc_info{id=Id}
- end,
- Ids, OldDocLookups),
-
- {OldCount, OldDelCount} = lists:foldl(
- fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{deleted=false} ->
- {OldCountAcc + 1, OldDelCountAcc};
- _ ->
- {OldCountAcc, OldDelCountAcc + 1}
- end;
- (not_found, Acc) ->
- Acc
- end, {0, 0}, OldDocLookups),
-
- % Merge the new docs into the revision trees.
- NoConflicts = lists:member(new_edits, Options),
- {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
-
- RemoveSeqs =
- [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
-
- % All regular documents are now ready to write.
-
- % Try to write the local documents first, a conflict might be generated
- {ok, Db2} = update_local_docs(Db, NonRepDocs),
-
- % Write out the documents summaries (they are stored in the nodes of the rev trees)
- {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
-
- {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
- new_index_entries(FlushedDocInfos, 0, 0, [], []),
-
- % and the indexes to the documents
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
-
- Db3 = Db2#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
- update_seq = NewSeq,
- doc_count = FullDocCount + NewDocsCount - OldCount,
- doc_del_count = FullDelCount + NewDelCount - OldDelCount},
-
- case lists:member(delay_commit, Options) of
- true ->
- {ok, Db3};
- false ->
- {ok, commit_data(Db3)}
- end.
-
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
- Ids = [Id || #doc{id=Id} <- Docs],
- OldDocLookups = couch_btree:lookup(Btree, Ids),
- BtreeEntries = lists:zipwith(
- fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
- NewRev =
- case Revs of
- [] -> 0;
- [RevStr|_] -> list_to_integer(RevStr)
- end,
- OldRev =
- case OldDocLookup of
- {ok, {_, {OldRev0, _}}} -> OldRev0;
- not_found -> 0
- end,
- case OldRev + 1 == NewRev of
- true ->
- case Delete of
- false -> {update, {Id, {NewRev, Body}}};
- true -> {remove, Id}
- end;
- false ->
- throw(conflict)
- end
-
- end, Docs, OldDocLookups),
-
- BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
- BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
-
- {ok, Btree2} =
- couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
- {ok, Db#db{local_docs_btree = Btree2}}.
-
-
-
-commit_data(#db{fd=Fd, header=Header} = Db) ->
- Header2 = Header#db_header{
- update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
- docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
- fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
- doc_count = Db#db.doc_count,
- doc_del_count = Db#db.doc_del_count
- },
- if Header == Header2 ->
- Db; % unchanged. nothing to do
- true ->
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{header = Header2}
- end.
-
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
- % copy the bin values
- NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
- {Name, {Type, NewBinSp, Len}}
- end, BinInfos),
- % now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
- Sp.
-
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
- [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
- % This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
- % inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
-
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
- Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
- LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
- NewFullDocInfos = lists:map(
- fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
- end, LookupResults),
- NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
- {ok, DocInfoBTree} =
- couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
- {ok, FullDocInfoBTree} =
- couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
- NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
-
-
-
-copy_compact_docs(Db, NewDb) ->
- EnumBySeqFun =
- fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
- case couch_util:should_flush() of
- true ->
- NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
- false ->
- {ok, {AccNewDb, [DocInfo | AccUncopied]}}
- end
- end,
- {ok, {NewDb2, Uncopied}} =
- couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
-
- case Uncopied of
- [#doc_info{update_seq=LastSeq} | _] ->
- commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
- lists:reverse(Uncopied)));
- [] ->
- NewDb2
- end.
-
-start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
- CompactFile = Filepath ++ ".compact",
- ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
- case couch_file:open(CompactFile) of
- {ok, Fd} ->
- ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
- {error, enoent} -> %
- {ok, Fd} = couch_file:open(CompactFile, [create]),
- Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
- end,
- NewDb = init_db(Name, CompactFile, Fd, Header),
- NewDb2 = copy_compact_docs(Db, NewDb),
- NewDb3 =
- case CopyLocal of
- true ->
- % suck up all the local docs into memory and write them to the new db
- {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
- fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
- commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
- _ ->
- NewDb2
- end,
- close_db(NewDb3),
- Db#db.update_pid ! {compact_done, CompactFile}.
\ No newline at end of file
diff --git a/src/couchdb/couch_db.hrl b/src/couchdb/couch_db.hrl
index 9ca5d815..0c274396 100644
--- a/src/couchdb/couch_db.hrl
+++ b/src/couchdb/couch_db.hrl
@@ -69,3 +69,40 @@
% couch_db:open_doc(Db, Id, Options).
meta = []
}).
+
+
+
+
+
+-record(db_header,
+ {write_version = 0,
+ update_seq = 0,
+ summary_stream_state = nil,
+ fulldocinfo_by_id_btree_state = nil,
+ docinfo_by_seq_btree_state = nil,
+ local_docs_btree_state = nil,
+ doc_count=0,
+ doc_del_count=0
+ }).
+
+-record(db,
+ {main_pid=nil,
+ update_pid=nil,
+ compactor_pid=nil,
+ fd,
+ header = #db_header{},
+ summary_stream,
+ fulldocinfo_by_id_btree,
+ docinfo_by_seq_btree,
+ local_docs_btree,
+ update_seq,
+ doc_count,
+ doc_del_count,
+ name,
+ filepath
+ }).
+
+
+
+% small value used in revision trees to indicate the revision isn't stored
+-define(REV_MISSING, []).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
new file mode 100644
index 00000000..f0673af9
--- /dev/null
+++ b/src/couchdb/couch_db_updater.erl
@@ -0,0 +1,499 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater).
+-behaviour(gen_server).
+
+-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(HEADER_SIG, <<$g, $m, $k, 0>>).
+
+init({MainPid, DbName, Filepath, Fd, Options}) ->
+ link(Fd),
+ case lists:member(create, Options) of
+ true ->
+ % create a new header and writes it to the file
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ % delete any old compaction files that might be hanging around
+ file:delete(Filepath ++ ".compact"),
+ file:delete(Filepath ++ ".old");
+ false ->
+ {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ end,
+
+ Db = init_db(DbName, Filepath, Fd, Header),
+ {ok, Db#db{main_pid=MainPid}}.
+
+terminate(_Reason, Db) ->
+ close_db(Db).
+
+handle_call(get_db, _From, Db) ->
+ {reply, {ok, Db}, Db};
+handle_call({update_docs, DocActions, Options}, _From, Db) ->
+ try update_docs_int(Db, DocActions, Options) of
+ {ok, Db2} ->
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db2#db.name}),
+ {reply, ok, Db2}
+ catch
+ throw: retry ->
+ {reply, retry, Db};
+ throw: conflict ->
+ {reply, conflict, Db}
+ end;
+handle_call(increment_update_seq, _From, Db) ->
+ Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
+ ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ couch_db_update_notifier:notify({updated, Db#db.name}),
+ {reply, {ok, Db2#db.update_seq}, Db2}.
+
+
+handle_cast(start_compact, Db) ->
+ case Db#db.compactor_pid of
+ nil ->
+ ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end),
+ Db2 = Db#db{compactor_pid=Pid},
+ ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ {noreply, Db2};
+ _ ->
+ % compact currently running, this is a no-op
+ {noreply, Db}
+ end;
+handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
+ {ok, NewFd} = couch_file:open(CompactFilepath),
+ {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ #db{update_seq=NewSeq} = NewDb =
+ init_db(Db#db.name, CompactFilepath, NewFd, NewHeader),
+ case Db#db.update_seq == NewSeq of
+ true ->
+ NewDb2 = commit_data(
+ NewDb#db{
+ main_pid = Db#db.main_pid,
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count,
+ filepath = Filepath}),
+
+ ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+ file:delete(Filepath),
+ ok = file:rename(CompactFilepath, Filepath),
+
+ couch_stream:close(Db#db.summary_stream),
+ couch_file:close_maybe(Db#db.fd),
+ file:delete(Filepath ++ ".old"),
+
+ ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+ ?LOG_INFO("Compaction for db ~p completed.", [Db#db.name]),
+ {noreply, NewDb2#db{compactor_pid=nil}};
+ false ->
+ ?LOG_INFO("Compaction file still behind main file "
+ "(update seq=~p. compact update seq=~p). Retrying.",
+ [Db#db.update_seq, NewSeq]),
+ Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end),
+ Db2 = Db#db{compactor_pid=Pid},
+ couch_file:close(NewFd),
+ {noreply, Db2}
+ end.
+
+handle_info(Msg, Db) ->
+ ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
+ exit({error, Msg}).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+btree_by_seq_split(DocInfo) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted} = DocInfo,
+ {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
+
+btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
+ #doc_info{
+ id = Id,
+ rev = Rev,
+ update_seq = Seq,
+ summary_pointer = Sp,
+ conflict_revs = Conflicts,
+ deleted_conflict_revs = DelConflicts,
+ deleted = Deleted}.
+
+btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
+ deleted=Deleted, rev_tree=Tree}) ->
+ {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
+
+btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
+ #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
+
+
+
+btree_by_id_reduce(reduce, FullDocInfos) ->
+ % count the number of deleted documents
+ length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
+btree_by_id_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+btree_by_seq_reduce(reduce, DocInfos) ->
+ % count the number of deleted documents
+ length(DocInfos);
+btree_by_seq_reduce(rereduce, Reds) ->
+ lists:sum(Reds).
+
+init_db(DbName, Filepath, Fd, Header) ->
+ {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
+ ok = couch_stream:set_min_buffer(SummaryStream, 10000),
+ {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
+ [{split, fun(X) -> btree_by_id_split(X) end},
+ {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
+ {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
+ [{split, fun(X) -> btree_by_seq_split(X) end},
+ {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
+ {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+
+ #db{
+ update_pid=self(),
+ fd=Fd,
+ header=Header,
+ summary_stream = SummaryStream,
+ fulldocinfo_by_id_btree = IdBtree,
+ docinfo_by_seq_btree = SeqBtree,
+ local_docs_btree = LocalDocsBtree,
+ update_seq = Header#db_header.update_seq,
+ doc_count = Header#db_header.doc_count,
+ doc_del_count = Header#db_header.doc_del_count,
+ name = DbName,
+ filepath=Filepath }.
+
+close_db(#db{fd=Fd,summary_stream=Ss}) ->
+ couch_file:close(Fd),
+ couch_stream:close(Ss).
+
+% rev tree functions
+
+doc_to_tree(Doc) ->
+ doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
+
+doc_to_tree(Doc, [RevId]) ->
+ [{RevId, Doc, []}];
+doc_to_tree(Doc, [RevId | Rest]) ->
+ [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
+
+flush_trees(_Db, [], AccFlushedTrees) ->
+ {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
+ #full_doc_info{rev_tree=Unflushed} = InfoUnflushed,
+ Flushed = couch_key_tree:map(
+ fun(_Rev, Value) ->
+ case Value of
+ #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+ % this node value is actually an unwritten document summary,
+ % write to disk.
+ % make sure the Fd in the written bins is the same Fd we are.
+ Bins =
+ case Atts of
+ [] -> [];
+ [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+ % convert bins, removing the FD.
+ % All bins should have been flushed to disk already.
+ [{BinName, {BinType, BinSp, BinLen}}
+ || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+ <- Atts];
+ _ ->
+ % BinFd must not equal our Fd. This can happen when a database
+ % is being switched out during a compaction
+ ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
+ throw(retry)
+ end,
+ {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {IsDeleted, NewSummaryPointer};
+ _ ->
+ Value
+ end
+ end, Unflushed),
+ flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
+
+merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
+ {ok, lists:reverse(AccNewInfos), AccSeq};
+merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
+ [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
+ #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
+ UpdatesRevTree = lists:foldl(
+ fun(NewDoc, AccTree) ->
+ couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
+ end,
+ [], NewDocs),
+ NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+ if NewRevTree == OldTree ->
+ % nothing changed
+ merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
+ true ->
+ if NoConflicts andalso OldTree /= [] ->
+ OldConflicts = couch_key_tree:count_leafs(OldTree),
+ NewConflicts = couch_key_tree:count_leafs(NewRevTree),
+ if NewConflicts > OldConflicts ->
+ throw(conflict);
+ true -> ok
+ end;
+ true -> ok
+ end,
+ NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
+ merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo,
+ [NewInfo|AccNewInfos],AccSeq+1)
+ end.
+
+new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
+ {ok, DocCount, DelCount, AccById, AccBySeq};
+new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
+ #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
+ {DocCount2, DelCount2} =
+ if Deleted -> {DocCount, DelCount + 1};
+ true -> {DocCount + 1, DelCount}
+ end,
+ new_index_entries(RestInfos, DocCount2, DelCount2,
+ [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
+ [DocInfo|AccBySeq]).
+
+update_docs_int(Db, DocsList, Options) ->
+ #db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree,
+ docinfo_by_seq_btree = DocInfoBySeqBTree,
+ update_seq = LastSeq,
+ doc_count = FullDocCount,
+ doc_del_count = FullDelCount
+ } = Db,
+
+ % separate out the NonRep documents from the rest of the documents
+ {DocsList2, NonRepDocs} = lists:foldl(
+ fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+ case Id of
+ ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
+ % when saving NR (non rep) documents, you can only save a single rev
+ {DocsListAcc, [Doc | NonRepDocsAcc]};
+ Id->
+ {[Docs | DocsListAcc], NonRepDocsAcc}
+ end
+ end, {[], []}, DocsList),
+
+ Ids = [Id || [#doc{id=Id}|_] <- DocsList2],
+
+ % lookup up the existing documents, if they exist.
+ OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+ OldDocInfos = lists:zipwith(
+ fun(_Id, {ok, FullDocInfo}) ->
+ FullDocInfo;
+ (Id, not_found) ->
+ #full_doc_info{id=Id}
+ end,
+ Ids, OldDocLookups),
+
+ {OldCount, OldDelCount} = lists:foldl(
+ fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
+ case couch_doc:to_doc_info(FullDocInfo) of
+ #doc_info{deleted=false} ->
+ {OldCountAcc + 1, OldDelCountAcc};
+ _ ->
+ {OldCountAcc, OldDelCountAcc + 1}
+ end;
+ (not_found, Acc) ->
+ Acc
+ end, {0, 0}, OldDocLookups),
+
+ % Merge the new docs into the revision trees.
+ NoConflicts = lists:member(new_edits, Options),
+ {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
+
+ RemoveSeqs =
+ [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+
+ % All regular documents are now ready to write.
+
+ % Try to write the local documents first, a conflict might be generated
+ {ok, Db2} = update_local_docs(Db, NonRepDocs),
+
+ % Write out the documents summaries (they are stored in the nodes of the rev trees)
+ {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
+
+ {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
+ new_index_entries(FlushedDocInfos, 0, 0, [], []),
+
+ % and the indexes to the documents
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
+
+ Db3 = Db2#db{
+ fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+ docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ update_seq = NewSeq,
+ doc_count = FullDocCount + NewDocsCount - OldCount,
+ doc_del_count = FullDelCount + NewDelCount - OldDelCount},
+
+ case lists:member(delay_commit, Options) of
+ true ->
+ {ok, Db3};
+ false ->
+ {ok, commit_data(Db3)}
+ end.
+
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+ Ids = [Id || #doc{id=Id} <- Docs],
+ OldDocLookups = couch_btree:lookup(Btree, Ids),
+ BtreeEntries = lists:zipwith(
+ fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
+ NewRev =
+ case Revs of
+ [] -> 0;
+ [RevStr|_] -> list_to_integer(RevStr)
+ end,
+ OldRev =
+ case OldDocLookup of
+ {ok, {_, {OldRev0, _}}} -> OldRev0;
+ not_found -> 0
+ end,
+ case OldRev + 1 == NewRev of
+ true ->
+ case Delete of
+ false -> {update, {Id, {NewRev, Body}}};
+ true -> {remove, Id}
+ end;
+ false ->
+ throw(conflict)
+ end
+
+ end, Docs, OldDocLookups),
+
+ BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+ BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
+
+ {ok, Btree2} =
+ couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+ {ok, Db#db{local_docs_btree = Btree2}}.
+
+
+
+commit_data(#db{fd=Fd, header=Header} = Db) ->
+ Header2 = Header#db_header{
+ update_seq = Db#db.update_seq,
+ summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
+ docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+ fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
+ local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ doc_count = Db#db.doc_count,
+ doc_del_count = Db#db.doc_del_count
+ },
+ if Header == Header2 ->
+ Db; % unchanged. nothing to do
+ true ->
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
+ Db#db{header = Header2}
+ end.
+
+copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
+ {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+ % copy the bin values
+ NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
+ {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {Name, {Type, NewBinSp, Len}}
+ end, BinInfos),
+ % now write the document summary
+ {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+ Sp.
+
+copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+ [];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
+ % This is a leaf node, copy it over
+ NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
+ [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ % inner node, only copy info/data from leaf nodes
+ [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
+ Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
+ LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+ NewFullDocInfos = lists:map(
+ fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+ Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ end, LookupResults),
+ NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
+ {ok, DocInfoBTree} =
+ couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
+ {ok, FullDocInfoBTree} =
+ couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
+ NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
+
+
+
+copy_compact_docs(Db, NewDb) ->
+ EnumBySeqFun =
+ fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
+ case couch_util:should_flush() of
+ true ->
+ NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
+ {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
+ false ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied]}}
+ end
+ end,
+ {ok, {NewDb2, Uncopied}} =
+ couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
+
+ case Uncopied of
+ [#doc_info{update_seq=LastSeq} | _] ->
+ commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
+ lists:reverse(Uncopied)));
+ [] ->
+ NewDb2
+ end.
+
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+ CompactFile = Filepath ++ ".compact",
+ ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
+ case couch_file:open(CompactFile) of
+ {ok, Fd} ->
+ ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
+ {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {error, enoent} ->
+ receive after 1000 -> ok end,
+ {ok, Fd} = couch_file:open(CompactFile, [create]),
+ Header = #db_header{},
+ ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
+ end,
+ NewDb = init_db(Name, CompactFile, Fd, Header),
+ NewDb2 = copy_compact_docs(Db, NewDb),
+ NewDb3 =
+ case CopyLocal of
+ true ->
+ % suck up all the local docs into memory and write them to the new db
+ {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+ commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
+ _ ->
+ NewDb2
+ end,
+ close_db(NewDb3),
+
+ gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}). \ No newline at end of file
diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl
index b48c9bf3..c04ac33a 100644
--- a/src/couchdb/couch_file.erl
+++ b/src/couchdb/couch_file.erl
@@ -20,6 +20,7 @@
-export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
-export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+-export([close_maybe/1,drop_ref/1,drop_ref/2,add_ref/1,add_ref/2,num_refs/1]).
%%----------------------------------------------------------------------
%% Args: Valid Options are [create] and [create,overwrite].
@@ -164,7 +165,25 @@ sync(Fd) ->
%%----------------------------------------------------------------------
close(Fd) ->
gen_server:cast(Fd, close).
+
+close_maybe(Fd) ->
+ gen_server:cast(Fd, {close_maybe, self()}).
+
+drop_ref(Fd) ->
+ drop_ref(Fd, self()).
+
+drop_ref(Fd, Pid) ->
+ gen_server:cast(Fd, {drop_ref, Pid}).
+
+
+add_ref(Fd) ->
+ add_ref(Fd, self()).
+add_ref(Fd, Pid) ->
+ gen_server:call(Fd, {add_ref, Pid}).
+
+num_refs(Fd) ->
+ gen_server:call(Fd, num_refs).
write_header(Fd, Prefix, Data) ->
TermBin = term_to_binary(Data),
@@ -267,7 +286,7 @@ init_status_ok(ReturnPid, Fd) ->
init_status_error(ReturnPid, Error) ->
ReturnPid ! {self(), Error}, % signal back error status
- self() ! self_close, % tell ourself to close async
+ gen_server:cast(self(), close), % tell ourself to close async
{ok, nil}.
% server functions
@@ -342,16 +361,57 @@ handle_call({pread_bin, Pos}, _From, Fd) ->
{ok, <<TermLen:32>>}
= file:pread(Fd, Pos, 4),
{ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
+ {reply, {ok, Bin}, Fd};
+handle_call({add_ref, Pid},_From, Fd) ->
+ undefined = put(Pid, erlang:monitor(process, Pid)),
+ {reply, ok, Fd};
+handle_call(num_refs, _From, Fd) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ {reply, length(Monitors), Fd}.
+
handle_cast(close, Fd) ->
- {stop,normal,Fd}. % causes terminate to be called
+ {stop,normal,Fd};
+handle_cast({close_maybe, Pid}, Fd) ->
+ catch unlink(Pid),
+ maybe_close_async(Fd);
+handle_cast({drop_ref, Pid}, Fd) ->
+ % don't check return of demonitor. The process could haved crashed causing
+ % the {'DOWN', ...} message to be sent and the process unmonitored.
+ erlang:demonitor(erase(Pid), [flush]),
+ maybe_close_async(Fd).
+
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_info(self_close, State) ->
- {stop,normal,State};
-handle_info(_Info, State) ->
- {noreply, State}.
+handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) ->
+ MonitorRef = erase(Pid),
+ maybe_close_async(Fd);
+handle_info(Info, Fd) ->
+ exit({error, {Info, Fd}}).
+
+
+
+should_close(Fd) ->
+ case process_info(self(), links) of
+ {links, [Fd]} ->
+ % no linkers left (except our fd). What about monitors?
+ case process_info(self(), monitors) of
+ {monitors, []} ->
+ true;
+ _ ->
+ false
+ end;
+ {links, Links} when length(Links) > 1 ->
+ false
+ end.
+
+maybe_close_async(Fd) ->
+ case should_close(Fd) of
+ true ->
+ {stop,normal,Fd};
+ false ->
+ {noreply,Fd}
+ end.
diff --git a/src/couchdb/couch_httpd.erl b/src/couchdb/couch_httpd.erl
index 88271390..d1a4fa90 100644
--- a/src/couchdb/couch_httpd.erl
+++ b/src/couchdb/couch_httpd.erl
@@ -156,7 +156,8 @@ handle_db_request(Req, Method, {Path}) ->
handle_db_request(Req, 'PUT', {DbName, []}) ->
case couch_server:create(DbName, []) of
- {ok, _Db} ->
+ {ok, Db} ->
+ couch_db:close(Db),
send_json(Req, 201, {obj, [{ok, true}]});
{error, database_already_exists} ->
Msg = io_lib:format("Database ~p already exists.", [DbName]),
@@ -167,9 +168,13 @@ handle_db_request(Req, 'PUT', {DbName, []}) ->
end;
handle_db_request(Req, Method, {DbName, Rest}) ->
- case couch_server:open(DbName) of
+ case couch_db:open(DbName, []) of
{ok, Db} ->
- handle_db_request(Req, Method, {DbName, Db, Rest});
+ try
+ handle_db_request(Req, Method, {DbName, Db, Rest})
+ after
+ couch_db:close(Db)
+ end;
Error ->
throw(Error)
end;
diff --git a/src/couchdb/couch_rep.erl b/src/couchdb/couch_rep.erl
index b2d46beb..f7aaa67c 100644
--- a/src/couchdb/couch_rep.erl
+++ b/src/couchdb/couch_rep.erl
@@ -43,7 +43,18 @@ replicate(DbNameA, DbNameB) ->
replicate(Source, Target, Options) ->
{ok, DbSrc} = open_db(Source),
- {ok, DbTgt} = open_db(Target),
+ try
+ {ok, DbTgt} = open_db(Target),
+ try
+ replicate2(Source, DbSrc, Target, DbTgt, Options)
+ after
+ close_db(DbTgt)
+ end
+ after
+ close_db(DbSrc)
+ end.
+
+replicate2(Source, DbSrc, Target, DbTgt, Options) ->
{ok, HostName} = inet:gethostname(),
RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target,
@@ -237,7 +248,12 @@ open_db("http" ++ DbName)->
{ok, "http" ++ DbName ++ "/"}
end;
open_db(DbName)->
- couch_server:open(DbName).
+ couch_db:open(DbName, []).
+
+close_db("http" ++ _)->
+ ok;
+close_db(DbName)->
+ couch_db:close(DbName).
enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->
diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl
index bb3617b2..86cdb2f8 100644
--- a/src/couchdb/couch_server.erl
+++ b/src/couchdb/couch_server.erl
@@ -15,7 +15,7 @@
-behaviour(application).
-export([start/0,start/1,start/2,stop/0,stop/1]).
--export([open/1,create/2,delete/1,all_databases/0,get_version/0]).
+-export([open/2,create/2,delete/1,all_databases/0,get_version/0]).
-export([init/1, handle_call/3,sup_start_link/2]).
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
-export([dev_start/0,remote_restart/0]).
@@ -25,7 +25,9 @@
-record(server,{
root_dir = [],
dbname_regexp,
- options=[]
+ remote_restart=[],
+ max_dbs_open=100,
+ current_dbs_open=0
}).
start() ->
@@ -64,33 +66,41 @@ get_version() ->
sup_start_link(RootDir, Options) ->
gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []).
-open(Filename) ->
- gen_server:call(couch_server, {open, Filename}).
+open(DbName, Options) ->
+ gen_server:call(couch_server, {open, DbName, Options}).
-create(Filename, Options) ->
- gen_server:call(couch_server, {create, Filename, Options}).
+create(DbName, Options) ->
+ gen_server:call(couch_server, {create, DbName, Options}).
-delete(Filename) ->
- gen_server:call(couch_server, {delete, Filename}).
+delete(DbName) ->
+ gen_server:call(couch_server, {delete, DbName}).
remote_restart() ->
gen_server:call(couch_server, remote_restart).
-init({RootDir, Options}) ->
- {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
- {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}.
-
-check_filename(#server{dbname_regexp=RegExp}, Filename) ->
- case regexp:match(Filename, RegExp) of
+check_dbname(#server{dbname_regexp=RegExp}, DbName) ->
+ case regexp:match(DbName, RegExp) of
nomatch ->
{error, illegal_database_name};
_Match ->
ok
end.
-get_full_filename(Server, Filename) ->
- filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]).
+get_full_filename(Server, DbName) ->
+ filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
+init({RootDir, Options}) ->
+ {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
+ ets:new(couch_dbs_by_name, [set, private, named_table]),
+ ets:new(couch_dbs_by_pid, [set, private, named_table]),
+ ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]),
+ process_flag(trap_exit, true),
+ MaxDbsOpen = proplists:get_value(max_dbs_open, Options),
+ RemoteRestart = proplists:get_value(remote_restart, Options),
+ {ok, #server{root_dir=RootDir,
+ dbname_regexp=RegExp,
+ max_dbs_open=MaxDbsOpen,
+ remote_restart=RemoteRestart}}.
terminate(_Reason, _Server) ->
ok.
@@ -109,107 +119,141 @@ all_databases() ->
{ok, Filenames}.
+maybe_close_lru_db(#server{current_dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server)
+ when NumOpen < MaxOpen ->
+ {ok, Server};
+maybe_close_lru_db(#server{current_dbs_open=NumOpen}=Server) ->
+ % must free up the lru db.
+ case try_close_lru(now()) of
+ ok -> {ok, Server#server{current_dbs_open=NumOpen-1}};
+ Error -> Error
+ end.
+
+try_close_lru(StartTime) ->
+ LruTime = ets:first(couch_dbs_by_lru),
+ if LruTime > StartTime ->
+ % this means we've looped through all our opened dbs and found them
+ % all in use.
+ {error, all_dbs_active};
+ true ->
+ [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
+ [{_, {MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName),
+ case couch_db:num_refs(MainPid) of
+ 0 ->
+ exit(MainPid, kill),
+ receive {'EXIT', MainPid, _Reason} -> ok end,
+ true = ets:delete(couch_dbs_by_lru, LruTime),
+ true = ets:delete(couch_dbs_by_name, DbName),
+ true = ets:delete(couch_dbs_by_pid, MainPid),
+ ok;
+ _NumRefs ->
+ % this still has referrers. Go ahead and give it a current lru time
+ % and try the next one in the table.
+ NewLruTime = now(),
+ true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, NewLruTime}}),
+ true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+ true = ets:delete(couch_dbs_by_lru, LruTime),
+ true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}),
+ try_close_lru(StartTime)
+ end
+ end.
+
+
handle_call(get_root, _From, #server{root_dir=Root}=Server) ->
{reply, {ok, Root}, Server};
-handle_call({open, Filename}, From, Server) ->
- case check_filename(Server, Filename) of
- {error, Error} ->
- {reply, {error, Error}, Server};
- ok ->
- Filepath = get_full_filename(Server, Filename),
- Result = supervisor:start_child(couch_server_sup,
- {Filename,
- {couch_db, open, [Filename, Filepath]},
- transient ,
- infinity,
- supervisor,
- [couch_db]}),
- case Result of
- {ok, Db} ->
- {reply, {ok, Db}, Server};
- {error, already_present} ->
- ok = supervisor:delete_child(couch_server_sup, Filename),
- % call self recursively
- handle_call({open, Filename}, From, Server);
- {error, {already_started, Db}} ->
- {reply, {ok, Db}, Server};
- {error, {not_found, _}} ->
- {reply, not_found, Server};
- {error, {Error, _}} ->
- {reply, {error, Error}, Server}
- end
+handle_call({open, DbName, Options}, {FromPid,_}, Server) ->
+ Filepath = get_full_filename(Server, DbName),
+ LruTime = now(),
+ case ets:lookup(couch_dbs_by_name, DbName) of
+ [] ->
+ case maybe_close_lru_db(Server) of
+ {ok, Server2} ->
+ case couch_db:start_link(DbName, Filepath, Options) of
+ {ok, MainPid} ->
+ true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}),
+ true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+ true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+ DbsOpen = Server2#server.current_dbs_open + 1,
+ {reply,
+ couch_db:open_ref_counted(MainPid, FromPid),
+ Server2#server{current_dbs_open=DbsOpen}};
+ CloseError ->
+ {reply, CloseError, Server2}
+ end;
+ Error ->
+ {reply, Error, Server}
+ end;
+ [{_, {MainPid, PrevLruTime}}] ->
+ true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}),
+ true = ets:delete(couch_dbs_by_lru, PrevLruTime),
+ true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+ {reply, couch_db:open_ref_counted(MainPid, FromPid), Server}
end;
-handle_call({create, Filename, Options}, _From, Server) ->
- case check_filename(Server, Filename) of
- {error, Error} ->
- {reply, {error, Error}, Server};
+handle_call({create, DbName, Options}, {FromPid,_}, Server) ->
+ case check_dbname(Server, DbName) of
ok ->
- Filepath = get_full_filename(Server, Filename),
- ChildSpec = {Filename,
- {couch_db, create, [Filename, Filepath, Options]},
- transient,
- infinity,
- supervisor,
- [couch_db]},
- Result =
- case supervisor:delete_child(couch_server_sup, Filename) of
- ok ->
- sup_start_child(couch_server_sup, ChildSpec);
- {error, not_found} ->
- sup_start_child(couch_server_sup, ChildSpec);
- {error, running} ->
- % a server process for this database already started. Maybe kill it
- case lists:member(overwrite, Options) of
- true ->
- supervisor:terminate_child(couch_server_sup, Filename),
- ok = supervisor:delete_child(couch_server_sup, Filename),
- sup_start_child(couch_server_sup, ChildSpec);
- false ->
- {error, database_already_exists}
- end
- end,
- case Result of
- {ok, _Db} -> couch_db_update_notifier:notify({created, Filename});
- _ -> ok
- end,
- {reply, Result, Server}
+ Filepath = get_full_filename(Server, DbName),
+
+ case ets:lookup(couch_dbs_by_name, DbName) of
+ [] ->
+ case couch_db:start_link(DbName, Filepath, [create|Options]) of
+ {ok, MainPid} ->
+ LruTime = now(),
+ true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}),
+ true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+ true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+ DbsOpen = Server#server.current_dbs_open + 1,
+ {reply,
+ couch_db:open_ref_counted(MainPid, FromPid),
+ Server#server{current_dbs_open=DbsOpen}};
+ Error ->
+ {reply, Error, Server}
+ end;
+ [_AlreadyRunningDb] ->
+ {reply, {error, file_exists}, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
end;
-handle_call({delete, Filename}, _From, Server) ->
- FullFilepath = get_full_filename(Server, Filename),
- supervisor:terminate_child(couch_server_sup, Filename),
- supervisor:delete_child(couch_server_sup, Filename),
+handle_call({delete, DbName}, _From, Server) ->
+ FullFilepath = get_full_filename(Server, DbName),
+ Server2 =
+ case ets:lookup(couch_dbs_by_name, DbName) of
+ [] -> Server;
+ [{_, {Pid, LruTime}}] ->
+ exit(Pid, kill),
+ receive {'EXIT', Pid, _Reason} -> ok end,
+ true = ets:delete(couch_dbs_by_name, DbName),
+ true = ets:delete(couch_dbs_by_pid, Pid),
+ true = ets:delete(couch_dbs_by_lru, LruTime),
+ DbsOpen = Server#server.current_dbs_open - 1,
+ Server#server{current_dbs_open=DbsOpen}
+ end,
case file:delete(FullFilepath) of
ok ->
- couch_db_update_notifier:notify({deleted, Filename}),
- {reply, ok, Server};
+ couch_db_update_notifier:notify({deleted, DbName}),
+ {reply, ok, Server2};
{error, enoent} ->
- {reply, not_found, Server};
+ {reply, not_found, Server2};
Else ->
- {reply, Else, Server}
+ {reply, Else, Server2}
end;
-handle_call(remote_restart, _From, #server{options=Options}=Server) ->
- case proplists:get_value(remote_restart, Options) of
- true ->
- exit(self(), restart);
- _ ->
- ok
- end,
+handle_call(remote_restart, _From, #server{remote_restart=false}=Server) ->
+ {reply, ok, Server};
+handle_call(remote_restart, _From, #server{remote_restart=true}=Server) ->
+ exit(couch_server_sup, restart),
{reply, ok, Server}.
-% this function is just to strip out the child spec error stuff if hit
-sup_start_child(couch_server_sup, ChildSpec) ->
- case supervisor:start_child(couch_server_sup, ChildSpec) of
- {error, {Error, _ChildInfo}} ->
- {error, Error};
- Else ->
- Else
- end.
-
-handle_cast(_Msg, State) ->
- {noreply,State}.
+handle_cast(Msg, _Server) ->
+ exit({unknown_cast_message, Msg}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-handle_info(_Info, State) ->
- {noreply, State}.
+handle_info({'EXIT', Pid, _Reason}, Server) ->
+ [{Pid, DbName}] = ets:lookup(couch_dbs_by_pid, Pid),
+ true = ets:delete(couch_dbs_by_pid, Pid),
+ true = ets:delete(couch_dbs_by_name, DbName),
+ {noreply, Server};
+handle_info(Info, _Server) ->
+ exit({unknown_message, Info}).
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
index 7a628eb9..5d4099bf 100644
--- a/src/couchdb/couch_server_sup.erl
+++ b/src/couchdb/couch_server_sup.erl
@@ -74,8 +74,9 @@ start_server(InputIniFilename) ->
UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""),
UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini),
FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""),
- RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")),
- ServerOptions = [{remote_restart, RemoteRestart}],
+ RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "false")),
+ MaxDbsOpen = proplists:get_value({"Couch", "MaxDbsOpen"}, Ini, 100),
+ ServerOptions = [{remote_restart, RemoteRestart}, {max_dbs_open, MaxDbsOpen}],
QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini],
ChildProcesses =
diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl
index 53cc4cda..a29a809c 100644
--- a/src/couchdb/couch_view.erl
+++ b/src/couchdb/couch_view.erl
@@ -57,22 +57,22 @@ get_updater(DbName, GroupId) ->
Pid.
get_updated_group(Pid) ->
- Mref = erlang:monitor(process, Pid),
+ Mref = erlang:monitor(process, Pid),
receive
- {'DOWN', Mref, _, _, Reason} ->
- throw(Reason)
+ {'DOWN', Mref, _, _, Reason} ->
+ throw(Reason)
after 0 ->
- Pid ! {self(), get_updated},
- receive
- {Pid, Response} ->
- erlang:demonitor(Mref),
- receive
- {'DOWN', Mref, _, _, _} -> ok
- after 0 -> ok
- end,
- Response;
- {'DOWN', Mref, _, _, Reason} ->
- throw(Reason)
+ Pid ! {self(), get_updated},
+ receive
+ {Pid, Response} ->
+ erlang:demonitor(Mref),
+ receive
+ {'DOWN', Mref, _, _, _} -> ok
+ after 0 -> ok
+ end,
+ Response;
+ {'DOWN', Mref, _, _, Reason} ->
+ throw(Reason)
end
end.
@@ -216,10 +216,7 @@ init(RootDir) ->
{ok, #server{root_dir=RootDir}}.
terminate(_Reason, _) ->
- catch ets:delete(couch_views_by_name),
- catch ets:delete(couch_views_by_updater),
- catch ets:delete(couch_views_by_db),
- catch ets:delete(couch_views_temp_fd_by_db).
+ ok.
handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) ->
@@ -317,7 +314,7 @@ code_change(_OldVsn, State, _Extra) ->
start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
NotifyPids = get_notify_pids(1000),
- case couch_server:open(DbName) of
+ case couch_db:open(DbName, []) of
{ok, Db} ->
View = #view{map_names=["_temp"],
id_num=0,
@@ -331,16 +328,20 @@ start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
def_lang=Lang,
id_btree=nil},
Group2 = init_group(Db, Fd, Group,nil),
- temp_update_loop(Group2, NotifyPids);
+ couch_db:monitor(Db),
+ couch_db:close(Db),
+ temp_update_loop(DbName, Group2, NotifyPids);
Else ->
exit(Else)
end.
-temp_update_loop(Group, NotifyPids) ->
- {ok, Group2} = update_group(Group),
+temp_update_loop(DbName, Group, NotifyPids) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ {ok, Group2} = update_group(Group#group{db=Db}),
+ couch_db:close(Db),
[Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
garbage_collect(),
- temp_update_loop(Group2, get_notify_pids(10000)).
+ temp_update_loop(DbName, Group2, get_notify_pids(10000)).
reset_group(#group{views=Views}=Group) ->
@@ -355,21 +356,21 @@ start_update_loop(RootDir, DbName, GroupId) ->
start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
{Db, Group} =
- case (catch couch_server:open(DbName)) of
+ case (catch couch_db:open(DbName, [])) of
{ok, Db0} ->
case (catch couch_db:open_doc(Db0, GroupId)) of
{ok, Doc} ->
{Db0, design_doc_to_view_group(Doc)};
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- exit(Else)
- end;
- Else ->
- delete_index_file(RootDir, DbName, GroupId),
- exit(Else)
- end,
- FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
- Group2 =
+ Else ->
+ delete_index_file(RootDir, DbName, GroupId),
+ exit(Else)
+ end;
+ Else ->
+ delete_index_file(RootDir, DbName, GroupId),
+ exit(Else)
+ end,
+ FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
+ Group2 =
case couch_file:open(FileName) of
{ok, Fd} ->
Sig = Group#group.sig,
@@ -386,7 +387,8 @@ start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
Error -> throw(Error)
end
end,
-
+ couch_db:monitor(Db),
+ couch_db:close(Db),
update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
@@ -396,14 +398,22 @@ reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
init_group(Db, Fd, reset_group(Group), nil).
update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->
- try update_group(Group) of
- {ok, Group2} ->
+ {ok, Db}= couch_db:open(DbName, []),
+ Result =
+ try
+ update_group(Group#group{db=Db})
+ catch
+ throw: restart -> restart
+ after
+ couch_db:close(Db)
+ end,
+ case Result of
+ {ok, Group2} ->
HeaderData = {Sig, get_index_header_data(Group2)},
ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
[Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
garbage_collect(),
- update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
- catch
+ update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));
restart ->
couch_file:close(Group#group.fd),
start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
@@ -414,20 +424,23 @@ get_notify_pids(Wait) ->
receive
{Pid, get_updated} ->
[Pid | get_notify_pids()];
+ {'DOWN', _MonitorRef, _Type, _Pid, _Info} ->
+ ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []),
+ exit(normal);
Else ->
?LOG_ERROR("Unexpected message in view updater: ~p", [Else]),
exit({error, Else})
after Wait ->
exit(wait_timeout)
- end.
+ end.
% then keep getting all available and return.
get_notify_pids() ->
receive
{Pid, get_updated} ->
[Pid | get_notify_pids()]
- after 0 ->
- []
- end.
+ after 0 ->
+ []
+ end.
update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) ->
ViewEmptyKVs = [{View, []} || View <- Views],