feat(bridge): export basic usage info for telemetry
This commit is contained in:
parent
8354095e4b
commit
911e02f626
|
@ -60,6 +60,9 @@
|
||||||
-export([ config_key_path/0
|
-export([ config_key_path/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% exported for `emqx_telemetry'
|
||||||
|
-export([get_basic_usage_info/0]).
|
||||||
|
|
||||||
load_hook() ->
|
load_hook() ->
|
||||||
Bridges = emqx:get_config([bridges], #{}),
|
Bridges = emqx:get_config([bridges], #{}),
|
||||||
load_hook(Bridges).
|
load_hook(Bridges).
|
||||||
|
@ -244,7 +247,7 @@ update(Type, Name, {OldConf, Conf}) ->
|
||||||
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
|
%% the `method` or `headers` of a HTTP bridge is changed, then the bridge can be updated
|
||||||
%% without restarting the bridge.
|
%% without restarting the bridge.
|
||||||
%%
|
%%
|
||||||
case if_only_to_toggole_enable(OldConf, Conf) of
|
case if_only_to_toggle_enable(OldConf, Conf) of
|
||||||
false ->
|
false ->
|
||||||
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
|
||||||
config => Conf}),
|
config => Conf}),
|
||||||
|
@ -396,7 +399,7 @@ maybe_disable_bridge(Type, Name, Conf) ->
|
||||||
true -> ok
|
true -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
if_only_to_toggole_enable(OldConf, Conf) ->
|
if_only_to_toggle_enable(OldConf, Conf) ->
|
||||||
#{added := Added, removed := Removed, changed := Updated} =
|
#{added := Added, removed := Removed, changed := Updated} =
|
||||||
emqx_map_lib:diff_maps(OldConf, Conf),
|
emqx_map_lib:diff_maps(OldConf, Conf),
|
||||||
case {Added, Removed, Updated} of
|
case {Added, Removed, Updated} of
|
||||||
|
@ -407,6 +410,31 @@ if_only_to_toggole_enable(OldConf, Conf) ->
|
||||||
{_, _, _} -> false
|
{_, _, _} -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec get_basic_usage_info() ->
|
||||||
|
#{ num_bridges => non_neg_integer()
|
||||||
|
, count_by_type =>
|
||||||
|
#{ BridgeType => non_neg_integer()
|
||||||
|
}
|
||||||
|
} when BridgeType :: atom().
|
||||||
|
get_basic_usage_info() ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(#{resource_data := #{config := #{enable := false}}}, Acc) ->
|
||||||
|
Acc;
|
||||||
|
(#{type := BridgeType}, Acc) ->
|
||||||
|
NumBridges = maps:get(num_bridges, Acc),
|
||||||
|
CountByType0 = maps:get(count_by_type, Acc),
|
||||||
|
CountByType = maps:update_with(
|
||||||
|
binary_to_atom(BridgeType, utf8),
|
||||||
|
fun(X) -> X + 1 end,
|
||||||
|
1,
|
||||||
|
CountByType0),
|
||||||
|
Acc#{ num_bridges => NumBridges + 1
|
||||||
|
, count_by_type => CountByType
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
#{num_bridges => 0, count_by_type => #{}},
|
||||||
|
list()).
|
||||||
|
|
||||||
bin(Bin) when is_binary(Bin) -> Bin;
|
bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
%% API functions
|
%% API functions
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
, ensure_all_started/1
|
, ensure_all_started/1
|
||||||
|
@ -67,6 +69,11 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
load_bridges(Configs) ->
|
load_bridges(Configs) ->
|
||||||
lists:foreach(fun({Type, NamedConf}) ->
|
lists:foreach(fun({Type, NamedConf}) ->
|
||||||
lists:foreach(fun({Name, Conf}) ->
|
lists:foreach(fun({Name, Conf}) ->
|
||||||
emqx_bridge:create(Type, Name, Conf)
|
_Res = emqx_bridge:create(Type, Name, Conf),
|
||||||
|
?tp(emqx_bridge_monitor_loaded_bridge,
|
||||||
|
#{ type => Type
|
||||||
|
, name => Name
|
||||||
|
, res => _Res
|
||||||
|
})
|
||||||
end, maps:to_list(NamedConf))
|
end, maps:to_list(NamedConf))
|
||||||
end, maps:to_list(Configs)).
|
end, maps:to_list(Configs)).
|
||||||
|
|
|
@ -0,0 +1,112 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_bridge_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
%% to avoid inter-suite dependencies
|
||||||
|
application:stop(emqx_connector),
|
||||||
|
ok = emqx_common_test_helpers:start_apps([emqx, emqx_bridge]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_common_test_helpers:stop_apps([emqx, emqx_bridge]).
|
||||||
|
|
||||||
|
init_per_testcase(t_get_basic_usage_info_1, Config) ->
|
||||||
|
setup_fake_telemetry_data(),
|
||||||
|
Config;
|
||||||
|
init_per_testcase(_TestCase, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(t_get_basic_usage_info_1, _Config) ->
|
||||||
|
ok = emqx_bridge:remove(<<"http:basic_usage_info_http">>),
|
||||||
|
ok = emqx_bridge:remove(<<"http:basic_usage_info_http_disabled">>),
|
||||||
|
ok = emqx_bridge:remove(<<"mqtt:basic_usage_info_mqtt">>),
|
||||||
|
emqx_config:delete_override_conf_files(),
|
||||||
|
ok;
|
||||||
|
end_per_testcase(_TestCase, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_get_basic_usage_info_0(_Config) ->
|
||||||
|
?assertEqual(
|
||||||
|
#{ num_bridges => 0
|
||||||
|
, count_by_type => #{}
|
||||||
|
},
|
||||||
|
emqx_bridge:get_basic_usage_info()).
|
||||||
|
|
||||||
|
t_get_basic_usage_info_1(_Config) ->
|
||||||
|
BasicUsageInfo = emqx_bridge:get_basic_usage_info(),
|
||||||
|
?assertEqual(
|
||||||
|
#{ num_bridges => 2
|
||||||
|
, count_by_type => #{ http => 1
|
||||||
|
, mqtt => 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
BasicUsageInfo).
|
||||||
|
|
||||||
|
setup_fake_telemetry_data() ->
|
||||||
|
ConnectorConf =
|
||||||
|
#{<<"connectors">> =>
|
||||||
|
#{<<"mqtt">> => #{<<"my_mqtt_connector">> => #{}}}},
|
||||||
|
MQTTConfig = #{ connector => <<"mqtt:my_mqtt_connector">>
|
||||||
|
, enable => true
|
||||||
|
, direction => ingress
|
||||||
|
, remote_topic => <<"aws/#">>
|
||||||
|
, remote_qos => 1
|
||||||
|
},
|
||||||
|
HTTPConfig = #{ url => <<"http://localhost:9901/messages/${topic}">>
|
||||||
|
, enable => true
|
||||||
|
, direction => egress
|
||||||
|
, local_topic => "emqx_http/#"
|
||||||
|
, method => post
|
||||||
|
, body => <<"${payload}">>
|
||||||
|
, headers => #{}
|
||||||
|
, request_timeout => "15s"
|
||||||
|
},
|
||||||
|
Conf =
|
||||||
|
#{ <<"bridges">> =>
|
||||||
|
#{ <<"http">> =>
|
||||||
|
#{ <<"basic_usage_info_http">> => HTTPConfig
|
||||||
|
, <<"basic_usage_info_http_disabled">> =>
|
||||||
|
HTTPConfig#{enable => false}
|
||||||
|
}
|
||||||
|
, <<"mqtt">> =>
|
||||||
|
#{ <<"basic_usage_info_mqtt">> => MQTTConfig
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ok = emqx_common_test_helpers:load_config(emqx_connector_schema, ConnectorConf),
|
||||||
|
ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf),
|
||||||
|
|
||||||
|
ok = snabbkaffe:start_trace(),
|
||||||
|
Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end,
|
||||||
|
NEvents = 3,
|
||||||
|
BackInTime = 0,
|
||||||
|
Timeout = 1_000,
|
||||||
|
{ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
|
||||||
|
ok = emqx_bridge:load(),
|
||||||
|
{ok, _} = snabbkaffe_collector:receive_events(Sub),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
Loading…
Reference in New Issue