summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDamien F. Katz <damien@apache.org>2009-01-23 04:15:47 +0000
committerDamien F. Katz <damien@apache.org>2009-01-23 04:15:47 +0000
commit5dd96d8ee5f89524fa0be0f087a21f81b6b68ec4 (patch)
treec4931d99e50a77aebe695279bc8567e1eee7499c /src
parent0a46c330072a3811d98a5c989d4c6486cff83df2 (diff)
Added task status checking, to help debug the progress of long running tasks, like view indexing and compaction.
git-svn-id: https://svn.apache.org/repos/asf/couchdb/trunk@736906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'src')
-rw-r--r--src/couchdb/Makefile.am2
-rw-r--r--src/couchdb/couch_btree.erl2
-rw-r--r--src/couchdb/couch_db.erl14
-rw-r--r--src/couchdb/couch_db_updater.erl32
-rw-r--r--src/couchdb/couch_httpd_misc_handlers.erl13
-rw-r--r--src/couchdb/couch_server_sup.erl6
-rw-r--r--src/couchdb/couch_stream.erl6
-rw-r--r--src/couchdb/couch_task_status.erl97
-rw-r--r--src/couchdb/couch_view_group.erl6
-rw-r--r--src/couchdb/couch_view_updater.erl32
10 files changed, 180 insertions, 30 deletions
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index 8e280c70..1ad5d14a 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -66,6 +66,7 @@ source_files = \
couch_server.erl \
couch_server_sup.erl \
couch_stream.erl \
+ couch_task_status.erl \
couch_util.erl \
couch_view.erl \
couch_view_updater.erl \
@@ -102,6 +103,7 @@ compiled_files = \
couch_server.beam \
couch_server_sup.beam \
couch_stream.beam \
+ couch_task_status.beam \
couch_util.beam \
couch_view.beam \
couch_view_updater.beam \
diff --git a/src/couchdb/couch_btree.erl b/src/couchdb/couch_btree.erl
index 14bc4a1f..f993a0a1 100644
--- a/src/couchdb/couch_btree.erl
+++ b/src/couchdb/couch_btree.erl
@@ -14,7 +14,7 @@
-export([open/2, open/3, query_modify/4, add/2, add_remove/3, foldl/3, foldl/4]).
-export([foldr/3, foldr/4, fold/4, fold/5, full_reduce/1, final_reduce/2]).
--export([fold_reduce/7, lookup/2, get_state/1, set_options/2]).
+-export([fold_reduce/6, fold_reduce/7, lookup/2, get_state/1, set_options/2]).
-export([test/1, test/0, test_remove/2, test_add/2]).
-define(CHUNK_THRESHOLD, 16#4ff).
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index b3102044..c35bb913 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -14,7 +14,7 @@
-behaviour(gen_server).
-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
--export([open_ref_counted/2,num_refs/1,monitor/1]).
+-export([open_ref_counted/2,num_refs/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
-export([get_missing_revs/2,name/1,doc_to_tree/1,get_update_seq/1,get_committed_update_seq/1]).
@@ -468,6 +468,18 @@ enum_docs_reduce_to_count(Reds) ->
fun couch_db_updater:btree_by_id_reduce/2, Reds),
Count.
+% Reduces the by-sequence btree over all keys from SinceSeq + 1 onward and
+% returns the final reduction; callers use the result as the total number of
+% changes made after SinceSeq.
+%
+% The end key is the atom 'ok': atoms sort after all numbers in Erlang term
+% order, so it presumably acts as an open upper bound (NOTE(review): confirm
+% this btree uses the default term ordering).
+%
+% The accumulator starts at 0 rather than a placeholder atom so that a range
+% with no changes yields 0. NOTE(review): this assumes fold_reduce hands back
+% the initial accumulator untouched when the callback is never invoked --
+% confirm against couch_btree:fold_reduce/6.
+count_changes_since(Db, SinceSeq) ->
+    BySeqBtree = Db#db.docinfo_by_seq_btree,
+    {ok, Changes} =
+        couch_btree:fold_reduce(BySeqBtree,
+            SinceSeq + 1, % startkey
+            ok, % endkey
+            fun(_,_) -> true end, % groupkeys
+            fun(_SeqStart, PartialReds, 0) ->
+                {ok, couch_btree:final_reduce(BySeqBtree, PartialReds)}
+            end,
+            0),
+    Changes.
+
enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) ->
couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index a6fd0cc6..800730d8 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -590,19 +590,30 @@ copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, Info
copy_compact(Db, NewDb, Retry) ->
+ TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
EnumBySeqFun =
- fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
- case couch_util:should_flush() of
- true ->
+ fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied, TotalCopied}) ->
+ couch_task_status:update("Copied ~p of ~p changes (~p%)",
+ [TotalCopied, TotalChanges, (TotalCopied*100) div TotalChanges]),
+ if TotalCopied rem 1000 == 0 ->
NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
- false ->
- {ok, {AccNewDb, [DocInfo | AccUncopied]}}
+ if TotalCopied rem 10000 == 0 ->
+ {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
+ true ->
+ {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
+ end;
+ true ->
+ {ok, {AccNewDb, [DocInfo | AccUncopied], TotalCopied + 1}}
end
end,
- {ok, {NewDb2, Uncopied}} =
- couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
-
+
+ couch_task_status:set_update_frequency(500),
+
+ {ok, {NewDb2, Uncopied, TotalChanges}} =
+ couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, [], 0}),
+
+ couch_task_status:update("Flushing"),
+
NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
% copy misc header values
@@ -620,10 +631,11 @@ start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
case couch_file:open(CompactFile) of
{ok, Fd} ->
- ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
+ couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
{ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
{error, enoent} ->
+ couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
diff --git a/src/couchdb/couch_httpd_misc_handlers.erl b/src/couchdb/couch_httpd_misc_handlers.erl
index 31e821ce..e30f7594 100644
--- a/src/couchdb/couch_httpd_misc_handlers.erl
+++ b/src/couchdb/couch_httpd_misc_handlers.erl
@@ -14,7 +14,8 @@
-export([handle_welcome_req/2,handle_favicon_req/2,handle_utils_dir_req/2,
handle_all_dbs_req/1,handle_replicate_req/1,handle_restart_req/1,
- handle_uuids_req/1,handle_config_req/1,handle_stats_req/1]).
+ handle_uuids_req/1,handle_config_req/1,handle_stats_req/1,
+ handle_task_status_req/1]).
-export([increment_update_seq_req/2]).
@@ -61,12 +62,22 @@ handle_all_dbs_req(#httpd{method='GET'}=Req) ->
handle_all_dbs_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
+
handle_stats_req(#httpd{method='GET'}=Req) ->
ok = couch_httpd:verify_is_server_admin(Req),
send_json(Req, {couch_server:get_stats() ++ couch_file_stats:get_stats()});
handle_stats_req(Req) ->
send_method_not_allowed(Req, "GET,HEAD").
+
+% Answers a GET with a JSON array holding one object per task reported by
+% couch_task_status:all/0. The request must pass
+% couch_httpd:verify_is_server_admin/1. Any other method is answered with a
+% method-not-allowed response.
+handle_task_status_req(#httpd{method='GET'}=Req) ->
+    ok = couch_httpd:verify_is_server_admin(Req),
+    % wrap each task's prop list in a tuple so it is sent as a json object
+    TaskObjects = lists:map(fun(TaskProps) -> {TaskProps} end,
+        couch_task_status:all()),
+    send_json(Req, TaskObjects);
+handle_task_status_req(Req) ->
+    send_method_not_allowed(Req, "GET,HEAD").
+
+
handle_replicate_req(#httpd{user_ctx=UserCtx,method='POST'}=Req) ->
{Props} = couch_httpd:json_body(Req),
Source = proplists:get_value(<<"source">>, Props),
diff --git a/src/couchdb/couch_server_sup.erl b/src/couchdb/couch_server_sup.erl
index f0157864..27efc9e7 100644
--- a/src/couchdb/couch_server_sup.erl
+++ b/src/couchdb/couch_server_sup.erl
@@ -133,6 +133,12 @@ start_primary_services() ->
brutal_kill,
worker,
[couch_log]},
+ {couch_task_status,
+ {couch_task_status, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [couch_task_status]},
{couch_server,
{couch_server, sup_start_link, []},
permanent,
diff --git a/src/couchdb/couch_stream.erl b/src/couchdb/couch_stream.erl
index 964dc150..d957268f 100644
--- a/src/couchdb/couch_stream.erl
+++ b/src/couchdb/couch_stream.erl
@@ -60,10 +60,10 @@ open(State, Fd) ->
{ok, #stream{pid = Pid, fd = Fd}}.
close(#stream{pid = Pid, fd = _Fd}) ->
- gen_server:call(Pid, close).
+ gen_server:call(Pid, close, infinity).
get_state(#stream{pid = Pid, fd = _Fd}) ->
- gen_server:call(Pid, get_state).
+ gen_server:call(Pid, get_state, infinity).
ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
gen_server:call(Pid, {ensure_buffer, Bytes}).
@@ -118,7 +118,7 @@ write_term(Stream, Term) ->
write(#stream{}, <<>>) ->
{ok, {0,0}};
write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
- gen_server:call(Pid, {write, Bin}).
+ gen_server:call(Pid, {write, Bin}, infinity).
init({{Pos, BytesRemaining}, Fd}) ->
diff --git a/src/couchdb/couch_task_status.erl b/src/couchdb/couch_task_status.erl
new file mode 100644
index 00000000..a0ef490a
--- /dev/null
+++ b/src/couchdb/couch_task_status.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_task_status).
+-behaviour(gen_server).
+
+% This module is used to track the status of long running tasks.
+% Long running tasks register (add_task/3) then update their status (update/1)
+% and the task and its status are added to the tasks list. When the tracked
+% task dies it is automatically removed from tracking. To get the tasks list,
+% use the all/0 function.
+
+-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,handle_info/2,
+ code_change/3,add_task/3,update/1,update/2,all/0,set_update_frequency/1]).
+
+-include("couch_db.hrl").
+
+% Starts the task status gen_server linked to the caller and registers it
+% locally under this module's name.
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+% Normalises a label to a binary: binaries pass through unchanged, lists are
+% converted with the ?l2b macro. Any other term raises function_clause.
+to_binary(Bin) when is_binary(Bin) ->
+    Bin;
+to_binary(Chars) when is_list(Chars) ->
+    ?l2b(Chars).
+
+% Registers the calling process as a tracked task with the status server.
+% Type, TaskName and StatusText may each be a list or a binary.
+% Returns the server's reply: ok, or {add_task_error, already_registered}
+% when the calling process already has an entry.
+add_task(Type, TaskName, StatusText) ->
+    % reset the throttle state kept in the caller's process dictionary:
+    % zero interval, so the next update/1,2 call is always sent
+    put(task_status_update, {{0,0,0}, 0}),
+    Request = {add_task, to_binary(Type), to_binary(TaskName),
+        to_binary(StatusText)},
+    gen_server:call(?MODULE, Request).
+
+% Sets the minimum interval, in milliseconds, between status updates that the
+% calling process actually sends to the server. The interval is stored in
+% microseconds in the caller's process dictionary, and the last-update time is
+% reset to {0,0,0}.
+set_update_frequency(Msecs) ->
+ put(task_status_update, {{0,0,0}, Msecs * 1000}).
+
+% Reports a new status text for the calling task; shorthand for update/2 with
+% a "~s" format.
+update(StatusText) ->
+ update("~s", [StatusText]).
+
+% Reports a new status for the calling task, formatted io_lib:format-style
+% from Format and Data. The update is cast to the status server only when at
+% least the configured interval (in microseconds, see set_update_frequency/1)
+% has passed since the last update that was sent; otherwise it is dropped.
+% Always returns ok.
+% The throttle state is read from the caller's process dictionary, so the
+% caller must have called add_task/3 or set_update_frequency/1 first.
+update(Format, Data) ->
+    {LastUpdateTime, Frequency} = get(task_status_update),
+    Now = now(),
+    Elapsed = timer:now_diff(Now, LastUpdateTime),
+    if Elapsed >= Frequency ->
+        % remember when this update was sent, keeping the interval as is
+        put(task_status_update, {Now, Frequency}),
+        StatusText = ?l2b(io_lib:format(Format, Data)),
+        gen_server:cast(?MODULE, {update_status, self(), StatusText});
+    true ->
+        ok
+    end.
+
+
+% Returns one proplist per entry in the tasks_by_pid table, with the keys
+% type, task, status and pid (the pid rendered as a binary). The table is read
+% directly from the calling process.
+all() ->
+    Entries = ets:tab2list(tasks_by_pid),
+    [[{type, Type}, {task, Task}, {status, Status},
+        {pid, ?l2b(pid_to_list(Pid))}]
+        || {Pid, {Type, Task, Status}} <- Entries].
+
+% gen_server init callback. Creates the named ETS table tasks_by_pid, which
+% holds one {Pid, {Type, TaskName, StatusText}} entry per registered task.
+% The server state itself is just nil.
+init([]) ->
+ % protected + named_table: only this server writes to the table, while
+ % other processes can read it by name (all/0 does so)
+ ets:new(tasks_by_pid, [ordered_set, protected, named_table]),
+ {ok, nil}.
+
+% gen_server terminate callback; performs no cleanup of its own.
+terminate(_Reason,_State) ->
+ ok.
+
+
+% Handles {add_task, Type, TaskName, StatusText} from add_task/3.
+% If the calling pid has no entry yet, stores one, starts monitoring the pid
+% and replies ok; otherwise replies {add_task_error, already_registered} and
+% leaves the existing entry untouched.
+handle_call({add_task,Type,TaskName,StatusText}, {From, _}, Server) ->
+    % insert_new only inserts when the key is absent, which covers both the
+    % existence check and the insert
+    case ets:insert_new(tasks_by_pid, {From,{Type,TaskName,StatusText}}) of
+    true ->
+        % the 'DOWN' message from this monitor removes the entry again
+        erlang:monitor(process, From),
+        {reply, ok, Server};
+    false ->
+        {reply, {add_task_error, already_registered}, Server}
+    end.
+
+% Handles {update_status, Pid, StatusText} from update/2: replaces the status
+% text of Pid's entry, keeping its type and task name.
+%
+% An update from a pid that has no entry is ignored instead of crashing the
+% server. A crash here would destroy the ETS table and with it every tracked
+% task, and after a restart the still-running tasks would no longer be
+% registered, so each of their next updates would crash the server again.
+handle_cast({update_status, Pid, StatusText}, Server) ->
+    case ets:lookup(tasks_by_pid, Pid) of
+    [{Pid, {Type,TaskName,_OldStatusText}}] ->
+        true = ets:insert(tasks_by_pid, {Pid, {Type,TaskName,StatusText}});
+    [] ->
+        % not a registered task; drop the update
+        ok
+    end,
+    {noreply, Server}.
+
+% Handles the 'DOWN' message produced by the monitor set up when a task was
+% added: the process has exited, so its entry is removed from the table.
+% Only 'DOWN' messages are matched by this callback.
+handle_info({'DOWN', _MonitorRef, _Type, Pid, _Info}, Server) ->
+ ets:delete(tasks_by_pid, Pid),
+ {noreply, Server}.
+
+% gen_server code_change callback; the state is carried over unchanged.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
diff --git a/src/couchdb/couch_view_group.erl b/src/couchdb/couch_view_group.erl
index 68c6c5cb..6d193516 100644
--- a/src/couchdb/couch_view_group.erl
+++ b/src/couchdb/couch_view_group.erl
@@ -265,12 +265,12 @@ prepare_group({view, RootDir, DbName, GroupId}, ForceReset)->
prepare_group({slow_view, DbName, Fd, Lang, MapSrc, RedSrc}, _ForceReset) ->
case couch_db:open(DbName, []) of
{ok, Db} ->
- View = #view{map_names=["_temp"],
+ View = #view{map_names=[<<"_temp">>],
id_num=0,
btree=nil,
def=MapSrc,
- reduce_funs= if RedSrc==[] -> []; true -> [{"_temp", RedSrc}] end},
- {ok, init_group(Db, Fd, #group{type=slow_view, name="_temp", db=Db,
+ reduce_funs= if RedSrc==[] -> []; true -> [{<<"_temp">>, RedSrc}] end},
+ {ok, init_group(Db, Fd, #group{type=slow_view, name= <<"_temp">>, db=Db,
views=[View], def_lang=Lang}, nil)};
Error ->
Error
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index 940602dd..b6ae860f 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -16,29 +16,39 @@
-include("couch_db.hrl").
-update(#group{db=Db,current_seq=Seq,purge_seq=PurgeSeq}=Group) ->
- ?LOG_DEBUG("Starting index update.",[]),
+update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=PurgeSeq}=Group) ->
+ couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
+
DbPurgeSeq = couch_db:get_purge_seq(Db),
Group2 =
if DbPurgeSeq == PurgeSeq ->
Group;
DbPurgeSeq == PurgeSeq + 1 ->
- ?LOG_DEBUG("Purging entries from view index.",[]),
+ couch_task_status:update(<<"Removing purged entries from view index.">>),
purge_index(Group);
true ->
- ?LOG_DEBUG("Resetting view index due to lost purge entries.",[]),
+ couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
exit(reset)
end,
ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
% compute on all docs modified since we last computed.
- {ok, {UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}
+ TotalChanges = couch_db:count_changes_since(Db, Seq),
+ % update status every half second
+ couch_task_status:set_update_frequency(500),
+ {ok, {_,{UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}}
= couch_db:enum_docs_since(
Db,
Seq,
- fun(DocInfo, _, Acc) -> process_doc(Db, DocInfo, Acc) end,
- {[], Group2, ViewEmptyKVs, []}
+ fun(DocInfo, _, {ChangesProcessed, Acc}) ->
+ couch_task_status:update("Processed ~p of ~p changes (~p%)",
+ [ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]),
+ {ok, {ChangesProcessed+1, process_doc(Db, DocInfo, Acc)}}
+ end,
+ {0, {[], Group2, ViewEmptyKVs, []}}
),
+ couch_task_status:set_update_frequency(0),
+ couch_task_status:update("Finishing."),
{Group4, Results} = view_compute(Group3, UncomputedDocs),
{ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(
UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
@@ -93,7 +103,7 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs,
case couch_view_group:design_doc_to_view_group(Doc) of
#group{sig=Sig} ->
% The same md5 signature, keep on computing
- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
+ {Docs, Group, ViewKVs, DocIdViewIdKeys};
_ ->
exit(reset)
end;
@@ -101,7 +111,7 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs,
exit(reset)
end;
<<?DESIGN_DOC_PREFIX, _/binary>> -> % we skip design docs
- {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys}};
+ {Docs, Group, ViewKVs, DocIdViewIdKeys};
_ ->
{Docs2, DocIdViewIdKeys2} =
if Deleted ->
@@ -119,9 +129,9 @@ process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs,
DocInfo#doc_info.update_seq),
garbage_collect(),
ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
- {ok, {[], Group2, ViewEmptyKeyValues, []}};
+ {[], Group2, ViewEmptyKeyValues, []};
false ->
- {ok, {Docs2, Group, ViewKVs, DocIdViewIdKeys2}}
+ {Docs2, Group, ViewKVs, DocIdViewIdKeys2}
end
end.