fix(slow_subs): Fix the unit error of the `deliver_begin_at` timestamp

The unit of `deliver_begin_at` is incorrectly used the seconds,
this will cause the `internal` stat type to never working and the `response` type time span error
This commit is contained in:
firest 2022-09-16 07:56:25 +08:00
parent 0fec2b874e
commit 9f400eb11d
2 changed files with 49 additions and 17 deletions

View File

@ -22,13 +22,25 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include("include/emqx_mqtt.hrl"). -include("include/emqx_mqtt.hrl").
-include_lib("include/emqx.hrl"). -include_lib("include/emqx.hrl").
-define(LANTENCY, 101).
%-define(LOGT(Format, Args), ct:pal(Format, Args)). %-define(LOGT(Format, Args), ct:pal(Format, Args)).
-define(TOPK_TAB, emqx_slow_subs_topk). -define(TOPK_TAB, emqx_slow_subs_topk).
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
all() -> emqx_ct:all(?MODULE). all() ->
[ {group, whole}
, {group, internal}
, {group, response}
].
groups() ->
Cases = emqx_ct:all(?MODULE),
[ {whole, [], Cases}
, {internal, [], Cases}
, {response, [], Cases}
].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx]), emqx_ct_helpers:start_apps([emqx]),
@ -39,7 +51,8 @@ end_per_suite(Config) ->
Config. Config.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
emqx_mod_slow_subs:load(base_conf()), Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
emqx_mod_slow_subs:load(base_conf(Group)),
Config. Config.
end_per_testcase(_, _) -> end_per_testcase(_, _) ->
@ -49,10 +62,10 @@ end_per_testcase(_, _) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases %% Test Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_log_and_pub(_) -> t_log_and_pub(Config) ->
%% Sub topic first %% Sub topic first
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
Clients = start_client(Subs), Clients = start_client(Subs, Config),
timer:sleep(1500), timer:sleep(1500),
Now = ?NOW, Now = ?NOW,
@ -60,14 +73,14 @@ t_log_and_pub(_) ->
lists:foreach(fun(I) -> lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test1/~p", [I])), Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500}) emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
end, end,
lists:seq(1, 10)), lists:seq(1, 10)),
lists:foreach(fun(I) -> lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test2/~p", [I])), Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500}) emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
end, end,
lists:seq(1, 10)), lists:seq(1, 10)),
@ -77,25 +90,33 @@ t_log_and_pub(_) ->
?assert(Size =< 8 andalso Size >= 3, ?assert(Size =< 8 andalso Size >= 3,
unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))), unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))),
?assert(
lists:all(
fun(#{timespan := Ts}) ->
Ts >= 101 andalso Ts < ?NOW - Now
end,
emqx_slow_subs_api:get_history()
)
),
timer:sleep(3000), timer:sleep(3000),
?assert(ets:info(?TOPK_TAB, size) =:= 0), ?assert(ets:info(?TOPK_TAB, size) =:= 0),
[Client ! stop || Client <- Clients], [Client ! stop || Client <- Clients],
ok. ok.
base_conf() -> base_conf(Type) ->
[ {threshold, 300} [ {threshold, 100}
, {top_k_num, 5} , {top_k_num, 5}
, {expire_interval, timer:seconds(3)} , {expire_interval, timer:seconds(3)}
, {stats_type, whole} , {stats_type, Type}
]. ].
start_client(Subs) -> start_client(Subs, Config) ->
[spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. [spawn(fun() -> client(I, Subs, Config) end) || I <- lists:seq(1, 10)].
client(I, Subs) -> client(I, Subs, Config) ->
{ok, C} = emqtt:start_link([{host, "localhost"}, Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
{clientid, io_lib:format("slow_subs_~p", [I])}, ConnOptions = make_conn_options(Group, I),
{username, <<"plain">>}, {ok, C} = emqtt:start_link(ConnOptions),
{password, <<"plain">>}]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
Len = erlang:length(Subs), Len = erlang:length(Subs),
@ -115,3 +136,14 @@ try_receive(Acc) ->
after 500 -> after 500 ->
Acc Acc
end. end.
make_conn_options(response, I) ->
[ {msg_handler,
#{publish => fun(_) -> timer:sleep(50) end,
disconnected => fun(_) -> ok end}}
| make_conn_options(whole, I)];
make_conn_options(_, I) ->
[{host, "localhost"},
{clientid, io_lib:format("slow_subs_~p", [I])},
{username, <<"plain">>},
{password, <<"plain">>}].

View File

@ -761,7 +761,7 @@ on_delivery_completed(_ClientInfo, _Ts, _Session) ->
ok. ok.
mark_begin_deliver(Msg) -> mark_begin_deliver(Msg) ->
emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg). emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions