summaryrefslogtreecommitdiff
path: root/apps/couch/src/couch_db.erl
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-18 11:51:03 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-18 14:24:57 -0400
commit7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch)
tree754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src/couch_db.erl
parentc0cb2625f25a2b51485c164bea1d8822f449ce14 (diff)
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server. - #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream. - Replication via distributed Erlang. - Better hot upgrade support (uses exported functions much more often). - Configurable btree chunk sizes allow for larger (but still bounded) reductions. - Shorter names for btree fields in #db{} and #db_header{}. - couch_view_group does not keep a reference to the #db{}. - Terms are stored compressed (again).
Diffstat (limited to 'apps/couch/src/couch_db.erl')
-rw-r--r--apps/couch/src/couch_db.erl198
1 files changed, 94 insertions, 104 deletions
diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl
index 7678f6ca..a3112e24 100644
--- a/apps/couch/src/couch_db.erl
+++ b/apps/couch/src/couch_db.erl
@@ -11,7 +11,6 @@
% the License.
-module(couch_db).
--behaviour(gen_server).
-export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
@@ -22,21 +21,20 @@
-export([enum_docs/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,ensure_full_commit/1]).
+-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
-export([set_security/2,get_security/1]).
--export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]).
--export([check_is_admin/1, check_is_reader/1]).
+-export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]).
-include("couch_db.hrl").
-
start_link(DbName, Filepath, Options) ->
case open_db_file(Filepath, Options) of
{ok, Fd} ->
- StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
+ {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
+ Filepath, Fd, Options}, []),
unlink(Fd),
- StartResult;
+ gen_server:call(UpdaterPid, get_db);
Else ->
Else
end.
@@ -52,7 +50,7 @@ open_db_file(Filepath, Options) ->
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
- ok = couch_file:sync(Fd),
+ ok = couch_file:sync(Filepath),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
@@ -86,24 +84,33 @@ open(DbName, Options) ->
Else -> Else
end.
-ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
- ok = gen_server:call(UpdatePid, full_commit, infinity),
+ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
+ ok = gen_server:call(Pid, full_commit, infinity),
+ {ok, StartTime}.
+
+ensure_full_commit(Db, RequiredSeq) ->
+ #db{main_pid=Pid, instance_start_time=StartTime} = Db,
+ ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
-close(#db{fd_ref_counter=RefCntr}) ->
- couch_ref_counter:drop(RefCntr).
+close(#db{fd_monitor=RefCntr}) ->
+ erlang:demonitor(RefCntr).
open_ref_counted(MainPid, OpenedPid) ->
gen_server:call(MainPid, {open_ref_count, OpenedPid}).
-is_idle(MainPid) ->
- gen_server:call(MainPid, is_idle).
+is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
+ {monitored_by, Pids} = erlang:process_info(Db#db.fd, monitored_by),
+ (Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= [];
+is_idle(_Db) ->
+ false.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-start_compact(#db{update_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
+start_compact(#db{main_pid=Pid}) ->
+ {ok, _} = gen_server:call(Pid, start_compact),
+ ok.
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
@@ -210,13 +217,13 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
+ couch_btree:lookup(Db#db.id_tree, Ids).
-increment_update_seq(#db{update_pid=UpdatePid}) ->
- gen_server:call(UpdatePid, increment_update_seq).
+increment_update_seq(#db{main_pid=Pid}) ->
+ gen_server:call(Pid, increment_update_seq).
-purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
- gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
+purge_docs(#db{main_pid=Pid}, IdsRevs) ->
+ gen_server:call(Pid, {purge_docs, IdsRevs}).
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
@@ -232,13 +239,17 @@ get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
couch_file:pread_term(Fd, PurgedPointer).
+get_doc_count(Db) ->
+ {ok, {Count, _DelCount}} = couch_btree:full_reduce(Db#db.id_tree),
+ {ok, Count}.
+
get_db_info(Db) ->
#db{fd=Fd,
header=#db_header{disk_version=DiskVersion},
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
- fulldocinfo_by_id_btree=FullDocBtree,
+ id_tree=FullDocBtree,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq} = Db,
{ok, Size} = couch_file:bytes(Fd),
@@ -257,7 +268,12 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+ {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
+ receive {'DOWN', Ref, _, _, Response} ->
+ Response
+ end;
+get_design_docs(#db{id_tree=Btree}=Db) ->
{ok,_, Docs} = couch_btree:fold(Btree,
fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
{ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
@@ -319,7 +335,7 @@ get_readers(#db{security=SecProps}) ->
get_security(#db{security=SecProps}) ->
{SecProps}.
-set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
+set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
ok = validate_security_object(NewSecProps),
ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
@@ -354,7 +370,7 @@ validate_names_and_roles({Props}) when is_list(Props) ->
get_revs_limit(#db{revs_limit=Limit}) ->
Limit.
-set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 ->
+set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
set_revs_limit(_Db, _Limit) ->
@@ -406,6 +422,9 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
catch check_is_admin(Db);
+validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
+ ValidationFuns = load_validation_funs(Db),
+ validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun);
validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
ok;
validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
@@ -424,6 +443,27 @@ validate_doc_update(Db, Doc, GetDiskDocFun) ->
Error
end.
+% to be safe, spawn a middleman here
+load_validation_funs(#db{main_pid = Pid} = Db) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ {ok, DesignDocs} = get_design_docs(Db),
+ exit({ok, lists:flatmap(fun(DesignDoc) ->
+ case couch_doc:get_validate_doc_fun(DesignDoc) of
+ nil ->
+ [];
+ Fun ->
+ [Fun]
+ end
+ end, DesignDocs)})
+ end),
+ receive
+ {'DOWN', Ref, _, _, {ok, Funs}} ->
+ gen_server:cast(Pid, {load_validation_funs, Funs}),
+ Funs;
+ {'DOWN', Ref, _, _, Reason} ->
+ ?LOG_ERROR("could not load validation funs ~p", [Reason]),
+ throw(internal_server_error)
+ end.
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
@@ -512,8 +552,8 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[PreppedBucket | AccPrepped], AccErrors3).
-update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
- update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit).
+update_docs(Db, Docs, Options) ->
+ update_docs(Db, Docs, Options, interactive_edit).
prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
@@ -743,37 +783,37 @@ set_commit_option(Options) ->
[full_commit|Options]
end.
-collect_results(UpdatePid, MRef, ResultsAcc) ->
+collect_results(Pid, MRef, ResultsAcc) ->
receive
- {result, UpdatePid, Result} ->
- collect_results(UpdatePid, MRef, [Result | ResultsAcc]);
- {done, UpdatePid} ->
+ {result, Pid, Result} ->
+ collect_results(Pid, MRef, [Result | ResultsAcc]);
+ {done, Pid} ->
{ok, ResultsAcc};
- {retry, UpdatePid} ->
+ {retry, Pid} ->
retry;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
-write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
+write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
FullCommit = lists:member(full_commit, Options),
- MRef = erlang:monitor(process, UpdatePid),
+ MRef = erlang:monitor(process, Pid),
try
- UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
- case collect_results(UpdatePid, MRef, []) of
+ Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
- {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
+ {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
- case collect_results(UpdatePid, MRef, []) of
+ Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
end
@@ -921,9 +961,15 @@ enum_docs_reduce_to_count(Reds) ->
changes_since(Db, Style, StartSeq, Fun, Acc) ->
changes_since(Db, Style, StartSeq, Fun, [], Acc).
-
+
changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(DocInfo, _Offset, Acc2) ->
+ Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
+ case FullDocInfo of
+ #full_doc_info{} ->
+ DocInfo = couch_doc:to_doc_info(FullDocInfo);
+ #doc_info{} ->
+ DocInfo = FullDocInfo
+ end,
#doc_info{revs=Revs} = DocInfo,
DocInfo2 =
case Style of
@@ -936,83 +982,27 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
end,
Fun(DocInfo2, Acc2)
end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
+ {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper,
+ Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]),
{ok, AccOut}.
count_changes_since(Db, SinceSeq) ->
{ok, Changes} =
- couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
+ couch_btree:fold_reduce(Db#db.seq_tree,
fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)}
+ {ok, couch_btree:final_reduce(Db#db.seq_tree, PartialReds)}
end,
0, [{start_key, SinceSeq + 1}]),
Changes.
enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+ {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
enum_docs(Db, InFun, InAcc, Options) ->
- {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options),
+ {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.id_tree, InFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
-% server functions
-
-init({DbName, Filepath, Fd, Options}) ->
- {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
- {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
- couch_ref_counter:add(RefCntr),
- case lists:member(sys_db, Options) of
- true ->
- ok;
- false ->
- couch_stats_collector:track_process_count({couchdb, open_databases})
- end,
- process_flag(trap_exit, true),
- {ok, Db}.
-
-terminate(_Reason, Db) ->
- couch_util:shutdown_sync(Db#db.update_pid),
- ok.
-
-handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) ->
- ok = couch_ref_counter:add(RefCntr, OpenerPid),
- {reply, {ok, Db}, Db};
-handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact,
- waiting_delayed_commit=Delay}=Db) ->
- % Idle means no referrers. Unless in the middle of a compaction file switch,
- % there are always at least 2 referrers, couch_db_updater and us.
- {reply, (Delay == nil) andalso (Compact == nil) andalso (couch_ref_counter:count(RefCntr) == 2), Db};
-handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) ->
- #db{fd_ref_counter=NewRefCntr}=NewDb,
- case NewRefCntr =:= OldRefCntr of
- true -> ok;
- false ->
- couch_ref_counter:add(NewRefCntr),
- couch_ref_counter:drop(OldRefCntr)
- end,
- {reply, ok, NewDb};
-handle_call(get_db, _From, Db) ->
- {reply, {ok, Db}, Db}.
-
-
-handle_cast(Msg, Db) ->
- ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
- exit({error, Msg}).
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_info({'EXIT', _Pid, normal}, Db) ->
- {noreply, Db};
-handle_info({'EXIT', _Pid, Reason}, Server) ->
- {stop, Reason, Server};
-handle_info(Msg, Db) ->
- ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
- exit({error, Msg}).
-
-
%%% Internal function %%%
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
@@ -1054,7 +1044,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
- case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+ case couch_btree:lookup(Db#db.local_tree, [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
{ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
[not_found] ->