Merge pull request #8981 from lafirest/v4.4

fix(slow_subs): Fix the unit error of the `deliver_begin_at` timestamp
This commit is contained in:
lafirest 2022-09-26 18:05:54 +08:00 committed by GitHub
commit d3f020912e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 71 additions and 25 deletions

View File

@ -1,5 +1,11 @@
# EMQX 4.4 Changes
## v4.4.10
### Bug fixes
- Fix the latency statistics error of the slow subscription module when `stats_type` is `internal` or `response`. [#8981](https://github.com/emqx/emqx/pull/8981)
## v4.4.9
### Bug fixes (synced from v4.3.20)

View File

@ -22,13 +22,25 @@
-include_lib("eunit/include/eunit.hrl").
-include("include/emqx_mqtt.hrl").
-include_lib("include/emqx.hrl").
-define(LANTENCY, 101).
%-define(LOGT(Format, Args), ct:pal(Format, Args)).
-define(TOPK_TAB, emqx_slow_subs_topk).
-define(NOW, erlang:system_time(millisecond)).
all() -> emqx_ct:all(?MODULE).
all() ->
[ {group, whole}
, {group, internal}
, {group, response}
].
groups() ->
Cases = emqx_ct:all(?MODULE),
[ {whole, [], Cases}
, {internal, [], Cases}
, {response, [], Cases}
].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx]),
@ -39,7 +51,8 @@ end_per_suite(Config) ->
Config.
init_per_testcase(_, Config) ->
emqx_mod_slow_subs:load(base_conf()),
Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
emqx_mod_slow_subs:load(base_conf(Group)),
Config.
end_per_testcase(_, _) ->
@ -49,10 +62,10 @@ end_per_testcase(_, _) ->
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_log_and_pub(_) ->
t_log_and_pub(Config) ->
%% Sub topic first
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
Clients = start_client(Subs),
Clients = start_client(Subs, Config),
timer:sleep(1500),
Now = ?NOW,
@ -60,14 +73,14 @@ t_log_and_pub(_) ->
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
end,
lists:seq(1, 10)),
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
end,
lists:seq(1, 10)),
@ -77,25 +90,33 @@ t_log_and_pub(_) ->
?assert(Size =< 8 andalso Size >= 3,
unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))),
?assert(
lists:all(
fun(#{timespan := Ts}) ->
Ts >= 101 andalso Ts < ?NOW - Now
end,
emqx_slow_subs_api:get_history()
)
),
timer:sleep(3000),
?assert(ets:info(?TOPK_TAB, size) =:= 0),
[Client ! stop || Client <- Clients],
ok.
base_conf() ->
[ {threshold, 300}
base_conf(Type) ->
[ {threshold, 100}
, {top_k_num, 5}
, {expire_interval, timer:seconds(3)}
, {stats_type, whole}
, {stats_type, Type}
].
start_client(Subs) ->
[spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
start_client(Subs, Config) ->
[spawn(fun() -> client(I, Subs, Config) end) || I <- lists:seq(1, 10)].
client(I, Subs) ->
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, io_lib:format("slow_subs_~p", [I])},
{username, <<"plain">>},
{password, <<"plain">>}]),
client(I, Subs, Config) ->
Group = proplists:get_value(name, proplists:get_value(tc_group_properties, Config)),
ConnOptions = make_conn_options(Group, I),
{ok, C} = emqtt:start_link(ConnOptions),
{ok, _} = emqtt:connect(C),
Len = erlang:length(Subs),
@ -115,3 +136,14 @@ try_receive(Acc) ->
after 500 ->
Acc
end.
make_conn_options(response, I) ->
[ {msg_handler,
#{publish => fun(_) -> timer:sleep(50) end,
disconnected => fun(_) -> ok end}}
| make_conn_options(whole, I)];
make_conn_options(_, I) ->
[{host, "localhost"},
{clientid, io_lib:format("slow_subs_~p", [I])},
{username, <<"plain">>},
{password, <<"plain">>}].

View File

@ -2,7 +2,8 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.9",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -13,7 +14,8 @@
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -26,7 +28,8 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
{"4.4.7",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -41,7 +44,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -253,7 +257,8 @@
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.4.9",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_relup,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
@ -264,7 +269,8 @@
{load_module,emqx_router,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
@ -277,7 +283,8 @@
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}]},
{"4.4.7",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
@ -292,7 +299,8 @@
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
{"4.4.6",
[{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},

View File

@ -761,7 +761,7 @@ on_delivery_completed(_ClientInfo, _Ts, _Session) ->
ok.
mark_begin_deliver(Msg) ->
emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg).
emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg).
%%--------------------------------------------------------------------
%% Helper functions