diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d0c4e389c..84920109c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -60,6 +60,9 @@ -export([ config_key_path/0 ]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + load_hook() -> Bridges = emqx:get_config([bridges], #{}), load_hook(Bridges). @@ -244,7 +247,7 @@ update(Type, Name, {OldConf, Conf}) -> %% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated %% without restarting the bridge. %% - case if_only_to_toggole_enable(OldConf, Conf) of + case if_only_to_toggle_enable(OldConf, Conf) of false -> ?SLOG(info, #{msg => "update bridge", type => Type, name => Name, config => Conf}), @@ -396,7 +399,7 @@ maybe_disable_bridge(Type, Name, Conf) -> true -> ok end. -if_only_to_toggole_enable(OldConf, Conf) -> +if_only_to_toggle_enable(OldConf, Conf) -> #{added := Added, removed := Removed, changed := Updated} = emqx_map_lib:diff_maps(OldConf, Conf), case {Added, Removed, Updated} of @@ -407,6 +410,31 @@ if_only_to_toggole_enable(OldConf, Conf) -> {_, _, _} -> false end. +-spec get_basic_usage_info() -> + #{ num_bridges => non_neg_integer() + , count_by_type => + #{ BridgeType => non_neg_integer() + } + } when BridgeType :: atom(). +get_basic_usage_info() -> + lists:foldl( + fun(#{resource_data := #{config := #{enable := false}}}, Acc) -> + Acc; + (#{type := BridgeType}, Acc) -> + NumBridges = maps:get(num_bridges, Acc), + CountByType0 = maps:get(count_by_type, Acc), + CountByType = maps:update_with( + binary_to_atom(BridgeType, utf8), + fun(X) -> X + 1 end, + 1, + CountByType0), + Acc#{ num_bridges => NumBridges + 1 + , count_by_type => CountByType + } + end, + #{num_bridges => 0, count_by_type => #{}}, + list()). + bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_bridge/src/emqx_bridge_monitor.erl b/apps/emqx_bridge/src/emqx_bridge_monitor.erl index 6590ed928..8de216974 100644 --- a/apps/emqx_bridge/src/emqx_bridge_monitor.erl +++ b/apps/emqx_bridge/src/emqx_bridge_monitor.erl @@ -19,6 +19,8 @@ -behaviour(gen_server). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %% API functions -export([ start_link/0 , ensure_all_started/1 @@ -67,6 +69,11 @@ code_change(_OldVsn, State, _Extra) -> load_bridges(Configs) -> lists:foreach(fun({Type, NamedConf}) -> lists:foreach(fun({Name, Conf}) -> - emqx_bridge:create(Type, Name, Conf) + _Res = emqx_bridge:create(Type, Name, Conf), + ?tp(emqx_bridge_monitor_loaded_bridge, + #{ type => Type + , name => Name + , res => _Res + }) end, maps:to_list(NamedConf)) end, maps:to_list(Configs)). diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl new file mode 100644 index 000000000..b03db50f8 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -0,0 +1,112 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% to avoid inter-suite dependencies + application:stop(emqx_connector), + ok = emqx_common_test_helpers:start_apps([emqx, emqx_bridge]), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([emqx, emqx_bridge]). + +init_per_testcase(t_get_basic_usage_info_1, Config) -> + setup_fake_telemetry_data(), + Config; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_get_basic_usage_info_1, _Config) -> + ok = emqx_bridge:remove(<<"http:basic_usage_info_http">>), + ok = emqx_bridge:remove(<<"http:basic_usage_info_http_disabled">>), + ok = emqx_bridge:remove(<<"mqtt:basic_usage_info_mqtt">>), + emqx_config:delete_override_conf_files(), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + +t_get_basic_usage_info_0(_Config) -> + ?assertEqual( + #{ num_bridges => 0 + , count_by_type => #{} + }, + emqx_bridge:get_basic_usage_info()). + +t_get_basic_usage_info_1(_Config) -> + BasicUsageInfo = emqx_bridge:get_basic_usage_info(), + ?assertEqual( + #{ num_bridges => 2 + , count_by_type => #{ http => 1 + , mqtt => 1 + } + }, + BasicUsageInfo). + +setup_fake_telemetry_data() -> + ConnectorConf = + #{<<"connectors">> => + #{<<"mqtt">> => #{<<"my_mqtt_connector">> => #{}}}}, + MQTTConfig = #{ connector => <<"mqtt:my_mqtt_connector">> + , enable => true + , direction => ingress + , remote_topic => <<"aws/#">> + , remote_qos => 1 + }, + HTTPConfig = #{ url => <<"http://localhost:9901/messages/${topic}">> + , enable => true + , direction => egress + , local_topic => "emqx_http/#" + , method => post + , body => <<"${payload}">> + , headers => #{} + , request_timeout => "15s" + }, + Conf = + #{ <<"bridges">> => + #{ <<"http">> => + #{ <<"basic_usage_info_http">> => HTTPConfig + , <<"basic_usage_info_http_disabled">> => + HTTPConfig#{enable => false} + } + , <<"mqtt">> => + #{ <<"basic_usage_info_mqtt">> => MQTTConfig + } + } + }, + ok = emqx_common_test_helpers:load_config(emqx_connector_schema, ConnectorConf), + ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf), + + ok = snabbkaffe:start_trace(), + Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end, + NEvents = 3, + BackInTime = 0, + Timeout = 1_000, + {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime), + ok = emqx_bridge:load(), + {ok, _} = snabbkaffe_collector:receive_events(Sub), + ok = snabbkaffe:stop(), + ok.