From f34fce7c70361a0633c0b7b32426be027ec0d5b0 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 31 Mar 2022 14:29:42 +0300 Subject: [PATCH] chore(emqx_modules): improve emqx_topic_metrics coverage --- apps/emqx_modules/src/emqx_topic_metrics.erl | 6 +- .../test/emqx_topic_metrics_SUITE.erl | 114 ++++++++++++++---- 2 files changed, 94 insertions(+), 26 deletions(-) diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index 5aa04db07..65409fd61 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ on_message_publish/1, @@ -212,6 +213,7 @@ init([Opts]) -> error("max topic metrics quota exceeded") end end, + ?tp(debug, emqx_topic_metrics_started, #{}), {ok, #state{speeds = lists:foldl(Fun, #{}, Opts)}, hibernate}. handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) -> @@ -262,7 +264,7 @@ handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) end. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), + ?tp(error, emqx_topic_metrics_unexpected_cast, #{cast => Msg}), {noreply, State}. handle_info(ticking, State = #state{speeds = Speeds}) -> @@ -278,7 +280,7 @@ handle_info(ticking, State = #state{speeds = Speeds}) -> erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking), {noreply, State#state{speeds = NSpeeds}}; handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", info => Info}), + ?tp(error, emqx_topic_metrics_unexpected_info, #{info => Info}), {noreply, State}. terminate(_Reason, _State) -> diff --git a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl index 4d8a340d2..87ec1382c 100644 --- a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl +++ b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl @@ -19,14 +19,10 @@ -compile(export_all). -compile(nowarn_export_all). --define(TOPIC, << - "" - "\n" - "topic_metrics: []" - "" ->>). +-define(TOPIC, #{<<"topic_metrics">> => []}). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). @@ -39,8 +35,17 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_modules]). -t_nonexistent_topic_metrics(_) -> +init_per_testcase(_Case, Config) -> emqx_topic_metrics:enable(), + emqx_topic_metrics:deregister_all(), + Config. + +end_per_testcase(_Case, _Config) -> + emqx_topic_metrics:deregister_all(), + emqx_config:put([topic_metrics], []), + emqx_topic_metrics:disable(). + +t_nonexistent_topic_metrics(_) -> ?assertEqual({error, topic_not_found}, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), ?assertEqual({error, topic_not_found}, emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in')), @@ -56,12 +61,9 @@ t_nonexistent_topic_metrics(_) -> %% emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics') %% ), - emqx_topic_metrics:deregister(<<"a/b/c">>), - emqx_topic_metrics:disable(). + ok = emqx_topic_metrics:deregister(<<"a/b/c">>). t_topic_metrics(_) -> - emqx_topic_metrics:enable(), - ?assertEqual(false, emqx_topic_metrics:is_registered(<<"a/b/c">>)), ?assertEqual([], emqx_topic_metrics:all_registered_topics()), emqx_topic_metrics:register(<<"a/b/c">>), @@ -78,11 +80,9 @@ t_topic_metrics(_) -> %% #{long => 0, medium => 0, short => 0} %% ), - emqx_topic_metrics:deregister(<<"a/b/c">>), - emqx_topic_metrics:disable(). + ok = emqx_topic_metrics:deregister(<<"a/b/c">>). t_hook(_) -> - emqx_topic_metrics:enable(), emqx_topic_metrics:register(<<"a/b/c">>), ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), @@ -97,19 +97,85 @@ t_hook(_) -> {username, "myuser"} ]), {ok, _} = emqtt:connect(C), - emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, [{qos, 0}]), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, [{qos, 1}]), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, [{qos, 2}]), ct:sleep(100), - ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(3, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), - ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos1.in')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos2.in')), + ?assertEqual(3, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - emqtt:subscribe(C, <<"a/b/c">>), - emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), + emqtt:subscribe(C, <<"a/b/c">>, [{qos, 2}]), ct:sleep(100), - ?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, [{qos, 0}]), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, [{qos, 1}]), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, [{qos, 2}]), + ct:sleep(100), + ?assertEqual(6, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), ?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), - ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')), + ?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos1.in')), + ?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos2.in')), + ?assertEqual(3, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')), ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), - ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - emqx_topic_metrics:deregister(<<"a/b/c">>), - emqx_topic_metrics:disable(). + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos1.out')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos2.out')), + ?assertEqual(3, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + ok = emqx_topic_metrics:deregister(<<"a/b/c">>). + +t_topic_server_restart(_) -> + emqx_config:put([topic_metrics], [#{topic => <<"a/b/c">>}]), + ?check_trace( + begin + ?wait_async_action( + erlang:exit(whereis(emqx_topic_metrics), kill), + #{?snk_kind := emqx_topic_metrics_started}, + 500 + ) + end, + fun(Trace) -> + ?assertMatch( + [_ | _], + ?of_kind(emqx_topic_metrics_started, Trace) + ) + end + ), + + ?assertEqual( + [<<"a/b/c">>], + emqx_topic_metrics:all_registered_topics() + ). + +t_unknown_messages(_) -> + OldPid = whereis(emqx_topic_metrics), + ?check_trace( + begin + ?wait_async_action( + OldPid ! unknown, + #{?snk_kind := emqx_topic_metrics_unexpected_info}, + 500 + ), + ?wait_async_action( + gen_server:cast(OldPid, unknown), + #{?snk_kind := emqx_topic_metrics_unexpected_cast}, + 500 + ) + end, + fun(Trace) -> + ?assertMatch( + [_ | _], + ?of_kind(emqx_topic_metrics_unexpected_info, Trace) + ), + ?assertMatch( + [_ | _], + ?of_kind(emqx_topic_metrics_unexpected_cast, Trace) + ) + end + ), + + %% emqx_topic_metrics did not crash from unexpected calls + ?assertEqual( + OldPid, + whereis(emqx_topic_metrics) + ).