Merge pull request #7522 from thalesmg/telemetry-revamp-part5

feat: add basic rule engine and bridge usage info to telemetry
This commit is contained in:
Thales Macedo Garitezi 2022-04-07 10:19:55 -03:00 committed by GitHub
commit 1aea6b3ea7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 382 additions and 19 deletions

View File

@ -60,6 +60,9 @@
-export([ config_key_path/0
]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
load_hook() ->
Bridges = emqx:get_config([bridges], #{}),
load_hook(Bridges).
@ -244,7 +247,7 @@ update(Type, Name, {OldConf, Conf}) ->
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
%% without restarting the bridge.
%%
case if_only_to_toggole_enable(OldConf, Conf) of
case if_only_to_toggle_enable(OldConf, Conf) of
false ->
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
@ -396,7 +399,7 @@ maybe_disable_bridge(Type, Name, Conf) ->
true -> ok
end.
if_only_to_toggole_enable(OldConf, Conf) ->
if_only_to_toggle_enable(OldConf, Conf) ->
#{added := Added, removed := Removed, changed := Updated} =
emqx_map_lib:diff_maps(OldConf, Conf),
case {Added, Removed, Updated} of
@ -407,6 +410,31 @@ if_only_to_toggole_enable(OldConf, Conf) ->
{_, _, _} -> false
end.
-spec get_basic_usage_info() ->
#{ num_bridges => non_neg_integer()
, count_by_type =>
#{ BridgeType => non_neg_integer()
}
} when BridgeType :: atom().
get_basic_usage_info() ->
lists:foldl(
fun(#{resource_data := #{config := #{enable := false}}}, Acc) ->
Acc;
(#{type := BridgeType}, Acc) ->
NumBridges = maps:get(num_bridges, Acc),
CountByType0 = maps:get(count_by_type, Acc),
CountByType = maps:update_with(
binary_to_atom(BridgeType, utf8),
fun(X) -> X + 1 end,
1,
CountByType0),
Acc#{ num_bridges => NumBridges + 1
, count_by_type => CountByType
}
end,
#{num_bridges => 0, count_by_type => #{}},
list()).
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).

View File

@ -19,6 +19,8 @@
-behaviour(gen_server).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API functions
-export([ start_link/0
, ensure_all_started/1
@ -67,6 +69,11 @@ code_change(_OldVsn, State, _Extra) ->
load_bridges(Configs) ->
lists:foreach(fun({Type, NamedConf}) ->
lists:foreach(fun({Name, Conf}) ->
emqx_bridge:create(Type, Name, Conf)
_Res = emqx_bridge:create(Type, Name, Conf),
?tp(emqx_bridge_monitor_loaded_bridge,
#{ type => Type
, name => Name
, res => _Res
})
end, maps:to_list(NamedConf))
end, maps:to_list(Configs)).

View File

@ -0,0 +1,112 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
%% to avoid inter-suite dependencies
application:stop(emqx_connector),
ok = emqx_common_test_helpers:start_apps([emqx, emqx_bridge]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx, emqx_bridge]).
init_per_testcase(t_get_basic_usage_info_1, Config) ->
setup_fake_telemetry_data(),
Config;
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(t_get_basic_usage_info_1, _Config) ->
ok = emqx_bridge:remove(<<"http:basic_usage_info_http">>),
ok = emqx_bridge:remove(<<"http:basic_usage_info_http_disabled">>),
ok = emqx_bridge:remove(<<"mqtt:basic_usage_info_mqtt">>),
emqx_config:delete_override_conf_files(),
ok;
end_per_testcase(_TestCase, _Config) ->
ok.
t_get_basic_usage_info_0(_Config) ->
?assertEqual(
#{ num_bridges => 0
, count_by_type => #{}
},
emqx_bridge:get_basic_usage_info()).
t_get_basic_usage_info_1(_Config) ->
BasicUsageInfo = emqx_bridge:get_basic_usage_info(),
?assertEqual(
#{ num_bridges => 2
, count_by_type => #{ http => 1
, mqtt => 1
}
},
BasicUsageInfo).
setup_fake_telemetry_data() ->
ConnectorConf =
#{<<"connectors">> =>
#{<<"mqtt">> => #{<<"my_mqtt_connector">> => #{}}}},
MQTTConfig = #{ connector => <<"mqtt:my_mqtt_connector">>
, enable => true
, direction => ingress
, remote_topic => <<"aws/#">>
, remote_qos => 1
},
HTTPConfig = #{ url => <<"http://localhost:9901/messages/${topic}">>
, enable => true
, direction => egress
, local_topic => "emqx_http/#"
, method => post
, body => <<"${payload}">>
, headers => #{}
, request_timeout => "15s"
},
Conf =
#{ <<"bridges">> =>
#{ <<"http">> =>
#{ <<"basic_usage_info_http">> => HTTPConfig
, <<"basic_usage_info_http_disabled">> =>
HTTPConfig#{enable => false}
}
, <<"mqtt">> =>
#{ <<"basic_usage_info_mqtt">> => MQTTConfig
}
}
},
ok = emqx_common_test_helpers:load_config(emqx_connector_schema, ConnectorConf),
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf),
ok = snabbkaffe:start_trace(),
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end,
NEvents = 3,
BackInTime = 0,
Timeout = 1_000,
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
ok = emqx_bridge:load(),
{ok, _} = snabbkaffe_collector:receive_events(Sub),
ok = snabbkaffe:stop(),
ok.

