refactor(emqx_slow_subs): refactor slow subs

Author: lafirest
Date: 2022-01-07 17:21:30 +08:00
Parent: 8901a85398
Commit: 44fe882f14
11 changed files with 300 additions and 411 deletions

View File

@ -15,14 +15,24 @@
%%--------------------------------------------------------------------

-define(TOPK_TAB, emqx_slow_subs_topk).

%% ---- before ----
-define(INDEX(Latency, ClientId), {Latency, ClientId}).

-record(top_k, { index :: index()
               , type :: emqx_message_latency_stats:latency_type()
               , last_update_time :: pos_integer()
               , extra = []
               }).

-type top_k() :: #top_k{}.
-type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()).

%% ---- after ----
-define(INDEX_TAB, emqx_slow_subs_index).

-define(ID(ClientId, Topic), {ClientId, Topic}).
-define(INDEX(TimeSpan, Id), {Id, TimeSpan}).
-define(TOPK_INDEX(TimeSpan, Id), {TimeSpan, Id}).

-define(MAX_SIZE, 1000).

-record(top_k, { index :: topk_index()
               , last_update_time :: pos_integer()
               , extra = []
               }).

-record(index_tab, { index :: index()}).

-type top_k() :: #top_k{}.
-type index_tab() :: #index_tab{}.
-type id() :: {emqx_types:clientid(), emqx_types:topic()}.
-type index() :: ?INDEX(non_neg_integer(), id()).
-type topk_index() :: ?TOPK_INDEX(non_neg_integer(), id()).
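%% Illustration (not part of the commit; values invented): how the two tables
%% are keyed after this refactor. ?TOPK_TAB is an ordered_set keyed by
%% {TimeSpan, Id}, so ets:first/1 returns the record with the smallest
%% timespan, while ?INDEX_TAB is keyed by {Id, TimeSpan}, which lets the
%% per-subscription lookup walk it with ets:next/2.
Id       = {<<"client_1">>, <<"t/1">>},  %% ?ID(ClientId, Topic)
TopkKey  = {812, Id},                    %% ?TOPK_INDEX(TimeSpan, Id)
IndexKey = {Id, 812}.                    %% ?INDEX(TimeSpan, Id)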

View File

@ -24,8 +24,8 @@
-logger_header("[SLOW Subs]"). -logger_header("[SLOW Subs]").
-export([ start_link/1, on_stats_update/2, enable/0 -export([ start_link/1, on_publish_completed/3, enable/0
, disable/0, clear_history/0, init_topk_tab/0 , disable/0, clear_history/0, init_tab/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -44,28 +44,19 @@
, last_tick_at := pos_integer() , last_tick_at := pos_integer()
}. }.
-type log() :: #{ rank := pos_integer()
, clientid := emqx_types:clientid()
, latency := non_neg_integer()
, type := emqx_message_latency_stats:latency_type()
}.
-type window_log() :: #{ last_tick_at := pos_integer()
, logs := [log()]
}.
-type message() :: #message{}.

-import(proplists, [get_value/2]).

%% ---- before ----
-type stats_update_args() :: #{ clientid := emqx_types:clientid()
                              , latency := non_neg_integer()
                              , type := emqx_message_latency_stats:latency_type()
                              , last_insert_value := non_neg_integer()
                              , update_time := timer:time()
                              }.

-type stats_update_env() :: #{max_size := pos_integer()}.

%% ---- after ----
-type stats_type() :: whole     %% whole = internal + response
                    | internal  %% timespan from message in to deliver
                    | response. %% timespan from delivery to client response

-type stats_update_args() :: #{ clientid := emqx_types:clientid()}.

-type stats_update_env() :: #{ threshold := non_neg_integer()
                             , stats_type := stats_type()
                             , max_size := pos_integer()}.
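%% Worked example (numbers invented) of the three stats_type variants, matching
%% calc_timespan/3 further down; note that whole = internal + response:
T0  = 1000,      %% message timestamp (publish time, in ms)
T1  = 1200,      %% deliver_begin_at header (delivery to the subscriber started)
Now = 1800,      %% publish completed (acknowledged by the client)
800 = Now - T0,  %% whole
200 = T1 - T0,   %% internal
600 = Now - T1.  %% response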
-ifdef(TEST). -ifdef(TEST).
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
@ -74,7 +65,6 @@
-endif. -endif.
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(NOTICE_TOPIC_NAME, "slow_subs").
-define(DEF_CALL_TIMEOUT, timer:seconds(10)). -define(DEF_CALL_TIMEOUT, timer:seconds(10)).
%% erlang term order %% erlang term order
@ -91,36 +81,29 @@
start_link(Env) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).

%% ---- before ----
%% XXX NOTE: pay attention to the performance here
-spec on_stats_update(stats_update_args(), stats_update_env()) -> true.
on_stats_update(#{clientid := ClientId,
                  latency := Latency,
                  type := Type,
                  last_insert_value := LIV,
                  update_time := Ts},
                #{max_size := MaxSize}) ->
    LastIndex = ?INDEX(LIV, ClientId),
    Index = ?INDEX(Latency, ClientId),
    %% check whether the client is in the table
    case ets:lookup(?TOPK_TAB, LastIndex) of
        [#top_k{index = Index}] ->
            %% if the last value == the new value, update the type and last_update_time
            %% XXX for clients whose latency is stable for a long time, is it possible to reduce updates?
            ets:insert(?TOPK_TAB,
                       #top_k{index = Index, type = Type, last_update_time = Ts});
        [_] ->
            %% if Latency > the minimum value, we should update it
            %% if Latency < the minimum value, maybe it can replace the minimum value,
            %% so always update here
            %% do we need to check whether Latency == minimum ???
            ets:insert(?TOPK_TAB,
                       #top_k{index = Index, type = Type, last_update_time = Ts}),
            ets:delete(?TOPK_TAB, LastIndex);
        [] ->
            %% try to insert
            try_insert_to_topk(MaxSize, Index, Latency, Type, Ts)
    end.

%% ---- after ----
on_publish_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg)
  when Ts =< BirthTime ->
    ok;
on_publish_completed(Msg, Env, Cfg) ->
    on_publish_completed(Msg, Env, erlang:system_time(millisecond), Cfg).

on_publish_completed(#message{topic = Topic} = Msg,
                     #{clientid := ClientId},
                     Now, #{threshold := Threshold,
                            stats_type := StatsType,
                            max_size := MaxSize}) ->
    TimeSpan = calc_timespan(StatsType, Msg, Now),
    case TimeSpan =< Threshold of
        true -> ok;
        _ ->
            Id = ?ID(ClientId, Topic),
            LastUpdateValue = find_last_update_value(Id),
            case TimeSpan =< LastUpdateValue of
                true -> ok;
                _ ->
                    try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id)
            end
    end.
clear_history() -> clear_history() ->
@ -132,26 +115,23 @@ enable() ->
disable() -> disable() ->
gen_server:call(?MODULE, {enable, false}, ?DEF_CALL_TIMEOUT). gen_server:call(?MODULE, {enable, false}, ?DEF_CALL_TIMEOUT).
%% ---- before ----
init_topk_tab() ->
    case ets:whereis(?TOPK_TAB) of
        undefined ->
            ?TOPK_TAB = ets:new(?TOPK_TAB,
                                [ ordered_set, public, named_table
                                , {keypos, #top_k.index}, {write_concurrency, true}
                                , {read_concurrency, true}
                                ]);
        _ ->
            ?TOPK_TAB
    end.

%% ---- after ----
init_tab() ->
    safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table
                               , {keypos, #top_k.index}, {write_concurrency, true}
                               , {read_concurrency, true}
                               ]),
    safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table
                                , {keypos, #index_tab.index}, {write_concurrency, true}
                                , {read_concurrency, true}
                                ]).
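%% Usage note (assumes this module is emqx_slow_subs, which is how the
%% supervisor later in this commit calls it): init_tab/0 is idempotent, because
%% safe_create_tab/2 only calls ets:new/2 when ets:whereis/1 still returns
%% undefined.
_ = emqx_slow_subs:init_tab(),
_ = emqx_slow_subs:init_tab().  %% second call finds the existing tables and is a no-op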
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Conf]) -> init([Conf]) ->
notice_tick(Conf),
expire_tick(Conf), expire_tick(Conf),
update_threshold(Conf),
load(Conf), load(Conf),
{ok, #{config => Conf, {ok, #{config => Conf,
last_tick_at => ?NOW, last_tick_at => ?NOW,
@ -163,7 +143,6 @@ handle_call({enable, Enable}, _From,
IsEnable -> IsEnable ->
State; State;
true -> true ->
update_threshold(Cfg),
load(Cfg), load(Cfg),
State#{enable := true}; State#{enable := true};
_ -> _ ->
@ -173,7 +152,7 @@ handle_call({enable, Enable}, _From,
{reply, ok, State2}; {reply, ok, State2};
handle_call(clear_history, _, State) -> handle_call(clear_history, _, State) ->
ets:delete_all_objects(?TOPK_TAB), do_clear_history(),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
@ -190,12 +169,6 @@ handle_info(expire_tick, #{config := Cfg} = State) ->
do_clear(Cfg, Logs), do_clear(Cfg, Logs),
{noreply, State}; {noreply, State};
handle_info(notice_tick, #{config := Cfg} = State) ->
notice_tick(Cfg),
Logs = ets:tab2list(?TOPK_TAB),
do_notification(Logs, State),
{noreply, State#{last_tick_at := ?NOW}};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
@ -213,115 +186,116 @@ code_change(_OldVsn, State, _Extra) ->
expire_tick(_) -> expire_tick(_) ->
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
notice_tick(Cfg) ->
case get_value(notice_interval, Cfg) of
0 -> ok;
Interval ->
erlang:send_after(Interval, self(), ?FUNCTION_NAME),
ok
end.
-spec do_notification(list(), state()) -> ok.
do_notification([], _) ->
ok;
do_notification(Logs, #{last_tick_at := LastTickTime, config := Cfg}) ->
start_publish(Logs, LastTickTime, Cfg),
ok.
start_publish(Logs, TickTime, Cfg) ->
emqx_pool:async_submit({fun do_publish/4, [Logs, erlang:length(Logs), TickTime, Cfg]}).
do_publish([], _, _, _) ->
ok;
do_publish(Logs, Rank, TickTime, Cfg) ->
BatchSize = get_value(notice_batch_size, Cfg),
do_publish(Logs, BatchSize, Rank, TickTime, Cfg, []).
do_publish([Log | T], Size, Rank, TickTime, Cfg, Cache) when Size > 0 ->
Cache2 = [convert_to_notice(Rank, Log) | Cache],
do_publish(T, Size - 1, Rank - 1, TickTime, Cfg, Cache2);
do_publish(Logs, Size, Rank, TickTime, Cfg, Cache) when Size =:= 0 ->
publish(TickTime, Cfg, Cache),
do_publish(Logs, Rank, TickTime, Cfg);
do_publish([], _, _Rank, TickTime, Cfg, Cache) ->
publish(TickTime, Cfg, Cache),
ok.
convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId),
type = Type,
last_update_time = Ts}) ->
#{rank => Rank,
clientid => ClientId,
latency => Latency,
type => Type,
timestamp => Ts}.
publish(TickTime, Cfg, Notices) ->
WindowLog = #{last_tick_at => TickTime,
logs => lists:reverse(Notices)},
Payload = emqx_json:encode(WindowLog),
Msg = #message{ id = emqx_guid:gen()
, qos = get_value(notice_qos, Cfg)
, from = ?MODULE
, topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
, payload = Payload
, timestamp = ?NOW
},
_ = emqx_broker:safe_publish(Msg),
ok.
%% ---- before ----
load(Cfg) ->
    MaxSize = get_value(top_k_num, Cfg),
    _ = emqx:hook('message.slow_subs_stats',
                  fun ?MODULE:on_stats_update/2,
                  [#{max_size => MaxSize}]),
    ok.

unload() ->
    emqx:unhook('message.slow_subs_stats', fun ?MODULE:on_stats_update/2).

%% ---- after ----
load(Cfg) ->
    MaxSize = get_value(top_k_num, Cfg),
    StatsType = get_value(stats_type, Cfg),
    Threshold = get_value(threshold, Cfg),
    _ = emqx:hook('message.publish_completed',
                  fun ?MODULE:on_publish_completed/3,
                  [#{max_size => MaxSize,
                     stats_type => StatsType,
                     threshold => Threshold
                    }]),
    ok.

unload() ->
    emqx:unhook('message.publish_completed', fun ?MODULE:on_publish_completed/3),
    do_clear_history().
%% ---- before ----
do_clear(Cfg, Logs) ->
    Now = ?NOW,
    Interval = get_value(expire_interval, Cfg),
    Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
                   case Now - Ts >= Interval of
                       true ->
                           ets:delete(?TOPK_TAB, Index);
                       _ ->
                           true
                   end
           end,
    lists:foreach(Each, Logs).

%% ---- after ----
do_clear(Cfg, Logs) ->
    Now = ?NOW,
    Interval = get_value(expire_interval, Cfg),
    Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) ->
                   case Now - Ts >= Interval of
                       true ->
                           delete_with_index(TimeSpan, Id);
                       _ ->
                           true
                   end
           end,
    lists:foreach(Each, Logs).
%% ---- before ----
try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
    case ets:info(?TOPK_TAB, size) of
        Size when Size < MaxSize ->
            %% if the size is under the limit, insert it directly
            ets:insert(?TOPK_TAB,
                       #top_k{index = Index, type = Type, last_update_time = Ts});
        _Size ->
            %% find the minimum value
            ?INDEX(Min, _) = First =
                case ets:first(?TOPK_TAB) of
                    ?INDEX(_, _) = I -> I;
                    _ -> ?INDEX(Latency - 1, <<>>)
                end,
            case Latency =< Min of
                true -> true;
                _ ->
                    ets:insert(?TOPK_TAB,
                               #top_k{index = Index, type = Type, last_update_time = Ts}),
                    ets:delete(?TOPK_TAB, First)
            end
    end.

update_threshold(Conf) ->
    Threshold = proplists:get_value(threshold, Conf),
    _ = emqx_message_latency_stats:update_threshold(Threshold),
    ok.

%% ---- after ----
-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer().
calc_timespan(whole, #message{timestamp = Ts}, Now) ->
    Now - Ts;
calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) ->
    End = emqx_message:get_header(deliver_begin_at, Msg, Now),
    End - Ts;
calc_timespan(response, Msg, Now) ->
    Begin = emqx_message:get_header(deliver_begin_at, Msg, Now),
    Now - Begin.

%% update_topk is safe, because each process has a unique clientid, and the
%% insert and delete operations are bound to that clientid, so there is no race
%% condition there.
%%
%% However, the delete_with_index call below may have a race condition, because
%% data belonging to another clientid can be deleted here (that is, data
%% written by another process). It may therefore happen that while one process
%% is deleting a record, another process is updating the same record.
%% To solve this race condition, the index table also uses the ordered_set
%% type, so even if the situation above occurs, it only causes the old data to
%% be deleted twice, and the correctness of the data is not affected.
try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) ->
    case ets:info(?TOPK_TAB, size) of
        Size when Size < MaxSize ->
            update_topk(Now, LastUpdateValue, TimeSpan, Id);
        _Size ->
            case ets:first(?TOPK_TAB) of
                '$end_of_table' ->
                    update_topk(Now, LastUpdateValue, TimeSpan, Id);
                ?TOPK_INDEX(_, Id) ->
                    update_topk(Now, LastUpdateValue, TimeSpan, Id);
                ?TOPK_INDEX(Min, MinId) ->
                    case TimeSpan =< Min of
                        true -> false;
                        _ ->
                            update_topk(Now, LastUpdateValue, TimeSpan, Id),
                            delete_with_index(Min, MinId)
                    end
            end
    end.

-spec find_last_update_value(id()) -> non_neg_integer().
find_last_update_value(Id) ->
    case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of
        #index_tab{index = ?INDEX(LastUpdateValue, Id)} ->
            LastUpdateValue;
        _ ->
            0
    end.
-spec update_topk(non_neg_integer(), non_neg_integer(), non_neg_integer(), integer()) -> true.
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
%% update record
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id),
last_update_time = Now,
extra = []
}),
%% update index
ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}),
%% delete the old record & index
delete_with_index(LastUpdateValue, Id).
-spec delete_with_index(non_neg_integer(), id()) -> true.
delete_with_index(0, _) ->
true;
delete_with_index(TimeSpan, Id) ->
ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)),
ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)).
safe_create_tab(Name, Opts) ->
case ets:whereis(Name) of
undefined ->
Name = ets:new(Name, Opts);
_ ->
Name
end.
do_clear_history() ->
ets:delete_all_objects(?INDEX_TAB),
ets:delete_all_objects(?TOPK_TAB).
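%% Standalone sketch (not from the commit) of why the double delete described
%% in the comment above try_insert_to_topk/5 is harmless: deleting a key that
%% another process has already removed is a no-op that still returns true, and
%% ordered_set keeps at most one object per key, so a stale delete cannot
%% clobber data stored under a newer {TimeSpan, Id} key.
T = ets:new(example_tab, [ordered_set, public]),
true = ets:insert(T, {{300, {<<"c1">>, <<"t/1">>}}, dummy}),
true = ets:delete(T, {300, {<<"c1">>, <<"t/1">>}}),
true = ets:delete(T, {300, {<<"c1">>, <<"t/1">>}}).  %% deleting again is harmless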

View File

@ -30,9 +30,12 @@
%% ---- before ----
-export([ clear_history/2
        , get_history/2
        ]).

-include("include/emqx_slow_subs.hrl").

%% ---- after ----
-export([ clear_history/2
        , get_history/2
        , get_history/0
        ]).

-include_lib("emqx_plugin_libs/include/emqx_slow_subs.hrl").

-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)).

-import(minirest, [return/1]).
@ -41,17 +44,55 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% ---- before ----
clear_history(_Bindings, _Params) ->
    ok = emqx_slow_subs:clear_history(),
    return(ok).

get_history(_Bindings, Params) ->
    RowFun = fun(#top_k{index = ?INDEX(Latency, ClientId),
                        type = Type,
                        last_update_time = Ts}) ->
                     [{clientid, ClientId},
                      {latency, Latency},
                      {type, Type},
                      {last_update_time, Ts}]
             end,
    Return = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, Params, RowFun),
    return({ok, Return}).

%% ---- after ----
clear_history(_Bindings, _Params) ->
    Nodes = ekka_mnesia:running_nodes(),
    _ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT)
         || Node <- Nodes],
    return(ok).

get_history(_Bindings, _Params) ->
    Nodes = ekka_mnesia:running_nodes(),
    Fun = fun(Node, Acc) ->
                  NodeRankL = rpc_call(Node,
                                       ?MODULE,
                                       ?FUNCTION_NAME,
                                       [],
                                       [],
                                       ?DEFAULT_RPC_TIMEOUT),
                  NodeRankL ++ Acc
          end,
    RankL = lists:foldl(Fun, [], Nodes),
    SortFun = fun(#{timespan := A}, #{timespan := B}) ->
                      A > B
              end,
    SortedL = lists:sort(SortFun, RankL),
    SortedL2 = lists:sublist(SortedL, ?MAX_SIZE),
    return({ok, SortedL2}).
get_history() ->
Node = node(),
RankL = ets:tab2list(?TOPK_TAB),
ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)),
last_update_time = LastUpdateTime
}) ->
#{ clientid => ClientId
, node => Node
, topic => Topic
, timespan => TimeSpan
, last_update_time => LastUpdateTime
}
end,
lists:map(ConvFun, RankL).
rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() ->
erlang:apply(M, F, A);
rpc_call(Node, M, F, A, ErrorR, T) ->
case rpc:call(Node, M, F, A, T) of
{badrpc, _} -> ErrorR;
Res -> Res
end.
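%% Shape of one element returned by get_history/0 above (values invented); the
%% HTTP handler concatenates these per-node lists, sorts them by timespan in
%% descending order and truncates the result to ?MAX_SIZE entries:
#{ clientid         => <<"client_1">>
 , node             => 'emqx@127.0.0.1'
 , topic            => <<"t/1">>
 , timespan         => 812
 , last_update_time => 1641541290000
}.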

View File

@ -2228,25 +2228,18 @@ module.presence.qos = 1
## ---- before ----
## maximum number of Top-K records
##
## Value: 10
#module.slow_subs.top_k_num = 10

## enable notification
## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval
## publish is disabled if set to 0s.
##
## Default: 0s
#module.slow_subs.notice_interval = 0s

## QoS of notification message in notice topic
##
## Default: 0
#module.slow_subs.notice_qos = 0

## Maximum information number in one notification
##
## Default: 100
#module.slow_subs.notice_batch_size = 100

## ---- after ----
## maximum number of Top-K records
##
## Default: 10
#module.slow_subs.top_k_num = 10

## Stats Type
##
## Default: whole
#module.slow_subs.stats_type = whole

## Stats Threshold
##
## Default: 500ms
#module.slow_subs.threshold = 500ms
## CONFIG_SECTION_END=modules ================================================== ## CONFIG_SECTION_END=modules ==================================================

View File

@ -69,7 +69,7 @@ stop_child(ChildId) ->
init([]) -> init([]) ->
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
emqx_slow_subs:init_topk_tab(), emqx_slow_subs:init_tab(),
{ok, {{one_for_one, 10, 100}, []}}. {ok, {{one_for_one, 10, 100}, []}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -53,47 +53,39 @@ t_log_and_pub(_) ->
%% Sub topic first %% Sub topic first
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
Clients = start_client(Subs), Clients = start_client(Subs),
emqx:subscribe("$SYS/brokers/+/slow_subs"),
timer:sleep(1000),
Now = ?NOW,
%% publish
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
end,
lists:seq(1, 10)),
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
end,
lists:seq(1, 10)),
timer:sleep(1000),
Size = ets:info(?TOPK_TAB, size),
%% some time record maybe delete due to it expired
?assert(Size =< 6 andalso Size >= 4),
timer:sleep(1500), timer:sleep(1500),
Recs = try_receive([]), Now = ?NOW,
RecSum = lists:sum(Recs),
?assert(RecSum >= 5), %% publish
?assert(lists:all(fun(E) -> E =< 3 end, Recs)), lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
end,
lists:seq(1, 10)),
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
end,
lists:seq(1, 10)),
timer:sleep(2000), timer:sleep(2000),
Size = ets:info(?TOPK_TAB, size),
%% some records may already have been deleted because they expired
?assert(Size =< 6 andalso Size >= 4,
unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))),
timer:sleep(3000),
?assert(ets:info(?TOPK_TAB, size) =:= 0), ?assert(ets:info(?TOPK_TAB, size) =:= 0),
[Client ! stop || Client <- Clients], [Client ! stop || Client <- Clients],
ok. ok.
base_conf() -> base_conf() ->
[ {threshold, 500} [ {threshold, 300}
, {top_k_num, 5} , {top_k_num, 5}
, {expire_interval, timer:seconds(3)} , {expire_interval, timer:seconds(3)}
, {notice_interval, 1500} , {stats_type, whole}
, {notice_qos, 0}
, {notice_batch_size, 3}
]. ].
start_client(Subs) -> start_client(Subs) ->

View File

@ -68,31 +68,28 @@ base_conf() ->
t_get_history(_) -> t_get_history(_) ->
Now = ?NOW, Now = ?NOW,
Each = fun(I) -> Each = fun(I) ->
ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), Topic = erlang:list_to_binary(io_lib:format("topic/~p", [I])),
type = average, ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(I, ?ID(ClientId, Topic)),
last_update_time = Now}) last_update_time = Now})
end, end,
lists:foreach(Each, lists:seq(1, 5)), lists:foreach(Each, lists:seq(1, 5)),
{ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "",
auth_header_()), auth_header_()),
#{meta := Meta, data := [First | _]} = decode(Data), #{data := [First | _]} = decode(Data),
RMeta = #{page => 1, limit => 10, count => 5},
?assertEqual(RMeta, Meta),
RFirst = #{clientid => <<"test_5">>, RFirst = #{clientid => <<"test_5">>,
latency => 5, topic => <<"topic/5">>,
type => <<"average">>, timespan => 5,
node => erlang:atom_to_binary(node()),
last_update_time => Now}, last_update_time => Now},
?assertEqual(RFirst, First). ?assertEqual(RFirst, First).
t_clear(_) -> t_clear(_) ->
ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>), ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"test">>, <<"test">>)),
type = average,
last_update_time = ?NOW}), last_update_time = ?NOW}),
{ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],

View File

@ -1008,6 +1008,7 @@ end}.
]}. ]}.
%% @doc the number of samples used to calculate the average delivery latency
%% @deprecated This is an obsolete configuration, kept here only for compatibility
{mapping, "zone.$name.latency_samples", "emqx.zones", [ {mapping, "zone.$name.latency_samples", "emqx.zones", [
{default, 10}, {default, 10},
{datatype, integer} {datatype, integer}
@ -2230,26 +2231,35 @@ end}.
]}. ]}.
{mapping, "module.slow_subs.expire_interval", "emqx.modules", [ {mapping, "module.slow_subs.expire_interval", "emqx.modules", [
{default, "5m"}, {default, "300s"},
{datatype, {duration, ms}} {datatype, {duration, ms}}
]}. ]}.
{mapping, "module.slow_subs.top_k_num", "emqx.modules", [ {mapping, "module.slow_subs.top_k_num", "emqx.modules", [
{default, 500}, {default, 10},
{datatype, integer} {datatype, integer},
{validators, ["range:0-1000"]}
]}. ]}.
{mapping, "module.slow_subs.stats_type", "emqx.modules", [
{default, whole},
{datatype, {enum, [whole, internal, response]}}
]}.
%% @deprecated This is an obsolete configuration, kept here only for compatibility
{mapping, "module.slow_subs.notice_interval", "emqx.modules", [ {mapping, "module.slow_subs.notice_interval", "emqx.modules", [
{default, "0s"}, {default, "0s"},
{datatype, {duration, ms}} {datatype, {duration, ms}}
]}. ]}.
%% @deprecated This is an obsolete configuration, kept here only for compatibility
{mapping, "module.slow_subs.notice_qos", "emqx.modules", [ {mapping, "module.slow_subs.notice_qos", "emqx.modules", [
{default, 0}, {default, 0},
{datatype, integer}, {datatype, integer},
{validators, ["range:0-1"]} {validators, ["range:0-1"]}
]}. ]}.
%% @deprecated This is an obsolete configuration, kept here only for compatibility
{mapping, "module.slow_subs.notice_batch_size", "emqx.modules", [ {mapping, "module.slow_subs.notice_batch_size", "emqx.modules", [
{default, 500}, {default, 500},
{datatype, integer} {datatype, integer}

View File

@ -124,14 +124,13 @@
%% ---- before ----
          await_rel_timeout :: timeout(),
          %% Created at
          created_at :: pos_integer(),
          %% Message deliver latency stats
          latency_stats :: emqx_message_latency_stats:stats()
         }).

%% in the previous code, we will replace the message record with the pubrel atom
%% in the pubrec function, this will lose the creation time of the message,
%% but now we need this time to calculate latency, so now pubrel atom is changed to this record
-record(pubrel_await, {timestamp :: non_neg_integer()}).

%% ---- after ----
          await_rel_timeout :: timeout(),
          %% Created at
          created_at :: pos_integer(),
          extras :: map()
         }).

%% in the previous code, we will replace the message record with the pubrel atom
%% in the pubrec function, this will lose the creation time of the message,
-record(pubrel_await, {message :: emqx_types:message()}).
-type(session() :: #session{}). -type(session() :: #session{}).
@ -157,8 +156,7 @@
mqueue_dropped, mqueue_dropped,
next_pkt_id, next_pkt_id,
awaiting_rel_cnt, awaiting_rel_cnt,
awaiting_rel_max, awaiting_rel_max
latency_stats
]). ]).
-define(DEFAULT_BATCH_N, 1000). -define(DEFAULT_BATCH_N, 1000).
@ -187,7 +185,7 @@ init(#{zone := Zone} = CInfo, #{receive_maximum := MaxInflight}) ->
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)), await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)),
created_at = erlang:system_time(millisecond), created_at = erlang:system_time(millisecond),
latency_stats = emqx_message_latency_stats:new(Zone) extras = #{}
}. }.
%% @private init mq %% @private init mq
@ -244,9 +242,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout div 1000; Timeout div 1000;
info(created_at, #session{created_at = CreatedAt}) -> info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt; CreatedAt.
info(latency_stats, #session{latency_stats = Stats}) ->
emqx_message_latency_stats:latency(Stats).
%% @doc Get stats of the session. %% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()). -spec(stats(session()) -> emqx_types:stats()).
@ -339,10 +335,10 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) -> {value, {Msg, _Ts}} when is_record(Msg, message) ->
on_publish_completed(Msg, Session),
Inflight1 = emqx_inflight:delete(PacketId, Inflight), Inflight1 = emqx_inflight:delete(PacketId, Inflight),
Session2 = update_latency(Msg, Session), return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1})); {value, _Other} ->
{value, {_Pubrel, _Ts}} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}; {error, ?RC_PACKET_IDENTIFIER_IN_USE};
none -> none ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@ -364,10 +360,10 @@ return_with(Msg, {ok, Publishes, Session}) ->
pubrec(PacketId, Session = #session{inflight = Inflight}) -> pubrec(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) -> {value, {Msg, _Ts}} when is_record(Msg, message) ->
Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), Update = with_ts(#pubrel_await{message = Msg}),
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
{ok, Msg, Session#session{inflight = Inflight1}}; {ok, Msg, Session#session{inflight = Inflight1}};
{value, {_PUBREL, _Ts}} -> {value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}; {error, ?RC_PACKET_IDENTIFIER_IN_USE};
none -> none ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@ -396,10 +392,10 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
| {error, emqx_types:reason_code()}). | {error, emqx_types:reason_code()}).
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> {value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) ->
Session2 = update_latency(Pubrel, Session), on_publish_completed(Msg, Session),
Inflight1 = emqx_inflight:delete(PacketId, Inflight), Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(ClientInfo, Session2#session{inflight = Inflight1}); dequeue(ClientInfo, Session#session{inflight = Inflight1});
{value, _Other} -> {value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}; {error, ?RC_PACKET_IDENTIFIER_IN_USE};
none -> none ->
@ -464,8 +460,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
end. end.
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
emqx:run_hook('message.publish_done', on_publish_completed(Msg, Session),
[Msg, #{session_rebirth_time => Session#session.created_at}]),
{ok, [{undefined, maybe_ack(Msg)}], Session}; {ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
@ -480,7 +475,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
{ok, Session1}; {ok, Session1};
false -> false ->
Publish = {PacketId, maybe_ack(Msg)}, Publish = {PacketId, maybe_ack(Msg)},
Session1 = await(PacketId, Msg, Session), Msg2 = mark_begin_deliver(Msg),
Session1 = await(PacketId, Msg2, Session),
{ok, [Publish], next_pkt_id(Session1)} {ok, [Publish], next_pkt_id(Session1)}
end. end.
@ -574,14 +570,13 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
-spec(retry(emqx_types:clientinfo(), session()) -spec(retry(emqx_types:clientinfo(), session())
-> {ok, session()} -> {ok, session()}
| {ok, replies(), timeout(), session()}). | {ok, replies(), timeout(), session()}).
retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> retry(ClientInfo, Session = #session{inflight = Inflight}) ->
case emqx_inflight:is_empty(Inflight) of case emqx_inflight:is_empty(Inflight) of
true -> {ok, Session}; true -> {ok, Session};
false -> false ->
Now = erlang:system_time(millisecond), Now = erlang:system_time(millisecond),
Session2 = check_expire_latency(Now, RetryInterval, Session),
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
[], Now, Session2, ClientInfo) [], Now, Session, ClientInfo)
end. end.
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) ->
@ -717,31 +712,15 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Message Latency Stats %% Message Latency Stats
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% ---- before ----
update_latency(Msg,
               #session{clientid = ClientId,
                        latency_stats = Stats,
                        created_at = CreateAt} = S) ->
    case get_birth_timestamp(Msg, CreateAt) of
        0 -> S;
        Ts ->
            Latency = erlang:system_time(millisecond) - Ts,
            Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats),
            S#session{latency_stats = Stats2}
    end.

check_expire_latency(Now, Interval,
                     #session{clientid = ClientId, latency_stats = Stats} = S) ->
    Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats),
    S#session{latency_stats = Stats2}.

get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
    Ts;
get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
    Ts;
get_birth_timestamp(_, _) ->
    0.

%% ---- after ----
on_publish_completed(Msg,
                     #session{clientid = ClientId, created_at = CreateAt}) ->
    emqx:run_hook('message.publish_completed',
                  [Msg, #{ session_birth_time => CreateAt
                         , clientid => ClientId
                         }]).

mark_begin_deliver(Msg) ->
    emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg).
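%% Illustration (standalone; the callback name is hypothetical): a hook
%% attached to 'message.publish_completed' receives the message plus the map
%% built above, and can read back the header set by mark_begin_deliver/1.
%% emqx_slow_subs:on_publish_completed/3 earlier in this commit is the real
%% consumer.
my_on_publish_completed(Msg, #{clientid := ClientId, session_birth_time := _Birth}, _Env) ->
    DeliverBeginAt = emqx_message:get_header(deliver_begin_at, Msg, undefined),
    logger:debug("publish completed for ~s, delivery began at ~p", [ClientId, DeliverBeginAt]),
    ok.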
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -1,119 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_message_latency_stats).
%% API
-export([ new/1, update/3, check_expire/4, latency/1]).
-export([get_threshold/0, update_threshold/1]).
-define(NOW, erlang:system_time(millisecond)).
-define(MINIMUM_INSERT_INTERVAL, 1000).
-define(MINIMUM_THRESHOLD, 100).
-define(DEFAULT_THRESHOLD, 500).
-define(THRESHOLD_KEY, {?MODULE, threshold}).
-opaque stats() :: #{ ema := emqx_moving_average:ema()
, last_update_time := timestamp()
, last_access_time := timestamp() %% timestamp of last try to call hook
, last_insert_value := non_neg_integer()
}.
-type timestamp() :: non_neg_integer().
-type timespan() :: number().
-type latency_type() :: average
| expire.
-import(emqx_zone, [get_env/3]).
-export_type([stats/0, latency_type/0]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new(non_neg_integer() | emqx_types:zone()) -> stats().
new(SamplesT) when is_integer(SamplesT) ->
Samples = erlang:max(1, SamplesT),
#{ ema => emqx_moving_average:new(exponential, #{period => Samples})
, last_update_time => 0
, last_access_time => 0
, last_insert_value => 0
};
new(Zone) ->
Samples = get_env(Zone, latency_samples, 1),
new(Samples).
-spec update(emqx_types:clientid(), number(), stats()) -> stats().
update(ClientId, Val, #{ema := EMA} = Stats) ->
Now = ?NOW,
#{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA),
Stats2 = call_hook(ClientId, Now, average, Latency, Stats),
Stats2#{ ema := EMA2
, last_update_time := ?NOW}.
-spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats().
check_expire(_, Now, Interval, #{last_update_time := LUT} = S)
when LUT >= Now - Interval ->
S;
check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
Latency = Now - LUT,
call_hook(ClientId, Now, expire, Latency, S).
-spec latency(stats()) -> number().
latency(#{ema := #{average := Average}}) ->
Average.
-spec update_threshold(pos_integer()) -> pos_integer().
update_threshold(Threshold) ->
Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD),
persistent_term:put(?THRESHOLD_KEY, Val),
Val.
get_threshold() ->
persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
call_hook(_, _, _, Latency, S)
when Latency =< ?MINIMUM_THRESHOLD ->
S;
call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
when Now =< LIT + ?MINIMUM_INSERT_INTERVAL ->
S;
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
case Latency =< get_threshold() of
true ->
Stats#{last_access_time := Now};
_ ->
ToInsert = erlang:floor(Latency),
Arg = #{clientid => ClientId,
latency => ToInsert,
type => Type,
last_insert_value => LIV,
update_time => Now},
emqx:run_hook('message.slow_subs_stats', [Arg]),
Stats#{last_insert_value := ToInsert,
last_access_time := Now}
end.

View File

@ -192,7 +192,8 @@ t_pubrec(_) ->
Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>),
Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}), Session = session(#{inflight => Inflight}),
{ok, Msg, Session1} = emqx_session:pubrec(2, Session), {ok, MsgWithTime, Session1} = emqx_session:pubrec(2, Session),
?assertEqual(Msg, emqx_message:remove_header(deliver_begin_at, MsgWithTime)),
?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
t_pubrec_packet_id_in_use_error(_) -> t_pubrec_packet_id_in_use_error(_) ->
@ -214,7 +215,7 @@ t_pubrel_error_packetid_not_found(_) ->
t_pubcomp(_) -> t_pubcomp(_) ->
Now = ts(millisecond), Now = ts(millisecond),
Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()), Inflight = emqx_inflight:insert(1, {{pubrel_await, undefined}, Now}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}), Session = session(#{inflight => Inflight}),
{ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
@ -271,9 +272,13 @@ t_deliver_qos1(_) ->
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)),
?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)),
{ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
{ok, Msg1WithTime, Session2} = emqx_session:puback(clientinfo(), 1, Session1),
?assertEqual(Msg1, emqx_message:remove_header(deliver_begin_at, Msg1WithTime)),
?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)),
{ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
{ok, Msg2WithTime, Session3} = emqx_session:puback(clientinfo(), 2, Session2),
?assertEqual(Msg2, emqx_message:remove_header(deliver_begin_at, Msg2WithTime)),
?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)).
t_deliver_qos2(_) -> t_deliver_qos2(_) ->
@ -317,7 +322,11 @@ t_retry(_) ->
{ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session),
ok = timer:sleep(200), ok = timer:sleep(200),
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
{ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), {ok, Msgs1WithTime, 100, Session2} = emqx_session:retry(clientinfo(), Session1),
?assertEqual(Msgs1,
lists:map(fun({Id, M}) ->
{Id, emqx_message:remove_header(deliver_begin_at, M)}
end, Msgs1WithTime)),
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -341,7 +350,10 @@ t_replay(_) ->
Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1),
Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs],
{ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2),
?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), ?assertEqual(Pubs1 ++ [{3, Msg}],
lists:map(fun({Id, M}) ->
{Id, emqx_message:remove_header(deliver_begin_at, M)}
end, ReplayPubs)),
?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)).
t_expire_awaiting_rel(_) -> t_expire_awaiting_rel(_) ->