diff options
Diffstat (limited to 'apps/couch')
30 files changed, 1118 insertions, 891 deletions
diff --git a/apps/couch/include/couch_db.hrl b/apps/couch/include/couch_db.hrl index a35745ef..f17e6bb1 100644 --- a/apps/couch/include/couch_db.hrl +++ b/apps/couch/include/couch_db.hrl @@ -142,9 +142,9 @@ {disk_version = ?LATEST_DISK_VERSION, update_seq = 0, unused = 0, - fulldocinfo_by_id_btree_state = nil, - docinfo_by_seq_btree_state = nil, - local_docs_btree_state = nil, + id_tree_state = nil, + seq_tree_state = nil, + local_tree_state = nil, purge_seq = 0, purged_docs = nil, security_ptr = nil, @@ -157,12 +157,12 @@ compactor_pid = nil, instance_start_time, % number of microsecs since jan 1 1970 as a binary string fd, - fd_ref_counter, + fd_monitor, header = #db_header{}, committed_update_seq, - fulldocinfo_by_id_btree, - docinfo_by_seq_btree, - local_docs_btree, + id_tree, + seq_tree, + local_tree, update_seq, name, filepath, @@ -196,7 +196,9 @@ stale = false, multi_get = false, callback = nil, - list = nil + list = nil, + keys = nil, + sorted = true }). -record(view_fold_helper_funs, { @@ -221,7 +223,7 @@ -record(group, { sig=nil, - db=nil, + dbname, fd=nil, name, def_lang, diff --git a/apps/couch/src/couch_auth_cache.erl b/apps/couch/src/couch_auth_cache.erl index 078bfcc1..0264a69d 100644 --- a/apps/couch/src/couch_auth_cache.erl +++ b/apps/couch/src/couch_auth_cache.erl @@ -282,6 +282,8 @@ refresh_entries(AuthDb) -> end. +refresh_entry(Db, #full_doc_info{} = FDI) -> + refresh_entry(Db, couch_doc:to_doc_info(FDI)); refresh_entry(Db, #doc_info{high_seq = DocSeq} = DocInfo) -> case is_user_doc(DocInfo) of {true, UserName} -> diff --git a/apps/couch/src/couch_btree.erl b/apps/couch/src/couch_btree.erl index 0e47bac7..4ed3fe54 100644 --- a/apps/couch/src/couch_btree.erl +++ b/apps/couch/src/couch_btree.erl @@ -16,23 +16,27 @@ -export([fold/4, full_reduce/1, final_reduce/2, foldl/3, foldl/4]). -export([fold_reduce/4, lookup/2, get_state/1, set_options/2]). --define(CHUNK_THRESHOLD, 16#4ff). - -record(btree, {fd, root, - extract_kv = fun({Key, Value}) -> {Key, Value} end, - assemble_kv = fun(Key, Value) -> {Key, Value} end, - less = fun(A, B) -> A < B end, + extract_kv, + assemble_kv, + less, reduce = nil }). +extract(#btree{extract_kv = undefined}, Value) -> + Value; extract(#btree{extract_kv=Extract}, Value) -> Extract(Value). +assemble(#btree{assemble_kv = undefined}, Key, Value) -> + {Key, Value}; assemble(#btree{assemble_kv=Assemble}, Key, Value) -> Assemble(Key, Value). +less(#btree{less = undefined}, A, B) -> + A < B; less(#btree{less=Less}, A, B) -> Less(A, B). @@ -106,29 +110,29 @@ convert_fun_arity(Fun) when is_function(Fun, 2) -> convert_fun_arity(Fun) when is_function(Fun, 3) -> Fun. % Already arity 3 -make_key_in_end_range_function(#btree{less=Less}, fwd, Options) -> +make_key_in_end_range_function(Bt, fwd, Options) -> case couch_util:get_value(end_key_gt, Options) of undefined -> case couch_util:get_value(end_key, Options) of undefined -> fun(_Key) -> true end; LastKey -> - fun(Key) -> not Less(LastKey, Key) end + fun(Key) -> not less(Bt, LastKey, Key) end end; EndKey -> - fun(Key) -> Less(Key, EndKey) end + fun(Key) -> less(Bt, Key, EndKey) end end; -make_key_in_end_range_function(#btree{less=Less}, rev, Options) -> +make_key_in_end_range_function(Bt, rev, Options) -> case couch_util:get_value(end_key_gt, Options) of undefined -> case couch_util:get_value(end_key, Options) of undefined -> fun(_Key) -> true end; LastKey -> - fun(Key) -> not Less(Key, LastKey) end + fun(Key) -> not less(Bt, Key, LastKey) end end; EndKey -> - fun(Key) -> Less(EndKey, Key) end + fun(Key) -> less(Bt, EndKey, Key) end end. @@ -198,7 +202,11 @@ op_order(remove) -> 2; op_order(insert) -> 3. lookup(#btree{root=Root, less=Less}=Bt, Keys) -> - SortedKeys = lists:sort(Less, Keys), + case Less of undefined -> + SortedKeys = lists:sort(Keys); + _ -> + SortedKeys = lists:sort(Less, Keys) + end, {ok, SortedResults} = lookup(Bt, Root, SortedKeys), % We want to return the results in the same order as the keys were input % but we may have changed the order when we sorted. So we need to put the @@ -271,9 +279,11 @@ complete_root(Bt, KPs) -> % it's probably really inefficient. chunkify(InList) -> + BaseChunkSize = list_to_integer(couch_config:get("couchdb", + "btree_chunk_size", "1279")), case byte_size(term_to_binary(InList)) of - Size when Size > ?CHUNK_THRESHOLD -> - NumberOfChunksLikely = ((Size div ?CHUNK_THRESHOLD) + 1), + Size when Size > BaseChunkSize -> + NumberOfChunksLikely = ((Size div BaseChunkSize) + 1), ChunkThreshold = Size div NumberOfChunksLikely, chunkify(InList, ChunkThreshold, [], 0, []); _Else -> diff --git a/apps/couch/src/couch_changes.erl b/apps/couch/src/couch_changes.erl index 3a5bc4f8..a9c08509 100644 --- a/apps/couch/src/couch_changes.erl +++ b/apps/couch/src/couch_changes.erl @@ -13,12 +13,13 @@ -module(couch_changes). -include("couch_db.hrl"). --export([handle_changes/3]). +-export([handle_changes/3, get_changes_timeout/2, main_only_filter/1, + all_docs_filter/1, wait_db_updated/2, get_rest_db_updated/0, + make_filter_fun/4]). -%% @type Req -> #httpd{} | {json_req, JsonObj()} -handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> - Args = Args1#changes_args{filter= - make_filter_fun(Args1#changes_args.filter, Style, Req, Db)}, +%% @spec handle_changes(#changes_args{}, #httpd{} | {json_req, {[any()]}}, #db{}) -> any() +handle_changes(#changes_args{filter=Raw, style=Style}=Args1, Req, Db) -> + Args = Args1#changes_args{filter=make_filter_fun(Raw, Style, Req, Db)}, StartSeq = case Args#changes_args.dir of rev -> couch_db:get_update_seq(Db); @@ -68,20 +69,12 @@ handle_changes(#changes_args{style=Style}=Args1, Req, Db) -> end end. -%% @type Req -> #httpd{} | {json_req, JsonObj()} -make_filter_fun(FilterName, Style, Req, Db) -> - case [list_to_binary(couch_httpd:unquote(Part)) - || Part <- string:tokens(FilterName, "/")] of +%% @spec make_filter_fun(string(), main_only|all_docs, #httpd{} | {json_req, +%% {[any()]}}, #db{}) -> fun() +make_filter_fun(Filter, Style, Req, Db) when is_list(Filter) -> + case [?l2b(couch_httpd:unquote(X)) || X <- string:tokens(Filter, "/")] of [] -> - fun(#doc_info{revs=[#rev_info{rev=Rev}|_]=Revs}) -> - case Style of - main_only -> - [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]; - all_docs -> - [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} - || #rev_info{rev=R} <- Revs] - end - end; + make_filter_fun(nil, Style, Req, Db); [DName, FName] -> DesignId = <<"_design/", DName/binary>>, DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []), @@ -105,11 +98,21 @@ make_filter_fun(FilterName, Style, Req, Db) -> [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]} || {Pass, #doc{revs={RevPos,[RevId|_]}}} <- lists:zip(Passes, Docs), Pass == true] - end; + end; _Else -> throw({bad_request, "filter parameter must be of the form `designname/filtername`"}) - end. + end; +make_filter_fun(_, main_only, _, _) -> + fun ?MODULE:main_only_filter/1; +make_filter_fun(_, all_docs, _, _) -> + fun ?MODULE:all_docs_filter/1. + +main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}]. + +all_docs_filter(#doc_info{revs=Revs}) -> + [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs]. get_changes_timeout(Args, Callback) -> #changes_args{ @@ -205,8 +208,8 @@ end_sending_changes(Callback, EndSeq, ResponseType) -> changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous", Limit, IncludeDocs}) -> - #doc_info{id=Id, high_seq=Seq, - revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo, + #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]} + = DocInfo, Results0 = FilterFun(DocInfo), Results = [Result || Result <- Results0, Result /= null], Go = if Limit =< 1 -> stop; true -> ok end, diff --git a/apps/couch/src/couch_config.erl b/apps/couch/src/couch_config.erl index be53e3a3..73abdfd5 100644 --- a/apps/couch/src/couch_config.erl +++ b/apps/couch/src/couch_config.erl @@ -88,7 +88,7 @@ register(Fun) -> ?MODULE:register(Fun, self()). register(Fun, Pid) -> - gen_server:call(?MODULE, {register, Fun, Pid}). + couch_config_event:register(Fun, Pid). init(IniFiles) -> @@ -111,7 +111,7 @@ terminate(_Reason, _State) -> handle_call(all, _From, Config) -> Resp = lists:sort((ets:tab2list(?MODULE))), {reply, Resp, Config}; -handle_call({set, Sec, Key, Val, Persist}, From, Config) -> +handle_call({set, Sec, Key, Val, Persist}, _From, Config) -> true = ets:insert(?MODULE, {{Sec, Key}, Val}), case {Persist, Config#config.write_filename} of {true, undefined} -> @@ -121,12 +121,10 @@ handle_call({set, Sec, Key, Val, Persist}, From, Config) -> _ -> ok end, - spawn_link(fun() -> - [catch F(Sec, Key, Val, Persist) || {_Pid, F} <- Config#config.notify_funs], - gen_server:reply(From, ok) - end), - {noreply, Config}; -handle_call({delete, Sec, Key, Persist}, From, Config) -> + Event = {config_change, Sec, Key, Val, Persist}, + gen_event:sync_notify(couch_config_event, Event), + {reply, ok, Config}; +handle_call({delete, Sec, Key, Persist}, _From, Config) -> true = ets:delete(?MODULE, {Sec,Key}), case {Persist, Config#config.write_filename} of {true, undefined} -> @@ -136,26 +134,9 @@ handle_call({delete, Sec, Key, Persist}, From, Config) -> _ -> ok end, - spawn_link(fun() -> - [catch F(Sec, Key, deleted, Persist) || {_Pid, F} <- Config#config.notify_funs], - gen_server:reply(From, ok) - end), - {noreply, Config}; -handle_call({register, Fun, Pid}, _From, #config{notify_funs=PidFuns}=Config) -> - erlang:monitor(process, Pid), - % convert 1 and 2 arity to 3 arity - Fun2 = - case Fun of - _ when is_function(Fun, 1) -> - fun(Section, _Key, _Value, _Persist) -> Fun(Section) end; - _ when is_function(Fun, 2) -> - fun(Section, Key, _Value, _Persist) -> Fun(Section, Key) end; - _ when is_function(Fun, 3) -> - fun(Section, Key, Value, _Persist) -> Fun(Section, Key, Value) end; - _ when is_function(Fun, 4) -> - Fun - end, - {reply, ok, Config#config{notify_funs=[{Pid, Fun2} | PidFuns]}}. + Event = {config_change, Sec, Key, deleted, Persist}, + gen_event:sync_notify(couch_config_event, Event), + {reply, ok, Config}. handle_cast(stop, State) -> @@ -163,10 +144,9 @@ handle_cast(stop, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _, _, DownPid, _}, #config{notify_funs=PidFuns}=Config) -> - % remove any funs registered by the downed process - FilteredPidFuns = [{Pid,Fun} || {Pid,Fun} <- PidFuns, Pid /= DownPid], - {noreply, Config#config{notify_funs=FilteredPidFuns}}. +handle_info(Info, State) -> + ?LOG_ERROR("couch_config:handle_info Info: ~p~n", [Info]), + {noreply, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/couch/src/couch_config_event.erl b/apps/couch/src/couch_config_event.erl new file mode 100644 index 00000000..e353c7d8 --- /dev/null +++ b/apps/couch/src/couch_config_event.erl @@ -0,0 +1,46 @@ +-module(couch_config_event). +-behaviour(gen_event). +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0, register/2]). + +-include("couch_db.hrl"). + +start_link() -> + gen_event:start_link({local, ?MODULE}). + +register(Fun, Pid) -> + gen_event:add_handler(?MODULE, {?MODULE, Fun}, [Fun, Pid]). + +init([Fun, Pid]) -> + Ref = erlang:monitor(process, Pid), + {ok, {Fun, Ref}}. + +handle_event({config_change,Sec,_,_,_}, {F,_}=St) when is_function(F,1) -> + catch F(Sec), + {ok, St}; +handle_event({config_change,Sec,K,_,_}, {F,_}=St) when is_function(F,2) -> + catch F(Sec,K), + {ok, St}; +handle_event({config_change,Sec,K,V,_}, {F,_}=St) when is_function(F,3) -> + catch F(Sec,K,V), + {ok, St}; +handle_event({config_change,Sec,K,V,Write}, {F,_}=St) when is_function(F,4) -> + catch F(Sec,K,V,Write), + {ok, St}. + +handle_call(_Request, St) -> + {ok, ok, St}. + +handle_info({'DOWN', Ref, _, _, _}, {_, Ref}) -> + remove_handler; +handle_info(_Info, St) -> + {ok, St}. + +terminate(Reason, St) -> + ?LOG_INFO("config_event handler ~p terminating with ~p", [St, Reason]), + ok. + +code_change(_OldVsn, St, _Extra) -> + {ok, St}. diff --git a/apps/couch/src/couch_db.erl b/apps/couch/src/couch_db.erl index 7678f6ca..a3112e24 100644 --- a/apps/couch/src/couch_db.erl +++ b/apps/couch/src/couch_db.erl @@ -11,7 +11,6 @@ % the License. -module(couch_db). --behaviour(gen_server). -export([open/2,open_int/2,close/1,create/2,start_compact/1,get_db_info/1,get_design_docs/1]). -export([open_ref_counted/2,is_idle/1,monitor/1,count_changes_since/2]). @@ -22,21 +21,20 @@ -export([enum_docs/4,enum_docs_since/5]). -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]). -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]). --export([start_link/3,open_doc_int/3,ensure_full_commit/1]). +-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]). -export([set_security/2,get_security/1]). --export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -export([changes_since/5,changes_since/6,read_doc/2,new_revid/1]). --export([check_is_admin/1, check_is_reader/1]). +-export([check_is_admin/1, check_is_reader/1, get_doc_count/1, load_validation_funs/1]). -include("couch_db.hrl"). - start_link(DbName, Filepath, Options) -> case open_db_file(Filepath, Options) of {ok, Fd} -> - StartResult = gen_server:start_link(couch_db, {DbName, Filepath, Fd, Options}, []), + {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName, + Filepath, Fd, Options}, []), unlink(Fd), - StartResult; + gen_server:call(UpdaterPid, get_db); Else -> Else end. @@ -52,7 +50,7 @@ open_db_file(Filepath, Options) -> {ok, Fd} -> ?LOG_INFO("Found ~s~s compaction file, using as primary storage.", [Filepath, ".compact"]), ok = file:rename(Filepath ++ ".compact", Filepath), - ok = couch_file:sync(Fd), + ok = couch_file:sync(Filepath), {ok, Fd}; {error, enoent} -> {not_found, no_db_file} @@ -86,24 +84,33 @@ open(DbName, Options) -> Else -> Else end. -ensure_full_commit(#db{update_pid=UpdatePid,instance_start_time=StartTime}) -> - ok = gen_server:call(UpdatePid, full_commit, infinity), +ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> + ok = gen_server:call(Pid, full_commit, infinity), + {ok, StartTime}. + +ensure_full_commit(Db, RequiredSeq) -> + #db{main_pid=Pid, instance_start_time=StartTime} = Db, + ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity), {ok, StartTime}. -close(#db{fd_ref_counter=RefCntr}) -> - couch_ref_counter:drop(RefCntr). +close(#db{fd_monitor=RefCntr}) -> + erlang:demonitor(RefCntr). open_ref_counted(MainPid, OpenedPid) -> gen_server:call(MainPid, {open_ref_count, OpenedPid}). -is_idle(MainPid) -> - gen_server:call(MainPid, is_idle). +is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) -> + {monitored_by, Pids} = erlang:process_info(Db#db.fd, monitored_by), + (Pids -- [Db#db.main_pid, whereis(couch_stats_collector)]) =:= []; +is_idle(_Db) -> + false. monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). -start_compact(#db{update_pid=Pid}) -> - gen_server:call(Pid, start_compact). +start_compact(#db{main_pid=Pid}) -> + {ok, _} = gen_server:call(Pid, start_compact), + ok. delete_doc(Db, Id, Revisions) -> DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions], @@ -210,13 +217,13 @@ get_full_doc_info(Db, Id) -> Result. get_full_doc_infos(Db, Ids) -> - couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids). + couch_btree:lookup(Db#db.id_tree, Ids). -increment_update_seq(#db{update_pid=UpdatePid}) -> - gen_server:call(UpdatePid, increment_update_seq). +increment_update_seq(#db{main_pid=Pid}) -> + gen_server:call(Pid, increment_update_seq). -purge_docs(#db{update_pid=UpdatePid}, IdsRevs) -> - gen_server:call(UpdatePid, {purge_docs, IdsRevs}). +purge_docs(#db{main_pid=Pid}, IdsRevs) -> + gen_server:call(Pid, {purge_docs, IdsRevs}). get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. @@ -232,13 +239,17 @@ get_last_purged(#db{header=#db_header{purged_docs=nil}}) -> get_last_purged(#db{fd=Fd, header=#db_header{purged_docs=PurgedPointer}}) -> couch_file:pread_term(Fd, PurgedPointer). +get_doc_count(Db) -> + {ok, {Count, _DelCount}} = couch_btree:full_reduce(Db#db.id_tree), + {ok, Count}. + get_db_info(Db) -> #db{fd=Fd, header=#db_header{disk_version=DiskVersion}, compactor_pid=Compactor, update_seq=SeqNum, name=Name, - fulldocinfo_by_id_btree=FullDocBtree, + id_tree=FullDocBtree, instance_start_time=StartTime, committed_update_seq=CommittedUpdateSeq} = Db, {ok, Size} = couch_file:bytes(Fd), @@ -257,7 +268,12 @@ get_db_info(Db) -> ], {ok, InfoList}. -get_design_docs(#db{fulldocinfo_by_id_btree=Btree}=Db) -> +get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) -> + {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end), + receive {'DOWN', Ref, _, _, Response} -> + Response + end; +get_design_docs(#db{id_tree=Btree}=Db) -> {ok,_, Docs} = couch_btree:fold(Btree, fun(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, AccDocs) -> {ok, Doc} = couch_db:open_doc_int(Db, FullDocInfo, []), @@ -319,7 +335,7 @@ get_readers(#db{security=SecProps}) -> get_security(#db{security=SecProps}) -> {SecProps}. -set_security(#db{update_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> +set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> check_is_admin(Db), ok = validate_security_object(NewSecProps), ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity), @@ -354,7 +370,7 @@ validate_names_and_roles({Props}) when is_list(Props) -> get_revs_limit(#db{revs_limit=Limit}) -> Limit. -set_revs_limit(#db{update_pid=Pid}=Db, Limit) when Limit > 0 -> +set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> check_is_admin(Db), gen_server:call(Pid, {set_revs_limit, Limit}, infinity); set_revs_limit(_Db, _Limit) -> @@ -406,6 +422,9 @@ group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) -> validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}, _GetDiskDocFun) -> catch check_is_admin(Db); +validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) -> + ValidationFuns = load_validation_funs(Db), + validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun); validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) -> ok; validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) -> @@ -424,6 +443,27 @@ validate_doc_update(Db, Doc, GetDiskDocFun) -> Error end. +% to be safe, spawn a middleman here +load_validation_funs(#db{main_pid = Pid} = Db) -> + {_, Ref} = spawn_monitor(fun() -> + {ok, DesignDocs} = get_design_docs(Db), + exit({ok, lists:flatmap(fun(DesignDoc) -> + case couch_doc:get_validate_doc_fun(DesignDoc) of + nil -> + []; + Fun -> + [Fun] + end + end, DesignDocs)}) + end), + receive + {'DOWN', Ref, _, _, {ok, Funs}} -> + gen_server:cast(Pid, {load_validation_funs, Funs}), + Funs; + {'DOWN', Ref, _, _, Reason} -> + ?LOG_ERROR("could not load validation funs ~p", [Reason]), + throw(internal_server_error) + end. prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, OldFullDocInfo, LeafRevsDict, AllowConflict) -> @@ -512,8 +552,8 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [PreppedBucket | AccPrepped], AccErrors3). -update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) -> - update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options, interactive_edit). +update_docs(Db, Docs, Options) -> + update_docs(Db, Docs, Options, interactive_edit). prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) -> @@ -743,37 +783,37 @@ set_commit_option(Options) -> [full_commit|Options] end. -collect_results(UpdatePid, MRef, ResultsAcc) -> +collect_results(Pid, MRef, ResultsAcc) -> receive - {result, UpdatePid, Result} -> - collect_results(UpdatePid, MRef, [Result | ResultsAcc]); - {done, UpdatePid} -> + {result, Pid, Result} -> + collect_results(Pid, MRef, [Result | ResultsAcc]); + {done, Pid} -> {ok, ResultsAcc}; - {retry, UpdatePid} -> + {retry, Pid} -> retry; {'DOWN', MRef, _, _, Reason} -> exit(Reason) end. -write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, +write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets, NonRepDocs, Options0) -> Options = set_commit_option(Options0), MergeConflicts = lists:member(merge_conflicts, Options), FullCommit = lists:member(full_commit, Options), - MRef = erlang:monitor(process, UpdatePid), + MRef = erlang:monitor(process, Pid), try - UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, - case collect_results(UpdatePid, MRef, []) of + Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(Pid, MRef, []) of {ok, Results} -> {ok, Results}; retry -> % This can happen if the db file we wrote to was swapped out by % compaction. Retry by reopening the db and writing to the current file - {ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), + {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]), DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], % We only retry once close(Db2), - UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, - case collect_results(UpdatePid, MRef, []) of + Pid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, + case collect_results(Pid, MRef, []) of {ok, Results} -> {ok, Results}; retry -> throw({update_error, compaction_retry}) end @@ -921,9 +961,15 @@ enum_docs_reduce_to_count(Reds) -> changes_since(Db, Style, StartSeq, Fun, Acc) -> changes_since(Db, Style, StartSeq, Fun, [], Acc). - + changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> - Wrapper = fun(DocInfo, _Offset, Acc2) -> + Wrapper = fun(FullDocInfo, _Offset, Acc2) -> + case FullDocInfo of + #full_doc_info{} -> + DocInfo = couch_doc:to_doc_info(FullDocInfo); + #doc_info{} -> + DocInfo = FullDocInfo + end, #doc_info{revs=Revs} = DocInfo, DocInfo2 = case Style of @@ -936,83 +982,27 @@ changes_since(Db, Style, StartSeq, Fun, Options, Acc) -> end, Fun(DocInfo2, Acc2) end, - {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, - Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), + {ok, _LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, Wrapper, + Acc, [{start_key, couch_util:to_integer(StartSeq) + 1} | Options]), {ok, AccOut}. count_changes_since(Db, SinceSeq) -> {ok, Changes} = - couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, + couch_btree:fold_reduce(Db#db.seq_tree, fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)} + {ok, couch_btree:final_reduce(Db#db.seq_tree, PartialReds)} end, 0, [{start_key, SinceSeq + 1}]), Changes. enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> - {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.docinfo_by_seq_btree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]), + {ok, LastReduction, AccOut} = couch_btree:fold(Db#db.seq_tree, InFun, Acc, [{start_key, SinceSeq + 1} | Options]), {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. enum_docs(Db, InFun, InAcc, Options) -> - {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.fulldocinfo_by_id_btree, InFun, InAcc, Options), + {ok, LastReduce, OutAcc} = couch_btree:fold(Db#db.id_tree, InFun, InAcc, Options), {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. -% server functions - -init({DbName, Filepath, Fd, Options}) -> - {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []), - {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db), - couch_ref_counter:add(RefCntr), - case lists:member(sys_db, Options) of - true -> - ok; - false -> - couch_stats_collector:track_process_count({couchdb, open_databases}) - end, - process_flag(trap_exit, true), - {ok, Db}. - -terminate(_Reason, Db) -> - couch_util:shutdown_sync(Db#db.update_pid), - ok. - -handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) -> - ok = couch_ref_counter:add(RefCntr, OpenerPid), - {reply, {ok, Db}, Db}; -handle_call(is_idle, _From, #db{fd_ref_counter=RefCntr, compactor_pid=Compact, - waiting_delayed_commit=Delay}=Db) -> - % Idle means no referrers. Unless in the middle of a compaction file switch, - % there are always at least 2 referrers, couch_db_updater and us. - {reply, (Delay == nil) andalso (Compact == nil) andalso (couch_ref_counter:count(RefCntr) == 2), Db}; -handle_call({db_updated, NewDb}, _From, #db{fd_ref_counter=OldRefCntr}) -> - #db{fd_ref_counter=NewRefCntr}=NewDb, - case NewRefCntr =:= OldRefCntr of - true -> ok; - false -> - couch_ref_counter:add(NewRefCntr), - couch_ref_counter:drop(OldRefCntr) - end, - {reply, ok, NewDb}; -handle_call(get_db, _From, Db) -> - {reply, {ok, Db}, Db}. - - -handle_cast(Msg, Db) -> - ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -handle_info({'EXIT', _Pid, normal}, Db) -> - {noreply, Db}; -handle_info({'EXIT', _Pid, Reason}, Server) -> - {stop, Reason, Server}; -handle_info(Msg, Db) -> - ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]), - exit({error, Msg}). - - %%% Internal function %%% open_doc_revs_int(Db, IdRevs, Options) -> Ids = [Id || {Id, _Revs} <- IdRevs], @@ -1054,7 +1044,7 @@ open_doc_revs_int(Db, IdRevs, Options) -> IdRevs, LookupResults). open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, _Options) -> - case couch_btree:lookup(Db#db.local_docs_btree, [Id]) of + case couch_btree:lookup(Db#db.local_tree, [Id]) of [{ok, {_, {Rev, BodyData}}}] -> {ok, #doc{id=Id, revs={0, [list_to_binary(integer_to_list(Rev))]}, body=BodyData}}; [not_found] -> diff --git a/apps/couch/src/couch_db_update_notifier_sup.erl b/apps/couch/src/couch_db_update_notifier_sup.erl index 4d730fc7..e7cc16c1 100644 --- a/apps/couch/src/couch_db_update_notifier_sup.erl +++ b/apps/couch/src/couch_db_update_notifier_sup.erl @@ -22,16 +22,14 @@ -behaviour(supervisor). --export([start_link/0,init/1]). +-export([start_link/0, init/1, config_change/3]). start_link() -> supervisor:start_link({local, couch_db_update_notifier_sup}, couch_db_update_notifier_sup, []). init([]) -> - ok = couch_config:register( - fun("update_notification", Key, Value) -> reload_config(Key, Value) end - ), + ok = couch_config:register(fun ?MODULE:config_change/3), UpdateNotifierExes = couch_config:get("update_notification"), @@ -48,7 +46,7 @@ init([]) -> %% @doc when update_notification configuration changes, terminate the process %% for that notifier and start a new one with the updated config -reload_config(Id, Exe) -> +config_change("update_notification", Id, Exe) -> ChildSpec = { Id, {couch_db_update_notifier, start_link, [Exe]}, diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl index 19a4c165..e4f8d0ca 100644 --- a/apps/couch/src/couch_db_updater.erl +++ b/apps/couch/src/couch_db_updater.erl @@ -13,14 +13,14 @@ -module(couch_db_updater). -behaviour(gen_server). --export([btree_by_id_reduce/2,btree_by_seq_reduce/2]). +-export([btree_by_id_split/1,btree_by_id_join/2,btree_by_id_reduce/2]). +-export([btree_by_seq_split/1,btree_by_seq_join/2,btree_by_seq_reduce/2]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -include("couch_db.hrl"). -init({MainPid, DbName, Filepath, Fd, Options}) -> - process_flag(trap_exit, true), +init({DbName, Filepath, Fd, Options}) -> case lists:member(create, Options) of true -> % create a new header and writes it to the file @@ -44,25 +44,40 @@ init({MainPid, DbName, Filepath, Fd, Options}) -> end, Db = init_db(DbName, Filepath, Fd, Header), - Db2 = refresh_validate_doc_funs(Db), - {ok, Db2#db{main_pid = MainPid, is_sys_db = lists:member(sys_db, Options)}}. + couch_stats_collector:track_process_count({couchdb, open_databases}), + % we don't load validation funs here because the fabric query is liable to + % race conditions. Instead see couch_db:validate_doc_update, which loads + % them lazily + {ok, Db#db{main_pid = self(), is_sys_db = lists:member(sys_db, Options)}}. terminate(_Reason, Db) -> couch_file:close(Db#db.fd), couch_util:shutdown_sync(Db#db.compactor_pid), - couch_util:shutdown_sync(Db#db.fd_ref_counter), + couch_util:shutdown_sync(Db#db.fd), ok. +handle_call(start_compact, _From, Db) -> + {noreply, NewDb} = handle_cast(start_compact, Db), + {reply, {ok, NewDb#db.compactor_pid}, NewDb}; + handle_call(get_db, _From, Db) -> {reply, {ok, Db}, Db}; handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) -> + {reply, ok, Db}; +handle_call(full_commit, _From, Db) -> + {reply, ok, commit_data(Db)}; + +handle_call({full_commit, _}, _From, #db{waiting_delayed_commit=nil}=Db) -> {reply, ok, Db}; % no data waiting, return ok immediately -handle_call(full_commit, _From, Db) -> +handle_call({full_commit, RequiredSeq}, _From, Db) when RequiredSeq =< + Db#db.committed_update_seq -> + {reply, ok, Db}; +handle_call({full_commit, _}, _, Db) -> {reply, ok, commit_data(Db)}; % commit the data and return ok handle_call(increment_update_seq, _From, Db) -> Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), couch_db_update_notifier:notify({updated, Db#db.name}), {reply, {ok, Db2#db.update_seq}, Db2}; @@ -70,13 +85,13 @@ handle_call({set_security, NewSec}, _From, Db) -> {ok, Ptr} = couch_file:append_term(Db#db.fd, NewSec), Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr, update_seq=Db#db.update_seq+1}), - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {reply, ok, Db2}; handle_call({set_revs_limit, Limit}, _From, Db) -> Db2 = commit_data(Db#db{revs_limit=Limit, update_seq=Db#db.update_seq+1}), - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {reply, ok, Db2}; handle_call({purge_docs, _IdRevs}, _From, @@ -85,8 +100,8 @@ handle_call({purge_docs, _IdRevs}, _From, handle_call({purge_docs, IdRevs}, _From, Db) -> #db{ fd=Fd, - fulldocinfo_by_id_btree = DocInfoByIdBTree, - docinfo_by_seq_btree = DocInfoBySeqBTree, + id_tree = DocInfoByIdBTree, + seq_tree = DocInfoBySeqBTree, update_seq = LastSeq, header = Header = #db_header{purge_seq=PurgeSeq} } = Db, @@ -136,29 +151,32 @@ handle_call({purge_docs, IdRevs}, _From, Db) -> Db2 = commit_data( Db#db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree2, - docinfo_by_seq_btree = DocInfoBySeqBTree2, + id_tree = DocInfoByIdBTree2, + seq_tree = DocInfoBySeqBTree2, update_seq = NewSeq + 1, header=Header#db_header{purge_seq=PurgeSeq+1, purged_docs=Pointer}}), - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), couch_db_update_notifier:notify({updated, Db#db.name}), - {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}; -handle_call(start_compact, _From, Db) -> + {reply, {ok, (Db2#db.header)#db_header.purge_seq, IdRevsPurged}, Db2}. + + +handle_cast({load_validation_funs, ValidationFuns}, Db) -> + Db2 = Db#db{validate_doc_funs = ValidationFuns}, + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2}; +handle_cast(start_compact, Db) -> case Db#db.compactor_pid of nil -> ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]), Pid = spawn_link(fun() -> start_copy_compact(Db) end), Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), - {reply, ok, Db2}; + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2}; _ -> % compact currently running, this is a no-op - {reply, ok, Db} - end. - - - + {noreply, Db} + end; handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> {ok, NewFd} = couch_file:open(CompactFilepath), {ok, NewHeader} = couch_file:read_header(NewFd), @@ -168,13 +186,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> case Db#db.update_seq == NewSeq of true -> % suck up all the local docs into memory and write them to the new db - {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree, + {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree, fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_docs_btree, LocalDocs), + {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs), NewDb2 = commit_data(NewDb#db{ - local_docs_btree = NewLocalBtree, - main_pid = Db#db.main_pid, + local_tree = NewLocalBtree, + main_pid = self(), filepath = Filepath, instance_start_time = Db#db.instance_start_time, revs_limit = Db#db.revs_limit @@ -186,13 +204,13 @@ handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) -> couch_file:delete(RootDir, Filepath), ok = file:rename(CompactFilepath, Filepath), close_db(Db), - ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}), + ok = gen_server:call(couch_server, {db_updated, NewDb2}, infinity), ?LOG_INFO("Compaction for db \"~s\" completed.", [Db#db.name]), {noreply, NewDb2#db{compactor_pid=nil}}; false -> - ?LOG_INFO("Compaction file still behind main file " + ?LOG_INFO("Compaction for ~s still behind main file " "(update seq=~p. compact update seq=~p). Retrying.", - [Db#db.update_seq, NewSeq]), + [Db#db.name, Db#db.update_seq, NewSeq]), close_db(NewDb), Pid = spawn_link(fun() -> start_copy_compact(Db) end), Db2 = Db#db{compactor_pid=Pid}, @@ -215,7 +233,7 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts, FullCommit2) of {ok, Db2} -> - ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), if Db2#db.update_seq /= Db#db.update_seq -> couch_db_update_notifier:notify({updated, Db2#db.name}); true -> ok @@ -235,13 +253,16 @@ handle_info(delayed_commit, Db) -> Db -> {noreply, Db}; Db2 -> - ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {noreply, Db2} end; handle_info({'EXIT', _Pid, normal}, Db) -> {noreply, Db}; handle_info({'EXIT', _Pid, Reason}, Db) -> - {stop, Reason, Db}. + {stop, Reason, Db}; +handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> + ?LOG_ERROR("DB ~s shutting down - Fd ~p", [Name, Reason]), + {stop, normal, Db}. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -279,14 +300,27 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> end. -btree_by_seq_split(#doc_info{id=Id, high_seq=KeySeq, revs=Revs}) -> - RevInfos = [{Rev, Seq, Bp} || - #rev_info{rev=Rev,seq=Seq,deleted=false,body_sp=Bp} <- Revs], - DeletedRevInfos = [{Rev, Seq, Bp} || - #rev_info{rev=Rev,seq=Seq,deleted=true,body_sp=Bp} <- Revs], - {KeySeq,{Id, RevInfos, DeletedRevInfos}}. +rev_tree(DiskTree) -> + couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) -> + {IsDeleted == 1, BodyPointer, UpdateSeq}; + (_RevId, ?REV_MISSING) -> + ?REV_MISSING + end, DiskTree). + +disk_tree(RevTree) -> + couch_key_tree:map(fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) -> + {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq}; + (_RevId, ?REV_MISSING) -> + ?REV_MISSING + end, RevTree). +btree_by_seq_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Del, rev_tree=T}) -> + {Seq, {Id, if Del -> 1; true -> 0 end, disk_tree(T)}}. + +btree_by_seq_join(Seq, {Id, Del, T}) when is_integer(Del) -> + #full_doc_info{id=Id, update_seq=Seq, deleted=Del==1, rev_tree=rev_tree(T)}; btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) -> + % 1.0 stored #doc_info records in the seq tree. compact to upgrade. #doc_info{ id = Id, high_seq=KeySeq, @@ -310,14 +344,7 @@ btree_by_seq_join(KeySeq,{Id, Rev, Bp, Conflicts, DelConflicts, Deleted}) -> btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq, deleted=Deleted, rev_tree=Tree}) -> - DiskTree = - couch_key_tree:map( - fun(_RevId, {IsDeleted, BodyPointer, UpdateSeq}) -> - {if IsDeleted -> 1; true -> 0 end, BodyPointer, UpdateSeq}; - (_RevId, ?REV_MISSING) -> - ?REV_MISSING - end, Tree), - {Id, {Seq, if Deleted -> 1; true -> 0 end, DiskTree}}. + {Id, {Seq, if Deleted -> 1; true -> 0 end, disk_tree(Tree)}}. btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) -> Tree = @@ -377,19 +404,19 @@ init_db(DbName, Filepath, Fd, Header0) -> "[before_header, after_header, on_file_open]")), case lists:member(on_file_open, FsyncOptions) of - true -> ok = couch_file:sync(Fd); + true -> ok = couch_file:sync(Filepath); _ -> ok end, - {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd, - [{split, fun(X) -> btree_by_id_split(X) end}, - {join, fun(X,Y) -> btree_by_id_join(X,Y) end}, - {reduce, fun(X,Y) -> btree_by_id_reduce(X,Y) end}]), - {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd, - [{split, fun(X) -> btree_by_seq_split(X) end}, - {join, fun(X,Y) -> btree_by_seq_join(X,Y) end}, - {reduce, fun(X,Y) -> btree_by_seq_reduce(X,Y) end}]), - {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd), + {ok, IdBtree} = couch_btree:open(Header#db_header.id_tree_state, Fd, + [{split, fun ?MODULE:btree_by_id_split/1}, + {join, fun ?MODULE:btree_by_id_join/2}, + {reduce, fun ?MODULE:btree_by_id_reduce/2}]), + {ok, SeqBtree} = couch_btree:open(Header#db_header.seq_tree_state, Fd, + [{split, fun ?MODULE:btree_by_seq_split/1}, + {join, fun ?MODULE:btree_by_seq_join/2}, + {reduce, fun ?MODULE:btree_by_seq_reduce/2}]), + {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_tree_state, Fd), case Header#db_header.security_ptr of nil -> Security = [], @@ -401,15 +428,13 @@ init_db(DbName, Filepath, Fd, Header0) -> {MegaSecs, Secs, MicroSecs} = now(), StartTime = ?l2b(io_lib:format("~p", [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])), - {ok, RefCntr} = couch_ref_counter:start([Fd]), #db{ - update_pid=self(), fd=Fd, - fd_ref_counter = RefCntr, + fd_monitor = erlang:monitor(process,Fd), header=Header, - fulldocinfo_by_id_btree = IdBtree, - docinfo_by_seq_btree = SeqBtree, - local_docs_btree = LocalDocsBtree, + id_tree = IdBtree, + seq_tree = SeqBtree, + local_tree = LocalDocsBtree, committed_update_seq = Header#db_header.update_seq, update_seq = Header#db_header.update_seq, name = DbName, @@ -422,8 +447,8 @@ init_db(DbName, Filepath, Fd, Header0) -> }. -close_db(#db{fd_ref_counter = RefCntr}) -> - couch_ref_counter:drop(RefCntr). +close_db(#db{fd_monitor = Ref}) -> + erlang:demonitor(Ref). refresh_validate_doc_funs(Db) -> @@ -435,7 +460,13 @@ refresh_validate_doc_funs(Db) -> Fun -> [Fun] end end, DesignDocs), - Db#db{validate_doc_funs=ProcessDocFuns}. + case Db#db.name of + <<"shards/", _:18/binary, DbName/binary>> -> + fabric:reset_validation_funs(DbName), + Db#db{validate_doc_funs=ProcessDocFuns}; + _ -> + Db#db{validate_doc_funs=ProcessDocFuns} + end. % rev tree functions @@ -563,14 +594,11 @@ merge_rev_trees(MergeConflicts, [NewDocs|RestDocsList], -new_index_entries([], AccById, AccBySeq) -> - {AccById, AccBySeq}; -new_index_entries([FullDocInfo|RestInfos], AccById, AccBySeq) -> - #doc_info{revs=[#rev_info{deleted=Deleted}|_]} = DocInfo = - couch_doc:to_doc_info(FullDocInfo), - new_index_entries(RestInfos, - [FullDocInfo#full_doc_info{deleted=Deleted}|AccById], - [DocInfo|AccBySeq]). +new_index_entries([], Acc) -> + Acc; +new_index_entries([Info|Rest], Acc) -> + #doc_info{revs=[#rev_info{deleted=Del}|_]} = couch_doc:to_doc_info(Info), + new_index_entries(Rest, [Info#full_doc_info{deleted=Del}|Acc]). stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> @@ -579,8 +607,8 @@ stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> #db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree, - docinfo_by_seq_btree = DocInfoBySeqBTree, + id_tree = DocInfoByIdBTree, + seq_tree = DocInfoBySeqBTree, update_seq = LastSeq } = Db, Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], @@ -607,16 +635,17 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> % the trees, the attachments are already written to disk) {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []), - {IndexFullDocInfos, IndexDocInfos} = - new_index_entries(FlushedFullDocInfos, [], []), + IndexInfos = new_index_entries(FlushedFullDocInfos, []), % and the indexes - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []), - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexDocInfos, RemoveSeqs), + {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, + IndexInfos, []), + {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, + IndexInfos, RemoveSeqs), Db3 = Db2#db{ - fulldocinfo_by_id_btree = DocInfoByIdBTree2, - docinfo_by_seq_btree = DocInfoBySeqBTree2, + id_tree = DocInfoByIdBTree2, + seq_tree = DocInfoBySeqBTree2, update_seq = NewSeq}, % Check if we just updated any design documents, and update the validation @@ -631,24 +660,26 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> {ok, commit_data(Db4, not FullCommit)}. -update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> +update_local_docs(#db{local_tree=Btree}=Db, Docs) -> Ids = [Id || {_Client, #doc{id=Id}} <- Docs], OldDocLookups = couch_btree:lookup(Btree, Ids), BtreeEntries = lists:zipwith( - fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, OldDocLookup) -> + fun({Client, #doc{id=Id,deleted=Delete,revs={0,PrevRevs},body=Body}}, + _OldDocLookup) -> case PrevRevs of [RevStr|_] -> PrevRev = list_to_integer(?b2l(RevStr)); [] -> PrevRev = 0 end, - OldRev = - case OldDocLookup of - {ok, {_, {OldRev0, _}}} -> OldRev0; - not_found -> 0 - end, - case OldRev == PrevRev of - true -> + %% disabled conflict checking for local docs -- APK 16 June 2010 + % OldRev = + % case OldDocLookup of + % {ok, {_, {OldRev0, _}}} -> OldRev0; + % not_found -> 0 + % end, + % case OldRev == PrevRev of + % true -> case Delete of false -> send_result(Client, Id, {0, PrevRevs}, {ok, @@ -658,11 +689,11 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> send_result(Client, Id, {0, PrevRevs}, {ok, {0, <<"0">>}}), {remove, Id} - end; - false -> - send_result(Client, Id, {0, PrevRevs}, conflict), - ignore - end + end%; + % false -> + % send_result(Client, Id, {0, PrevRevs}, conflict), + % ignore + % end end, Docs, OldDocLookups), BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], @@ -671,7 +702,7 @@ update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) -> {ok, Btree2} = couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Db#db{local_docs_btree = Btree2}}. + {ok, Db#db{local_tree = Btree2}}. commit_data(Db) -> @@ -680,9 +711,9 @@ commit_data(Db) -> db_to_header(Db, Header) -> Header#db_header{ update_seq = Db#db.update_seq, - docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree), - fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree), - local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree), + seq_tree_state = couch_btree:get_state(Db#db.seq_tree), + id_tree_state = couch_btree:get_state(Db#db.id_tree), + local_tree_state = couch_btree:get_state(Db#db.local_tree), security_ptr = Db#db.security_ptr, revs_limit = Db#db.revs_limit}. @@ -771,31 +802,36 @@ copy_rev_tree_attachments(SrcDb, DestFd, Tree) -> (_, _, branch) -> ?REV_MISSING end, Tree). - -copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> - Ids = [Id || #doc_info{id=Id} <- InfoBySeq], - LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids), +merge_lookups(Infos, []) -> + Infos; +merge_lookups([], _) -> + []; +merge_lookups([#doc_info{}|RestInfos], [{ok, FullDocInfo}|RestLookups]) -> + [FullDocInfo|merge_lookups(RestInfos, RestLookups)]; +merge_lookups([FullDocInfo|RestInfos], Lookups) -> + [FullDocInfo|merge_lookups(RestInfos, Lookups)]. + +copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) -> + % lookup any necessary full_doc_infos + DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos], + LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds), + Infos = merge_lookups(MixedInfos, LookupResults), % write out the attachments - NewFullDocInfos0 = lists:map( - fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) -> - Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, DestFd, RevTree)} - end, LookupResults), + NewInfos0 = [Info#full_doc_info{rev_tree=copy_rev_tree_attachments(Db, + DestFd, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- Infos], + % write out the docs % we do this in 2 stages so the docs are written out contiguously, making % view indexing and replication faster. - NewFullDocInfos1 = lists:map( - fun(#full_doc_info{rev_tree=RevTree}=Info) -> - Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( - fun(_Key, {IsDel, DocBody, Seq}) -> - {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody), - {IsDel, Pos, Seq} - end, RevTree)} - end, NewFullDocInfos0), - - NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos1), - NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos], + NewInfos1 = [Info#full_doc_info{rev_tree=couch_key_tree:map_leafs( + fun(_Key, {IsDel, DocBody, Seq}) -> + {ok, Pos} = couch_file:append_term_md5(DestFd, DocBody), + {IsDel, Pos, Seq} + end, RevTree)} || #full_doc_info{rev_tree=RevTree}=Info <- NewInfos0], + + NewInfos = stem_full_doc_infos(Db, NewInfos1), RemoveSeqs = case Retry of false -> @@ -803,16 +839,16 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) -> true -> % We are retrying a compaction, meaning the documents we are copying may % already exist in our file and must be removed from the by_seq index. - Existing = couch_btree:lookup(NewDb#db.fulldocinfo_by_id_btree, Ids), + Ids = [Id || #full_doc_info{id=Id} <- Infos], + Existing = couch_btree:lookup(NewDb#db.id_tree, Ids), [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing] end, - {ok, DocInfoBTree} = couch_btree:add_remove( - NewDb#db.docinfo_by_seq_btree, NewDocInfos, RemoveSeqs), - {ok, FullDocInfoBTree} = couch_btree:add_remove( - NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []), - NewDb#db{ fulldocinfo_by_id_btree=FullDocInfoBTree, - docinfo_by_seq_btree=DocInfoBTree}. + {ok, SeqTree} = couch_btree:add_remove( + NewDb#db.seq_tree, NewInfos, RemoveSeqs), + {ok, IdTree} = couch_btree:add_remove( + NewDb#db.id_tree, NewInfos, []), + NewDb#db{id_tree=IdTree, seq_tree=SeqTree}. @@ -821,13 +857,20 @@ copy_compact(Db, NewDb0, Retry) -> NewDb = NewDb0#db{fsync_options=FsyncOptions}, TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), EnumBySeqFun = - fun(#doc_info{high_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) -> + fun(DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) -> + case DocInfo of + #full_doc_info{update_seq=Seq} -> + ok; + #doc_info{high_seq=Seq} -> + ok + end, couch_task_status:update("Copied ~p of ~p changes (~p%)", [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]), if TotalCopied rem 1000 =:= 0 -> NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), if TotalCopied rem 10000 =:= 0 -> - {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}}; + NewDb3 = commit_data(NewDb2#db{update_seq=Seq}), + {ok, {NewDb3, [], TotalCopied + 1}}; true -> {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}} end; @@ -839,7 +882,7 @@ copy_compact(Db, NewDb0, Retry) -> couch_task_status:set_update_frequency(500), {ok, _, {NewDb2, Uncopied, TotalChanges}} = - couch_btree:foldl(Db#db.docinfo_by_seq_btree, EnumBySeqFun, + couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun, {NewDb, [], 0}, [{start_key, NewDb#db.update_seq + 1}]), diff --git a/apps/couch/src/couch_doc.erl b/apps/couch/src/couch_doc.erl index d15cd7de..d47f85ef 100644 --- a/apps/couch/src/couch_doc.erl +++ b/apps/couch/src/couch_doc.erl @@ -334,6 +334,8 @@ att_to_bin(#att{data=DataFun, att_len=Len}) when is_function(DataFun)-> )) ). +get_validate_doc_fun({Props}) -> + get_validate_doc_fun(couch_doc:from_json_obj({Props})); get_validate_doc_fun(#doc{body={Props}}=DDoc) -> case couch_util:get_value(<<"validate_doc_update">>, Props) of undefined -> @@ -364,7 +366,7 @@ merge_stubs(#doc{id=Id,atts=MemBins}=StubsDoc, #doc{atts=DiskBins}) -> DiskAtt; _ -> throw({missing_stub, - <<"id:", Id/binary, ", name:", Name/binary>>}) + <<"Invalid attachment stub in ", Id/binary, " for ", Name/binary>>}) end; (Att) -> Att @@ -453,15 +455,11 @@ doc_from_multi_part_stream(ContentType, DataFun) -> receive {doc_bytes, DocBytes} -> Doc = from_json_obj(?JSON_DECODE(DocBytes)), - % go through the attachments looking for 'follows' in the data, - % replace with function that reads the data from MIME stream. - ReadAttachmentDataFun = fun() -> - Parser ! {get_bytes, self()}, - receive {bytes, Bytes} -> Bytes end - end, + % we'll send the Parser process ID to the remote nodes so they can + % retrieve their own copies of the attachment data Atts2 = lists:map( fun(#att{data=follows}=A) -> - A#att{data=ReadAttachmentDataFun}; + A#att{data={follows, Parser}}; (A) -> A end, Doc#doc.atts), @@ -484,25 +482,75 @@ mp_parse_doc(body_end, AccBytes) -> From ! {doc_bytes, lists:reverse(AccBytes)} end, fun (Next) -> - mp_parse_atts(Next) + mp_parse_atts(Next, {[], 0, orddict:new(), []}) end. -mp_parse_atts(eof) -> - ok; -mp_parse_atts({headers, _H}) -> - fun (Next) -> - mp_parse_atts(Next) - end; -mp_parse_atts({body, Bytes}) -> - receive {get_bytes, From} -> - From ! {bytes, Bytes} - end, - fun (Next) -> - mp_parse_atts(Next) - end; -mp_parse_atts(body_end) -> - fun (Next) -> - mp_parse_atts(Next) +mp_parse_atts({headers, _}, Acc) -> + fun(Next) -> mp_parse_atts(Next, Acc) end; +mp_parse_atts(body_end, Acc) -> + fun(Next) -> mp_parse_atts(Next, Acc) end; +mp_parse_atts({body, Bytes}, {DataList, Offset, Counters, Waiting}) -> + NewAcc = maybe_send_data({DataList++[Bytes], Offset, Counters, Waiting}), + fun(Next) -> mp_parse_atts(Next, NewAcc) end; +mp_parse_atts(eof, {DataList, Offset, Counters, Waiting}) -> + N = list_to_integer(couch_config:get("cluster", "n", "3")), + M = length(Counters), + case (M == N) andalso DataList == [] of + true -> + ok; + false -> + receive {get_bytes, From} -> + C2 = orddict:update_counter(From, 1, Counters), + NewAcc = maybe_send_data({DataList, Offset, C2, [From|Waiting]}), + mp_parse_atts(eof, NewAcc) + after 3600000 -> + ok + end end. +maybe_send_data({ChunkList, Offset, Counters, Waiting}) -> + receive {get_bytes, From} -> + NewCounters = orddict:update_counter(From, 1, Counters), + maybe_send_data({ChunkList, Offset, NewCounters, [From|Waiting]}) + after 0 -> + % reply to as many writers as possible + NewWaiting = lists:filter(fun(Writer) -> + WhichChunk = orddict:fetch(Writer, Counters), + ListIndex = WhichChunk - Offset, + if ListIndex =< length(ChunkList) -> + Writer ! {bytes, lists:nth(ListIndex, ChunkList)}, + false; + true -> + true + end + end, Waiting), + % check if we can drop a chunk from the head of the list + case Counters of + [] -> + SmallestIndex = 0; + _ -> + SmallestIndex = lists:min(element(2, lists:unzip(Counters))) + end, + Size = length(Counters), + N = list_to_integer(couch_config:get("cluster", "n", "3")), + if Size == N andalso SmallestIndex == (Offset+1) -> + NewChunkList = tl(ChunkList), + NewOffset = Offset+1; + true -> + NewChunkList = ChunkList, + NewOffset = Offset + end, + + % we should wait for a writer if no one has written the last chunk + LargestIndex = lists:max([0|element(2, lists:unzip(Counters))]), + if LargestIndex >= (Offset + length(ChunkList)) -> + % someone has written all possible chunks, keep moving + {NewChunkList, NewOffset, Counters, NewWaiting}; + true -> + receive {get_bytes, X} -> + C2 = orddict:update_counter(X, 1, Counters), + maybe_send_data({NewChunkList, NewOffset, C2, [X|NewWaiting]}) + end + end + end. diff --git a/apps/couch/src/couch_drv.erl b/apps/couch/src/couch_drv.erl new file mode 100644 index 00000000..70028659 --- /dev/null +++ b/apps/couch/src/couch_drv.erl @@ -0,0 +1,38 @@ +-module(couch_drv). +-behaviour(gen_server). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([start_link/0]). + +-include("couch_db.hrl"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + case erl_ddll:load(code:priv_dir(couch), "couch_erl_driver") of + ok -> + {ok, nil}; + {error, already_loaded} -> + ?LOG_INFO("~p reloading couch_erl_driver", [?MODULE]), + ok = erl_ddll:reload(code:priv_dir(couch), "couch_erl_driver"), + {ok, nil}; + {error, Error} -> + {stop, erl_ddll:format_error(Error)} + end. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/apps/couch/src/couch_external_manager.erl b/apps/couch/src/couch_external_manager.erl index 7e401389..0c66ef8c 100644 --- a/apps/couch/src/couch_external_manager.erl +++ b/apps/couch/src/couch_external_manager.erl @@ -39,7 +39,7 @@ config_change("external", UrlName) -> init([]) -> process_flag(trap_exit, true), Handlers = ets:new(couch_external_manager_handlers, [set, private]), - couch_config:register(fun config_change/2), + couch_config:register(fun ?MODULE:config_change/2), {ok, Handlers}. terminate(_Reason, Handlers) -> diff --git a/apps/couch/src/couch_file.erl b/apps/couch/src/couch_file.erl index 0a891712..2d539f64 100644 --- a/apps/couch/src/couch_file.erl +++ b/apps/couch/src/couch_file.erl @@ -69,10 +69,10 @@ open(Filepath, Options) -> %%---------------------------------------------------------------------- append_term(Fd, Term) -> - append_binary(Fd, term_to_binary(Term)). + append_binary(Fd, term_to_binary(Term, [compressed, {minor_version,1}])). append_term_md5(Fd, Term) -> - append_binary_md5(Fd, term_to_binary(Term)). + append_binary_md5(Fd, term_to_binary(Term, [compressed, {minor_version,1}])). %%---------------------------------------------------------------------- @@ -237,6 +237,7 @@ init_status_error(ReturnPid, Ref, Error) -> init({Filepath, Options, ReturnPid, Ref}) -> process_flag(trap_exit, true), + timer:send_after(60000, maybe_close), case lists:member(create, Options) of true -> filelib:ensure_dir(Filepath), @@ -479,6 +480,18 @@ handle_cast(close, Fd) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_info(maybe_close, Fd) -> + case process_info(self(), monitored_by) of + {monitored_by, [_StatsCollector]} -> + {stop, normal, Fd}; + {monitored_by, []} -> + ?LOG_ERROR("~p ~p is un-monitored, maybe stats collector died", + [?MODULE, self()]), + {stop, normal, Fd}; + _Else -> + timer:send_after(10000, maybe_close), + {noreply, Fd} + end; handle_info({'EXIT', _, normal}, Fd) -> {noreply, Fd}; handle_info({'EXIT', _, Reason}, Fd) -> diff --git a/apps/couch/src/couch_os_process.erl b/apps/couch/src/couch_os_process.erl index 5776776b..1fe38e8e 100644 --- a/apps/couch/src/couch_os_process.erl +++ b/apps/couch/src/couch_os_process.erl @@ -59,12 +59,12 @@ prompt(Pid, Data) -> % Utility functions for reading and writing % in custom functions -writeline(OsProc, Data) when is_record(OsProc, os_proc) -> - port_command(OsProc#os_proc.port, Data ++ "\n"). +writeline(#os_proc{port=Port}, Data) -> + port_command(Port, Data ++ "\n"). readline(#os_proc{} = OsProc) -> readline(OsProc, []). -readline(#os_proc{port = Port} = OsProc, Acc) -> +readline(#os_proc{port=Port, timeout=Timeout} = OsProc, Acc) -> receive {Port, {data, {noeol, Data}}} -> readline(OsProc, [Data|Acc]); @@ -73,7 +73,7 @@ readline(#os_proc{port = Port} = OsProc, Acc) -> {Port, Err} -> catch port_close(Port), throw({os_process_error, Err}) - after OsProc#os_proc.timeout -> + after Timeout -> catch port_close(Port), throw({os_process_error, "OS process timed out."}) end. @@ -81,12 +81,12 @@ readline(#os_proc{port = Port} = OsProc, Acc) -> % Standard JSON functions writejson(OsProc, Data) when is_record(OsProc, os_proc) -> JsonData = ?JSON_ENCODE(Data), - ?LOG_DEBUG("OS Process ~p Input :: ~s", [OsProc#os_proc.port, JsonData]), + % ?LOG_DEBUG("OS Process ~p Input :: ~s", [OsProc#os_proc.port, JsonData]), true = writeline(OsProc, JsonData). -readjson(OsProc) when is_record(OsProc, os_proc) -> +readjson(#os_proc{} = OsProc) -> Line = readline(OsProc), - ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]), + % ?LOG_DEBUG("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]), case ?JSON_DECODE(Line) of [<<"log">>, Msg] when is_binary(Msg) -> % we got a message to log. Log it and continue @@ -104,9 +104,7 @@ readjson(OsProc) when is_record(OsProc, os_proc) -> % gen_server API init([Command, Options, PortOptions]) -> - process_flag(trap_exit, true), - PrivDir = couch_util:priv_dir(), - Spawnkiller = filename:join(PrivDir, "couchspawnkillable"), + Spawnkiller = filename:join([code:root_dir(), "bin", "couchspawnkillable"]), BaseProc = #os_proc{ command=Command, port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions), @@ -115,7 +113,7 @@ init([Command, Options, PortOptions]) -> }, KillCmd = readline(BaseProc), Pid = self(), - ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]), + % ?LOG_DEBUG("OS Process Start :: ~p", [BaseProc#os_proc.port]), spawn(fun() -> % this ensure the real os process is killed when this process dies. erlang:monitor(process, Pid), diff --git a/apps/couch/src/couch_primary_sup.erl b/apps/couch/src/couch_primary_sup.erl new file mode 100644 index 00000000..e822b70a --- /dev/null +++ b/apps/couch/src/couch_primary_sup.erl @@ -0,0 +1,42 @@ +-module(couch_primary_sup). +-behaviour(supervisor). +-export([init/1, start_link/0]). + +start_link() -> + supervisor:start_link({local,couch_primary_services}, ?MODULE, []). + +init([]) -> + Children = [ + {collation_driver, + {couch_drv, start_link, []}, + permanent, + brutal_kill, + worker, + [couch_drv]}, + {couch_task_status, + {couch_task_status, start_link, []}, + permanent, + brutal_kill, + worker, + [couch_task_status]}, + {couch_server, + {couch_server, sup_start_link, []}, + permanent, + brutal_kill, + worker, + [couch_server]}, + {couch_db_update_event, + {gen_event, start_link, [{local, couch_db_update}]}, + permanent, + brutal_kill, + worker, + dynamic}, + {couch_replication_supervisor, + {couch_rep_sup, start_link, []}, + permanent, + infinity, + supervisor, + [couch_rep_sup]} + ], + {ok, {{one_for_one, 10, 3600}, Children}}. + diff --git a/apps/couch/src/couch_query_servers.erl b/apps/couch/src/couch_query_servers.erl index c4f1bf0b..144b7494 100644 --- a/apps/couch/src/couch_query_servers.erl +++ b/apps/couch/src/couch_query_servers.erl @@ -13,7 +13,7 @@ -module(couch_query_servers). -behaviour(gen_server). --export([start_link/0]). +-export([start_link/0, config_change/1]). -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). @@ -21,6 +21,7 @@ -export([filter_docs/5]). -export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]). +-export([get_os_process/1, ret_os_process/1]). % -export([test/0]). @@ -121,25 +122,19 @@ recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) os_reduce(_Lang, [], _KVs) -> {ok, []}; +os_reduce(#proc{} = Proc, OsRedSrcs, KVs) -> + [true, Reductions] = proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]), + {ok, Reductions}; os_reduce(Lang, OsRedSrcs, KVs) -> Proc = get_os_process(Lang), - OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of - [true, Reductions] -> Reductions - after - ok = ret_os_process(Proc) - end, - {ok, OsResults}. + try os_reduce(Proc, OsRedSrcs, KVs) after ok = ret_os_process(Proc) end. -os_rereduce(_Lang, [], _KVs) -> - {ok, []}; +os_rereduce(#proc{} = Proc, OsRedSrcs, KVs) -> + [true, [Reduction]] = proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]), + Reduction; os_rereduce(Lang, OsRedSrcs, KVs) -> Proc = get_os_process(Lang), - try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of - [true, [Reduction]] -> Reduction - after - ok = ret_os_process(Proc) - end. - + try os_rereduce(Proc, OsRedSrcs, KVs) after ok = ret_os_process(Proc) end. builtin_reduce(_Re, [], _KVs, Acc) -> {ok, lists:reverse(Acc)}; @@ -234,16 +229,7 @@ init([]) -> % just stop if one of the config settings change. couch_server_sup % will restart us and then we will pick up the new settings. - ok = couch_config:register( - fun("query_servers" ++ _, _) -> - supervisor:terminate_child(couch_secondary_services, query_servers), - supervisor:restart_child(couch_secondary_services, query_servers) - end), - ok = couch_config:register( - fun("native_query_servers" ++ _, _) -> - supervisor:terminate_child(couch_secondary_services, query_servers), - [supervisor:restart_child(couch_secondary_services, query_servers)] - end), + ok = couch_config:register(fun ?MODULE:config_change/1), Langs = ets:new(couch_query_server_langs, [set, private]), PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), @@ -275,23 +261,15 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProc case ets:lookup(LangProcs, Lang) of [{Lang, [P|Rest]}] -> % find a proc in the set that has the DDoc - case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of - {ok, Proc} -> - rem_from_list(LangProcs, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end; + {ok, Proc} = proc_with_ddoc(DDoc, DDocKey, [P|Rest]), + rem_from_list(LangProcs, Lang, Proc), + {reply, {ok, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> add_value(PidProcs, Proc#proc.pid, Proc), - case proc_with_ddoc(DDoc, DDocKey, [Proc]) of - {ok, Proc2} -> - {reply, {ok, Proc2, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end; + {ok, Proc2} = proc_with_ddoc(DDoc, DDocKey, [Proc]), + {reply, {ok, Proc2, get_query_server_config()}, Server}; Error -> {reply, Error, Server} end @@ -348,6 +326,13 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +config_change("query_servers") -> + supervisor:terminate_child(couch_secondary_services, query_servers), + supervisor:restart_child(couch_secondary_services, query_servers); +config_change("native_query_servers") -> + supervisor:terminate_child(couch_secondary_services, query_servers), + supervisor:restart_child(couch_secondary_services, query_servers). + % Private API get_query_server_config() -> diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 65573e8c..126639e0 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -15,7 +15,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([replicate/2, checkpoint/1]). +-export([replicate/2, checkpoint/1, start_link/3]). -include("couch_db.hrl"). @@ -49,6 +49,9 @@ doc_ids = nil }). +start_link(Id, PostBody, UserCtx) -> + gen_server:start_link(?MODULE, [Id, PostBody, UserCtx], []). + %% convenience function to do a simple replication from the shell replicate(Source, Target) when is_list(Source) -> replicate(?l2b(Source), Target); @@ -61,7 +64,7 @@ replicate(Source, Target) when is_binary(Source), is_binary(Target) -> replicate({Props}=PostBody, UserCtx) -> {BaseId, Extension} = make_replication_id(PostBody, UserCtx), Replicator = {BaseId ++ Extension, - {gen_server, start_link, [?MODULE, [BaseId, PostBody, UserCtx], []]}, + {?MODULE, start_link, [BaseId, PostBody, UserCtx]}, temporary, 1, worker, @@ -80,10 +83,15 @@ replicate({Props}=PostBody, UserCtx) -> false -> Server = start_replication_server(Replicator), - case couch_util:get_value(<<"continuous">>, Props, false) of - true -> + Continuous = couch_util:get_value(<<"continuous">>, Props, false), + Async = couch_util:get_value(<<"async">>, Props, false), + case {Continuous, Async} of + {true, _} -> {ok, {continuous, ?l2b(BaseId)}}; - false -> + {_, true} -> + spawn(fun() -> get_result(Server, PostBody, UserCtx) end), + Server; + _ -> get_result(Server, PostBody, UserCtx) end end. @@ -106,7 +114,9 @@ get_result(Server, PostBody, UserCtx) -> init(InitArgs) -> try do_init(InitArgs) - catch throw:{db_not_found, DbUrl} -> {stop, {db_not_found, DbUrl}} end. + catch _:Error -> + {stop, Error} + end. do_init([RepId, {PostProps}, UserCtx] = InitArgs) -> process_flag(trap_exit, true), @@ -211,14 +221,16 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({missing_revs_checkpoint, SourceSeq}, State) -> - couch_task_status:update("MR Processed source update #~p", [SourceSeq]), + couch_task_status:update("MR Processed source update #~p of ~p", + [SourceSeq, seqnum(State#state.source)]), {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), - couch_task_status:update("W Processed source update #~p", [SourceSeq]), + couch_task_status:update("W Processed source update #~p of ~p", + [SourceSeq, seqnum(State#state.source)]), {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; handle_info({writer_checkpoint, _}, State) -> {noreply, State}; @@ -227,8 +239,14 @@ handle_info({update_stats, Key, N}, State) -> ets:update_counter(State#state.stats, Key, N), {noreply, State}; -handle_info({'DOWN', _, _, _, _}, State) -> - ?LOG_INFO("replication terminating because local DB is shutting down", []), +handle_info({'DOWN', _, _, Pid, _}, State) -> + Me = node(), + case erlang:node(Pid) of + Me -> + ?LOG_INFO("replication terminating - local DB is shutting down", []); + Node -> + ?LOG_INFO("replication terminating - DB on ~p is shutting down", [Node]) + end, timer:cancel(State#state.checkpoint_scheduled), {stop, shutdown, State}; @@ -275,34 +293,35 @@ code_change(_OldVsn, State, _Extra) -> % internal funs start_replication_server(Replicator) -> - RepId = element(1, Replicator), - case supervisor:start_child(couch_rep_sup, Replicator) of + start_replication_server(Replicator, fun start_child/1). + +start_replication_server(Replicator, StartFun) -> + case StartFun(Replicator) of {ok, Pid} -> - ?LOG_INFO("starting new replication ~p at ~p", [RepId, Pid]), Pid; {error, already_present} -> - case supervisor:restart_child(couch_rep_sup, RepId) of - {ok, Pid} -> - ?LOG_INFO("starting replication ~p at ~p", [RepId, Pid]), - Pid; - {error, running} -> - %% this error occurs if multiple replicators are racing - %% each other to start and somebody else won. Just grab - %% the Pid by calling start_child again. - {error, {already_started, Pid}} = - supervisor:start_child(couch_rep_sup, Replicator), - ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), - Pid; - {error, {db_not_found, DbUrl}} -> - throw({db_not_found, <<"could not open ", DbUrl/binary>>}) - end; + start_replication_server(Replicator, fun restart_child/1); {error, {already_started, Pid}} -> - ?LOG_DEBUG("replication ~p already running at ~p", [RepId, Pid]), Pid; + {error, running} -> + Children = supervisor:which_children(couch_rep_sup), + {value, {_, Pid, _, _}} = lists:keysearch(Replicator, 1, Children), + Pid; + % sadly both seem to be needed. I don't know why. {error, {{db_not_found, DbUrl}, _}} -> - throw({db_not_found, <<"could not open ", DbUrl/binary>>}) + throw({db_not_found, <<"could not open ", DbUrl/binary>>}); + {error, {db_not_found, DbUrl}} -> + throw({db_not_found, <<"could not open ", DbUrl/binary>>}); + {error, {node_not_connected, Node}} -> + throw({node_not_connected, Node}) end. +start_child(Replicator) -> + supervisor:start_child(couch_rep_sup, Replicator). + +restart_child(Replicator) -> + supervisor:restart_child(couch_rep_sup, element(1, Replicator)). + compare_replication_logs(SrcDoc, TgtDoc) -> #doc{body={RepRecProps}} = SrcDoc, #doc{body={RepRecPropsTgt}} = TgtDoc, @@ -355,8 +374,8 @@ close_db(Db) -> dbname(#http_db{url = Url}) -> strip_password(Url); -dbname(#db{name = Name}) -> - Name. +dbname(#db{name = Name, main_pid = MainPid}) -> + ?l2b([Name, " (", pid_to_list(MainPid), ")"]). strip_password(Url) -> re:replace(Url, @@ -457,7 +476,12 @@ maybe_append_options(Options, Props) -> make_replication_id({Props}, UserCtx) -> %% funky algorithm to preserve backwards compatibility - {ok, HostName} = inet:gethostname(), + case couch_util:get_value(<<"use_hostname">>, Props, false) of + true -> + {ok, HostName} = inet:gethostname(); + false -> + HostName = couch_config:get("replication", "hostname", "cloudant.com") + end, % Port = mochiweb_socket_server:get(couch_httpd, port), Src = get_rep_endpoint(UserCtx, couch_util:get_value(<<"source">>, Props)), Tgt = get_rep_endpoint(UserCtx, couch_util:get_value(<<"target">>, Props)), @@ -480,15 +504,22 @@ make_replication_id({Props}, UserCtx) -> maybe_add_trailing_slash(Url) -> re:replace(Url, "[^/]$", "&/", [{return, list}]). -get_rep_endpoint(_UserCtx, {Props}) -> - Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), - {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), - {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), - case couch_util:get_value(<<"oauth">>, Auth) of +get_rep_endpoint(UserCtx, {Props}) -> + case couch_util:get_value(<<"url">>, Props) of undefined -> - {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; - {OAuth} -> - {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} + Node = couch_util:get_value(<<"node">>, Props), + Name = couch_util:get_value(<<"name">>, Props), + {Node, Name, UserCtx}; + RawUrl -> + Url = maybe_add_trailing_slash(RawUrl), + {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), + {Auth} = couch_util:get_value(<<"auth">>, Props, {[]}), + case couch_util:get_value(<<"oauth">>, Auth) of + undefined -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders]}; + {OAuth} -> + {remote, Url, [{?b2l(K),?b2l(V)} || {K,V} <- BinHeaders], OAuth} + end end; get_rep_endpoint(_UserCtx, <<"http://",_/binary>>=Url) -> {remote, maybe_add_trailing_slash(Url), []}; @@ -502,27 +533,43 @@ open_replication_log(#http_db{}=Db, RepId) -> Req = Db#http_db{resource=couch_util:url_encode(DocId)}, case couch_rep_httpc:request(Req) of {[{<<"error">>, _}, {<<"reason">>, _}]} -> - ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), + % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#http_db.url]), #doc{id=?l2b(DocId)}; Doc -> - ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), + % ?LOG_DEBUG("found a replication log for ~s", [Db#http_db.url]), couch_doc:from_json_obj(Doc) end; open_replication_log(Db, RepId) -> DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId), case couch_db:open_doc(Db, DocId, []) of {ok, Doc} -> - ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), + % ?LOG_DEBUG("found a replication log for ~s", [Db#db.name]), Doc; _ -> - ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), + % ?LOG_DEBUG("didn't find a replication log for ~s", [Db#db.name]), #doc{id=DocId} end. open_db(Props, UserCtx, ProxyParams) -> open_db(Props, UserCtx, ProxyParams, false). -open_db({Props}, _UserCtx, ProxyParams, CreateTarget) -> +open_db(<<"http://",_/binary>>=Url, _, ProxyParams, Create) -> + open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create); +open_db(<<"https://",_/binary>>=Url, _, ProxyParams, Create) -> + open_remote_db({[{<<"url">>,Url}]}, ProxyParams, Create); +open_db({Props}, UserCtx, ProxyParams, Create) -> + case couch_util:get_value(<<"url">>, Props) of + undefined -> + Node = couch_util:get_value(<<"node">>, Props, node()), + DbName = couch_util:get_value(<<"name">>, Props), + open_local_db(Node, DbName, UserCtx, Create); + _Url -> + open_remote_db({Props}, ProxyParams, Create) + end; +open_db(<<DbName/binary>>, UserCtx, _ProxyParams, Create) -> + open_local_db(node(), DbName, UserCtx, Create). + +open_remote_db({Props}, ProxyParams, CreateTarget) -> Url = maybe_add_trailing_slash(couch_util:get_value(<<"url">>, Props)), {AuthProps} = couch_util:get_value(<<"auth">>, Props, {[]}), {BinHeaders} = couch_util:get_value(<<"headers">>, Props, {[]}), @@ -534,24 +581,32 @@ open_db({Props}, _UserCtx, ProxyParams, CreateTarget) -> headers = lists:ukeymerge(1, Headers, DefaultHeaders) }, Db = Db1#http_db{options = Db1#http_db.options ++ ProxyParams}, - couch_rep_httpc:db_exists(Db, CreateTarget); -open_db(<<"http://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> - open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); -open_db(<<"https://",_/binary>>=Url, _, ProxyParams, CreateTarget) -> - open_db({[{<<"url">>,Url}]}, [], ProxyParams, CreateTarget); -open_db(<<DbName/binary>>, UserCtx, _ProxyParams, CreateTarget) -> - case CreateTarget of - true -> - ok = couch_httpd:verify_is_server_admin(UserCtx), - couch_server:create(DbName, [{user_ctx, UserCtx}]); - false -> ok - end, - - case couch_db:open(DbName, [{user_ctx, UserCtx}]) of - {ok, Db} -> + couch_rep_httpc:db_exists(Db, CreateTarget). + +open_local_db(Node, DbName, UserCtx, Create) when is_binary(Node) -> + try open_local_db(list_to_existing_atom(?b2l(Node)), DbName, UserCtx, Create) + catch error:badarg -> + ?LOG_ERROR("unknown replication node ~s", [Node]), + throw({node_not_connected, Node}) end; +open_local_db(Node, DbName, UserCtx, Create) when is_atom(Node) -> + case catch gen_server:call({couch_server, Node}, {open, DbName, []}, infinity) of + {ok, #db{} = Db} -> + couch_db:monitor(Db), + Db#db{fd_monitor = erlang:monitor(process, Db#db.fd)}; + {ok, MainPid} when is_pid(MainPid) -> + {ok, Db} = couch_db:open_ref_counted(MainPid, UserCtx), couch_db:monitor(Db), Db; - {not_found, no_db_file} -> throw({db_not_found, DbName}) + {not_found, no_db_file} when Create =:= false-> + throw({db_not_found, DbName}); + {not_found, no_db_file} -> + ok = couch_httpd:verify_is_server_admin(UserCtx), + couch_server:create(DbName, [{user_ctx, UserCtx}]); + {'EXIT', {{nodedown, Node}, _Stack}} -> + throw({node_not_connected, couch_util:to_binary(Node)}); + {'EXIT', {noproc, {gen_server,call,_}}} -> + timer:sleep(1000), + throw({noproc, couch_server, Node}) end. schedule_checkpoint(#state{checkpoint_scheduled = nil} = State) -> @@ -582,9 +637,14 @@ do_checkpoint(State) -> } = State, case commit_to_both(Source, Target, NewSeqNum) of {SrcInstanceStartTime, TgtInstanceStartTime} -> - ?LOG_INFO("recording a checkpoint for ~s -> ~s at source update_seq ~p", - [dbname(Source), dbname(Target), NewSeqNum]), - SessionId = couch_uuids:random(), + ?LOG_DEBUG("recording a checkpoint for ~s -> ~s at source update_seq ~p" + " of ~p", [dbname(Source), dbname(Target), NewSeqNum, seqnum(Source)]), + SessionId = couch_uuids:new(), + TargetNode = case Target of #db{main_pid=MainPid} -> + erlang:node(MainPid); + _ -> + http + end, NewHistoryEntry = {[ {<<"session_id">>, SessionId}, {<<"start_time">>, list_to_binary(ReplicationStartTime)}, @@ -603,6 +663,7 @@ do_checkpoint(State) -> NewRepHistory = {[ {<<"session_id">>, SessionId}, {<<"source_last_seq">>, NewSeqNum}, + {<<"target_node">>, TargetNode}, {<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)} ]}, @@ -622,7 +683,9 @@ do_checkpoint(State) -> "yourself?)", []), State end; - _Else -> + Else -> + ?LOG_INFO("wanted ~p, got ~p from commit_to_both", [ + {SrcInstanceStartTime, TgtInstanceStartTime}, Else]), ?LOG_INFO("rebooting ~s -> ~s from last known replication checkpoint", [dbname(Source), dbname(Target)]), #state{ @@ -654,7 +717,12 @@ commit_to_both(Source, Target, RequiredSeq) -> {SrcCommitPid, Timestamp} -> Timestamp; {'EXIT', SrcCommitPid, {http_request_failed, _}} -> - exit(replication_link_failure) + nil; + {'EXIT', SrcCommitPid, {noproc, {gen_server, call, [_]}}} -> + nil; % DB crashed, this should trigger a reboot + {'EXIT', SrcCommitPid, Else} -> + ?LOG_ERROR("new error code for crashed replication commit ~p", [Else]), + nil end, {SourceStartTime, TargetStartTime}. @@ -667,12 +735,13 @@ ensure_full_commit(#http_db{headers = Headers} = Target) -> {ResultProps} = couch_rep_httpc:request(Req), true = couch_util:get_value(<<"ok">>, ResultProps), couch_util:get_value(<<"instance_start_time">>, ResultProps); -ensure_full_commit(Target) -> - {ok, NewDb} = couch_db:open_int(Target#db.name, []), +ensure_full_commit(#db{name=DbName, main_pid=Pid} = Target) -> + TargetNode = erlang:node(Pid), + {ok, NewDb} = rpc:call(TargetNode, couch_db, open_int, [DbName, []]), UpdateSeq = couch_db:get_update_seq(Target), CommitSeq = couch_db:get_committed_update_seq(NewDb), InstanceStartTime = NewDb#db.instance_start_time, - couch_db:close(NewDb), + catch couch_db:close(NewDb), if UpdateSeq > CommitSeq -> ?LOG_DEBUG("target needs a full commit: update ~p commit ~p", [UpdateSeq, CommitSeq]), @@ -732,6 +801,11 @@ up_to_date(Source, Seq) -> couch_db:close(NewDb), T. +seqnum(#http_db{}) -> + -1; +seqnum(Db) -> + Db#db.update_seq. + parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) -> parse_proxy_params(?b2l(ProxyUrl)); parse_proxy_params([]) -> diff --git a/apps/couch/src/couch_rep_att.erl b/apps/couch/src/couch_rep_att.erl index 28b8945c..476c64d4 100644 --- a/apps/couch/src/couch_rep_att.erl +++ b/apps/couch/src/couch_rep_att.erl @@ -78,7 +78,6 @@ receive_data(Ref, ReqId, ContentEncoding) -> ?LOG_ERROR("streaming attachment ~p failed with ~p", [ReqId, Err]), throw({attachment_request_failed, Err}); {ibrowse_async_response, ReqId, Data} -> - % ?LOG_DEBUG("got ~p bytes for ~p", [size(Data), ReqId]), Data; {ibrowse_async_response_end, ReqId} -> ?LOG_ERROR("streaming att. ended but more data requested ~p", [ReqId]), diff --git a/apps/couch/src/couch_rep_httpc.erl b/apps/couch/src/couch_rep_httpc.erl index aaa38106..3b11b869 100644 --- a/apps/couch/src/couch_rep_httpc.erl +++ b/apps/couch/src/couch_rep_httpc.erl @@ -176,8 +176,8 @@ process_response({error, Reason}, Req) -> Else -> Else end, - ?LOG_DEBUG("retrying couch_rep_httpc ~p request in ~p seconds due to " ++ - "{error, ~p}", [Method, Pause/1000, ShortReason]), + ?LOG_ERROR("~p retry ~p ~s in ~p seconds due to {error, ~p}", + [?MODULE, Method, full_url(Req), Pause/1000, ShortReason]), timer:sleep(Pause), if Reason == worker_is_dead -> C = spawn_link_worker_process(Req), diff --git a/apps/couch/src/couch_rep_reader.erl b/apps/couch/src/couch_rep_reader.erl index 8722f3f5..46633994 100644 --- a/apps/couch/src/couch_rep_reader.erl +++ b/apps/couch/src/couch_rep_reader.erl @@ -20,7 +20,7 @@ -import(couch_util, [url_encode/1]). -define (BUFFER_SIZE, 1000). --define (MAX_CONCURRENT_REQUESTS, 100). +-define (MAX_CONCURRENT_REQUESTS, 10). -define (MAX_CONNECTIONS, 20). -define (MAX_PIPELINE_SIZE, 50). diff --git a/apps/couch/src/couch_secondary_sup.erl b/apps/couch/src/couch_secondary_sup.erl new file mode 100644 index 00000000..8ccbd799 --- /dev/null +++ b/apps/couch/src/couch_secondary_sup.erl @@ -0,0 +1,35 @@ +-module(couch_secondary_sup). +-behaviour(supervisor). +-export([init/1, start_link/0]). + +start_link() -> + supervisor:start_link({local,couch_secondary_services}, ?MODULE, []). +init([]) -> + SecondarySupervisors = [ + {couch_db_update_notifier_sup, + {couch_db_update_notifier_sup, start_link, []}, + permanent, + infinity, + supervisor, + [couch_db_update_notifier_sup]}, + {couch_metrics_event_manager, + {gen_event, start_link, [{local, couch_metrics_event_manager}]}, + permanent, + brutal_kill, + worker, + dynamic} + ], + Children = SecondarySupervisors ++ [ + begin + {ok, {Module, Fun, Args}} = couch_util:parse_term(SpecStr), + + {list_to_atom(Name), + {Module, Fun, Args}, + permanent, + brutal_kill, + worker, + [Module]} + end + || {Name, SpecStr} + <- couch_config:get("daemons"), SpecStr /= ""], + {ok, {{one_for_one, 10, 3600}, Children}}. diff --git a/apps/couch/src/couch_server.erl b/apps/couch/src/couch_server.erl index 43fd9044..b54771be 100644 --- a/apps/couch/src/couch_server.erl +++ b/apps/couch/src/couch_server.erl @@ -13,10 +13,11 @@ -module(couch_server). -behaviour(gen_server). --export([open/2,create/2,delete/2,all_databases/0,get_version/0]). --export([init/1, handle_call/3,sup_start_link/0]). +-export([open/2,create/2,delete/2,all_databases/0,all_databases/1]). +-export([init/1, handle_call/3,sup_start_link/0,get_version/0]). -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]). --export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). +-export([dev_start/0,is_admin/2,has_admins/0,get_stats/0,config_change/4]). +-export([close_lru/0]). -include("couch_db.hrl"). @@ -50,15 +51,26 @@ get_stats() -> sup_start_link() -> gen_server:start_link({local, couch_server}, couch_server, [], []). + open(DbName, Options) -> - case gen_server:call(couch_server, {open, DbName, Options}, infinity) of - {ok, Db} -> - Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), - {ok, Db#db{user_ctx=Ctx}}; - Error -> - Error + Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), + case ets:lookup(couch_dbs, DbName) of + [#db{fd=Fd, fd_monitor=Lock} = Db] when Lock =/= locked -> + ets:insert(couch_lru, {DbName, now()}), + {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + _ -> + case gen_server:call(couch_server, {open, DbName, Options}, infinity) of + {ok, #db{fd=Fd} = Db} -> + ets:insert(couch_lru, {DbName, now()}), + {ok, Db#db{user_ctx=Ctx, fd_monitor=erlang:monitor(process,Fd)}}; + Error -> + Error + end end. +close_lru() -> + gen_server:call(couch_server, close_lru). + create(DbName, Options) -> case gen_server:call(couch_server, {create, DbName, Options}, infinity) of {ok, Db} -> @@ -121,28 +133,12 @@ init([]) -> RootDir = couch_config:get("couchdb", "database_dir", "."), MaxDbsOpen = list_to_integer( couch_config:get("couchdb", "max_dbs_open")), - Self = self(), - ok = couch_config:register( - fun("couchdb", "database_dir") -> - exit(Self, config_change) - end), - ok = couch_config:register( - fun("couchdb", "max_dbs_open", Max) -> - gen_server:call(couch_server, - {set_max_dbs_open, list_to_integer(Max)}) - end), + ok = couch_config:register(fun ?MODULE:config_change/4), ok = couch_file:init_delete_dir(RootDir), hash_admin_passwords(), - ok = couch_config:register( - fun("admins", _Key, _Value, Persist) -> - % spawn here so couch_config doesn't try to call itself - spawn(fun() -> hash_admin_passwords(Persist) end) - end, false), - {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"), - ets:new(couch_dbs_by_name, [set, private, named_table]), - ets:new(couch_dbs_by_pid, [set, private, named_table]), - ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]), - ets:new(couch_sys_dbs, [set, private, named_table]), + {ok, RegExp} = re:compile("^[a-z][a-z0-9\\_\\$()\\+\\-\\/\\.]*$"), + ets:new(couch_dbs, [set, protected, named_table, {keypos, #db.name}]), + ets:new(couch_lru, [set, public, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, @@ -150,15 +146,27 @@ init([]) -> start_time=httpd_util:rfc1123_date()}}. terminate(_Reason, _Srv) -> - [couch_util:shutdown_sync(Pid) || {_, {Pid, _LruTime}} <- - ets:tab2list(couch_dbs_by_name)], + ets:foldl(fun(#db{main_pid=Pid}, _) -> couch_util:shutdown_sync(Pid) end, + nil, couch_dbs), ok. +config_change("couchdb", "database_dir", _, _) -> + exit(whereis(couch_server), config_change); +config_change("couchdb", "max_dbs_open", Max, _) -> + gen_server:call(couch_server, {set_max_dbs_open, list_to_integer(Max)}); +config_change("admins", _, _, Persist) -> + % spawn here so couch_config doesn't try to call itself + spawn(fun() -> hash_admin_passwords(Persist) end). + all_databases() -> + all_databases(""). + +all_databases(Prefix) -> {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server), NormRoot = couch_util:normpath(Root), Filenames = - filelib:fold_files(Root, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", true, + filelib:fold_files(Root++Prefix, "^[a-z0-9\\_\\$()\\+\\-]*[\\.]couch$", + true, fun(Filename, AccIn) -> NormFilename = couch_util:normpath(Filename), case NormFilename -- NormRoot of @@ -181,172 +189,145 @@ maybe_close_lru_db(#server{dbs_open=NumOpen}=Server) -> Error -> Error end. +find_oldest_db({DbName, Lru}, Acc) -> + erlang:min({Lru, DbName}, Acc). + try_close_lru(StartTime) -> - LruTime = get_lru(), - if LruTime > StartTime -> - % this means we've looped through all our opened dbs and found them - % all in use. + case ets:foldl(fun find_oldest_db/2, {StartTime, nil}, couch_lru) of + {StartTime, nil} -> {error, all_dbs_active}; - true -> - [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime), - [{_, {opened, MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName), - case couch_db:is_idle(MainPid) of - true -> - couch_util:shutdown_sync(MainPid), - true = ets:delete(couch_dbs_by_lru, LruTime), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, MainPid), - true = ets:delete(couch_sys_dbs, DbName), + {_, DbName} -> + % There may exist an extremely small possibility of a race + % condition here, if a process could lookup the DB before the lock, + % but fail to monitor the fd before the is_idle check. + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, locked}), + [#db{main_pid = Pid} = Db] = ets:lookup(couch_dbs, DbName), + case couch_db:is_idle(Db) of true -> + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + exit(Pid, kill), ok; false -> - % this still has referrers. Go ahead and give it a current lru time - % and try the next one in the table. - NewLruTime = now(), - true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, NewLruTime}}), - true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}), - true = ets:delete(couch_dbs_by_lru, LruTime), - true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}), + true = ets:update_element(couch_dbs, DbName, {#db.fd_monitor, nil}), + true = ets:insert(couch_lru, {DbName, now()}), try_close_lru(StartTime) end end. -get_lru() -> - get_lru(ets:first(couch_dbs_by_lru)). - -get_lru(LruTime) -> - [{LruTime, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime), - case ets:member(couch_sys_dbs, DbName) of - false -> - LruTime; - true -> - [{_, {opened, MainPid, _}}] = ets:lookup(couch_dbs_by_name, DbName), - case couch_db:is_idle(MainPid) of - true -> - LruTime; - false -> - get_lru(ets:next(couch_dbs_by_lru, LruTime)) - end - end. - open_async(Server, From, DbName, Filepath, Options) -> Parent = self(), + put({async_open, DbName}, now()), Opener = spawn_link(fun() -> - Res = couch_db:start_link(DbName, Filepath, Options), - gen_server:call( - Parent, {open_result, DbName, Res, Options}, infinity - ), - unlink(Parent), - case Res of - {ok, DbReader} -> - unlink(DbReader); - _ -> - ok - end - end), - true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From]}}), - true = ets:insert(couch_dbs_by_pid, {Opener, DbName}), - DbsOpen = case lists:member(sys_db, Options) of - true -> - true = ets:insert(couch_sys_dbs, {DbName, true}), - Server#server.dbs_open; - false -> - Server#server.dbs_open + 1 - end, - Server#server{dbs_open = DbsOpen}. - + Res = couch_db:start_link(DbName, Filepath, Options), + gen_server:call(Parent, {open_result, DbName, Res}, infinity), + unlink(Parent) + end), + % icky hack of field values - compactor_pid used to store clients + true = ets:insert(couch_dbs, #db{ + name = DbName, + main_pid = Opener, + compactor_pid = [From], + fd_monitor = locked + }), + Server#server{dbs_open=Server#server.dbs_open + 1}. + +handle_call(close_lru, _From, #server{dbs_open=N} = Server) -> + case try_close_lru(now()) of + ok -> + {reply, ok, Server#server{dbs_open = N-1}}; + Error -> + {reply, Error, Server} + end; +handle_call(open_dbs_count, _From, Server) -> + {reply, Server#server.dbs_open, Server}; +handle_call({set_dbname_regexp, RegExp}, _From, Server) -> + {reply, ok, Server#server{dbname_regexp=RegExp}}; handle_call({set_max_dbs_open, Max}, _From, Server) -> {reply, ok, Server#server{max_dbs_open=Max}}; handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; -handle_call({open_result, DbName, {ok, OpenedDbPid}, Options}, _From, Server) -> - link(OpenedDbPid), - [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName), - lists:foreach(fun({FromPid,_}=From) -> - gen_server:reply(From, - catch couch_db:open_ref_counted(OpenedDbPid, FromPid)) - end, Froms), - LruTime = now(), - true = ets:insert(couch_dbs_by_name, - {DbName, {opened, OpenedDbPid, LruTime}}), - true = ets:delete(couch_dbs_by_pid, Opener), - true = ets:insert(couch_dbs_by_pid, {OpenedDbPid, DbName}), - true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), - case lists:member(create, Options) of - true -> - couch_db_update_notifier:notify({created, DbName}); - false -> - ok +handle_call({open_result, DbName, {ok, Db}}, _From, Server) -> + link(Db#db.main_pid), + case erase({async_open, DbName}) of undefined -> ok; T0 -> + ?LOG_INFO("needed ~p ms to open new ~s", [timer:now_diff(now(),T0)/1000, + DbName]) end, + % icky hack of field values - compactor_pid used to store clients + [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName), + [gen_server:reply(From, {ok, Db}) || From <- Froms], + true = ets:insert(couch_dbs, Db), {reply, ok, Server}; -handle_call({open_result, DbName, Error, Options}, _From, Server) -> - [{DbName, {opening,Opener,Froms}}] = ets:lookup(couch_dbs_by_name, DbName), - lists:foreach(fun(From) -> - gen_server:reply(From, Error) - end, Froms), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Opener), - DbsOpen = case lists:member(sys_db, Options) of - true -> - true = ets:delete(couch_sys_dbs, DbName), - Server#server.dbs_open; - false -> - Server#server.dbs_open - 1 - end, - {reply, ok, Server#server{dbs_open = DbsOpen}}; -handle_call({open, DbName, Options}, {FromPid,_}=From, Server) -> - LruTime = now(), - case ets:lookup(couch_dbs_by_name, DbName) of +handle_call({open_result, DbName, Error}, _From, Server) -> + % icky hack of field values - compactor_pid used to store clients + [#db{compactor_pid=Froms}] = ets:lookup(couch_dbs, DbName), + [gen_server:reply(From, Error) || From <- Froms], + ?LOG_INFO("open_result error ~p for ~s", [Error, DbName]), + true = ets:delete(couch_dbs, DbName), + {reply, ok, Server#server{dbs_open=Server#server.dbs_open - 1}}; +handle_call({open, DbName, Options}, From, Server) -> + case ets:lookup(couch_dbs, DbName) of [] -> - open_db(DbName, Server, Options, From); - [{_, {opening, Opener, Froms}}] -> - true = ets:insert(couch_dbs_by_name, {DbName, {opening, Opener, [From|Froms]}}), + DbNameList = binary_to_list(DbName), + case check_dbname(Server, DbNameList) of + ok -> + case maybe_close_lru_db(Server) of + {ok, Server2} -> + Filepath = get_full_filename(Server, DbNameList), + {noreply, open_async(Server2, From, DbName, Filepath, Options)}; + CloseError -> + {reply, CloseError, Server} + end; + Error -> + {reply, Error, Server} + end; + [#db{compactor_pid = Froms} = Db] when is_list(Froms) -> + % icky hack of field values - compactor_pid used to store clients + ?LOG_INFO("adding another listener to async open for ~s", [DbName]), + true = ets:insert(couch_dbs, Db#db{compactor_pid = [From|Froms]}), {noreply, Server}; - [{_, {opened, MainPid, PrevLruTime}}] -> - true = ets:insert(couch_dbs_by_name, {DbName, {opened, MainPid, LruTime}}), - true = ets:delete(couch_dbs_by_lru, PrevLruTime), - true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}), - {reply, couch_db:open_ref_counted(MainPid, FromPid), Server} + [#db{} = Db] -> + {reply, {ok, Db}, Server} end; handle_call({create, DbName, Options}, From, Server) -> - case ets:lookup(couch_dbs_by_name, DbName) of - [] -> - open_db(DbName, Server, [create | Options], From); - [_AlreadyRunningDb] -> - {reply, file_exists, Server} + DbNameList = binary_to_list(DbName), + case check_dbname(Server, DbNameList) of + ok -> + case ets:lookup(couch_dbs, DbName) of + [] -> + case maybe_close_lru_db(Server) of + {ok, Server2} -> + Filepath = get_full_filename(Server, DbNameList), + {noreply, open_async(Server2, From, DbName, Filepath, + [create | Options])}; + CloseError -> + {reply, CloseError, Server} + end; + [_AlreadyRunningDb] -> + {reply, file_exists, Server} + end; + Error -> + {reply, Error, Server} end; handle_call({delete, DbName, _Options}, _From, Server) -> DbNameList = binary_to_list(DbName), case check_dbname(Server, DbNameList) of ok -> FullFilepath = get_full_filename(Server, DbNameList), - UpdateState = - case ets:lookup(couch_dbs_by_name, DbName) of - [] -> false; - [{_, {opening, Pid, Froms}}] -> - couch_util:shutdown_sync(Pid), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Pid), - [gen_server:send_result(F, not_found) || F <- Froms], - true; - [{_, {opened, Pid, LruTime}}] -> - couch_util:shutdown_sync(Pid), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Pid), - true = ets:delete(couch_dbs_by_lru, LruTime), - true - end, - Server2 = case UpdateState of - true -> - DbsOpen = case ets:member(couch_sys_dbs, DbName) of - true -> - true = ets:delete(couch_sys_dbs, DbName), - Server#server.dbs_open; - false -> - Server#server.dbs_open - 1 - end, - Server#server{dbs_open = DbsOpen}; - false -> - Server + Server2 = + case ets:lookup(couch_dbs, DbName) of + [] -> Server; + [#db{main_pid=Pid, compactor_pid=Froms}] when is_list(Froms) -> + % icky hack of field values - compactor_pid used to store clients + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + exit(Pid, kill), + [gen_server:reply(F, not_found) || F <- Froms], + Server#server{dbs_open=Server#server.dbs_open - 1}; + [#db{main_pid=Pid}] -> + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + exit(Pid, kill), + Server#server{dbs_open=Server#server.dbs_open - 1} end, %% Delete any leftover .compact files. If we don't do this a subsequent @@ -364,36 +345,37 @@ handle_call({delete, DbName, _Options}, _From, Server) -> end; Error -> {reply, Error, Server} - end. + end; +handle_call({db_updated, Db}, _From, Server) -> + ets:insert(couch_dbs, Db), + {reply, ok, Server}. + -handle_cast(Msg, _Server) -> - exit({unknown_cast_message, Msg}). +handle_cast(Msg, Server) -> + {stop, {unknown_cast_message, Msg}, Server}. code_change(_OldVsn, State, _Extra) -> {ok, State}. - -handle_info({'EXIT', _Pid, config_change}, Server) -> - {noreply, shutdown, Server}; -handle_info(Error, _Server) -> - ?LOG_ERROR("Unexpected message, restarting couch_server: ~p", [Error]), - exit(kill). -open_db(DbName, Server, Options, From) -> - DbNameList = binary_to_list(DbName), - case check_dbname(Server, DbNameList) of - ok -> - Filepath = get_full_filename(Server, DbNameList), - case lists:member(sys_db, Options) of +handle_info({'EXIT', _Pid, config_change}, Server) -> + {stop, config_change, Server}; +handle_info({'EXIT', Pid, Reason}, #server{dbs_open=DbsOpen}=Server) -> + Match = erlang:make_tuple(tuple_size(#db{}), '_', [{1, db}, + {#db.main_pid, Pid}]), + case ets:match_object(couch_dbs, Match) of + [#db{name = DbName, compactor_pid=Froms}] -> + ?LOG_INFO("db ~s died with reason ~p", [DbName, Reason]), + % icky hack of field values - compactor_pid used to store clients + if is_list(Froms) -> + [gen_server:reply(From, Reason) || From <- Froms]; true -> - {noreply, open_async(Server, From, DbName, Filepath, Options)}; - false -> - case maybe_close_lru_db(Server) of - {ok, Server2} -> - {noreply, open_async(Server2, From, DbName, Filepath, Options)}; - CloseError -> - {reply, CloseError, Server} - end - end; - Error -> - {reply, Error, Server} - end. + ok + end, + true = ets:delete(couch_dbs, DbName), + true = ets:delete(couch_lru, DbName), + {noreply, Server#server{dbs_open=DbsOpen - 1}}; + [] -> + {noreply, Server} + end; +handle_info(Info, Server) -> + {stop, {unknown_message, Info}, Server}. diff --git a/apps/couch/src/couch_server_sup.erl b/apps/couch/src/couch_server_sup.erl index 4f0445da..1f31209b 100644 --- a/apps/couch/src/couch_server_sup.erl +++ b/apps/couch/src/couch_server_sup.erl @@ -14,9 +14,8 @@ -behaviour(supervisor). --export([start_link/1,stop/0, couch_config_start_link_wrapper/2, - start_primary_services/0,start_secondary_services/0, - restart_core_server/0]). +-export([start_link/1, couch_config_start_link_wrapper/2, + restart_core_server/0, config_change/2]). -include("couch_db.hrl"). @@ -68,59 +67,46 @@ start_server(IniFiles) -> _ -> ok end, - LibDir = - case couch_config:get("couchdb", "util_driver_dir", null) of - null -> - filename:join(couch_util:priv_dir(), "lib"); - LibDir0 -> LibDir0 - end, - - ok = couch_util:start_driver(LibDir), - BaseChildSpecs = - {{one_for_all, 10, 3600}, + {{one_for_all, 10, 60}, [{couch_config, {couch_server_sup, couch_config_start_link_wrapper, [IniFiles, ConfigPid]}, permanent, brutal_kill, worker, [couch_config]}, + {couch_config_event, + {couch_config_event, start_link, []}, + permanent, + 1000, + worker, + dynamic}, {couch_primary_services, - {couch_server_sup, start_primary_services, []}, + {couch_primary_sup, start_link, []}, permanent, infinity, supervisor, - [couch_server_sup]}, + [couch_primary_sup]}, {couch_secondary_services, - {couch_server_sup, start_secondary_services, []}, + {couch_secondary_sup, start_link, []}, permanent, infinity, supervisor, - [couch_server_sup]} + [couch_secondary_sup]} ]}, - % ensure these applications are running - application:start(ibrowse), - application:start(crypto), - {ok, Pid} = supervisor:start_link( {local, couch_server_sup}, couch_server_sup, BaseChildSpecs), - % launch the icu bridge % just restart if one of the config settings change. - - couch_config:register( - fun("couchdb", "util_driver_dir") -> - ?MODULE:stop(); - ("daemons", _) -> - ?MODULE:stop() - end, Pid), + couch_config:register(fun ?MODULE:config_change/2, Pid), unlink(ConfigPid), Ip = couch_config:get("httpd", "bind_address"), Port = mochiweb_socket_server:get(couch_httpd, port), io:format("Apache CouchDB has started. Time to relax.~n"), + ?LOG_INFO("Apache CouchDB has started on http://~s:~w/", [Ip, Port]), case couch_config:get("couchdb", "uri_file", null) of @@ -132,62 +118,12 @@ start_server(IniFiles) -> {ok, Pid}. -start_primary_services() -> - supervisor:start_link({local, couch_primary_services}, couch_server_sup, - {{one_for_one, 10, 3600}, - [{couch_log, - {couch_log, start_link, []}, - permanent, - brutal_kill, - worker, - [couch_log]}, - {couch_replication_supervisor, - {couch_rep_sup, start_link, []}, - permanent, - infinity, - supervisor, - [couch_rep_sup]}, - {couch_task_status, - {couch_task_status, start_link, []}, - permanent, - brutal_kill, - worker, - [couch_task_status]}, - {couch_server, - {couch_server, sup_start_link, []}, - permanent, - 1000, - worker, - [couch_server]}, - {couch_db_update_event, - {gen_event, start_link, [{local, couch_db_update}]}, - permanent, - brutal_kill, - worker, - dynamic} - ] - }). - -start_secondary_services() -> - DaemonChildSpecs = [ - begin - {ok, {Module, Fun, Args}} = couch_util:parse_term(SpecStr), - - {list_to_atom(Name), - {Module, Fun, Args}, - permanent, - 1000, - worker, - [Module]} - end - || {Name, SpecStr} - <- couch_config:get("daemons"), SpecStr /= ""], - - supervisor:start_link({local, couch_secondary_services}, couch_server_sup, - {{one_for_one, 10, 3600}, DaemonChildSpecs}). - -stop() -> - catch exit(whereis(couch_server_sup), normal). +config_change("daemons", _) -> + exit(whereis(couch_server_sup), shutdown); +config_change("couchdb", "util_driver_dir") -> + [Pid] = [P || {collation_driver,P,_,_} + <- supervisor:which_children(couch_primary_services)], + Pid ! reload_driver. init(ChildSpecs) -> {ok, ChildSpecs}. diff --git a/apps/couch/src/couch_stats_aggregator.erl b/apps/couch/src/couch_stats_aggregator.erl index 6090355d..7dac1124 100644 --- a/apps/couch/src/couch_stats_aggregator.erl +++ b/apps/couch/src/couch_stats_aggregator.erl @@ -94,7 +94,12 @@ init(StatDescsFileName) -> ets:new(?MODULE, [named_table, set, protected]), SampleStr = couch_config:get("stats", "samples", "[0]"), {ok, Samples} = couch_util:parse_term(SampleStr), - {ok, Descs} = file:consult(StatDescsFileName), + case file:consult(StatDescsFileName) of + {ok, Descs} -> + ok; + {error, _} -> + Descs = [] + end, lists:foreach(fun({Sect, Key, Value}) -> lists:foreach(fun(Secs) -> Agg = #aggregate{ diff --git a/apps/couch/src/couch_stats_collector.erl b/apps/couch/src/couch_stats_collector.erl index f7b9bb48..74238fc8 100644 --- a/apps/couch/src/couch_stats_collector.erl +++ b/apps/couch/src/couch_stats_collector.erl @@ -85,21 +85,12 @@ track_process_count(Stat) -> track_process_count(self(), Stat). track_process_count(Pid, Stat) -> - MonitorFun = fun() -> - Ref = erlang:monitor(process, Pid), - receive {'DOWN', Ref, _, _, _} -> ok end, - couch_stats_collector:decrement(Stat) - end, - case (catch couch_stats_collector:increment(Stat)) of - ok -> spawn(MonitorFun); - _ -> ok - end. - + gen_server:cast(?MODULE, {track_process_count, Stat, Pid}). init(_) -> ets:new(?HIT_TABLE, [named_table, set, public]), ets:new(?ABS_TABLE, [named_table, duplicate_bag, public]), - {ok, nil}. + {ok, []}. terminate(_Reason, _State) -> ok. @@ -107,11 +98,15 @@ terminate(_Reason, _State) -> handle_call(stop, _, State) -> {stop, normal, stopped, State}. -handle_cast(foo, State) -> - {noreply, State}. +handle_cast({track_process_count, Stat, Pid}, State) -> + ok = couch_stats_collector:increment(Stat), + Ref = erlang:monitor(process, Pid), + {noreply, [{Ref,Stat} | State]}. -handle_info(_Info, State) -> - {noreply, State}. +handle_info({'DOWN', Ref, _, _, _}, State) -> + {Ref, Stat} = lists:keyfind(Ref, 1, State), + ok = couch_stats_collector:decrement(Stat), + {noreply, lists:keydelete(Ref, 1, State)}. code_change(_OldVersion, State, _Extra) -> {ok, State}. diff --git a/apps/couch/src/couch_task_status.erl b/apps/couch/src/couch_task_status.erl index c4487dc4..639515c7 100644 --- a/apps/couch/src/couch_task_status.erl +++ b/apps/couch/src/couch_task_status.erl @@ -107,7 +107,6 @@ handle_call(all, _, Server) -> handle_cast({update_status, Pid, StatusText}, Server) -> [{Pid, {Type, TaskName, _StatusText}}] = ets:lookup(?MODULE, Pid), - ?LOG_DEBUG("New task status for ~s: ~s",[TaskName, StatusText]), true = ets:insert(?MODULE, {Pid, {Type, TaskName, StatusText}}), {noreply, Server}; handle_cast(stop, State) -> diff --git a/apps/couch/src/couch_view.erl b/apps/couch/src/couch_view.erl index 38c0a783..72facd27 100644 --- a/apps/couch/src/couch_view.erl +++ b/apps/couch/src/couch_view.erl @@ -17,7 +17,8 @@ detuple_kvs/2,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2, code_change/3,get_reduce_view/4,get_temp_reduce_view/5,get_temp_map_view/4, get_map_view/4,get_row_count/1,reduce_to_count/1,fold_reduce/4, - extract_map_view/1,get_group_server/2,get_group_info/2,cleanup_index_files/1]). + extract_map_view/1,get_group_server/2,get_group_info/2, + cleanup_index_files/1,config_change/2]). -include("couch_db.hrl"). @@ -249,11 +250,7 @@ fold(#view{btree=Btree}, Fun, Acc, Options) -> init([]) -> % read configuration settings and register for configuration changes RootDir = couch_config:get("couchdb", "view_index_dir"), - Self = self(), - ok = couch_config:register( - fun("couchdb", "view_index_dir")-> - exit(Self, config_change) - end), + ok = couch_config:register(fun ?MODULE:config_change/2), couch_db_update_notifier:start_link( fun({deleted, DbName}) -> @@ -335,6 +332,9 @@ handle_info({'EXIT', FromPid, Reason}, Server) -> end, {noreply, Server}. +config_change("couchdb", "view_index_dir") -> + exit(whereis(couch_view), config_change). + add_to_ets(Pid, DbName, Sig) -> true = ets:insert(couch_groups_by_updater, {Pid, {DbName, Sig}}), true = ets:insert(group_servers_by_sig, {{DbName, Sig}, Pid}), diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index 895556bf..f56325a4 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -36,15 +36,20 @@ compact_group(Group, EmptyGroup) -> } = Group, #group{ - db = Db, + dbname = DbName, + fd = Fd, id_btree = EmptyIdBtree, + sig = Sig, views = EmptyViews } = EmptyGroup, - {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree), + erlang:monitor(process, Fd), + + {ok, Db} = couch_db:open(DbName, []), + + {ok, {Count, _}} = couch_btree:full_reduce(Db#db.id_tree), <<"_design", ShortName/binary>> = GroupId, - DbName = couch_db:name(Db), TaskName = <<DbName/binary, ShortName/binary>>, couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), @@ -72,10 +77,10 @@ compact_group(Group, EmptyGroup) -> current_seq=Seq }, - Pid = couch_view:get_group_server(DbName, GroupId), + Pid = ets:lookup_element(group_servers_by_sig, {DbName, Sig}, 2), gen_server:cast(Pid, {compact_done, NewGroup}). -%% @spec compact_view(View, EmptyView, Retry) -> CompactView +%% @spec compact_view(View, EmptyView) -> CompactView compact_view(View, EmptyView) -> {ok, Count} = couch_view:get_row_count(View), diff --git a/apps/couch/src/couch_view_group.erl b/apps/couch/src/couch_view_group.erl index f01befdf..f11bb54d 100644 --- a/apps/couch/src/couch_view_group.erl +++ b/apps/couch/src/couch_view_group.erl @@ -39,8 +39,7 @@ request_group(Pid, Seq) -> ?LOG_DEBUG("request_group {Pid, Seq} ~p", [{Pid, Seq}]), case gen_server:call(Pid, {request_group, Seq}, infinity) of - {ok, Group, RefCounter} -> - couch_ref_counter:add(RefCounter), + {ok, Group, _RefCounter} -> {ok, Group}; Error -> ?LOG_DEBUG("request_group Error ~p", [Error]), @@ -75,27 +74,26 @@ start_link(InitArgs) -> end. % init creates a closure which spawns the appropriate view_updater. -init({InitArgs, ReturnPid, Ref}) -> +init({{_, DbName, _}=InitArgs, ReturnPid, Ref}) -> process_flag(trap_exit, true), case prepare_group(InitArgs, false) of - {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} -> + {ok, #group{fd=Fd, current_seq=Seq}=Group} -> + {ok, Db} = couch_db:open(DbName, []), case Seq > couch_db:get_update_seq(Db) of true -> ReturnPid ! {Ref, self(), {error, invalid_view_seq}}, + couch_db:close(Db), ignore; _ -> - couch_db:monitor(Db), + try couch_db:monitor(Db) after couch_db:close(Db) end, Owner = self(), - Pid = spawn_link( - fun()-> couch_view_updater:update(Owner, Group) end - ), - {ok, RefCounter} = couch_ref_counter:start([Fd]), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), {ok, #group_state{ - db_name=couch_db:name(Db), + db_name= DbName, init_args=InitArgs, updater_pid = Pid, - group=Group, - ref_counter=RefCounter}} + group=Group#group{dbname=DbName}, + ref_counter=erlang:monitor(process,Fd)}} end; Error -> ReturnPid ! {Ref, self(), Error}, @@ -120,19 +118,16 @@ init({InitArgs, ReturnPid, Ref}) -> handle_call({request_group, RequestSeq}, From, #group_state{ - db_name=DbName, group=#group{current_seq=Seq}=Group, updater_pid=nil, waiting_list=WaitList }=State) when RequestSeq > Seq -> - {ok, Db} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db}, Owner = self(), - Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end), {noreply, State#group_state{ updater_pid=Pid, - group=Group2, + group=Group, waiting_list=[{From,RequestSeq}|WaitList] }, infinity}; @@ -153,6 +148,10 @@ handle_call({request_group, RequestSeq}, From, waiting_list=[{From, RequestSeq}|WaitList] }, infinity}; +handle_call({start_compact, CompactFun}, _From, State) -> + {noreply, NewState} = handle_cast({start_compact, CompactFun}, State), + {reply, {ok, NewState#group_state.compactor_pid}, NewState}; + handle_call(request_group_info, _From, State) -> GroupInfo = get_group_info(State), {reply, {ok, GroupInfo}, State}. @@ -160,24 +159,23 @@ handle_call(request_group_info, _From, State) -> handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil} = State) -> #group_state{ - group = #group{name = GroupId, sig = GroupSig} = Group, - init_args = {RootDir, DbName, _} + group = #group{dbname = DbName, name = GroupId, sig = GroupSig} = Group, + init_args = {RootDir, _, _} } = State, ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]), - {ok, Db} = couch_db:open_int(DbName, []), {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig), - NewGroup = reset_file(Db, Fd, DbName, Group), + NewGroup = reset_file(Fd, DbName, Group), Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end), {noreply, State#group_state{compactor_pid = Pid}}; handle_cast({start_compact, _}, State) -> %% compact already running, this is a no-op {noreply, State}; -handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, +handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup}, #group_state{group = #group{current_seq=OldSeq}} = State) when NewSeq >= OldSeq -> #group_state{ - group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group, + group = #group{name=GroupId, fd=OldFd, sig=GroupSig}, init_args = {RootDir, DbName, _}, updater_pid = UpdaterPid, ref_counter = RefCounter @@ -202,17 +200,12 @@ handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup}, %% cleanup old group unlink(OldFd), - couch_ref_counter:drop(RefCounter), - {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]), - case Group#group.db of - nil -> ok; - Else -> couch_db:close(Else) - end, + erlang:demonitor(RefCounter), self() ! delayed_commit, {noreply, State#group_state{ group=NewGroup, - ref_counter=NewRefCounter, + ref_counter=erlang:monitor(process,NewFd), compactor_pid=nil, updater_pid=NewUpdaterPid }}; @@ -223,17 +216,14 @@ handle_cast({compact_done, NewGroup}, State) -> } = State, ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++ "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]), - couch_db:close(NewGroup#group.db), - {ok, Db} = couch_db:open_int(DbName, []), + GroupServer = self(), Pid = spawn_link(fun() -> + erlang:monitor(process, NewGroup#group.fd), {_,Ref} = erlang:spawn_monitor(fun() -> - couch_view_updater:update(nil, NewGroup#group{db = Db}) + couch_view_updater:update(nil, NewGroup) end), - receive - {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> - #group{name=GroupId} = NewGroup2, - Pid2 = couch_view:get_group_server(DbName, GroupId), - gen_server:cast(Pid2, {compact_done, NewGroup2}) + receive {'DOWN', Ref, _, _, {new_group, NewGroup2}} -> + gen_server:cast(GroupServer, {compact_done, NewGroup2}) end end), {noreply, State#group_state{compactor_pid = Pid}}; @@ -245,7 +235,7 @@ handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid} waiting_commit = WaitingCommit } = State, NewSeq = NewGroup#group.current_seq, - ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq, + ?LOG_DEBUG("checkpointing view update at seq ~p for ~s ~s", [NewSeq, DbName, NewGroup#group.name]), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); @@ -275,13 +265,12 @@ handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) -> {noreply, State#group_state{waiting_commit=true}} end; -handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, - #group_state{db_name=DbName, +handle_info({'EXIT', FromPid, {new_group, Group}}, + #group_state{ updater_pid=UpPid, ref_counter=RefCounter, waiting_list=WaitList, waiting_commit=WaitingCommit}=State) when UpPid == FromPid -> - ok = couch_db:close(Db), if not WaitingCommit -> erlang:send_after(1000, self(), delayed_commit); true -> ok @@ -289,26 +278,20 @@ handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}}, case reply_with_group(Group, WaitList, [], RefCounter) of [] -> {noreply, State#group_state{waiting_commit=true, waiting_list=[], - group=Group#group{db=nil}, updater_pid=nil}}; + group=Group, updater_pid=nil}}; StillWaiting -> - % we still have some waiters, reopen the database and reupdate the index - {ok, Db2} = couch_db:open_int(DbName, []), - Group2 = Group#group{db=Db2}, + % we still have some waiters, reupdate the index Owner = self(), - Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end), + Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group) end), {noreply, State#group_state{waiting_commit=true, - waiting_list=StillWaiting, group=Group2, updater_pid=Pid}} + waiting_list=StillWaiting, group=Group, updater_pid=Pid}} end; handle_info({'EXIT', _, {new_group, _}}, State) -> %% message from an old (probably pre-compaction) updater; ignore {noreply, State}; -handle_info({'EXIT', FromPid, reset}, - #group_state{ - init_args=InitArgs, - updater_pid=UpPid, - group=Group}=State) when UpPid == FromPid -> - ok = couch_db:close(Group#group.db), +handle_info({'EXIT', FromPid, reset}, #group_state{init_args=InitArgs, + updater_pid=FromPid}=State) -> case prepare_group(InitArgs, true) of {ok, ResetGroup} -> Owner = self(), @@ -334,8 +317,9 @@ handle_info({'EXIT', FromPid, Reason}, State) -> ?LOG_DEBUG("Exit from linked pid: ~p", [{FromPid, Reason}]), {stop, Reason, State}; -handle_info({'DOWN',_,_,_,_}, State) -> - ?LOG_INFO("Shutting down view group server, monitored db is closing.", []), +handle_info({'DOWN',_,_,Pid,Reason}, #group_state{group=G}=State) -> + ?LOG_INFO("Shutting down group server ~p, db ~p closing w/ reason~n~p", + [G#group.name, Pid, Reason]), {stop, normal, reply_all(State, shutdown)}. @@ -371,32 +355,29 @@ reply_all(#group_state{waiting_list=WaitList}=State, Reply) -> [catch gen_server:reply(Pid, Reply) || {Pid, _} <- WaitList], State#group_state{waiting_list=[]}. +prepare_group({Root, DbName, #group{dbname=X}=G}, Reset) when X =/= DbName -> + prepare_group({Root, DbName, G#group{dbname=DbName}}, Reset); prepare_group({RootDir, DbName, #group{sig=Sig}=Group}, ForceReset)-> - case couch_db:open_int(DbName, []) of - {ok, Db} -> - case open_index_file(RootDir, DbName, Sig) of - {ok, Fd} -> - if ForceReset -> - % this can happen if we missed a purge - {ok, reset_file(Db, Fd, DbName, Group)}; - true -> - % 09 UPGRADE CODE - ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), - case (catch couch_file:read_header(Fd)) of - {ok, {Sig, HeaderInfo}} -> - % sigs match! - {ok, init_group(Db, Fd, Group, HeaderInfo)}; - _ -> - % this happens on a new file - {ok, reset_file(Db, Fd, DbName, Group)} - end - end; - Error -> - catch delete_index_file(RootDir, DbName, Sig), - Error + case open_index_file(RootDir, DbName, Sig) of + {ok, Fd} -> + if ForceReset -> + % this can happen if we missed a purge + {ok, reset_file(Fd, DbName, Group)}; + true -> + % 09 UPGRADE CODE + ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>), + case (catch couch_file:read_header(Fd)) of + {ok, {Sig, HeaderInfo}} -> + % sigs match! + {ok, init_group(Fd, Group, HeaderInfo)}; + _ -> + % this happens on a new file + {ok, reset_file(Fd, DbName, Group)} + end end; - Else -> - Else + Error -> + catch delete_index_file(RootDir, DbName, Sig), + Error end. get_index_header_data(#group{current_seq=Seq, purge_seq=PurgeSeq, @@ -446,7 +427,7 @@ open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) -> reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end, options=DesignOptions}, - {ok, Db, set_view_sig(#group{name = <<"_temp">>, db=Db, views=[View], + {ok, Db, set_view_sig(#group{name = <<"_temp">>, views=[View], def_lang=Language, design_options=DesignOptions})}; Error -> Error @@ -531,28 +512,39 @@ design_doc_to_view_group(#doc{id=Id,body={Fields}}) -> {View#view{id_num=N},N+1} end, 0, lists:sort(dict:to_list(DictBySrc))), - set_view_sig(#group{name=Id, views=Views, def_lang=Language, design_options=DesignOptions}). - -reset_group(#group{views=Views}=Group) -> - Views2 = [View#view{btree=nil} || View <- Views], - Group#group{db=nil,fd=nil,query_server=nil,current_seq=0, - id_btree=nil,views=Views2}. - -reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) -> - ?LOG_DEBUG("Resetting group index \"~s\" in db ~s", [Name, DbName]), + #group{ + name = Id, + views = Views, + def_lang = Language, + design_options = DesignOptions, + sig = couch_util:md5(term_to_binary({Views, Language, DesignOptions})) + }. + +reset_group(DbName, #group{views=Views}=Group) -> + Group#group{ + fd = nil, + dbname = DbName, + query_server = nil, + current_seq = 0, + id_btree = nil, + views = [View#view{btree=nil} || View <- Views] + }. + +reset_file(Fd, DbName, #group{sig=Sig,name=Name} = Group) -> + ?LOG_INFO("Resetting group index \"~s\" in db ~s", [Name, DbName]), ok = couch_file:truncate(Fd, 0), ok = couch_file:write_header(Fd, {Sig, nil}), - init_group(Db, Fd, reset_group(Group), nil). + init_group(Fd, reset_group(DbName, Group), nil). delete_index_file(RootDir, DbName, GroupSig) -> couch_file:delete(RootDir, index_file_name(RootDir, DbName, GroupSig)). -init_group(Db, Fd, #group{views=Views}=Group, nil) -> - init_group(Db, Fd, Group, - #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db), - id_btree_state=nil, view_states=[nil || _ <- Views]}); -init_group(Db, Fd, #group{def_lang=Lang,views=Views}= - Group, IndexHeader) -> +init_group(Fd, #group{dbname=DbName, views=Views}=Group, nil) -> + {ok, Db} = couch_db:open(DbName, []), + PurgeSeq = try couch_db:get_purge_seq(Db) after couch_db:close(Db) end, + Header = #index_header{purge_seq=PurgeSeq, view_states=[nil || _ <- Views]}, + init_group(Fd, Group, Header); +init_group(Fd, #group{def_lang=Lang,views=Views}=Group, IndexHeader) -> #index_header{seq=Seq, purge_seq=PurgeSeq, id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader, {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd), @@ -580,13 +572,10 @@ init_group(Db, Fd, #group{def_lang=Lang,views=Views}= <<"raw">> -> Less = fun(A,B) -> A < B end end, - {ok, Btree} = couch_btree:open(BtreeState, Fd, - [{less, Less}, - {reduce, ReduceFun}]), + {ok, Btree} = couch_btree:open(BtreeState, Fd, [{less, Less}, + {reduce, ReduceFun}]), View#view{btree=Btree} end, ViewStates, Views), - Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, - id_btree=IdBtree, views=Views2}. - - + Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq, id_btree=IdBtree, + views=Views2}. diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl index 2a9c960f..8424862b 100644 --- a/apps/couch/src/couch_view_updater.erl +++ b/apps/couch/src/couch_view_updater.erl @@ -20,20 +20,21 @@ update(Owner, Group) -> #group{ - db = #db{name=DbName} = Db, + dbname = DbName, name = GroupName, current_seq = Seq, purge_seq = PurgeSeq } = Group, couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), + {ok, Db} = couch_db:open(DbName, []), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> couch_task_status:update(<<"Removing purged entries from view index.">>), - purge_index(Group); + purge_index(Db, Group); true -> couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), exit(reset) @@ -77,7 +78,7 @@ update(Owner, Group) -> end. -purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> +purge_index(Db, #group{views=Views, id_btree=IdBtree}=Group) -> {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db), Ids = [Id || {Id, _Revs} <- PurgedIdsRevs], {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids), @@ -108,9 +109,14 @@ purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) -> views=Views2, purge_seq=couch_db:get_purge_seq(Db)}. - -load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign) -> - #doc_info{id=DocId, high_seq=Seq, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo, +-spec load_doc(#db{}, #doc_info{}, pid(), [atom()], boolean()) -> ok. +load_doc(Db, DI, MapQueue, DocOpts, IncludeDesign) -> + DocInfo = case DI of + #full_doc_info{id=DocId, update_seq=Seq, deleted=Deleted} -> + couch_doc:to_doc_info(DI); + #doc_info{id=DocId, high_seq=Seq, revs=[#rev_info{deleted=Deleted}|_]} -> + DI + end, case {IncludeDesign, DocId} of {false, <<?DESIGN_DOC_PREFIX, _/binary>>} -> % we skip design docs ok; @@ -122,7 +128,8 @@ load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign) -> couch_work_queue:queue(MapQueue, {Seq, Doc}) end end. - + +-spec do_maps(#group{}, pid(), pid(), any()) -> any(). do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> case couch_work_queue:dequeue(MapQueue) of closed -> @@ -139,6 +146,7 @@ do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> do_maps(Group1, MapQueue, WriteQueue, ViewEmptyKVs) end. +-spec do_writes(pid(), pid() | nil, #group{}, pid(), boolean()) -> any(). do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) -> case couch_work_queue:dequeue(WriteQueue) of closed -> @@ -165,6 +173,7 @@ do_writes(Parent, Owner, Group, WriteQueue, InitialBuild) -> do_writes(Parent, Owner, Group2, WriteQueue, InitialBuild) end. +-spec view_insert_query_results([#doc{}], list(), any(), any()) -> any(). view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) -> {ViewKVs, DocIdViewIdKeysAcc}; view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, DocIdViewIdKeysAcc) -> @@ -172,7 +181,8 @@ view_insert_query_results([Doc|RestDocs], [QueryResults | RestResults], ViewKVs, NewDocIdViewIdKeys = [{Doc#doc.id, NewViewIdKeys} | DocIdViewIdKeysAcc], view_insert_query_results(RestDocs, RestResults, NewViewKVs, NewDocIdViewIdKeys). - +-spec view_insert_doc_query_results(#doc{}, list(), list(), any(), any()) -> + any(). view_insert_doc_query_results(_Doc, [], [], ViewKVsAcc, ViewIdKeysAcc) -> {lists:reverse(ViewKVsAcc), lists:reverse(ViewIdKeysAcc)}; view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{View, KVs}|RestViewKVs], ViewKVsAcc, ViewIdKeysAcc) -> @@ -199,6 +209,7 @@ view_insert_doc_query_results(#doc{id=DocId}=Doc, [ResultKVs|RestResults], [{Vie NewViewIdKeysAcc = NewViewIdKeys ++ ViewIdKeysAcc, view_insert_doc_query_results(Doc, RestResults, RestViewKVs, NewViewKVsAcc, NewViewIdKeysAcc). +-spec view_compute(#group{}, [#doc{}]) -> {#group{}, any()}. view_compute(Group, []) -> {Group, []}; view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) -> @@ -214,7 +225,6 @@ view_compute(#group{def_lang=DefLang, query_server=QueryServerIn}=Group, Docs) - {Group#group{query_server=QueryServer}, Results}. - write_changes(Group, ViewKeyValuesToAdd, DocIdViewIdKeys, NewSeq, InitialBuild) -> #group{id_btree=IdBtree} = Group, |