View File

@ -59,7 +59,7 @@ init_per_suite(Cfg) ->
emqx_common_test_helpers:start_apps([emqx_exhook]),
Cfg.
end_per_suite(Cfg) ->
end_per_suite(_Cfg) ->
meck:unload(emqx_exhook_demo_svr),
meck:unload(emqx_exhook_mgr),
emqx_exhook_demo_svr:stop(),

View File

@ -325,6 +325,10 @@ generate_uuid() ->
get_telemetry(State0 = #state{uuid = UUID}) ->
OSInfo = os_info(),
{MQTTRTInsights, State} = mqtt_runtime_insights(State0),
#{
rule_engine := RuleEngineInfo,
bridge := BridgeInfo
} = get_rule_engine_and_bridge_info(),
{State, [
{emqx_version, bin(emqx_app:get_release())},
{license, [{edition, <<"community">>}]},
@ -343,7 +347,9 @@ get_telemetry(State0 = #state{uuid = UUID}) ->
{mqtt_runtime_insights, MQTTRTInsights},
{advanced_mqtt_features, advanced_mqtt_features()},
{authn_authz, get_authn_authz_info()},
{gateway, get_gateway_info()}
{gateway, get_gateway_info()},
{rule_engine, RuleEngineInfo},
{bridge, BridgeInfo}
]}.
report_telemetry(State0 = #state{url = URL}) ->
@ -468,6 +474,37 @@ get_gateway_info() ->
#{}
end.
get_rule_engine_and_bridge_info() ->
#{
num_rules := NumRules,
referenced_bridges := ReferencedBridges
} = emqx_rule_engine:get_basic_usage_info(),
#{
num_bridges := NumDataBridges,
count_by_type := BridgeTypeCount
} = emqx_bridge:get_basic_usage_info(),
BridgeInfo =
maps:fold(
fun(BridgeType, BridgeCount, Acc) ->
ReferencingRules = maps:get(BridgeType, ReferencedBridges, 0),
Acc#{
BridgeType => #{
num => BridgeCount,
num_linked_by_rules => ReferencingRules
}
}
end,
#{},
BridgeTypeCount
),
#{
rule_engine => #{num_rules => NumRules},
bridge => #{
num_data_bridges => NumDataBridges,
data_bridge => BridgeInfo
}
}.
bin(L) when is_list(L) ->
list_to_binary(L);
bin(A) when is_atom(A) ->

