From c1212c4b6db89f208cba9ba0233a324049650ed9 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 4 Jan 2022 19:38:02 +0800 Subject: [PATCH] refactor(metrics): improve the metrics for bridges/rules --- .../src/emqx_plugin_libs_metrics.erl | 177 +++++++++--------- .../test/emqx_plugin_libs_metrics_SUITE.erl | 87 ++++++--- 2 files changed, 152 insertions(+), 112 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 2a46b470b..38e922bc2 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -28,17 +28,15 @@ , inc/4 , get/3 , get_rate/2 - , create_metrics/2 + , get_all_counters/2 + , create_metrics/3 + , create_metrics/4 , clear_metrics/2 ]). -export([ get_metrics/2 , get_matched/2 - , get_success/2 - , get_failed/2 , inc_matched/2 - , inc_success/2 - , inc_failed/2 ]). %% gen_server callbacks @@ -61,13 +59,14 @@ -export_type([metrics/0]). +-type rate() :: #{ + current => float(), + max => float(), + last5m => float() +}. -type metrics() :: #{ - matched => integer(), - success => integer(), - failed => integer(), - rate => float(), - rate_max => float(), - rate_last5m => float() + counters => #{atom() => integer()}, + rate => #{atom() => rate()} }. -type handler_name() :: atom(). -type metric_id() :: binary(). @@ -107,35 +106,41 @@ child_spec(Name) -> , modules => [emqx_plugin_libs_metrics] }. --spec(create_metrics(handler_name(), metric_id()) -> ok). -create_metrics(Name, Id) -> - gen_server:call(Name, {create_metrics, Id}). +-spec(create_metrics(handler_name(), metric_id(), [atom()]) -> ok). +create_metrics(Name, Id, Metrics) -> + create_metrics(Name, Id, Metrics, Metrics). + +-spec(create_metrics(handler_name(), metric_id(), [atom()], [atom()]) -> ok). +create_metrics(Name, Id, Metrics, RateMetrics) -> + gen_server:call(Name, {create_metrics, Id, Metrics, RateMetrics}). -spec(clear_metrics(handler_name(), metric_id()) -> ok). clear_metrics(Name, Id) -> gen_server:call(Name, {delete_metrics, Id}). --spec(get(handler_name(), metric_id(), atom()) -> number()). +-spec(get(handler_name(), metric_id(), atom() | integer()) -> number()). get(Name, Id, Metric) -> - case get_couters_ref(Name, Id) of + case get_ref(Name, Id) of not_found -> 0; - Ref -> counters:get(Ref, metrics_idx(Metric)) + Ref when is_atom(Metric) -> + counters:get(Ref, idx_metric(Name, Id, Metric)); + Ref when is_integer(Metric) -> + counters:get(Ref, Metric) end. -spec(get_rate(handler_name(), metric_id()) -> map()). get_rate(Name, Id) -> gen_server:call(Name, {get_rate, Id}). +-spec(get_all_counters(handler_name(), metric_id()) -> map()). +get_all_counters(Name, Id) -> + maps:map(fun(_Metric, Index) -> + get(Name, Id, Index) + end, get_indexes(Name, Id)). + -spec(get_metrics(handler_name(), metric_id()) -> metrics()). get_metrics(Name, Id) -> - #{max := Max, current := Current, last5m := Last5M} = get_rate(Name, Id), - #{matched => get_matched(Name, Id), - success => get_success(Name, Id), - failed => get_failed(Name, Id), - rate => Current, - rate_max => Max, - rate_last5m => Last5M - }. + #{rate => get_rate(Name, Id), counters => get_all_counters(Name, Id)}. -spec inc(handler_name(), metric_id(), atom()) -> ok. inc(Name, Id, Metric) -> @@ -143,34 +148,14 @@ inc(Name, Id, Metric) -> -spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok. inc(Name, Id, Metric, Val) -> - case get_couters_ref(Name, Id) of - not_found -> - %% this may occur when increasing a counter for - %% a rule that was created from a remove node. - create_metrics(Name, Id), - counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val); - Ref -> - counters:add(Ref, metrics_idx(Metric), Val) - end. + counters:add(get_ref(Name, Id), idx_metric(Name, Id,Metric), Val). inc_matched(Name, Id) -> inc(Name, Id, 'matched', 1). -inc_success(Name, Id) -> - inc(Name, Id, 'success', 1). - -inc_failed(Name, Id) -> - inc(Name, Id, 'failed', 1). - get_matched(Name, Id) -> get(Name, Id, 'matched'). -get_success(Name, Id) -> - get(Name, Id, 'success'). - -get_failed(Name, Id) -> - get(Name, Id, 'failed'). - start_link(Name) -> gen_server:start_link({local, Name}, ?MODULE, Name, []). @@ -181,31 +166,31 @@ init(Name) -> persistent_term:put(?CntrRef(Name), #{}), {ok, #state{}}. -handle_call({get_rate, _Id}, _From, State = #state{rates = undefined}) -> - {reply, format_rate(#rate{}), State}; handle_call({get_rate, Id}, _From, State = #state{rates = Rates}) -> {reply, case maps:get(Id, Rates, undefined) of - undefined -> format_rate(#rate{}); - Rate -> format_rate(Rate) + undefined -> #{}; + RatesPerId -> format_rates_of_id(RatesPerId) end, State}; -handle_call({create_metrics, Id}, _From, +handle_call({create_metrics, Id, Metrics, RateMetrics}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> - {reply, create_counters(get_self_name(), Id), + RatePerId = maps:from_list([{M, #rate{}} || M <- RateMetrics]), + Rate1= case Rates of + undefined -> #{Id => RatePerId}; + _ -> Rates#{Id => RatePerId} + end, + {reply, create_counters(get_self_name(), Id, Metrics), State#state{metric_ids = sets:add_element(Id, MIDs), - rates = case Rates of - undefined -> #{Id => #rate{}}; - _ -> Rates#{Id => #rate{}} - end}}; + rates = Rate1}}; handle_call({delete_metrics, Id}, _From, State = #state{metric_ids = MIDs, rates = Rates}) -> {reply, delete_counters(get_self_name(), Id), State#state{metric_ids = sets:del_element(Id, MIDs), - rates = case Rates of - undefined -> undefined; - _ -> maps:remove(Id, Rates) - end}}; + rates = case Rates of + undefined -> undefined; + _ -> maps:remove(Id, Rates) + end}}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -218,10 +203,12 @@ handle_info(ticking, State = #state{rates = undefined}) -> {noreply, State}; handle_info(ticking, State = #state{rates = Rates0}) -> - Rates = maps:map( - fun(Id, Rate) -> - calculate_rate(get_matched(get_self_name(), Id), Rate) - end, Rates0), + Rates = + maps:map(fun(Id, RatesPerID) -> + maps:map(fun(Metric, Rate) -> + calculate_rate(get(get_self_name(), Id, Metric), Rate) + end, RatesPerID) + end, Rates0), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State#state{rates = Rates}}; @@ -243,29 +230,45 @@ stop(Name) -> %% Internal Functions %%------------------------------------------------------------------------------ -create_counters(Name, Id) -> - case get_couters_ref(Name, Id) of - not_found -> - Counters = get_all_counters(Name), - CntrRef = counters:new(max_counters_size(), [write_concurrency]), - persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef}); - _Ref -> ok - end. +create_counters(_Name, _Id, []) -> + error({create_counter_error, must_provide_a_list_of_metrics}); +create_counters(Name, Id, Metrics) -> + Size = length(Metrics), + Indexes = maps:from_list(lists:zip(Metrics, lists:seq(1, Size))), + Counters = get_counters(Name), + CntrRef = counters:new(Size, [write_concurrency]), + persistent_term:put(?CntrRef(Name), + Counters#{Id => #{ref => CntrRef, indexes => Indexes}}). delete_counters(Name, Id) -> - persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))). + persistent_term:put(?CntrRef(Name), maps:remove(Id, get_counters(Name))). -get_couters_ref(Name, Id) -> - maps:get(Id, get_all_counters(Name), not_found). +get_ref(Name, Id) -> + case maps:find(Id, get_counters(Name)) of + {ok, #{ref := Ref}} -> Ref; + error -> not_found + end. -get_all_counters(Name) -> +idx_metric(Name, Id, Metric) -> + case get_indexes(Name, Id) of + not_found -> not_found; + Indexes -> maps:get(Metric, Indexes, not_found) + end. + +get_indexes(Name, Id) -> + case maps:find(Id, get_counters(Name)) of + {ok, #{indexes := Indexes}} -> Indexes; + error -> not_found + end. + +get_counters(Name) -> persistent_term:get(?CntrRef(Name), #{}). calculate_rate(_CurrVal, undefined) -> undefined; calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, - tick = Tick, last5m_acc = AccRate5Min0, - last5m_smpl = Last5MinSamples0}) -> + tick = Tick, last5m_acc = AccRate5Min0, + last5m_smpl = Last5MinSamples0}) -> %% calculate the current rate based on the last value of the counter CurrRate = (CurrVal - LastVal) / ?SAMPLING, @@ -292,8 +295,13 @@ calculate_rate(CurrVal, #rate{max = MaxRate0, last_v = LastVal, last_v = CurrVal, last5m_acc = Acc5Min, last5m_smpl = Last5MinSamples, tick = Tick + 1}. +format_rates_of_id(RatesPerId) -> + maps:map(fun(_Metric, Rates) -> + format_rate(Rates) + end, RatesPerId). + format_rate(#rate{max = Max, current = Current, last5m = Last5Min}) -> - #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}. + #{max => precision(Max, 2), current => precision(Current, 2), last5m => precision(Last5Min, 2)}. precision(Float, N) -> Base = math:pow(10, N), @@ -302,14 +310,3 @@ precision(Float, N) -> get_self_name() -> {registered_name, Name} = process_info(self(), registered_name), Name. - -%%------------------------------------------------------------------------------ -%% Metrics Definitions -%%------------------------------------------------------------------------------ - -max_counters_size() -> 32. -metrics_idx('matched') -> 1; -metrics_idx('success') -> 2; -metrics_idx('failed') -> 3; -metrics_idx(_) -> 32. - diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl index 77ad03bfe..1fd4d9ca4 100644 --- a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -23,22 +23,11 @@ -include_lib("common_test/include/ct.hrl"). all() -> - [ {group, metrics} - , {group, rate} ]. + emqx_common_test_helpers:all(?MODULE). suite() -> [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. -groups() -> - [{metrics, [sequence], - [ t_rule - , t_no_creation_1 - ]}, - {rate, [sequence], - [ rule_rate - ]} - ]. - -define(NAME, ?MODULE). init_per_suite(Config) -> @@ -59,12 +48,66 @@ init_per_testcase(_, Config) -> end_per_testcase(_, _Config) -> ok. -t_no_creation_1(_) -> - ?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')). +t_get_metrics(_) -> + Metrics = [a, b, c], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics), + %% all the metrics are set to zero at start + ?assertMatch(#{ + rate := #{ + a := #{current := 0.0, max := 0.0, last5m := 0.0}, + b := #{current := 0.0, max := 0.0, last5m := 0.0}, + c := #{current := 0.0, max := 0.0, last5m := 0.0} + }, + counters := #{ + a := 0, + b := 0, + c := 0 + } + }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ct:sleep(1500), + ?LET(#{ + rate := #{ + a := #{current := CurrA, max := MaxA, last5m := _}, + b := #{current := CurrB, max := MaxB, last5m := _}, + c := #{current := CurrC, max := MaxC, last5m := _} + }, + counters := #{ + a := 1, + b := 1, + c := 2 + } + }, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>), + {?assert(CurrA > 0), ?assert(CurrB > 0), ?assert(CurrC > 0), + ?assert(MaxA > 0), ?assert(MaxB > 0), ?assert(MaxC > 0)}), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). -t_rule(_) -> - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>), +t_get_metrics_2(_) -> + Metrics = [a, b, c], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"testid">>, Metrics, + [a]), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, a), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, b), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"testid">>, c), + ?assertMatch(#{ + rate := Rate = #{ + a := #{current := _, max := _, last5m := _} + }, + counters := #{ + a := 1, + b := 1, + c := 1 + } + } when map_size(Rate) =:= 1, emqx_plugin_libs_metrics:get_metrics(?NAME, <<"testid">>)), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"testid">>). + +t_inc_matched(_) -> + Metrics = ['rules.matched'], + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, Metrics), + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>, Metrics), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), @@ -74,20 +117,20 @@ t_rule(_) -> ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>). -rule_rate(_) -> - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), - ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>), +t_rate(_) -> + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>, ['rules.matched']), + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>, ['rules.matched']), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'), ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), ct:sleep(1000), - ?LET(#{max := Max, current := Current}, + ?LET(#{'rules.matched' := #{max := Max, current := Current}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current =< 2)}), ct:sleep(2100), - ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), + ?LET(#{'rules.matched' := #{max := Max, current := Current, last5m := Last5Min}}, emqx_plugin_libs_metrics:get_rate(?NAME, <<"rule1">>), {?assert(Max =< 2), ?assert(Current == 0), ?assert(Last5Min =< 0.67)}),