fix(emqx_slow_subs): limit the max size of top-k table
This commit is contained in:
parent
d769401869
commit
d60c586bfb
|
@ -30,6 +30,7 @@
|
||||||
-export([ node_query/5
|
-export([ node_query/5
|
||||||
, cluster_query/4
|
, cluster_query/4
|
||||||
, select_table_with_count/5
|
, select_table_with_count/5
|
||||||
|
, b2i/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([do_query/6]).
|
-export([do_query/6]).
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
-define(INDEX(Latency, ClientId), {Latency, ClientId}).
|
-define(INDEX(Latency, ClientId), {Latency, ClientId}).
|
||||||
|
|
||||||
|
-define(MAX_TAB_SIZE, 1000).
|
||||||
|
|
||||||
-record(top_k, { index :: index()
|
-record(top_k, { index :: index()
|
||||||
, type :: emqx_message_latency_stats:latency_type()
|
, type :: emqx_message_latency_stats:latency_type()
|
||||||
, last_update_time :: pos_integer()
|
, last_update_time :: pos_integer()
|
||||||
|
|
|
@ -251,7 +251,8 @@ publish(TickTime, Notices) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
MaxSize = emqx:get_config([emqx_slow_subs, top_k_num]),
|
MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]),
|
||||||
|
MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
|
||||||
_ = emqx:hook('message.slow_subs_stats',
|
_ = emqx:hook('message.slow_subs_stats',
|
||||||
{?MODULE, on_stats_update, [#{max_size => MaxSize}]}
|
{?MODULE, on_stats_update, [#{max_size => MaxSize}]}
|
||||||
),
|
),
|
||||||
|
|
|
@ -87,7 +87,11 @@ slow_subs(delete, _) ->
|
||||||
ok = emqx_slow_subs:clear_history(),
|
ok = emqx_slow_subs:clear_history(),
|
||||||
{204};
|
{204};
|
||||||
|
|
||||||
slow_subs(get, #{query_string := QS}) ->
|
slow_subs(get, #{query_string := QST}) ->
|
||||||
|
LimitT = maps:get(<<"limit">>, QST, ?MAX_TAB_SIZE),
|
||||||
|
Limit = erlang:min(?MAX_TAB_SIZE, emqx_mgmt_api:b2i(LimitT)),
|
||||||
|
Page = maps:get(<<"page">>, QST, 1),
|
||||||
|
QS = QST#{<<"limit">> => Limit, <<"page">> => Page},
|
||||||
Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN),
|
Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN),
|
||||||
{200, Data}.
|
{200, Data}.
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@ fields("emqx_slow_subs") ->
|
||||||
"The latency threshold for statistics, the minimum value is 100ms")}
|
"The latency threshold for statistics, the minimum value is 100ms")}
|
||||||
, {expire_interval,
|
, {expire_interval,
|
||||||
sc(emqx_schema:duration_ms(),
|
sc(emqx_schema:duration_ms(),
|
||||||
"5m",
|
"300s",
|
||||||
"The eviction time of the record, which in the statistics record table")}
|
"The eviction time of the record, which in the statistics record table")}
|
||||||
, {top_k_num,
|
, {top_k_num,
|
||||||
sc(integer(),
|
sc(integer(),
|
||||||
|
|
|
@ -90,7 +90,7 @@ t_log_and_pub(_) ->
|
||||||
?assert(RecSum >= 5),
|
?assert(RecSum >= 5),
|
||||||
?assert(lists:all(fun(E) -> E =< 3 end, Recs)),
|
?assert(lists:all(fun(E) -> E =< 3 end, Recs)),
|
||||||
|
|
||||||
timer:sleep(2000),
|
timer:sleep(3000),
|
||||||
?assert(ets:info(?TOPK_TAB, size) =:= 0),
|
?assert(ets:info(?TOPK_TAB, size) =:= 0),
|
||||||
[Client ! stop || Client <- Clients],
|
[Client ! stop || Client <- Clients],
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue