diff options
Diffstat (limited to 'apps/couch/src/couch_task_status.erl')
-rw-r--r-- | apps/couch/src/couch_task_status.erl | 99 |
1 files changed, 60 insertions, 39 deletions
diff --git a/apps/couch/src/couch_task_status.erl b/apps/couch/src/couch_task_status.erl index 639515c7..62327e27 100644 --- a/apps/couch/src/couch_task_status.erl +++ b/apps/couch/src/couch_task_status.erl @@ -13,14 +13,20 @@ -module(couch_task_status). -behaviour(gen_server). -% This module allows is used to track the status of long running tasks. -% Long running tasks register (add_task/3) then update their status (update/1) -% and the task and status is added to tasks list. When the tracked task dies -% it will be automatically removed the tracking. To get the tasks list, use the -% all/0 function +% This module is used to track the status of long running tasks. +% Long running tasks register themselves, via a call to add_task/1, and then +% update their status properties via update/1. The status of a task is a +% list of properties. Each property is a tuple, with the first element being +% either an atom or a binary and the second element must be an EJSON value. When +% a task updates its status, it can override some or all of its properties. +% The properties {started_on, UnitTimestamp}, {updated_on, UnixTimestamp} and +% {pid, ErlangPid} are automatically added by this module. +% When a tracked task dies, its status will be automatically removed from +% memory. To get the tasks list, call the all/0 function. -export([start_link/0, stop/0]). --export([all/0, add_task/3, update/1, update/2, set_update_frequency/1]). +-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]). +-export([is_task_added/0]). -export([init/1, terminate/2, code_change/3]). -export([handle_call/3, handle_cast/2, handle_info/2]). @@ -29,6 +35,7 @@ -include("couch_db.hrl"). +-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -41,32 +48,41 @@ stop() -> all() -> gen_server:call(?MODULE, all). - -add_task(Type, TaskName, StatusText) -> +add_task(Props) -> put(task_status_update, {{0, 0, 0}, 0}), - Msg = { - add_task, - to_binary(Type), - to_binary(TaskName), - to_binary(StatusText) - }, - gen_server:call(?MODULE, Msg). + Ts = timestamp(), + TaskProps = lists:ukeysort( + 1, [{started_on, Ts}, {updated_on, Ts} | Props]), + put(task_status_props, TaskProps), + gen_server:call(?MODULE, {add_task, TaskProps}). +is_task_added() -> + is_list(erlang:get(task_status_props)). set_update_frequency(Msecs) -> put(task_status_update, {{0, 0, 0}, Msecs * 1000}). -update(StatusText) -> - update("~s", [StatusText]). +update(Props) -> + MergeProps = lists:ukeysort(1, Props), + TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)), + put(task_status_props, TaskProps), + maybe_persist(TaskProps). + +get(Props) when is_list(Props) -> + TaskProps = erlang:get(task_status_props), + [couch_util:get_value(P, TaskProps) || P <- Props]; +get(Prop) -> + TaskProps = erlang:get(task_status_props), + couch_util:get_value(Prop, TaskProps). -update(Format, Data) -> - {LastUpdateTime, Frequency} = get(task_status_update), +maybe_persist(TaskProps0) -> + {LastUpdateTime, Frequency} = erlang:get(task_status_update), case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of true -> put(task_status_update, {Now, Frequency}), - Msg = ?l2b(io_lib:format(Format, Data)), - gen_server:cast(?MODULE, {update_status, self(), Msg}); + TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)), + gen_server:cast(?MODULE, {update_status, self(), TaskProps}); false -> ok end. @@ -82,32 +98,29 @@ terminate(_Reason,_State) -> ok. -handle_call({add_task, Type, TaskName, StatusText}, {From, _}, Server) -> +handle_call({add_task, TaskProps}, {From, _}, Server) -> case ets:lookup(?MODULE, From) of [] -> - true = ets:insert(?MODULE, {From, {Type, TaskName, StatusText}}), + true = ets:insert(?MODULE, {From, TaskProps}), erlang:monitor(process, From), {reply, ok, Server}; [_] -> {reply, {add_task_error, already_registered}, Server} end; handle_call(all, _, Server) -> - All = [ - [ - {type, Type}, - {task, Task}, - {status, Status}, - {pid, ?l2b(pid_to_list(Pid))} - ] - || - {Pid, {Type, Task, Status}} <- ets:tab2list(?MODULE) - ], - {reply, All, Server}. - - -handle_cast({update_status, Pid, StatusText}, Server) -> - [{Pid, {Type, TaskName, _StatusText}}] = ets:lookup(?MODULE, Pid), - true = ets:insert(?MODULE, {Pid, {Type, TaskName, StatusText}}), + All = ets:tab2list(?MODULE), + {reply, tasks_to_json(All), Server}. + +handle_cast({update_status, Pid, NewProps}, Server) -> + case ets:lookup(?MODULE, Pid) of + [{Pid, _CurProps}] -> + ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]), + true = ets:insert(?MODULE, {Pid, NewProps}); + _ -> + % Task finished/died in the meanwhile and we must have received + % a monitor message before this call - ignore. + ok + end, {noreply, Server}; handle_cast(stop, State) -> {stop, normal, State}. @@ -121,3 +134,11 @@ handle_info({'DOWN', _MonitorRef, _Type, Pid, _Info}, Server) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +timestamp() -> + timestamp(now()). + +timestamp({Mega, Secs, _}) -> + Mega * 1000000 + Secs. + +tasks_to_json(Tasks) -> + [ [{pid, ?l2b(pid_to_list(Pid))} | Props] || {Pid, Props} <- Tasks]. |