diff options
Diffstat (limited to 'apps')
-rw-r--r-- | apps/couch/src/couch_db_updater.erl | 35 | ||||
-rw-r--r-- | apps/couch/src/couch_rep.erl | 32 | ||||
-rw-r--r-- | apps/couch/src/couch_task_status.erl | 99 | ||||
-rw-r--r-- | apps/couch/src/couch_view_compactor.erl | 14 | ||||
-rw-r--r-- | apps/couch/src/couch_view_updater.erl | 23 | ||||
-rwxr-xr-x | apps/couch/test/etap/090-task-status.t | 169 |
6 files changed, 254 insertions, 118 deletions
diff --git a/apps/couch/src/couch_db_updater.erl b/apps/couch/src/couch_db_updater.erl index 1f25186a..69580602 100644 --- a/apps/couch/src/couch_db_updater.erl +++ b/apps/couch/src/couch_db_updater.erl @@ -872,6 +872,7 @@ copy_docs(Db, #db{fd=DestFd}=NewDb, MixedInfos, Retry) -> NewDb#db.seq_tree, NewInfos, RemoveSeqs), {ok, IdTree} = couch_btree:add_remove( NewDb#db.id_tree, NewInfos, []), + update_compact_task(length(NewInfos)), NewDb#db{id_tree=IdTree, seq_tree=SeqTree}. @@ -903,15 +904,31 @@ copy_compact(Db, NewDb0, Retry) -> end end, - couch_task_status:set_update_frequency(500), + TaskProps0 = [ + {type, database_compaction}, + {database, Db#db.name}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ], + case Retry and couch_task_status:is_task_added() of + true -> + couch_task_status:update([ + {retry, true}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ]); + false -> + couch_task_status:add_task(TaskProps0), + couch_task_status:set_update_frequency(500) + end, {ok, _, {NewDb2, Uncopied, TotalChanges}} = couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun, {NewDb, [], 0}, [{start_key, NewDb#db.update_seq + 1}]), - couch_task_status:update("Flushing"), - NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), % copy misc header values @@ -929,7 +946,6 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of {ok, Fd} -> - couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>), Retry = true, case couch_file:read_header(Fd) of {ok, Header} -> @@ -938,7 +954,6 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P ok = couch_file:write_header(Fd, Header=#db_header{}) end; {error, enoent} -> - couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), Retry = false, ok = couch_file:write_header(Fd, Header=#db_header{}) @@ -957,3 +972,13 @@ start_copy_compact(#db{name=Name,filepath=Filepath,header=#db_header{purge_seq=P close_db(NewDb3), gen_server:cast(Db#db.main_pid, {compact_done, CompactFile}). +update_compact_task(NumChanges) -> + [Changes, Total] = couch_task_status:get([changes_done, total_changes]), + Changes2 = Changes + NumChanges, + Progress = case Total of + 0 -> + 0; + _ -> + (Changes2 * 100) div Total + end, + couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]). diff --git a/apps/couch/src/couch_rep.erl b/apps/couch/src/couch_rep.erl index 2bdb66d1..80720375 100644 --- a/apps/couch/src/couch_rep.erl +++ b/apps/couch/src/couch_rep.erl @@ -172,9 +172,18 @@ do_init([{BaseId, _Ext} = RepId, {PostProps}, UserCtx] = InitArgs) -> ets:insert(Stats, {docs_written, 0}), ets:insert(Stats, {doc_write_failures, 0}), - {ShortId, _} = lists:split(6, BaseId), - couch_task_status:add_task("Replication", io_lib:format("~s: ~s -> ~s", - [ShortId, dbname(Source), dbname(Target)]), "Starting"), + couch_task_status:add_task([ + {user, UserCtx#user_ctx.name}, + {type, replication}, + {replication_id, ?l2b(RepId)}, + {source, dbname(Source)}, + {target, dbname(Target)}, + {continuous, Continuous}, + {docs_read, 0}, + {docs_written, 0}, + {doc_write_failures, 0} + ]), + couch_task_status:set_update_frequency(1000), State = #state{ changes_feed = ChangesFeed, @@ -230,15 +239,16 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({missing_revs_checkpoint, SourceSeq}, State) -> - couch_task_status:update("MR Processed source update #~p", [SourceSeq]), - {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; - + NewState = schedule_checkpoint(State#state{committed_seq = SourceSeq}), + update_task(NewState), + {noreply, NewState}; handle_info({writer_checkpoint, SourceSeq}, #state{committed_seq=N} = State) when SourceSeq > N -> MissingRevs = State#state.missing_revs, ok = gen_server:cast(MissingRevs, {update_committed_seq, SourceSeq}), - couch_task_status:update("W Processed source update #~p", [SourceSeq]), - {noreply, schedule_checkpoint(State#state{committed_seq = SourceSeq})}; + NewState = schedule_checkpoint(State#state{committed_seq = SourceSeq}), + update_task(NewState), + {noreply, NewState}; handle_info({writer_checkpoint, _}, State) -> {noreply, State}; @@ -430,7 +440,6 @@ do_terminate(State) -> false -> [gen_server:reply(R, retry) || R <- OtherListeners] end, - couch_task_status:update("Finishing"), terminate_cleanup(State). terminate_cleanup(State) -> @@ -916,3 +925,8 @@ target_db_update_notifier(#db{name = DbName}) -> Notifier; target_db_update_notifier(_) -> nil. + +update_task(#state{stats=Stats}) -> + Update = [ {Stat, ets:lookup_element(Stats, Stat, 2)} || Stat <- + [total_revs, missing_revs, docs_read, docs_written, doc_write_failures]], + couch_task_status:update(Update). diff --git a/apps/couch/src/couch_task_status.erl b/apps/couch/src/couch_task_status.erl index 639515c7..62327e27 100644 --- a/apps/couch/src/couch_task_status.erl +++ b/apps/couch/src/couch_task_status.erl @@ -13,14 +13,20 @@ -module(couch_task_status). -behaviour(gen_server). -% This module allows is used to track the status of long running tasks. -% Long running tasks register (add_task/3) then update their status (update/1) -% and the task and status is added to tasks list. When the tracked task dies -% it will be automatically removed the tracking. To get the tasks list, use the -% all/0 function +% This module is used to track the status of long running tasks. +% Long running tasks register themselves, via a call to add_task/1, and then +% update their status properties via update/1. The status of a task is a +% list of properties. Each property is a tuple, with the first element being +% either an atom or a binary and the second element must be an EJSON value. When +% a task updates its status, it can override some or all of its properties. +% The properties {started_on, UnitTimestamp}, {updated_on, UnixTimestamp} and +% {pid, ErlangPid} are automatically added by this module. +% When a tracked task dies, its status will be automatically removed from +% memory. To get the tasks list, call the all/0 function. -export([start_link/0, stop/0]). --export([all/0, add_task/3, update/1, update/2, set_update_frequency/1]). +-export([all/0, add_task/1, update/1, get/1, set_update_frequency/1]). +-export([is_task_added/0]). -export([init/1, terminate/2, code_change/3]). -export([handle_call/3, handle_cast/2, handle_info/2]). @@ -29,6 +35,7 @@ -include("couch_db.hrl"). +-define(set(L, K, V), lists:keystore(K, 1, L, {K, V})). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -41,32 +48,41 @@ stop() -> all() -> gen_server:call(?MODULE, all). - -add_task(Type, TaskName, StatusText) -> +add_task(Props) -> put(task_status_update, {{0, 0, 0}, 0}), - Msg = { - add_task, - to_binary(Type), - to_binary(TaskName), - to_binary(StatusText) - }, - gen_server:call(?MODULE, Msg). + Ts = timestamp(), + TaskProps = lists:ukeysort( + 1, [{started_on, Ts}, {updated_on, Ts} | Props]), + put(task_status_props, TaskProps), + gen_server:call(?MODULE, {add_task, TaskProps}). +is_task_added() -> + is_list(erlang:get(task_status_props)). set_update_frequency(Msecs) -> put(task_status_update, {{0, 0, 0}, Msecs * 1000}). -update(StatusText) -> - update("~s", [StatusText]). +update(Props) -> + MergeProps = lists:ukeysort(1, Props), + TaskProps = lists:ukeymerge(1, MergeProps, erlang:get(task_status_props)), + put(task_status_props, TaskProps), + maybe_persist(TaskProps). + +get(Props) when is_list(Props) -> + TaskProps = erlang:get(task_status_props), + [couch_util:get_value(P, TaskProps) || P <- Props]; +get(Prop) -> + TaskProps = erlang:get(task_status_props), + couch_util:get_value(Prop, TaskProps). -update(Format, Data) -> - {LastUpdateTime, Frequency} = get(task_status_update), +maybe_persist(TaskProps0) -> + {LastUpdateTime, Frequency} = erlang:get(task_status_update), case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of true -> put(task_status_update, {Now, Frequency}), - Msg = ?l2b(io_lib:format(Format, Data)), - gen_server:cast(?MODULE, {update_status, self(), Msg}); + TaskProps = ?set(TaskProps0, updated_on, timestamp(Now)), + gen_server:cast(?MODULE, {update_status, self(), TaskProps}); false -> ok end. @@ -82,32 +98,29 @@ terminate(_Reason,_State) -> ok. -handle_call({add_task, Type, TaskName, StatusText}, {From, _}, Server) -> +handle_call({add_task, TaskProps}, {From, _}, Server) -> case ets:lookup(?MODULE, From) of [] -> - true = ets:insert(?MODULE, {From, {Type, TaskName, StatusText}}), + true = ets:insert(?MODULE, {From, TaskProps}), erlang:monitor(process, From), {reply, ok, Server}; [_] -> {reply, {add_task_error, already_registered}, Server} end; handle_call(all, _, Server) -> - All = [ - [ - {type, Type}, - {task, Task}, - {status, Status}, - {pid, ?l2b(pid_to_list(Pid))} - ] - || - {Pid, {Type, Task, Status}} <- ets:tab2list(?MODULE) - ], - {reply, All, Server}. - - -handle_cast({update_status, Pid, StatusText}, Server) -> - [{Pid, {Type, TaskName, _StatusText}}] = ets:lookup(?MODULE, Pid), - true = ets:insert(?MODULE, {Pid, {Type, TaskName, StatusText}}), + All = ets:tab2list(?MODULE), + {reply, tasks_to_json(All), Server}. + +handle_cast({update_status, Pid, NewProps}, Server) -> + case ets:lookup(?MODULE, Pid) of + [{Pid, _CurProps}] -> + ?LOG_DEBUG("New task status for ~p: ~p", [Pid, NewProps]), + true = ets:insert(?MODULE, {Pid, NewProps}); + _ -> + % Task finished/died in the meanwhile and we must have received + % a monitor message before this call - ignore. + ok + end, {noreply, Server}; handle_cast(stop, State) -> {stop, normal, State}. @@ -121,3 +134,11 @@ handle_info({'DOWN', _MonitorRef, _Type, Pid, _Info}, Server) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +timestamp() -> + timestamp(now()). + +timestamp({Mega, Secs, _}) -> + Mega * 1000000 + Secs. + +tasks_to_json(Tasks) -> + [ [{pid, ?l2b(pid_to_list(Pid))} | Props] || {Pid, Props} <- Tasks]. diff --git a/apps/couch/src/couch_view_compactor.erl b/apps/couch/src/couch_view_compactor.erl index 8ea1dca2..1d789d35 100644 --- a/apps/couch/src/couch_view_compactor.erl +++ b/apps/couch/src/couch_view_compactor.erl @@ -49,7 +49,12 @@ compact_group(Group, EmptyGroup, DbName) -> <<"_design", ShortName/binary>> = GroupId, TaskName = <<DbName/binary, ShortName/binary>>, - couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>), + couch_task_status:add_task([ + {type, view_compaction}, + {database, DbName}, + {design_document, ShortName}, + {progress, 0} + ]), Fun = fun({DocId, _ViewIdKeys} = KV, {Bt, Acc, TotalCopied, LastId}) -> if DocId =:= LastId -> % COUCHDB-999 @@ -59,8 +64,7 @@ compact_group(Group, EmptyGroup, DbName) -> exit({view_duplicated_id, DocId}); true -> ok end, if TotalCopied rem 10000 =:= 0 -> - couch_task_status:update("Copied ~p of ~p Ids (~p%)", - [TotalCopied, Count, (TotalCopied*100) div Count]), + couch_task_status:update([{changes_done, TotalCopied}, {progress, (TotalCopied * 100) div Count}]), {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), {ok, {Bt2, [], TotalCopied+1, DocId}}; true -> @@ -108,8 +112,7 @@ compact_view(View, EmptyView) -> %% Key is {Key,DocId} Fun = fun(KV, {Bt, Acc, TotalCopied}) -> if TotalCopied rem 10000 =:= 0 -> - couch_task_status:update("View #~p: copied ~p of ~p KVs (~p%)", - [View#view.id_num, TotalCopied, Count, (TotalCopied*100) div Count]), + couch_task_status:update([{changes_done, TotalCopied}, {progress, (TotalCopied * 100) div Count}]), {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV|Acc])), {ok, {Bt2, [], TotalCopied + 1}}; true -> @@ -121,4 +124,3 @@ compact_view(View, EmptyView) -> {EmptyView#view.btree, [], 0}), {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)), EmptyView#view{btree = NewBt}. - diff --git a/apps/couch/src/couch_view_updater.erl b/apps/couch/src/couch_view_updater.erl index 8238e3e5..ca144149 100644 --- a/apps/couch/src/couch_view_updater.erl +++ b/apps/couch/src/couch_view_updater.erl @@ -32,17 +32,25 @@ update(Owner, Group, #db{name = DbName} = Db) -> current_seq = Seq, purge_seq = PurgeSeq } = Group, - couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), + % compute on all docs modified since we last computed. + TotalChanges = couch_db:count_changes_since(Db, Seq), + couch_task_status:add_task([ + {type, indexer}, + {database, DbName}, + {design_document, GroupName}, + {progress, 0}, + {changes_done, 0}, + {total_changes, TotalChanges} + ]), + couch_task_status:set_update_frequency(500), DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> - couch_task_status:update(<<"Removing purged entries from view index.">>), purge_index(Group, Db); true -> - couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), exit(reset) end, {ok, MapQueue} = couch_work_queue:new( @@ -53,10 +61,6 @@ update(Owner, Group, #db{name = DbName} = Db) -> ViewEmptyKVs = [{View, []} || View <- Group2#group.views], spawn_link(?MODULE, do_maps, [Group, MapQueue, WriteQueue, ViewEmptyKVs]), spawn_link(?MODULE, do_writes, [Self, Owner, Group2, WriteQueue, Seq == 0]), - % compute on all docs modified since we last computed. - TotalChanges = couch_db:count_changes_since(Db, Seq), - % update status every half second - couch_task_status:set_update_frequency(500), #group{ design_options = DesignOptions } = Group, IncludeDesign = couch_util:get_value(<<"include_design">>, DesignOptions, false), @@ -69,8 +73,6 @@ update(Owner, Group, #db{name = DbName} = Db) -> EnumFun = fun ?MODULE:load_docs/3, Acc0 = {0, Db, MapQueue, DocOpts, IncludeDesign, TotalChanges}, {ok, _, _} = couch_db:enum_docs_since(Db, Seq, EnumFun, Acc0, []), - couch_task_status:set_update_frequency(0), - couch_task_status:update("Finishing."), couch_work_queue:close(MapQueue), receive {new_group, NewGroup} -> exit({new_group, @@ -78,8 +80,7 @@ update(Owner, Group, #db{name = DbName} = Db) -> end. load_docs(DocInfo, _, {I, Db, MapQueue, DocOpts, IncludeDesign, Total} = Acc) -> - couch_task_status:update("Processed ~p of ~p changes (~p%)", [I, Total, - (I*100) div Total]), + couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]), load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign), {ok, setelement(1, Acc, I+1)}. diff --git a/apps/couch/test/etap/090-task-status.t b/apps/couch/test/etap/090-task-status.t index b278de7f..34855834 100755 --- a/apps/couch/test/etap/090-task-status.t +++ b/apps/couch/test/etap/090-task-status.t @@ -15,7 +15,7 @@ main(_) -> test_util:init_code_path(), - etap:plan(16), + etap:plan(28), case (catch test()) of ok -> etap:end_tests(); @@ -25,7 +25,7 @@ main(_) -> end, ok. -check_status(Pid,ListPropLists) -> +get_task_prop(Pid, Prop) -> From = list_to_binary(pid_to_list(Pid)), Element = lists:foldl( fun(PropList,Acc) -> @@ -36,18 +36,28 @@ check_status(Pid,ListPropLists) -> [] end end, - [], ListPropLists + [], couch_task_status:all() ), - couch_util:get_value(status,hd(Element)). + case couch_util:get_value(Prop, hd(Element), nil) of + nil -> + etap:bail("Could not get property '" ++ couch_util:to_list(Prop) ++ + "' for task " ++ pid_to_list(Pid)); + Value -> + Value + end. + +now_ts() -> + {Mega, Secs, _} = erlang:now(), + Mega * 1000000 + Secs. loop() -> receive - {add, From} -> - Resp = couch_task_status:add_task("type", "task", "init"), + {add, Props, From} -> + Resp = couch_task_status:add_task(Props), From ! {ok, self(), Resp}, loop(); - {update, Status, From} -> - Resp = couch_task_status:update(Status), + {update, Props, From} -> + Resp = couch_task_status:update(Props), From ! {ok, self(), Resp}, loop(); {update_frequency, Msecs, From} -> @@ -82,96 +92,159 @@ test() -> Pid2 = spawn(TaskUpdater), Pid3 = spawn(TaskUpdater), - ok = call(Pid1, add), + ok = call(Pid1, add, [{type, replication}, {progress, 0}]), etap:is( length(couch_task_status:all()), 1, "Started a task" ), + Task1StartTime = get_task_prop(Pid1, started_on), + etap:is( + is_integer(Task1StartTime), + true, + "Task start time is defined." + ), + etap:is( + get_task_prop(Pid1, updated_on), + Task1StartTime, + "Task's start time is the same as the update time before an update." + ), etap:is( - call(Pid1, add), + call(Pid1, add, [{type, compaction}, {progress, 0}]), {add_task_error, already_registered}, "Unable to register multiple tasks for a single Pid." ), etap:is( - check_status(Pid1, couch_task_status:all()), - <<"init">>, - "Task status was set to 'init'." + get_task_prop(Pid1, type), + replication, + "Task type is 'replication'." ), - - call(Pid1,update,"running"), etap:is( - check_status(Pid1,couch_task_status:all()), - <<"running">>, - "Status updated to 'running'." + get_task_prop(Pid1, progress), + 0, + "Task progress is 0." ), + ok = timer:sleep(1000), + call(Pid1, update, [{progress, 25}]), + etap:is( + get_task_prop(Pid1, progress), + 25, + "Task progress is 25." + ), + etap:is( + get_task_prop(Pid1, updated_on) > Task1StartTime, + true, + "Task's last update time has increased after an update." + ), - call(Pid2,add), + call(Pid2, add, [{type, compaction}, {progress, 0}]), etap:is( length(couch_task_status:all()), 2, "Started a second task." ), - + Task2StartTime = get_task_prop(Pid2, started_on), + etap:is( + is_integer(Task2StartTime), + true, + "Second task's start time is defined." + ), etap:is( - check_status(Pid2, couch_task_status:all()), - <<"init">>, - "Second tasks's status was set to 'init'." + get_task_prop(Pid2, updated_on), + Task2StartTime, + "Second task's start time is the same as the update time before an update." ), - call(Pid2, update, "running"), etap:is( - check_status(Pid2, couch_task_status:all()), - <<"running">>, - "Second task's status updated to 'running'." + get_task_prop(Pid2, type), + compaction, + "Second task's type is 'compaction'." + ), + etap:is( + get_task_prop(Pid2, progress), + 0, + "Second task's progress is 0." ), + ok = timer:sleep(1000), + call(Pid2, update, [{progress, 33}]), + etap:is( + get_task_prop(Pid2, progress), + 33, + "Second task's progress updated to 33." + ), + etap:is( + get_task_prop(Pid2, updated_on) > Task2StartTime, + true, + "Second task's last update time has increased after an update." + ), - call(Pid3, add), + call(Pid3, add, [{type, indexer}, {progress, 0}]), etap:is( length(couch_task_status:all()), 3, "Registered a third task." ), - + Task3StartTime = get_task_prop(Pid3, started_on), + etap:is( + is_integer(Task3StartTime), + true, + "Third task's start time is defined." + ), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"init">>, - "Third tasks's status was set to 'init'." + get_task_prop(Pid3, updated_on), + Task3StartTime, + "Third task's start time is the same as the update time before an update." ), - call(Pid3, update, "running"), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"running">>, - "Third task's status updated to 'running'." + get_task_prop(Pid3, type), + indexer, + "Third task's type is 'indexer'." + ), + etap:is( + get_task_prop(Pid3, progress), + 0, + "Third task's progress is 0." ), + ok = timer:sleep(1000), + call(Pid3, update, [{progress, 50}]), + etap:is( + get_task_prop(Pid3, progress), + 50, + "Third task's progress updated to 50." + ), + etap:is( + get_task_prop(Pid3, updated_on) > Task3StartTime, + true, + "Third task's last update time has increased after an update." + ), call(Pid3, update_frequency, 500), - call(Pid3, update, "still running"), + call(Pid3, update, [{progress, 66}]), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"still running">>, - "Third task's status updated to 'still running'." + get_task_prop(Pid3, progress), + 66, + "Third task's progress updated to 66." ), - call(Pid3, update, "skip this update"), + call(Pid3, update, [{progress, 67}]), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"still running">>, - "Status update dropped because of frequency limit." + get_task_prop(Pid3, progress), + 66, + "Task update dropped because of frequency limit." ), call(Pid3, update_frequency, 0), - call(Pid3, update, "don't skip"), + call(Pid3, update, [{progress, 77}]), etap:is( - check_status(Pid3, couch_task_status:all()), - <<"don't skip">>, - "Status updated after reseting frequency limit." + get_task_prop(Pid3, progress), + 77, + "Task updated after reseting frequency limit." ), |