path: root/1.1.x/src/couchdb/couch_stats_aggregator.erl
diff options
Diffstat (limited to '1.1.x/src/couchdb/couch_stats_aggregator.erl')
1 files changed, 297 insertions, 0 deletions
diff --git a/1.1.x/src/couchdb/couch_stats_aggregator.erl b/1.1.x/src/couchdb/couch_stats_aggregator.erl
new file mode 100644
index 00000000..6090355d
--- /dev/null
+++ b/1.1.x/src/couchdb/couch_stats_aggregator.erl
@@ -0,0 +1,297 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-export([start/0, start/1, stop/0]).
+-export([all/0, all/1, get/1, get/2, get_json/1, get_json/2, collect_sample/0]).
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+-record(aggregate, {
+ description = <<"">>,
+ seconds = 0,
+ count = 0,
+ current = null,
+ sum = null,
+ mean = null,
+ variance = null,
+ stddev = null,
+ min = null,
+ max = null,
+ samples = []
+start() ->
+ PrivDir = couch_util:priv_dir(),
+ start(filename:join(PrivDir, "stat_descriptions.cfg")).
+start(FileName) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [FileName], []).
+stop() ->
+ gen_server:cast(?MODULE, stop).
+all() ->
+ ?MODULE:all(0).
+all(Time) when is_binary(Time) ->
+ ?MODULE:all(list_to_integer(binary_to_list(Time)));
+all(Time) when is_atom(Time) ->
+ ?MODULE:all(list_to_integer(atom_to_list(Time)));
+all(Time) when is_integer(Time) ->
+ Aggs = ets:match(?MODULE, {{'$1', Time}, '$2'}),
+ Stats = lists:map(fun([Key, Agg]) -> {Key, Agg} end, Aggs),
+ case Stats of
+ [] ->
+ {[]};
+ _ ->
+ Ret = lists:foldl(fun({{Mod, Key}, Agg}, Acc) ->
+ CurrKeys = case proplists:lookup(Mod, Acc) of
+ none -> [];
+ {Mod, {Keys}} -> Keys
+ end,
+ NewMod = {[{Key, to_json_term(Agg)} | CurrKeys]},
+ [{Mod, NewMod} | proplists:delete(Mod, Acc)]
+ end, [], Stats),
+ {Ret}
+ end.
+get(Key) ->
+ ?MODULE:get(Key, 0).
+get(Key, Time) when is_binary(Time) ->
+ ?MODULE:get(Key, list_to_integer(binary_to_list(Time)));
+get(Key, Time) when is_atom(Time) ->
+ ?MODULE:get(Key, list_to_integer(atom_to_list(Time)));
+get(Key, Time) when is_integer(Time) ->
+ case ets:lookup(?MODULE, {make_key(Key), Time}) of
+ [] -> #aggregate{seconds=Time};
+ [{_, Agg}] -> Agg
+ end.
+get_json(Key) ->
+ get_json(Key, 0).
+get_json(Key, Time) ->
+ to_json_term(?MODULE:get(Key, Time)).
+collect_sample() ->
+ gen_server:call(?MODULE, collect_sample, infinity).
+init(StatDescsFileName) ->
+ % Create an aggregate entry for each {description, rate} pair.
+ ets:new(?MODULE, [named_table, set, protected]),
+ SampleStr = couch_config:get("stats", "samples", "[0]"),
+ {ok, Samples} = couch_util:parse_term(SampleStr),
+ {ok, Descs} = file:consult(StatDescsFileName),
+ lists:foreach(fun({Sect, Key, Value}) ->
+ lists:foreach(fun(Secs) ->
+ Agg = #aggregate{
+ description=list_to_binary(Value),
+ seconds=Secs
+ },
+ ets:insert(?MODULE, {{{Sect, Key}, Secs}, Agg})
+ end, Samples)
+ end, Descs),
+ Self = self(),
+ ok = couch_config:register(
+ fun("stats", _) -> exit(Self, config_change) end
+ ),
+ Rate = list_to_integer(couch_config:get("stats", "rate", "1000")),
+ % TODO: Add timer_start to kernel start options.
+ {ok, TRef} = timer:apply_after(Rate, ?MODULE, collect_sample, []),
+ {ok, {TRef, Rate}}.
+terminate(_Reason, {TRef, _Rate}) ->
+ timer:cancel(TRef),
+ ok.
+handle_call(collect_sample, _, {OldTRef, SampleInterval}) ->
+ timer:cancel(OldTRef),
+ {ok, TRef} = timer:apply_after(SampleInterval, ?MODULE, collect_sample, []),
+ % Gather new stats values to add.
+ Incs = lists:map(fun({Key, Value}) ->
+ {Key, {incremental, Value}}
+ end, couch_stats_collector:all(incremental)),
+ Abs = lists:map(fun({Key, Values}) ->
+ couch_stats_collector:clear(Key),
+ Values2 = case Values of
+ X when is_list(X) -> X;
+ Else -> [Else]
+ end,
+ {_, Mean} = lists:foldl(fun(Val, {Count, Curr}) ->
+ {Count+1, Curr + (Val - Curr) / (Count+1)}
+ end, {0, 0}, Values2),
+ {Key, {absolute, Mean}}
+ end, couch_stats_collector:all(absolute)),
+ Values = Incs ++ Abs,
+ Now = erlang:now(),
+ lists:foreach(fun({{Key, Rate}, Agg}) ->
+ NewAgg = case proplists:lookup(Key, Values) of
+ none ->
+ rem_values(Now, Agg);
+ {Key, {Type, Value}} ->
+ NewValue = new_value(Type, Value, Agg#aggregate.current),
+ Agg2 = add_value(Now, NewValue, Agg),
+ rem_values(Now, Agg2)
+ end,
+ ets:insert(?MODULE, {{Key, Rate}, NewAgg})
+ end, ets:tab2list(?MODULE)),
+ {reply, ok, {TRef, SampleInterval}}.
+handle_cast(stop, State) ->
+ {stop, normal, State}.
+handle_info(_Info, State) ->
+ {noreply, State}.
+code_change(_OldVersion, State, _Extra) ->
+ {ok, State}.
+new_value(incremental, Value, null) ->
+ Value;
+new_value(incremental, Value, Current) ->
+ Value - Current;
+new_value(absolute, Value, _Current) ->
+ Value.
+add_value(Time, Value, #aggregate{count=Count, seconds=Secs}=Agg) when Count < 1 ->
+ Samples = case Secs of
+ 0 -> [];
+ _ -> [{Time, Value}]
+ end,
+ Agg#aggregate{
+ count=1,
+ current=Value,
+ sum=Value,
+ mean=Value,
+ variance=0.0,
+ stddev=null,
+ min=Value,
+ max=Value,
+ samples=Samples
+ };
+add_value(Time, Value, Agg) ->
+ #aggregate{
+ count=Count,
+ current=Current,
+ sum=Sum,
+ mean=Mean,
+ variance=Variance,
+ samples=Samples
+ } = Agg,
+ NewCount = Count + 1,
+ NewMean = Mean + (Value - Mean) / NewCount,
+ NewVariance = Variance + (Value - Mean) * (Value - NewMean),
+ StdDev = case NewCount > 1 of
+ false -> null;
+ _ -> math:sqrt(NewVariance / (NewCount - 1))
+ end,
+ Agg2 = Agg#aggregate{
+ count=NewCount,
+ current=Current + Value,
+ sum=Sum + Value,
+ mean=NewMean,
+ variance=NewVariance,
+ stddev=StdDev,
+ min=lists:min([Agg#aggregate.min, Value]),
+ max=lists:max([Agg#aggregate.max, Value])
+ },
+ case Agg2#aggregate.seconds of
+ 0 -> Agg2;
+ _ -> Agg2#aggregate{samples=[{Time, Value} | Samples]}
+ end.
+rem_values(Time, Agg) ->
+ Seconds = Agg#aggregate.seconds,
+ Samples = Agg#aggregate.samples,
+ Pred = fun({When, _Value}) ->
+ timer:now_diff(Time, When) =< (Seconds * 1000000)
+ end,
+ {Keep, Remove} = lists:splitwith(Pred, Samples),
+ Agg2 = lists:foldl(fun({_, Value}, Acc) ->
+ rem_value(Value, Acc)
+ end, Agg, Remove),
+ Agg2#aggregate{samples=Keep}.
+rem_value(_Value, #aggregate{count=Count, seconds=Secs}) when Count =< 1 ->
+ #aggregate{seconds=Secs};
+rem_value(Value, Agg) ->
+ #aggregate{
+ count=Count,
+ sum=Sum,
+ mean=Mean,
+ variance=Variance
+ } = Agg,
+ OldMean = (Mean * Count - Value) / (Count - 1),
+ OldVariance = Variance - (Value - OldMean) * (Value - Mean),
+ OldCount = Count - 1,
+ StdDev = case OldCount > 1 of
+ false -> null;
+ _ -> math:sqrt(clamp_value(OldVariance / (OldCount - 1)))
+ end,
+ Agg#aggregate{
+ count=OldCount,
+ sum=Sum-Value,
+ mean=clamp_value(OldMean),
+ variance=clamp_value(OldVariance),
+ stddev=StdDev
+ }.
+to_json_term(Agg) ->
+ {Min, Max} = case Agg#aggregate.seconds > 0 of
+ false ->
+ {Agg#aggregate.min, Agg#aggregate.max};
+ _ ->
+ case length(Agg#aggregate.samples) > 0 of
+ true ->
+ Extract = fun({_Time, Value}) -> Value end,
+ Samples = lists:map(Extract, Agg#aggregate.samples),
+ {lists:min(Samples), lists:max(Samples)};
+ _ ->
+ {null, null}
+ end
+ end,
+ {[
+ {description, Agg#aggregate.description},
+ {current, round_value(Agg#aggregate.sum)},
+ {sum, round_value(Agg#aggregate.sum)},
+ {mean, round_value(Agg#aggregate.mean)},
+ {stddev, round_value(Agg#aggregate.stddev)},
+ {min, Min},
+ {max, Max}
+ ]}.
+make_key({Mod, Val}) when is_integer(Val) ->
+ {Mod, list_to_atom(integer_to_list(Val))};
+make_key(Key) ->
+ Key.
+round_value(Val) when not is_number(Val) ->
+ Val;
+round_value(Val) when Val == 0 ->
+ Val;
+round_value(Val) ->
+ erlang:round(Val * 1000.0) / 1000.0.
+clamp_value(Val) when Val > 0.00000000000001 ->
+ Val;
+clamp_value(_) ->
+ 0.0.