refactor(metrics): improve the metrics for bridges/rules

This commit is contained in:
Shawn 2022-01-04 19:38:02 +08:00
parent 80a06c7201
commit c1212c4b6d
2 changed files with 152 additions and 112 deletions

View File

@ -28,17 +28,15 @@
, inc/4
, get/3
, get_rate/2
, create_metrics/2
, get_all_counters/2
, create_metrics/3
, create_metrics/4
, clear_metrics/2
]).
-export([ get_metrics/2
, get_matched/2
, get_success/2
, get_failed/2
, inc_matched/2
, inc_success/2
, inc_failed/2
]).
%% gen_server callbacks
@ -61,13 +59,14 @@
-export_type([metrics/0]).
-type rate() :: #{
current => float(),
max => float(),
last5m => float()
}.
-type metrics() :: #{
matched => integer(),
success => integer(),
failed => integer(),
rate => float(),
rate_max => float(),
rate_last5m => float()
counters => #{atom() => integer()},
rate => #{atom() => rate()}
}.
-type handler_name() :: atom().
-type metric_id() :: binary().
@ -107,35 +106,41 @@ child_spec(Name) ->
, modules => [emqx_plugin_libs_metrics]
}.
-spec(create_metrics(handler_name(), metric_id()) -> ok).
create_metrics(Name, Id) ->
gen_server:call(Name, {create_metrics, Id}).
%% Create the counters listed in `Metrics' for `Id' under the handler `Name'.
%% The same list is also used as the set of rate metrics, i.e. this is
%% create_metrics/4 with `Metrics' passed for both list arguments.
-spec create_metrics(handler_name(), metric_id(), [atom()]) -> ok.
create_metrics(Name, Id, Metrics) ->
    RateMetrics = Metrics,
    create_metrics(Name, Id, Metrics, RateMetrics).
%% Create the counters listed in `Metrics' for `Id' under the handler `Name',
%% and track a rate for every metric listed in `RateMetrics'.
%%
%% Every rate metric must also appear in `Metrics': the handler's periodic
%% rate calculation reads the counter of each rate metric by name, and a
%% name without a counter slot makes that read fail inside the handler
%% process. Such input is therefore rejected here, in the caller, by raising
%% `error({create_metrics_error, {rate_metrics_without_counter, Unknown}})'.
-spec create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok.
create_metrics(Name, Id, Metrics, RateMetrics) ->
    case [M || M <- RateMetrics, not lists:member(M, Metrics)] of
        [] ->
            gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics});
        Unknown ->
            error({create_metrics_error, {rate_metrics_without_counter, Unknown}})
    end.
%% Ask the handler `Name' to drop everything it keeps for `Id' by sending
%% it a synchronous `{delete_metrics, Id}' request.
-spec clear_metrics(handler_name(), metric_id()) -> ok.
clear_metrics(Name, Id) ->
    Request = {delete_metrics, Id},
    gen_server:call(Name, Request).
-spec(get(handler_name(), metric_id(), atom()) -> number()).
-spec(get(handler_name(), metric_id(), atom() | integer()) -> number()).
get(Name, Id, Metric) ->
case get_couters_ref(Name, Id) of
case get_ref(Name, Id) of
not_found -> 0;
Ref -> counters:get(Ref, metrics_idx(Metric))
Ref when is_atom(Metric) ->
counters:get(Ref, idx_metric(Name, Id, Metric));
Ref when is_integer(Metric) ->
counters:get(Ref, Metric)
end.
%% Fetch the rates recorded for `Id' from the handler `Name'.
%% The reply to the synchronous `{get_rate, Id}' request is returned as is.
-spec get_rate(handler_name(), metric_id()) -> map().
get_rate(Name, Id) ->
    Request = {get_rate, Id},
    gen_server:call(Name, Request).
%% Return a map from metric name to its current counter value for `Id'.
%%
%% When no counters are registered for `Id', get_indexes/2 returns the atom
%% `not_found' rather than a map; in that case an empty map is returned
%% instead of handing the atom to maps:map/2 (which would raise badmap).
-spec get_all_counters(handler_name(), metric_id()) -> map().
get_all_counters(Name, Id) ->
    case get_indexes(Name, Id) of
        not_found ->
            #{};
        Indexes ->
            %% Read each counter through its slot index.
            maps:map(fun(_Metric, Index) -> get(Name, Id, Index) end, Indexes)
    end.
