diff options
author | Damien F. Katz <damien@apache.org> | 2010-03-04 01:09:22 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2010-03-04 01:09:22 +0000 |
commit | 3a3a9c1efab1c9fe4cd5ebb6c80da4005eb0806b (patch) | |
tree | f850ff24c113465531aab2bdcdb2f2e70855d00c /src | |
parent | 2ea7ab6a8525184ecdbfcee69667e200511b9f9b (diff) |
Changed process tree shutdown to be synchronous, to eliminate spurious test failures caused by processes not shutdown fast enough or at the wrong time.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@918805 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/couch_config.erl | 18 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 11 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 13 | ||||
-rw-r--r-- | src/couchdb/couch_external_server.erl | 4 | ||||
-rw-r--r-- | src/couchdb/couch_file.erl | 19 | ||||
-rw-r--r-- | src/couchdb/couch_native_process.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_os_process.erl | 1 | ||||
-rw-r--r-- | src/couchdb/couch_query_servers.erl | 75 | ||||
-rw-r--r-- | src/couchdb/couch_ref_counter.erl | 9 | ||||
-rw-r--r-- | src/couchdb/couch_server.erl | 33 | ||||
-rw-r--r-- | src/couchdb/couch_server_sup.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_stats_collector.erl | 8 | ||||
-rw-r--r-- | src/couchdb/couch_util.erl | 39 | ||||
-rw-r--r-- | src/couchdb/couch_view.erl | 10 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 4 |
15 files changed, 154 insertions, 98 deletions
diff --git a/src/couchdb/couch_config.erl b/src/couchdb/couch_config.erl index 1fe5aa0d..0e0c3fcb 100644 --- a/src/couchdb/couch_config.erl +++ b/src/couchdb/couch_config.erl @@ -111,7 +111,7 @@ terminate(_Reason, _State) -> handle_call(all, _From, Config) -> Resp = lists:sort((ets:tab2list(?MODULE))), {reply, Resp, Config}; -handle_call({set, Sec, Key, Val, Persist}, _From, Config) -> +handle_call({set, Sec, Key, Val, Persist}, From, Config) -> true = ets:insert(?MODULE, {{Sec, Key}, Val}), case {Persist, Config#config.write_filename} of {true, undefined} -> @@ -121,9 +121,12 @@ handle_call({set, Sec, Key, Val, Persist}, _From, Config) -> _ -> ok end, - [catch F(Sec, Key, Val, Persist) || {_Pid, F} <- Config#config.notify_funs], - {reply, ok, Config}; -handle_call({delete, Sec, Key, Persist}, _From, Config) -> + spawn_link(fun() -> + [catch F(Sec, Key, Val, Persist) || {_Pid, F} <- Config#config.notify_funs], + gen_server:reply(From, ok) + end), + {noreply, Config}; +handle_call({delete, Sec, Key, Persist}, From, Config) -> true = ets:delete(?MODULE, {Sec,Key}), case {Persist, Config#config.write_filename} of {true, undefined} -> @@ -133,8 +136,11 @@ handle_call({delete, Sec, Key, Persist}, _From, Config) -> _ -> ok end, - [catch F(Sec, Key, deleted, Persist) || {_Pid, F} <- Config#config.notify_funs], - {reply, ok, Config}; + spawn_link(fun() -> + [catch F(Sec, Key, deleted, Persist) || {_Pid, F} <- Config#config.notify_funs], + gen_server:reply(From, ok) + end), + {noreply, Config}; handle_call({register, Fun, Pid}, _From, #config{notify_funs=PidFuns}=Config) -> erlang:monitor(process, Pid), % convert 1 and 2 arity to 3 arity diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index e5635f55..bf40e6b1 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -903,10 +903,11 @@ init({DbName, Filepath, Fd, Options}) -> {ok, #db{fd_ref_counter=RefCntr}=Db} = gen_server:call(UpdaterPid, get_db), couch_ref_counter:add(RefCntr), couch_stats_collector:track_process_count({couchdb, open_databases}), + process_flag(trap_exit, true), {ok, Db}. -terminate(Reason, _Db) -> - couch_util:terminate_linked(Reason), +terminate(_Reason, Db) -> + couch_util:shutdown_sync(Db#db.update_pid), ok. handle_call({open_ref_count, OpenerPid}, _, #db{fd_ref_counter=RefCntr}=Db) -> @@ -934,7 +935,11 @@ handle_cast(Msg, Db) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - + +handle_info({'EXIT', _Pid, normal}, Db) -> + {noreply, Db}; +handle_info({'EXIT', _Pid, Reason}, Server) -> + {stop, Reason, Server}; handle_info(Msg, Db) -> ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Msg]), exit({error, Msg}). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index 982ee03a..fdd79481 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -20,6 +20,7 @@ init({MainPid, DbName, Filepath, Fd, Options}) -> + process_flag(trap_exit, true), case lists:member(create, Options) of true -> % create a new header and writes it to the file @@ -37,8 +38,10 @@ init({MainPid, DbName, Filepath, Fd, Options}) -> {ok, Db2#db{main_pid=MainPid}}. -terminate(Reason, _Srv) -> - couch_util:terminate_linked(Reason), +terminate(_Reason, Db) -> + couch_file:close(Db#db.fd), + couch_util:shutdown_sync(Db#db.compactor_pid), + couch_util:shutdown_sync(Db#db.fd_ref_counter), ok. handle_call(get_db, _From, Db) -> @@ -214,7 +217,11 @@ handle_info(delayed_commit, Db) -> Db2 -> ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}), {noreply, Db2} - end. + end; +handle_info({'EXIT', _Pid, normal}, Db) -> + {noreply, Db}; +handle_info({'EXIT', _Pid, Reason}, Db) -> + {stop, Reason, Db}. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/couchdb/couch_external_server.erl b/src/couchdb/couch_external_server.erl index 8e495320..045fcee9 100644 --- a/src/couchdb/couch_external_server.erl +++ b/src/couchdb/couch_external_server.erl @@ -50,9 +50,11 @@ terminate(_Reason, {_Name, _Command, Pid}) -> handle_call({execute, JsonReq}, _From, {Name, Command, Pid}) -> {reply, couch_os_process:prompt(Pid, JsonReq), {Name, Command, Pid}}. +handle_info({'EXIT', _Pid, normal}, State) -> + {noreply, State}; handle_info({'EXIT', Pid, Reason}, {Name, Command, Pid}) -> ?LOG_INFO("EXTERNAL: Process for ~s exiting. (reason: ~w)", [Name, Reason]), - {stop, normal, {Name, Command, Pid}}. + {stop, Reason, {Name, Command, Pid}}. handle_cast(stop, {Name, Command, Pid}) -> ?LOG_INFO("EXTERNAL: Shutting down ~s", [Name]), diff --git a/src/couchdb/couch_file.erl b/src/couchdb/couch_file.erl index 5904260c..4c6928a7 100644 --- a/src/couchdb/couch_file.erl +++ b/src/couchdb/couch_file.erl @@ -176,13 +176,21 @@ sync(Fd) -> gen_server:call(Fd, sync, infinity). %%---------------------------------------------------------------------- -%% Purpose: Close the file. Is performed asynchronously. +%% Purpose: Close the file. %% Returns: ok %%---------------------------------------------------------------------- close(Fd) -> - Result = gen_server:cast(Fd, close), - catch unlink(Fd), - Result. + MRef = erlang:monitor(process, Fd), + try + catch unlink(Fd), + catch exit(Fd, shutdown), + receive + {'DOWN', MRef, _, _, _} -> + ok + end + after + erlang:demonitor(MRef, [flush]) + end. % 09 UPGRADE CODE old_pread(Fd, Pos, Len) -> @@ -219,6 +227,7 @@ init_status_error(ReturnPid, Ref, Error) -> % server functions init({Filepath, Options, ReturnPid, Ref}) -> + process_flag(trap_exit, true), case lists:member(create, Options) of true -> filelib:ensure_dir(Filepath), @@ -432,6 +441,8 @@ handle_cast(close, Fd) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_info({'EXIT', _, normal}, Fd) -> + {noreply, Fd}; handle_info({'EXIT', _, Reason}, Fd) -> {stop, Reason, Fd}. diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl index 5600c421..a9869b29 100644 --- a/src/couchdb/couch_native_process.erl +++ b/src/couchdb/couch_native_process.erl @@ -94,8 +94,10 @@ handle_call({prompt, Data}, _From, State) -> {reply, Resp, NewState} end. -handle_cast(_Msg, State) -> {noreply, State}. -handle_info(_Msg, State) -> {noreply, State}. +handle_cast(foo, State) -> {noreply, State}. +handle_info({'EXIT',_,normal}, State) -> {noreply, State}; +handle_info({'EXIT',_,Reason}, State) -> + {stop, Reason, State}. terminate(_Reason, _State) -> ok. code_change(_OldVersion, State, _Extra) -> {ok, State}. diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl index 5ac13715..070b86fc 100644 --- a/src/couchdb/couch_os_process.erl +++ b/src/couchdb/couch_os_process.erl @@ -104,6 +104,7 @@ readjson(OsProc) when is_record(OsProc, os_proc) -> % gen_server API init([Command, Options, PortOptions]) -> + process_flag(trap_exit, true), PrivDir = couch_util:priv_dir(), Spawnkiller = filename:join(PrivDir, "couchspawnkillable"), BaseProc = #os_proc{ diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl index 3095b199..344ce8b2 100644 --- a/src/couchdb/couch_query_servers.erl +++ b/src/couchdb/couch_query_servers.erl @@ -15,7 +15,7 @@ -export([start_link/0]). --export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3,stop/0]). +-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]). -export([start_doc_map/2, map_docs/2, stop_doc_map/1]). -export([reduce/3, rereduce/3,validate_doc_update/5]). -export([filter_docs/5]). @@ -38,9 +38,6 @@ start_link() -> gen_server:start_link({local, couch_query_servers}, couch_query_servers, [], []). -stop() -> - exit(whereis(couch_query_servers), normal). - start_doc_map(Lang, Functions) -> Proc = get_os_process(Lang), lists:foreach(fun(FunctionSource) -> @@ -219,17 +216,18 @@ init([]) -> ok = couch_config:register( fun("query_servers" ++ _, _) -> - ?MODULE:stop() + supervisor:terminate_child(couch_secondary_services, query_servers), + supervisor:restart_child(couch_secondary_services, query_servers) end), ok = couch_config:register( fun("native_query_servers" ++ _, _) -> - ?MODULE:stop() + supervisor:terminate_child(couch_secondary_services, query_servers), + [supervisor:restart_child(couch_secondary_services, query_servers)] end), Langs = ets:new(couch_query_server_langs, [set, private]), PidProcs = ets:new(couch_query_server_pid_langs, [set, private]), LangProcs = ets:new(couch_query_server_procs, [set, private]), - InUse = ets:new(couch_query_server_used, [set, private]), % 'query_servers' specifies an OS command-line to execute. lists:foreach(fun({Lang, Command}) -> true = ets:insert(Langs, {?l2b(Lang), @@ -244,75 +242,71 @@ init([]) -> process_flag(trap_exit, true), {ok, {Langs, % Keyed by language name, value is {Mod,Func,Arg} PidProcs, % Keyed by PID, valus is a #proc record. - LangProcs, % Keyed by language name, value is a #proc record - InUse % Keyed by PID, value is #proc record. + LangProcs % Keyed by language name, value is a #proc record }}. -terminate(_Reason, _Server) -> +terminate(_Reason, {_Langs, PidProcs, _LangProcs}) -> + [couch_util:shutdown_sync(P) || {P,_} <- ets:tab2list(PidProcs)], ok. -handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> +handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, _From, {Langs, PidProcs, LangProcs}=Server) -> % Note to future self. Add max process limit. Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>), case ets:lookup(LangProcs, Lang) of [{Lang, [P|Rest]}] -> % find a proc in the set that has the DDoc case proc_with_ddoc(DDoc, DDocKey, [P|Rest]) of - {ok, Proc} -> - % looks like the proc isn't getting dropped from the list. - % we need to change this to take a fun for equality checking - % so we can do a comparison on portnum - rem_from_list(LangProcs, Lang, Proc), - add_to_list(InUse, Lang, Proc), - {reply, {ok, Proc, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end; + {ok, Proc} -> + rem_from_list(LangProcs, Lang, Proc), + {reply, {ok, Proc, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> add_value(PidProcs, Proc#proc.pid, Proc), case proc_with_ddoc(DDoc, DDocKey, [Proc]) of - {ok, Proc2} -> - rem_from_list(LangProcs, Lang, Proc), - add_to_list(InUse, Lang, Proc2), - {reply, {ok, Proc2, get_query_server_config()}, Server}; - Error -> - {reply, Error, Server} - end; + {ok, Proc2} -> + {reply, {ok, Proc2, get_query_server_config()}, Server}; + Error -> + {reply, Error, Server} + end; Error -> {reply, Error, Server} end end; -handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs, InUse}=Server) -> +handle_call({get_proc, Lang}, _From, {Langs, PidProcs, LangProcs}=Server) -> % Note to future self. Add max process limit. case ets:lookup(LangProcs, Lang) of [{Lang, [Proc|_]}] -> - add_value(PidProcs, Proc#proc.pid, Proc), rem_from_list(LangProcs, Lang, Proc), - add_to_list(InUse, Lang, Proc), {reply, {ok, Proc, get_query_server_config()}, Server}; _ -> case (catch new_process(Langs, Lang)) of {ok, Proc} -> add_value(PidProcs, Proc#proc.pid, Proc), - add_to_list(InUse, Lang, Proc), {reply, {ok, Proc, get_query_server_config()}, Server}; Error -> {reply, Error, Server} end end; -handle_call({ret_proc, Proc}, _From, {_, _, LangProcs, InUse}=Server) -> +handle_call({unlink_proc, Pid}, _From, {_, PidProcs, _}=Server) -> + rem_value(PidProcs, Pid), + unlink(Pid), + {reply, ok, Server}; +handle_call({ret_proc, Proc}, _From, {_, PidProcs, LangProcs}=Server) -> % Along with max process limit, here we should check % if we're over the limit and discard when we are. + add_value(PidProcs, Proc#proc.pid, Proc), add_to_list(LangProcs, Proc#proc.lang, Proc), - rem_from_list(InUse, Proc#proc.lang, Proc), + link(Proc#proc.pid), {reply, true, Server}. handle_cast(_Whatever, Server) -> {noreply, Server}. -handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) -> +handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs}=Server) -> case ets:lookup(PidProcs, Pid) of [{Pid, Proc}] -> case Status of @@ -321,11 +315,14 @@ handle_info({'EXIT', Pid, Status}, {_, PidProcs, LangProcs, InUse}=Server) -> end, rem_value(PidProcs, Pid), catch rem_from_list(LangProcs, Proc#proc.lang, Proc), - catch rem_from_list(InUse, Proc#proc.lang, Proc), {noreply, Server}; [] -> - ?LOG_DEBUG("Unknown linked process died: ~p (reason: ~p)", [Pid, Status]), - {stop, Status, Server} + case Status of + normal -> + {noreply, Server}; + _ -> + {stop, Status, Server} + end end. code_change(_OldVsn, State, _Extra) -> @@ -402,6 +399,7 @@ get_ddoc_process(#doc{} = DDoc, DDocKey) -> proc_set_timeout(Proc, list_to_integer(couch_config:get( "couchdb", "os_process_timeout", "5000"))), link(Proc#proc.pid), + gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; _ -> catch proc_stop(Proc), @@ -419,6 +417,7 @@ get_os_process(Lang) -> proc_set_timeout(Proc, list_to_integer(couch_config:get( "couchdb", "os_process_timeout", "5000"))), link(Proc#proc.pid), + gen_server:call(couch_query_servers, {unlink_proc, Proc#proc.pid}), Proc; _ -> catch proc_stop(Proc), diff --git a/src/couchdb/couch_ref_counter.erl b/src/couchdb/couch_ref_counter.erl index 5c8e7437..96d92333 100644 --- a/src/couchdb/couch_ref_counter.erl +++ b/src/couchdb/couch_ref_counter.erl @@ -40,17 +40,18 @@ count(RefCounterPid) -> -record(srv, { - referrers=dict:new() % a dict of each ref counting proc. + referrers=dict:new(), % a dict of each ref counting proc. + child_procs=[] }). init({Pid, ChildProcs}) -> [link(ChildProc) || ChildProc <- ChildProcs], Referrers = dict:from_list([{Pid, {erlang:monitor(process, Pid), 1}}]), - {ok, #srv{referrers=Referrers}}. + {ok, #srv{referrers=Referrers, child_procs=ChildProcs}}. -terminate(Reason, _Srv) -> - couch_util:terminate_linked(Reason), +terminate(_Reason, #srv{child_procs=ChildProcs}) -> + [couch_util:shutdown_sync(Pid) || Pid <- ChildProcs], ok. diff --git a/src/couchdb/couch_server.erl b/src/couchdb/couch_server.erl index 325d4a12..61794dea 100644 --- a/src/couchdb/couch_server.erl +++ b/src/couchdb/couch_server.erl @@ -147,8 +147,9 @@ init([]) -> max_dbs_open=MaxDbsOpen, start_time=httpd_util:rfc1123_date()}}. -terminate(Reason, _Srv) -> - couch_util:terminate_linked(Reason), +terminate(_Reason, _Srv) -> + [couch_util:shutdown_sync(Pid) || {_, {Pid, _LruTime}} <- + ets:tab2list(couch_dbs_by_name)], ok. all_databases() -> @@ -187,8 +188,7 @@ try_close_lru(StartTime) -> [{_, {MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName), case couch_db:is_idle(MainPid) of true -> - exit(MainPid, kill), - receive {'EXIT', MainPid, _Reason} -> ok end, + couch_util:shutdown_sync(MainPid), true = ets:delete(couch_dbs_by_lru, LruTime), true = ets:delete(couch_dbs_by_name, DbName), true = ets:delete(couch_dbs_by_pid, MainPid), @@ -284,8 +284,7 @@ handle_call({delete, DbName, _Options}, _From, Server) -> case ets:lookup(couch_dbs_by_name, DbName) of [] -> Server; [{_, {Pid, LruTime}}] -> - exit(Pid, kill), - receive {'EXIT', Pid, _Reason} -> ok end, + couch_util:shutdown_sync(Pid), true = ets:delete(couch_dbs_by_name, DbName), true = ets:delete(couch_dbs_by_pid, Pid), true = ets:delete(couch_dbs_by_lru, LruTime), @@ -315,14 +314,18 @@ handle_cast(Msg, _Server) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -handle_info({'EXIT', _Pid, config_change}, _Server) -> - exit(kill); -handle_info({'EXIT', Pid, _Reason}, #server{dbs_open=DbsOpen}=Server) -> - [{Pid, DbName}] = ets:lookup(couch_dbs_by_pid, Pid), - [{DbName, {Pid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_pid, Pid), - true = ets:delete(couch_dbs_by_name, DbName), - true = ets:delete(couch_dbs_by_lru, LruTime), - {noreply, Server#server{dbs_open=DbsOpen - 1}}; +handle_info({'EXIT', _Pid, config_change}, Server) -> + {stop, shutdown, Server}; +handle_info({'EXIT', Pid, Reason}, #server{dbs_open=DbsOpen}=Server) -> + case ets:lookup(couch_dbs_by_pid, Pid) of + [{Pid, DbName}] -> + [{DbName, {Pid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName), + true = ets:delete(couch_dbs_by_pid, Pid), + true = ets:delete(couch_dbs_by_name, DbName), + true = ets:delete(couch_dbs_by_lru, LruTime), + {noreply, Server#server{dbs_open=DbsOpen - 1}}; + _ -> + {stop,Reason,Server} + end; handle_info(Info, _Server) -> exit({unknown_message, Info}). diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl index d89e987d..f1f8f692 100644 --- a/src/couchdb/couch_server_sup.erl +++ b/src/couchdb/couch_server_sup.erl @@ -174,7 +174,7 @@ start_secondary_services() -> {list_to_atom(Name), {Module, Fun, Args}, permanent, - brutal_kill, + 1000, worker, [Module]} end diff --git a/src/couchdb/couch_stats_collector.erl b/src/couchdb/couch_stats_collector.erl index 59d62a6e..f7b9bb48 100644 --- a/src/couchdb/couch_stats_collector.erl +++ b/src/couchdb/couch_stats_collector.erl @@ -60,7 +60,7 @@ increment(Key) -> Key2 = make_key(Key), case catch ets:update_counter(?HIT_TABLE, Key2, 1) of {'EXIT', {badarg, _}} -> - true = ets:insert(?HIT_TABLE, {Key2, 1}), + catch ets:insert(?HIT_TABLE, {Key2, 1}), ok; _ -> ok @@ -70,16 +70,16 @@ decrement(Key) -> Key2 = make_key(Key), case catch ets:update_counter(?HIT_TABLE, Key2, -1) of {'EXIT', {badarg, _}} -> - true = ets:insert(?HIT_TABLE, {Key2, -1}), + catch ets:insert(?HIT_TABLE, {Key2, -1}), ok; _ -> ok end. record(Key, Value) -> - true = ets:insert(?ABS_TABLE, {make_key(Key), Value}). + catch ets:insert(?ABS_TABLE, {make_key(Key), Value}). clear(Key) -> - true = ets:delete(?ABS_TABLE, make_key(Key)). + catch ets:delete(?ABS_TABLE, make_key(Key)). track_process_count(Stat) -> track_process_count(self(), Stat). diff --git a/src/couchdb/couch_util.erl b/src/couchdb/couch_util.erl index ca36a6db..a9831fde 100644 --- a/src/couchdb/couch_util.erl +++ b/src/couchdb/couch_util.erl @@ -12,7 +12,7 @@ -module(couch_util). --export([priv_dir/0, start_driver/1,terminate_linked/1]). +-export([priv_dir/0, start_driver/1]). -export([should_flush/0, should_flush/1, to_existing_atom/1]). -export([rand32/0, implode/2, collate/2, collate/3]). -export([abs_pathname/1,abs_pathname/2, trim/1, ascii_lower/1]). @@ -21,7 +21,7 @@ -export([file_read_size/1, get_nested_json_value/2, json_user_ctx/1]). -export([to_binary/1, to_integer/1, to_list/1, url_encode/1]). -export([json_encode/1, json_decode/1]). --export([verify/2]). +-export([verify/2,simple_call/2,shutdown_sync/1]). -export([compressible_att_type/1]). -include("couch_db.hrl"). @@ -59,14 +59,35 @@ to_existing_atom(V) when is_binary(V) -> to_existing_atom(V) when is_atom(V) -> V. +shutdown_sync(Pid) when not is_pid(Pid)-> + ok; +shutdown_sync(Pid) -> + MRef = erlang:monitor(process, Pid), + try + catch unlink(Pid), + catch exit(Pid, shutdown), + receive + {'DOWN', MRef, _, _, _} -> + ok + end + after + erlang:demonitor(MRef, [flush]) + end. + -terminate_linked(normal) -> - terminate_linked(shutdown); -terminate_linked(Reason) -> - {links, Links} = process_info(self(), links), - [catch exit(Pid, Reason) || Pid <- Links], - ok. - +simple_call(Pid, Message) -> + MRef = erlang:monitor(process, Pid), + try + Pid ! {self(), Message}, + receive + {Pid, Result} -> + Result; + {'DOWN', MRef, _, _, Reason} -> + exit(Reason) + end + after + erlang:demonitor(MRef, [flush]) + end. to_hex([]) -> []; diff --git a/src/couchdb/couch_view.erl b/src/couchdb/couch_view.erl index cbbbd4ac..234460ce 100644 --- a/src/couchdb/couch_view.erl +++ b/src/couchdb/couch_view.erl @@ -269,8 +269,9 @@ init([]) -> {ok, #server{root_dir=RootDir}}. -terminate(Reason, _Srv) -> - couch_util:terminate_linked(Reason), +terminate(_Reason, _Srv) -> + [couch_util:shutdown_sync(Pid) || {Pid, _} <- + ets:tab2list(couch_groups_by_updater)], ok. @@ -311,10 +312,7 @@ do_reset_indexes(DbName, Root) -> fun({_DbName, Sig}) -> ?LOG_DEBUG("Killing update process for view group ~s. in database ~s.", [Sig, DbName]), [{_, Pid}] = ets:lookup(group_servers_by_sig, {DbName, Sig}), - exit(Pid, kill), - receive {'EXIT', Pid, _} -> - delete_from_ets(Pid, DbName, Sig) - end + couch_util:shutdown_sync(Pid) end, Names), delete_index_dir(Root, DbName), file:delete(Root ++ "/." ++ ?b2l(DbName) ++ "_temp"). diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index bb4102c2..68966a88 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -341,8 +341,8 @@ handle_info({'DOWN',_,_,_,_}, State) -> terminate(Reason, #group_state{updater_pid=Update, compactor_pid=Compact}=S) -> reply_all(S, Reason), - catch exit(Update, Reason), - catch exit(Compact, Reason), + couch_util:shutdown_sync(Update), + couch_util:shutdown_sync(Compact), ok. code_change(_OldVsn, State, _Extra) -> |