View File

@ -115,6 +115,14 @@ init_per_testcase(t_send_after_enable, Config) ->
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
mock_httpc(),
Config;
init_per_testcase(t_rule_engine_and_data_bridge_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{ok, _} = application:ensure_all_started(emqx_rule_engine),
ok = application:start(emqx_bridge),
ok = emqx_bridge_SUITE:setup_fake_telemetry_data(),
ok = setup_fake_rule_engine_data(),
Config;
init_per_testcase(_Testcase, Config) ->
TestPID = self(),
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
@ -149,6 +157,18 @@ end_per_testcase(t_enable, _Config) ->
meck:unload([httpc, emqx_telemetry]);
end_per_testcase(t_send_after_enable, _Config) ->
meck:unload([httpc, emqx_telemetry]);
end_per_testcase(t_rule_engine_and_data_bridge_info, _Config) ->
meck:unload(httpc),
lists:foreach(
fun(App) ->
ok = application:stop(App)
end,
[
emqx_bridge,
emqx_rule_engine
]
),
ok;
end_per_testcase(_Testcase, _Config) ->
meck:unload([httpc]),
ok.
@ -350,6 +370,27 @@ t_mqtt_runtime_insights(_) ->
?assertEqual(30_000, maps:get(num_topics, MQTTRTInsights2)),
ok.
t_rule_engine_and_data_bridge_info(_Config) ->
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
RuleInfo = get_value(rule_engine, TelemetryData),
BridgeInfo = get_value(bridge, TelemetryData),
?assertEqual(
#{num_rules => 2},
RuleInfo
),
?assertEqual(
#{
data_bridge =>
#{
http => #{num => 1, num_linked_by_rules => 3},
mqtt => #{num => 1, num_linked_by_rules => 1}
},
num_data_bridges => 2
},
BridgeInfo
),
ok.
assert_approximate(Map, Key, Expected) ->
Value = maps:get(Key, Map),
?assertEqual(Expected, float_to_list(Value, [{decimals, 2}])).
@ -427,6 +468,35 @@ assert_gateway_listener_shape(ListenerData, GatewayType) ->
#{gateway_type => GatewayType}
).
setup_fake_rule_engine_data() ->
{ok, _} =
emqx_rule_engine:create_rule(
#{
id => <<"rule:t_get_basic_usage_info:1">>,
sql => <<"select 1 from topic">>,
outputs =>
[
#{function => <<"erlang:hibernate">>, args => #{}},
#{function => console},
<<"http:my_http_bridge">>,
<<"http:my_http_bridge">>
]
}
),
{ok, _} =
emqx_rule_engine:create_rule(
#{
id => <<"rule:t_get_basic_usage_info:2">>,
sql => <<"select 1 from topic">>,
outputs =>
[
<<"mqtt:my_mqtt_bridge">>,
<<"http:my_http_bridge">>
]
}
),
ok.
set_special_configs(emqx_authz) ->
{ok, _} = emqx:update_config([authorization, cache, enable], false),
{ok, _} = emqx:update_config([authorization, no_match], deny),

View File