-spec(get_metrics(handler_name(), metric_id()) -> metrics()).
get_metrics(Name, Id) ->
#{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id),
#{matched => get_matched(Name, Id),
success => get_success(Name, Id),
failed => get_failed(Name, Id),
rate => Current,
rate_max => Max,
rate_last5m => Last5M
}.
#{rate => get_rate(Name, Id), counters => get_all_counters(Name, Id)}.
-spec inc(handler_name(), metric_id(), atom()) -> ok.
inc(Name, Id, Metric) ->
@ -143,34 +148,14 @@ inc(Name, Id, Metric) ->
-spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
inc(Name, Id, Metric, Val) ->
case get_couters_ref(Name, Id) of
not_found ->
%% this may occur when increasing a counter for
%% a rule that was created from a remote node.
create_metrics(Name, Id),
counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val);
Ref ->
counters:add(Ref, metrics_idx(Metric), Val)
end.
counters:add(get_ref(Name, Id), idx_metric(Name, Id,Metric), Val).
inc_matched(Name, Id) ->
inc(Name, Id, 'matched', 1).
inc_success(Name, Id) ->
inc(Name, Id, 'success', 1).
inc_failed(Name, Id) ->
inc(Name, Id, 'failed', 1).
get_matched(Name, Id) ->
get(Name, Id, 'matched').
get_success(Name, Id) ->
get(Name, Id, 'success').
get_failed(Name, Id) ->
get(Name, Id, 'failed').
%% Start the metrics handler as a linked gen_server registered locally
%% under `Name'; `Name' is also passed to init/1 as the start argument.
start_link(Name) ->
    ServerName = {local, Name},
    StartOpts = [],
    gen_server:start_link(ServerName, ?MODULE, Name, StartOpts).
