%% Source path: emqx/apps/emqx_utils/src/emqx_metrics_worker.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_metrics_worker).
-behaviour(gen_server).
-include_lib("stdlib/include/ms_transform.hrl").
%% API functions
-export([
start_link/1,
stop/1,
child_spec/1,
child_spec/2
]).
-export([
inc/3,
inc/4,
observe/4,
get/3,
get_gauge/3,
set_gauge/5,
shift_gauge/5,
get_gauges/2,
delete_gauges/2,
get_rate/2,
get_slide/2,
get_slide/3,
get_counters/2,
create_metrics/3,
create_metrics/4,
clear_metrics/2,
reset_metrics/2,
has_metrics/2
]).
-export([get_metrics/2]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_info/2,
handle_cast/2,
code_change/3,
terminate/2
]).
-ifndef(TEST).
-define(SECS_5M, 300).
-define(SAMPLING, 10).
-else.
%% Use 5 secs average rate instead of 5 mins in case of testing
-define(SECS_5M, 5).
-define(SAMPLING, 1).
-endif.
-export_type([metrics/0, handler_name/0, metric_id/0, metric_spec/0]).
%% Kind of a metric. A bare atom in a metric list is shorthand for a counter.
-type metric_type() ::
    %% Simple counter
    counter
    %% Sliding window average
    | slide.
%% Declaration of one metric: `{Type, Name}'.
-type metric_spec() :: {metric_type(), atom()}.
%% Rates of one counter, in increments per second, rounded to 2 decimals.
-type rate() :: #{
    current := float(),
    max := float(),
    last5m := float()
}.
%% Summary of one slide metric as returned by get_slide/2: total number of
%% samples, plus the integer average over the last 2 seconds (`current') and
%% over the last ?SECS_5M seconds (`last5m', 5 minutes outside tests).
-type slide_summary() :: #{
    n_samples := non_neg_integer(),
    current := integer(),
    last5m := integer()
}.
%% Everything get_metrics/2 returns for one metric id.
-type metrics() :: #{
    counters := #{metric_name() => integer()},
    gauges := #{metric_name() => integer()},
    slides := #{metric_name() => slide_summary()},
    rate := #{metric_name() => rate()}
}.
%% Locally registered name of a metrics worker.
-type handler_name() :: atom().
%% metric_id() is actually a resource id
-type metric_id() :: binary() | atom().
-type metric_name() :: atom().
%% Identifies the reporter of a gauge value; values of the same gauge
%% reported by different workers are summed when read.
-type worker_id() :: term().
%% Key of the persistent_term that holds this handler's Id => slot-map.
-define(CntrRef(Name), {?MODULE, Name}).
%% Number of rate samples (one per ?SAMPLING seconds) kept for the
%% 5-minute average.
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
%% Name of the named ETS table that holds the gauges of handler NAME.
%% NOTE(review): the atom is rebuilt with list_to_atom/1 on every call;
%% NAME must be a trusted handler name, never external input.
-define(GAUGE_TABLE(NAME),
list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(NAME) ++ "_gauge")
).
%% Rate-tracking state of one counter, advanced once per sampling tick.
-record(rate, {
%% Highest per-second rate seen since this record was created
max = 0 :: number(),
%% Per-second rate over the last sampling interval
current = 0 :: number(),
%% Average of the rates kept in last5m_smpl
last5m = 0 :: number(),
%% metadata for calculating the avg rate:
%% 1-based number of the next sample ...
tick = 1 :: number(),
%% ... and the counter value seen at the previous tick
last_v = 0 :: number(),
%% metadata for calculating the 5min avg rate:
%% sum of last5m_smpl, and the per-tick rates themselves, oldest first
%% (at most ?SAMPCOUNT_5M of them)
last5m_acc = 0 :: number(),
last5m_smpl = [] :: list()
}).
%% Samples of one slide metric aggregated over one sampling interval.
-record(slide_datapoint, {
%% Sum of the observed values
sum :: non_neg_integer(),
%% Number of observations
samples :: non_neg_integer(),
%% os:system_time(second) when taken; 0 for the initial placeholders
time :: non_neg_integer()
}).
%% Sliding-window history of one slide metric.
-record(slide, {
%% Total number of samples through the history
n_samples = 0 :: non_neg_integer(),
%% Fixed-length history, newest datapoint first
datapoints = [] :: [#slide_datapoint{}]
}).
%% gen_server state of the worker.
-record(state, {
%% Ids created via create_metrics and not deleted since
metric_ids = sets:new(),
%% Per-id rate records; undefined until the first create_metrics
rates :: #{metric_id() => #{metric_name() => #rate{}}} | undefined,
%% Per-id slide histories
slides = #{} :: #{metric_id() => #{metric_name() => #slide{}}}
}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
%% Supervisor child spec for a metrics worker registered as `Name', using
%% the module name as the child id.
-spec child_spec(handler_name()) -> supervisor:child_spec().
child_spec(Name) ->
    child_spec(emqx_metrics_worker, Name).

%% Supervisor child spec for a metrics worker registered as `Name', with a
%% caller-chosen child id `ChldName' (lets one supervisor run several
%% workers side by side).
-spec child_spec(term(), handler_name()) -> supervisor:child_spec().
child_spec(ChldName, Name) ->
    #{
        id => ChldName,
        start => {emqx_metrics_worker, start_link, [Name]},
        restart => permanent,
        shutdown => 5000,
        type => worker,
        modules => [emqx_metrics_worker]
    }.
