feat(bridge): support metrics for bridges
This commit is contained in:
parent
50542ec441
commit
4c149f92c1
|
@ -14,29 +14,31 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_rule_metrics).
|
-module(emqx_plugin_libs_metrics).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include("rule_engine.hrl").
|
|
||||||
|
|
||||||
%% API functions
|
%% API functions
|
||||||
-export([ start_link/0
|
-export([ start_link/1
|
||||||
, stop/0
|
, stop/1
|
||||||
|
, child_spec/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ get_rules_matched/1
|
-export([ inc/3
|
||||||
|
, inc/4
|
||||||
|
, get/3
|
||||||
|
, get_speed/2
|
||||||
|
, create_metrics/2
|
||||||
|
, clear_metrics/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ inc/2
|
-export([ get_metrics/2
|
||||||
, inc/3
|
, get_matched/2
|
||||||
, get/2
|
, get_success/2
|
||||||
, get_rule_speed/1
|
, get_failed/2
|
||||||
, create_rule_metrics/1
|
, inc_matched/2
|
||||||
, clear_rule_metrics/1
|
, inc_success/2
|
||||||
]).
|
, inc_failed/2
|
||||||
|
|
||||||
-export([ get_rule_metrics/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -57,10 +59,14 @@
|
||||||
-define(SAMPLING, 1).
|
-define(SAMPLING, 1).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(CntrRef, ?MODULE).
|
-type handler_name() :: atom().
|
||||||
|
-type metric_id() :: binary().
|
||||||
|
|
||||||
|
-define(CntrRef(Name), {?MODULE, Name}).
|
||||||
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
|
-define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)).
|
||||||
|
|
||||||
-record(rule_speed, {
|
%% the speed of 'matched'
|
||||||
|
-record(speed, {
|
||||||
max = 0 :: number(),
|
max = 0 :: number(),
|
||||||
current = 0 :: number(),
|
current = 0 :: number(),
|
||||||
last5m = 0 :: number(),
|
last5m = 0 :: number(),
|
||||||
|
@ -74,94 +80,121 @@
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
metric_ids = sets:new(),
|
metric_ids = sets:new(),
|
||||||
rule_speeds :: undefined | #{rule_id() => #rule_speed{}}
|
speeds :: undefined | #{metric_id() => #speed{}}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(create_rule_metrics(rule_id()) -> ok).
|
-spec(child_spec(handler_name()) -> supervisor:child_spec()).
|
||||||
create_rule_metrics(Id) ->
|
child_spec(Name) ->
|
||||||
gen_server:call(?MODULE, {create_rule_metrics, Id}).
|
#{ id => emqx_plugin_libs_metrics
|
||||||
|
, start => {emqx_plugin_libs_metrics, start_link, [Name]}
|
||||||
|
, restart => permanent
|
||||||
|
, shutdown => 5000
|
||||||
|
, type => worker
|
||||||
|
, modules => [emqx_plugin_libs_metrics]
|
||||||
|
}.
|
||||||
|
|
||||||
-spec(clear_rule_metrics(rule_id()) -> ok).
|
-spec(create_metrics(handler_name(), metric_id()) -> ok).
|
||||||
clear_rule_metrics(Id) ->
|
create_metrics(Name, Id) ->
|
||||||
gen_server:call(?MODULE, {delete_rule_metrics, Id}).
|
gen_server:call(Name, {create_metrics, Id}).
|
||||||
|
|
||||||
-spec(get(rule_id(), atom()) -> number()).
|
-spec(clear_metrics(handler_name(), metric_id()) -> ok).
|
||||||
get(Id, Metric) ->
|
clear_metrics(Name, Id) ->
|
||||||
case get_couters_ref(Id) of
|
gen_server:call(Name, {delete_metrics, Id}).
|
||||||
|
|
||||||
|
-spec(get(handler_name(), metric_id(), atom()) -> number()).
|
||||||
|
get(Name, Id, Metric) ->
|
||||||
|
case get_couters_ref(Name, Id) of
|
||||||
not_found -> 0;
|
not_found -> 0;
|
||||||
Ref -> counters:get(Ref, metrics_idx(Metric))
|
Ref -> counters:get(Ref, metrics_idx(Metric))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(get_rule_speed(rule_id()) -> map()).
|
-spec(get_speed(handler_name(), metric_id()) -> map()).
|
||||||
get_rule_speed(Id) ->
|
get_speed(Name, Id) ->
|
||||||
gen_server:call(?MODULE, {get_rule_speed, Id}).
|
gen_server:call(Name, {get_speed, Id}).
|
||||||
|
|
||||||
-spec(get_rule_metrics(rule_id()) -> map()).
|
-spec(get_metrics(handler_name(), metric_id()) -> map()).
|
||||||
get_rule_metrics(Id) ->
|
get_metrics(Name, Id) ->
|
||||||
#{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id),
|
#{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id),
|
||||||
#{matched => get_rules_matched(Id),
|
#{matched => get_matched(Name, Id),
|
||||||
|
success => get_success(Name, Id),
|
||||||
|
failed => get_failed(Name, Id),
|
||||||
speed => Current,
|
speed => Current,
|
||||||
speed_max => Max,
|
speed_max => Max,
|
||||||
speed_last5m => Last5M
|
speed_last5m => Last5M
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec inc(rule_id(), atom()) -> ok.
|
-spec inc(handler_name(), metric_id(), atom()) -> ok.
|
||||||
inc(Id, Metric) ->
|
inc(Name, Id, Metric) ->
|
||||||
inc(Id, Metric, 1).
|
inc(Name, Id, Metric, 1).
|
||||||
|
|
||||||
-spec inc(rule_id(), atom(), pos_integer()) -> ok.
|
-spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok.
|
||||||
inc(Id, Metric, Val) ->
|
inc(Name, Id, Metric, Val) ->
|
||||||
case get_couters_ref(Id) of
|
case get_couters_ref(Name, Id) of
|
||||||
not_found ->
|
not_found ->
|
||||||
%% this may occur when increasing a counter for
|
%% this may occur when increasing a counter for
|
||||||
%% a rule that was created from a remove node.
|
%% a rule that was created from a remove node.
|
||||||
create_rule_metrics(Id),
|
create_metrics(Name, Id),
|
||||||
counters:add(get_couters_ref(Id), metrics_idx(Metric), Val);
|
counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val);
|
||||||
Ref ->
|
Ref ->
|
||||||
counters:add(Ref, metrics_idx(Metric), Val)
|
counters:add(Ref, metrics_idx(Metric), Val)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_rules_matched(Id) ->
|
inc_matched(Name, Id) ->
|
||||||
get(Id, 'rules.matched').
|
inc(Name, Id, 'matched', 1).
|
||||||
|
|
||||||
start_link() ->
|
inc_success(Name, Id) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
inc(Name, Id, 'success', 1).
|
||||||
|
|
||||||
init([]) ->
|
inc_failed(Name, Id) ->
|
||||||
|
inc(Name, Id, 'failed', 1).
|
||||||
|
|
||||||
|
get_matched(Name, Id) ->
|
||||||
|
get(Name, Id, 'matched').
|
||||||
|
|
||||||
|
get_success(Name, Id) ->
|
||||||
|
get(Name, Id, 'success').
|
||||||
|
|
||||||
|
get_failed(Name, Id) ->
|
||||||
|
get(Name, Id, 'failed').
|
||||||
|
|
||||||
|
start_link(Name) ->
|
||||||
|
gen_server:start_link({local, Name}, ?MODULE, Name, []).
|
||||||
|
|
||||||
|
init(Name) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
%% the speed metrics
|
%% the speed metrics
|
||||||
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
||||||
persistent_term:put(?CntrRef, #{}),
|
persistent_term:put(?CntrRef(Name), #{}),
|
||||||
{ok, #state{}}.
|
{ok, #state{}}.
|
||||||
|
|
||||||
handle_call({get_rule_speed, _Id}, _From, State = #state{rule_speeds = undefined}) ->
|
handle_call({get_speed, _Id}, _From, State = #state{speeds = undefined}) ->
|
||||||
{reply, format_rule_speed(#rule_speed{}), State};
|
{reply, format_speed(#speed{}), State};
|
||||||
handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds}) ->
|
handle_call({get_speed, Id}, _From, State = #state{speeds = Speeds}) ->
|
||||||
{reply, case maps:get(Id, RuleSpeeds, undefined) of
|
{reply, case maps:get(Id, Speeds, undefined) of
|
||||||
undefined -> format_rule_speed(#rule_speed{});
|
undefined -> format_speed(#speed{});
|
||||||
Speed -> format_rule_speed(Speed)
|
Speed -> format_speed(Speed)
|
||||||
end, State};
|
end, State};
|
||||||
|
|
||||||
handle_call({create_rule_metrics, Id}, _From,
|
handle_call({create_metrics, Id}, _From,
|
||||||
State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
|
State = #state{metric_ids = MIDs, speeds = Speeds}) ->
|
||||||
{reply, create_counters(Id),
|
{reply, create_counters(get_self_name(), Id),
|
||||||
State#state{metric_ids = sets:add_element(Id, MIDs),
|
State#state{metric_ids = sets:add_element(Id, MIDs),
|
||||||
rule_speeds = case RuleSpeeds of
|
speeds = case Speeds of
|
||||||
undefined -> #{Id => #rule_speed{}};
|
undefined -> #{Id => #speed{}};
|
||||||
_ -> RuleSpeeds#{Id => #rule_speed{}}
|
_ -> Speeds#{Id => #speed{}}
|
||||||
end}};
|
end}};
|
||||||
|
|
||||||
handle_call({delete_rule_metrics, Id}, _From,
|
handle_call({delete_metrics, Id}, _From,
|
||||||
State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) ->
|
State = #state{metric_ids = MIDs, speeds = Speeds}) ->
|
||||||
{reply, delete_counters(Id),
|
{reply, delete_counters(get_self_name(), Id),
|
||||||
State#state{metric_ids = sets:del_element(Id, MIDs),
|
State#state{metric_ids = sets:del_element(Id, MIDs),
|
||||||
rule_speeds = case RuleSpeeds of
|
speeds = case Speeds of
|
||||||
undefined -> undefined;
|
undefined -> undefined;
|
||||||
_ -> maps:remove(Id, RuleSpeeds)
|
_ -> maps:remove(Id, Speeds)
|
||||||
end}};
|
end}};
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
|
@ -170,17 +203,17 @@ handle_call(_Request, _From, State) ->
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(ticking, State = #state{rule_speeds = undefined}) ->
|
handle_info(ticking, State = #state{speeds = undefined}) ->
|
||||||
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) ->
|
handle_info(ticking, State = #state{speeds = Speeds0}) ->
|
||||||
RuleSpeeds = maps:map(
|
Speeds = maps:map(
|
||||||
fun(Id, RuleSpeed) ->
|
fun(Id, Speed) ->
|
||||||
calculate_speed(get_rules_matched(Id), RuleSpeed)
|
calculate_speed(get_matched(get_self_name(), Id), Speed)
|
||||||
end, RuleSpeeds0),
|
end, Speeds0),
|
||||||
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
erlang:send_after(timer:seconds(?SAMPLING), self(), ticking),
|
||||||
{noreply, State#state{rule_speeds = RuleSpeeds}};
|
{noreply, State#state{speeds = Speeds}};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -189,37 +222,38 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{metric_ids = MIDs}) ->
|
terminate(_Reason, #state{metric_ids = MIDs}) ->
|
||||||
[delete_counters(Id) || Id <- sets:to_list(MIDs)],
|
Name = get_self_name(),
|
||||||
persistent_term:erase(?CntrRef).
|
[delete_counters(Name, Id) || Id <- sets:to_list(MIDs)],
|
||||||
|
persistent_term:erase(?CntrRef(Name)).
|
||||||
|
|
||||||
stop() ->
|
stop(Name) ->
|
||||||
gen_server:stop(?MODULE).
|
gen_server:stop(Name).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
create_counters(Id) ->
|
create_counters(Name, Id) ->
|
||||||
case get_couters_ref(Id) of
|
case get_couters_ref(Name, Id) of
|
||||||
not_found ->
|
not_found ->
|
||||||
Counters = get_all_counters(),
|
Counters = get_all_counters(Name),
|
||||||
CntrRef = counters:new(max_counters_size(), [write_concurrency]),
|
CntrRef = counters:new(max_counters_size(), [write_concurrency]),
|
||||||
persistent_term:put(?CntrRef, Counters#{Id => CntrRef});
|
persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef});
|
||||||
_Ref -> ok
|
_Ref -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
delete_counters(Id) ->
|
delete_counters(Name, Id) ->
|
||||||
persistent_term:put(?CntrRef, maps:remove(Id, get_all_counters())).
|
persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))).
|
||||||
|
|
||||||
get_couters_ref(Id) ->
|
get_couters_ref(Name, Id) ->
|
||||||
maps:get(Id, get_all_counters(), not_found).
|
maps:get(Id, get_all_counters(Name), not_found).
|
||||||
|
|
||||||
get_all_counters() ->
|
get_all_counters(Name) ->
|
||||||
persistent_term:get(?CntrRef, #{}).
|
persistent_term:get(?CntrRef(Name), #{}).
|
||||||
|
|
||||||
calculate_speed(_CurrVal, undefined) ->
|
calculate_speed(_CurrVal, undefined) ->
|
||||||
undefined;
|
undefined;
|
||||||
calculate_speed(CurrVal, #rule_speed{max = MaxSpeed0, last_v = LastVal,
|
calculate_speed(CurrVal, #speed{max = MaxSpeed0, last_v = LastVal,
|
||||||
tick = Tick, last5m_acc = AccSpeed5Min0,
|
tick = Tick, last5m_acc = AccSpeed5Min0,
|
||||||
last5m_smpl = Last5MinSamples0}) ->
|
last5m_smpl = Last5MinSamples0}) ->
|
||||||
%% calculate the current speed based on the last value of the counter
|
%% calculate the current speed based on the last value of the counter
|
||||||
|
@ -244,22 +278,28 @@ calculate_speed(CurrVal, #rule_speed{max = MaxSpeed0, last_v = LastVal,
|
||||||
Acc, Acc / ?SAMPCOUNT_5M}
|
Acc, Acc / ?SAMPCOUNT_5M}
|
||||||
end,
|
end,
|
||||||
|
|
||||||
#rule_speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min,
|
#speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min,
|
||||||
last_v = CurrVal, last5m_acc = Acc5Min,
|
last_v = CurrVal, last5m_acc = Acc5Min,
|
||||||
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
|
last5m_smpl = Last5MinSamples, tick = Tick + 1}.
|
||||||
|
|
||||||
format_rule_speed(#rule_speed{max = Max, current = Current, last5m = Last5Min}) ->
|
format_speed(#speed{max = Max, current = Current, last5m = Last5Min}) ->
|
||||||
#{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
|
#{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}.
|
||||||
|
|
||||||
precision(Float, N) ->
|
precision(Float, N) ->
|
||||||
Base = math:pow(10, N),
|
Base = math:pow(10, N),
|
||||||
round(Float * Base) / Base.
|
round(Float * Base) / Base.
|
||||||
|
|
||||||
|
get_self_name() ->
|
||||||
|
{registered_name, Name} = process_info(self(), registered_name),
|
||||||
|
Name.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Metrics Definitions
|
%% Metrics Definitions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
max_counters_size() -> 2.
|
max_counters_size() -> 32.
|
||||||
metrics_idx('rules.matched') -> 1;
|
metrics_idx('matched') -> 1;
|
||||||
metrics_idx(_) -> 2.
|
metrics_idx('success') -> 2;
|
||||||
|
metrics_idx('failed') -> 3;
|
||||||
|
metrics_idx(_) -> 32.
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_plugin_libs_metrics_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[ {group, metrics}
|
||||||
|
, {group, speed} ].
|
||||||
|
|
||||||
|
suite() ->
|
||||||
|
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
[{metrics, [sequence],
|
||||||
|
[ t_rule
|
||||||
|
, t_no_creation_1
|
||||||
|
]},
|
||||||
|
{speed, [sequence],
|
||||||
|
[ rule_speed
|
||||||
|
]}
|
||||||
|
].
|
||||||
|
|
||||||
|
-define(NAME, ?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||||
|
{ok, _} = emqx_plugin_libs_metrics:start_link(?NAME),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
catch emqx_plugin_libs_metrics:stop(?NAME),
|
||||||
|
emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(_, Config) ->
|
||||||
|
catch emqx_plugin_libs_metrics:stop(?NAME),
|
||||||
|
{ok, _} = emqx_plugin_libs_metrics:start_link(?NAME),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_no_creation_1(_) ->
|
||||||
|
?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')).
|
||||||
|
|
||||||
|
t_rule(_) ->
|
||||||
|
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
|
||||||
|
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>),
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'),
|
||||||
|
?assertEqual(1, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
|
||||||
|
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule2">>, 'rules.matched')),
|
||||||
|
?assertEqual(0, emqx_plugin_libs_metrics:get(?NAME, <<"rule3">>, 'rules.matched')),
|
||||||
|
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
|
||||||
|
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>).
|
||||||
|
|
||||||
|
rule_speed(_) ->
|
||||||
|
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>),
|
||||||
|
ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>),
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'),
|
||||||
|
ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'),
|
||||||
|
?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')),
|
||||||
|
ct:sleep(1000),
|
||||||
|
?LET(#{max := Max, current := Current},
|
||||||
|
emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
|
||||||
|
{?assert(Max =< 2),
|
||||||
|
?assert(Current =< 2)}),
|
||||||
|
ct:sleep(2100),
|
||||||
|
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>),
|
||||||
|
{?assert(Max =< 2),
|
||||||
|
?assert(Current == 0),
|
||||||
|
?assert(Last5Min =< 0.67)}),
|
||||||
|
ct:sleep(3000),
|
||||||
|
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>),
|
||||||
|
ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>).
|
|
@ -163,10 +163,10 @@ load_hooks_for_rule(#{from := Topics}) ->
|
||||||
lists:foreach(fun emqx_rule_events:load/1, Topics).
|
lists:foreach(fun emqx_rule_events:load/1, Topics).
|
||||||
|
|
||||||
add_metrics_for_rule(#{id := Id}) ->
|
add_metrics_for_rule(#{id := Id}) ->
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(Id).
|
ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id).
|
||||||
|
|
||||||
clear_metrics_for_rule(#{id := Id}) ->
|
clear_metrics_for_rule(#{id := Id}) ->
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(Id).
|
ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id).
|
||||||
|
|
||||||
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
|
unload_hooks_for_rule(#{id := Id, from := Topics}) ->
|
||||||
lists:foreach(fun(Topic) ->
|
lists:foreach(fun(Topic) ->
|
||||||
|
|
|
@ -338,7 +338,19 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) ->
|
||||||
BridgeChannelId.
|
BridgeChannelId.
|
||||||
|
|
||||||
get_rule_metrics(Id) ->
|
get_rule_metrics(Id) ->
|
||||||
[maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id]))
|
Format = fun (Node, #{matched := Matched,
|
||||||
|
speed := Current,
|
||||||
|
speed_max := Max,
|
||||||
|
speed_last5m := Last5M
|
||||||
|
}) ->
|
||||||
|
#{ matched => Matched
|
||||||
|
, speed => Current
|
||||||
|
, speed_max => Max
|
||||||
|
, speed_last5m => Last5M
|
||||||
|
, node => Node
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
[Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id]))
|
||||||
|| Node <- mria_mnesia:running_nodes()].
|
|| Node <- mria_mnesia:running_nodes()].
|
||||||
|
|
||||||
get_one_rule(AllRules, Id) ->
|
get_one_rule(AllRules, Id) ->
|
||||||
|
|
|
@ -34,10 +34,5 @@ init([]) ->
|
||||||
shutdown => 5000,
|
shutdown => 5000,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_rule_engine]},
|
modules => [emqx_rule_engine]},
|
||||||
Metrics = #{id => emqx_rule_metrics,
|
Metrics = emqx_plugin_libs_metrics:child_spec(rule_metrics),
|
||||||
start => {emqx_rule_metrics, start_link, []},
|
|
||||||
restart => permanent,
|
|
||||||
shutdown => 5000,
|
|
||||||
type => worker,
|
|
||||||
modules => [emqx_rule_metrics]},
|
|
||||||
{ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
|
{ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
|
||||||
|
|
|
@ -99,7 +99,7 @@ do_apply_rule(#{
|
||||||
case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
|
case ?RAISE(match_conditions(Conditions, ColumnsAndSelected),
|
||||||
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
|
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
||||||
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
Collection2 = filter_collection(Input, InCase, DoEach, Collection),
|
||||||
{ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
|
{ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]};
|
||||||
false ->
|
false ->
|
||||||
|
@ -117,7 +117,7 @@ do_apply_rule(#{id := RuleId,
|
||||||
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
|
case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)),
|
||||||
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
{match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of
|
||||||
true ->
|
true ->
|
||||||
ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'),
|
ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId),
|
||||||
{ok, handle_output_list(Outputs, Selected, Input)};
|
{ok, handle_output_list(Outputs, Selected, Input)};
|
||||||
false ->
|
false ->
|
||||||
{error, nomatch}
|
{error, nomatch}
|
||||||
|
|
|
@ -41,7 +41,7 @@ test(#{sql := Sql, context := Context}) ->
|
||||||
|
|
||||||
test_rule(Sql, Select, Context, EventTopics) ->
|
test_rule(Sql, Select, Context, EventTopics) ->
|
||||||
RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
|
RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(RuleId),
|
ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, RuleId),
|
||||||
Rule = #{
|
Rule = #{
|
||||||
id => RuleId,
|
id => RuleId,
|
||||||
sql => Sql,
|
sql => Sql,
|
||||||
|
@ -62,7 +62,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
|
||||||
{ok, Data} -> {ok, flatten(Data)};
|
{ok, Data} -> {ok, flatten(Data)};
|
||||||
{error, nomatch} -> {error, nomatch}
|
{error, nomatch} -> {error, nomatch}
|
||||||
after
|
after
|
||||||
emqx_rule_metrics:clear_rule_metrics(RuleId)
|
ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, RuleId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_selected_data(Selected, _Envs, _Args) ->
|
get_selected_data(Selected, _Envs, _Args) ->
|
||||||
|
|
|
@ -1,94 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_rule_metrics_SUITE).
|
|
||||||
|
|
||||||
-compile(export_all).
|
|
||||||
-compile(nowarn_export_all).
|
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
|
||||||
-include_lib("common_test/include/ct.hrl").
|
|
||||||
|
|
||||||
all() ->
|
|
||||||
[ {group, metrics}
|
|
||||||
, {group, speed} ].
|
|
||||||
|
|
||||||
suite() ->
|
|
||||||
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
|
|
||||||
|
|
||||||
groups() ->
|
|
||||||
[{metrics, [sequence],
|
|
||||||
[ t_rule
|
|
||||||
, t_no_creation_1
|
|
||||||
]},
|
|
||||||
{speed, [sequence],
|
|
||||||
[ rule_speed
|
|
||||||
]}
|
|
||||||
].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
|
||||||
emqx_common_test_helpers:start_apps([emqx_conf]),
|
|
||||||
{ok, _} = emqx_rule_metrics:start_link(),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
|
||||||
catch emqx_rule_metrics:stop(),
|
|
||||||
emqx_common_test_helpers:stop_apps([emqx_conf]),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
|
||||||
catch emqx_rule_metrics:stop(),
|
|
||||||
{ok, _} = emqx_rule_metrics:start_link(),
|
|
||||||
Config.
|
|
||||||
|
|
||||||
end_per_testcase(_, _Config) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
t_no_creation_1(_) ->
|
|
||||||
?assertEqual(ok, emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched')).
|
|
||||||
|
|
||||||
t_rule(_) ->
|
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(<<"rule1">>),
|
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(<<"rule2">>),
|
|
||||||
ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'),
|
|
||||||
ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
|
|
||||||
ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'),
|
|
||||||
ct:pal("----couters: ---~p", [persistent_term:get(emqx_rule_metrics)]),
|
|
||||||
?assertEqual(1, emqx_rule_metrics:get(<<"rule1">>, 'rules.matched')),
|
|
||||||
?assertEqual(2, emqx_rule_metrics:get(<<"rule2">>, 'rules.matched')),
|
|
||||||
?assertEqual(0, emqx_rule_metrics:get(<<"rule3">>, 'rules.matched')),
|
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>),
|
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule2">>).
|
|
||||||
|
|
||||||
rule_speed(_) ->
|
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(<<"rule1">>),
|
|
||||||
ok = emqx_rule_metrics:create_rule_metrics(<<"rule:2">>),
|
|
||||||
ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'),
|
|
||||||
ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'),
|
|
||||||
ok = emqx_rule_metrics:inc(<<"rule:2">>, 'rules.matched'),
|
|
||||||
?assertEqual(2, emqx_rule_metrics:get(<<"rule1">>, 'rules.matched')),
|
|
||||||
ct:sleep(1000),
|
|
||||||
?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
|
|
||||||
{?assert(Max =< 2),
|
|
||||||
?assert(Current =< 2)}),
|
|
||||||
ct:sleep(2100),
|
|
||||||
?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_rule_speed(<<"rule1">>),
|
|
||||||
{?assert(Max =< 2),
|
|
||||||
?assert(Current == 0),
|
|
||||||
?assert(Last5Min =< 0.67)}),
|
|
||||||
ct:sleep(3000),
|
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>),
|
|
||||||
ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:2">>).
|
|
Loading…
Reference in New Issue