From 9f400eb11dc924f4c6f772177b192d776f28583c Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 16 Sep 2022 07:56:25 +0800 Subject: [PATCH] fix(slow_subs): Fix the unit error of the `deliver_begin_at` timestamp The unit of `deliver_begin_at` is incorrectly used the seconds, this will cause the `internal` stat type to never working and the `response` type time span error --- .../test/emqx_slow_subs_SUITE.erl | 64 ++++++++++++++----- src/emqx_session.erl | 2 +- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl index 6434fedd0..75f8bd433 100644 --- a/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl @@ -22,13 +22,25 @@ -include_lib("eunit/include/eunit.hrl"). -include("include/emqx_mqtt.hrl"). -include_lib("include/emqx.hrl"). +-define(LANTENCY, 101). %-define(LOGT(Format, Args), ct:pal(Format, Args)). -define(TOPK_TAB, emqx_slow_subs_topk). -define(NOW, erlang:system_time(millisecond)). -all() -> emqx_ct:all(?MODULE). +all() -> + [ {group, whole} + , {group, internal} + , {group, response} + ]. + +groups() -> + Cases = emqx_ct:all(?MODULE), + [ {whole, [], Cases} + , {internal, [], Cases} + , {response, [], Cases} + ]. init_per_suite(Config) -> emqx_ct_helpers:start_apps([emqx]), @@ -39,7 +51,8 @@ end_per_suite(Config) -> Config. init_per_testcase(_, Config) -> - emqx_mod_slow_subs:load(base_conf()), + Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)), + emqx_mod_slow_subs:load(base_conf(Group)), Config. end_per_testcase(_, _) -> @@ -49,10 +62,10 @@ end_per_testcase(_, _) -> %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- -t_log_and_pub(_) -> +t_log_and_pub(Config) -> %% Sub topic first Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], - Clients = start_client(Subs), + Clients = start_client(Subs, Config), timer:sleep(1500), Now = ?NOW, @@ -60,14 +73,14 @@ t_log_and_pub(_) -> lists:foreach(fun(I) -> Topic = list_to_binary(io_lib:format("/test1/~p", [I])), Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}) + emqx:publish(Msg#message{timestamp = Now - ?LANTENCY}) end, lists:seq(1, 10)), lists:foreach(fun(I) -> Topic = list_to_binary(io_lib:format("/test2/~p", [I])), Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 500}) + emqx:publish(Msg#message{timestamp = Now - ?LANTENCY}) end, lists:seq(1, 10)), @@ -77,25 +90,33 @@ t_log_and_pub(_) -> ?assert(Size =< 8 andalso Size >= 3, unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))), + ?assert( + lists:all( + fun(#{timespan := Ts}) -> + Ts >= 101 andalso Ts < ?NOW - Now + end, + emqx_slow_subs_api:get_history() + ) + ), + timer:sleep(3000), ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. -base_conf() -> - [ {threshold, 300} +base_conf(Type) -> + [ {threshold, 100} , {top_k_num, 5} , {expire_interval, timer:seconds(3)} - , {stats_type, whole} + , {stats_type, Type} ]. -start_client(Subs) -> - [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. +start_client(Subs, Config) -> + [spawn(fun() -> client(I, Subs, Config) end) || I <- lists:seq(1, 10)]. -client(I, Subs) -> - {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, io_lib:format("slow_subs_~p", [I])}, - {username, <<"plain">>}, - {password, <<"plain">>}]), +client(I, Subs, Config) -> + Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)), + ConnOptions = make_conn_options(Group, I), + {ok, C} = emqtt:start_link(ConnOptions), {ok, _} = emqtt:connect(C), Len = erlang:length(Subs), @@ -115,3 +136,14 @@ try_receive(Acc) -> after 500 -> Acc end. + +make_conn_options(response, I) -> + [ {msg_handler, + #{publish => fun(_) -> timer:sleep(50) end, + disconnected => fun(_) -> ok end}} + | make_conn_options(whole, I)]; +make_conn_options(_, I) -> + [{host, "localhost"}, + {clientid, io_lib:format("slow_subs_~p", [I])}, + {username, <<"plain">>}, + {password, <<"plain">>}]. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1d300c743..217e26bb0 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -761,7 +761,7 @@ on_delivery_completed(_ClientInfo, _Ts, _Session) -> ok. mark_begin_deliver(Msg) -> - emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg). + emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg). %%-------------------------------------------------------------------- %% Helper functions