From 9f400eb11dc924f4c6f772177b192d776f28583c Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 16 Sep 2022 07:56:25 +0800 Subject: [PATCH 1/2] fix(slow_subs): Fix the unit error of the `deliver_begin_at` timestamp The unit of `deliver_begin_at` is incorrectly used the seconds, this will cause the `internal` stat type to never working and the `response` type time span error --- .../test/emqx_slow_subs_SUITE.erl | 64 ++++++++++++++----- src/emqx_session.erl | 2 +- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl index 6434fedd0..75f8bd433 100644 --- a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl @@ -22,13 +22,25 @@ -include_lib("eunit/include/eunit.hrl"). -include("include/emqx_mqtt.hrl"). -include_lib("include/emqx.hrl"). +-define(LANTENCY, 101). %-define(LOGT(Format, Args), ct:pal(Format, Args)). -define(TOPK_TAB, emqx_slow_subs_topk). -define(NOW, erlang:system_time(millisecond)). -all() -> emqx_ct:all(?MODULE). +all() -> + [ {group, whole} + , {group, internal} + , {group, response} + ]. + +groups() -> + Cases = emqx_ct:all(?MODULE), + [ {whole, [], Cases} + , {internal, [], Cases} + , {response, [], Cases} + ]. init_per_suite(Config) -> emqx_ct_helpers:start_apps([emqx]), @@ -39,7 +51,8 @@ end_per_suite(Config) -> Config. init_per_testcase(_, Config) -> - emqx_mod_slow_subs:load(base_conf()), + Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)), + emqx_mod_slow_subs:load(base_conf(Group)), Config. end_per_testcase(_, _) -> @@ -49,10 +62,10 @@ end_per_testcase(_, _) -> %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- -t_log_and_pub(_) -> +t_log_and_pub(Config) -> %% Sub topic first Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], - Clients = start_client(Subs), + Clients = start_client(Subs, Config), timer:sleep(1500), Now = ?NOW, @@ -60,14 +73,14 @@ t_log_and_pub(_) -> lists:foreach(fun(I) -> Topic = list_to_binary(io_lib:format("/test1/~p", [I])), Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}) + emqx:publish(Msg#message{timestamp = Now - ?LANTENCY}) end, lists:seq(1, 10)), lists:foreach(fun(I) -> Topic = list_to_binary(io_lib:format("/test2/~p", [I])), Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}) + emqx:publish(Msg#message{timestamp = Now - ?LANTENCY}) end, lists:seq(1, 10)), @@ -77,25 +90,33 @@ t_log_and_pub(_) -> ?assert(Size =< 8 andalso Size >= 3, unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))), + ?assert( + lists:all( + fun(#{timespan := Ts}) -> + Ts >= 101 andalso Ts < ?NOW - Now + end, + emqx_slow_subs_api:get_history() + ) + ), + timer:sleep(3000), ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. -base_conf() -> - [ {threshold, 300} +base_conf(Type) -> + [ {threshold, 100} , {top_k_num, 5} , {expire_interval, timer:seconds(3)} - , {stats_type, whole} + , {stats_type, Type} ]. -start_client(Subs) -> - [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. +start_client(Subs, Config) -> + [spawn(fun() -> client(I, Subs, Config) end) || I <- lists:seq(1, 10)]. -client(I, Subs) -> - {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, io_lib:format("slow_subs_~p", [I])}, - {username, <<"plain">>}, - {password, <<"plain">>}]), +client(I, Subs, Config) -> + Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)), + ConnOptions = make_conn_options(Group, I), + {ok, C} = emqtt:start_link(ConnOptions), {ok, _} = emqtt:connect(C), Len = erlang:length(Subs), @@ -115,3 +136,14 @@ try_receive(Acc) -> after 500 -> Acc end. + +make_conn_options(response, I) -> + [ {msg_handler, + #{publish => fun(_) -> timer:sleep(50) end, + disconnected => fun(_) -> ok end}} + | make_conn_options(whole, I)]; +make_conn_options(_, I) -> + [{host, "localhost"}, + {clientid, io_lib:format("slow_subs_~p", [I])}, + {username, <<"plain">>}, + {password, <<"plain">>}]. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1d300c743..217e26bb0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -761,7 +761,7 @@ on_delivery_completed(_ClientInfo, _Ts, _Session) -> ok. mark_begin_deliver(Msg) -> - emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg). + emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg). %%-------------------------------------------------------------------- %% Helper functions From 6fd0261915a163537ae97204ded6b76488f068e9 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 20 Sep 2022 11:01:51 +0800 Subject: [PATCH 2/2] chore: update emqx appup && CHANGES-4.4.md --- CHANGES-4.4.md | 6 ++++++ src/emqx.appup.src | 24 ++++++++++++++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/CHANGES-4.4.md b/CHANGES-4.4.md index 6cfa9f74a..16d57a37c 100644 --- a/CHANGES-4.4.md +++ b/CHANGES-4.4.md @@ -1,5 +1,11 @@ # EMQX 4.4 Changes +## v4.4.10 + +### Bug fixes + +- Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981) + ## v4.4.9 ### Bug fixes (synced from v4.3.20) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 09b877307..75598c95e 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.9", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -13,7 +14,8 @@ {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -26,7 +28,8 @@ {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}]}, {"4.4.7", - [{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -41,7 +44,8 @@ {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.6", - [{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -253,7 +257,8 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.9", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_relup,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_channel,brutal_purge,soft_purge,[]}, @@ -264,7 +269,8 @@ {load_module,emqx_router,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.8", - [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, @@ -277,7 +283,8 @@ {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}]}, {"4.4.7", - [{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, @@ -292,7 +299,8 @@ {load_module,emqx_channel,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}]}, {"4.4.6", - [{load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, + [{load_module,emqx_session,brutal_purge,soft_purge,[]}, + {load_module,emqx_router_helper,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},