summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPaul Joseph Davis <davisp@apache.org>2010-11-05 23:14:02 +0000
committerPaul Joseph Davis <davisp@apache.org>2010-11-05 23:14:02 +0000
commitc8785113522b8486fab0ba53f2043d94e9b1507f (patch)
treef6d1c35ab8ce540c172e8faf2464b9b08d3b2302 /src
parentc82960cdd8e84a4db0dbbefca425bbb49aeddfa3 (diff)
Enable CouchDB to manage OS process daemons.
This is a simple feature that allows useres to configure CouchDB so that it maintains a given OS level process alive. If the process dies for any reason, CouchDB will restart it. If the process restarts too often, then CouchDB will mark it has halted and not attempt to restart it. The default max restart rate is three times in the last five seconds. These parameters are adjustable. Commands that are started in this manner will have access to a simple API over stdio to request configuration parameters or to add log statements to CouchDB's logs. To configure an OS process as a CouchDB os_daemon, create a section in your local.ini like such: [os_daemons] daemon_name = /path/to/command -with args This will make CouchDB bring up the command and attempt to keep it alive. To request a configuration parameter, an os_daemon can write a simple JSON message to stdout like such: ["get", "os_daemons"]\n which would return: {"daemon_name": "/path/to/command -with args"} Or: ["get", "os_daemons", "daemon_name"]\n which would return: "/path/to/command -with args" There's no restriction on what configuration variables are visible. There's also no method for altering the configuration. If you would like your OS daemon to be restarted in the event that the configuration changes, you can send the following messages: ["register", $(SECTION)]\n When anyting in that section changes, your OS process will be rebooted so it can pick up the new configuration settings. If you want to listen for changes on a specific key, you can send something like: ["register", $(SECTION), $(KEY)]\n In this case, CouchDB will only restart your daemon if that exact section/key pair changes, instead of anything in that entire section. Logging commands look like: ["log", $(JSON_MESSAGE)]\n Where $(JSON_MESSAGE) is arbitrary JSON data. These messages are logged at the 'info' level. If you want to log at a different level you can pass messages like such: ["log", $(JSON_MESSAGE), {"level": $(LEVEL)}]\n Where $(LEVEL) is one of "debug", "info", or "error". When implementing a daemon process to be managed by CouchDB you should remember to use a method like checking the parent process id or if stdin has been closed. These flags can tell you if your daemon process has been orphaned so you can exit cleanly. git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@1031875 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_os_daemons.erl359
2 files changed, 361 insertions, 0 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index badfb104..8bf136d6 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -56,6 +56,7 @@ source_files = \
couch_key_tree.erl \
couch_log.erl \
couch_native_process.erl \
+ couch_os_daemons.erl \
couch_os_process.erl \
couch_query_servers.erl \
couch_ref_counter.erl \
@@ -116,6 +117,7 @@ compiled_files = \
couch_key_tree.beam \
couch_log.beam \
couch_native_process.beam \
+ couch_os_daemons.beam \
couch_os_process.beam \
couch_query_servers.beam \
couch_ref_counter.beam \
diff --git a/src/couchdb/couch_os_daemons.erl b/src/couchdb/couch_os_daemons.erl
new file mode 100644
index 00000000..daf98f25
--- /dev/null
+++ b/src/couchdb/couch_os_daemons.erl
@@ -0,0 +1,359 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(couch_os_daemons).
+-behaviour(gen_server).
+
+-export([start_link/0, info/0, info/1, config_change/2]).
+
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+
+-record(daemon, {
+ port,
+ name,
+ cmd,
+ kill,
+ status=running,
+ cfg_patterns=[],
+ errors=[],
+ buf=[]
+}).
+
+-define(PORT_OPTIONS, [stream, {line, 1024}, binary, exit_status, hide]).
+-define(TIMEOUT, 5000).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+info() ->
+ info([]).
+
+info(Options) ->
+ gen_server:call(?MODULE, {daemon_info, Options}).
+
+config_change(Section, Key) ->
+ gen_server:cast(?MODULE, {config_change, Section, Key}).
+
+init(_) ->
+ process_flag(trap_exit, true),
+ ok = couch_config:register(fun couch_os_daemons:config_change/2),
+ Table = ets:new(?MODULE, [protected, set, {keypos, #daemon.port}]),
+ reload_daemons(Table),
+ {ok, Table}.
+
+terminate(_Reason, Table) ->
+ [stop_port(D) || D <- ets:tab2list(Table)],
+ ok.
+
+handle_call({daemon_info, Options}, _From, Table) when is_list(Options) ->
+ case lists:member(table, Options) of
+ true ->
+ {reply, {ok, ets:tab2list(Table)}, Table};
+ _ ->
+ {reply, {ok, Table}, Table}
+ end;
+handle_call(Msg, From, Table) ->
+ ?LOG_ERROR("Unknown call message to ~p from ~p: ~p", [?MODULE, From, Msg]),
+ {stop, error, Table}.
+
+handle_cast({config_change, Sect, Key}, Table) ->
+ restart_daemons(Table, Sect, Key),
+ case Sect of
+ "os_daemons" -> reload_daemons(Table);
+ _ -> ok
+ end,
+ {noreply, Table};
+handle_cast(stop, Table) ->
+ {stop, normal, Table};
+handle_cast(Msg, Table) ->
+ ?LOG_ERROR("Unknown cast message to ~p: ~p", [?MODULE, Msg]),
+ {stop, error, Table}.
+
+handle_info({'EXIT', Port, Reason}, Table) ->
+ case ets:lookup(Table, Port) of
+ [] ->
+ ?LOG_INFO("Port ~p exited after stopping: ~p~n", [Port, Reason]);
+ [#daemon{status=stopping}] ->
+ true = ets:delete(Table, Port);
+ [#daemon{name=Name, status=restarting, errors=Errs}=D] ->
+ ?LOG_INFO("Daemon ~P restarting after config change.", [Name]),
+ true = ets:delete(Table, Port),
+ {ok, Port2} = start_port(D#daemon.cmd),
+ true = ets:insert(Table, D#daemon{
+ port=Port2, status=running, kill=undefined, errors=Errs, buf=[]
+ });
+ [#daemon{name=Name, status=halted}] ->
+ ?LOG_ERROR("Halted daemon process: ~p", [Name]);
+ [D] ->
+ ?LOG_ERROR("Invalid port state at exit: ~p", [D])
+ end,
+ {noreply, Table};
+handle_info({Port, closed}, Table) ->
+ handle_info({Port, {exit_status, closed}}, Table);
+handle_info({Port, {exit_status, Status}}, Table) ->
+ case ets:lookup(Table, Port) of
+ [] ->
+ ?LOG_ERROR("Unknown port ~p exiting ~p", [Port, Status]),
+ {stop, {error, unknown_port_died, Status}, Table};
+ [#daemon{name=Name, status=restarting, errors=Errors}=D] ->
+ ?LOG_INFO("Daemon ~P restarting after config change.", [Name]),
+ true = ets:delete(Table, Port),
+ {ok, Port2} = start_port(D#daemon.cmd),
+ true = ets:insert(Table, D#daemon{
+ port=Port2, kill=undefined, errors=Errors, buf=[]
+ }),
+ {noreply, Table};
+ [#daemon{status=stopping}=D] ->
+ % The configuration changed and this daemon is no
+ % longer needed.
+ ?LOG_DEBUG("Port ~p shut down.", [D#daemon.name]),
+ true = ets:delete(Table, Port),
+ {noreply, Table};
+ [D] ->
+ % Port died for unknown reason. Check to see if it's
+ % died too many times or if we should boot it back up.
+ case should_halt([now() | D#daemon.errors]) of
+ {true, _} ->
+ % Halting the process. We won't try and reboot
+ % until the configuration changes.
+ Fmt = "Daemon ~p halted with exit_status ~p",
+ ?LOG_ERROR(Fmt, [D#daemon.name, Status]),
+ D2 = D#daemon{status=halted, errors=nil, buf=nil},
+ true = ets:insert(Table, D2),
+ {noreply, Table};
+ {false, Errors} ->
+ % We're guessing it was a random error, this daemon
+ % has behaved so we'll give it another chance.
+ Fmt = "Daemon ~p is being rebooted after exit_status ~p",
+ ?LOG_INFO(Fmt, [D#daemon.name, Status]),
+ true = ets:delete(Table, Port),
+ {ok, Port2} = start_port(D#daemon.cmd),
+ true = ets:insert(Table, D#daemon{
+ port=Port2, kill=undefined, errors=Errors, buf=[]
+ }),
+ {noreply, Table}
+ end;
+ _Else ->
+ throw(error)
+ end;
+handle_info({Port, {data, {noeol, Data}}}, Table) ->
+ [#daemon{buf=Buf}=D] = ets:lookup(Table, Port),
+ true = ets:insert(Table, D#daemon{buf=[Data | Buf]}),
+ {noreply, Table};
+handle_info({Port, {data, {eol, Data}}}, Table) ->
+ [#daemon{buf=Buf}=D] = ets:lookup(Table, Port),
+ Line = lists:reverse(Buf, Data),
+ % The first line echoed back is the kill command
+ % for when we go to get rid of the port. Lines after
+ % that are considered part of the stdio API.
+ case D#daemon.kill of
+ undefined ->
+ true = ets:insert(Table, D#daemon{kill=?b2l(Line), buf=[]});
+ _Else ->
+ D2 = case (catch ?JSON_DECODE(Line)) of
+ {invalid_json, Rejected} ->
+ ?LOG_ERROR("Ignoring OS daemon request: ~p", [Rejected]),
+ D;
+ JSON ->
+ {ok, D3} = handle_port_message(D, JSON),
+ D3
+ end,
+ true = ets:insert(Table, D2#daemon{buf=[]})
+ end,
+ {noreply, Table};
+handle_info({Port, Error}, Table) ->
+ ?LOG_ERROR("Unexpectd message from port ~p: ~p", [Port, Error]),
+ stop_port(Port),
+ [D] = ets:lookup(Table, Port),
+ true = ets:insert(Table, D#daemon{status=restarting, buf=nil}),
+ {noreply, Table};
+handle_info(Msg, Table) ->
+ ?LOG_ERROR("Unexpected info message to ~p: ~p", [?MODULE, Msg]),
+ {stop, error, Table}.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+% Internal API
+
+%
+% Port management helpers
+%
+
+start_port(Command) ->
+ PrivDir = couch_util:priv_dir(),
+ Spawnkiller = filename:join(PrivDir, "couchspawnkillable"),
+ Port = open_port({spawn, Spawnkiller ++ " " ++ Command}, ?PORT_OPTIONS),
+ {ok, Port}.
+
+
+stop_port(#daemon{port=Port, kill=undefined}=D) ->
+ ?LOG_ERROR("Stopping daemon without a kill command: ~p", [D#daemon.name]),
+ catch port_close(Port);
+stop_port(#daemon{port=Port}=D) ->
+ ?LOG_DEBUG("Stopping daemon: ~p", [D#daemon.name]),
+ os:cmd(D#daemon.kill),
+ catch port_close(Port).
+
+
+handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section]) ->
+ KVs = couch_config:get(Section),
+ Data = lists:map(fun({K, V}) -> {?l2b(K), ?l2b(V)} end, KVs),
+ JsonData = ?JSON_ENCODE({Data}),
+ port_command(Port, JsonData ++ "\n"),
+ {ok, Daemon};
+handle_port_message(#daemon{port=Port}=Daemon, [<<"get">>, Section, Key]) ->
+ Value = couch_config:get(Section, Key, null),
+ port_command(Port, ?JSON_ENCODE(?l2b(Value)) ++ "\n"),
+ {ok, Daemon};
+handle_port_message(Daemon, [<<"register">>, Sec]) when is_binary(Sec) ->
+ Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [{?b2l(Sec)}]),
+ {ok, Daemon#daemon{cfg_patterns=Patterns}};
+handle_port_message(Daemon, [<<"register">>, Sec, Key])
+ when is_binary(Sec) andalso is_binary(Key) ->
+ Pattern = {?b2l(Sec), ?b2l(Key)},
+ Patterns = lists:usort(Daemon#daemon.cfg_patterns ++ [Pattern]),
+ {ok, Daemon#daemon{cfg_patterns=Patterns}};
+handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg]) ->
+ handle_log_message(Name, Msg, <<"info">>),
+ {ok, Daemon};
+handle_port_message(#daemon{name=Name}=Daemon, [<<"log">>, Msg, {Opts}]) ->
+ Level = couch_util:get_value(<<"level">>, Opts, <<"info">>),
+ handle_log_message(Name, Msg, Level),
+ {ok, Daemon};
+handle_port_message(#daemon{name=Name}=Daemon, Else) ->
+ ?LOG_ERROR("Daemon ~p made invalid request: ~p", [Name, Else]),
+ {ok, Daemon}.
+
+
+handle_log_message(Name, Msg, _Level) when not is_binary(Msg) ->
+ ?LOG_ERROR("Invalid log message from daemon ~p: ~p", [Name, Msg]);
+handle_log_message(Name, Msg, <<"debug">>) ->
+ ?LOG_DEBUG("Daemon ~p :: ~s", [Name, ?b2l(Msg)]);
+handle_log_message(Name, Msg, <<"info">>) ->
+ ?LOG_INFO("Daemon ~p :: ~s", [Name, ?b2l(Msg)]);
+handle_log_message(Name, Msg, <<"error">>) ->
+ ?LOG_ERROR("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]);
+handle_log_message(Name, Msg, Level) ->
+ ?LOG_ERROR("Invalid log level from daemon: ~p", [Level]),
+ ?LOG_INFO("Daemon: ~p :: ~s", [Name, ?b2l(Msg)]).
+
+%
+% Daemon management helpers
+%
+
+reload_daemons(Table) ->
+ % List of daemons we want to have running.
+ Configured = lists:sort(couch_config:get("os_daemons")),
+
+ % Remove records for daemons that were halted.
+ MSpecHalted = #daemon{name='$1', cmd='$2', status=halted, _='_'},
+ Halted = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecHalted)]),
+ ok = stop_os_daemons(Table, find_to_stop(Configured, Halted, [])),
+
+ % Stop daemons that are running
+ % Start newly configured daemons
+ MSpecRunning = #daemon{name='$1', cmd='$2', status=running, _='_'},
+ Running = lists:sort([{N, C} || [N, C] <- ets:match(Table, MSpecRunning)]),
+ ok = stop_os_daemons(Table, find_to_stop(Configured, Running, [])),
+ ok = boot_os_daemons(Table, find_to_boot(Configured, Running, [])),
+ ok.
+
+
+restart_daemons(Table, Sect, Key) ->
+ restart_daemons(Table, Sect, Key, ets:first(Table)).
+
+restart_daemons(_, _, _, '$end_of_table') ->
+ ok;
+restart_daemons(Table, Sect, Key, Port) ->
+ [D] = ets:lookup(Table, Port),
+ HasSect = lists:member({Sect}, D#daemon.cfg_patterns),
+ HasKey = lists:member({Sect, Key}, D#daemon.cfg_patterns),
+ case HasSect or HasKey of
+ true ->
+ stop_port(D),
+ D2 = D#daemon{status=restarting, buf=nil},
+ true = ets:insert(Table, D2);
+ _ ->
+ ok
+ end,
+ restart_daemons(Table, Sect, Key, ets:next(Table, Port)).
+
+
+stop_os_daemons(_Table, []) ->
+ ok;
+stop_os_daemons(Table, [{Name, Cmd} | Rest]) ->
+ [[Port]] = ets:match(Table, #daemon{port='$1', name=Name, cmd=Cmd, _='_'}),
+ [D] = ets:lookup(Table, Port),
+ case D#daemon.status of
+ halted ->
+ ets:delete(Table, Port);
+ _ ->
+ stop_port(D),
+ D2 = D#daemon{status=stopping, errors=nil, buf=nil},
+ true = ets:insert(Table, D2)
+ end,
+ stop_os_daemons(Table, Rest).
+
+boot_os_daemons(_Table, []) ->
+ ok;
+boot_os_daemons(Table, [{Name, Cmd} | Rest]) ->
+ {ok, Port} = start_port(Cmd),
+ true = ets:insert(Table, #daemon{port=Port, name=Name, cmd=Cmd}),
+ boot_os_daemons(Table, Rest).
+
+% Elements unique to the configured set need to be booted.
+find_to_boot([], _Rest, Acc) ->
+ % Nothing else configured.
+ Acc;
+find_to_boot([D | R1], [D | R2], Acc) ->
+ % Elements are equal, daemon already running.
+ find_to_boot(R1, R2, Acc);
+find_to_boot([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 ->
+ find_to_boot(R1, A2, [D1 | Acc]);
+find_to_boot(A1, [_ | R2], Acc) ->
+ find_to_boot(A1, R2, Acc);
+find_to_boot(Rest, [], Acc) ->
+ % No more candidates for already running. Boot all.
+ Rest ++ Acc.
+
+% Elements unique to the running set need to be killed.
+find_to_stop([], Rest, Acc) ->
+ % The rest haven't been found, so they must all
+ % be ready to die.
+ Rest ++ Acc;
+find_to_stop([D | R1], [D | R2], Acc) ->
+ % Elements are equal, daemon already running.
+ find_to_stop(R1, R2, Acc);
+find_to_stop([D1 | R1], [D2 | _]=A2, Acc) when D1 < D2 ->
+ find_to_stop(R1, A2, Acc);
+find_to_stop(A1, [D2 | R2], Acc) ->
+ find_to_stop(A1, R2, [D2 | Acc]);
+find_to_stop(_, [], Acc) ->
+ % No more running daemons to worry about.
+ Acc.
+
+should_halt(Errors) ->
+ RetryTimeCfg = couch_config:get("os_daemon_settings", "retry_time", "5"),
+ RetryTime = list_to_integer(RetryTimeCfg),
+
+ Now = now(),
+ RecentErrors = lists:filter(fun(Time) ->
+ timer:now_diff(Now, Time) =< RetryTime * 1000000
+ end, Errors),
+
+ RetryCfg = couch_config:get("os_daemon_settings", "max_retries", "3"),
+ Retries = list_to_integer(RetryCfg),
+
+ {length(RecentErrors) >= Retries, RecentErrors}.