summaryrefslogtreecommitdiff
path: root/apps/couch
diff options
context:
space:
mode:
authorRobert Newson <robert.newson@cloudant.com>2012-11-14 19:32:23 +0000
committerRobert Newson <robert.newson@cloudant.com>2012-11-15 11:23:46 +0000
commit19017b3612b4567ced3b8ac9cfe816e6cf80e0f8 (patch)
treea0ffdab1271fb3ffd0c95b22340c7f10c5e98e53 /apps/couch
parentbf4d46be7037ddea375bdaf2bab067e29f2c423f (diff)
Backport new /_active_tasks API
Improved _active_tasks API Tasks are now free to set any properties they wish (as an Erlang proplist). Different tasks can have different properties and the status string doesn't exist anymore - instead client applications can build it using more granular properties from _active_tasks. Some of these properties are: 1) "progress" (an integer percentage, for all tasks) 2) "database" (for compactions and indexer tasks) 3) "design_document" (for indexer and view compaction tasks) 4) "source" and "target" (for replications) 5) "docs_read", "docs_written", "doc_write_failures", "missing_revs_found", "missing_revs_checked", "source_seq", "checkpointed_source_seq" and "continuous" for replications BugzID: 14269 Conflicts: apps/couch/src/couch_db_updater.erl apps/couch/src/couch_rep.erl apps/couch/src/couch_task_status.erl apps/couch/src/couch_view_compactor.erl apps/couch/src/couch_view_updater.erl
Diffstat (limited to 'apps/couch')
-rw-r--r--apps/couch/src/couch_db_updater.erl35
-rw-r--r--apps/couch/src/couch_rep.erl32
-rw-r--r--apps/couch/src/couch_task_status.erl99
-rw-r--r--apps/couch/src/couch_view_compactor.erl14
-rw-r--r--apps/couch/src/couch_view_updater.erl23
-rwxr-xr-xapps/couch/test/etap/090-task-status.t169
6 files changed, 254 insertions, 118 deletions
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl
index 1f25186a..69580602 100644
--- a/apps/couch/src/couch_db_updater.erl
+++ b/apps/couch/src/couch_db_updater.erl
@@ -872,6 +872,7 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) ->
NewDb#db.seq_tree, NewInfos, RemoveSeqs),
{ok, IdTree} = couch_btree:add_remove(
NewDb#db.id_tree, NewInfos, []),
+ update_compact_task(length(NewInfos)),
NewDb#db{id_tree=IdTree, seq_tree=SeqTree}.
@@ -903,15 +904,31 @@ copy_compact(Db, NewDb0, Retry) ->
end
end,
- couch_task_status:set_update_frequency(500),
+ TaskProps0 = [
+ {type, database_compaction},
+ {database, Db#db.name},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ],
+ case Retry and couch_task_status:is_task_added() of
+ true ->
+ couch_task_status:update([
+ {retry, true},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ]);
+ false ->
+ couch_task_status:add_task(TaskProps0),
+ couch_task_status:set_update_frequency(500)
+ end,
{ok, _, {NewDb2, Uncopied, TotalChanges}} =
couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
{NewDb, [], 0},
[{start_key, NewDb#db.update_seq + 1}]),
- couch_task_status:update("Flushing"),
-
NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
% copy misc header values
@@ -929,7 +946,6 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
{ok, Fd} ->
- couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
case couch_file:read_header(Fd) of
{ok, Header} ->
@@ -938,7 +954,6 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
ok = couch_file:write_header(Fd, Header=#db_header{})
end;
{error, enoent} ->
- couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
ok = couch_file:write_header(Fd, Header=#db_header{})
@@ -957,3 +972,13 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P
close_db(NewDb3),
gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}).
+update_compact_task(NumChanges) ->
+ [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
+ Changes2 = Changes + NumChanges,
+ Progress = case Total of
+ 0 ->
+ 0;
+ _ ->
+ (Changes2 * 100) div Total
+ end,
+ couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl
index 2bdb66d1..80720375 100644
--- a/apps/couch/src/couch_rep.erl
+++ b/apps/couch/src/couch_rep.erl
@@ -172,9 +172,18 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) ->
ets:insert(Stats, {docs_written, 0}),
ets:insert(Stats, {doc_write_failures, 0}),
- {ShortId, _} = lists:split(6, BaseId),
- couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s",
- [ShortId, dbname(Source), dbname(Target)]), "Starting"),
+ couch_task_status:add_task([
+ {user, UserCtx#user_ctx.name},
+ {type, replication},
+ {replication_id, ?l2b(RepId)},
+ {source, dbname(Source)},
+ {target, dbname(Target)},
+ {continuous, Continuous},
+ {docs_read, 0},
+ {docs_written, 0},
+ {doc_write_failures, 0}
+ ]),
+ couch_task_status:set_update_frequency(1000),
State = #state{
changes_feed = ChangesFeed,
@@ -230,15 +239,16 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({missing_revs_checkpoint, SourceSeq}, State) ->
- couch_task_status:update("MR Processed source update #~p", [SourceSeq]),
- {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
-
+ NewState = schedule_checkpoint(State#state{committed_seq = SourceSeq}),
+ update_task(NewState),
+ {noreply, NewState};
handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State)
when SourceSeq > N ->
MissingRevs = State#state.missing_revs,
ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}),
- couch_task_status:update("W Processed source update #~p", [SourceSeq]),
- {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})};
+ NewState = schedule_checkpoint(State#state{committed_seq = SourceSeq}),
+ update_task(NewState),
+ {noreply, NewState};
handle_info({writer_checkpoint, _}, State) ->
{noreply, State};
@@ -430,7 +440,6 @@ do_terminate(State) ->
false ->
[gen_server:reply(R, retry) || R <- OtherListeners]
end,
- couch_task_status:update("Finishing"),
terminate_cleanup(State).
terminate_cleanup(State) ->
@@ -916,3 +925,8 @@ target_db_update_notifier(#db{name = DbName}) ->
Notifier;
target_db_update_notifier(_) ->
nil.
+
+update_task(#state{stats=Stats}) ->
+ Update = [ {Stat, ets:lookup_element(Stats, Stat, 2)} || Stat <-
+ [total_revs, missing_revs, docs_read, docs_written, doc_write_failures]],
+ couch_task_status:update(Update).
diff --git a/apps/couch/src/couch_task_status.erl b/apps/couch/src/couch_task_status.erl
index 639515c7..62327e27 100644
--- a/apps/couch/src/couch_task_status.erl
+++ b/apps/couch/src/couch_task_status.erl
@@ -13,14 +13,20 @@
-module(couch_task_status).
-behaviour(gen_server).
-% This module allows is used to track the status of long running tasks.
-% Long running tasks register (add_task/3) then update their status (update/1)
-% and the task and status is added to tasks list. When the tracked task dies
-% it will be automatically removed the tracking. To get the tasks list, use the
-% all/0 function
+% This module is used to track the status of long running tasks.
+% Long running tasks register themselves, via a call to add_task/1, and then
+% update their status properties via update/1. The status of a task is a
+% list of properties. Each property is a tuple, with the first element being
+% either an atom or a binary and the second element must be an EJSON value. When
+% a task updates its status, it can override some or all of its properties.
+% The properties {started_on, UnitTimestamp}, {updated_on, UnixTimestamp} and
+% {pid, ErlangPid} are automatically added by this module.
+% When a tracked task dies, its status will be automatically removed from
+% memory. To get the tasks list, call the all/0 function.
-export([start_link/0, stop/0]).
--export([all/0, add_task/3, update/1, update/2, set_update_frequency/1]).
+-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]).
+-export([is_task_added/0]).
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
@@ -29,6 +35,7 @@
-include("couch_db.hrl").
+-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@@ -41,32 +48,41 @@ stop() ->
all() ->
gen_server:call(?MODULE, all).
-
-add_task(Type, TaskName, StatusText) ->
+add_task(Props) ->
put(task_status_update, {{0, 0, 0}, 0}),
- Msg = {
- add_task,
- to_binary(Type),
- to_binary(TaskName),
- to_binary(StatusText)
- },
- gen_server:call(?MODULE, Msg).
+ Ts = timestamp(),
+ TaskProps = lists:ukeysort(
+ 1, [{started_on, Ts}, {updated_on, Ts} | Props]),
+ put(task_status_props, TaskProps),
+ gen_server:call(?MODULE, {add_task, TaskProps}).
+is_task_added() ->
+ is_list(erlang:get(task_status_props)).
set_update_frequency(Msecs) ->
put(task_status_update, {{0, 0, 0}, Msecs * 1000}).
-update(StatusText) ->
- update("~s", [StatusText]).
+update(Props) ->
+ MergeProps = lists:ukeysort(1, Props),
+ TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)),
+ put(task_status_props, TaskProps),
+ maybe_persist(TaskProps).
+
+get(Props) when is_list(Props) ->
+ TaskProps = erlang:get(task_status_props),
+ [couch_util:get_value(P, TaskProps) || P <- Props];
+get(Prop) ->
+ TaskProps = erlang:get(task_status_props),
+ couch_util:get_value(Prop, TaskProps).
-update(Format, Data) ->
- {LastUpdateTime, Frequency} = get(task_status_update),
+maybe_persist(TaskProps0) ->
+ {LastUpdateTime, Frequency} = erlang:get(task_status_update),
case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of
true ->
put(task_status_update, {Now, Frequency}),
- Msg = ?l2b(io_lib:format(Format, Data)),
- gen_server:cast(?MODULE, {update_status, self(), Msg});
+ TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)),
+ gen_server:cast(?MODULE, {update_status, self(), TaskProps});
false ->
ok
end.
@@ -82,32 +98,29 @@ terminate(_Reason,_State) ->
ok.
-handle_call({add_task, Type, TaskName, StatusText}, {From, _}, Server) ->
+handle_call({add_task, TaskProps}, {From, _}, Server) ->
case ets:lookup(?MODULE, From) of
[] ->
- true = ets:insert(?MODULE, {From, {Type, TaskName, StatusText}}),
+ true = ets:insert(?MODULE, {From, TaskProps}),
erlang:monitor(process, From),
{reply, ok, Server};
[_] ->
{reply, {add_task_error, already_registered}, Server}
end;
handle_call(all, _, Server) ->
- All = [
- [
- {type, Type},
- {task, Task},
- {status, Status},
- {pid, ?l2b(pid_to_list(Pid))}
- ]
- ||
- {Pid, {Type, Task, Status}} <- ets:tab2list(?MODULE)
- ],
- {reply, All, Server}.
-
-
-handle_cast({update_status, Pid, StatusText}, Server) ->
- [{Pid, {Type, TaskName, _StatusText}}] = ets:lookup(?MODULE, Pid),
- true = ets:insert(?MODULE, {Pid, {Type, TaskName, StatusText}}),
+ All = ets:tab2list(?MODULE),
+ {reply, tasks_to_json(All), Server}.
+
+handle_cast({update_status, Pid, NewProps}, Server) ->
+ case ets:lookup(?MODULE, Pid) of
+ [{Pid, _CurProps}] ->
+ ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]),
+ true = ets:insert(?MODULE, {Pid, NewProps});
+ _ ->
+ % Task finished/died in the meanwhile and we must have received
+ % a monitor message before this call - ignore.
+ ok
+ end,
{noreply, Server};
handle_cast(stop, State) ->
{stop, normal, State}.
@@ -121,3 +134,11 @@ handle_info({'DOWN', _MonitorRef, _Type, Pid, _Info}, Server) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+timestamp() ->
+ timestamp(now()).
+
+timestamp({Mega, Secs, _}) ->
+ Mega * 1000000 + Secs.
+
+tasks_to_json(Tasks) ->
+ [ [{pid, ?l2b(pid_to_list(Pid))} | Props] || {Pid, Props} <- Tasks].
diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl
index 8ea1dca2..1d789d35 100644
--- a/apps/couch/src/couch_view_compactor.erl
+++ b/apps/couch/src/couch_view_compactor.erl
@@ -49,7 +49,12 @@ compact_group(Group, EmptyGroup, DbName) ->
<<"_design", ShortName/binary>> = GroupId,
TaskName = <<DbName/binary, ShortName/binary>>,
- couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
+ couch_task_status:add_task([
+ {type, view_compaction},
+ {database, DbName},
+ {design_document, ShortName},
+ {progress, 0}
+ ]),
Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) ->
if DocId =:= LastId -> % COUCHDB-999
@@ -59,8 +64,7 @@ compact_group(Group, EmptyGroup, DbName) ->
exit({view_duplicated_id, DocId});
true -> ok end,
if TotalCopied rem 10000 =:= 0 ->
- couch_task_status:update("Copied ~p of ~p Ids (~p%)",
- [TotalCopied, Count, (TotalCopied*100) div Count]),
+ couch_task_status:update([{changes_done, TotalCopied}, {progress, (TotalCopied * 100) div Count}]),
{ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
{ok, {Bt2, [], TotalCopied+1, DocId}};
true ->
@@ -108,8 +112,7 @@ compact_view(View, EmptyView) ->
%% Key is {Key,DocId}
Fun = fun(KV, {Bt, Acc, TotalCopied}) ->
if TotalCopied rem 10000 =:= 0 ->
- couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)",
- [View#view.id_num, TotalCopied, Count, (TotalCopied*100) div Count]),
+ couch_task_status:update([{changes_done, TotalCopied}, {progress, (TotalCopied * 100) div Count}]),
{ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])),
{ok, {Bt2, [], TotalCopied + 1}};
true ->
@@ -121,4 +124,3 @@ compact_view(View, EmptyView) ->
{EmptyView#view.btree, [], 0}),
{ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
EmptyView#view{btree = NewBt}.
-
diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl
index 8238e3e5..ca144149 100644
--- a/apps/couch/src/couch_view_updater.erl
+++ b/apps/couch/src/couch_view_updater.erl
@@ -32,17 +32,25 @@ update(Owner, Group, #db{name = DbName} = Db) ->
current_seq = Seq,
purge_seq = PurgeSeq
} = Group,
- couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
+ % compute on all docs modified since we last computed.
+ TotalChanges = couch_db:count_changes_since(Db, Seq),
+ couch_task_status:add_task([
+ {type, indexer},
+ {database, DbName},
+ {design_document, GroupName},
+ {progress, 0},
+ {changes_done, 0},
+ {total_changes, TotalChanges}
+ ]),
+ couch_task_status:set_update_frequency(500),
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
if DbPurgeSeq == PurgeSeq ->
Group;
DbPurgeSeq == PurgeSeq + 1 ->
- couch_task_status:update(<<"Removing purged entries from view index.">>),
purge_index(Group, Db);
true ->
- couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
exit(reset)
end,
{ok, MapQueue} = couch_work_queue:new(
@@ -53,10 +61,6 @@ update(Owner, Group, #db{name = DbName} = Db) ->
ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
spawn_link(?MODULE, do_maps, [Group, MapQueue, WriteQueue, ViewEmptyKVs]),
spawn_link(?MODULE, do_writes, [Self, Owner, Group2, WriteQueue, Seq == 0]),
- % compute on all docs modified since we last computed.
- TotalChanges = couch_db:count_changes_since(Db, Seq),
- % update status every half second
- couch_task_status:set_update_frequency(500),
#group{ design_options = DesignOptions } = Group,
IncludeDesign = couch_util:get_value(<<"include_design">>,
DesignOptions, false),
@@ -69,8 +73,6 @@ update(Owner, Group, #db{name = DbName} = Db) ->
EnumFun = fun ?MODULE:load_docs/3,
Acc0 = {0, Db, MapQueue, DocOpts, IncludeDesign, TotalChanges},
{ok, _, _} = couch_db:enum_docs_since(Db, Seq, EnumFun, Acc0, []),
- couch_task_status:set_update_frequency(0),
- couch_task_status:update("Finishing."),
couch_work_queue:close(MapQueue),
receive {new_group, NewGroup} ->
exit({new_group,
@@ -78,8 +80,7 @@ update(Owner, Group, #db{name = DbName} = Db) ->
end.
load_docs(DocInfo, _, {I, Db, MapQueue, DocOpts, IncludeDesign, Total} = Acc) ->
- couch_task_status:update("Processed ~p of ~p changes (~p%)", [I, Total,
- (I*100) div Total]),
+ couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]),
load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign),
{ok, setelement(1, Acc, I+1)}.
diff --git a/apps/couch/test/etap/090-task-status.t b/apps/couch/test/etap/090-task-status.t
index b278de7f..34855834 100755
--- a/apps/couch/test/etap/090-task-status.t
+++ b/apps/couch/test/etap/090-task-status.t
@@ -15,7 +15,7 @@
main(_) ->
test_util:init_code_path(),
- etap:plan(16),
+ etap:plan(28),
case (catch test()) of
ok ->
etap:end_tests();
@@ -25,7 +25,7 @@ main(_) ->
end,
ok.
-check_status(Pid,ListPropLists) ->
+get_task_prop(Pid, Prop) ->
From = list_to_binary(pid_to_list(Pid)),
Element = lists:foldl(
fun(PropList,Acc) ->
@@ -36,18 +36,28 @@ check_status(Pid,ListPropLists) ->
[]
end
end,
- [], ListPropLists
+ [], couch_task_status:all()
),
- couch_util:get_value(status,hd(Element)).
+ case couch_util:get_value(Prop, hd(Element), nil) of
+ nil ->
+ etap:bail("Could not get property '" ++ couch_util:to_list(Prop) ++
+ "' for task " ++ pid_to_list(Pid));
+ Value ->
+ Value
+ end.
+
+now_ts() ->
+ {Mega, Secs, _} = erlang:now(),
+ Mega * 1000000 + Secs.
loop() ->
receive
- {add, From} ->
- Resp = couch_task_status:add_task("type", "task", "init"),
+ {add, Props, From} ->
+ Resp = couch_task_status:add_task(Props),
From ! {ok, self(), Resp},
loop();
- {update, Status, From} ->
- Resp = couch_task_status:update(Status),
+ {update, Props, From} ->
+ Resp = couch_task_status:update(Props),
From ! {ok, self(), Resp},
loop();
{update_frequency, Msecs, From} ->
@@ -82,96 +92,159 @@ test() ->
Pid2 = spawn(TaskUpdater),
Pid3 = spawn(TaskUpdater),
- ok = call(Pid1, add),
+ ok = call(Pid1, add, [{type, replication}, {progress, 0}]),
etap:is(
length(couch_task_status:all()),
1,
"Started a task"
),
+ Task1StartTime = get_task_prop(Pid1, started_on),
+ etap:is(
+ is_integer(Task1StartTime),
+ true,
+ "Task start time is defined."
+ ),
+ etap:is(
+ get_task_prop(Pid1, updated_on),
+ Task1StartTime,
+ "Task's start time is the same as the update time before an update."
+ ),
etap:is(
- call(Pid1, add),
+ call(Pid1, add, [{type, compaction}, {progress, 0}]),
{add_task_error, already_registered},
"Unable to register multiple tasks for a single Pid."
),
etap:is(
- check_status(Pid1, couch_task_status:all()),
- <<"init">>,
- "Task status was set to 'init'."
+ get_task_prop(Pid1, type),
+ replication,
+ "Task type is 'replication'."
),
-
- call(Pid1,update,"running"),
etap:is(
- check_status(Pid1,couch_task_status:all()),
- <<"running">>,
- "Status updated to 'running'."
+ get_task_prop(Pid1, progress),
+ 0,
+ "Task progress is 0."
),
+ ok = timer:sleep(1000),
+ call(Pid1, update, [{progress, 25}]),
+ etap:is(
+ get_task_prop(Pid1, progress),
+ 25,
+ "Task progress is 25."
+ ),
+ etap:is(
+ get_task_prop(Pid1, updated_on) > Task1StartTime,
+ true,
+ "Task's last update time has increased after an update."
+ ),
- call(Pid2,add),
+ call(Pid2, add, [{type, compaction}, {progress, 0}]),
etap:is(
length(couch_task_status:all()),
2,
"Started a second task."
),
-
+ Task2StartTime = get_task_prop(Pid2, started_on),
+ etap:is(
+ is_integer(Task2StartTime),
+ true,
+ "Second task's start time is defined."
+ ),
etap:is(
- check_status(Pid2, couch_task_status:all()),
- <<"init">>,
- "Second tasks's status was set to 'init'."
+ get_task_prop(Pid2, updated_on),
+ Task2StartTime,
+ "Second task's start time is the same as the update time before an update."
),
- call(Pid2, update, "running"),
etap:is(
- check_status(Pid2, couch_task_status:all()),
- <<"running">>,
- "Second task's status updated to 'running'."
+ get_task_prop(Pid2, type),
+ compaction,
+ "Second task's type is 'compaction'."
+ ),
+ etap:is(
+ get_task_prop(Pid2, progress),
+ 0,
+ "Second task's progress is 0."
),
+ ok = timer:sleep(1000),
+ call(Pid2, update, [{progress, 33}]),
+ etap:is(
+ get_task_prop(Pid2, progress),
+ 33,
+ "Second task's progress updated to 33."
+ ),
+ etap:is(
+ get_task_prop(Pid2, updated_on) > Task2StartTime,
+ true,
+ "Second task's last update time has increased after an update."
+ ),
- call(Pid3, add),
+ call(Pid3, add, [{type, indexer}, {progress, 0}]),
etap:is(
length(couch_task_status:all()),
3,
"Registered a third task."
),
-
+ Task3StartTime = get_task_prop(Pid3, started_on),
+ etap:is(
+ is_integer(Task3StartTime),
+ true,
+ "Third task's start time is defined."
+ ),
etap:is(
- check_status(Pid3, couch_task_status:all()),
- <<"init">>,
- "Third tasks's status was set to 'init'."
+ get_task_prop(Pid3, updated_on),
+ Task3StartTime,
+ "Third task's start time is the same as the update time before an update."
),
- call(Pid3, update, "running"),
etap:is(
- check_status(Pid3, couch_task_status:all()),
- <<"running">>,
- "Third task's status updated to 'running'."
+ get_task_prop(Pid3, type),
+ indexer,
+ "Third task's type is 'indexer'."
+ ),
+ etap:is(
+ get_task_prop(Pid3, progress),
+ 0,
+ "Third task's progress is 0."
),
+ ok = timer:sleep(1000),
+ call(Pid3, update, [{progress, 50}]),
+ etap:is(
+ get_task_prop(Pid3, progress),
+ 50,
+ "Third task's progress updated to 50."
+ ),
+ etap:is(
+ get_task_prop(Pid3, updated_on) > Task3StartTime,
+ true,
+ "Third task's last update time has increased after an update."
+ ),
call(Pid3, update_frequency, 500),
- call(Pid3, update, "still running"),
+ call(Pid3, update, [{progress, 66}]),
etap:is(
- check_status(Pid3, couch_task_status:all()),
- <<"still running">>,
- "Third task's status updated to 'still running'."
+ get_task_prop(Pid3, progress),
+ 66,
+ "Third task's progress updated to 66."
),
- call(Pid3, update, "skip this update"),
+ call(Pid3, update, [{progress, 67}]),
etap:is(
- check_status(Pid3, couch_task_status:all()),
- <<"still running">>,
- "Status update dropped because of frequency limit."
+ get_task_prop(Pid3, progress),
+ 66,
+ "Task update dropped because of frequency limit."
),
call(Pid3, update_frequency, 0),
- call(Pid3, update, "don't skip"),
+ call(Pid3, update, [{progress, 77}]),
etap:is(
- check_status(Pid3, couch_task_status:all()),
- <<"don't skip">>,
- "Status updated after reseting frequency limit."
+ get_task_prop(Pid3, progress),
+ 77,
+ "Task updated after reseting frequency limit."
),