feat(telemetry): count advanced mqtt feature usage

Originally, we wanted to just check if certain features were enabled.
Now, we want to count certain usage metrics for them.
This commit is contained in:
Thales Macedo Garitezi 2022-05-09 11:16:25 -03:00
parent a13b146499
commit 9e706fc76d
No known key found for this signature in database
GPG Key ID: DD279F8152A9B6DD
12 changed files with 239 additions and 85 deletions

View File

@ -33,6 +33,9 @@
%% hook callback
-export([on_client_connected/3]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
load() ->
ok = emqx_conf:add_handler([auto_subscribe, topics], ?MODULE),
update_hook().
@ -67,6 +70,15 @@ on_client_connected(ClientInfo, ConnInfo, {TopicHandler, Options}) ->
on_client_connected(_, _, _) ->
ok.
%%--------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
-spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}.
get_basic_usage_info() ->
AutoSubscribe = emqx_conf:get([auto_subscribe, topics], []),
#{auto_subscribe_count => length(AutoSubscribe)}.
%%--------------------------------------------------------------------
%% internal

View File

@ -41,7 +41,7 @@
-define(CLIENT_USERNAME, <<"auto_sub_u">>).
all() ->
[t_auto_subscribe, t_update].
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
mria:start(),
@ -99,6 +99,18 @@ init_per_suite(Config) ->
),
Config.
init_per_testcase(t_get_basic_usage_info, Config) ->
{ok, _} = emqx_auto_subscribe:update([]),
Config;
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(t_get_basic_usage_info, _Config) ->
{ok, _} = emqx_auto_subscribe:update([]),
ok;
end_per_testcase(_TestCase, _Config) ->
ok.
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(),
ok;
@ -150,6 +162,21 @@ t_update(_) ->
?assertEqual(1, erlang:length(GETResponseMap)),
ok.
t_get_basic_usage_info(_Config) ->
?assertEqual(#{auto_subscribe_count => 0}, emqx_auto_subscribe:get_basic_usage_info()),
AutoSubscribeTopics =
lists:map(
fun(N) ->
Num = integer_to_binary(N),
Topic = <<"auto/", Num/binary>>,
#{<<"topic">> => Topic}
end,
lists:seq(1, 3)
),
{ok, _} = emqx_auto_subscribe:update(AutoSubscribeTopics),
?assertEqual(#{auto_subscribe_count => 3}, emqx_auto_subscribe:get_basic_usage_info()),
ok.
check_subs(Count) ->
Subs = ets:tab2list(emqx_suboption),
ct:pal("---> ~p ~p ~n", [Subs, Count]),

View File

@ -61,6 +61,9 @@
-export([format_delayed/1]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
-record(delayed_message, {key, delayed, msg}).
-type delayed_message() :: #delayed_message{}.
-type with_id_return() :: ok | {error, not_found}.
@ -314,6 +317,19 @@ terminate(_Reason, #{stats_timer := StatsTimer} = State) ->
code_change(_Vsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
-spec get_basic_usage_info() -> #{delayed_message_count => non_neg_integer()}.
get_basic_usage_info() ->
DelayedCount =
case ets:info(?TAB, size) of
undefined -> 0;
Num -> Num
end,
#{delayed_message_count => DelayedCount}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -1,37 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules).
-export([
get_advanced_mqtt_features_in_use/0,
set_advanced_mqtt_features_in_use/1
]).
-type advanced_mqtt_feature() :: delayed | topic_rewrite | retained | auto_subscribe.
-type advanced_mqtt_features_in_use() :: #{advanced_mqtt_feature() => boolean()}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec get_advanced_mqtt_features_in_use() -> advanced_mqtt_features_in_use().
get_advanced_mqtt_features_in_use() ->
application:get_env(?MODULE, advanced_mqtt_features_in_use, #{}).
-spec set_advanced_mqtt_features_in_use(advanced_mqtt_features_in_use()) -> ok.
set_advanced_mqtt_features_in_use(Features) ->
application:set_env(?MODULE, advanced_mqtt_features_in_use, Features).

View File

@ -33,19 +33,7 @@ stop(_State) ->
ok.
maybe_enable_modules() ->
DelayedEnabled = emqx_conf:get([delayed, enable], true),
RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0,
RetainerEnabled = emqx_conf:get([retainer, enable], false),
AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0,
emqx_modules:set_advanced_mqtt_features_in_use(
#{
delayed => DelayedEnabled,
topic_rewrite => RewriteEnabled,
retained => RetainerEnabled,
auto_subscribe => AutoSubscribeEnabled
}
),
DelayedEnabled andalso emqx_delayed:enable(),
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(),
emqx_modules_conf:is_telemetry_enabled() andalso emqx_telemetry:enable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
emqx_conf_cli:load(),

View File

@ -45,6 +45,9 @@
post_config_update/5
]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
@ -98,6 +101,15 @@ rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
{ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
%%--------------------------------------------------------------------
%% Telemetry
%%--------------------------------------------------------------------
-spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}.
get_basic_usage_info() ->
RewriteRules = list(),
#{topic_rewrite_rule_count => length(RewriteRules)}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -447,8 +447,16 @@ update_mqtt_rates(State) ->
{#{}, State}.
advanced_mqtt_features() ->
AdvancedFeatures = emqx_modules:get_advanced_mqtt_features_in_use(),
maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures).
#{retained_messages := RetainedMessages} = emqx_retainer:get_basic_usage_info(),
#{topic_rewrite_rule_count := RewriteRules} = emqx_rewrite:get_basic_usage_info(),
#{delayed_message_count := DelayedCount} = emqx_delayed:get_basic_usage_info(),
#{auto_subscribe_count := AutoSubscribe} = emqx_auto_subscribe:get_basic_usage_info(),
#{
topic_rewrite => RewriteRules,
delayed => DelayedCount,
retained => RetainedMessages,
auto_subscribe => AutoSubscribe
}.
get_authn_authz_info() ->
try
@ -521,9 +529,6 @@ bin(A) when is_atom(A) ->
bin(B) when is_binary(B) ->
B.
bool2int(true) -> 1;
bool2int(false) -> 0.
ensure_uuids() ->
Txn = fun() ->
NodeUUID =

View File

@ -49,6 +49,7 @@ init_per_testcase(_Case, Config) ->
Config.
end_per_testcase(_Case, _Config) ->
{atomic, ok} = mria:clear_table(emqx_delayed),
ok = emqx_delayed:disable().
%%--------------------------------------------------------------------
@ -178,3 +179,16 @@ t_unknown_messages(_) ->
ignored,
gen_server:call(OldPid, unknown)
).
t_get_basic_usage_info(_Config) ->
?assertEqual(#{delayed_message_count => 0}, emqx_delayed:get_basic_usage_info()),
lists:foreach(
fun(N) ->
Num = integer_to_binary(N),
Message = emqx_message:make(<<"$delayed/", Num/binary, "/delayed">>, <<"payload">>),
{stop, _} = emqx_delayed:on_message_publish(Message)
end,
lists:seq(1, 4)
),
?assertEqual(#{delayed_message_count => 4}, emqx_delayed:get_basic_usage_info()),
ok.

View File

@ -55,6 +55,18 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]).
init_per_testcase(t_get_basic_usage_info, Config) ->
ok = emqx_rewrite:update([]),
Config;
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(t_get_basic_usage_info, _Config) ->
ok = emqx_rewrite:update([]),
ok;
end_per_testcase(_TestCase, _Config) ->
ok.
t_subscribe_rewrite(_Config) ->
{ok, Conn} = init(),
SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>],
@ -200,6 +212,27 @@ t_update_re_failed(_Config) ->
),
ok.
t_get_basic_usage_info(_Config) ->
?assertEqual(#{topic_rewrite_rule_count => 0}, emqx_rewrite:get_basic_usage_info()),
RewriteTopics =
lists:map(
fun(N) ->
Num = integer_to_binary(N),
DestTopic = <<"rewrite/dest/", Num/binary>>,
SourceTopic = <<"rewrite/source/", Num/binary>>,
#{
<<"source_topic">> => SourceTopic,
<<"dest_topic">> => DestTopic,
<<"action">> => all,
<<"re">> => DestTopic
}
end,
lists:seq(1, 2)
),
ok = emqx_rewrite:update(RewriteTopics),
?assertEqual(#{topic_rewrite_rule_count => 2}, emqx_rewrite:get_basic_usage_info()),
ok.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -91,14 +91,14 @@ init_per_testcase(t_get_telemetry, Config) ->
{ok, _} = application:ensure_all_started(emqx_gateway),
Config;
init_per_testcase(t_advanced_mqtt_features, Config) ->
OldValues = emqx_modules:get_advanced_mqtt_features_in_use(),
emqx_modules:set_advanced_mqtt_features_in_use(#{
delayed => false,
topic_rewrite => false,
retained => false,
auto_subscribe => false
}),
[{old_values, OldValues} | Config];
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{ok, Retainer} = emqx_retainer:start_link(),
{atomic, ok} = mria:clear_table(emqx_delayed),
mock_advanced_mqtt_features(),
[
{retainer, Retainer}
| Config
];
init_per_testcase(t_authn_authz_info, Config) ->
mock_httpc(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
@ -160,8 +160,14 @@ end_per_testcase(t_get_telemetry, _Config) ->
application:stop(emqx_gateway),
ok;
end_per_testcase(t_advanced_mqtt_features, Config) ->
OldValues = ?config(old_values, Config),
emqx_modules:set_advanced_mqtt_features_in_use(OldValues);
Retainer = ?config(retainer, Config),
process_flag(trap_exit, true),
ok = emqx_retainer:clean(),
exit(Retainer, kill),
{ok, _} = emqx_auto_subscribe:update([]),
ok = emqx_rewrite:update([]),
{atomic, ok} = mria:clear_table(emqx_delayed),
ok;
end_per_testcase(t_authn_authz_info, _Config) ->
meck:unload([httpc]),
emqx_authz:update({delete, postgresql}, #{}),
@ -322,28 +328,13 @@ t_advanced_mqtt_features(_) ->
AdvFeats = get_value(advanced_mqtt_features, TelemetryData),
?assertEqual(
#{
retained => 0,
topic_rewrite => 0,
auto_subscribe => 0,
delayed => 0
retained => 5,
topic_rewrite => 2,
auto_subscribe => 3,
delayed => 4
},
AdvFeats
),
lists:foreach(
fun(TelemetryKey) ->
EnabledFeats = emqx_modules:get_advanced_mqtt_features_in_use(),
emqx_modules:set_advanced_mqtt_features_in_use(EnabledFeats#{TelemetryKey => true}),
{ok, Data} = emqx_telemetry:get_telemetry(),
#{TelemetryKey := Value} = get_value(advanced_mqtt_features, Data),
?assertEqual(1, Value, #{key => TelemetryKey})
end,
[
retained,
topic_rewrite,
auto_subscribe,
delayed
]
),
ok.
t_authn_authz_info(_) ->
@ -514,6 +505,56 @@ mock_httpc() ->
TestPID ! {request, Method, URL, Headers, Body}
end).
mock_advanced_mqtt_features() ->
Context = undefined,
lists:foreach(
fun(N) ->
Num = integer_to_binary(N),
Message = emqx_message:make(<<"retained/", Num/binary>>, <<"payload">>),
ok = emqx_retainer:store_retained(Context, Message)
end,
lists:seq(1, 5)
),
lists:foreach(
fun(N) ->
Num = integer_to_binary(N),
Message = emqx_message:make(<<"$delayed/", Num/binary, "/delayed">>, <<"payload">>),
{stop, _} = emqx_delayed:on_message_publish(Message)
end,
lists:seq(1, 4)
),
AutoSubscribeTopics =
lists:map(
fun(N) ->
Num = integer_to_binary(N),
Topic = <<"auto/", Num/binary>>,
#{<<"topic">> => Topic}
end,
lists:seq(1, 3)
),
{ok, _} = emqx_auto_subscribe:update(AutoSubscribeTopics),
RewriteTopics =
lists:map(
fun(N) ->
Num = integer_to_binary(N),
DestTopic = <<"rewrite/dest/", Num/binary>>,
SourceTopic = <<"rewrite/source/", Num/binary>>,
#{
<<"source_topic">> => SourceTopic,
<<"dest_topic">> => DestTopic,
<<"action">> => all,
<<"re">> => DestTopic
}
end,
lists:seq(1, 2)
),
ok = emqx_rewrite:update(RewriteTopics),
ok.
create_authn(ChainName, built_in_database) ->
emqx_authentication:initialize_authentication(
ChainName,

View File

@ -54,6 +54,9 @@
code_change/3
]).
%% exported for `emqx_telemetry'
-export([get_basic_usage_info/0]).
-type state() :: #{
enable := boolean(),
context_id := non_neg_integer(),
@ -161,6 +164,20 @@ call(Req) ->
stats_fun() ->
gen_server:cast(?MODULE, ?FUNCTION_NAME).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec get_basic_usage_info() -> #{retained_messages => non_neg_integer()}.
get_basic_usage_info() ->
try
RetainedMessages = gen_server:call(?MODULE, retained_count),
#{retained_messages => RetainedMessages}
catch
_:_ ->
#{retained_messages => 0}
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -191,6 +208,10 @@ handle_call({page_read, Topic, Page, Limit}, _, #{context := Context} = State) -
Mod = get_backend_module(),
Result = Mod:page_read(Context, Topic, Page, Limit),
{reply, Result, State};
handle_call(retained_count, _From, State = #{context := Context}) ->
Mod = get_backend_module(),
RetainedCount = Mod:size(Context),
{reply, RetainedCount, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.

View File

@ -103,6 +103,14 @@ end_per_group(_Group, Config) ->
emqx_retainer_mnesia:populate_index_meta(),
Config.
init_per_testcase(t_get_basic_usage_info, Config) ->
mnesia:clear_table(?TAB_INDEX),
mnesia:clear_table(?TAB_MESSAGE),
emqx_retainer_mnesia:populate_index_meta(),
Config;
init_per_testcase(_TestCase, Config) ->
Config.
load_base_conf() ->
ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF).
@ -602,6 +610,20 @@ t_reindex(_) ->
end
).
t_get_basic_usage_info(_Config) ->
?assertEqual(#{retained_messages => 0}, emqx_retainer:get_basic_usage_info()),
Context = undefined,
lists:foreach(
fun(N) ->
Num = integer_to_binary(N),
Message = emqx_message:make(<<"retained/", Num/binary>>, <<"payload">>),
ok = emqx_retainer:store_retained(Context, Message)
end,
lists:seq(1, 5)
),
?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()),
ok.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------