@ -181,22 +166,22 @@ init(Name) ->
persistent_term:put(?CntrRef(Name), #{}),
{ok, #state{}}.
handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) ->
{reply, format_rate(#rate{}), State};
handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) ->
{reply, case maps:get(Id, Rates, undefined) of
undefined -> format_rate(#rate{});
Rate -> format_rate(Rate)
undefined -> #{};
RatesPerId -> format_rates_of_id(RatesPerId)
end, State};
handle_call({create_metrics, Id}, _From,
handle_call({create_metrics, Id, Metrics, RateMetrics}, _From,
State = #state{metric_ids = MIDs, rates = Rates}) ->
{reply, create_counters(get_self_name(), Id),
RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]),
Rate1= case Rates of
undefined -> #{Id => RatePerId};
_ -> Rates#{Id => RatePerId}
end,
{reply, create_counters(get_self_name(), Id, Metrics),
State#state{metric_ids = sets:add_element(Id, MIDs),
rates = case Rates of
undefined -> #{Id => #rate{}};
_ -> Rates#{Id => #rate{}}
end}};
rates = Rate1}};
handle_call({delete_metrics, Id}, _From,
State = #state{metric_ids = MIDs, rates = Rates}) ->
@ -218,9 +203,11 @@ handle_info(ticking, State = #state{rates = undefined}) ->
{noreply, State};
handle_info(ticking, State = #state{rates = Rates0}) ->
Rates = maps:map(
fun(Id, Rate) ->
calculate_rate(get_matched(get_self_name(), Id), Rate)
Rates =
maps:map(fun(Id, RatesPerID) ->
maps:map(fun(Metric, Rate) ->
calculate_rate(get(get_self_name(), Id, Metric), Rate)
end, RatesPerID)
end, Rates0),
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
{noreply, State#state{rates = Rates}};
@ -243,22 +230,38 @@ stop(Name) ->
%% Internal Functions
%%------------------------------------------------------------------------------
create_counters(Name, Id) ->
case get_couters_ref(Name, Id) of
not_found ->
Counters = get_all_counters(Name),
CntrRef = counters:new(max_counters_size(), [write_concurrency]),
persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef});
_Ref -> ok
end.
%% Allocate a counters array with one slot per metric and store it, together
%% with the metric-name -> slot-index map, under `Id' in the map kept in
%% persistent_term for `Name'. Any entry already stored for `Id' is replaced.
%% An empty metric list is rejected with an error.
create_counters(_Name, _Id, []) ->
    error({create_counter_error, must_provide_a_list_of_metrics});
create_counters(Name, Id, Metrics) ->
    SlotCount = length(Metrics),
    Slots = lists:seq(1, SlotCount),
    MetricToSlot = maps:from_list(lists:zip(Metrics, Slots)),
    Registry = get_counters(Name),
    Entry = #{ref => counters:new(SlotCount, [write_concurrency]),
              indexes => MetricToSlot},
    persistent_term:put(?CntrRef(Name), Registry#{Id => Entry}).
delete_counters(Name, Id) ->
persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))).
persistent_term:put(?CntrRef(Name), maps:remove(Id, get_counters(Name))).
get_couters_ref(Name, Id) ->
maps:get(Id, get_all_counters(Name), not_found).
%% Look up the counters reference stored for `Id' under the handler `Name'.
%% Returns the atom `not_found' when nothing is stored for `Id'.
get_ref(Name, Id) ->
    Registry = get_counters(Name),
    case maps:find(Id, Registry) of
        error -> not_found;
        {ok, #{ref := CountersRef}} -> CountersRef
    end.
get_all_counters(Name) ->
%% Translate a metric name into its slot index in the counters array of `Id'.
%% Returns `not_found' when `Id' has no counters or `Metric' has no slot.
idx_metric(Name, Id, Metric) ->
    lookup_metric_index(Metric, get_indexes(Name, Id)).

%% Resolve `Metric' against the result of get_indexes/2.
lookup_metric_index(_Metric, not_found) ->
    not_found;
lookup_metric_index(Metric, Indexes) ->
    maps:get(Metric, Indexes, not_found).
%% Look up the metric-name -> slot-index map stored for `Id' under the
%% handler `Name'. Returns the atom `not_found' when nothing is stored.
get_indexes(Name, Id) ->
    Registry = get_counters(Name),
    case maps:find(Id, Registry) of
        error -> not_found;
        {ok, #{indexes := MetricIndexes}} -> MetricIndexes
    end.
%% Read the term stored in persistent_term under ?CntrRef(Name);
%% an empty map is returned when that key has not been written.
get_counters(Name) ->
    Key = ?CntrRef(Name),
    Default = #{},
    persistent_term:get(Key, Default).
calculate_rate(_CurrVal, undefined) ->
@ -292,8 +295,13 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal,
last_v = CurrVal, last5m_acc = Acc5Min,
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
%% Apply format_rate/1 to every value of the per-metric rate map,
%% keeping the metric names as keys.
format_rates_of_id(RatesPerId) ->
    Formatted = [{Metric, format_rate(Rate)}
                 || {Metric, Rate} <- maps:to_list(RatesPerId)],
    maps:from_list(Formatted).
format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) ->
#{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
#{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
precision(Float, N) ->
Base = math:pow(10, N),
@ -302,14 +310,3 @@ precision(Float, N) ->
%% Return the name the calling process is registered under.
%% The match fails (badmatch) when the caller has no registered name.
get_self_name() ->
    Self = self(),
    {registered_name, RegisteredAs} = erlang:process_info(Self, registered_name),
    RegisteredAs.
%%------------------------------------------------------------------------------
%% Metrics Definitions
%%------------------------------------------------------------------------------
max_counters_size() -> 32.
metrics_idx('matched') -> 1;
metrics_idx('success') -> 2;
metrics_idx('failed') -> 3;
metrics_idx(_) -> 32.

View File

@ -23,22 +23,11 @@
-include_lib("common_test/include/ct.hrl").
all() ->
[ {group, metrics}
, {group, rate} ].
emqx_common_test_helpers:all(?MODULE).
%% Suite-level Common Test settings: install the cth_surefire hook and
%% apply a 30 second timetrap.
suite() ->
    Hooks = {ct_hooks, [cth_surefire]},
    TimeTrap = {timetrap, {seconds, 30}},
    [Hooks, TimeTrap].
groups() ->
[{metrics, [sequence],
[ t_rule
, t_no_creation_1
]},
{rate, [sequence],
[ rule_rate
]}
].
-define(NAME, ?MODULE).
init_per_suite(Config) ->
@ -59,12 +48,66 @@ init_per_testcase(_, Config) ->
%% No per-testcase cleanup is performed; both arguments are ignored.
end_per_testcase(_TestCase, _Config) -> ok.
t_no_creation_1(_) ->
?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')).
%% Checks get_metrics/2 for an id whose metrics a, b and c are all used both
%% as counters and as rate metrics (create_metrics/3): everything starts at
%% zero, and after incrementing, the counters and rates reflect the changes.
t_get_metrics(_) ->
Metrics = [a, b, c],
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics),
%% all the metrics are set to zero at start
?assertMatch(#{
rate := #{
a := #{current := 0.0, max := 0.0, last5m := 0.0},
b := #{current := 0.0, max := 0.0, last5m := 0.0},
c := #{current := 0.0, max := 0.0, last5m := 0.0}
},
counters := #{
a := 0,
b := 0,
c := 0
}
}, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
%% bump a and b once, c twice
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
%% NOTE(review): presumably 1.5s is long enough for the metrics process to
%% run at least one rate sampling tick; the sampling interval is not
%% visible here -- confirm.
ct:sleep(1500),
%% Expect counters 1/1/2 and positive current and max rates for a, b, c.
%% NOTE(review): ?LET is defined outside this view; it is assumed to match
%% the pattern against the get_metrics/2 result and then evaluate the
%% tuple of assertions -- confirm.
?LET(#{
rate := #{
a := #{current := CurrA, max := MaxA, last5m := _},
b := #{current := CurrB, max := MaxB, last5m := _},
c := #{current := CurrC, max := MaxC, last5m := _}
},
counters := #{
a := 1,
b := 1,
c := 2
}
}, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>),
{?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0),
?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}),
%% remove the metrics created by this test case
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
t_rule(_) ->
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>),
%% Checks get_metrics/2 when the rate metrics are a strict subset of the
%% counters (create_metrics/4): counters exist for a, b and c, but only `a'
%% is listed as a rate metric.
t_get_metrics_2(_) ->
Metrics = [a, b, c],
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics,
[a]),
%% bump every counter once
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c),
%% All three counters read 1; the guard requires the rate map to hold
%% exactly one entry, the one for `a'.
?assertMatch(#{
rate := Rate = #{
a := #{current := _, max := _, last5m := _}
},
counters := #{
a := 1,
b := 1,
c := 1
}
} when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)),
%% remove the metrics created by this test case
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>).
t_inc_matched(_) ->
Metrics = ['rules.matched'],
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics),
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>, Metrics),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
@ -74,20 +117,20 @@ t_rule(_) ->
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
rule_rate(_) ->
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>),
t_rate(_) ->
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, ['rules.matched']),
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
ct:sleep(1000),
?LET(#{max := Max, current := Current},
?LET(#{'rules.matched' := #{max := Max, current := Current}},
emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current =< 2)}),
ct:sleep(2100),
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
?LET(#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>),
{?assert(Max =< 2),
?assert(Current == 0),
?assert(Last5Min =< 0.67)}),