@ -58,6 +58,9 @@
, clear_metrics_for_rule/1
]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
%% gen_server Callbacks
-export([ init/1
, handle_call/3
@ -201,6 +204,56 @@ unload_hooks_for_rule(#{id := Id, from := Topics}) ->
end
end, Topics).
%%------------------------------------------------------------------------------
%% Telemetry helper functions
%%------------------------------------------------------------------------------
-spec get_basic_usage_info() -> #{ num_rules => non_neg_integer()
, referenced_bridges =>
#{ BridgeType => non_neg_integer()
}
}
when BridgeType :: atom().
get_basic_usage_info() ->
try
Rules = get_rules(),
EnabledRules =
lists:filter(
fun(#{enable := Enabled}) -> Enabled end,
Rules),
NumRules = length(EnabledRules),
ReferencedBridges =
lists:foldl(
fun(#{outputs := Outputs}, Acc) ->
BridgeIDs = lists:filter(fun is_binary/1, Outputs),
tally_referenced_bridges(BridgeIDs, Acc)
end,
#{},
EnabledRules),
#{ num_rules => NumRules
, referenced_bridges => ReferencedBridges
}
catch
_:_ ->
#{ num_rules => 0
, referenced_bridges => #{}
}
end.
tally_referenced_bridges(BridgeIDs, Acc0) ->
lists:foldl(
fun(BridgeID, Acc) ->
{BridgeType, _BridgeName} = emqx_bridge:parse_bridge_id(BridgeID),
maps:update_with(
BridgeType,
fun(X) -> X + 1 end,
1,
Acc)
end,
Acc0,
BridgeIDs).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------

View File