%% Create (or re-create) the metrics of `Id' under handler `Name'. Every
%% declared counter also gets rate tracking. When `Id' already has metrics,
%% the values of counters declared again are carried over.
-spec create_metrics(handler_name(), metric_id(), [metric_spec() | metric_name()]) ->
    ok
    | {error, term()}
    | {error, not_super_set_of, {[metric_name()], [metric_spec()]}}.
create_metrics(Name, Id, Metrics) ->
    Metrics1 = desugar(Metrics),
    Counters = filter_counters(Metrics1),
    create_metrics(Name, Id, Metrics1, Counters).

%% Like create_metrics/3, but rate tracking is enabled only for the counters
%% listed in `RateMetrics'. Each of them must also be declared as a counter
%% in `Metrics'; otherwise the worker replies
%% `{error, not_super_set_of, {RateMetrics, Metrics}}' and creates nothing.
-spec create_metrics(
    handler_name(), metric_id(), [metric_spec() | metric_name()], [metric_name()]
) ->
    ok
    | {error, term()}
    | {error, not_super_set_of, {[metric_name()], [metric_spec()]}}.
create_metrics(Name, Id, Metrics, RateMetrics) ->
    Metrics1 = desugar(Metrics),
    gen_server:call(Name, {create_metrics, Id, Metrics1, RateMetrics}).
%% Remove every metric of `Id' (counters, gauges, rates and slides).
-spec clear_metrics(handler_name(), metric_id()) -> ok.
clear_metrics(Name, Id) ->
    Request = {delete_metrics, Id},
    gen_server:call(Name, Request).

%% Zero the counters, rates and slides of `Id' and drop its gauges, keeping
%% the metric definitions themselves.
-spec reset_metrics(handler_name(), metric_id()) -> ok.
reset_metrics(Name, Id) ->
    Request = {reset_metrics, Id},
    gen_server:call(Name, Request).
%% True when metrics have been created for `Id' under handler `Name'.
-spec has_metrics(handler_name(), metric_id()) -> boolean().
has_metrics(Name, Id) ->
    get_ref(Name, Id) =/= not_found.
%% Read one counter of `Id'. `Metric' is either the counter's name or its
%% raw slot in the counters array. Returns 0 when `Id' has no metrics.
-spec get(handler_name(), metric_id(), metric_name() | integer()) -> number().
get(Name, Id, Metric) ->
    case get_ref(Name, Id) of
        not_found ->
            0;
        CntrRef when is_integer(Metric) ->
            counters:get(CntrRef, Metric);
        CntrRef when is_atom(Metric) ->
            Slot = idx_metric(Name, Id, counter, Metric),
            counters:get(CntrRef, Slot)
    end.
%% Current, maximum and 5-minute average per-second rates of the
%% rate-tracked counters of `Id'.
-spec get_rate(handler_name(), metric_id()) -> map().
get_rate(Name, Id) ->
    Request = {get_rate, Id},
    gen_server:call(Name, Request).
