feat: add advanced mqtt features to telemetry
This commit is contained in:
parent
fbfbaa8a9a
commit
d312bc0f28
|
@ -33,7 +33,21 @@ stop(_State) ->
|
|||
ok.
|
||||
|
||||
maybe_enable_modules() ->
|
||||
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(),
|
||||
DelayedEnabled = emqx_conf:get([delayed, enable], true),
|
||||
RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0,
|
||||
RetainerEnabled = emqx_conf:get([retainer, enable], false),
|
||||
AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0,
|
||||
application:set_env(
|
||||
emqx_telemetry,
|
||||
advanced_mqtt_features_in_use,
|
||||
#{
|
||||
delayed => DelayedEnabled,
|
||||
topic_rewrite => RewriteEnabled,
|
||||
retained => RetainerEnabled,
|
||||
auto_subscribe => AutoSubscribeEnabled
|
||||
}
|
||||
),
|
||||
DelayedEnabled andalso emqx_delayed:enable(),
|
||||
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
|
||||
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
|
||||
emqx_event_message:enable(),
|
||||
|
|
|
@ -347,7 +347,8 @@ get_telemetry(State0 = #state{uuid = UUID}) ->
|
|||
{messages_sent, messages_sent()},
|
||||
{build_info, build_info()},
|
||||
{vm_specs, vm_specs()},
|
||||
{mqtt_runtime_insights, MQTTRTInsights}
|
||||
{mqtt_runtime_insights, MQTTRTInsights},
|
||||
{advanced_mqtt_features, advanced_mqtt_features()}
|
||||
]}.
|
||||
|
||||
report_telemetry(State0 = #state{url = URL}) ->
|
||||
|
@ -447,6 +448,10 @@ update_mqtt_rates(
|
|||
update_mqtt_rates(State) ->
|
||||
{#{}, State}.
|
||||
|
||||
advanced_mqtt_features() ->
|
||||
AdvancedFeatures = application:get_env(emqx_telemetry, advanced_mqtt_features_in_use, #{}),
|
||||
maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures).
|
||||
|
||||
bin(L) when is_list(L) ->
|
||||
list_to_binary(L);
|
||||
bin(A) when is_atom(A) ->
|
||||
|
@ -454,6 +459,9 @@ bin(A) when is_atom(A) ->
|
|||
bin(B) when is_binary(B) ->
|
||||
B.
|
||||
|
||||
bool2int(true) -> 1;
|
||||
bool2int(false) -> 0.
|
||||
|
||||
empty_state() ->
|
||||
#state{
|
||||
url = ?TELEMETRY_URL,
|
||||
|
|
|
@ -29,11 +29,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
|||
|
||||
init_per_suite(Config) ->
|
||||
snabbkaffe:fix_ct_logging(),
|
||||
emqx_common_test_helpers:start_apps([emqx_modules]),
|
||||
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([emqx_modules]).
|
||||
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]).
|
||||
|
||||
init_per_testcase(t_get_telemetry, Config) ->
|
||||
DataDir = ?config(data_dir, Config),
|
||||
|
@ -66,6 +66,19 @@ init_per_testcase(t_get_telemetry, Config) ->
|
|||
end
|
||||
),
|
||||
Config;
|
||||
init_per_testcase(t_advanced_mqtt_features, Config) ->
|
||||
OldValues = application:get_env(emqx_telemetry, advanced_mqtt_features_in_use, #{}),
|
||||
application:set_env(
|
||||
emqx_telemetry,
|
||||
advanced_mqtt_features_in_use,
|
||||
#{
|
||||
delayed => false,
|
||||
topic_rewrite => false,
|
||||
retained => false,
|
||||
auto_subscribe => false
|
||||
}
|
||||
),
|
||||
[{old_values, OldValues} | Config];
|
||||
init_per_testcase(_Testcase, Config) ->
|
||||
TestPID = self(),
|
||||
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
|
||||
|
@ -79,6 +92,9 @@ init_per_testcase(_Testcase, Config) ->
|
|||
end_per_testcase(t_get_telemetry, _Config) ->
|
||||
meck:unload([httpc, emqx_telemetry]),
|
||||
ok;
|
||||
end_per_testcase(t_advanced_mqtt_features, Config) ->
|
||||
OldValues = ?config(old_values, Config),
|
||||
application:set_env(emqx_telemetry, advanced_mqtt_features_in_use, OldValues);
|
||||
end_per_testcase(_Testcase, _Config) ->
|
||||
meck:unload([httpc]),
|
||||
ok.
|
||||
|
@ -142,6 +158,39 @@ t_get_telemetry(_Config) ->
|
|||
?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
|
||||
ok.
|
||||
|
||||
t_advanced_mqtt_features(_) ->
|
||||
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
||||
AdvFeats = get_value(advanced_mqtt_features, TelemetryData),
|
||||
?assertEqual(
|
||||
#{
|
||||
retained => 0,
|
||||
topic_rewrite => 0,
|
||||
auto_subscribe => 0,
|
||||
delayed => 0
|
||||
},
|
||||
AdvFeats
|
||||
),
|
||||
lists:foreach(
|
||||
fun(TelemetryKey) ->
|
||||
{ok, EnabledFeats} = application:get_env(emqx_telemetry, advanced_mqtt_features_in_use),
|
||||
application:set_env(
|
||||
emqx_telemetry,
|
||||
advanced_mqtt_features_in_use,
|
||||
EnabledFeats#{TelemetryKey => true}
|
||||
),
|
||||
{ok, Data} = emqx_telemetry:get_telemetry(),
|
||||
#{TelemetryKey := Value} = get_value(advanced_mqtt_features, Data),
|
||||
?assertEqual(1, Value, #{key => TelemetryKey})
|
||||
end,
|
||||
[
|
||||
retained,
|
||||
topic_rewrite,
|
||||
auto_subscribe,
|
||||
delayed
|
||||
]
|
||||
),
|
||||
ok.
|
||||
|
||||
t_enable(_) ->
|
||||
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
|
||||
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
|
||||
|
@ -176,6 +225,13 @@ t_send_after_enable(_) ->
|
|||
<<"messages_received_rate">> := _,
|
||||
<<"messages_sent_rate">> := _,
|
||||
<<"num_topics">> := _
|
||||
},
|
||||
<<"advanced_mqtt_features">> :=
|
||||
#{
|
||||
<<"retained">> := _,
|
||||
<<"topic_rewrite">> := _,
|
||||
<<"auto_subscribe">> := _,
|
||||
<<"delayed">> := _
|
||||
}
|
||||
},
|
||||
Decoded
|
||||
|
|
Loading…
Reference in New Issue