From 4c149f92c164df4cfbd603024ab5fd0d3c319787 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 22 Nov 2021 15:09:52 +0800 Subject: [PATCH 1/4] feat(bridge): support metrics for bridges --- .../src/emqx_plugin_libs_metrics.erl} | 224 +++++++++++------- .../test/emqx_plugin_libs_metrics_SUITE.erl | 96 ++++++++ .../emqx_rule_engine/src/emqx_rule_engine.erl | 4 +- .../src/emqx_rule_engine_api.erl | 14 +- .../src/emqx_rule_engine_sup.erl | 7 +- .../src/emqx_rule_runtime.erl | 4 +- .../src/emqx_rule_sqltester.erl | 4 +- .../test/emqx_rule_metrics_SUITE.erl | 94 -------- 8 files changed, 248 insertions(+), 199 deletions(-) rename apps/{emqx_rule_engine/src/emqx_rule_metrics.erl => emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl} (50%) create mode 100644 apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl delete mode 100644 apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl similarity index 50% rename from apps/emqx_rule_engine/src/emqx_rule_metrics.erl rename to apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index 990911801..faddb6872 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -14,29 +14,31 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_rule_metrics). +-module(emqx_plugin_libs_metrics). -behaviour(gen_server). --include("rule_engine.hrl"). - %% API functions --export([ start_link/0 - , stop/0 +-export([ start_link/1 + , stop/1 + , child_spec/1 ]). --export([ get_rules_matched/1 +-export([ inc/3 + , inc/4 + , get/3 + , get_speed/2 + , create_metrics/2 + , clear_metrics/2 ]). --export([ inc/2 - , inc/3 - , get/2 - , get_rule_speed/1 - , create_rule_metrics/1 - , clear_rule_metrics/1 - ]). - --export([ get_rule_metrics/1 +-export([ get_metrics/2 + , get_matched/2 + , get_success/2 + , get_failed/2 + , inc_matched/2 + , inc_success/2 + , inc_failed/2 ]). %% gen_server callbacks @@ -57,10 +59,14 @@ -define(SAMPLING, 1). -endif. --define(CntrRef, ?MODULE). +-type handler_name() :: atom(). +-type metric_id() :: binary(). + +-define(CntrRef(Name), {?MODULE, Name}). -define(SAMPCOUNT_5M, (?SECS_5M div ?SAMPLING)). --record(rule_speed, { +%% the speed of 'matched' +-record(speed, { max = 0 :: number(), current = 0 :: number(), last5m = 0 :: number(), @@ -74,94 +80,121 @@ -record(state, { metric_ids = sets:new(), - rule_speeds :: undefined | #{rule_id() => #rule_speed{}} + speeds :: undefined | #{metric_id() => #speed{}} }). %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ --spec(create_rule_metrics(rule_id()) -> ok). -create_rule_metrics(Id) -> - gen_server:call(?MODULE, {create_rule_metrics, Id}). +-spec(child_spec(handler_name()) -> supervisor:child_spec()). +child_spec(Name) -> + #{ id => emqx_plugin_libs_metrics + , start => {emqx_plugin_libs_metrics, start_link, [Name]} + , restart => permanent + , shutdown => 5000 + , type => worker + , modules => [emqx_plugin_libs_metrics] + }. --spec(clear_rule_metrics(rule_id()) -> ok). -clear_rule_metrics(Id) -> - gen_server:call(?MODULE, {delete_rule_metrics, Id}). +-spec(create_metrics(handler_name(), metric_id()) -> ok). +create_metrics(Name, Id) -> + gen_server:call(Name, {create_metrics, Id}). --spec(get(rule_id(), atom()) -> number()). -get(Id, Metric) -> - case get_couters_ref(Id) of +-spec(clear_metrics(handler_name(), metric_id()) -> ok). +clear_metrics(Name, Id) -> + gen_server:call(Name, {delete_metrics, Id}). + +-spec(get(handler_name(), metric_id(), atom()) -> number()). +get(Name, Id, Metric) -> + case get_couters_ref(Name, Id) of not_found -> 0; Ref -> counters:get(Ref, metrics_idx(Metric)) end. --spec(get_rule_speed(rule_id()) -> map()). -get_rule_speed(Id) -> - gen_server:call(?MODULE, {get_rule_speed, Id}). +-spec(get_speed(handler_name(), metric_id()) -> map()). +get_speed(Name, Id) -> + gen_server:call(Name, {get_speed, Id}). --spec(get_rule_metrics(rule_id()) -> map()). -get_rule_metrics(Id) -> - #{max := Max, current := Current, last5m := Last5M} = get_rule_speed(Id), - #{matched => get_rules_matched(Id), +-spec(get_metrics(handler_name(), metric_id()) -> map()). +get_metrics(Name, Id) -> + #{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id), + #{matched => get_matched(Name, Id), + success => get_success(Name, Id), + failed => get_failed(Name, Id), speed => Current, speed_max => Max, speed_last5m => Last5M }. --spec inc(rule_id(), atom()) -> ok. -inc(Id, Metric) -> - inc(Id, Metric, 1). +-spec inc(handler_name(), metric_id(), atom()) -> ok. +inc(Name, Id, Metric) -> + inc(Name, Id, Metric, 1). --spec inc(rule_id(), atom(), pos_integer()) -> ok. -inc(Id, Metric, Val) -> - case get_couters_ref(Id) of +-spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok. +inc(Name, Id, Metric, Val) -> + case get_couters_ref(Name, Id) of not_found -> %% this may occur when increasing a counter for %% a rule that was created from a remove node. - create_rule_metrics(Id), - counters:add(get_couters_ref(Id), metrics_idx(Metric), Val); + create_metrics(Name, Id), + counters:add(get_couters_ref(Name, Id), metrics_idx(Metric), Val); Ref -> counters:add(Ref, metrics_idx(Metric), Val) end. -get_rules_matched(Id) -> - get(Id, 'rules.matched'). +inc_matched(Name, Id) -> + inc(Name, Id, 'matched', 1). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +inc_success(Name, Id) -> + inc(Name, Id, 'success', 1). -init([]) -> +inc_failed(Name, Id) -> + inc(Name, Id, 'failed', 1). + +get_matched(Name, Id) -> + get(Name, Id, 'matched'). + +get_success(Name, Id) -> + get(Name, Id, 'success'). + +get_failed(Name, Id) -> + get(Name, Id, 'failed'). + +start_link(Name) -> + gen_server:start_link({local, Name}, ?MODULE, Name, []). + +init(Name) -> erlang:process_flag(trap_exit, true), %% the speed metrics erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - persistent_term:put(?CntrRef, #{}), + persistent_term:put(?CntrRef(Name), #{}), {ok, #state{}}. -handle_call({get_rule_speed, _Id}, _From, State = #state{rule_speeds = undefined}) -> - {reply, format_rule_speed(#rule_speed{}), State}; -handle_call({get_rule_speed, Id}, _From, State = #state{rule_speeds = RuleSpeeds}) -> - {reply, case maps:get(Id, RuleSpeeds, undefined) of - undefined -> format_rule_speed(#rule_speed{}); - Speed -> format_rule_speed(Speed) +handle_call({get_speed, _Id}, _From, State = #state{speeds = undefined}) -> + {reply, format_speed(#speed{}), State}; +handle_call({get_speed, Id}, _From, State = #state{speeds = Speeds}) -> + {reply, case maps:get(Id, Speeds, undefined) of + undefined -> format_speed(#speed{}); + Speed -> format_speed(Speed) end, State}; -handle_call({create_rule_metrics, Id}, _From, - State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) -> - {reply, create_counters(Id), +handle_call({create_metrics, Id}, _From, + State = #state{metric_ids = MIDs, speeds = Speeds}) -> + {reply, create_counters(get_self_name(), Id), State#state{metric_ids = sets:add_element(Id, MIDs), - rule_speeds = case RuleSpeeds of - undefined -> #{Id => #rule_speed{}}; - _ -> RuleSpeeds#{Id => #rule_speed{}} + speeds = case Speeds of + undefined -> #{Id => #speed{}}; + _ -> Speeds#{Id => #speed{}} end}}; -handle_call({delete_rule_metrics, Id}, _From, - State = #state{metric_ids = MIDs, rule_speeds = RuleSpeeds}) -> - {reply, delete_counters(Id), +handle_call({delete_metrics, Id}, _From, + State = #state{metric_ids = MIDs, speeds = Speeds}) -> + {reply, delete_counters(get_self_name(), Id), State#state{metric_ids = sets:del_element(Id, MIDs), - rule_speeds = case RuleSpeeds of + speeds = case Speeds of undefined -> undefined; - _ -> maps:remove(Id, RuleSpeeds) + _ -> maps:remove(Id, Speeds) end}}; handle_call(_Request, _From, State) -> @@ -170,17 +203,17 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(ticking, State = #state{rule_speeds = undefined}) -> +handle_info(ticking, State = #state{speeds = undefined}) -> erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), {noreply, State}; -handle_info(ticking, State = #state{rule_speeds = RuleSpeeds0}) -> - RuleSpeeds = maps:map( - fun(Id, RuleSpeed) -> - calculate_speed(get_rules_matched(Id), RuleSpeed) - end, RuleSpeeds0), +handle_info(ticking, State = #state{speeds = Speeds0}) -> + Speeds = maps:map( + fun(Id, Speed) -> + calculate_speed(get_matched(get_self_name(), Id), Speed) + end, Speeds0), erlang:send_after(timer:seconds(?SAMPLING), self(), ticking), - {noreply, State#state{rule_speeds = RuleSpeeds}}; + {noreply, State#state{speeds = Speeds}}; handle_info(_Info, State) -> {noreply, State}. @@ -189,37 +222,38 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. terminate(_Reason, #state{metric_ids = MIDs}) -> - [delete_counters(Id) || Id <- sets:to_list(MIDs)], - persistent_term:erase(?CntrRef). + Name = get_self_name(), + [delete_counters(Name, Id) || Id <- sets:to_list(MIDs)], + persistent_term:erase(?CntrRef(Name)). -stop() -> - gen_server:stop(?MODULE). +stop(Name) -> + gen_server:stop(Name). %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ -create_counters(Id) -> - case get_couters_ref(Id) of +create_counters(Name, Id) -> + case get_couters_ref(Name, Id) of not_found -> - Counters = get_all_counters(), + Counters = get_all_counters(Name), CntrRef = counters:new(max_counters_size(), [write_concurrency]), - persistent_term:put(?CntrRef, Counters#{Id => CntrRef}); + persistent_term:put(?CntrRef(Name), Counters#{Id => CntrRef}); _Ref -> ok end. -delete_counters(Id) -> - persistent_term:put(?CntrRef, maps:remove(Id, get_all_counters())). +delete_counters(Name, Id) -> + persistent_term:put(?CntrRef(Name), maps:remove(Id, get_all_counters(Name))). -get_couters_ref(Id) -> - maps:get(Id, get_all_counters(), not_found). +get_couters_ref(Name, Id) -> + maps:get(Id, get_all_counters(Name), not_found). -get_all_counters() -> - persistent_term:get(?CntrRef, #{}). +get_all_counters(Name) -> + persistent_term:get(?CntrRef(Name), #{}). calculate_speed(_CurrVal, undefined) -> undefined; -calculate_speed(CurrVal, #rule_speed{max = MaxSpeed0, last_v = LastVal, +calculate_speed(CurrVal, #speed{max = MaxSpeed0, last_v = LastVal, tick = Tick, last5m_acc = AccSpeed5Min0, last5m_smpl = Last5MinSamples0}) -> %% calculate the current speed based on the last value of the counter @@ -244,22 +278,28 @@ calculate_speed(CurrVal, #rule_speed{max = MaxSpeed0, last_v = LastVal, Acc, Acc / ?SAMPCOUNT_5M} end, - #rule_speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min, + #speed{max = MaxSpeed, current = CurrSpeed, last5m = Last5Min, last_v = CurrVal, last5m_acc = Acc5Min, last5m_smpl = Last5MinSamples, tick = Tick + 1}. -format_rule_speed(#rule_speed{max = Max, current = Current, last5m = Last5Min}) -> +format_speed(#speed{max = Max, current = Current, last5m = Last5Min}) -> #{max => Max, current => precision(Current, 2), last5m => precision(Last5Min, 2)}. precision(Float, N) -> Base = math:pow(10, N), round(Float * Base) / Base. +get_self_name() -> + {registered_name, Name} = process_info(self(), registered_name), + Name. + %%------------------------------------------------------------------------------ %% Metrics Definitions %%------------------------------------------------------------------------------ -max_counters_size() -> 2. -metrics_idx('rules.matched') -> 1; -metrics_idx(_) -> 2. +max_counters_size() -> 32. +metrics_idx('matched') -> 1; +metrics_idx('success') -> 2; +metrics_idx('failed') -> 3; +metrics_idx(_) -> 32. diff --git a/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl new file mode 100644 index 000000000..3a74cd232 --- /dev/null +++ b/apps/emqx_plugin_libs/test/emqx_plugin_libs_metrics_SUITE.erl @@ -0,0 +1,96 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_plugin_libs_metrics_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [ {group, metrics} + , {group, speed} ]. + +suite() -> + [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. + +groups() -> + [{metrics, [sequence], + [ t_rule + , t_no_creation_1 + ]}, + {speed, [sequence], + [ rule_speed + ]} + ]. + +-define(NAME, ?MODULE). + +init_per_suite(Config) -> + emqx_common_test_helpers:start_apps([emqx_conf]), + {ok, _} = emqx_plugin_libs_metrics:start_link(?NAME), + Config. + +end_per_suite(_Config) -> + catch emqx_plugin_libs_metrics:stop(?NAME), + emqx_common_test_helpers:stop_apps([emqx_conf]), + ok. + +init_per_testcase(_, Config) -> + catch emqx_plugin_libs_metrics:stop(?NAME), + {ok, _} = emqx_plugin_libs_metrics:start_link(?NAME), + Config. + +end_per_testcase(_, _Config) -> + ok. + +t_no_creation_1(_) -> + ?assertEqual(ok, emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched')). + +t_rule(_) -> + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule2">>), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule2">>, 'rules.matched'), + ?assertEqual(1, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), + ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule2">>, 'rules.matched')), + ?assertEqual(0, emqx_plugin_libs_metrics:get(?NAME, <<"rule3">>, 'rules.matched')), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule2">>). + +rule_speed(_) -> + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule1">>), + ok = emqx_plugin_libs_metrics:create_metrics(?NAME, <<"rule:2">>), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule1">>, 'rules.matched'), + ok = emqx_plugin_libs_metrics:inc(?NAME, <<"rule:2">>, 'rules.matched'), + ?assertEqual(2, emqx_plugin_libs_metrics:get(?NAME, <<"rule1">>, 'rules.matched')), + ct:sleep(1000), + ?LET(#{max := Max, current := Current}, + emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>), + {?assert(Max =< 2), + ?assert(Current =< 2)}), + ct:sleep(2100), + ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_plugin_libs_metrics:get_speed(?NAME, <<"rule1">>), + {?assert(Max =< 2), + ?assert(Current == 0), + ?assert(Last5Min =< 0.67)}), + ct:sleep(3000), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule1">>), + ok = emqx_plugin_libs_metrics:clear_metrics(?NAME, <<"rule:2">>). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index bcf0fa02a..974c6b8a4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -163,10 +163,10 @@ load_hooks_for_rule(#{from := Topics}) -> lists:foreach(fun emqx_rule_events:load/1, Topics). add_metrics_for_rule(#{id := Id}) -> - ok = emqx_rule_metrics:create_rule_metrics(Id). + ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, Id). clear_metrics_for_rule(#{id := Id}) -> - ok = emqx_rule_metrics:clear_rule_metrics(Id). + ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, Id). unload_hooks_for_rule(#{id := Id, from := Topics}) -> lists:foreach(fun(Topic) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 2250d0f93..9e341b388 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -338,7 +338,19 @@ do_format_output(BridgeChannelId) when is_binary(BridgeChannelId) -> BridgeChannelId. get_rule_metrics(Id) -> - [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_rule_metrics, [Id])) + Format = fun (Node, #{matched := Matched, + speed := Current, + speed_max := Max, + speed_last5m := Last5M + }) -> + #{ matched => Matched + , speed => Current + , speed_max => Max + , speed_last5m => Last5M + , node => Node + } + end, + [Format(Node, rpc:call(Node, emqx_plugin_libs_metrics, get_metrics, [rule_metrics, Id])) || Node <- mria_mnesia:running_nodes()]. get_one_rule(AllRules, Id) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl index 7fd44df82..356062c1f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl @@ -34,10 +34,5 @@ init([]) -> shutdown => 5000, type => worker, modules => [emqx_rule_engine]}, - Metrics = #{id => emqx_rule_metrics, - start => {emqx_rule_metrics, start_link, []}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_rule_metrics]}, + Metrics = emqx_plugin_libs_metrics:child_spec(rule_metrics), {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index dc162665c..7b68b3ee3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -99,7 +99,7 @@ do_apply_rule(#{ case ?RAISE(match_conditions(Conditions, ColumnsAndSelected), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'), + ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), Collection2 = filter_collection(Input, InCase, DoEach, Collection), {ok, [handle_output_list(Outputs, Coll, Input) || Coll <- Collection2]}; false -> @@ -117,7 +117,7 @@ do_apply_rule(#{id := RuleId, case ?RAISE(match_conditions(Conditions, maps:merge(Input, Selected)), {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> - ok = emqx_rule_metrics:inc(RuleId, 'rules.matched'), + ok = emqx_plugin_libs_metrics:inc_matched(rule_metrics, RuleId), {ok, handle_output_list(Outputs, Selected, Input)}; false -> {error, nomatch} diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index a67e62355..7cd9448db 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -41,7 +41,7 @@ test(#{sql := Sql, context := Context}) -> test_rule(Sql, Select, Context, EventTopics) -> RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]), - ok = emqx_rule_metrics:create_rule_metrics(RuleId), + ok = emqx_plugin_libs_metrics:create_metrics(rule_metrics, RuleId), Rule = #{ id => RuleId, sql => Sql, @@ -62,7 +62,7 @@ test_rule(Sql, Select, Context, EventTopics) -> {ok, Data} -> {ok, flatten(Data)}; {error, nomatch} -> {error, nomatch} after - emqx_rule_metrics:clear_rule_metrics(RuleId) + ok = emqx_plugin_libs_metrics:clear_metrics(rule_metrics, RuleId) end. get_selected_data(Selected, _Envs, _Args) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl deleted file mode 100644 index 418e8dd0f..000000000 --- a/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl +++ /dev/null @@ -1,94 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rule_metrics_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - -all() -> - [ {group, metrics} - , {group, speed} ]. - -suite() -> - [{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}]. - -groups() -> - [{metrics, [sequence], - [ t_rule - , t_no_creation_1 - ]}, - {speed, [sequence], - [ rule_speed - ]} - ]. - -init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_conf]), - {ok, _} = emqx_rule_metrics:start_link(), - Config. - -end_per_suite(_Config) -> - catch emqx_rule_metrics:stop(), - emqx_common_test_helpers:stop_apps([emqx_conf]), - ok. - -init_per_testcase(_, Config) -> - catch emqx_rule_metrics:stop(), - {ok, _} = emqx_rule_metrics:start_link(), - Config. - -end_per_testcase(_, _Config) -> - ok. - -t_no_creation_1(_) -> - ?assertEqual(ok, emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched')). - -t_rule(_) -> - ok = emqx_rule_metrics:create_rule_metrics(<<"rule1">>), - ok = emqx_rule_metrics:create_rule_metrics(<<"rule2">>), - ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'), - ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'), - ok = emqx_rule_metrics:inc(<<"rule2">>, 'rules.matched'), - ct:pal("----couters: ---~p", [persistent_term:get(emqx_rule_metrics)]), - ?assertEqual(1, emqx_rule_metrics:get(<<"rule1">>, 'rules.matched')), - ?assertEqual(2, emqx_rule_metrics:get(<<"rule2">>, 'rules.matched')), - ?assertEqual(0, emqx_rule_metrics:get(<<"rule3">>, 'rules.matched')), - ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>), - ok = emqx_rule_metrics:clear_rule_metrics(<<"rule2">>). - -rule_speed(_) -> - ok = emqx_rule_metrics:create_rule_metrics(<<"rule1">>), - ok = emqx_rule_metrics:create_rule_metrics(<<"rule:2">>), - ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'), - ok = emqx_rule_metrics:inc(<<"rule1">>, 'rules.matched'), - ok = emqx_rule_metrics:inc(<<"rule:2">>, 'rules.matched'), - ?assertEqual(2, emqx_rule_metrics:get(<<"rule1">>, 'rules.matched')), - ct:sleep(1000), - ?LET(#{max := Max, current := Current}, emqx_rule_metrics:get_rule_speed(<<"rule1">>), - {?assert(Max =< 2), - ?assert(Current =< 2)}), - ct:sleep(2100), - ?LET(#{max := Max, current := Current, last5m := Last5Min}, emqx_rule_metrics:get_rule_speed(<<"rule1">>), - {?assert(Max =< 2), - ?assert(Current == 0), - ?assert(Last5Min =< 0.67)}), - ct:sleep(3000), - ok = emqx_rule_metrics:clear_rule_metrics(<<"rule1">>), - ok = emqx_rule_metrics:clear_rule_metrics(<<"rule:2">>). From 29ad6d215e5f925f6e132281b89235f6b5bb393f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 22 Nov 2021 20:07:04 +0800 Subject: [PATCH 2/4] feat(resource): add metrics to emqx_resource --- apps/emqx_bridge/src/emqx_bridge.erl | 17 ++- apps/emqx_bridge/src/emqx_bridge_api.erl | 40 +++---- .../src/emqx_connector_mqtt.erl | 5 +- .../test/emqx_connector_api_SUITE.erl | 105 ++++++++++++++++-- apps/emqx_resource/include/emqx_resource.hrl | 2 +- apps/emqx_resource/src/emqx_resource.erl | 22 +++- .../src/emqx_resource_instance.erl | 9 +- apps/emqx_resource/src/emqx_resource_sup.erl | 7 +- 8 files changed, 159 insertions(+), 48 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index a1e436c61..01e5faf07 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -21,7 +21,6 @@ -export([post_config_update/5]). -export([ load_hook/0 - , reload_hook/0 , unload_hook/0 ]). @@ -55,22 +54,21 @@ -export([ config_key_path/0 ]). -reload_hook() -> - unload_hook(), - load_hook(). - load_hook() -> Bridges = emqx:get_config([bridges], #{}), + load_hook(Bridges). + +load_hook(Bridges) -> lists:foreach(fun({_Type, Bridge}) -> lists:foreach(fun({_Name, BridgeConf}) -> - load_hook(BridgeConf) + do_load_hook(BridgeConf) end, maps:to_list(Bridge)) end, maps:to_list(Bridges)). -load_hook(#{from_local_topic := _}) -> +do_load_hook(#{from_local_topic := _}) -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), ok; -load_hook(_Conf) -> ok. +do_load_hook(_Conf) -> ok. unload_hook() -> ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}). @@ -109,7 +107,8 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> {fun create/3, Added}, {fun update/3, Updated} ]), - ok = reload_hook(), + ok = unload_hook(), + ok = load_hook(NewConf), Result. perform_bridge_changes(Tasks) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index b520a183e..417fa49f7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -36,21 +36,21 @@ ". Bridge Ids must be of format :">>}} end). --define(METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX), - #{ +-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), + #{ matched => MATCH, success => SUCC, failed => FAILED, - rate => RATE, - rate_last5m => RATE_5, - rate_max => RATE_MAX + speed => RATE, + speed_last5m => RATE_5, + speed_max => RATE_MAX }). --define(MATCH_METRICS(SUCC, FAILED, RATE, RATE_5, RATE_MAX), - #{ +-define(metrics(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), + #{ matched := MATCH, success := SUCC, failed := FAILED, - rate := RATE, - rate_last5m := RATE_5, - rate_max := RATE_MAX + speed := RATE, + speed_last5m := RATE_5, + speed_max := RATE_MAX }). req_schema() -> @@ -73,11 +73,12 @@ status_schema() -> metrics_schema() -> #{ type => object , properties => #{ + matched => #{type => integer, example => "0"}, success => #{type => integer, example => "0"}, failed => #{type => integer, example => "0"}, - rate => #{type => number, format => float, example => "0.0"}, - rate_last5m => #{type => number, format => float, example => "0.0"}, - rate_max => #{type => number, format => float, example => "0.0"} + speed => #{type => number, format => float, example => "0.0"}, + speed_last5m => #{type => number, format => float, example => "0.0"}, + speed_max => #{type => number, format => float, example => "0.0"} } }. @@ -337,21 +338,22 @@ collect_metrics(Bridges) -> [maps:with([node, metrics], B) || B <- Bridges]. aggregate_metrics(AllMetrics) -> - InitMetrics = ?METRICS(0,0,0,0,0), - lists:foldl(fun(#{metrics := ?MATCH_METRICS(Succ1, Failed1, Rate1, Rate5m1, RateMax1)}, - ?MATCH_METRICS(Succ0, Failed0, Rate0, Rate5m0, RateMax0)) -> - ?METRICS(Succ1 + Succ0, Failed1 + Failed0, + InitMetrics = ?METRICS(0,0,0,0,0,0), + lists:foldl(fun(#{metrics := ?metrics(Match1, Succ1, Failed1, Rate1, Rate5m1, RateMax1)}, + ?metrics(Match0, Succ0, Failed0, Rate0, Rate5m0, RateMax0)) -> + ?METRICS(Match1 + Match0, Succ1 + Succ0, Failed1 + Failed0, Rate1 + Rate0, Rate5m1 + Rate5m0, RateMax1 + RateMax0) end, InitMetrics, AllMetrics). -format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) -> +format_resp(#{id := Id, raw_config := RawConf, + resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) -> IsConnected = fun(started) -> connected; (_) -> disconnected end, RawConf#{ id => Id, node => node(), bridge_type => emqx_bridge:bridge_type(Mod), status => IsConnected(Status), - metrics => ?METRICS(0,0,0,0,0) + metrics => Metrics }. rpc_multicall(Func, Args) -> diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index d19f5b884..1acd8b298 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -127,10 +127,11 @@ on_stop(_InstId, #{name := InstanceId}) -> connector => InstanceId, reason => Reason}) end. -on_query(_InstId, {send_message, Msg}, _AfterQuery, #{name := InstanceId}) -> +on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> ?SLOG(debug, #{msg => "send msg to remote node", message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg). + emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), + emqx_resource:query_success(AfterQuery). on_health_check(_InstId, #{name := InstanceId} = State) -> case emqx_connector_mqtt_worker:ping(InstanceId) of diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index b163d1cf2..96f530563 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -25,7 +25,8 @@ -define(CONF_DEFAULT, <<"connectors: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(CONNECTR_ID, <<"mqtt:test_connector">>). --define(BRIDGE_ID, <<"mqtt:test_bridge">>). +-define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>). +-define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>). -define(MQTT_CONNECOTR(Username), #{ <<"server">> => <<"127.0.0.1:1883">>, @@ -37,7 +38,7 @@ -define(MQTT_CONNECOTR2(Server), ?MQTT_CONNECOTR(<<"user1">>)#{<<"server">> => Server}). --define(MQTT_BRIDGE(ID), +-define(MQTT_BRIDGE_INGRESS(ID), #{ <<"connector">> => ID, <<"direction">> => <<"ingress">>, @@ -49,6 +50,22 @@ <<"retain">> => <<"${retain}">> }). +-define(MQTT_BRIDGE_EGRESS(ID), +#{ + <<"connector">> => ID, + <<"direction">> => <<"egress">>, + <<"from_local_topic">> => <<"local_topic/#">>, + <<"to_remote_topic">> => <<"remote_topic/${topic}">>, + <<"payload">> => <<"${payload}">>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> +}). + +-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), + #{<<"matched">> := MATCH, <<"success">> := SUCC, + <<"failed">> := FAILED, <<"speed">> := SPEED, + <<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -162,7 +179,7 @@ t_mqtt_crud_apis(_) -> }, jsx:decode(ErrMsg2)), ok. -t_mqtt_conn_bridge(_) -> +t_mqtt_conn_bridge_ingress(_) -> %% assert we there's no connectors and no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -184,10 +201,10 @@ t_mqtt_conn_bridge(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}), + ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}), %ct:pal("---bridge: ~p", [Bridge]), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS , <<"bridge_type">> := <<"mqtt">> , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID @@ -217,7 +234,77 @@ t_mqtt_conn_bridge(_) -> end), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% delete the connector + {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []), + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + ok. + +t_mqtt_conn_bridge_egress(_) -> + %% assert we there's no connectors and no bridges at first + {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + %% then we add a mqtt connector, using POST + User1 = <<"user1">>, + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + + %ct:pal("---connector: ~p", [Connector]), + ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + , <<"server">> := <<"127.0.0.1:1883">> + , <<"username">> := User1 + , <<"password">> := <<"">> + , <<"proto_ver">> := <<"v4">> + , <<"ssl">> := #{<<"enable">> := false} + }, jsx:decode(Connector)), + + %% ... and a MQTT bridge, using POST + %% we bind this bridge to the connector created just now + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}), + + %ct:pal("---bridge: ~p", [Bridge]), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + , <<"bridge_type">> := <<"mqtt">> + , <<"status">> := <<"connected">> + , <<"connector">> := ?CONNECTR_ID + }, jsx:decode(Bridge)), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS + , <<"metrics">> := ?metrics(1, 1, 0, _, _, _) + , <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] + }, jsx:decode(BridgeStr)), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector @@ -245,8 +332,8 @@ t_mqtt_conn_update(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID}), - ?assertMatch(#{ <<"id">> := ?BRIDGE_ID + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}), + ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS , <<"bridge_type">> := <<"mqtt">> , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID @@ -260,7 +347,7 @@ t_mqtt_conn_update(_) -> {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]), ?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), %% delete the connector diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index b2613ffe1..2c3b440c8 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -28,7 +28,7 @@ status => started | stopped }. -type resource_group() :: binary(). --type after_query() :: {OnSuccess :: after_query_fun(), OnFailed :: after_query_fun()} | +-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index e6dab38fa..b062e83ae 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -122,13 +122,18 @@ is_resource_mod(Module) -> -spec query_success(after_query()) -> ok. query_success(undefined) -> ok; -query_success({{OnSucc, Args}, _}) -> - safe_apply(OnSucc, Args). +query_success({OnSucc, _}) -> + apply_query_after_calls(OnSucc). -spec query_failed(after_query()) -> ok. query_failed(undefined) -> ok; -query_failed({_, {OnFailed, Args}}) -> - safe_apply(OnFailed, Args). +query_failed({_, OnFailed}) -> + apply_query_after_calls(OnFailed). + +apply_query_after_calls(Funcs) -> + lists:foreach(fun({Fun, Args}) -> + safe_apply(Fun, Args) + end, Funcs). %% ================================================================================= %% APIs for resource instances @@ -175,7 +180,7 @@ remove_local(InstId) -> %% ================================================================================= -spec query(instance_id(), Request :: term()) -> Result :: term(). query(InstId, Request) -> - query(InstId, Request, undefined). + query(InstId, Request, inc_metrics_funcs(InstId)). %% same to above, also defines what to do when the Module:on_query success or failed %% it is the duty of the Module to apply the `after_query()` functions. @@ -321,6 +326,13 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. +inc_metrics_funcs(InstId) -> + OnFailed = [{fun emqx_plugin_libs_metrics:inc_failed/2, [resource_metrics, InstId]}], + OnSucc = [ {fun emqx_plugin_libs_metrics:inc_matched/2, [resource_metrics, InstId]} + , {fun emqx_plugin_libs_metrics:inc_success/2, [resource_metrics, InstId]} + ], + {OnSucc, OnFailed}. + call_instance(InstId, Query) -> emqx_resource_instance:hash_call(InstId, Query). diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index e35675962..eaf6db0b2 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -24,6 +24,7 @@ %% load resource instances from *.conf files -export([ lookup/1 + , get_metrics/1 , list_all/0 , create_local/3 ]). @@ -65,9 +66,13 @@ hash_call(InstId, Request, Timeout) -> lookup(InstId) -> case ets:lookup(emqx_resource_instance, InstId) of [] -> {error, not_found}; - [{_, Data}] -> {ok, Data#{id => InstId}} + [{_, Data}] -> + {ok, Data#{id => InstId, metrics => get_metrics(InstId)}} end. +get_metrics(InstId) -> + emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). + force_lookup(InstId) -> {ok, Data} = lookup(InstId), Data. @@ -174,6 +179,7 @@ do_create(InstId, ResourceType, Config) -> #{mod => ResourceType, config => Config, state => ResourceState, status => stopped}}), _ = do_health_check(InstId), + ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), {ok, force_lookup(InstId)}; {error, Reason} -> logger:error("start ~ts resource ~ts failed: ~p", @@ -207,6 +213,7 @@ do_remove(InstId) -> do_remove(Mod, InstId, ResourceState) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), ets:delete(emqx_resource_instance, InstId), + ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId), ok. do_restart(InstId) -> diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 69d1acd20..534777b69 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -32,17 +32,20 @@ init([]) -> _ = ets:new(emqx_resource_instance, TabOpts), SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, + Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics), + Pool = ?RESOURCE_INST_MOD, Mod = ?RESOURCE_INST_MOD, ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]), - {ok, {SupFlags, [ + ResourceInsts = [ begin ensure_pool_worker(Pool, {Pool, Idx}, Idx), #{id => {Mod, Idx}, start => {Mod, start_link, [Pool, Idx]}, restart => transient, shutdown => 5000, type => worker, modules => [Mod]} - end || Idx <- lists:seq(1, ?POOL_SIZE)]}}. + end || Idx <- lists:seq(1, ?POOL_SIZE)], + {ok, {SupFlags, [Metrics | ResourceInsts]}}. %% internal functions ensure_pool(Pool, Type, Opts) -> From 9aa63358e6b11c229789b5af23b100cca9ebbbc8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 23 Nov 2021 09:36:02 +0800 Subject: [PATCH 3/4] fix(resource): type spec for resource_data() --- .../src/emqx_plugin_libs_metrics.erl | 12 +++++++++++- apps/emqx_resource/include/emqx_resource.hrl | 11 ++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl index faddb6872..824890efc 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_metrics.erl @@ -59,6 +59,16 @@ -define(SAMPLING, 1). -endif. +-export_type([metrics/0]). + +-type metrics() :: #{ + matched => integer(), + success => integer(), + failed => integer(), + speed => float(), + speed_max => float(), + speed_last5m => float() +}. -type handler_name() :: atom(). -type metric_id() :: binary(). @@ -116,7 +126,7 @@ get(Name, Id, Metric) -> get_speed(Name, Id) -> gen_server:call(Name, {get_speed, Id}). --spec(get_metrics(handler_name(), metric_id()) -> map()). +-spec(get_metrics(handler_name(), metric_id()) -> metrics()). get_metrics(Name, Id) -> #{max := Max, current := Current, last5m := Last5M} = get_speed(Name, Id), #{matched => get_matched(Name, Id), diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 2c3b440c8..e5eb1785f 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -21,11 +21,12 @@ -type resource_spec() :: map(). -type resource_state() :: term(). -type resource_data() :: #{ - id => instance_id(), - mod => module(), - config => resource_config(), - state => resource_state(), - status => started | stopped + id := instance_id(), + mod := module(), + config := resource_config(), + state := resource_state(), + status := started | stopped, + metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | From 46838a08ccda11b9400933c8a3acebba373e9cb2 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 23 Nov 2021 10:41:45 +0800 Subject: [PATCH 4/4] fix(resource): update testcases for after_query functions --- apps/emqx_resource/test/emqx_resource_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 6a8a9476f..4f641c85a 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -88,7 +88,7 @@ t_query(_) -> Failure = fun() -> Pid ! failure end, #{pid := _} = emqx_resource:query(?ID, get_state), - #{pid := _} = emqx_resource:query(?ID, get_state, {{Success, []}, {Failure, []}}), + #{pid := _} = emqx_resource:query(?ID, get_state, {[{Success, []}], [{Failure, []}]}), receive Message -> ?assertEqual(success, Message)