diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 27dcb38d9..f7f06b13c 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -33,6 +33,9 @@ %% hook callback -export([on_client_connected/3]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + load() -> ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE), update_hook(). @@ -67,6 +70,15 @@ on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) -> on_client_connected(_, _, _) -> ok. +%%-------------------------------------------------------------------- +%% Telemetry +%%-------------------------------------------------------------------- + +-spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}. +get_basic_usage_info() -> + AutoSubscribe = emqx_conf:get([auto_subscribe, topics], []), + #{auto_subscribe_count => length(AutoSubscribe)}. + %%-------------------------------------------------------------------- %% internal diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 5681e7867..4e11cfd3d 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -41,7 +41,7 @@ -define(CLIENT_USERNAME, <<"auto_sub_u">>). all() -> - [t_auto_subscribe, t_update]. + emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> mria:start(), @@ -99,6 +99,18 @@ init_per_suite(Config) -> ), Config. +init_per_testcase(t_get_basic_usage_info, Config) -> + {ok, _} = emqx_auto_subscribe:update([]), + Config; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_get_basic_usage_info, _Config) -> + {ok, _} = emqx_auto_subscribe:update([]), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + set_special_configs(emqx_dashboard) -> emqx_dashboard_api_test_helpers:set_default_config(), ok; @@ -150,6 +162,21 @@ t_update(_) -> ?assertEqual(1, erlang:length(GETResponseMap)), ok. +t_get_basic_usage_info(_Config) -> + ?assertEqual(#{auto_subscribe_count => 0}, emqx_auto_subscribe:get_basic_usage_info()), + AutoSubscribeTopics = + lists:map( + fun(N) -> + Num = integer_to_binary(N), + Topic = <<"auto/", Num/binary>>, + #{<<"topic">> => Topic} + end, + lists:seq(1, 3) + ), + {ok, _} = emqx_auto_subscribe:update(AutoSubscribeTopics), + ?assertEqual(#{auto_subscribe_count => 3}, emqx_auto_subscribe:get_basic_usage_info()), + ok. + check_subs(Count) -> Subs = ets:tab2list(emqx_suboption), ct:pal("---> ~p ~p ~n", [Subs, Count]), diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 479786eba..3d13f91ef 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -61,6 +61,9 @@ -export([format_delayed/1]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + -record(delayed_message, {key, delayed, msg}). -type delayed_message() :: #delayed_message{}. -type with_id_return() :: ok | {error, not_found}. @@ -314,6 +317,19 @@ terminate(_Reason, #{stats_timer := StatsTimer} = State) -> code_change(_Vsn, State, _Extra) -> {ok, State}. +%%-------------------------------------------------------------------- +%% Telemetry +%%-------------------------------------------------------------------- + +-spec get_basic_usage_info() -> #{delayed_message_count => non_neg_integer()}. +get_basic_usage_info() -> + DelayedCount = + case ets:info(?TAB, size) of + undefined -> 0; + Num -> Num + end, + #{delayed_message_count => DelayedCount}. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/src/emqx_modules.erl b/apps/emqx_modules/src/emqx_modules.erl deleted file mode 100644 index 7a3c6d117..000000000 --- a/apps/emqx_modules/src/emqx_modules.erl +++ /dev/null @@ -1,37 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_modules). - --export([ - get_advanced_mqtt_features_in_use/0, - set_advanced_mqtt_features_in_use/1 -]). - --type advanced_mqtt_feature() :: delayed | topic_rewrite | retained | auto_subscribe. --type advanced_mqtt_features_in_use() :: #{advanced_mqtt_feature() => boolean()}. - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - --spec get_advanced_mqtt_features_in_use() -> advanced_mqtt_features_in_use(). -get_advanced_mqtt_features_in_use() -> - application:get_env(?MODULE, advanced_mqtt_features_in_use, #{}). - --spec set_advanced_mqtt_features_in_use(advanced_mqtt_features_in_use()) -> ok. -set_advanced_mqtt_features_in_use(Features) -> - application:set_env(?MODULE, advanced_mqtt_features_in_use, Features). diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index ed5a05503..0b9cf0123 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -33,19 +33,7 @@ stop(_State) -> ok. maybe_enable_modules() -> - DelayedEnabled = emqx_conf:get([delayed, enable], true), - RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0, - RetainerEnabled = emqx_conf:get([retainer, enable], false), - AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0, - emqx_modules:set_advanced_mqtt_features_in_use( - #{ - delayed => DelayedEnabled, - topic_rewrite => RewriteEnabled, - retained => RetainerEnabled, - auto_subscribe => AutoSubscribeEnabled - } - ), - DelayedEnabled andalso emqx_delayed:enable(), + emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(), emqx_modules_conf:is_telemetry_enabled() andalso emqx_telemetry:enable(), emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_conf_cli:load(), diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index efc392d3d..2da1a9f70 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -45,6 +45,9 @@ post_config_update/5 ]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- @@ -98,6 +101,15 @@ rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> rewrite_publish(Message = #message{topic = Topic}, Rules) -> {ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}. +%%-------------------------------------------------------------------- +%% Telemetry +%%-------------------------------------------------------------------- + +-spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}. +get_basic_usage_info() -> + RewriteRules = list(), + #{topic_rewrite_rule_count => length(RewriteRules)}. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index f3ebcc891..cb923fb79 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -447,8 +447,16 @@ update_mqtt_rates(State) -> {#{}, State}. advanced_mqtt_features() -> - AdvancedFeatures = emqx_modules:get_advanced_mqtt_features_in_use(), - maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures). + #{retained_messages := RetainedMessages} = emqx_retainer:get_basic_usage_info(), + #{topic_rewrite_rule_count := RewriteRules} = emqx_rewrite:get_basic_usage_info(), + #{delayed_message_count := DelayedCount} = emqx_delayed:get_basic_usage_info(), + #{auto_subscribe_count := AutoSubscribe} = emqx_auto_subscribe:get_basic_usage_info(), + #{ + topic_rewrite => RewriteRules, + delayed => DelayedCount, + retained => RetainedMessages, + auto_subscribe => AutoSubscribe + }. get_authn_authz_info() -> try @@ -521,9 +529,6 @@ bin(A) when is_atom(A) -> bin(B) when is_binary(B) -> B. -bool2int(true) -> 1; -bool2int(false) -> 0. - ensure_uuids() -> Txn = fun() -> NodeUUID = diff --git a/apps/emqx_modules/test/emqx_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl index 2f11c9ba2..2dba326f6 100644 --- a/apps/emqx_modules/test/emqx_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -49,6 +49,7 @@ init_per_testcase(_Case, Config) -> Config. end_per_testcase(_Case, _Config) -> + {atomic, ok} = mria:clear_table(emqx_delayed), ok = emqx_delayed:disable(). %%-------------------------------------------------------------------- @@ -178,3 +179,16 @@ t_unknown_messages(_) -> ignored, gen_server:call(OldPid, unknown) ). + +t_get_basic_usage_info(_Config) -> + ?assertEqual(#{delayed_message_count => 0}, emqx_delayed:get_basic_usage_info()), + lists:foreach( + fun(N) -> + Num = integer_to_binary(N), + Message = emqx_message:make(<<"$delayed/", Num/binary, "/delayed">>, <<"payload">>), + {stop, _} = emqx_delayed:on_message_publish(Message) + end, + lists:seq(1, 4) + ), + ?assertEqual(#{delayed_message_count => 4}, emqx_delayed:get_basic_usage_info()), + ok. diff --git a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl index 6cd919034..892c25a09 100644 --- a/apps/emqx_modules/test/emqx_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl @@ -55,6 +55,18 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]). +init_per_testcase(t_get_basic_usage_info, Config) -> + ok = emqx_rewrite:update([]), + Config; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_get_basic_usage_info, _Config) -> + ok = emqx_rewrite:update([]), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + t_subscribe_rewrite(_Config) -> {ok, Conn} = init(), SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>], @@ -200,6 +212,27 @@ t_update_re_failed(_Config) -> ), ok. +t_get_basic_usage_info(_Config) -> + ?assertEqual(#{topic_rewrite_rule_count => 0}, emqx_rewrite:get_basic_usage_info()), + RewriteTopics = + lists:map( + fun(N) -> + Num = integer_to_binary(N), + DestTopic = <<"rewrite/dest/", Num/binary>>, + SourceTopic = <<"rewrite/source/", Num/binary>>, + #{ + <<"source_topic">> => SourceTopic, + <<"dest_topic">> => DestTopic, + <<"action">> => all, + <<"re">> => DestTopic + } + end, + lists:seq(1, 2) + ), + ok = emqx_rewrite:update(RewriteTopics), + ?assertEqual(#{topic_rewrite_rule_count => 2}, emqx_rewrite:get_basic_usage_info()), + ok. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 142ede620..90ad1e806 100644 --- a/apps/emqx_modules/test/emqx_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -91,14 +91,14 @@ init_per_testcase(t_get_telemetry, Config) -> {ok, _} = application:ensure_all_started(emqx_gateway), Config; init_per_testcase(t_advanced_mqtt_features, Config) -> - OldValues = emqx_modules:get_advanced_mqtt_features_in_use(), - emqx_modules:set_advanced_mqtt_features_in_use(#{ - delayed => false, - topic_rewrite => false, - retained => false, - auto_subscribe => false - }), - [{old_values, OldValues} | Config]; + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + {ok, Retainer} = emqx_retainer:start_link(), + {atomic, ok} = mria:clear_table(emqx_delayed), + mock_advanced_mqtt_features(), + [ + {retainer, Retainer} + | Config + ]; init_per_testcase(t_authn_authz_info, Config) -> mock_httpc(), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), @@ -160,8 +160,14 @@ end_per_testcase(t_get_telemetry, _Config) -> application:stop(emqx_gateway), ok; end_per_testcase(t_advanced_mqtt_features, Config) -> - OldValues = ?config(old_values, Config), - emqx_modules:set_advanced_mqtt_features_in_use(OldValues); + Retainer = ?config(retainer, Config), + process_flag(trap_exit, true), + ok = emqx_retainer:clean(), + exit(Retainer, kill), + {ok, _} = emqx_auto_subscribe:update([]), + ok = emqx_rewrite:update([]), + {atomic, ok} = mria:clear_table(emqx_delayed), + ok; end_per_testcase(t_authn_authz_info, _Config) -> meck:unload([httpc]), emqx_authz:update({delete, postgresql}, #{}), @@ -322,28 +328,13 @@ t_advanced_mqtt_features(_) -> AdvFeats = get_value(advanced_mqtt_features, TelemetryData), ?assertEqual( #{ - retained => 0, - topic_rewrite => 0, - auto_subscribe => 0, - delayed => 0 + retained => 5, + topic_rewrite => 2, + auto_subscribe => 3, + delayed => 4 }, AdvFeats ), - lists:foreach( - fun(TelemetryKey) -> - EnabledFeats = emqx_modules:get_advanced_mqtt_features_in_use(), - emqx_modules:set_advanced_mqtt_features_in_use(EnabledFeats#{TelemetryKey => true}), - {ok, Data} = emqx_telemetry:get_telemetry(), - #{TelemetryKey := Value} = get_value(advanced_mqtt_features, Data), - ?assertEqual(1, Value, #{key => TelemetryKey}) - end, - [ - retained, - topic_rewrite, - auto_subscribe, - delayed - ] - ), ok. t_authn_authz_info(_) -> @@ -514,6 +505,56 @@ mock_httpc() -> TestPID ! {request, Method, URL, Headers, Body} end). +mock_advanced_mqtt_features() -> + Context = undefined, + lists:foreach( + fun(N) -> + Num = integer_to_binary(N), + Message = emqx_message:make(<<"retained/", Num/binary>>, <<"payload">>), + ok = emqx_retainer:store_retained(Context, Message) + end, + lists:seq(1, 5) + ), + + lists:foreach( + fun(N) -> + Num = integer_to_binary(N), + Message = emqx_message:make(<<"$delayed/", Num/binary, "/delayed">>, <<"payload">>), + {stop, _} = emqx_delayed:on_message_publish(Message) + end, + lists:seq(1, 4) + ), + + AutoSubscribeTopics = + lists:map( + fun(N) -> + Num = integer_to_binary(N), + Topic = <<"auto/", Num/binary>>, + #{<<"topic">> => Topic} + end, + lists:seq(1, 3) + ), + {ok, _} = emqx_auto_subscribe:update(AutoSubscribeTopics), + + RewriteTopics = + lists:map( + fun(N) -> + Num = integer_to_binary(N), + DestTopic = <<"rewrite/dest/", Num/binary>>, + SourceTopic = <<"rewrite/source/", Num/binary>>, + #{ + <<"source_topic">> => SourceTopic, + <<"dest_topic">> => DestTopic, + <<"action">> => all, + <<"re">> => DestTopic + } + end, + lists:seq(1, 2) + ), + ok = emqx_rewrite:update(RewriteTopics), + + ok. + create_authn(ChainName, built_in_database) -> emqx_authentication:initialize_authentication( ChainName, diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 1b198a0d4..6764f5e82 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -54,6 +54,9 @@ code_change/3 ]). +%% exported for `emqx_telemetry' +-export([get_basic_usage_info/0]). + -type state() :: #{ enable := boolean(), context_id := non_neg_integer(), @@ -161,6 +164,20 @@ call(Req) -> stats_fun() -> gen_server:cast(?MODULE, ?FUNCTION_NAME). +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}. +get_basic_usage_info() -> + try + RetainedMessages = gen_server:call(?MODULE, retained_count), + #{retained_messages => RetainedMessages} + catch + _:_ -> + #{retained_messages => 0} + end. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -191,6 +208,10 @@ handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) - Mod = get_backend_module(), Result = Mod:page_read(Context, Topic, Page, Limit), {reply, Result, State}; +handle_call(retained_count, _From, State = #{context := Context}) -> + Mod = get_backend_module(), + RetainedCount = Mod:size(Context), + {reply, RetainedCount, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 953752788..94526735a 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -103,6 +103,14 @@ end_per_group(_Group, Config) -> emqx_retainer_mnesia:populate_index_meta(), Config. +init_per_testcase(t_get_basic_usage_info, Config) -> + mnesia:clear_table(?TAB_INDEX), + mnesia:clear_table(?TAB_MESSAGE), + emqx_retainer_mnesia:populate_index_meta(), + Config; +init_per_testcase(_TestCase, Config) -> + Config. + load_base_conf() -> ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF). @@ -602,6 +610,20 @@ t_reindex(_) -> end ). +t_get_basic_usage_info(_Config) -> + ?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()), + Context = undefined, + lists:foreach( + fun(N) -> + Num = integer_to_binary(N), + Message = emqx_message:make(<<"retained/", Num/binary>>, <<"payload">>), + ok = emqx_retainer:store_retained(Context, Message) + end, + lists:seq(1, 5) + ), + ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%--------------------------------------------------------------------