fix(emqx_slow_subs): fix some errors and test cases

This commit is contained in:
lafirest 2022-02-09 18:18:06 +08:00 committed by firest
parent 0c48bd92db
commit b09683bfcd
8 changed files with 29 additions and 69 deletions

View File

@ -322,8 +322,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{receive_maximum := Ma
%% TODO: Add conf for allowing/disallowing persistent sessions. %% TODO: Add conf for allowing/disallowing persistent sessions.
%% Note that the connection info is already enriched to have %% Note that the connection info is already enriched to have
%% default config values for session expiry. %% default config values for session expiry.
is_persistent => EI > 0, is_persistent => EI > 0
latency_stats => emqx_config:get_zone_conf(Zone, [latency_stats])
}. }.
mqueue_confs(Zone) -> mqueue_confs(Zone) ->

View File

@ -182,9 +182,6 @@ this number of messages or bytes have passed through."""
, {"persistent_session_store", , {"persistent_session_store",
sc(ref("persistent_session_store"), sc(ref("persistent_session_store"),
#{})} #{})}
, {"latency_stats",
sc(ref("latency_stats"),
#{})}
, {"trace", , {"trace",
sc(ref("trace"), sc(ref("trace"),
#{desc => """ #{desc => """
@ -1105,10 +1102,6 @@ when deactivated, but after the retention time.
} }
]; ];
fields("latency_stats") ->
[ {"samples", sc(integer(), #{default => 10,
desc => "the number of samples for calculate the average latency of delivery"})}
];
fields("trace") -> fields("trace") ->
[ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{ [ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{
default => text, default => text,

View File

@ -183,7 +183,6 @@
, mqueue => emqx_mqueue:options() , mqueue => emqx_mqueue:options()
, is_persistent => boolean() , is_persistent => boolean()
, clientid => emqx_types:clientid() , clientid => emqx_types:clientid()
, latency_stats => emqx_message_latency_stats:create_options()
}. }.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -211,8 +210,7 @@ init(Opts) ->
awaiting_rel = #{}, awaiting_rel = #{},
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
created_at = erlang:system_time(millisecond), created_at = erlang:system_time(millisecond)
latency_stats = emqx_message_latency_stats:new(maps:get(latency_stats, Opts, #{}))
}. }.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -268,9 +266,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout; Timeout;
info(created_at, #session{created_at = CreatedAt}) -> info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt; CreatedAt.
info(latency_stats, #session{latency_stats = Stats}) ->
emqx_message_latency_stats:latency(Stats).
%% @doc Get stats of the session. %% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()). -spec(stats(session()) -> emqx_types:stats()).

View File

@ -24,7 +24,7 @@ namespace() -> zone.
%% roots are added only for document generation. %% roots are added only for document generation.
roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown", roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown",
"conn_congestion", "rate_limit", "quota", "force_gc", "conn_congestion", "rate_limit", "quota", "force_gc",
"overload_protection", "latency_stats" "overload_protection"
]. ].
%% zone schemas are clones from the same name from root level %% zone schemas are clones from the same name from root level

View File

@ -22,7 +22,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
-export([ start_link/0, on_delivery_completed/4, update_settings/1 -export([ start_link/0, on_delivery_completed/3, update_settings/1
, clear_history/0, init_tab/0, post_config_update/5 , clear_history/0, init_tab/0, post_config_update/5
]). ]).
@ -77,17 +77,15 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
on_delivery_completed(_ClientInfo, on_delivery_completed(#message{timestamp = Ts},
#message{timestamp = Ts},
#{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime ->
ok; ok;
on_delivery_completed(ClientInfo, Msg, Env, Cfg) -> on_delivery_completed(Msg, Env, Cfg) ->
on_delivery_completed(ClientInfo, Msg, Env, erlang:system_time(millisecond), Cfg). on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
on_delivery_completed(#{clientid := ClientId}, on_delivery_completed(#message{topic = Topic} = Msg,
#message{topic = Topic} = Msg, #{clientid := ClientId},
_Env,
Now, Now,
#{threshold := Threshold, #{threshold := Threshold,
stats_type := StatsType, stats_type := StatsType,

View File

@ -26,7 +26,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
emqx_slow_subs:init_topk_tab(), emqx_slow_subs:init_tab(),
{ok, {{one_for_one, 10, 3600}, {ok, {{one_for_one, 10, 3600},
[#{id => st_statistics, [#{id => st_statistics,
start => {emqx_slow_subs, start_link, []}, start => {emqx_slow_subs, start_link, []},

View File

@ -31,9 +31,7 @@ slow_subs {
enable = true enable = true
top_k_num = 5, top_k_num = 5,
expire_interval = 3000 expire_interval = 3000
notice_interval = 1500 stats_type = whole
notice_qos = 0
notice_batch_size = 3
}""">>). }""">>).
all() -> all() ->
@ -60,7 +58,6 @@ t_log_and_pub(_) ->
%% Sub topic first %% Sub topic first
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
Clients = start_client(Subs), Clients = start_client(Subs),
emqx:subscribe("$SYS/brokers/+/slow_subs"),
timer:sleep(1000), timer:sleep(1000),
Now = ?NOW, Now = ?NOW,
%% publish %% publish
@ -82,15 +79,9 @@ t_log_and_pub(_) ->
timer:sleep(1000), timer:sleep(1000),
Size = ets:info(?TOPK_TAB, size), Size = ets:info(?TOPK_TAB, size),
%% some time record maybe delete due to it expired %% some time record maybe delete due to it expired
?assert(Size =< 6 andalso Size >= 4), ?assert(Size =< 6 andalso Size > 3),
timer:sleep(1500), timer:sleep(4000),
Recs = try_receive([]),
RecSum = lists:sum(Recs),
?assert(RecSum >= 5),
?assert(lists:all(fun(E) -> E =< 3 end, Recs)),
timer:sleep(3000),
?assert(ets:info(?TOPK_TAB, size) =:= 0), ?assert(ets:info(?TOPK_TAB, size) =:= 0),
[Client ! stop || Client <- Clients], [Client ! stop || Client <- Clients],
ok. ok.
@ -113,12 +104,3 @@ client(I, Subs) ->
stop -> stop ->
ok ok
end. end.
try_receive(Acc) ->
receive
{deliver, _, #message{payload = Payload}} ->
#{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]),
try_receive([length(Logs) | Acc])
after 500 ->
Acc
end.

View File

@ -40,9 +40,7 @@ slow_subs
enable = true enable = true
top_k_num = 5, top_k_num = 5,
expire_interval = 60000 expire_interval = 60000
notice_interval = 0 stats_type = whole
notice_qos = 0
notice_batch_size = 3
}""">>). }""">>).
@ -92,8 +90,7 @@ t_get_history(_) ->
Now = ?NOW, Now = ?NOW,
Each = fun(I) -> Each = fun(I) ->
ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)),
type = average,
last_update_time = Now}) last_update_time = Now})
end, end,
@ -101,18 +98,16 @@ t_get_history(_) ->
{ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10",
auth_header_()), auth_header_()),
#{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), [First | _] = emqx_json:decode(Data, [return_maps]),
RFirst = #{<<"clientid">> => <<"test_5">>, ?assertMatch(#{<<"clientid">> := <<"test_5">>,
<<"latency">> => 5, <<"topic">> := <<"topic">>,
<<"type">> => <<"average">>, <<"last_update_time">> := Now,
<<"last_update_time">> => Now}, <<"node">> := _,
<<"timespan">> := _}, First).
?assertEqual(RFirst, First).
t_clear(_) -> t_clear(_) ->
ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>), ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)),
type = average,
last_update_time = ?NOW}), last_update_time = ?NOW}),
{ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],
@ -122,7 +117,7 @@ t_clear(_) ->
t_settting(_) -> t_settting(_) ->
Conf = emqx:get_config([slow_subs]), Conf = emqx:get_config([slow_subs]),
Conf2 = Conf#{threshold => 1000}, Conf2 = Conf#{stats_type => internal},
{ok, Data} = request_api(put, {ok, Data} = request_api(put,
api_path(["slow_subscriptions", "settings"]), api_path(["slow_subscriptions", "settings"]),
[], [],
@ -131,7 +126,7 @@ t_settting(_) ->
Return = decode_json(Data), Return = decode_json(Data),
?assertEqual(Conf2, Return), ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return),
{ok, GetData} = request_api(get, {ok, GetData} = request_api(get,
api_path(["slow_subscriptions", "settings"]), api_path(["slow_subscriptions", "settings"]),
@ -143,10 +138,7 @@ t_settting(_) ->
GetReturn = decode_json(GetData), GetReturn = decode_json(GetData),
?assertEqual(Conf2, GetReturn), ?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn).
?assertEqual(1000,
emqx_message_latency_stats:get_threshold()).
decode_json(Data) -> decode_json(Data) ->
BinJosn = emqx_json:decode(Data, [return_maps]), BinJosn = emqx_json:decode(Data, [return_maps]),