From b1bcf362374950a2a246a8996004e8a79617686a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 4 Apr 2022 15:52:26 -0300 Subject: [PATCH 1/5] test: silence unused var warning --- apps/emqx_exhook/test/emqx_exhook_metrics_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_exhook/test/emqx_exhook_metrics_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_metrics_SUITE.erl index d26f37b4b..079a292ae 100644 --- a/apps/emqx_exhook/test/emqx_exhook_metrics_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_metrics_SUITE.erl @@ -59,7 +59,7 @@ init_per_suite(Cfg) -> emqx_common_test_helpers:start_apps([emqx_exhook]), Cfg. -end_per_suite(Cfg) -> +end_per_suite(_Cfg) -> meck:unload(emqx_exhook_demo_svr), meck:unload(emqx_exhook_mgr), emqx_exhook_demo_svr:stop(), From ed7035ec4179d6a7ed70cf8f24885308b1d1cbe4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 5 Apr 2022 09:06:24 -0300 Subject: [PATCH 2/5] style(bridge): please elvis checks --- .../test/emqx_rule_engine_SUITE.erl | 43 +++++++++++++------ 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 7700e305d..8254e182c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -277,11 +277,14 @@ t_get_rules_for_topic_2(_Config) -> make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]), make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [<<"simple/+/1">>]), make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/1\"">>, [<<"simple/1">>]), - make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]), - make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>]) + make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, + [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]), + make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, + [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>]) ]), ?assertEqual(Len0+4, length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>))), - ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), + ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, + <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]), ok. t_get_rules_with_same_event(_Config) -> @@ -297,14 +300,23 @@ t_get_rules_with_same_event(_Config) -> ok = insert_rules( [make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]), make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]), - make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]), - make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]), - make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]), - make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]), - make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]), - make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]), - make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]), - make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]) + make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, + [<<"$events/client_connected">>]), + make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, + [<<"$events/client_disconnected">>]), + make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, + [<<"$events/session_subscribed">>]), + make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, + [<<"$events/session_unsubscribed">>]), + make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, + [<<"$events/message_delivered">>]), + make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, + [<<"$events/message_acked">>]), + make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, + [<<"$events/message_dropped">>]), + make_simple_rule(<<"r10">>, <<"select * from \"t/1, " + "$events/session_subscribed, $events/client_connected\"">>, + [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>]) ]), ?assertEqual(PubN + 3, length(emqx_rule_engine:get_rules_with_same_event(PubT))), ?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>))), @@ -314,7 +326,8 @@ t_get_rules_with_same_event(_Config) -> ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>))), ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>))), ?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>))), - ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]), + ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>, + <<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]), ok. %%------------------------------------------------------------------------------ @@ -866,7 +879,8 @@ t_sqlparse_foreach_7(_Config) -> "incase is_not_null(info.cmd) " "from \"t/#\" " "where s.page = '2' ", - Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>, + Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": " + "{\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>, ?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]}, emqx_rule_sqltester:test( #{sql => Sql, @@ -892,7 +906,8 @@ t_sqlparse_foreach_8(_Config) -> "incase is_map(info) " "from \"t/#\" " "where s.page = '2' ", - Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }">>, + Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": " + "{\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }">>, ?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]}, emqx_rule_sqltester:test( #{sql => Sql, From 8354095e4bbc6ae42f04ffcd5e620a103e6d41b5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 4 Apr 2022 16:14:42 -0300 Subject: [PATCH 3/5] feat(rules): export basic usage info for telemetry --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 53 +++++++++++++++++++ .../test/emqx_rule_engine_SUITE.erl | 41 ++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 1bf0088fe..f42094b0c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -58,6 +58,9 @@ , clear_metrics_for_rule/1 ]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + %% gen_server Callbacks -export([ init/1 , handle_call/3 @@ -201,6 +204,56 @@ unload_hooks_for_rule(#{id := Id, from := Topics}) -> end end, Topics). +%%------------------------------------------------------------------------------ +%% Telemetry helper functions +%%------------------------------------------------------------------------------ + +-spec get_basic_usage_info() -> #{ num_rules => non_neg_integer() + , referenced_bridges => + #{ BridgeType => non_neg_integer() + } + } + when BridgeType :: atom(). +get_basic_usage_info() -> + try + Rules = get_rules(), + EnabledRules = + lists:filter( + fun(#{enable := Enabled}) -> Enabled end, + Rules), + NumRules = length(EnabledRules), + ReferencedBridges = + lists:foldl( + fun(#{outputs := Outputs}, Acc) -> + BridgeIDs = lists:filter(fun is_binary/1, Outputs), + tally_referenced_bridges(BridgeIDs, Acc) + end, + #{}, + EnabledRules), + #{ num_rules => NumRules + , referenced_bridges => ReferencedBridges + } + catch + _:_ -> + #{ num_rules => 0 + , referenced_bridges => #{} + } + end. + + +tally_referenced_bridges(BridgeIDs, Acc0) -> + lists:foldl( + fun(BridgeID, Acc) -> + {BridgeType, _BridgeName} = emqx_bridge:parse_bridge_id(BridgeID), + maps:update_with( + BridgeType, + fun(X) -> X + 1 end, + 1, + Acc) + end, + Acc0, + BridgeIDs). + %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 8254e182c..ab5a34f13 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1356,6 +1356,47 @@ t_sqlparse_nested_get(_Config) -> payload => <<"{\"a\": {\"b\": 0}}">> }})). +%%------------------------------------------------------------------------------ +%% Test cases for telemetry functions +%%------------------------------------------------------------------------------ + +t_get_basic_usage_info_0(_Config) -> + ?assertEqual( + #{ num_rules => 0 + , referenced_bridges => #{} + }, + emqx_rule_engine:get_basic_usage_info()), + ok. + +t_get_basic_usage_info_1(_Config) -> + {ok, _} = + emqx_rule_engine:create_rule( + #{id => <<"rule:t_get_basic_usage_info:1">>, + sql => <<"select 1 from topic">>, + outputs => + [ #{function => <<"erlang:hibernate">>, args => #{}} + , #{function => console} + , <<"http:my_http_bridge">> + , <<"http:my_http_bridge">> + ]}), + {ok, _} = + emqx_rule_engine:create_rule( + #{id => <<"rule:t_get_basic_usage_info:2">>, + sql => <<"select 1 from topic">>, + outputs => + [ <<"mqtt:my_mqtt_bridge">> + , <<"http:my_http_bridge">> + ]}), + ?assertEqual( + #{ num_rules => 2 + , referenced_bridges => + #{ mqtt => 1 + , http => 3 + } + }, + emqx_rule_engine:get_basic_usage_info()), + ok. + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ From 911e02f6269fe5e31ad954d0794ce40f75e21426 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 4 Apr 2022 18:12:27 -0300 Subject: [PATCH 4/5] feat(bridge): export basic usage info for telemetry --- apps/emqx_bridge/src/emqx_bridge.erl | 32 +++++- apps/emqx_bridge/src/emqx_bridge_monitor.erl | 9 +- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 112 +++++++++++++++++++ 3 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_SUITE.erl diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d0c4e389c..84920109c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -60,6 +60,9 @@ -export([ config_key_path/0 ]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). @@ -244,7 +247,7 @@ update(Type, Name, {OldConf, Conf}) -> %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated %% without restarting the bridge. %% - case if_only_to_toggole_enable(OldConf, Conf) of + case if_only_to_toggle_enable(OldConf, Conf) of false -> ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, config => Conf}), @@ -396,7 +399,7 @@ maybe_disable_bridge(Type, Name, Conf) -> true -> ok end. -if_only_to_toggole_enable(OldConf, Conf) -> +if_only_to_toggle_enable(OldConf, Conf) -> #{added := Added, removed := Removed, changed := Updated} = emqx_map_lib:diff_maps(OldConf, Conf), case {Added, Removed, Updated} of @@ -407,6 +410,31 @@ if_only_to_toggole_enable(OldConf, Conf) -> {_, _, _} -> false end. +-spec get_basic_usage_info() -> + #{ num_bridges => non_neg_integer() + , count_by_type => + #{ BridgeType => non_neg_integer() + } + } when BridgeType :: atom(). +get_basic_usage_info() -> + lists:foldl( + fun(#{resource_data := #{config := #{enable := false}}}, Acc) -> + Acc; + (#{type := BridgeType}, Acc) -> + NumBridges = maps:get(num_bridges, Acc), + CountByType0 = maps:get(count_by_type, Acc), + CountByType = maps:update_with( + binary_to_atom(BridgeType, utf8), + fun(X) -> X + 1 end, + 1, + CountByType0), + Acc#{ num_bridges => NumBridges + 1 + , count_by_type => CountByType + } + end, + #{num_bridges => 0, count_by_type => #{}}, + list()). + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index 6590ed928..8de216974 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -19,6 +19,8 @@ -behaviour(gen_server). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %% API functions -export([ start_link/0 , ensure_all_started/1 @@ -67,6 +69,11 @@ code_change(_OldVsn, State, _Extra) -> load_bridges(Configs) -> lists:foreach(fun({Type, NamedConf}) -> lists:foreach(fun({Name, Conf}) -> - emqx_bridge:create(Type, Name, Conf) + _Res = emqx_bridge:create(Type, Name, Conf), + ?tp(emqx_bridge_monitor_loaded_bridge, + #{ type => Type + , name => Name + , res => _Res + }) end, maps:to_list(NamedConf)) end, maps:to_list(Configs)). diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl new file mode 100644 index 000000000..b03db50f8 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -0,0 +1,112 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% to avoid inter-suite dependencies + application:stop(emqx_connector), + ok = emqx_common_test_helpers:start_apps([emqx, emqx_bridge]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([emqx, emqx_bridge]). + +init_per_testcase(t_get_basic_usage_info_1, Config) -> + setup_fake_telemetry_data(), + Config; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_get_basic_usage_info_1, _Config) -> + ok = emqx_bridge:remove(<<"http:basic_usage_info_http">>), + ok = emqx_bridge:remove(<<"http:basic_usage_info_http_disabled">>), + ok = emqx_bridge:remove(<<"mqtt:basic_usage_info_mqtt">>), + emqx_config:delete_override_conf_files(), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + +t_get_basic_usage_info_0(_Config) -> + ?assertEqual( + #{ num_bridges => 0 + , count_by_type => #{} + }, + emqx_bridge:get_basic_usage_info()). + +t_get_basic_usage_info_1(_Config) -> + BasicUsageInfo = emqx_bridge:get_basic_usage_info(), + ?assertEqual( + #{ num_bridges => 2 + , count_by_type => #{ http => 1 + , mqtt => 1 + } + }, + BasicUsageInfo). + +setup_fake_telemetry_data() -> + ConnectorConf = + #{<<"connectors">> => + #{<<"mqtt">> => #{<<"my_mqtt_connector">> => #{}}}}, + MQTTConfig = #{ connector => <<"mqtt:my_mqtt_connector">> + , enable => true + , direction => ingress + , remote_topic => <<"aws/#">> + , remote_qos => 1 + }, + HTTPConfig = #{ url => <<"http://localhost:9901/messages/${topic}">> + , enable => true + , direction => egress + , local_topic => "emqx_http/#" + , method => post + , body => <<"${payload}">> + , headers => #{} + , request_timeout => "15s" + }, + Conf = + #{ <<"bridges">> => + #{ <<"http">> => + #{ <<"basic_usage_info_http">> => HTTPConfig + , <<"basic_usage_info_http_disabled">> => + HTTPConfig#{enable => false} + } + , <<"mqtt">> => + #{ <<"basic_usage_info_mqtt">> => MQTTConfig + } + } + }, + ok = emqx_common_test_helpers:load_config(emqx_connector_schema, ConnectorConf), + ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf), + + ok = snabbkaffe:start_trace(), + Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end, + NEvents = 3, + BackInTime = 0, + Timeout = 1_000, + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime), + ok = emqx_bridge:load(), + {ok, _} = snabbkaffe_collector:receive_events(Sub), + ok = snabbkaffe:stop(), + ok. From 0aa3597736c7095bde504cf26ebc2867b004a7fa Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Apr 2022 10:19:45 -0300 Subject: [PATCH 5/5] feat(telemetry): report basic rule and bridge info --- apps/emqx_modules/src/emqx_telemetry.erl | 39 ++++++++++- .../test/emqx_telemetry_SUITE.erl | 70 +++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 7b3630aa0..a89c4ce2f 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -325,6 +325,10 @@ generate_uuid() -> get_telemetry(State0 = #state{uuid = UUID}) -> OSInfo = os_info(), {MQTTRTInsights, State} = mqtt_runtime_insights(State0), + #{ + rule_engine := RuleEngineInfo, + bridge := BridgeInfo + } = get_rule_engine_and_bridge_info(), {State, [ {emqx_version, bin(emqx_app:get_release())}, {license, [{edition, <<"community">>}]}, @@ -343,7 +347,9 @@ get_telemetry(State0 = #state{uuid = UUID}) -> {mqtt_runtime_insights, MQTTRTInsights}, {advanced_mqtt_features, advanced_mqtt_features()}, {authn_authz, get_authn_authz_info()}, - {gateway, get_gateway_info()} + {gateway, get_gateway_info()}, + {rule_engine, RuleEngineInfo}, + {bridge, BridgeInfo} ]}. report_telemetry(State0 = #state{url = URL}) -> @@ -468,6 +474,37 @@ get_gateway_info() -> #{} end. +get_rule_engine_and_bridge_info() -> + #{ + num_rules := NumRules, + referenced_bridges := ReferencedBridges + } = emqx_rule_engine:get_basic_usage_info(), + #{ + num_bridges := NumDataBridges, + count_by_type := BridgeTypeCount + } = emqx_bridge:get_basic_usage_info(), + BridgeInfo = + maps:fold( + fun(BridgeType, BridgeCount, Acc) -> + ReferencingRules = maps:get(BridgeType, ReferencedBridges, 0), + Acc#{ + BridgeType => #{ + num => BridgeCount, + num_linked_by_rules => ReferencingRules + } + } + end, + #{}, + BridgeTypeCount + ), + #{ + rule_engine => #{num_rules => NumRules}, + bridge => #{ + num_data_bridges => NumDataBridges, + data_bridge => BridgeInfo + } + }. + bin(L) when is_list(L) -> list_to_binary(L); bin(A) when is_atom(A) -> diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 550e5a956..beaa9ccb2 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -115,6 +115,14 @@ init_per_testcase(t_send_after_enable, Config) -> ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), mock_httpc(), Config; +init_per_testcase(t_rule_engine_and_data_bridge_info, Config) -> + mock_httpc(), + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + {ok, _} = application:ensure_all_started(emqx_rule_engine), + ok = application:start(emqx_bridge), + ok = emqx_bridge_SUITE:setup_fake_telemetry_data(), + ok = setup_fake_rule_engine_data(), + Config; init_per_testcase(_Testcase, Config) -> TestPID = self(), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), @@ -149,6 +157,18 @@ end_per_testcase(t_enable, _Config) -> meck:unload([httpc, emqx_telemetry]); end_per_testcase(t_send_after_enable, _Config) -> meck:unload([httpc, emqx_telemetry]); +end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) -> + meck:unload(httpc), + lists:foreach( + fun(App) -> + ok = application:stop(App) + end, + [ + emqx_bridge, + emqx_rule_engine + ] + ), + ok; end_per_testcase(_Testcase, _Config) -> meck:unload([httpc]), ok. @@ -350,6 +370,27 @@ t_mqtt_runtime_insights(_) -> ?assertEqual(30_000, maps:get(num_topics, MQTTRTInsights2)), ok. +t_rule_engine_and_data_bridge_info(_Config) -> + {ok, TelemetryData} = emqx_telemetry:get_telemetry(), + RuleInfo = get_value(rule_engine, TelemetryData), + BridgeInfo = get_value(bridge, TelemetryData), + ?assertEqual( + #{num_rules => 2}, + RuleInfo + ), + ?assertEqual( + #{ + data_bridge => + #{ + http => #{num => 1, num_linked_by_rules => 3}, + mqtt => #{num => 1, num_linked_by_rules => 1} + }, + num_data_bridges => 2 + }, + BridgeInfo + ), + ok. + assert_approximate(Map, Key, Expected) -> Value = maps:get(Key, Map), ?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])). @@ -427,6 +468,35 @@ assert_gateway_listener_shape(ListenerData, GatewayType) -> #{gateway_type => GatewayType} ). +setup_fake_rule_engine_data() -> + {ok, _} = + emqx_rule_engine:create_rule( + #{ + id => <<"rule:t_get_basic_usage_info:1">>, + sql => <<"select 1 from topic">>, + outputs => + [ + #{function => <<"erlang:hibernate">>, args => #{}}, + #{function => console}, + <<"http:my_http_bridge">>, + <<"http:my_http_bridge">> + ] + } + ), + {ok, _} = + emqx_rule_engine:create_rule( + #{ + id => <<"rule:t_get_basic_usage_info:2">>, + sql => <<"select 1 from topic">>, + outputs => + [ + <<"mqtt:my_mqtt_bridge">>, + <<"http:my_http_bridge">> + ] + } + ), + ok. + set_special_configs(emqx_authz) -> {ok, _} = emqx:update_config([authorization, cache, enable], false), {ok, _} = emqx:update_config([authorization, no_match], deny),