summaryrefslogtreecommitdiff
path: root/apps/couch/src
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2010-08-18 11:51:03 -0400
committerAdam Kocoloski <adam@cloudant.com>2010-08-18 14:24:57 -0400
commit7393d62b7b630bee50f609d0ae8125d33f7cda2b (patch)
tree754e9ab17a586319c562de488e60056feff60bb8 /apps/couch/src
parentc0cb2625f25a2b51485c164bea1d8822f449ce14 (diff)
Grab bag of Cloudant patches to couch OTP application
- Removal of couch_db and couch_ref_counter processes. Active DBs are accessible through a protected ets table owned by couch_server. - #full_doc_info{} in by_id and by_seq trees for faster compaction at the expense of more disk usage afterwards. Proposed as COUCHDB-738 but not accepted upstream. - Replication via distributed Erlang. - Better hot upgrade support (uses exported functions much more often). - Configurable btree chunk sizes allow for larger (but still bounded) reductions. - Shorter names for btree fields in #db{} and #db_header{}. - couch_view_group does not keep a reference to the #db{}. - Terms are stored compressed (again).
Diffstat (limited to 'apps/couch/src')
-rw-r--r--apps/couch/src/couch_auth_cache.erl2
-rw-r--r--apps/couch/src/couch_btree.erl38
-rw-r--r--apps/couch/src/couch_changes.erl47
-rw-r--r--apps/couch/src/couch_config.erl44
-rw-r--r--apps/couch/src/couch_config_event.erl46
-rw-r--r--apps/couch/src/couch_db.erl198
-rw-r--r--apps/couch/src/couch_db_update_notifier_sup.erl8
-rw-r--r--apps/couch/src/couch_db_updater.erl303
-rw-r--r--apps/couch/src/couch_doc.erl98
-rw-r--r--apps/couch/src/couch_drv.erl38
-rw-r--r--apps/couch/src/couch_external_manager.erl2
-rw-r--r--apps/couch/src/couch_file.erl17
-rw-r--r--apps/couch/src/couch_os_process.erl20
-rw-r--r--apps/couch/src/couch_primary_sup.erl42
-rw-r--r--apps/couch/src/couch_query_servers.erl61
-rw-r--r--apps/couch/src/couch_rep.erl214
-rw-r--r--apps/couch/src/couch_rep_att.erl1
-rw-r--r--apps/couch/src/couch_rep_httpc.erl4
-rw-r--r--apps/couch/src/couch_rep_reader.erl2
-rw-r--r--apps/couch/src/couch_secondary_sup.erl35
-rw-r--r--apps/couch/src/couch_server.erl378
-rw-r--r--apps/couch/src/couch_server_sup.erl106
-rw-r--r--apps/couch/src/couch_stats_aggregator.erl7
-rw-r--r--apps/couch/src/couch_stats_collector.erl25
-rw-r--r--apps/couch/src/couch_task_status.erl1
-rw-r--r--apps/couch/src/couch_view.erl12
-rw-r--r--apps/couch/src/couch_view_compactor.erl15
-rw-r--r--apps/couch/src/couch_view_group.erl197
-rw-r--r--apps/couch/src/couch_view_updater.erl28
29 files changed, 1107 insertions, 882 deletions
diff --git a/apps/couch/src/couch_auth_cache.erl b/apps/couch/src/couch_auth_cache.erl
index 078bfcc1..0264a69d 100644
--- a/apps/couch/src/couch_auth_cache.erl
+++ b/apps/couch/src/couch_auth_cache.erl
@@ -282,6 +282,8 @@ refresh_entries(AuthDb) ->
end.
+refresh_entry(Db, #full_doc_info{} = FDI) ->
+ refresh_entry(Db, couch_doc:to_doc_info(FDI));
refresh_entry(Db, #doc_info{high_seq = DocSeq} = DocInfo) ->
case is_user_doc(DocInfo) of
{true, UserName} ->
diff --git a/apps/couch/src/couch_btree.erl b/apps/couch/src/couch_btree.erl
index 0e47bac7..4ed3fe54 100644
--- a/apps/couch/src/couch_btree.erl
+++ b/apps/couch/src/couch_btree.erl
@@ -16,23 +16,27 @@
-export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]).
-export([fold_reduce/4, lookup/2, get_state/1, set_options/2]).
--define(CHUNK_THRESHOLD, 16#4ff).
-
-record(btree,
{fd,
root,
- extract_kv = fun({Key, Value}) -> {Key, Value} end,
- assemble_kv = fun(Key, Value) -> {Key, Value} end,
- less = fun(A, B) -> A < B end,
+ extract_kv,
+ assemble_kv,
+ less,
reduce = nil
}).
+extract(#btree{extract_kv = undefined}, Value) ->
+ Value;
extract(#btree{extract_kv=Extract}, Value) ->
Extract(Value).
+assemble(#btree{assemble_kv = undefined}, Key, Value) ->
+ {Key, Value};
assemble(#btree{assemble_kv=Assemble}, Key, Value) ->
Assemble(Key, Value).
+less(#btree{less = undefined}, A, B) ->
+ A < B;
less(#btree{less=Less}, A, B) ->
Less(A, B).
@@ -106,29 +110,29 @@ convert_fun_arity(Fun) when is_function(Fun, 2) ->
convert_fun_arity(Fun) when is_function(Fun, 3) ->
Fun. % Already arity 3
-make_key_in_end_range_function(#btree{less=Less}, fwd, Options) ->
+make_key_in_end_range_function(Bt, fwd, Options) ->
case couch_util:get_value(end_key_gt, Options) of
undefined ->
case couch_util:get_value(end_key, Options) of
undefined ->
fun(_Key) -> true end;
LastKey ->
- fun(Key) -> not Less(LastKey, Key) end
+ fun(Key) -> not less(Bt, LastKey, Key) end
end;
EndKey ->
- fun(Key) -> Less(Key, EndKey) end
+ fun(Key) -> less(Bt, Key, EndKey) end
end;
-make_key_in_end_range_function(#btree{less=Less}, rev, Options) ->
+make_key_in_end_range_function(Bt, rev, Options) ->
case couch_util:get_value(end_key_gt, Options) of
undefined ->
case couch_util:get_value(end_key, Options) of
undefined ->
fun(_Key) -> true end;
LastKey ->
- fun(Key) -> not Less(Key, LastKey) end
+ fun(Key) -> not less(Bt, Key, LastKey) end
end;
EndKey ->
- fun(Key) -> Less(EndKey, Key) end
+ fun(Key) -> less(Bt, EndKey, Key) end
end.
@@ -198,7 +202,11 @@ op_order(remove) -> 2;
op_order(insert) -> 3.
lookup(#btree{root=Root, less=Less}=Bt, Keys) ->
- SortedKeys = lists:sort(Less, Keys),
+ case Less of undefined ->
+ SortedKeys = lists:sort(Keys);
+ _ ->
+ SortedKeys = lists:sort(Less, Keys)
+ end,
{ok, SortedResults} = lookup(Bt, Root, SortedKeys),
% We want to return the results in the same order as the keys were input
% but we may have changed the order when we sorted. So we need to put the
@@ -271,9 +279,11 @@ complete_root(Bt, KPs) ->
% it's probably really inefficient.
chunkify(InList) ->
+ BaseChunkSize = list_to_integer(couch_config:get("couchdb",
+ "btree_chunk_size", "1279")),
case byte_size(term_to_binary(InList)) of
- Size when Size > ?CHUNK_THRESHOLD ->
- NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1),
+ Size when Size > BaseChunkSize ->
+ NumberOfChunksLikely = ((Size div BaseChunkSize) + 1),
ChunkThreshold = Size div NumberOfChunksLikely,
chunkify(InList, ChunkThreshold, [], 0, []);
_Else ->
diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl
index 3a5bc4f8..a9c08509 100644
--- a/apps/couch/src/couch_changes.erl
+++ b/apps/couch/src/couch_changes.erl
@@ -13,12 +13,13 @@
-module(couch_changes).
-include("couch_db.hrl").
--export([handle_changes/3]).
+-export([handle_changes/3, get_changes_timeout/2, main_only_filter/1,
+ all_docs_filter/1, wait_db_updated/2, get_rest_db_updated/0,
+ make_filter_fun/4]).
-%% @type Req -> #httpd{} | {json_req, JsonObj()}
-handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
- Args = Args1#changes_args{filter=
- make_filter_fun(Args1#changes_args.filter, Style, Req, Db)},
+%% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any()
+handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) ->
+ Args = Args1#changes_args{filter=make_filter_fun(Raw, Style, Req, Db)},
StartSeq = case Args#changes_args.dir of
rev ->
couch_db:get_update_seq(Db);
@@ -68,20 +69,12 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
end
end.
-%% @type Req -> #httpd{} | {json_req, JsonObj()}
-make_filter_fun(FilterName, Style, Req, Db) ->
- case [list_to_binary(couch_httpd:unquote(Part))
- || Part <- string:tokens(FilterName, "/")] of
+%% @spec make_filter_fun(string(), main_only|all_docs, #httpd{} | {json_req,
+%% {[any()]}}, #db{}) -> fun()
+make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) ->
+ case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of
[] ->
- fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) ->
- case Style of
- main_only ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
- all_docs ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
- || #rev_info{rev=R} <- Revs]
- end
- end;
+ make_filter_fun(nil, Style, Req, Db);
[DName, FName] ->
DesignId = <<"_design/", DName/binary>>,
DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
@@ -105,11 +98,21 @@ make_filter_fun(FilterName, Style, Req, Db) ->
[{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
|| {Pass, #doc{revs={RevPos,[RevId|_]}}}
<- lists:zip(Passes, Docs), Pass == true]
- end;
+ end;
_Else ->
throw({bad_request,
"filter parameter must be of the form `designname/filtername`"})
- end.
+ end;
+make_filter_fun(_, main_only, _, _) ->
+ fun ?MODULE:main_only_filter/1;
+make_filter_fun(_, all_docs, _, _) ->
+ fun ?MODULE:all_docs_filter/1.
+
+main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}].
+
+all_docs_filter(#doc_info{revs=Revs}) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs].
get_changes_timeout(Args, Callback) ->
#changes_args{
@@ -205,8 +208,8 @@ end_sending_changes(Callback, EndSeq, ResponseType) ->
changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
Limit, IncludeDocs}) ->
- #doc_info{id=Id, high_seq=Seq,
- revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
+ #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
+ = DocInfo,
Results0 = FilterFun(DocInfo),
Results = [Result || Result <- Results0, Result /= null],
Go = if Limit =< 1 -> stop; true -> ok end,
diff --git a/apps/couch/src/couch_config.erl b/apps/couch/src/couch_config.erl
index be53e3a3..73abdfd5 100644
--- a/apps/couch/src/couch_config.erl
+++ b/apps/couch/src/couch_config.erl
@@ -88,7 +88,7 @@ register(Fun) ->
?MODULE:register(Fun, self()).
register(Fun, Pid) ->
- gen_server:call(?MODULE, {register, Fun, Pid}).
+ couch_config_event:register(Fun, Pid).
init(IniFiles) ->
@@ -111,7 +111,7 @@ terminate(_Reason, _State) ->
handle_call(all, _From, Config) ->
Resp = lists:sort((ets:tab2list(?MODULE))),
{reply, Resp, Config};
-handle_call({set, Sec, Key, Val, Persist}, From, Config) ->
+handle_call({set, Sec, Key, Val, Persist}, _From, Config) ->
true = ets:insert(?MODULE, {{Sec, Key}, Val}),
case {Persist, Config#config.write_filename} of
{true, undefined} ->
@@ -121,12 +121,10 @@ handle_call({set, Sec, Key, Val, Persist}, From, Config) ->
_ ->
ok
end,
- spawn_link(fun() ->
- [catch F(Sec, Key, Val, Persist) || {_Pid, F} <- Config#config.notify_funs],
- gen_server:reply(From, ok)
- end),
- {noreply, Config};
-handle_call({delete, Sec, Key, Persist}, From, Config) ->
+ Event = {config_change, Sec, Key, Val, Persist},
+ gen_event:sync_notify(couch_config_event, Event),
+ {reply, ok, Config};
+handle_call({delete, Sec, Key, Persist}, _From, Config) ->
true = ets:delete(?MODULE, {Sec,Key}),
case {Persist, Config#config.write_filename} of
{true, undefined} ->
@@ -136,26 +134,9 @@ handle_call({delete, Sec, Key, Persist}, From, Config) ->
_ ->
ok
end,
- spawn_link(fun() ->
- [catch F(Sec, Key, deleted, Persist) || {_Pid, F} <- Config#config.notify_funs],
- gen_server:reply(From, ok)
- end),
- {noreply, Config};
-handle_call({register, Fun, Pid}, _From, #config{notify_funs=PidFuns}=Config) ->
- erlang:monitor(process, Pid),
- % convert 1 and 2 arity to 3 arity
- Fun2 =
- case Fun of
- _ when is_function(Fun, 1) ->
- fun(Section, _Key, _Value, _Persist) -> Fun(Section) end;
- _ when is_function(Fun, 2) ->
- fun(Section, Key, _Value, _Persist) -> Fun(Section, Key) end;
- _ when is_function(Fun, 3) ->
- fun(Section, Key, Value, _Persist) -> Fun(Section, Key, Value) end;
- _ when is_function(Fun, 4) ->
- Fun
- end,
- {reply, ok, Config#config{notify_funs=[{Pid, Fun2} | PidFuns]}}.
+ Event = {config_change, Sec, Key, deleted, Persist},
+ gen_event:sync_notify(couch_config_event, Event),
+ {reply, ok, Config}.
handle_cast(stop, State) ->
@@ -163,10 +144,9 @@ handle_cast(stop, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _, _, DownPid, _}, #config{notify_funs=PidFuns}=Config) ->
- % remove any funs registered by the downed process
- FilteredPidFuns = [{Pid,Fun} || {Pid,Fun} <- PidFuns, Pid /= DownPid],
- {noreply, Config#config{notify_funs=FilteredPidFuns}}.
+handle_info(Info, State) ->
+ ?LOG_ERROR("couch_config:handle_info Info: ~p~n", [Info]),
+ {noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/apps/couch/src/couch_config_event.erl b/apps/couch/src/couch_config_event.erl
new file mode 100644
index 00000000..e353c7d8
--- /dev/null
+++ b/apps/couch/src/couch_config_event.erl
@@ -0,0 +1,46 @@
+-module(couch_config_event).
+-behaviour(gen_event).
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0, register/2]).
+
+-include("couch_db.hrl").
+
+start_link() ->
+ gen_event:start_link({local, ?MODULE}).
+
+register(Fun, Pid) ->
+ gen_event:add_handler(?MODULE, {?MODULE, Fun}, [Fun, Pid]).
+
+init([Fun, Pid]) ->
+ Ref = erlang:monitor(process, Pid),
+ {ok, {Fun, Ref}}.
+
+handle_event({config_change,Sec,_,_,_}, {F,_}=St) when is_function(F,1) ->
+ catch F(Sec),
+ {ok, St};
+handle_event({config_change,Sec,K,_,_}, {F,_}=St) when is_function(F,2) ->
+ catch F(Sec,K),
+ {ok, St};
+handle_event({config_change,Sec,K,V,_}, {F,_}=St) when is_function(F,3) ->
+ catch F(Sec,K,V),
+ {ok, St};
+handle_event({config_change,Sec,K,V,Write}, {F,_}=St) when is_function(F,4) ->
+ catch F(Sec,K,V,Write),
+ {ok, St}.
+
+handle_call(_Request, St) ->
+ {ok, ok, St}.
+
+handle_info({'DOWN', Ref, _, _, _}, {_, Ref}) ->
+ remove_handler;
+handle_info(_Info, St) ->
+ {ok, St}.
+
+terminate(Reason, St) ->
+ ?LOG_INFO("config_event handler ~p terminating with ~p", [St, Reason]),
+ ok.
+
+code_change(_OldVsn, St, _Extra) ->
+ {ok, St}.
diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl
index 7678f6ca..a3112e24 100644
--- a/apps/couch/src/couch_db.erl
+++ b/apps/couch/src/couch_db.erl
@@ -11,7 +11,6 @@
% the License.
-module(couch_db).
--behaviour(gen_server).
-export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]).
-export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]).
@@ -22,21 +21,20 @@
-export([enum_docs/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
--export([start_link/3,open_doc_int/3,ensure_full_commit/1]).
+-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
-export([set_security/2,get_security/1]).
--export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]).
--export([check_is_admin/1, check_is_reader/1]).
+-export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]).
-include("couch_db.hrl").
-
start_link(DbName, Filepath, Options) ->
case open_db_file(Filepath, Options) of
{ok, Fd} ->
- StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []),
+ {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
+ Filepath, Fd, Options}, []),
unlink(Fd),
- StartResult;
+ gen_server:call(UpdaterPid, get_db);
Else ->
Else
end.
@@ -52,7 +50,7 @@ open_db_file(Filepath, Options) ->
{ok, Fd} ->
?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
- ok = couch_file:sync(Fd),
+ ok = couch_file:sync(Filepath),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
@@ -86,24 +84,33 @@ open(DbName, Options) ->
Else -> Else
end.
-ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) ->
- ok = gen_server:call(UpdatePid, full_commit, infinity),
+ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
+ ok = gen_server:call(Pid, full_commit, infinity),
+ {ok, StartTime}.
+
+ensure_full_commit(Db, RequiredSeq) ->
+ #db{main_pid=Pid, instance_start_time=StartTime} = Db,
+ ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
-close(#db{fd_ref_counter=RefCntr}) ->
- couch_ref_counter:drop(RefCntr).
+close(#db{fd_monitor=RefCntr}) ->
+ erlang:demonitor(RefCntr).
open_ref_counted(MainPid, OpenedPid) ->
gen_server:call(MainPid, {open_ref_count, OpenedPid}).
-is_idle(MainPid) ->
- gen_server:call(MainPid, is_idle).
+is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
+ {monitored_by, Pids} = erlang:process_info(Db#db.fd, monitored_by),
+ (Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= [];
+is_idle(_Db) ->
+ false.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-start_compact(#db{update_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
+start_compact(#db{main_pid=Pid}) ->
+ {ok, _} = gen_server:call(Pid, start_compact),
+ ok.
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
@@ -210,13 +217,13 @@ get_full_doc_info(Db, Id) ->
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
+ couch_btree:lookup(Db#db.id_tree, Ids).
-increment_update_seq(#db{update_pid=UpdatePid}) ->
- gen_server:call(UpdatePid, increment_update_seq).
+increment_update_seq(#db{main_pid=Pid}) ->
+ gen_server:call(Pid, increment_update_seq).
-purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
- gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
+purge_docs(#db{main_pid=Pid}, IdsRevs) ->
+ gen_server:call(Pid, {purge_docs, IdsRevs}).
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
@@ -232,13 +239,17 @@ get_last_purged(#db{header=#db_header{purged_docs=nil}}) ->
get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) ->
couch_file:pread_term(Fd, PurgedPointer).
+get_doc_count(Db) ->
+ {ok, {Count, _DelCount}} = couch_btree:full_reduce(Db#db.id_tree),
+ {ok, Count}.
+
get_db_info(Db) ->
#db{fd=Fd,
header=#db_header{disk_version=DiskVersion},
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
- fulldocinfo_by_id_btree=FullDocBtree,
+ id_tree=FullDocBtree,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq} = Db,
{ok, Size} = couch_file:bytes(Fd),
@@ -257,7 +268,12 @@ get_db_info(Db) ->
],
{ok, InfoList}.
-get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) ->
+get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
+ {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
+ receive {'DOWN', Ref, _, _, Response} ->
+ Response
+ end;
+get_design_docs(#db{id_tree=Btree}=Db) ->
{ok,_, Docs} = couch_btree:fold(Btree,
fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) ->
{ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []),
@@ -319,7 +335,7 @@ get_readers(#db{security=SecProps}) ->
get_security(#db{security=SecProps}) ->
{SecProps}.
-set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
+set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
ok = validate_security_object(NewSecProps),
ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
@@ -354,7 +370,7 @@ validate_names_and_roles({Props}) when is_list(Props) ->
get_revs_limit(#db{revs_limit=Limit}) ->
Limit.
-set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 ->
+set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
set_revs_limit(_Db, _Limit) ->
@@ -406,6 +422,9 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) ->
catch check_is_admin(Db);
+validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
+ ValidationFuns = load_validation_funs(Db),
+ validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun);
validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
ok;
validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
@@ -424,6 +443,27 @@ validate_doc_update(Db, Doc, GetDiskDocFun) ->
Error
end.
+% to be safe, spawn a middleman here
+load_validation_funs(#db{main_pid = Pid} = Db) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ {ok, DesignDocs} = get_design_docs(Db),
+ exit({ok, lists:flatmap(fun(DesignDoc) ->
+ case couch_doc:get_validate_doc_fun(DesignDoc) of
+ nil ->
+ [];
+ Fun ->
+ [Fun]
+ end
+ end, DesignDocs)})
+ end),
+ receive
+ {'DOWN', Ref, _, _, {ok, Funs}} ->
+ gen_server:cast(Pid, {load_validation_funs, Funs}),
+ Funs;
+ {'DOWN', Ref, _, _, Reason} ->
+ ?LOG_ERROR("could not load validation funs ~p", [Reason]),
+ throw(internal_server_error)
+ end.
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
@@ -512,8 +552,8 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[PreppedBucket | AccPrepped], AccErrors3).
-update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
- update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit).
+update_docs(Db, Docs, Options) ->
+ update_docs(Db, Docs, Options, interactive_edit).
prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
@@ -743,37 +783,37 @@ set_commit_option(Options) ->
[full_commit|Options]
end.
-collect_results(UpdatePid, MRef, ResultsAcc) ->
+collect_results(Pid, MRef, ResultsAcc) ->
receive
- {result, UpdatePid, Result} ->
- collect_results(UpdatePid, MRef, [Result | ResultsAcc]);
- {done, UpdatePid} ->
+ {result, Pid, Result} ->
+ collect_results(Pid, MRef, [Result | ResultsAcc]);
+ {done, Pid} ->
{ok, ResultsAcc};
- {retry, UpdatePid} ->
+ {retry, Pid} ->
retry;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
-write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
+write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
FullCommit = lists:member(full_commit, Options),
- MRef = erlang:monitor(process, UpdatePid),
+ MRef = erlang:monitor(process, Pid),
try
- UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
- case collect_results(UpdatePid, MRef, []) of
+ Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
- {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
+ {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
% We only retry once
close(Db2),
- UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
- case collect_results(UpdatePid, MRef, []) of
+ Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
+ case collect_results(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
end
@@ -921,9 +961,15 @@ enum_docs_reduce_to_count(Reds) ->
changes_since(Db, Style, StartSeq, Fun, Acc) ->
changes_since(Db, Style, StartSeq, Fun, [], Acc).
-
+
changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(DocInfo, _Offset, Acc2) ->
+ Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
+ case FullDocInfo of
+ #full_doc_info{} ->
+ DocInfo = couch_doc:to_doc_info(FullDocInfo);
+ #doc_info{} ->
+ DocInfo = FullDocInfo
+ end,
#doc_info{revs=Revs} = DocInfo,
DocInfo2 =
case Style of
@@ -936,83 +982,27 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) ->
end,
Fun(DocInfo2, Acc2)
end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
+ {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper,
+ Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]),
{ok, AccOut}.
count_changes_since(Db, SinceSeq) ->
{ok, Changes} =
- couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree,
+ couch_btree:fold_reduce(Db#db.seq_tree,
fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)}
+ {ok, couch_btree:final_reduce(Db#db.seq_tree, PartialReds)}
end,
0, [{start_key, SinceSeq + 1}]),
Changes.
enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
+ {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
enum_docs(Db, InFun, InAcc, Options) ->
- {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options),
+ {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.id_tree, InFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
-% server functions
-
-init({DbName, Filepath, Fd, Options}) ->
- {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
- {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db),
- couch_ref_counter:add(RefCntr),
- case lists:member(sys_db, Options) of
- true ->
- ok;
- false ->
- couch_stats_collector:track_process_count({couchdb, open_databases})
- end,
- process_flag(trap_exit, true),
- {ok, Db}.
-
-terminate(_Reason, Db) ->
- couch_util:shutdown_sync(Db#db.update_pid),
- ok.
-
-handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) ->
- ok = couch_ref_counter:add(RefCntr, OpenerPid),
- {reply, {ok, Db}, Db};
-handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact,
- waiting_delayed_commit=Delay}=Db) ->
- % Idle means no referrers. Unless in the middle of a compaction file switch,
- % there are always at least 2 referrers, couch_db_updater and us.
- {reply, (Delay == nil) andalso (Compact == nil) andalso (couch_ref_counter:count(RefCntr) == 2), Db};
-handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) ->
- #db{fd_ref_counter=NewRefCntr}=NewDb,
- case NewRefCntr =:= OldRefCntr of
- true -> ok;
- false ->
- couch_ref_counter:add(NewRefCntr),
- couch_ref_counter:drop(OldRefCntr)
- end,
- {reply, ok, NewDb};
-handle_call(get_db, _From, Db) ->
- {reply, {ok, Db}, Db}.
-
-
-handle_cast(Msg, Db) ->
- ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
- exit({error, Msg}).
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-handle_info({'EXIT', _Pid, normal}, Db) ->
- {noreply, Db};
-handle_info({'EXIT', _Pid, Reason}, Server) ->
- {stop, Reason, Server};
-handle_info(Msg, Db) ->
- ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]),
- exit({error, Msg}).
-
-
%%% Internal function %%%
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
@@ -1054,7 +1044,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) ->
- case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of
+ case couch_btree:lookup(Db#db.local_tree, [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
{ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}};
[not_found] ->
diff --git a/apps/couch/src/couch_db_update_notifier_sup.erl b/apps/couch/src/couch_db_update_notifier_sup.erl
index 4d730fc7..e7cc16c1 100644
--- a/apps/couch/src/couch_db_update_notifier_sup.erl
+++ b/apps/couch/src/couch_db_update_notifier_sup.erl
@@ -22,16 +22,14 @@
-behaviour(supervisor).
--export([start_link/0,init/1]).
+-export([start_link/0, init/1, config_change/3]).
start_link() ->
supervisor:start_link({local, couch_db_update_notifier_sup},
couch_db_update_notifier_sup, []).
init([]) ->
- ok = couch_config:register(
- fun("update_notification", Key, Value) -> reload_config(Key, Value) end
- ),
+ ok = couch_config:register(fun ?MODULE:config_change/3),
UpdateNotifierExes = couch_config:get("update_notification"),
@@ -48,7 +46,7 @@ init([]) ->
%% @doc when update_notification configuration changes, terminate the process
%% for that notifier and start a new one with the updated config
-reload_config(Id, Exe) ->
+config_change("update_notification", Id, Exe) ->
ChildSpec = {
Id,
{couch_db_update_notifier, start_link, [Exe]},
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index 19a4c165..e4f8d0ca 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -13,14 +13,14 @@
-module(couch_db_updater).
-behaviour(gen_server).
--export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([btree_by_id_split/1,btree_by_id_join/2,btree_by_id_reduce/2]).
+-export([btree_by_seq_split/1,btree_by_seq_join/2,btree_by_seq_reduce/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include("couch_db.hrl").
-init({MainPid, DbName, Filepath, Fd, Options}) ->
- process_flag(trap_exit, true),
+init({DbName, Filepath, Fd, Options}) ->
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
@@ -44,25 +44,40 @@ init({MainPid, DbName, Filepath, Fd, Options}) ->
end,
Db = init_db(DbName, Filepath, Fd, Header),
- Db2 = refresh_validate_doc_funs(Db),
- {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}.
+ couch_stats_collector:track_process_count({couchdb, open_databases}),
+ % we don't load validation funs here because the fabric query is liable to
+ % race conditions. Instead see couch_db:validate_doc_update, which loads
+ % them lazily
+ {ok, Db#db{main_pid = self(), is_sys_db = lists:member(sys_db, Options)}}.
terminate(_Reason, Db) ->
couch_file:close(Db#db.fd),
couch_util:shutdown_sync(Db#db.compactor_pid),
- couch_util:shutdown_sync(Db#db.fd_ref_counter),
+ couch_util:shutdown_sync(Db#db.fd),
ok.
+handle_call(start_compact, _From, Db) ->
+ {noreply, NewDb} = handle_cast(start_compact, Db),
+ {reply, {ok, NewDb#db.compactor_pid}, NewDb};
+
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
+ {reply, ok, Db};
+handle_call(full_commit, _From, Db) ->
+ {reply, ok, commit_data(Db)};
+
+handle_call({full_commit, _}, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
+handle_call({full_commit, RequiredSeq}, _From, Db) when RequiredSeq =<
+ Db#db.committed_update_seq ->
+ {reply, ok, Db};
+handle_call({full_commit, _}, _, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_db_update_notifier:notify({updated, Db#db.name}),
{reply, {ok, Db2#db.update_seq}, Db2};
@@ -70,13 +85,13 @@ handle_call({set_security, NewSec}, _From, Db) ->
{ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call({set_revs_limit, Limit}, _From, Db) ->
Db2 = commit_data(Db#db{revs_limit=Limit,
update_seq=Db#db.update_seq+1}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call({purge_docs, _IdRevs}, _From,
@@ -85,8 +100,8 @@ handle_call({purge_docs, _IdRevs}, _From,
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
fd=Fd,
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
+ id_tree = DocInfoByIdBTree,
+ seq_tree = DocInfoBySeqBTree,
update_seq = LastSeq,
header = Header = #db_header{purge_seq=PurgeSeq}
} = Db,
@@ -136,29 +151,32 @@ handle_call({purge_docs, IdRevs}, _From, Db) ->
Db2 = commit_data(
Db#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ id_tree = DocInfoByIdBTree2,
+ seq_tree = DocInfoBySeqBTree2,
update_seq = NewSeq + 1,
header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}),
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_db_update_notifier:notify({updated, Db#db.name}),
- {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2};
-handle_call(start_compact, _From, Db) ->
+ {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}.
+
+
+handle_cast({load_validation_funs, ValidationFuns}, Db) ->
+ Db2 = Db#db{validate_doc_funs = ValidationFuns},
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
+handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
- {reply, ok, Db2};
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
+ {noreply, Db2};
_ ->
% compact currently running, this is a no-op
- {reply, ok, Db}
- end.
-
-
-
+ {noreply, Db}
+ end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
{ok, NewHeader} = couch_file:read_header(NewFd),
@@ -168,13 +186,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
case Db#db.update_seq == NewSeq of
true ->
% suck up all the local docs into memory and write them to the new db
- {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+ {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
- {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs),
+ {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
NewDb2 = commit_data(NewDb#db{
- local_docs_btree = NewLocalBtree,
- main_pid = Db#db.main_pid,
+ local_tree = NewLocalBtree,
+ main_pid = self(),
filepath = Filepath,
instance_start_time = Db#db.instance_start_time,
revs_limit = Db#db.revs_limit
@@ -186,13 +204,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
couch_file:delete(RootDir, Filepath),
ok = file:rename(CompactFilepath, Filepath),
close_db(Db),
- ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+ ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity),
?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]),
{noreply, NewDb2#db{compactor_pid=nil}};
false ->
- ?LOG_INFO("Compaction file still behind main file "
+ ?LOG_INFO("Compaction for ~s still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
- [Db#db.update_seq, NewSeq]),
+ [Db#db.name, Db#db.update_seq, NewSeq]),
close_db(NewDb),
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
@@ -215,7 +233,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
FullCommit2) of
{ok, Db2} ->
- ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
if Db2#db.update_seq /= Db#db.update_seq ->
couch_db_update_notifier:notify({updated, Db2#db.name});
true -> ok
@@ -235,13 +253,16 @@ handle_info(delayed_commit, Db) ->
Db ->
{noreply, Db};
Db2 ->
- ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
+ ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2}
end;
handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db};
handle_info({'EXIT', _Pid, Reason}, Db) ->
- {stop, Reason, Db}.
+ {stop, Reason, Db};
+handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
+ ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]),
+ {stop, normal, Db}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -279,14 +300,27 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
end.
-btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) ->
- RevInfos = [{Rev, Seq, Bp} ||
- #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs],
- DeletedRevInfos = [{Rev, Seq, Bp} ||
- #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs],
- {KeySeq,{Id, RevInfos, DeletedRevInfos}}.
+rev_tree(DiskTree) ->
+ couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
+ {IsDeleted == 1, BodyPointer, UpdateSeq};
+ (_RevId, ?REV_MISSING) ->
+ ?REV_MISSING
+ end, DiskTree).
+
+disk_tree(RevTree) ->
+ couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
+ {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
+ (_RevId, ?REV_MISSING) ->
+ ?REV_MISSING
+ end, RevTree).
+btree_by_seq_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Del, rev_tree=T}) ->
+ {Seq, {Id, if Del -> 1; true -> 0 end, disk_tree(T)}}.
+
+btree_by_seq_join(Seq, {Id, Del, T}) when is_integer(Del) ->
+ #full_doc_info{id=Id, update_seq=Seq, deleted=Del==1, rev_tree=rev_tree(T)};
btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
+ % 1.0 stored #doc_info records in the seq tree. compact to upgrade.
#doc_info{
id = Id,
high_seq=KeySeq,
@@ -310,14 +344,7 @@ btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) ->
btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
deleted=Deleted, rev_tree=Tree}) ->
- DiskTree =
- couch_key_tree:map(
- fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) ->
- {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq};
- (_RevId, ?REV_MISSING) ->
- ?REV_MISSING
- end, Tree),
- {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}.
+ {Id, {Seq, if Deleted -> 1; true -> 0 end, disk_tree(Tree)}}.
btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
Tree =
@@ -377,19 +404,19 @@ init_db(DbName, Filepath, Fd, Header0) ->
"[before_header, after_header, on_file_open]")),
case lists:member(on_file_open, FsyncOptions) of
- true -> ok = couch_file:sync(Fd);
+ true -> ok = couch_file:sync(Filepath);
_ -> ok
end,
- {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
- [{split, fun(X) -> btree_by_id_split(X) end},
- {join, fun(X,Y) -> btree_by_id_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]),
- {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
- [{split, fun(X) -> btree_by_seq_split(X) end},
- {join, fun(X,Y) -> btree_by_seq_join(X,Y) end},
- {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]),
- {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+ {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd,
+ [{split, fun ?MODULE:btree_by_id_split/1},
+ {join, fun ?MODULE:btree_by_id_join/2},
+ {reduce, fun ?MODULE:btree_by_id_reduce/2}]),
+ {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd,
+ [{split, fun ?MODULE:btree_by_seq_split/1},
+ {join, fun ?MODULE:btree_by_seq_join/2},
+ {reduce, fun ?MODULE:btree_by_seq_reduce/2}]),
+ {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd),
case Header#db_header.security_ptr of
nil ->
Security = [],
@@ -401,15 +428,13 @@ init_db(DbName, Filepath, Fd, Header0) ->
{MegaSecs, Secs, MicroSecs} = now(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
- {ok, RefCntr} = couch_ref_counter:start([Fd]),
#db{
- update_pid=self(),
fd=Fd,
- fd_ref_counter = RefCntr,
+ fd_monitor = erlang:monitor(process,Fd),
header=Header,
- fulldocinfo_by_id_btree = IdBtree,
- docinfo_by_seq_btree = SeqBtree,
- local_docs_btree = LocalDocsBtree,
+ id_tree = IdBtree,
+ seq_tree = SeqBtree,
+ local_tree = LocalDocsBtree,
committed_update_seq = Header#db_header.update_seq,
update_seq = Header#db_header.update_seq,
name = DbName,
@@ -422,8 +447,8 @@ init_db(DbName, Filepath, Fd, Header0) ->
}.
-close_db(#db{fd_ref_counter = RefCntr}) ->
- couch_ref_counter:drop(RefCntr).
+close_db(#db{fd_monitor = Ref}) ->
+ erlang:demonitor(Ref).
refresh_validate_doc_funs(Db) ->
@@ -435,7 +460,13 @@ refresh_validate_doc_funs(Db) ->
Fun -> [Fun]
end
end, DesignDocs),
- Db#db{validate_doc_funs=ProcessDocFuns}.
+ case Db#db.name of
+ <<"shards/", _:18/binary, DbName/binary>> ->
+ fabric:reset_validation_funs(DbName),
+ Db#db{validate_doc_funs=ProcessDocFuns};
+ _ ->
+ Db#db{validate_doc_funs=ProcessDocFuns}
+ end.
% rev tree functions
@@ -563,14 +594,11 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList],
-new_index_entries([], AccById, AccBySeq) ->
- {AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) ->
- #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo =
- couch_doc:to_doc_info(FullDocInfo),
- new_index_entries(RestInfos,
- [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
- [DocInfo|AccBySeq]).
+new_index_entries([], Acc) ->
+ Acc;
+new_index_entries([Info|Rest], Acc) ->
+ #doc_info{revs=[#rev_info{deleted=Del}|_]} = couch_doc:to_doc_info(Info),
+ new_index_entries(Rest, [Info#full_doc_info{deleted=Del}|Acc]).
stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
@@ -579,8 +607,8 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree,
- docinfo_by_seq_btree = DocInfoBySeqBTree,
+ id_tree = DocInfoByIdBTree,
+ seq_tree = DocInfoBySeqBTree,
update_seq = LastSeq
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
@@ -607,16 +635,17 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
% the trees, the attachments are already written to disk)
{ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
- {IndexFullDocInfos, IndexDocInfos} =
- new_index_entries(FlushedFullDocInfos, [], []),
+ IndexInfos = new_index_entries(FlushedFullDocInfos, []),
% and the indexes
- {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
- {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs),
+ {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
+ IndexInfos, []),
+ {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
+ IndexInfos, RemoveSeqs),
Db3 = Db2#db{
- fulldocinfo_by_id_btree = DocInfoByIdBTree2,
- docinfo_by_seq_btree = DocInfoBySeqBTree2,
+ id_tree = DocInfoByIdBTree2,
+ seq_tree = DocInfoBySeqBTree2,
update_seq = NewSeq},
% Check if we just updated any design documents, and update the validation
@@ -631,24 +660,26 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
{ok, commit_data(Db4, not FullCommit)}.
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
Ids = [Id || {_Client, #doc{id=Id}} <- Docs],
OldDocLookups = couch_btree:lookup(Btree, Ids),
BtreeEntries = lists:zipwith(
- fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) ->
+ fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}},
+ _OldDocLookup) ->
case PrevRevs of
[RevStr|_] ->
PrevRev = list_to_integer(?b2l(RevStr));
[] ->
PrevRev = 0
end,
- OldRev =
- case OldDocLookup of
- {ok, {_, {OldRev0, _}}} -> OldRev0;
- not_found -> 0
- end,
- case OldRev == PrevRev of
- true ->
+ %% disabled conflict checking for local docs -- APK 16 June 2010
+ % OldRev =
+ % case OldDocLookup of
+ % {ok, {_, {OldRev0, _}}} -> OldRev0;
+ % not_found -> 0
+ % end,
+ % case OldRev == PrevRev of
+ % true ->
case Delete of
false ->
send_result(Client, Id, {0, PrevRevs}, {ok,
@@ -658,11 +689,11 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
send_result(Client, Id, {0, PrevRevs},
{ok, {0, <<"0">>}}),
{remove, Id}
- end;
- false ->
- send_result(Client, Id, {0, PrevRevs}, conflict),
- ignore
- end
+ end%;
+ % false ->
+ % send_result(Client, Id, {0, PrevRevs}, conflict),
+ % ignore
+ % end
end, Docs, OldDocLookups),
BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
@@ -671,7 +702,7 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
- {ok, Db#db{local_docs_btree = Btree2}}.
+ {ok, Db#db{local_tree = Btree2}}.
commit_data(Db) ->
@@ -680,9 +711,9 @@ commit_data(Db) ->
db_to_header(Db, Header) ->
Header#db_header{
update_seq = Db#db.update_seq,
- docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
- fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
- local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+ seq_tree_state = couch_btree:get_state(Db#db.seq_tree),
+ id_tree_state = couch_btree:get_state(Db#db.id_tree),
+ local_tree_state = couch_btree:get_state(Db#db.local_tree),
security_ptr = Db#db.security_ptr,
revs_limit = Db#db.revs_limit}.
@@ -771,31 +802,36 @@ copy_rev_tree_attachments(SrcDb, DestFd, Tree) ->
(_, _, branch) ->
?REV_MISSING
end, Tree).
-
-copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
- Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
- LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
+merge_lookups(Infos, []) ->
+ Infos;
+merge_lookups([], _) ->
+ [];
+merge_lookups([#doc_info{}|RestInfos], [{ok, FullDocInfo}|RestLookups]) ->
+ [FullDocInfo|merge_lookups(RestInfos, RestLookups)];
+merge_lookups([FullDocInfo|RestInfos], Lookups) ->
+ [FullDocInfo|merge_lookups(RestInfos, Lookups)].
+
+copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
+ % lookup any necessary full_doc_infos
+ DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
+ LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
+ Infos = merge_lookups(MixedInfos, LookupResults),
% write out the attachments
- NewFullDocInfos0 = lists:map(
- fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)}
- end, LookupResults),
+ NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db,
+ DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos],
+
% write out the docs
% we do this in 2 stages so the docs are written out contiguously, making
% view indexing and replication faster.
- NewFullDocInfos1 = lists:map(
- fun(#full_doc_info{rev_tree=RevTree}=Info) ->
- Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
- fun(_Key, {IsDel, DocBody, Seq}) ->
- {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
- {IsDel, Pos, Seq}
- end, RevTree)}
- end, NewFullDocInfos0),
-
- NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1),
- NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
+ NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs(
+ fun(_Key, {IsDel, DocBody, Seq}) ->
+ {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody),
+ {IsDel, Pos, Seq}
+ end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0],
+
+ NewInfos = stem_full_doc_infos(Db, NewInfos1),
RemoveSeqs =
case Retry of
false ->
@@ -803,16 +839,16 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
true ->
% We are retrying a compaction, meaning the documents we are copying may
% already exist in our file and must be removed from the by_seq index.
- Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids),
+ Ids = [Id || #full_doc_info{id=Id} <- Infos],
+ Existing = couch_btree:lookup(NewDb#db.id_tree, Ids),
[Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
end,
- {ok, DocInfoBTree} = couch_btree:add_remove(
- NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs),
- {ok, FullDocInfoBTree} = couch_btree:add_remove(
- NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
- NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree,
- docinfo_by_seq_btree=DocInfoBTree}.
+ {ok, SeqTree} = couch_btree:add_remove(
+ NewDb#db.seq_tree, NewInfos, RemoveSeqs),
+ {ok, IdTree} = couch_btree:add_remove(
+ NewDb#db.id_tree, NewInfos, []),
+ NewDb#db{id_tree=IdTree, seq_tree=SeqTree}.
@@ -821,13 +857,20 @@ copy_compact(Db, NewDb0, Retry) ->
NewDb = NewDb0#db{fsync_options=FsyncOptions},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
- fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ case DocInfo of
+ #full_doc_info{update_seq=Seq} ->
+ ok;
+ #doc_info{high_seq=Seq} ->
+ ok
+ end,
couch_task_status:update("Copied ~p of ~p changes (~p%)",
[TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
if TotalCopied rem 1000 =:= 0 ->
NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
if TotalCopied rem 10000 =:= 0 ->
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
+ NewDb3 = commit_data(NewDb2#db{update_seq=Seq}),
+ {ok, {NewDb3, [], TotalCopied + 1}};
true ->
{ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
end;
@@ -839,7 +882,7 @@ copy_compact(Db, NewDb0, Retry) ->
couch_task_status:set_update_frequency(500),
{ok, _, {NewDb2, Uncopied, TotalChanges}} =
- couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun,
+ couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
{NewDb, [], 0},
[{start_key, NewDb#db.update_seq + 1}]),
diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl
index d15cd7de..d47f85ef 100644
--- a/apps/couch/src/couch_doc.erl
+++ b/apps/couch/src/couch_doc.erl
@@ -334,6 +334,8 @@ att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun)->
))
).
+get_validate_doc_fun({Props}) ->
+ get_validate_doc_fun(couch_doc:from_json_obj({Props}));
get_validate_doc_fun(#doc{body={Props}}=DDoc) ->
case couch_util:get_value(<<"validate_doc_update">>, Props) of
undefined ->
@@ -364,7 +366,7 @@ merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) ->
DiskAtt;
_ ->
throw({missing_stub,
- <<"id:", Id/binary, ", name:", Name/binary>>})
+ <<"Invalid attachment stub in ", Id/binary, " for ", Name/binary>>})
end;
(Att) ->
Att
@@ -453,15 +455,11 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
receive
{doc_bytes, DocBytes} ->
Doc = from_json_obj(?JSON_DECODE(DocBytes)),
- % go through the attachments looking for 'follows' in the data,
- % replace with function that reads the data from MIME stream.
- ReadAttachmentDataFun = fun() ->
- Parser ! {get_bytes, self()},
- receive {bytes, Bytes} -> Bytes end
- end,
+ % we'll send the Parser process ID to the remote nodes so they can
+ % retrieve their own copies of the attachment data
Atts2 = lists:map(
fun(#att{data=follows}=A) ->
- A#att{data=ReadAttachmentDataFun};
+ A#att{data={follows, Parser}};
(A) ->
A
end, Doc#doc.atts),
@@ -484,25 +482,75 @@ mp_parse_doc(body_end, AccBytes) ->
From ! {doc_bytes, lists:reverse(AccBytes)}
end,
fun (Next) ->
- mp_parse_atts(Next)
+ mp_parse_atts(Next, {[], 0, orddict:new(), []})
end.
-mp_parse_atts(eof) ->
- ok;
-mp_parse_atts({headers, _H}) ->
- fun (Next) ->
- mp_parse_atts(Next)
- end;
-mp_parse_atts({body, Bytes}) ->
- receive {get_bytes, From} ->
- From ! {bytes, Bytes}
- end,
- fun (Next) ->
- mp_parse_atts(Next)
- end;
-mp_parse_atts(body_end) ->
- fun (Next) ->
- mp_parse_atts(Next)
+mp_parse_atts({headers, _}, Acc) ->
+ fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts(body_end, Acc) ->
+ fun(Next) -> mp_parse_atts(Next, Acc) end;
+mp_parse_atts({body, Bytes}, {DataList, Offset, Counters, Waiting}) ->
+ NewAcc = maybe_send_data({DataList++[Bytes], Offset, Counters, Waiting}),
+ fun(Next) -> mp_parse_atts(Next, NewAcc) end;
+mp_parse_atts(eof, {DataList, Offset, Counters, Waiting}) ->
+ N = list_to_integer(couch_config:get("cluster", "n", "3")),
+ M = length(Counters),
+ case (M == N) andalso DataList == [] of
+ true ->
+ ok;
+ false ->
+ receive {get_bytes, From} ->
+ C2 = orddict:update_counter(From, 1, Counters),
+ NewAcc = maybe_send_data({DataList, Offset, C2, [From|Waiting]}),
+ mp_parse_atts(eof, NewAcc)
+ after 3600000 ->
+ ok
+ end
end.
+maybe_send_data({ChunkList, Offset, Counters, Waiting}) ->
+ receive {get_bytes, From} ->
+ NewCounters = orddict:update_counter(From, 1, Counters),
+ maybe_send_data({ChunkList, Offset, NewCounters, [From|Waiting]})
+ after 0 ->
+ % reply to as many writers as possible
+ NewWaiting = lists:filter(fun(Writer) ->
+ WhichChunk = orddict:fetch(Writer, Counters),
+ ListIndex = WhichChunk - Offset,
+ if ListIndex =< length(ChunkList) ->
+ Writer ! {bytes, lists:nth(ListIndex, ChunkList)},
+ false;
+ true ->
+ true
+ end
+ end, Waiting),
+ % check if we can drop a chunk from the head of the list
+ case Counters of
+ [] ->
+ SmallestIndex = 0;
+ _ ->
+ SmallestIndex = lists:min(element(2, lists:unzip(Counters)))
+ end,
+ Size = length(Counters),
+ N = list_to_integer(couch_config:get("cluster", "n", "3")),
+ if Size == N andalso SmallestIndex == (Offset+1) ->
+ NewChunkList = tl(ChunkList),
+ NewOffset = Offset+1;
+ true ->
+ NewChunkList = ChunkList,
+ NewOffset = Offset
+ end,
+
+ % we should wait for a writer if no one has written the last chunk
+ LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]),
+ if LargestIndex >= (Offset + length(ChunkList)) ->
+ % someone has written all possible chunks, keep moving
+ {NewChunkList, NewOffset, Counters, NewWaiting};
+ true ->
+ receive {get_bytes, X} ->
+ C2 = orddict:update_counter(X, 1, Counters),
+ maybe_send_data({NewChunkList, NewOffset, C2, [X|NewWaiting]})
+ end
+ end
+ end.
diff --git a/apps/couch/src/couch_drv.erl b/apps/couch/src/couch_drv.erl
new file mode 100644
index 00000000..70028659
--- /dev/null
+++ b/apps/couch/src/couch_drv.erl
@@ -0,0 +1,38 @@
+-module(couch_drv).
+-behaviour(gen_server).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([start_link/0]).
+
+-include("couch_db.hrl").
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ case erl_ddll:load(code:priv_dir(couch), "couch_erl_driver") of
+ ok ->
+ {ok, nil};
+ {error, already_loaded} ->
+ ?LOG_INFO("~p reloading couch_erl_driver", [?MODULE]),
+ ok = erl_ddll:reload(code:priv_dir(couch), "couch_erl_driver"),
+ {ok, nil};
+ {error, Error} ->
+ {stop, erl_ddll:format_error(Error)}
+ end.
+
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/apps/couch/src/couch_external_manager.erl b/apps/couch/src/couch_external_manager.erl
index 7e401389..0c66ef8c 100644
--- a/apps/couch/src/couch_external_manager.erl
+++ b/apps/couch/src/couch_external_manager.erl
@@ -39,7 +39,7 @@ config_change("external", UrlName) ->
init([]) ->
process_flag(trap_exit, true),
Handlers = ets:new(couch_external_manager_handlers, [set, private]),
- couch_config:register(fun config_change/2),
+ couch_config:register(fun ?MODULE:config_change/2),
{ok, Handlers}.
terminate(_Reason, Handlers) ->
diff --git a/apps/couch/src/couch_file.erl b/apps/couch/src/couch_file.erl
index 0a891712..2d539f64 100644
--- a/apps/couch/src/couch_file.erl
+++ b/apps/couch/src/couch_file.erl
@@ -69,10 +69,10 @@ open(Filepath, Options) ->
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
- append_binary(Fd, term_to_binary(Term)).
+ append_binary(Fd, term_to_binary(Term, [compressed, {minor_version,1}])).
append_term_md5(Fd, Term) ->
- append_binary_md5(Fd, term_to_binary(Term)).
+ append_binary_md5(Fd, term_to_binary(Term, [compressed, {minor_version,1}])).
%%----------------------------------------------------------------------
@@ -237,6 +237,7 @@ init_status_error(ReturnPid, Ref, Error) ->
init({Filepath, Options, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
+ timer:send_after(60000, maybe_close),
case lists:member(create, Options) of
true ->
filelib:ensure_dir(Filepath),
@@ -479,6 +480,18 @@ handle_cast(close, Fd) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_info(maybe_close, Fd) ->
+ case process_info(self(), monitored_by) of
+ {monitored_by, [_StatsCollector]} ->
+ {stop, normal, Fd};
+ {monitored_by, []} ->
+ ?LOG_ERROR("~p ~p is un-monitored, maybe stats collector died",
+ [?MODULE, self()]),
+ {stop, normal, Fd};
+ _Else ->
+ timer:send_after(10000, maybe_close),
+ {noreply, Fd}
+ end;
handle_info({'EXIT', _, normal}, Fd) ->
{noreply, Fd};
handle_info({'EXIT', _, Reason}, Fd) ->
diff --git a/apps/couch/src/couch_os_process.erl b/apps/couch/src/couch_os_process.erl
index 5776776b..1fe38e8e 100644
--- a/apps/couch/src/couch_os_process.erl
+++ b/apps/couch/src/couch_os_process.erl
@@ -59,12 +59,12 @@ prompt(Pid, Data) ->
% Utility functions for reading and writing
% in custom functions
-writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
- port_command(OsProc#os_proc.port, Data ++ "\n").
+writeline(#os_proc{port=Port}, Data) ->
+ port_command(Port, Data ++ "\n").
readline(#os_proc{} = OsProc) ->
readline(OsProc, []).
-readline(#os_proc{port = Port} = OsProc, Acc) ->
+readline(#os_proc{port=Port, timeout=Timeout} = OsProc, Acc) ->
receive
{Port, {data, {noeol, Data}}} ->
readline(OsProc, [Data|Acc]);
@@ -73,7 +73,7 @@ readline(#os_proc{port = Port} = OsProc, Acc) ->
{Port, Err} ->
catch port_close(Port),
throw({os_process_error, Err})
- after OsProc#os_proc.timeout ->
+ after Timeout ->
catch port_close(Port),
throw({os_process_error, "OS process timed out."})
end.
@@ -81,12 +81,12 @@ readline(#os_proc{port = Port} = OsProc, Acc) ->
% Standard JSON functions
writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
JsonData = ?JSON_ENCODE(Data),
- ?LOG_DEBUG("OS Process ~p Input :: ~s", [OsProc#os_proc.port, JsonData]),
+ % ?LOG_DEBUG("OS Process ~p Input :: ~s", [OsProc#os_proc.port, JsonData]),
true = writeline(OsProc, JsonData).
-readjson(OsProc) when is_record(OsProc, os_proc) ->
+readjson(#os_proc{} = OsProc) ->
Line = readline(OsProc),
- ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
+ % ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
case ?JSON_DECODE(Line) of
[<<"log">>, Msg] when is_binary(Msg) ->
% we got a message to log. Log it and continue
@@ -104,9 +104,7 @@ readjson(OsProc) when is_record(OsProc, os_proc) ->
% gen_server API
init([Command, Options, PortOptions]) ->
- process_flag(trap_exit, true),
- PrivDir = couch_util:priv_dir(),
- Spawnkiller = filename:join(PrivDir, "couchspawnkillable"),
+ Spawnkiller = filename:join([code:root_dir(), "bin", "couchspawnkillable"]),
BaseProc = #os_proc{
command=Command,
port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions),
@@ -115,7 +113,7 @@ init([Command, Options, PortOptions]) ->
},
KillCmd = readline(BaseProc),
Pid = self(),
- ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]),
+ % ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]),
spawn(fun() ->
% this ensure the real os process is killed when this process dies.
erlang:monitor(process, Pid),
diff --git a/apps/couch/src/couch_primary_sup.erl b/apps/couch/src/couch_primary_sup.erl
new file mode 100644
index 00000000..e822b70a
--- /dev/null
+++ b/apps/couch/src/couch_primary_sup.erl
@@ -0,0 +1,42 @@
+-module(couch_primary_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+start_link() ->
+ supervisor:start_link({local,couch_primary_services}, ?MODULE, []).
+
+init([]) ->
+ Children = [
+ {collation_driver,
+ {couch_drv, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_drv]},
+ {couch_task_status,
+ {couch_task_status, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_task_status]},
+ {couch_server,
+ {couch_server, sup_start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_server]},
+ {couch_db_update_event,
+ {gen_event, start_link, [{local, couch_db_update}]},
+ permanent,
+ brutal_kill,
+ worker,
+ dynamic},
+ {couch_replication_supervisor,
+ {couch_rep_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [couch_rep_sup]}
+ ],
+ {ok, {{one_for_one, 10, 3600}, Children}}.
+
diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl
index c4f1bf0b..144b7494 100644
--- a/apps/couch/src/couch_query_servers.erl
+++ b/apps/couch/src/couch_query_servers.erl
@@ -13,7 +13,7 @@
-module(couch_query_servers).
-behaviour(gen_server).
--export([start_link/0]).
+-export([start_link/0, config_change/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
-export([start_doc_map/2, map_docs/2, stop_doc_map/1]).
@@ -21,6 +21,7 @@
-export([filter_docs/5]).
-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+-export([get_os_process/1, ret_os_process/1]).
% -export([test/0]).
@@ -121,25 +122,19 @@ recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc)
os_reduce(_Lang, [], _KVs) ->
{ok, []};
+os_reduce(#proc{} = Proc, OsRedSrcs, KVs) ->
+ [true, Reductions] = proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]),
+ {ok, Reductions};
os_reduce(Lang, OsRedSrcs, KVs) ->
Proc = get_os_process(Lang),
- OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
- [true, Reductions] -> Reductions
- after
- ok = ret_os_process(Proc)
- end,
- {ok, OsResults}.
+ try os_reduce(Proc, OsRedSrcs, KVs) after ok = ret_os_process(Proc) end.
-os_rereduce(_Lang, [], _KVs) ->
- {ok, []};
+os_rereduce(#proc{} = Proc, OsRedSrcs, KVs) ->
+ [true, [Reduction]] = proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]),
+ Reduction;
os_rereduce(Lang, OsRedSrcs, KVs) ->
Proc = get_os_process(Lang),
- try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
- [true, [Reduction]] -> Reduction
- after
- ok = ret_os_process(Proc)
- end.
-
+ try os_rereduce(Proc, OsRedSrcs, KVs) after ok = ret_os_process(Proc) end.
builtin_reduce(_Re, [], _KVs, Acc) ->
{ok, lists:reverse(Acc)};
@@ -234,16 +229,7 @@ init([]) ->
% just stop if one of the config settings change. couch_server_sup
% will restart us and then we will pick up the new settings.
- ok = couch_config:register(
- fun("query_servers" ++ _, _) ->
- supervisor:terminate_child(couch_secondary_services, query_servers),
- supervisor:restart_child(couch_secondary_services, query_servers)
- end),
- ok = couch_config:register(
- fun("native_query_servers" ++ _, _) ->
- supervisor:terminate_child(couch_secondary_services, query_servers),
- [supervisor:restart_child(couch_secondary_services, query_servers)]
- end),
+ ok = couch_config:register(fun ?MODULE:config_change/1),
Langs = ets:new(couch_query_server_langs, [set, private]),
PidProcs = ets:new(couch_query_server_pid_langs, [set, private]),
@@ -275,23 +261,15 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProc
case ets:lookup(LangProcs, Lang) of
[{Lang, [P|Rest]}] ->
% find a proc in the set that has the DDoc
- case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of
- {ok, Proc} ->
- rem_from_list(LangProcs, Lang, Proc),
- {reply, {ok, Proc, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end;
+ {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]),
+ rem_from_list(LangProcs, Lang, Proc),
+ {reply, {ok, Proc, get_query_server_config()}, Server};
_ ->
case (catch new_process(Langs, Lang)) of
{ok, Proc} ->
add_value(PidProcs, Proc#proc.pid, Proc),
- case proc_with_ddoc(DDoc, DDocKey, [Proc]) of
- {ok, Proc2} ->
- {reply, {ok, Proc2, get_query_server_config()}, Server};
- Error ->
- {reply, Error, Server}
- end;
+ {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]),
+ {reply, {ok, Proc2, get_query_server_config()}, Server};
Error ->
{reply, Error, Server}
end
@@ -348,6 +326,13 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+config_change("query_servers") ->
+ supervisor:terminate_child(couch_secondary_services, query_servers),
+ supervisor:restart_child(couch_secondary_services, query_servers);
+config_change("native_query_servers") ->
+ supervisor:terminate_child(couch_secondary_services, query_servers),
+ supervisor:restart_child(couch_secondary_services, query_servers).
+
% Private API
get_query_server_config() ->
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 65573e8c..126639e0 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -15,7 +15,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([replicate/2, checkpoint/1]).
+-export([replicate/2, checkpoint/1, start_link/3]).
-include("couch_db.hrl").
@@ -49,6 +49,9 @@
doc_ids = nil
}).
+start_link(Id, PostBody, UserCtx) ->
+ gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []).
+
%% convenience function to do a simple replication from the shell
replicate(Source, Target) when is_list(Source) ->
replicate(?l2b(Source), Target);
@@ -61,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) ->
replicate({Props}=PostBody, UserCtx) ->
{BaseId, Extension} = make_replication_id(PostBody, UserCtx),
Replicator = {BaseId ++ Extension,
- {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]},
+ {?MODULE, start_link, [BaseId, PostBody, UserCtx]},
temporary,
1,
worker,
@@ -80,10 +83,15 @@ replicate({Props}=PostBody, UserCtx) ->
false ->
Server = start_replication_server(Replicator),
- case couch_util:get_value(<<"continuous">>, Props, false) of
- true ->
+ Continuous = couch_util:get_value(<<"continuous">>, Props, false),
+ Async = couch_util:get_value(<<"async">>, Props, false),
+ case {Continuous, Async} of
+ {true, _} ->
{ok, {continuous, ?l2b(BaseId)}};
- false ->
+ {_, true} ->
+ spawn(fun() -> get_result(Server, PostBody, UserCtx) end),
+ Server;
+ _ ->
get_result(Server, PostBody, UserCtx)
end
end.
@@ -106,7 +114,9 @@ get_result(Server, PostBody, UserCtx) ->
init(InitArgs) ->
try do_init(InitArgs)
- catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end.
+ catch _:Error ->
+ {stop, Error}
+ end.
do_init([RepId, {PostProps}, UserCtx] = InitArgs) ->
process_flag(trap_exit, true),
@@ -211,14 +221,16 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
- couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
+ couch_task_status:update("MR Processed source update #~p of ~p",
+ [SourceSeq, seqnum(State#state.source)]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
- couch_task_status:update("W Processed source update #~p", [SourceSeq]),
+ couch_task_status:update("W Processed source update #~p of ~p",
+ [SourceSeq, seqnum(State#state.source)]),
{noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
handle_info({writer_checkpoint, _}, State) ->
{noreply, State};
@@ -227,8 +239,14 @@ handle_info({update_stats, Key, N}, State) ->
ets:update_counter(State#state.stats, Key, N),
{noreply, State};
-handle_info({'DOWN', _, _, _, _}, State) ->
- ?LOG_INFO("replication terminating because local DB is shutting down", []),
+handle_info({'DOWN', _, _, Pid, _}, State) ->
+ Me = node(),
+ case erlang:node(Pid) of
+ Me ->
+ ?LOG_INFO("replication terminating - local DB is shutting down", []);
+ Node ->
+ ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node])
+ end,
timer:cancel(State#state.checkpoint_scheduled),
{stop, shutdown, State};
@@ -275,34 +293,35 @@ code_change(_OldVsn, State, _Extra) ->
% internal funs
start_replication_server(Replicator) ->
- RepId = element(1, Replicator),
- case supervisor:start_child(couch_rep_sup, Replicator) of
+ start_replication_server(Replicator, fun start_child/1).
+
+start_replication_server(Replicator, StartFun) ->
+ case StartFun(Replicator) of
{ok, Pid} ->
- ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]),
Pid;
{error, already_present} ->
- case supervisor:restart_child(couch_rep_sup, RepId) of
- {ok, Pid} ->
- ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]),
- Pid;
- {error, running} ->
- %% this error occurs if multiple replicators are racing
- %% each other to start and somebody else won. Just grab
- %% the Pid by calling start_child again.
- {error, {already_started, Pid}} =
- supervisor:start_child(couch_rep_sup, Replicator),
- ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
- Pid;
- {error, {db_not_found, DbUrl}} ->
- throw({db_not_found, <<"could not open ", DbUrl/binary>>})
- end;
+ start_replication_server(Replicator, fun restart_child/1);
{error, {already_started, Pid}} ->
- ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]),
Pid;
+ {error, running} ->
+ Children = supervisor:which_children(couch_rep_sup),
+ {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children),
+ Pid;
+ % sadly both seem to be needed. I don't know why.
{error, {{db_not_found, DbUrl}, _}} ->
- throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {db_not_found, DbUrl}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>});
+ {error, {node_not_connected, Node}} ->
+ throw({node_not_connected, Node})
end.
+start_child(Replicator) ->
+ supervisor:start_child(couch_rep_sup, Replicator).
+
+restart_child(Replicator) ->
+ supervisor:restart_child(couch_rep_sup, element(1, Replicator)).
+
compare_replication_logs(SrcDoc, TgtDoc) ->
#doc{body={RepRecProps}} = SrcDoc,
#doc{body={RepRecPropsTgt}} = TgtDoc,
@@ -355,8 +374,8 @@ close_db(Db) ->
dbname(#http_db{url = Url}) ->
strip_password(Url);
-dbname(#db{name = Name}) ->
- Name.
+dbname(#db{name = Name, main_pid = MainPid}) ->
+ ?l2b([Name, " (", pid_to_list(MainPid), ")"]).
strip_password(Url) ->
re:replace(Url,
@@ -457,7 +476,12 @@ maybe_append_options(Options, Props) ->
make_replication_id({Props}, UserCtx) ->
%% funky algorithm to preserve backwards compatibility
- {ok, HostName} = inet:gethostname(),
+ case couch_util:get_value(<<"use_hostname">>, Props, false) of
+ true ->
+ {ok, HostName} = inet:gethostname();
+ false ->
+ HostName = couch_config:get("replication", "hostname", "cloudant.com")
+ end,
% Port = mochiweb_socket_server:get(couch_httpd, port),
Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)),
Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)),
@@ -480,15 +504,22 @@ make_replication_id({Props}, UserCtx) ->
maybe_add_trailing_slash(Url) ->
re:replace(Url, "[^/]$", "&/", [{return, list}]).
-get_rep_endpoint(_UserCtx, {Props}) ->
- Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
- {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
- {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
- case couch_util:get_value(<<"oauth">>, Auth) of
+get_rep_endpoint(UserCtx, {Props}) ->
+ case couch_util:get_value(<<"url">>, Props) of
undefined ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
- {OAuth} ->
- {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ Node = couch_util:get_value(<<"node">>, Props),
+ Name = couch_util:get_value(<<"name">>, Props),
+ {Node, Name, UserCtx};
+ RawUrl ->
+ Url = maybe_add_trailing_slash(RawUrl),
+ {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
+ {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}),
+ case couch_util:get_value(<<"oauth">>, Auth) of
+ undefined ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]};
+ {OAuth} ->
+ {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth}
+ end
end;
get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) ->
{remote, maybe_add_trailing_slash(Url), []};
@@ -502,27 +533,43 @@ open_replication_log(#http_db{}=Db, RepId) ->
Req = Db#http_db{resource=couch_util:url_encode(DocId)},
case couch_rep_httpc:request(Req) of
{[{<<"error">>, _}, {<<"reason">>, _}]} ->
- ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
+ % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]),
#doc{id=?l2b(DocId)};
Doc ->
- ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
+ % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]),
couch_doc:from_json_obj(Doc)
end;
open_replication_log(Db, RepId) ->
DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
case couch_db:open_doc(Db, DocId, []) of
{ok, Doc} ->
- ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
+ % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]),
Doc;
_ ->
- ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
+ % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]),
#doc{id=DocId}
end.
open_db(Props, UserCtx, ProxyParams) ->
open_db(Props, UserCtx, ProxyParams, false).
-open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
+open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) ->
+ open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
+open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) ->
+ open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create);
+open_db({Props}, UserCtx, ProxyParams, Create) ->
+ case couch_util:get_value(<<"url">>, Props) of
+ undefined ->
+ Node = couch_util:get_value(<<"node">>, Props, node()),
+ DbName = couch_util:get_value(<<"name">>, Props),
+ open_local_db(Node, DbName, UserCtx, Create);
+ _Url ->
+ open_remote_db({Props}, ProxyParams, Create)
+ end;
+open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) ->
+ open_local_db(node(), DbName, UserCtx, Create).
+
+open_remote_db({Props}, ProxyParams, CreateTarget) ->
Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)),
{AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}),
{BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}),
@@ -534,24 +581,32 @@ open_db({Props}, _UserCtx, ProxyParams, CreateTarget) ->
headers = lists:ukeymerge(1, Headers, DefaultHeaders)
},
Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams},
- couch_rep_httpc:db_exists(Db, CreateTarget);
-open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
- open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
-open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) ->
- open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget);
-open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) ->
- case CreateTarget of
- true ->
- ok = couch_httpd:verify_is_server_admin(UserCtx),
- couch_server:create(DbName, [{user_ctx, UserCtx}]);
- false -> ok
- end,
-
- case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
- {ok, Db} ->
+ couch_rep_httpc:db_exists(Db, CreateTarget).
+
+open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) ->
+ try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create)
+ catch error:badarg ->
+ ?LOG_ERROR("unknown replication node ~s", [Node]),
+ throw({node_not_connected, Node}) end;
+open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) ->
+ case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of
+ {ok, #db{} = Db} ->
+ couch_db:monitor(Db),
+ Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)};
+ {ok, MainPid} when is_pid(MainPid) ->
+ {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx),
couch_db:monitor(Db),
Db;
- {not_found, no_db_file} -> throw({db_not_found, DbName})
+ {not_found, no_db_file} when Create =:= false->
+ throw({db_not_found, DbName});
+ {not_found, no_db_file} ->
+ ok = couch_httpd:verify_is_server_admin(UserCtx),
+ couch_server:create(DbName, [{user_ctx, UserCtx}]);
+ {'EXIT', {{nodedown, Node}, _Stack}} ->
+ throw({node_not_connected, couch_util:to_binary(Node)});
+ {'EXIT', {noproc, {gen_server,call,_}}} ->
+ timer:sleep(1000),
+ throw({noproc, couch_server, Node})
end.
schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) ->
@@ -582,9 +637,14 @@ do_checkpoint(State) ->
} = State,
case commit_to_both(Source, Target, NewSeqNum) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
- ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p",
- [dbname(Source), dbname(Target), NewSeqNum]),
- SessionId = couch_uuids:random(),
+ ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p"
+ " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]),
+ SessionId = couch_uuids:new(),
+ TargetNode = case Target of #db{main_pid=MainPid} ->
+ erlang:node(MainPid);
+ _ ->
+ http
+ end,
NewHistoryEntry = {[
{<<"session_id">>, SessionId},
{<<"start_time">>, list_to_binary(ReplicationStartTime)},
@@ -603,6 +663,7 @@ do_checkpoint(State) ->
NewRepHistory = {[
{<<"session_id">>, SessionId},
{<<"source_last_seq">>, NewSeqNum},
+ {<<"target_node">>, TargetNode},
{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}
]},
@@ -622,7 +683,9 @@ do_checkpoint(State) ->
"yourself?)", []),
State
end;
- _Else ->
+ Else ->
+ ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [
+ {SrcInstanceStartTime, TgtInstanceStartTime}, Else]),
?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint",
[dbname(Source), dbname(Target)]),
#state{
@@ -654,7 +717,12 @@ commit_to_both(Source, Target, RequiredSeq) ->
{SrcCommitPid, Timestamp} ->
Timestamp;
{'EXIT', SrcCommitPid, {http_request_failed, _}} ->
- exit(replication_link_failure)
+ nil;
+ {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} ->
+ nil; % DB crashed, this should trigger a reboot
+ {'EXIT', SrcCommitPid, Else} ->
+ ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]),
+ nil
end,
{SourceStartTime, TargetStartTime}.
@@ -667,12 +735,13 @@ ensure_full_commit(#http_db{headers = Headers} = Target) ->
{ResultProps} = couch_rep_httpc:request(Req),
true = couch_util:get_value(<<"ok">>, ResultProps),
couch_util:get_value(<<"instance_start_time">>, ResultProps);
-ensure_full_commit(Target) ->
- {ok, NewDb} = couch_db:open_int(Target#db.name, []),
+ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) ->
+ TargetNode = erlang:node(Pid),
+ {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]),
UpdateSeq = couch_db:get_update_seq(Target),
CommitSeq = couch_db:get_committed_update_seq(NewDb),
InstanceStartTime = NewDb#db.instance_start_time,
- couch_db:close(NewDb),
+ catch couch_db:close(NewDb),
if UpdateSeq > CommitSeq ->
?LOG_DEBUG("target needs a full commit: update ~p commit ~p",
[UpdateSeq, CommitSeq]),
@@ -732,6 +801,11 @@ up_to_date(Source, Seq) ->
couch_db:close(NewDb),
T.
+seqnum(#http_db{}) ->
+ -1;
+seqnum(Db) ->
+ Db#db.update_seq.
+
parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
parse_proxy_params(?b2l(ProxyUrl));
parse_proxy_params([]) ->
diff --git a/apps/couch/src/couch_rep_att.erl b/apps/couch/src/couch_rep_att.erl
index 28b8945c..476c64d4 100644
--- a/apps/couch/src/couch_rep_att.erl
+++ b/apps/couch/src/couch_rep_att.erl
@@ -78,7 +78,6 @@ receive_data(Ref, ReqId, ContentEncoding) ->
?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]),
throw({attachment_request_failed, Err});
{ibrowse_async_response, ReqId, Data} ->
- % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]),
Data;
{ibrowse_async_response_end, ReqId} ->
?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]),
diff --git a/apps/couch/src/couch_rep_httpc.erl b/apps/couch/src/couch_rep_httpc.erl
index aaa38106..3b11b869 100644
--- a/apps/couch/src/couch_rep_httpc.erl
+++ b/apps/couch/src/couch_rep_httpc.erl
@@ -176,8 +176,8 @@ process_response({error, Reason}, Req) ->
Else ->
Else
end,
- ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++
- "{error, ~p}", [Method, Pause/1000, ShortReason]),
+ ?LOG_ERROR("~p retry ~p ~s in ~p seconds due to {error, ~p}",
+ [?MODULE, Method, full_url(Req), Pause/1000, ShortReason]),
timer:sleep(Pause),
if Reason == worker_is_dead ->
C = spawn_link_worker_process(Req),
diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl
index 8722f3f5..46633994 100644
--- a/apps/couch/src/couch_rep_reader.erl
+++ b/apps/couch/src/couch_rep_reader.erl
@@ -20,7 +20,7 @@
-import(couch_util, [url_encode/1]).
-define (BUFFER_SIZE, 1000).
--define (MAX_CONCURRENT_REQUESTS, 100).
+-define (MAX_CONCURRENT_REQUESTS, 10).
-define (MAX_CONNECTIONS, 20).
-define (MAX_PIPELINE_SIZE, 50).
diff --git a/apps/couch/src/couch_secondary_sup.erl b/apps/couch/src/couch_secondary_sup.erl
new file mode 100644
index 00000000..8ccbd799
--- /dev/null
+++ b/apps/couch/src/couch_secondary_sup.erl
@@ -0,0 +1,35 @@
+-module(couch_secondary_sup).
+-behaviour(supervisor).
+-export([init/1, start_link/0]).
+
+start_link() ->
+ supervisor:start_link({local,couch_secondary_services}, ?MODULE, []).
+init([]) ->
+ SecondarySupervisors = [
+ {couch_db_update_notifier_sup,
+ {couch_db_update_notifier_sup, start_link, []},
+ permanent,
+ infinity,
+ supervisor,
+ [couch_db_update_notifier_sup]},
+ {couch_metrics_event_manager,
+ {gen_event, start_link, [{local, couch_metrics_event_manager}]},
+ permanent,
+ brutal_kill,
+ worker,
+ dynamic}
+ ],
+ Children = SecondarySupervisors ++ [
+ begin
+ {ok, {Module, Fun, Args}} = couch_util:parse_term(SpecStr),
+
+ {list_to_atom(Name),
+ {Module, Fun, Args},
+ permanent,
+ brutal_kill,
+ worker,
+ [Module]}
+ end
+ || {Name, SpecStr}
+ <- couch_config:get("daemons"), SpecStr /= ""],
+ {ok, {{one_for_one, 10, 3600}, Children}}.
diff --git a/apps/couch/src/couch_server.erl b/apps/couch/src/couch_server.erl
index 43fd9044..b54771be 100644
--- a/apps/couch/src/couch_server.erl
+++ b/apps/couch/src/couch_server.erl
@@ -13,10 +13,11 @@
-module(couch_server).
-behaviour(gen_server).
--export([open/2,create/2,delete/2,all_databases/0,get_version/0]).
--export([init/1, handle_call/3,sup_start_link/0]).
+-export([open/2,create/2,delete/2,all_databases/0,all_databases/1]).
+-export([init/1, handle_call/3,sup_start_link/0,get_version/0]).
-export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
--export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]).
+-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0,config_change/4]).
+-export([close_lru/0]).
-include("couch_db.hrl").
@@ -50,15 +51,26 @@ get_stats() ->
sup_start_link() ->
gen_server:start_link({local, couch_server}, couch_server, [], []).
+
open(DbName, Options) ->
- case gen_server:call(couch_server, {open, DbName, Options}, infinity) of
- {ok, Db} ->
- Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
- {ok, Db#db{user_ctx=Ctx}};
- Error ->
- Error
+ Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
+ case ets:lookup(couch_dbs, DbName) of
+ [#db{fd=Fd, fd_monitor=Lock} = Db] when Lock =/= locked ->
+ ets:insert(couch_lru, {DbName, now()}),
+ {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ _ ->
+ case gen_server:call(couch_server, {open, DbName, Options}, infinity) of
+ {ok, #db{fd=Fd} = Db} ->
+ ets:insert(couch_lru, {DbName, now()}),
+ {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}};
+ Error ->
+ Error
+ end
end.
+close_lru() ->
+ gen_server:call(couch_server, close_lru).
+
create(DbName, Options) ->
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
{ok, Db} ->
@@ -121,28 +133,12 @@ init([]) ->
RootDir = couch_config:get("couchdb", "database_dir", "."),
MaxDbsOpen = list_to_integer(
couch_config:get("couchdb", "max_dbs_open")),
- Self = self(),
- ok = couch_config:register(
- fun("couchdb", "database_dir") ->
- exit(Self, config_change)
- end),
- ok = couch_config:register(
- fun("couchdb", "max_dbs_open", Max) ->
- gen_server:call(couch_server,
- {set_max_dbs_open, list_to_integer(Max)})
- end),
+ ok = couch_config:register(fun ?MODULE:config_change/4),
ok = couch_file:init_delete_dir(RootDir),
hash_admin_passwords(),
- ok = couch_config:register(
- fun("admins", _Key, _Value, Persist) ->
- % spawn here so couch_config doesn't try to call itself
- spawn(fun() -> hash_admin_passwords(Persist) end)
- end, false),
- {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
- ets:new(couch_dbs_by_name, [set, private, named_table]),
- ets:new(couch_dbs_by_pid, [set, private, named_table]),
- ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]),
- ets:new(couch_sys_dbs, [set, private, named_table]),
+ {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\.]*$"),
+ ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]),
+ ets:new(couch_lru, [set, public, named_table]),
process_flag(trap_exit, true),
{ok, #server{root_dir=RootDir,
dbname_regexp=RegExp,
@@ -150,15 +146,27 @@ init([]) ->
start_time=httpd_util:rfc1123_date()}}.
terminate(_Reason, _Srv) ->
- [couch_util:shutdown_sync(Pid) || {_, {Pid, _LruTime}} <-
- ets:tab2list(couch_dbs_by_name)],
+ ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end,
+ nil, couch_dbs),
ok.
+config_change("couchdb", "database_dir", _, _) ->
+ exit(whereis(couch_server), config_change);
+config_change("couchdb", "max_dbs_open", Max, _) ->
+ gen_server:call(couch_server, {set_max_dbs_open, list_to_integer(Max)});
+config_change("admins", _, _, Persist) ->
+ % spawn here so couch_config doesn't try to call itself
+ spawn(fun() -> hash_admin_passwords(Persist) end).
+
all_databases() ->
+ all_databases("").
+
+all_databases(Prefix) ->
{ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server),
NormRoot = couch_util:normpath(Root),
Filenames =
- filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true,
+ filelib:fold_files(Root++Prefix, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$",
+ true,
fun(Filename, AccIn) ->
NormFilename = couch_util:normpath(Filename),
case NormFilename -- NormRoot of
@@ -181,172 +189,145 @@ maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) ->
Error -> Error
end.
+find_oldest_db({DbName, Lru}, Acc) ->
+ erlang:min({Lru, DbName}, Acc).
+
try_close_lru(StartTime) ->
- LruTime = get_lru(),
- if LruTime > StartTime ->
- % this means we've looped through all our opened dbs and found them
- % all in use.
+ case ets:foldl(fun find_oldest_db/2, {StartTime, nil}, couch_lru) of
+ {StartTime, nil} ->
{error, all_dbs_active};
- true ->
- [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
- [{_, {opened, MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName),
- case couch_db:is_idle(MainPid) of
- true ->
- couch_util:shutdown_sync(MainPid),
- true = ets:delete(couch_dbs_by_lru, LruTime),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, MainPid),
- true = ets:delete(couch_sys_dbs, DbName),
+ {_, DbName} ->
+ % There may exist an extremely small possibility of a race
+ % condition here, if a process could lookup the DB before the lock,
+ % but fail to monitor the fd before the is_idle check.
+ true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}),
+ [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName),
+ case couch_db:is_idle(Db) of true ->
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ exit(Pid, kill),
ok;
false ->
- % this still has referrers. Go ahead and give it a current lru time
- % and try the next one in the table.
- NewLruTime = now(),
- true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, NewLruTime}}),
- true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
- true = ets:delete(couch_dbs_by_lru, LruTime),
- true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}),
+ true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}),
+ true = ets:insert(couch_lru, {DbName, now()}),
try_close_lru(StartTime)
end
end.
-get_lru() ->
- get_lru(ets:first(couch_dbs_by_lru)).
-
-get_lru(LruTime) ->
- [{LruTime, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
- case ets:member(couch_sys_dbs, DbName) of
- false ->
- LruTime;
- true ->
- [{_, {opened, MainPid, _}}] = ets:lookup(couch_dbs_by_name, DbName),
- case couch_db:is_idle(MainPid) of
- true ->
- LruTime;
- false ->
- get_lru(ets:next(couch_dbs_by_lru, LruTime))
- end
- end.
-
open_async(Server, From, DbName, Filepath, Options) ->
Parent = self(),
+ put({async_open, DbName}, now()),
Opener = spawn_link(fun() ->
- Res = couch_db:start_link(DbName, Filepath, Options),
- gen_server:call(
- Parent, {open_result, DbName, Res, Options}, infinity
- ),
- unlink(Parent),
- case Res of
- {ok, DbReader} ->
- unlink(DbReader);
- _ ->
- ok
- end
- end),
- true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}),
- true = ets:insert(couch_dbs_by_pid, {Opener, DbName}),
- DbsOpen = case lists:member(sys_db, Options) of
- true ->
- true = ets:insert(couch_sys_dbs, {DbName, true}),
- Server#server.dbs_open;
- false ->
- Server#server.dbs_open + 1
- end,
- Server#server{dbs_open = DbsOpen}.
-
+ Res = couch_db:start_link(DbName, Filepath, Options),
+ gen_server:call(Parent, {open_result, DbName, Res}, infinity),
+ unlink(Parent)
+ end),
+ % icky hack of field values - compactor_pid used to store clients
+ true = ets:insert(couch_dbs, #db{
+ name = DbName,
+ main_pid = Opener,
+ compactor_pid = [From],
+ fd_monitor = locked
+ }),
+ Server#server{dbs_open=Server#server.dbs_open + 1}.
+
+handle_call(close_lru, _From, #server{dbs_open=N} = Server) ->
+ case try_close_lru(now()) of
+ ok ->
+ {reply, ok, Server#server{dbs_open = N-1}};
+ Error ->
+ {reply, Error, Server}
+ end;
+handle_call(open_dbs_count, _From, Server) ->
+ {reply, Server#server.dbs_open, Server};
+handle_call({set_dbname_regexp, RegExp}, _From, Server) ->
+ {reply, ok, Server#server{dbname_regexp=RegExp}};
handle_call({set_max_dbs_open, Max}, _From, Server) ->
{reply, ok, Server#server{max_dbs_open=Max}};
handle_call(get_server, _From, Server) ->
{reply, {ok, Server}, Server};
-handle_call({open_result, DbName, {ok, OpenedDbPid}, Options}, _From, Server) ->
- link(OpenedDbPid),
- [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName),
- lists:foreach(fun({FromPid,_}=From) ->
- gen_server:reply(From,
- catch couch_db:open_ref_counted(OpenedDbPid, FromPid))
- end, Froms),
- LruTime = now(),
- true = ets:insert(couch_dbs_by_name,
- {DbName, {opened, OpenedDbPid, LruTime}}),
- true = ets:delete(couch_dbs_by_pid, Opener),
- true = ets:insert(couch_dbs_by_pid, {OpenedDbPid, DbName}),
- true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
- case lists:member(create, Options) of
- true ->
- couch_db_update_notifier:notify({created, DbName});
- false ->
- ok
+handle_call({open_result, DbName, {ok, Db}}, _From, Server) ->
+ link(Db#db.main_pid),
+ case erase({async_open, DbName}) of undefined -> ok; T0 ->
+ ?LOG_INFO("needed ~p ms to open new ~s", [timer:now_diff(now(),T0)/1000,
+ DbName])
end,
+ % icky hack of field values - compactor_pid used to store clients
+ [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName),
+ [gen_server:reply(From, {ok, Db}) || From <- Froms],
+ true = ets:insert(couch_dbs, Db),
{reply, ok, Server};
-handle_call({open_result, DbName, Error, Options}, _From, Server) ->
- [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName),
- lists:foreach(fun(From) ->
- gen_server:reply(From, Error)
- end, Froms),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, Opener),
- DbsOpen = case lists:member(sys_db, Options) of
- true ->
- true = ets:delete(couch_sys_dbs, DbName),
- Server#server.dbs_open;
- false ->
- Server#server.dbs_open - 1
- end,
- {reply, ok, Server#server{dbs_open = DbsOpen}};
-handle_call({open, DbName, Options}, {FromPid,_}=From, Server) ->
- LruTime = now(),
- case ets:lookup(couch_dbs_by_name, DbName) of
+handle_call({open_result, DbName, Error}, _From, Server) ->
+ % icky hack of field values - compactor_pid used to store clients
+ [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName),
+ [gen_server:reply(From, Error) || From <- Froms],
+ ?LOG_INFO("open_result error ~p for ~s", [Error, DbName]),
+ true = ets:delete(couch_dbs, DbName),
+ {reply, ok, Server#server{dbs_open=Server#server.dbs_open - 1}};
+handle_call({open, DbName, Options}, From, Server) ->
+ case ets:lookup(couch_dbs, DbName) of
[] ->
- open_db(DbName, Server, Options, From);
- [{_, {opening, Opener, Froms}}] ->
- true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From|Froms]}}),
+ DbNameList = binary_to_list(DbName),
+ case check_dbname(Server, DbNameList) of
+ ok ->
+ case maybe_close_lru_db(Server) of
+ {ok, Server2} ->
+ Filepath = get_full_filename(Server, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Filepath, Options)};
+ CloseError ->
+ {reply, CloseError, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
+ end;
+ [#db{compactor_pid = Froms} = Db] when is_list(Froms) ->
+ % icky hack of field values - compactor_pid used to store clients
+ ?LOG_INFO("adding another listener to async open for ~s", [DbName]),
+ true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}),
{noreply, Server};
- [{_, {opened, MainPid, PrevLruTime}}] ->
- true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, LruTime}}),
- true = ets:delete(couch_dbs_by_lru, PrevLruTime),
- true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
- {reply, couch_db:open_ref_counted(MainPid, FromPid), Server}
+ [#db{} = Db] ->
+ {reply, {ok, Db}, Server}
end;
handle_call({create, DbName, Options}, From, Server) ->
- case ets:lookup(couch_dbs_by_name, DbName) of
- [] ->
- open_db(DbName, Server, [create | Options], From);
- [_AlreadyRunningDb] ->
- {reply, file_exists, Server}
+ DbNameList = binary_to_list(DbName),
+ case check_dbname(Server, DbNameList) of
+ ok ->
+ case ets:lookup(couch_dbs, DbName) of
+ [] ->
+ case maybe_close_lru_db(Server) of
+ {ok, Server2} ->
+ Filepath = get_full_filename(Server, DbNameList),
+ {noreply, open_async(Server2, From, DbName, Filepath,
+ [create | Options])};
+ CloseError ->
+ {reply, CloseError, Server}
+ end;
+ [_AlreadyRunningDb] ->
+ {reply, file_exists, Server}
+ end;
+ Error ->
+ {reply, Error, Server}
end;
handle_call({delete, DbName, _Options}, _From, Server) ->
DbNameList = binary_to_list(DbName),
case check_dbname(Server, DbNameList) of
ok ->
FullFilepath = get_full_filename(Server, DbNameList),
- UpdateState =
- case ets:lookup(couch_dbs_by_name, DbName) of
- [] -> false;
- [{_, {opening, Pid, Froms}}] ->
- couch_util:shutdown_sync(Pid),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, Pid),
- [gen_server:send_result(F, not_found) || F <- Froms],
- true;
- [{_, {opened, Pid, LruTime}}] ->
- couch_util:shutdown_sync(Pid),
- true = ets:delete(couch_dbs_by_name, DbName),
- true = ets:delete(couch_dbs_by_pid, Pid),
- true = ets:delete(couch_dbs_by_lru, LruTime),
- true
- end,
- Server2 = case UpdateState of
- true ->
- DbsOpen = case ets:member(couch_sys_dbs, DbName) of
- true ->
- true = ets:delete(couch_sys_dbs, DbName),
- Server#server.dbs_open;
- false ->
- Server#server.dbs_open - 1
- end,
- Server#server{dbs_open = DbsOpen};
- false ->
- Server
+ Server2 =
+ case ets:lookup(couch_dbs, DbName) of
+ [] -> Server;
+ [#db{main_pid=Pid, compactor_pid=Froms}] when is_list(Froms) ->
+ % icky hack of field values - compactor_pid used to store clients
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ exit(Pid, kill),
+ [gen_server:reply(F, not_found) || F <- Froms],
+ Server#server{dbs_open=Server#server.dbs_open - 1};
+ [#db{main_pid=Pid}] ->
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ exit(Pid, kill),
+ Server#server{dbs_open=Server#server.dbs_open - 1}
end,
%% Delete any leftover .compact files. If we don't do this a subsequent
@@ -364,36 +345,37 @@ handle_call({delete, DbName, _Options}, _From, Server) ->
end;
Error ->
{reply, Error, Server}
- end.
+ end;
+handle_call({db_updated, Db}, _From, Server) ->
+ ets:insert(couch_dbs, Db),
+ {reply, ok, Server}.
+
-handle_cast(Msg, _Server) ->
- exit({unknown_cast_message, Msg}).
+handle_cast(Msg, Server) ->
+ {stop, {unknown_cast_message, Msg}, Server}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-handle_info({'EXIT', _Pid, config_change}, Server) ->
- {noreply, shutdown, Server};
-handle_info(Error, _Server) ->
- ?LOG_ERROR("Unexpected message, restarting couch_server: ~p", [Error]),
- exit(kill).
-open_db(DbName, Server, Options, From) ->
- DbNameList = binary_to_list(DbName),
- case check_dbname(Server, DbNameList) of
- ok ->
- Filepath = get_full_filename(Server, DbNameList),
- case lists:member(sys_db, Options) of
+handle_info({'EXIT', _Pid, config_change}, Server) ->
+ {stop, config_change, Server};
+handle_info({'EXIT', Pid, Reason}, #server{dbs_open=DbsOpen}=Server) ->
+ Match = erlang:make_tuple(tuple_size(#db{}), '_', [{1, db},
+ {#db.main_pid, Pid}]),
+ case ets:match_object(couch_dbs, Match) of
+ [#db{name = DbName, compactor_pid=Froms}] ->
+ ?LOG_INFO("db ~s died with reason ~p", [DbName, Reason]),
+ % icky hack of field values - compactor_pid used to store clients
+ if is_list(Froms) ->
+ [gen_server:reply(From, Reason) || From <- Froms];
true ->
- {noreply, open_async(Server, From, DbName, Filepath, Options)};
- false ->
- case maybe_close_lru_db(Server) of
- {ok, Server2} ->
- {noreply, open_async(Server2, From, DbName, Filepath, Options)};
- CloseError ->
- {reply, CloseError, Server}
- end
- end;
- Error ->
- {reply, Error, Server}
- end.
+ ok
+ end,
+ true = ets:delete(couch_dbs, DbName),
+ true = ets:delete(couch_lru, DbName),
+ {noreply, Server#server{dbs_open=DbsOpen - 1}};
+ [] ->
+ {noreply, Server}
+ end;
+handle_info(Info, Server) ->
+ {stop, {unknown_message, Info}, Server}.
diff --git a/apps/couch/src/couch_server_sup.erl b/apps/couch/src/couch_server_sup.erl
index 4f0445da..1f31209b 100644
--- a/apps/couch/src/couch_server_sup.erl
+++ b/apps/couch/src/couch_server_sup.erl
@@ -14,9 +14,8 @@
-behaviour(supervisor).
--export([start_link/1,stop/0, couch_config_start_link_wrapper/2,
- start_primary_services/0,start_secondary_services/0,
- restart_core_server/0]).
+-export([start_link/1, couch_config_start_link_wrapper/2,
+ restart_core_server/0, config_change/2]).
-include("couch_db.hrl").
@@ -68,59 +67,46 @@ start_server(IniFiles) ->
_ -> ok
end,
- LibDir =
- case couch_config:get("couchdb", "util_driver_dir", null) of
- null ->
- filename:join(couch_util:priv_dir(), "lib");
- LibDir0 -> LibDir0
- end,
-
- ok = couch_util:start_driver(LibDir),
-
BaseChildSpecs =
- {{one_for_all, 10, 3600},
+ {{one_for_all, 10, 60},
[{couch_config,
{couch_server_sup, couch_config_start_link_wrapper, [IniFiles, ConfigPid]},
permanent,
brutal_kill,
worker,
[couch_config]},
+ {couch_config_event,
+ {couch_config_event, start_link, []},
+ permanent,
+ 1000,
+ worker,
+ dynamic},
{couch_primary_services,
- {couch_server_sup, start_primary_services, []},
+ {couch_primary_sup, start_link, []},
permanent,
infinity,
supervisor,
- [couch_server_sup]},
+ [couch_primary_sup]},
{couch_secondary_services,
- {couch_server_sup, start_secondary_services, []},
+ {couch_secondary_sup, start_link, []},
permanent,
infinity,
supervisor,
- [couch_server_sup]}
+ [couch_secondary_sup]}
]},
- % ensure these applications are running
- application:start(ibrowse),
- application:start(crypto),
-
{ok, Pid} = supervisor:start_link(
{local, couch_server_sup}, couch_server_sup, BaseChildSpecs),
- % launch the icu bridge
% just restart if one of the config settings change.
-
- couch_config:register(
- fun("couchdb", "util_driver_dir") ->
- ?MODULE:stop();
- ("daemons", _) ->
- ?MODULE:stop()
- end, Pid),
+ couch_config:register(fun ?MODULE:config_change/2, Pid),
unlink(ConfigPid),
Ip = couch_config:get("httpd", "bind_address"),
Port = mochiweb_socket_server:get(couch_httpd, port),
io:format("Apache CouchDB has started. Time to relax.~n"),
+
?LOG_INFO("Apache CouchDB has started on http://~s:~w/", [Ip, Port]),
case couch_config:get("couchdb", "uri_file", null) of
@@ -132,62 +118,12 @@ start_server(IniFiles) ->
{ok, Pid}.
-start_primary_services() ->
- supervisor:start_link({local, couch_primary_services}, couch_server_sup,
- {{one_for_one, 10, 3600},
- [{couch_log,
- {couch_log, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [couch_log]},
- {couch_replication_supervisor,
- {couch_rep_sup, start_link, []},
- permanent,
- infinity,
- supervisor,
- [couch_rep_sup]},
- {couch_task_status,
- {couch_task_status, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [couch_task_status]},
- {couch_server,
- {couch_server, sup_start_link, []},
- permanent,
- 1000,
- worker,
- [couch_server]},
- {couch_db_update_event,
- {gen_event, start_link, [{local, couch_db_update}]},
- permanent,
- brutal_kill,
- worker,
- dynamic}
- ]
- }).
-
-start_secondary_services() ->
- DaemonChildSpecs = [
- begin
- {ok, {Module, Fun, Args}} = couch_util:parse_term(SpecStr),
-
- {list_to_atom(Name),
- {Module, Fun, Args},
- permanent,
- 1000,
- worker,
- [Module]}
- end
- || {Name, SpecStr}
- <- couch_config:get("daemons"), SpecStr /= ""],
-
- supervisor:start_link({local, couch_secondary_services}, couch_server_sup,
- {{one_for_one, 10, 3600}, DaemonChildSpecs}).
-
-stop() ->
- catch exit(whereis(couch_server_sup), normal).
+config_change("daemons", _) ->
+ exit(whereis(couch_server_sup), shutdown);
+config_change("couchdb", "util_driver_dir") ->
+ [Pid] = [P || {collation_driver,P,_,_}
+ <- supervisor:which_children(couch_primary_services)],
+ Pid ! reload_driver.
init(ChildSpecs) ->
{ok, ChildSpecs}.
diff --git a/apps/couch/src/couch_stats_aggregator.erl b/apps/couch/src/couch_stats_aggregator.erl
index 6090355d..7dac1124 100644
--- a/apps/couch/src/couch_stats_aggregator.erl
+++ b/apps/couch/src/couch_stats_aggregator.erl
@@ -94,7 +94,12 @@ init(StatDescsFileName) ->
ets:new(?MODULE, [named_table, set, protected]),
SampleStr = couch_config:get("stats", "samples", "[0]"),
{ok, Samples} = couch_util:parse_term(SampleStr),
- {ok, Descs} = file:consult(StatDescsFileName),
+ case file:consult(StatDescsFileName) of
+ {ok, Descs} ->
+ ok;
+ {error, _} ->
+ Descs = []
+ end,
lists:foreach(fun({Sect, Key, Value}) ->
lists:foreach(fun(Secs) ->
Agg = #aggregate{
diff --git a/apps/couch/src/couch_stats_collector.erl b/apps/couch/src/couch_stats_collector.erl
index f7b9bb48..74238fc8 100644
--- a/apps/couch/src/couch_stats_collector.erl
+++ b/apps/couch/src/couch_stats_collector.erl
@@ -85,21 +85,12 @@ track_process_count(Stat) ->
track_process_count(self(), Stat).
track_process_count(Pid, Stat) ->
- MonitorFun = fun() ->
- Ref = erlang:monitor(process, Pid),
- receive {'DOWN', Ref, _, _, _} -> ok end,
- couch_stats_collector:decrement(Stat)
- end,
- case (catch couch_stats_collector:increment(Stat)) of
- ok -> spawn(MonitorFun);
- _ -> ok
- end.
-
+ gen_server:cast(?MODULE, {track_process_count, Stat, Pid}).
init(_) ->
ets:new(?HIT_TABLE, [named_table, set, public]),
ets:new(?ABS_TABLE, [named_table, duplicate_bag, public]),
- {ok, nil}.
+ {ok, []}.
terminate(_Reason, _State) ->
ok.
@@ -107,11 +98,15 @@ terminate(_Reason, _State) ->
handle_call(stop, _, State) ->
{stop, normal, stopped, State}.
-handle_cast(foo, State) ->
- {noreply, State}.
+handle_cast({track_process_count, Stat, Pid}, State) ->
+ ok = couch_stats_collector:increment(Stat),
+ Ref = erlang:monitor(process, Pid),
+ {noreply, [{Ref,Stat} | State]}.
-handle_info(_Info, State) ->
- {noreply, State}.
+handle_info({'DOWN', Ref, _, _, _}, State) ->
+ {Ref, Stat} = lists:keyfind(Ref, 1, State),
+ ok = couch_stats_collector:decrement(Stat),
+ {noreply, lists:keydelete(Ref, 1, State)}.
code_change(_OldVersion, State, _Extra) ->
{ok, State}.
diff --git a/apps/couch/src/couch_task_status.erl b/apps/couch/src/couch_task_status.erl
index c4487dc4..639515c7 100644
--- a/apps/couch/src/couch_task_status.erl
+++ b/apps/couch/src/couch_task_status.erl
@@ -107,7 +107,6 @@ handle_call(all, _, Server) ->
handle_cast({update_status, Pid, StatusText}, Server) ->
[{Pid, {Type, TaskName, _StatusText}}] = ets:lookup(?MODULE, Pid),
- ?LOG_DEBUG("New task status for ~s: ~s",[TaskName, StatusText]),
true = ets:insert(?MODULE, {Pid, {Type, TaskName, StatusText}}),
{noreply, Server};
handle_cast(stop, State) ->
diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl
index 38c0a783..72facd27 100644
--- a/apps/couch/src/couch_view.erl
+++ b/apps/couch/src/couch_view.erl
@@ -17,7 +17,8 @@
detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4,
get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4,
- extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]).
+ extract_map_view/1,get_group_server/2,get_group_info/2,
+ cleanup_index_files/1,config_change/2]).
-include("couch_db.hrl").
@@ -249,11 +250,7 @@ fold(#view{btree=Btree}, Fun, Acc, Options) ->
init([]) ->
% read configuration settings and register for configuration changes
RootDir = couch_config:get("couchdb", "view_index_dir"),
- Self = self(),
- ok = couch_config:register(
- fun("couchdb", "view_index_dir")->
- exit(Self, config_change)
- end),
+ ok = couch_config:register(fun ?MODULE:config_change/2),
couch_db_update_notifier:start_link(
fun({deleted, DbName}) ->
@@ -335,6 +332,9 @@ handle_info({'EXIT', FromPid, Reason}, Server) ->
end,
{noreply, Server}.
+config_change("couchdb", "view_index_dir") ->
+ exit(whereis(couch_view), config_change).
+
add_to_ets(Pid, DbName, Sig) ->
true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}),
true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}),
diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl
index 895556bf..f56325a4 100644
--- a/apps/couch/src/couch_view_compactor.erl
+++ b/apps/couch/src/couch_view_compactor.erl
@@ -36,15 +36,20 @@ compact_group(Group, EmptyGroup) ->
} = Group,
#group{
- db = Db,
+ dbname = DbName,
+ fd = Fd,
id_btree = EmptyIdBtree,
+ sig = Sig,
views = EmptyViews
} = EmptyGroup,
- {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+ erlang:monitor(process, Fd),
+
+ {ok, Db} = couch_db:open(DbName, []),
+
+ {ok, {Count, _}} = couch_btree:full_reduce(Db#db.id_tree),
<<"_design", ShortName/binary>> = GroupId,
- DbName = couch_db:name(Db),
TaskName = <<DbName/binary, ShortName/binary>>,
couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
@@ -72,10 +77,10 @@ compact_group(Group, EmptyGroup) ->
current_seq=Seq
},
- Pid = couch_view:get_group_server(DbName, GroupId),
+ Pid = ets:lookup_element(group_servers_by_sig, {DbName, Sig}, 2),
gen_server:cast(Pid, {compact_done, NewGroup}).
-%% @spec compact_view(View, EmptyView, Retry) -> CompactView
+%% @spec compact_view(View, EmptyView) -> CompactView
compact_view(View, EmptyView) ->
{ok, Count} = couch_view:get_row_count(View),
diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl
index f01befdf..f11bb54d 100644
--- a/apps/couch/src/couch_view_group.erl
+++ b/apps/couch/src/couch_view_group.erl
@@ -39,8 +39,7 @@
request_group(Pid, Seq) ->
?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]),
case gen_server:call(Pid, {request_group, Seq}, infinity) of
- {ok, Group, RefCounter} ->
- couch_ref_counter:add(RefCounter),
+ {ok, Group, _RefCounter} ->
{ok, Group};
Error ->
?LOG_DEBUG("request_group Error ~p", [Error]),
@@ -75,27 +74,26 @@ start_link(InitArgs) ->
end.
% init creates a closure which spawns the appropriate view_updater.
-init({InitArgs, ReturnPid, Ref}) ->
+init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) ->
process_flag(trap_exit, true),
case prepare_group(InitArgs, false) of
- {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} ->
+ {ok, #group{fd=Fd, current_seq=Seq}=Group} ->
+ {ok, Db} = couch_db:open(DbName, []),
case Seq > couch_db:get_update_seq(Db) of
true ->
ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
+ couch_db:close(Db),
ignore;
_ ->
- couch_db:monitor(Db),
+ try couch_db:monitor(Db) after couch_db:close(Db) end,
Owner = self(),
- Pid = spawn_link(
- fun()-> couch_view_updater:update(Owner, Group) end
- ),
- {ok, RefCounter} = couch_ref_counter:start([Fd]),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
{ok, #group_state{
- db_name=couch_db:name(Db),
+ db_name= DbName,
init_args=InitArgs,
updater_pid = Pid,
- group=Group,
- ref_counter=RefCounter}}
+ group=Group#group{dbname=DbName},
+ ref_counter=erlang:monitor(process,Fd)}}
end;
Error ->
ReturnPid ! {Ref, self(), Error},
@@ -120,19 +118,16 @@ init({InitArgs, ReturnPid, Ref}) ->
handle_call({request_group, RequestSeq}, From,
#group_state{
- db_name=DbName,
group=#group{current_seq=Seq}=Group,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
- {ok, Db} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db},
Owner = self(),
- Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
{noreply, State#group_state{
updater_pid=Pid,
- group=Group2,
+ group=Group,
waiting_list=[{From,RequestSeq}|WaitList]
}, infinity};
@@ -153,6 +148,10 @@ handle_call({request_group, RequestSeq}, From,
waiting_list=[{From, RequestSeq}|WaitList]
}, infinity};
+handle_call({start_compact, CompactFun}, _From, State) ->
+ {noreply, NewState} = handle_cast({start_compact, CompactFun}, State),
+ {reply, {ok, NewState#group_state.compactor_pid}, NewState};
+
handle_call(request_group_info, _From, State) ->
GroupInfo = get_group_info(State),
{reply, {ok, GroupInfo}, State}.
@@ -160,24 +159,23 @@ handle_call(request_group_info, _From, State) ->
handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
= State) ->
#group_state{
- group = #group{name = GroupId, sig = GroupSig} = Group,
- init_args = {RootDir, DbName, _}
+ group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group,
+ init_args = {RootDir, _, _}
} = State,
?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
- {ok, Db} = couch_db:open_int(DbName, []),
{ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
- NewGroup = reset_file(Db, Fd, DbName, Group),
+ NewGroup = reset_file(Fd, DbName, Group),
Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
{noreply, State#group_state{compactor_pid = Pid}};
handle_cast({start_compact, _}, State) ->
%% compact already running, this is a no-op
{noreply, State};
-handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
+handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
#group_state{group = #group{current_seq=OldSeq}} = State)
when NewSeq >= OldSeq ->
#group_state{
- group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+ group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
init_args = {RootDir, DbName, _},
updater_pid = UpdaterPid,
ref_counter = RefCounter
@@ -202,17 +200,12 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
%% cleanup old group
unlink(OldFd),
- couch_ref_counter:drop(RefCounter),
- {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
- case Group#group.db of
- nil -> ok;
- Else -> couch_db:close(Else)
- end,
+ erlang:demonitor(RefCounter),
self() ! delayed_commit,
{noreply, State#group_state{
group=NewGroup,
- ref_counter=NewRefCounter,
+ ref_counter=erlang:monitor(process,NewFd),
compactor_pid=nil,
updater_pid=NewUpdaterPid
}};
@@ -223,17 +216,14 @@ handle_cast({compact_done, NewGroup}, State) ->
} = State,
?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
"compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
- couch_db:close(NewGroup#group.db),
- {ok, Db} = couch_db:open_int(DbName, []),
+ GroupServer = self(),
Pid = spawn_link(fun() ->
+ erlang:monitor(process, NewGroup#group.fd),
{_,Ref} = erlang:spawn_monitor(fun() ->
- couch_view_updater:update(nil, NewGroup#group{db = Db})
+ couch_view_updater:update(nil, NewGroup)
end),
- receive
- {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
- #group{name=GroupId} = NewGroup2,
- Pid2 = couch_view:get_group_server(DbName, GroupId),
- gen_server:cast(Pid2, {compact_done, NewGroup2})
+ receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
+ gen_server:cast(GroupServer, {compact_done, NewGroup2})
end
end),
{noreply, State#group_state{compactor_pid = Pid}};
@@ -245,7 +235,7 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
waiting_commit = WaitingCommit
} = State,
NewSeq = NewGroup#group.current_seq,
- ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
+ ?LOG_DEBUG("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
DbName, NewGroup#group.name]),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
@@ -275,13 +265,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
{noreply, State#group_state{waiting_commit=true}}
end;
-handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
- #group_state{db_name=DbName,
+handle_info({'EXIT', FromPid, {new_group, Group}},
+ #group_state{
updater_pid=UpPid,
ref_counter=RefCounter,
waiting_list=WaitList,
waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
- ok = couch_db:close(Db),
if not WaitingCommit ->
erlang:send_after(1000, self(), delayed_commit);
true -> ok
@@ -289,26 +278,20 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
case reply_with_group(Group, WaitList, [], RefCounter) of
[] ->
{noreply, State#group_state{waiting_commit=true, waiting_list=[],
- group=Group#group{db=nil}, updater_pid=nil}};
+ group=Group, updater_pid=nil}};
StillWaiting ->
- % we still have some waiters, reopen the database and reupdate the index
- {ok, Db2} = couch_db:open_int(DbName, []),
- Group2 = Group#group{db=Db2},
+ % we still have some waiters, reupdate the index
Owner = self(),
- Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
+ Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end),
{noreply, State#group_state{waiting_commit=true,
- waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
+ waiting_list=StillWaiting, group=Group, updater_pid=Pid}}
end;
handle_info({'EXIT', _, {new_group, _}}, State) ->
%% message from an old (probably pre-compaction) updater; ignore
{noreply, State};
-handle_info({'EXIT', FromPid, reset},
- #group_state{
- init_args=InitArgs,
- updater_pid=UpPid,
- group=Group}=State) when UpPid == FromPid ->
- ok = couch_db:close(Group#group.db),
+handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs,
+ updater_pid=FromPid}=State) ->
case prepare_group(InitArgs, true) of
{ok, ResetGroup} ->
Owner = self(),
@@ -334,8 +317,9 @@ handle_info({'EXIT', FromPid, Reason}, State) ->
?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]),
{stop, Reason, State};
-handle_info({'DOWN',_,_,_,_}, State) ->
- ?LOG_INFO("Shutting down view group server, monitored db is closing.", []),
+handle_info({'DOWN',_,_,Pid,Reason}, #group_state{group=G}=State) ->
+ ?LOG_INFO("Shutting down group server ~p, db ~p closing w/ reason~n~p",
+ [G#group.name, Pid, Reason]),
{stop, normal, reply_all(State, shutdown)}.
@@ -371,32 +355,29 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) ->
[catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList],
State#group_state{waiting_list=[]}.
+prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName ->
+ prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset);
prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)->
- case couch_db:open_int(DbName, []) of
- {ok, Db} ->
- case open_index_file(RootDir, DbName, Sig) of
- {ok, Fd} ->
- if ForceReset ->
- % this can happen if we missed a purge
- {ok, reset_file(Db, Fd, DbName, Group)};
- true ->
- % 09 UPGRADE CODE
- ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
- case (catch couch_file:read_header(Fd)) of
- {ok, {Sig, HeaderInfo}} ->
- % sigs match!
- {ok, init_group(Db, Fd, Group, HeaderInfo)};
- _ ->
- % this happens on a new file
- {ok, reset_file(Db, Fd, DbName, Group)}
- end
- end;
- Error ->
- catch delete_index_file(RootDir, DbName, Sig),
- Error
+ case open_index_file(RootDir, DbName, Sig) of
+ {ok, Fd} ->
+ if ForceReset ->
+ % this can happen if we missed a purge
+ {ok, reset_file(Fd, DbName, Group)};
+ true ->
+ % 09 UPGRADE CODE
+ ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
+ case (catch couch_file:read_header(Fd)) of
+ {ok, {Sig, HeaderInfo}} ->
+ % sigs match!
+ {ok, init_group(Fd, Group, HeaderInfo)};
+ _ ->
+ % this happens on a new file
+ {ok, reset_file(Fd, DbName, Group)}
+ end
end;
- Else ->
- Else
+ Error ->
+ catch delete_index_file(RootDir, DbName, Sig),
+ Error
end.
get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq,
@@ -446,7 +427,7 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end,
options=DesignOptions},
- {ok, Db, set_view_sig(#group{name = <<"_temp">>, db=Db, views=[View],
+ {ok, Db, set_view_sig(#group{name = <<"_temp">>, views=[View],
def_lang=Language, design_options=DesignOptions})};
Error ->
Error
@@ -531,28 +512,39 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) ->
{View#view{id_num=N},N+1}
end, 0, lists:sort(dict:to_list(DictBySrc))),
- set_view_sig(#group{name=Id, views=Views, def_lang=Language, design_options=DesignOptions}).
-
-reset_group(#group{views=Views}=Group) ->
- Views2 = [View#view{btree=nil} || View <- Views],
- Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
- id_btree=nil,views=Views2}.
-
-reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
- ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]),
+ #group{
+ name = Id,
+ views = Views,
+ def_lang = Language,
+ design_options = DesignOptions,
+ sig = couch_util:md5(term_to_binary({Views, Language, DesignOptions}))
+ }.
+
+reset_group(DbName, #group{views=Views}=Group) ->
+ Group#group{
+ fd = nil,
+ dbname = DbName,
+ query_server = nil,
+ current_seq = 0,
+ id_btree = nil,
+ views = [View#view{btree=nil} || View <- Views]
+ }.
+
+reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
+ ?LOG_INFO("Resetting group index \"~s\" in db ~s", [Name, DbName]),
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, {Sig, nil}),
- init_group(Db, Fd, reset_group(Group), nil).
+ init_group(Fd, reset_group(DbName, Group), nil).
delete_index_file(RootDir, DbName, GroupSig) ->
couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)).
-init_group(Db, Fd, #group{views=Views}=Group, nil) ->
- init_group(Db, Fd, Group,
- #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
- id_btree_state=nil, view_states=[nil || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
- Group, IndexHeader) ->
+init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) ->
+ {ok, Db} = couch_db:open(DbName, []),
+ PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end,
+ Header = #index_header{purge_seq=PurgeSeq, view_states=[nil || _ <- Views]},
+ init_group(Fd, Group, Header);
+init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) ->
#index_header{seq=Seq, purge_seq=PurgeSeq,
id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
{ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
@@ -580,13 +572,10 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
<<"raw">> ->
Less = fun(A,B) -> A < B end
end,
- {ok, Btree} = couch_btree:open(BtreeState, Fd,
- [{less, Less},
- {reduce, ReduceFun}]),
+ {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, Less},
+ {reduce, ReduceFun}]),
View#view{btree=Btree}
end,
ViewStates, Views),
- Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
- id_btree=IdBtree, views=Views2}.
-
-
+ Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree,
+ views=Views2}.
diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl
index 2a9c960f..8424862b 100644
--- a/apps/couch/src/couch_view_updater.erl
+++ b/apps/couch/src/couch_view_updater.erl
@@ -20,20 +20,21 @@
update(Owner, Group) ->
#group{
- db = #db{name=DbName} = Db,
+ dbname = DbName,
name = GroupName,
current_seq = Seq,
purge_seq = PurgeSeq
} = Group,
couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
+ {ok, Db} = couch_db:open(DbName, []),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
if DbPurgeSeq == PurgeSeq ->
Group;
DbPurgeSeq == PurgeSeq + 1 ->
couch_task_status:update(<<"Removing purged entries from view index.">>),
- purge_index(Group);
+ purge_index(Db, Group);
true ->
couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
exit(reset)
@@ -77,7 +78,7 @@ update(Owner, Group) ->
end.
-purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
+purge_index(Db, #group{views=Views, id_btree=IdBtree}=Group) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
{ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
@@ -108,9 +109,14 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
views=Views2,
purge_seq=couch_db:get_purge_seq(Db)}.
-
-load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign) ->
- #doc_info{id=DocId, high_seq=Seq, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo,
+-spec load_doc(#db{}, #doc_info{}, pid(), [atom()], boolean()) -> ok.
+load_doc(Db, DI, MapQueue, DocOpts, IncludeDesign) ->
+ DocInfo = case DI of
+ #full_doc_info{id=DocId, update_seq=Seq, deleted=Deleted} ->
+ couch_doc:to_doc_info(DI);
+ #doc_info{id=DocId, high_seq=Seq, revs=[#rev_info{deleted=Deleted}|_]} ->
+ DI
+ end,
case {IncludeDesign, DocId} of
{false, <<?DESIGN_DOC_PREFIX, _/binary>>} -> % we skip design docs
ok;
@@ -122,7 +128,8 @@ load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign) ->
couch_work_queue:queue(MapQueue, {Seq, Doc})
end
end.
-
+
+-spec do_maps(#group{}, pid(), pid(), any()) -> any().
do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) ->
case couch_work_queue:dequeue(MapQueue) of
closed ->
@@ -139,6 +146,7 @@ do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) ->
do_maps(Group1, MapQueue, WriteQueue, ViewEmptyKVs)
end.
+-spec do_writes(pid(), pid() | nil, #group{}, pid(), boolean()) -> any().
do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) ->
case couch_work_queue:dequeue(WriteQueue) of
closed ->
@@ -165,6 +173,7 @@ do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) ->
do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild)
end.
+-spec view_insert_query_results([#doc{}], list(), any(), any()) -> any().
view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
{ViewKVs, DocIdViewIdKeysAcc};
view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) ->
@@ -172,7 +181,8 @@ view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs,
NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc],
view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys).
-
+-spec view_insert_doc_query_results(#doc{}, list(), list(), any(), any()) ->
+ any().
view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) ->
{lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)};
view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) ->
@@ -199,6 +209,7 @@ view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{Vie
NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc,
view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc).
+-spec view_compute(#group{}, [#doc{}]) -> {#group{}, any()}.
view_compute(Group, []) ->
{Group, []};
view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) ->
@@ -214,7 +225,6 @@ view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -
{Group#group{query_server=QueryServer}, Results}.
-
write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq, InitialBuild) ->
#group{id_btree=IdBtree} = Group,