diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index bf8efc8f6..4fcd2718e 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -33,7 +33,21 @@ stop(_State) -> ok. maybe_enable_modules() -> - emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(), + DelayedEnabled = emqx_conf:get([delayed, enable], true), + RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0, + RetainerEnabled = emqx_conf:get([retainer, enable], false), + AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0, + application:set_env( + emqx_telemetry, + advanced_mqtt_features_in_use, + #{ + delayed => DelayedEnabled, + topic_rewrite => RewriteEnabled, + retained => RetainerEnabled, + auto_subscribe => AutoSubscribeEnabled + } + ), + DelayedEnabled andalso emqx_delayed:enable(), emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_event_message:enable(), diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index ae0cd428e..e15f91519 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -347,7 +347,8 @@ get_telemetry(State0 = #state{uuid = UUID}) -> {messages_sent, messages_sent()}, {build_info, build_info()}, {vm_specs, vm_specs()}, - {mqtt_runtime_insights, MQTTRTInsights} + {mqtt_runtime_insights, MQTTRTInsights}, + {advanced_mqtt_features, advanced_mqtt_features()} ]}. report_telemetry(State0 = #state{url = URL}) -> @@ -447,6 +448,10 @@ update_mqtt_rates( update_mqtt_rates(State) -> {#{}, State}. +advanced_mqtt_features() -> + AdvancedFeatures = application:get_env(emqx_telemetry, advanced_mqtt_features_in_use, #{}), + maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures). + bin(L) when is_list(L) -> list_to_binary(L); bin(A) when is_atom(A) -> @@ -454,6 +459,9 @@ bin(A) when is_atom(A) -> bin(B) when is_binary(B) -> B. +bool2int(true) -> 1; +bool2int(false) -> 0. + empty_state() -> #state{ url = ?TELEMETRY_URL, diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 2c93d33d3..004d10d43 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -29,11 +29,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> snabbkaffe:fix_ct_logging(), - emqx_common_test_helpers:start_apps([emqx_modules]), + emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_modules]). + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]). init_per_testcase(t_get_telemetry, Config) -> DataDir = ?config(data_dir, Config), @@ -66,6 +66,19 @@ init_per_testcase(t_get_telemetry, Config) -> end ), Config; +init_per_testcase(t_advanced_mqtt_features, Config) -> + OldValues = application:get_env(emqx_telemetry, advanced_mqtt_features_in_use, #{}), + application:set_env( + emqx_telemetry, + advanced_mqtt_features_in_use, + #{ + delayed => false, + topic_rewrite => false, + retained => false, + auto_subscribe => false + } + ), + [{old_values, OldValues} | Config]; init_per_testcase(_Testcase, Config) -> TestPID = self(), ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]), @@ -79,6 +92,9 @@ init_per_testcase(_Testcase, Config) -> end_per_testcase(t_get_telemetry, _Config) -> meck:unload([httpc, emqx_telemetry]), ok; +end_per_testcase(t_advanced_mqtt_features, Config) -> + OldValues = ?config(old_values, Config), + application:set_env(emqx_telemetry, advanced_mqtt_features_in_use, OldValues); end_per_testcase(_Testcase, _Config) -> meck:unload([httpc]), ok. @@ -142,6 +158,39 @@ t_get_telemetry(_Config) -> ?assert(is_integer(maps:get(num_topics, MQTTRTInsights))), ok. +t_advanced_mqtt_features(_) -> + {ok, TelemetryData} = emqx_telemetry:get_telemetry(), + AdvFeats = get_value(advanced_mqtt_features, TelemetryData), + ?assertEqual( + #{ + retained => 0, + topic_rewrite => 0, + auto_subscribe => 0, + delayed => 0 + }, + AdvFeats + ), + lists:foreach( + fun(TelemetryKey) -> + {ok, EnabledFeats} = application:get_env(emqx_telemetry, advanced_mqtt_features_in_use), + application:set_env( + emqx_telemetry, + advanced_mqtt_features_in_use, + EnabledFeats#{TelemetryKey => true} + ), + {ok, Data} = emqx_telemetry:get_telemetry(), + #{TelemetryKey := Value} = get_value(advanced_mqtt_features, Data), + ?assertEqual(1, Value, #{key => TelemetryKey}) + end, + [ + retained, + topic_rewrite, + auto_subscribe, + delayed + ] + ), + ok. + t_enable(_) -> ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), @@ -176,6 +225,13 @@ t_send_after_enable(_) -> <<"messages_received_rate">> := _, <<"messages_sent_rate">> := _, <<"num_topics">> := _ + }, + <<"advanced_mqtt_features">> := + #{ + <<"retained">> := _, + <<"topic_rewrite">> := _, + <<"auto_subscribe">> := _, + <<"delayed">> := _ } }, Decoded