diff options
author | Damien F. Katz <damien@apache.org> | 2009-01-23 04:15:47 +0000 |
---|---|---|
committer | Damien F. Katz <damien@apache.org> | 2009-01-23 04:15:47 +0000 |
commit | 5dd96d8ee5f89524fa0be0f087a21f81b6b68ec4 (patch) | |
tree | c4931d99e50a77aebe695279bc8567e1eee7499c /src | |
parent | 0a46c330072a3811d98a5c989d4c6486cff83df2 (diff) |
Added task status checking, to help debug the progress of long running tasks, like view indexing and compaction.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@736906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r-- | src/couchdb/Makefile.am | 2 | ||||
-rw-r--r-- | src/couchdb/couch_btree.erl | 2 | ||||
-rw-r--r-- | src/couchdb/couch_db.erl | 14 | ||||
-rw-r--r-- | src/couchdb/couch_db_updater.erl | 32 | ||||
-rw-r--r-- | src/couchdb/couch_httpd_misc_handlers.erl | 13 | ||||
-rw-r--r-- | src/couchdb/couch_server_sup.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_stream.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_task_status.erl | 97 | ||||
-rw-r--r-- | src/couchdb/couch_view_group.erl | 6 | ||||
-rw-r--r-- | src/couchdb/couch_view_updater.erl | 32 |
10 files changed, 180 insertions, 30 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am index 8e280c70..1ad5d14a 100644 --- a/src/couchdb/Makefile.am +++ b/src/couchdb/Makefile.am @@ -66,6 +66,7 @@ source_files = \ couch_server.erl \ couch_server_sup.erl \ couch_stream.erl \ + couch_task_status.erl \ couch_util.erl \ couch_view.erl \ couch_view_updater.erl \ @@ -102,6 +103,7 @@ compiled_files = \ couch_server.beam \ couch_server_sup.beam \ couch_stream.beam \ + couch_task_status.beam \ couch_util.beam \ couch_view.beam \ couch_view_updater.beam \ diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl index 14bc4a1f..f993a0a1 100644 --- a/src/couchdb/couch_btree.erl +++ b/src/couchdb/couch_btree.erl @@ -14,7 +14,7 @@ -export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]). -export([foldr/3, foldr/4, fold/4, fold/5, full_reduce/1, final_reduce/2]). --export([fold_reduce/7, lookup/2, get_state/1, set_options/2]). +-export([fold_reduce/6, fold_reduce/7, lookup/2, get_state/1, set_options/2]). -export([test/1, test/0, test_remove/2, test_add/2]). -define(CHUNK_THRESHOLD, 16#4ff). diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl index b3102044..c35bb913 100644 --- a/src/couchdb/couch_db.erl +++ b/src/couchdb/couch_db.erl @@ -14,7 +14,7 @@ -behaviour(gen_server). -export([open/2,close/1,create/2,start_compact/1,get_db_info/1]). --export([open_ref_counted/2,num_refs/1,monitor/1]). +-export([open_ref_counted/2,num_refs/1,monitor/1,count_changes_since/2]). -export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]). -export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]). -export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]). @@ -468,6 +468,18 @@ enum_docs_reduce_to_count(Reds) -> fun couch_db_updater:btree_by_id_reduce/2, Reds), Count. +count_changes_since(Db, SinceSeq) -> + {ok, Changes} = + couch_btree:fold_reduce(Db#db.docinfo_by_seq_btree, + SinceSeq + 1, % startkey + ok, % endkey + fun(_,_) -> true end, % groupkeys + fun(_SeqStart, PartialReds, ok) -> + {ok, couch_btree:final_reduce(Db#db.docinfo_by_seq_btree, PartialReds)} + end, + ok), + Changes. + enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) -> couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx). diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl index a6fd0cc6..800730d8 100644 --- a/src/couchdb/couch_db_updater.erl +++ b/src/couchdb/couch_db_updater.erl @@ -590,19 +590,30 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info copy_compact(Db, NewDb, Retry) -> + TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), EnumBySeqFun = - fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) -> - case couch_util:should_flush() of - true -> + fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) -> + couch_task_status:update("Copied ~p of ~p changes (~p%)", + [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]), + if TotalCopied rem 1000 == 0 -> NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), - {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}}; - false -> - {ok, {AccNewDb, [DocInfo | AccUncopied]}} + if TotalCopied rem 10000 == 0 -> + {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}}; + true -> + {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}} + end; + true -> + {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}} end end, - {ok, {NewDb2, Uncopied}} = - couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}), - + + couch_task_status:set_update_frequency(500), + + {ok, {NewDb2, Uncopied, TotalChanges}} = + couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, [], 0}), + + couch_task_status:update("Flushing"), + NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), % copy misc header values @@ -620,10 +631,11 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) -> ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]), case couch_file:open(CompactFile) of {ok, Fd} -> - ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]), + couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>), Retry = true, {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG); {error, enoent} -> + couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>), {ok, Fd} = couch_file:open(CompactFile, [create]), Retry = false, ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{}) diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl index 31e821ce..e30f7594 100644 --- a/src/couchdb/couch_httpd_misc_handlers.erl +++ b/src/couchdb/couch_httpd_misc_handlers.erl @@ -14,7 +14,8 @@ -export([handle_welcome_req/2,handle_favicon_req/2,handle_utils_dir_req/2, handle_all_dbs_req/1,handle_replicate_req/1,handle_restart_req/1, - handle_uuids_req/1,handle_config_req/1,handle_stats_req/1]). + handle_uuids_req/1,handle_config_req/1,handle_stats_req/1, + handle_task_status_req/1]). -export([increment_update_seq_req/2]). @@ -61,12 +62,22 @@ handle_all_dbs_req(#httpd{method='GET'}=Req) -> handle_all_dbs_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). + handle_stats_req(#httpd{method='GET'}=Req) -> ok = couch_httpd:verify_is_server_admin(Req), send_json(Req, {couch_server:get_stats() ++ couch_file_stats:get_stats()}); handle_stats_req(Req) -> send_method_not_allowed(Req, "GET,HEAD"). + +handle_task_status_req(#httpd{method='GET'}=Req) -> + ok = couch_httpd:verify_is_server_admin(Req), + % convert the list of prop lists to a list of json objects + send_json(Req, [{Props} || Props <- couch_task_status:all()]); +handle_task_status_req(Req) -> + send_method_not_allowed(Req, "GET,HEAD"). + + handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) -> {Props} = couch_httpd:json_body(Req), Source = proplists:get_value(<<"source">>, Props), diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl index f0157864..27efc9e7 100644 --- a/src/couchdb/couch_server_sup.erl +++ b/src/couchdb/couch_server_sup.erl @@ -133,6 +133,12 @@ start_primary_services() -> brutal_kill, worker, [couch_log]}, + {couch_task_status, + {couch_task_status, start_link, []}, + permanent, + brutal_kill, + worker, + [couch_task_status]}, {couch_server, {couch_server, sup_start_link, []}, permanent, diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl index 964dc150..d957268f 100644 --- a/src/couchdb/couch_stream.erl +++ b/src/couchdb/couch_stream.erl @@ -60,10 +60,10 @@ open(State, Fd) -> {ok, #stream{pid = Pid, fd = Fd}}. close(#stream{pid = Pid, fd = _Fd}) -> - gen_server:call(Pid, close). + gen_server:call(Pid, close, infinity). get_state(#stream{pid = Pid, fd = _Fd}) -> - gen_server:call(Pid, get_state). + gen_server:call(Pid, get_state, infinity). ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) -> gen_server:call(Pid, {ensure_buffer, Bytes}). @@ -118,7 +118,7 @@ write_term(Stream, Term) -> write(#stream{}, <<>>) -> {ok, {0,0}}; write(#stream{pid = Pid}, Bin) when is_binary(Bin) -> - gen_server:call(Pid, {write, Bin}). + gen_server:call(Pid, {write, Bin}, infinity). init({{Pos, BytesRemaining}, Fd}) -> diff --git a/src/couchdb/couch_task_status.erl b/src/couchdb/couch_task_status.erl new file mode 100644 index 00000000..a0ef490a --- /dev/null +++ b/src/couchdb/couch_task_status.erl @@ -0,0 +1,97 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_task_status). +-behaviour(gen_server). + +% This module allows is used to track the status of long running tasks. +% Long running tasks register (add_task/3) then update their status (update/1) +% and the task and status is added to tasks list. When the tracked task dies +% it will be automatically removed the tracking. To get the tasks list, use the +% all/0 function + +-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2, + code_change/3,add_task/3,update/1,update/2,all/0,set_update_frequency/1]). + +-include("couch_db.hrl"). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +to_binary(L) when is_list(L) -> + ?l2b(L); +to_binary(B) when is_binary(B) -> + B. + +add_task(Type, TaskName, StatusText) -> + put(task_status_update, {{0,0,0}, 0}), + gen_server:call(?MODULE, {add_task, to_binary(Type), + to_binary(TaskName), to_binary(StatusText)}). + +set_update_frequency(Msecs) -> + put(task_status_update, {{0,0,0}, Msecs * 1000}). + +update(StatusText) -> + update("~s", [StatusText]). + +update(Format, Data) -> + {LastUpdateTime, Frequency} = get(task_status_update), + + case timer:now_diff(Now = now(), LastUpdateTime) >= Frequency of + true -> + put(task_status_update, {Now, Frequency}), + gen_server:cast(?MODULE, {update_status, self(), ?l2b(io_lib:format(Format, Data))}); + false -> + ok + end. + + +% returns a list of proplists. Each proplist describes a running task. +all() -> + [[{type,Type}, + {task,Task}, + {status,Status}, + {pid,?l2b(pid_to_list(Pid))}] || + {Pid, {Type,Task,Status}} <- ets:tab2list(tasks_by_pid)]. + +init([]) -> + % read configuration settings and register for configuration changes + ets:new(tasks_by_pid, [ordered_set, protected, named_table]), + {ok, nil}. + +terminate(_Reason,_State) -> + ok. + + +handle_call({add_task,Type,TaskName,StatusText}, {From, _}, Server) -> + case ets:lookup(tasks_by_pid, From) of + [] -> + true = ets:insert(tasks_by_pid, {From,{Type,TaskName,StatusText}}), + erlang:monitor(process, From), + {reply, ok, Server}; + [_] -> + {reply, {add_task_error, already_registered}, Server} + end. + +handle_cast({update_status, Pid, StatusText}, Server) -> + [{Pid, {Type,TaskName,_StatusText}}] = ets:lookup(tasks_by_pid, Pid), + true = ets:insert(tasks_by_pid, {Pid, {Type,TaskName,StatusText}}), + {noreply, Server}. + +handle_info({'DOWN', _MonitorRef, _Type, Pid, _Info}, Server) -> + ets:delete(tasks_by_pid, Pid), + {noreply, Server}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl index 68c6c5cb..6d193516 100644 --- a/src/couchdb/couch_view_group.erl +++ b/src/couchdb/couch_view_group.erl @@ -265,12 +265,12 @@ prepare_group({view, RootDir, DbName, GroupId}, ForceReset)-> prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) -> case couch_db:open(DbName, []) of {ok, Db} -> - View = #view{map_names=["_temp"], + View = #view{map_names=[<<"_temp">>], id_num=0, btree=nil, def=MapSrc, - reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end}, - {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db, + reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end}, + {ok, init_group(Db, Fd, #group{type=slow_view, name= <<"_temp">>, db=Db, views=[View], def_lang=Lang}, nil)}; Error -> Error diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl index 940602dd..b6ae860f 100644 --- a/src/couchdb/couch_view_updater.erl +++ b/src/couchdb/couch_view_updater.erl @@ -16,29 +16,39 @@ -include("couch_db.hrl"). -update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq}=Group) -> - ?LOG_DEBUG("Starting index update.",[]), +update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=PurgeSeq}=Group) -> + couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>), + DbPurgeSeq = couch_db:get_purge_seq(Db), Group2 = if DbPurgeSeq == PurgeSeq -> Group; DbPurgeSeq == PurgeSeq + 1 -> - ?LOG_DEBUG("Purging entries from view index.",[]), + couch_task_status:update(<<"Removing purged entries from view index.">>), purge_index(Group); true -> - ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]), + couch_task_status:update(<<"Resetting view index due to lost purge entries.">>), exit(reset) end, ViewEmptyKVs = [{View, []} || View <- Group2#group.views], % compute on all docs modified since we last computed. - {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}} + TotalChanges = couch_db:count_changes_since(Db, Seq), + % update status every half second + couch_task_status:set_update_frequency(500), + {ok, {_,{UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}} = couch_db:enum_docs_since( Db, Seq, - fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end, - {[], Group2, ViewEmptyKVs, []} + fun(DocInfo, _, {ChangesProcessed, Acc}) -> + couch_task_status:update("Processed ~p of ~p changes (~p%)", + [ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]), + {ok, {ChangesProcessed+1, process_doc(Db, DocInfo, Acc)}} + end, + {0, {[], Group2, ViewEmptyKVs, []}} ), + couch_task_status:set_update_frequency(0), + couch_task_status:update("Finishing."), {Group4, Results} = view_compute(Group3, UncomputedDocs), {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results( UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys), @@ -93,7 +103,7 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, case couch_view_group:design_doc_to_view_group(Doc) of #group{sig=Sig} -> % The same md5 signature, keep on computing - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; + {Docs, Group, ViewKVs, DocIdViewIdKeys}; _ -> exit(reset) end; @@ -101,7 +111,7 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, exit(reset) end; <<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs - {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}}; + {Docs, Group, ViewKVs, DocIdViewIdKeys}; _ -> {Docs2, DocIdViewIdKeys2} = if Deleted -> @@ -119,9 +129,9 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocInfo#doc_info.update_seq), garbage_collect(), ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views], - {ok, {[], Group2, ViewEmptyKeyValues, []}}; + {[], Group2, ViewEmptyKeyValues, []}; false -> - {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}} + {Docs2, Group, ViewKVs, DocIdViewIdKeys2} end end. |