@ -278,11 +278,14 @@ t_get_rules_for_topic_2(_Config) ->
make_simple_rule(<<"rule-debug-2">>, <<"select * from \"simple/+\"">>, [<<"simple/+">>]),
make_simple_rule(<<"rule-debug-3">>, <<"select * from \"simple/+/1\"">>, [<<"simple/+/1">>]),
make_simple_rule(<<"rule-debug-4">>, <<"select * from \"simple/1\"">>, [<<"simple/1">>]),
make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>, [<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]),
make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>, [<<"simple/2">>,<<"simple/3">>, <<"simple/4">>])
make_simple_rule(<<"rule-debug-5">>, <<"select * from \"simple/2,simple/+,simple/3\"">>,
[<<"simple/2">>,<<"simple/+">>, <<"simple/3">>]),
make_simple_rule(<<"rule-debug-6">>, <<"select * from \"simple/2,simple/3,simple/4\"">>,
[<<"simple/2">>,<<"simple/3">>, <<"simple/4">>])
]),
?assertEqual(Len0+4, length(emqx_rule_engine:get_rules_for_topic(<<"simple/1">>))),
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>, <<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
ok = delete_rules_by_ids([<<"rule-debug-1">>, <<"rule-debug-2">>,<<"rule-debug-3">>,
<<"rule-debug-4">>,<<"rule-debug-5">>, <<"rule-debug-6">>]),
ok.
t_get_rules_with_same_event(_Config) ->
@ -298,14 +301,23 @@ t_get_rules_with_same_event(_Config) ->
ok = insert_rules(
[make_simple_rule(<<"r1">>, <<"select * from \"simple/#\"">>, [<<"simple/#">>]),
make_simple_rule(<<"r2">>, <<"select * from \"abc/+\"">>, [<<"abc/+">>]),
make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>, [<<"$events/client_connected">>]),
make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>, [<<"$events/client_disconnected">>]),
make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>, [<<"$events/session_subscribed">>]),
make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>, [<<"$events/session_unsubscribed">>]),
make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>, [<<"$events/message_delivered">>]),
make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>, [<<"$events/message_acked">>]),
make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>, [<<"$events/message_dropped">>]),
make_simple_rule(<<"r10">>, <<"select * from \"t/1, $events/session_subscribed, $events/client_connected\"">>, [<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>])
make_simple_rule(<<"r3">>, <<"select * from \"$events/client_connected\"">>,
[<<"$events/client_connected">>]),
make_simple_rule(<<"r4">>, <<"select * from \"$events/client_disconnected\"">>,
[<<"$events/client_disconnected">>]),
make_simple_rule(<<"r5">>, <<"select * from \"$events/session_subscribed\"">>,
[<<"$events/session_subscribed">>]),
make_simple_rule(<<"r6">>, <<"select * from \"$events/session_unsubscribed\"">>,
[<<"$events/session_unsubscribed">>]),
make_simple_rule(<<"r7">>, <<"select * from \"$events/message_delivered\"">>,
[<<"$events/message_delivered">>]),
make_simple_rule(<<"r8">>, <<"select * from \"$events/message_acked\"">>,
[<<"$events/message_acked">>]),
make_simple_rule(<<"r9">>, <<"select * from \"$events/message_dropped\"">>,
[<<"$events/message_dropped">>]),
make_simple_rule(<<"r10">>, <<"select * from \"t/1, "
"$events/session_subscribed, $events/client_connected\"">>,
[<<"t/1">>, <<"$events/session_subscribed">>, <<"$events/client_connected">>])
]),
?assertEqual(PubN + 3, length(emqx_rule_engine:get_rules_with_same_event(PubT))),
?assertEqual(2, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/client_connected">>))),
@ -315,7 +327,8 @@ t_get_rules_with_same_event(_Config) ->
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_delivered">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_acked">>))),
?assertEqual(1, length(emqx_rule_engine:get_rules_with_same_event(<<"$events/message_dropped">>))),
ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
ok = delete_rules_by_ids([<<"r1">>, <<"r2">>,<<"r3">>, <<"r4">>,
<<"r5">>, <<"r6">>, <<"r7">>, <<"r8">>, <<"r9">>, <<"r10">>]),
ok.
%%------------------------------------------------------------------------------
@ -867,7 +880,8 @@ t_sqlparse_foreach_7(_Config) ->
"incase is_not_null(info.cmd) "
"from \"t/#\" "
"where s.page = '2' ",
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>,
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": "
"{\"info\":[{\"name\":\"cmd1\", \"cmd\":\"1\"}, {\"cmd\":\"2\"}]} } }">>,
?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}, #{<<"msg_type">> := <<"2">>}]},
emqx_rule_sqltester:test(
#{sql => Sql,
@ -893,7 +907,8 @@ t_sqlparse_foreach_8(_Config) ->
"incase is_map(info) "
"from \"t/#\" "
"where s.page = '2' ",
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": {\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }">>,
Payload = <<"{\"sensors\": {\"page\": 2, \"collection\": "
"{\"info\":[\"haha\", {\"name\":\"cmd1\", \"cmd\":\"1\"}]} } }">>,
?assertMatch({ok,[#{<<"name">> := <<"cmd1">>, <<"msg_type">> := <<"1">>}]},
emqx_rule_sqltester:test(
#{sql => Sql,
@ -1365,6 +1380,47 @@ t_sqlparse_invalid_json(_Config) ->
<<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>,
topic => <<"t/a">>}})).
%%------------------------------------------------------------------------------
%% Test cases for telemetry functions
%%------------------------------------------------------------------------------
t_get_basic_usage_info_0(_Config) ->
?assertEqual(
#{ num_rules => 0
, referenced_bridges => #{}
},
emqx_rule_engine:get_basic_usage_info()),
ok.
t_get_basic_usage_info_1(_Config) ->
{ok, _} =
emqx_rule_engine:create_rule(
#{id => <<"rule:t_get_basic_usage_info:1">>,
sql => <<"select 1 from topic">>,
outputs =>
[ #{function => <<"erlang:hibernate">>, args => #{}}
, #{function => console}
, <<"http:my_http_bridge">>
, <<"http:my_http_bridge">>
]}),
{ok, _} =
emqx_rule_engine:create_rule(
#{id => <<"rule:t_get_basic_usage_info:2">>,
sql => <<"select 1 from topic">>,
outputs =>
[ <<"mqtt:my_mqtt_bridge">>
, <<"http:my_http_bridge">>
]}),
?assertEqual(
#{ num_rules => 2
, referenced_bridges =>
#{ mqtt => 1
, http => 3
}
},
emqx_rule_engine:get_basic_usage_info()),
ok.
%%------------------------------------------------------------------------------
%% Internal helpers
%%------------------------------------------------------------------------------