%% All counters of `Id' as a counter-name => value map; empty when `Id' has
%% no metrics.
-spec get_counters(handler_name(), metric_id()) -> map().
get_counters(Name, Id) ->
    Slots = get_indexes(Name, counter, Id),
    ReadSlot = fun(_Metric, Slot) -> get(Name, Id, Slot) end,
    maps:map(ReadSlot, Slots).
%% Summary of every slide metric of `Id': total samples, plus the average
%% over the last 2 seconds and over the last ?SECS_5M seconds.
-spec get_slide(handler_name(), metric_id()) -> map().
get_slide(Name, Id) ->
    Request = {get_slide, Id},
    gen_server:call(Name, Request).
%% Get the average for a specified sliding window period.
%%
%% Returns a map from each slide metric name of `Id' to the integer average
%% (`Sum div N') of the samples recorded in the past `Window' seconds, or 0
%% for a metric without such samples. Samples not yet moved into the history
%% by the periodic tick always count.
-spec get_slide(handler_name(), metric_id(), non_neg_integer()) ->
    #{metric_name() => integer()}.
get_slide(Name, Id, Window) ->
    gen_server:call(Name, {get_slide, Id, Window}).
%% Zero every slot of the counters array of `Id' (counter and slide slots
%% alike). No-op when `Id' has no metrics.
-spec reset_counters(handler_name(), metric_id()) -> ok.
reset_counters(Name, Id) ->
    case get_ref(Name, Id) of
        not_found ->
            ok;
        CntrRef ->
            #{size := NumSlots} = counters:info(CntrRef),
            _ = [counters:put(CntrRef, Slot, 0) || Slot <- lists:seq(1, NumSlots)],
            ok
    end.
%% Snapshot of everything tracked for `Id': rates, counters, gauges and
%% slide summaries.
-spec get_metrics(handler_name(), metric_id()) -> metrics().
get_metrics(Name, Id) ->
    Rate = get_rate(Name, Id),
    Counters = get_counters(Name, Id),
    Gauges = get_gauges(Name, Id),
    Slides = get_slide(Name, Id),
    #{rate => Rate, counters => Counters, gauges => Gauges, slides => Slides}.
%% Increment counter `Metric' of `Id' by one.
-spec inc(handler_name(), metric_id(), atom()) -> ok.
inc(Name, Id, Metric) ->
    inc(Name, Id, Metric, 1).

%% Add `Val' to counter `Metric' of `Id'. Crashes (badkey) when `Id' has no
%% metrics or `Metric' is not one of its counters.
-spec inc(handler_name(), metric_id(), metric_name(), integer()) -> ok.
inc(Name, Id, Metric, Val) ->
    CntrRef = get_ref(Name, Id),
    Slot = idx_metric(Name, Id, counter, Metric),
    counters:add(CntrRef, Slot, Val).
%% Record one sample `Val' for the slide metric `Metric' of `Id'.
%%
%% "Slide" is short for sliding window average. A slide tracks the average
%% of observed values over time and is mainly meant for performance
%% analysis, e.g. reporting the run time of operations.
%%
%% Example:
%%
%% ```
%% emqx_metrics_worker:create_metrics(Name, Id, [{slide, a}]),
%% emqx_metrics_worker:observe(Name, Id, a, 10),
%% emqx_metrics_worker:observe(Name, Id, a, 30),
%% #{a := 20} = emqx_metrics_worker:get_slide(Name, Id, _Window = 1).
%% '''
%%
%% After these two samples the metric reads 20 (the average of 10 and 30).
%% Samples are visible in every window until the worker's periodic tick
%% (every ?SAMPLING seconds) moves them into a timestamped datapoint. From
%% then on they only count for windows that reach back to that tick. So with
%% `_Window = 1' the value drops back to 0 about one second after the next
%% tick, unless new samples are recorded.
%%
%% Crashes when `Id' has no metrics or `Metric' is not one of its slides.
-spec observe(handler_name(), metric_id(), atom(), integer()) -> ok.
observe(Name, Id, Metric, Val) ->
    PerId = maps:get(Id, get_pterm(Name)),
    #{ref := CntrRef, slide := SlideSlots} = PerId,
    SumSlot = maps:get(Metric, SlideSlots),
    %% SumSlot accumulates the values, the slot right after it counts them.
    counters:add(CntrRef, SumSlot, Val),
    counters:add(CntrRef, SumSlot + 1, 1).
%% Set gauge `Metric' of `Id' as reported by `WorkerId'. Best effort: does
%% nothing when the gauge table is missing (e.g. the worker is not running).
-spec set_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
set_gauge(Name, Id, WorkerId, Metric, Val) ->
    Table = ?GAUGE_TABLE(Name),
    Key = {Id, Metric, WorkerId},
    try ets:insert(Table, {Key, Val}) of
        true -> ok
    catch
        error:badarg -> ok
    end.
%% Add `Val' to gauge `Metric' of `Id' as reported by `WorkerId', starting
%% from 0 when it does not exist yet. Best effort: does nothing when the
%% gauge table is missing.
-spec shift_gauge(handler_name(), metric_id(), worker_id(), metric_name(), integer()) -> ok.
shift_gauge(Name, Id, WorkerId, Metric, Val) ->
    Table = ?GAUGE_TABLE(Name),
    Key = {Id, Metric, WorkerId},
    Initial = {Key, 0},
    try ets:update_counter(Table, Key, Val, Initial) of
        _NewValue -> ok
    catch
        error:badarg -> ok
    end.
%% Value of gauge `Metric' of `Id', summed over all reporting workers;
%% 0 when nothing was reported or the gauge table is missing.
-spec get_gauge(handler_name(), metric_id(), metric_name()) -> integer().
get_gauge(Name, Id, Metric) ->
    Table = ?GAUGE_TABLE(Name),
    Spec = ets:fun2ms(
        fun({{GaugeId, GaugeMetric, _Worker}, Value}) when
            GaugeId =:= Id, GaugeMetric =:= Metric
        ->
            Value
        end
    ),
    try
        Values = ets:select(Table, Spec),
        lists:sum(Values)
    catch
        error:badarg -> 0
    end.
%% All gauges of `Id' as a metric => value map, each summed over all
%% reporting workers; empty when the gauge table is missing.
-spec get_gauges(handler_name(), metric_id()) -> map().
get_gauges(Name, Id) ->
    Table = ?GAUGE_TABLE(Name),
    Spec = ets:fun2ms(
        fun({{GaugeId, GaugeMetric, _Worker}, Value}) when GaugeId =:= Id ->
            {GaugeMetric, Value}
        end
    ),
    AddUp = fun({Key, Value}, Totals) ->
        maps:update_with(Key, fun(Sum) -> Sum + Value end, Value, Totals)
    end,
    try
        lists:foldr(AddUp, #{}, ets:select(Table, Spec))
    catch
        error:badarg -> #{}
    end.
%% Drop every gauge of `Id', whichever worker reported it. Best effort:
%% does nothing when the gauge table is missing.
-spec delete_gauges(handler_name(), metric_id()) -> ok.
delete_gauges(Name, Id) ->
    Table = ?GAUGE_TABLE(Name),
    Spec = ets:fun2ms(
        fun({{GaugeId, _GaugeMetric, _Worker}, _Value}) when GaugeId =:= Id ->
            true
        end
    ),
    try ets:select_delete(Table, Spec) of
        _NumDeleted -> ok
    catch
        error:badarg -> ok
    end.
%% Start a metrics worker locally registered as `Name' and linked to the
%% caller (normally its supervisor).
-spec start_link(handler_name()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Name) ->
    Registration = {local, Name},
    gen_server:start_link(Registration, ?MODULE, Name, []).
%% gen_server init. Traps exits so that terminate/2 runs when the supervisor
%% shuts the worker down, arms the first sampling tick, publishes an empty
%% slot map for this handler, and creates the handler's public gauge table
%% (owned by this process).
init(Name) ->
    process_flag(trap_exit, true),
    TickAfter = timer:seconds(?SAMPLING),
    erlang:send_after(TickAfter, self(), ticking),
    persistent_term:put(?CntrRef(Name), #{}),
    GaugeOpts = [named_table, ordered_set, public, {write_concurrency, true}],
    _ = ets:new(?GAUGE_TABLE(Name), GaugeOpts),
    {ok, #state{}}.
%% get_rate: formatted rates of the rate-tracked counters of `Id'.
%% NOTE(review): for an id without rates this replies with a single zero
%% rate map (`#{current, max, last5m}') rather than a per-metric map. Left
%% unchanged because callers may depend on it.
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
    {reply, make_rate(0, 0, 0), State};
handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
    {reply,
        case maps:get(Id, Rates, undefined) of
            undefined -> make_rate(0, 0, 0);
            RatesPerId -> format_rates_of_id(RatesPerId)
        end, State};
%% create_metrics: (re)create counters, rates and slides of `Id'. Every rate
%% metric must also be declared as a counter.
handle_call(
    {create_metrics, Id, Metrics, RateMetrics},
    _From,
    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
) ->
    case RateMetrics -- filter_counters(Metrics) of
        [] when Metrics =:= [] ->
            %% create_counters/3 raises on an empty list. Doing that here
            %% would crash the worker, and terminate/2 would then erase the
            %% counters of every other id. Reply with an error instead.
            {reply, {error, {create_counter_error, must_provide_a_list_of_metrics}}, State};
        [] ->
            RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
            Rate1 =
                case Rates of
                    undefined -> #{Id => RatePerId};
                    _ -> Rates#{Id => RatePerId}
                end,
            Slides1 = Slides#{Id => create_slides(Metrics)},
            {reply, create_counters(get_self_name(), Id, Metrics), State#state{
                metric_ids = sets:add_element(Id, MIDs),
                rates = Rate1,
                slides = Slides1
            }};
        _ ->
            {reply, {error, not_super_set_of, {RateMetrics, Metrics}}, State}
    end;
%% delete_metrics: forget everything about `Id'.
handle_call(
    {delete_metrics, Id},
    _From,
    State = #state{metric_ids = MIDs, rates = Rates, slides = Slides}
) ->
    Name = get_self_name(),
    delete_counters(Name, Id),
    delete_gauges(Name, Id),
    {reply, ok, State#state{
        metric_ids = sets:del_element(Id, MIDs),
        rates =
            case Rates of
                undefined -> undefined;
                _ -> maps:remove(Id, Rates)
            end,
        slides = maps:remove(Id, Slides)
    }};
%% reset_metrics: drop the gauges, then zero the counters, rates and slides
%% of `Id'. Ids without metrics are left out of the state instead of gaining
%% empty placeholder entries.
handle_call(
    {reset_metrics, Id},
    _From,
    State = #state{rates = Rates, slides = Slides}
) ->
    Name = get_self_name(),
    delete_gauges(Name, Id),
    Reply = reset_counters(Name, Id),
    {reply, Reply, State#state{
        rates = reset_rates(Id, Rates),
        slides = reset_slides(Id, Slides)
    }};
handle_call({get_slide, Id}, _From, State = #state{slides = Slides}) ->
    SlidesForID = maps:get(Id, Slides, #{}),
    {reply, maps:map(fun(Metric, Slide) -> do_get_slide(Id, Metric, Slide) end, SlidesForID),
        State};
handle_call({get_slide, Id, Window}, _From, State = #state{slides = Slides}) ->
    SlidesForID = maps:get(Id, Slides, #{}),
    {reply,
        maps:map(fun(Metric, Slide) -> do_get_slide(Window, Id, Metric, Slide) end, SlidesForID),
        State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% Replace every #rate{} of `Id' with a fresh one; leaves `Rates' untouched
%% when `Id' has no rates.
reset_rates(_Id, undefined) ->
    undefined;
reset_rates(Id, Rates) ->
    case maps:find(Id, Rates) of
        {ok, RatesPerId} ->
            Rates#{Id := maps:map(fun(_Metric, _Rate) -> #rate{} end, RatesPerId)};
        error ->
            Rates
    end.

%% Replace the slides of `Id' with empty ones; leaves `Slides' untouched
%% when `Id' has no slides entry.
reset_slides(Id, Slides) ->
    case maps:find(Id, Slides) of
        {ok, SlidesPerId} ->
            SlideSpecs = [{slide, Metric} || Metric <- maps:keys(SlidesPerId)],
            Slides#{Id := create_slides(SlideSpecs)};
        error ->
            Slides
    end.
%% The worker defines no casts; anything cast to it is ignored.
handle_cast(_Request, CurrentState) ->
    {noreply, CurrentState}.
%% Periodic sampling tick, re-armed every ?SAMPLING seconds. Before the
%% first create_metrics there is nothing to sample.
handle_info(ticking, State = #state{rates = undefined}) ->
    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
    {noreply, State};
%% Fold the current counter values into each rate record, move the pending
%% slide samples into a new datapoint, then re-arm the timer.
handle_info(ticking, State = #state{rates = Rates0, slides = Slides0}) ->
    %% The handler name is the same for every metric: look it up once per
    %% tick instead of once per rate metric.
    Name = get_self_name(),
    Rates =
        maps:map(
            fun(Id, RatesPerID) ->
                maps:map(
                    fun(Metric, Rate) ->
                        calculate_rate(get(Name, Id, Metric), Rate)
                    end,
                    RatesPerID
                )
            end,
            Rates0
        ),
    Slides =
        maps:map(
            fun(Id, SlidesPerID) ->
                maps:map(
                    fun(Metric, Slide) ->
                        update_slide(Id, Metric, Slide)
                    end,
                    SlidesPerID
                )
            end,
            Slides0
        ),
    erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
    {noreply, State#state{rates = Rates, slides = Slides}};
handle_info(_Info, State) ->
    {noreply, State}.
%% Hot code upgrade: the state layout is unchanged, so keep it as is.
code_change(_FromVsn, CurrentState, _Extra) -> {ok, CurrentState}.
%% Remove this handler's slot map from persistent_term. The whole key is
%% erased at once. Removing ids one by one first would rewrite the persistent
%% term once per id, and each update triggers a global GC scan, all for
%% nothing. The counters arrays become unreachable with the map. The gauge
%% table is owned by this process and goes away with it.
terminate(_Reason, #state{}) ->
    persistent_term:erase(?CntrRef(get_self_name())).
%% Stop the worker registered as `Name', waiting up to 10 seconds. Returns
%% ok as well when no such worker is running, and when the wait times out.
%% NOTE(review): an earlier comment stated the process has been killed by
%% gen.erl after a timeout; confirm against the OTP version in use.
-spec stop(handler_name()) -> ok.
stop(Name) ->
    try
        gen_server:stop(Name, normal, 10000)
    catch
        exit:Reason when Reason =:= noproc; Reason =:= timeout ->
            ok
    end.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
%% (Re)create the counters array of `Id' and publish its slot map in
%% persistent_term. Values of counters that already existed under the same
%% names are copied into the new array. Slides are not carried over, since
%% their samples are folded into the history on every tick anyway; only the
%% current interval's samples are lost. Increments that hit the old array
%% between the snapshot and the swap may be lost too. Raises when `Metrics'
%% is empty.
create_counters(_Name, _Id, []) ->
    error({create_counter_error, must_provide_a_list_of_metrics});
create_counters(Name, Id, Metrics) ->
    %% Snapshot the counters that survive the re-creation, before swapping.
    Carried = maps:with(filter_counters(Metrics), get_counters(Name, Id)),
    {Size, Slots} = create_metric_indexes(Metrics),
    AllIds = get_pterm(Name),
    NewRef = counters:new(Size, [write_concurrency]),
    persistent_term:put(?CntrRef(Name), AllIds#{Id => Slots#{ref => NewRef}}),
    maps:fold(fun(Metric, Val, ok) -> inc(Name, Id, Metric, Val) end, ok, Carried).
%% Assign 1-based slots in the counters array: one per counter, and two
%% consecutive ones per slide (value sum, then sample count). Returns the
%% next free slot together with the per-type name => slot maps.
create_metric_indexes(Metrics) ->
    {NextSlot, CounterPairs, SlidePairs} =
        lists:foldl(
            fun
                ({counter, Name}, {Slot, Cs, Ss}) -> {Slot + 1, [{Name, Slot} | Cs], Ss};
                ({slide, Name}, {Slot, Cs, Ss}) -> {Slot + 2, Cs, [{Name, Slot} | Ss]}
            end,
            {1, [], []},
            Metrics
        ),
    {NextSlot, #{counter => maps:from_list(CounterPairs), slide => maps:from_list(SlidePairs)}}.
%% Forget the counters of `Id' by dropping it from the handler's slot map.
delete_counters(Name, Id) ->
    Remaining = maps:remove(Id, get_pterm(Name)),
    persistent_term:put(?CntrRef(Name), Remaining).

%% The counters array of `Id', or `not_found' when `Id' has no metrics.
get_ref(Name, Id) ->
    case maps:get(Id, get_pterm(Name), not_found) of
        not_found -> not_found;
        #{ref := CntrRef} -> CntrRef
    end.
%% Slot of `Metric' among the `Type' (counter | slide) metrics of `Id'.
%% Crashes with badkey when there is no such metric.
idx_metric(Name, Id, Type, Metric) ->
    Slots = get_indexes(Name, Type, Id),
    maps:get(Metric, Slots).

%% Name => slot map of the `Type' metrics of `Id'; empty when `Id' has no
%% metrics.
get_indexes(Name, Type, Id) ->
    case maps:get(Id, get_pterm(Name), undefined) of
        undefined -> #{};
        #{Type := Slots} -> Slots
    end.

%% The handler's Id => #{ref, counter, slide} map; empty when not set.
get_pterm(Name) ->
    Key = ?CntrRef(Name),
    persistent_term:get(Key, #{}).
%% Advance a #rate{} by one sampling tick, given the counter's current
%% value. `current' becomes the per-second rate since the previous tick and
%% `max' the highest such rate seen. `last5m' becomes the average of the
%% last ?SAMPCOUNT_5M rates, or of all rates so far while fewer have been
%% collected.
calculate_rate(_CurrVal, undefined) ->
    undefined;
calculate_rate(CurrVal, #rate{
    max = PrevMax,
    last_v = PrevVal,
    tick = Tick,
    last5m_acc = PrevAcc,
    last5m_smpl = PrevSamples
}) ->
    Rate = (CurrVal - PrevVal) / ?SAMPLING,
    NewMax =
        if
            Rate > PrevMax -> Rate;
            true -> PrevMax
        end,
    {Samples, Acc, Avg} = roll_5m_window(Tick, Rate, PrevAcc, PrevSamples),
    #rate{
        max = NewMax,
        current = Rate,
        last5m = Avg,
        last_v = CurrVal,
        last5m_acc = Acc,
        last5m_smpl = Samples,
        tick = Tick + 1
    }.

%% Append `Rate' to the 5-minute sample list (oldest first). While the list
%% is still filling up, the average is over `Tick' samples. After that, the
%% oldest sample is dropped and the average is over ?SAMPCOUNT_5M samples.
roll_5m_window(Tick, Rate, PrevAcc, PrevSamples) when Tick =< ?SAMPCOUNT_5M ->
    Acc = PrevAcc + Rate,
    {PrevSamples ++ [Rate], Acc, Acc / Tick};
roll_5m_window(_Tick, Rate, PrevAcc, PrevSamples) ->
    [Oldest | Rest] = PrevSamples,
    Acc = PrevAcc + Rate - Oldest,
    {Rest ++ [Rate], Acc, Acc / ?SAMPCOUNT_5M}.
%% Summary of one slide: total samples since creation/reset, the average
%% over the last 2 seconds (`current') and over the last ?SECS_5M seconds.
do_get_slide(Id, Metric, Slide = #slide{n_samples = TotalSamples}) ->
    Current = do_get_slide(2, Id, Metric, Slide),
    Last5m = do_get_slide(?SECS_5M, Id, Metric, Slide),
    #{n_samples => TotalSamples, current => Current, last5m => Last5m}.

%% Integer average of the samples recorded in the last `Window' seconds,
%% including those still pending in the live counters; 0 without samples.
do_get_slide(Window, Id, Metric, #slide{datapoints = History}) ->
    Pending = get_slide_datapoint(Id, Metric),
    Since = os:system_time(second) - Window,
    case get_slide_window(Since, [Pending | History], 0, 0) of
        {Count, Sum} when Count > 0 -> Sum div Count;
        {_Count, _Sum} -> 0
    end.
%% Add the sample counts and value sums of the leading datapoints (newest
%% first) whose timestamp is at least `StartTime' to the accumulators
%% `N' and `S'. Stops at the first older datapoint.
get_slide_window(StartTime, Datapoints, N, S) ->
    InWindow = lists:takewhile(
        fun(#slide_datapoint{time = T}) -> T >= StartTime end,
        Datapoints
    ),
    lists:foldl(
        fun(#slide_datapoint{samples = Samples, sum = Sum}, {AccN, AccS}) ->
            {AccN + Samples, AccS + Sum}
        end,
        {N, S},
        InWindow
    ).
%% Datapoint for the samples observed since the last tick, read from the
%% live counters and stamped with the current wall-clock second. Must run
%% in the worker process: the handler name is taken from the caller's
%% registered name.
get_slide_datapoint(Id, Metric) ->
    Handler = get_self_name(),
    CntrRef = get_ref(Handler, Id),
    SumSlot = idx_metric(Handler, Id, slide, Metric),
    SampleSum = counters:get(CntrRef, SumSlot),
    SampleCount = counters:get(CntrRef, SumSlot + 1),
    #slide_datapoint{
        sum = SampleSum,
        samples = SampleCount,
        time = os:system_time(second)
    }.
%% Per-tick update of one slide metric. The samples observed since the
%% previous tick become a new datapoint at the head of the history, and the
%% oldest datapoint is dropped so the history keeps a fixed length. Those
%% samples are then taken out of the live counters.
update_slide(Id, Metric, Slide0 = #slide{n_samples = NSamples, datapoints = DPs}) ->
    Datapoint = #slide_datapoint{sum = Sum, samples = N} = get_slide_datapoint(Id, Metric),
    Name = get_self_name(),
    CRef = get_ref(Name, Id),
    Index = idx_metric(Name, Id, slide, Metric),
    %% Subtract exactly what was read instead of zeroing the slots. Samples
    %% observed concurrently after the read then stay in the counters and
    %% are picked up by the next tick rather than being silently lost.
    counters:sub(CRef, Index, Sum),
    counters:sub(CRef, Index + 1, N),
    Slide0#slide{
        datapoints = [Datapoint | lists:droplast(DPs)],
        n_samples = N + NSamples
    }.
%% Public view of every #rate{} tracked for one id.
format_rates_of_id(RatesPerId) ->
    maps:map(fun(_Metric, Rate) -> format_rate(Rate) end, RatesPerId).

%% Rounded public view of one #rate{} record.
format_rate(#rate{current = Current, max = Max, last5m = Last5m}) ->
    make_rate(Current, Max, Last5m).
%% Public rate map with every figure rounded to two decimals.
make_rate(Current, Max, Last5Min) ->
    Round2 = fun(Value) -> precision(Value, 2) end,
    #{current => Round2(Current), max => Round2(Max), last5m => Round2(Last5Min)}.

%% Round `Float' to `N' decimal places; the result is always a float.
precision(Float, N) ->
    Scale = math:pow(10, N),
    round(Float * Scale) / Scale.
%% Normalize metric declarations: a bare atom stands for `{counter, Atom}'.
%% `{Type, Name}' specs are kept as they are.
desugar(Metrics) ->
    [desugar_spec(Metric) || Metric <- Metrics].

%% Normalize a single metric declaration.
desugar_spec(Name) when is_atom(Name) -> {counter, Name};
desugar_spec({_Type, _Name} = Spec) -> Spec.

%% Names of the counter metrics among already-normalized specs.
filter_counters(Metrics) ->
    [Name || {Type, Name} <- Metrics, Type =:= counter].
%% Fresh #slide{} for every `{slide, Name}' in `Metrics', each with a history
%% of ?SAMPCOUNT_5M zeroed datapoints (one per sampling interval of the
%% 5-minute window). Other specs are ignored.
create_slides(Metrics) ->
    Placeholder = #slide_datapoint{sum = 0, samples = 0, time = 0},
    History = lists:duplicate(?SAMPCOUNT_5M, Placeholder),
    maps:from_list([{Name, #slide{datapoints = History}} || {slide, Name} <- Metrics]).
%% Registered name of the calling process. Inside the worker this is the
%% handler name. Crashes with badmatch when the caller is not registered.
get_self_name() ->
    {registered_name, HandlerName} = erlang:process_info(self(), registered_name),
    HandlerName.