refactor(emqx_slow_subs): refactor use moving average (#6287)

* refactor(emqx_slow_subs): refactor use moving average

* fix(emqx_slow_subs): change elapsed to latency, and fix some error

* fix(emqx_slow_subs): fix emqx_mgmt_api.erl indent

* fix(emqx_slow_subs): change api name

* fix(emqx_slow_subs): fix and improve some code

* fix(emqx_slow_subs): move clienid filed from latency_stats to session
This commit is contained in:
lafirest 2021-11-26 10:42:15 +08:00 committed by GitHub
parent 39e52d583e
commit fef3fc27cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 826 additions and 660 deletions

View File

@ -49,17 +49,46 @@ paginate(Tables, Params, RowFun) ->
query_handle(Table) when is_atom(Table) ->
qlc:q([R|| R <- ets:table(Table)]);
query_handle({Table, Opts}) when is_atom(Table) ->
qlc:q([R|| R <- ets:table(Table, Opts)]);
query_handle([Table]) when is_atom(Table) ->
qlc:q([R|| R <- ets:table(Table)]);
query_handle([{Table, Opts}]) when is_atom(Table) ->
qlc:q([R|| R <- ets:table(Table, Opts)]);
query_handle(Tables) ->
qlc:append([qlc:q([E || E <- ets:table(T)]) || T <- Tables]).
Fold = fun({Table, Opts}, Acc) ->
Handle = qlc:q([R|| R <- ets:table(Table, Opts)]),
[Handle | Acc];
(Table, Acc) ->
Handle = qlc:q([R|| R <- ets:table(Table)]),
[Handle | Acc]
end,
Handles = lists:foldl(Fold, [], Tables),
qlc:append(lists:reverse(Handles)).
count(Table) when is_atom(Table) ->
ets:info(Table, size);
count({Table, _Opts}) when is_atom(Table) ->
ets:info(Table, size);
count([Table]) when is_atom(Table) ->
ets:info(Table, size);
count([{Table, _Opts}]) when is_atom(Table) ->
ets:info(Table, size);
count(Tables) ->
lists:sum([count(T) || T <- Tables]).
Fold = fun({Table, _Opts}, Acc) ->
count(Table) ++ Acc;
(Table, Acc) ->
count(Table) ++ Acc
end,
lists:foldl(Fold, 0, Tables).
count(Table, Nodes) ->
lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]).

View File

@ -14,12 +14,15 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(TOPK_TAB, emqx_slow_subs_topk).
-record(top_k, { rank :: pos_integer()
, topic :: emqx_types:topic()
, average_count :: number()
, average_elapsed :: number()}).
-define(INDEX(Latency, ClientId), {Latency, ClientId}).
-record(top_k, { index :: index()
, type :: emqx_message_latency_stats:latency_type()
, last_update_time :: pos_integer()
, extra = []
}).
-type top_k() :: #top_k{}.
-type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()).

View File

@ -0,0 +1,319 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_slow_subs).
-behaviour(gen_server).
-include_lib("include/emqx.hrl").
-include_lib("include/logger.hrl").
-include_lib("emqx_plugin_libs/include/emqx_slow_subs.hrl").
-logger_header("[SLOW Subs]").
-export([ start_link/1, on_stats_update/2, enable/0
, disable/0, clear_history/0, init_topk_tab/0
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-compile(nowarn_unused_type).
-type state() :: #{ config := proplist:proplist()
, enable := boolean()
, last_tick_at := pos_integer()
}.
-type log() :: #{ rank := pos_integer()
, clientid := emqx_types:clientid()
, latency := non_neg_integer()
, type := emqx_message_latency_stats:latency_type()
}.
-type window_log() :: #{ last_tick_at := pos_integer()
, logs := [log()]
}.
-type message() :: #message{}.
-import(proplists, [get_value/2]).
-type stats_update_args() :: #{ clientid := emqx_types:clientid()
, latency := non_neg_integer()
, type := emqx_message_latency_stats:latency_type()
, last_insert_value := non_neg_integer()
, update_time := timer:time()
}.
-type stats_update_env() :: #{max_size := pos_integer()}.
-ifdef(TEST).
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)).
-else.
-define(EXPIRE_CHECK_INTERVAL, timer:seconds(10)).
-endif.
-define(NOW, erlang:system_time(millisecond)).
-define(NOTICE_TOPIC_NAME, "slow_subs").
-define(DEF_CALL_TIMEOUT, timer:seconds(10)).
%% erlang term order
%% number < atom < reference < fun < port < pid < tuple < list < bit string
%% ets ordered_set is ascending by term order
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Start the st_statistics
-spec(start_link(Env :: list()) -> emqx_types:startlink_ret()).
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
%% XXX NOTE:pay attention to the performance here
-spec on_stats_update(stats_update_args(), stats_update_env()) -> true.
on_stats_update(#{clientid := ClientId,
latency := Latency,
type := Type,
last_insert_value := LIV,
update_time := Ts},
#{max_size := MaxSize}) ->
LastIndex = ?INDEX(LIV, ClientId),
Index = ?INDEX(Latency, ClientId),
%% check whether the client is in the table
case ets:lookup(?TOPK_TAB, LastIndex) of
[#top_k{index = Index}] ->
%% if last value == the new value, return
true;
[_] ->
%% if Latency > minimum value, we should update it
%% if Latency < minimum value, maybe it can replace the minimum value
%% so alwyas update at here
%% do we need check if Latency == minimum ???
ets:insert(?TOPK_TAB,
#top_k{index = Index, type = Type, last_update_time = Ts}),
ets:delete(?TOPK_TAB, LastIndex);
[] ->
%% try to insert
try_insert_to_topk(MaxSize, Index, Latency, Type, Ts)
end.
clear_history() ->
gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).
enable() ->
gen_server:call(?MODULE, {enable, true}, ?DEF_CALL_TIMEOUT).
disable() ->
gen_server:call(?MODULE, {enable, false}, ?DEF_CALL_TIMEOUT).
init_topk_tab() ->
case ets:whereis(?TOPK_TAB) of
undefined ->
?TOPK_TAB = ets:new(?TOPK_TAB,
[ ordered_set, public, named_table
, {keypos, #top_k.index}, {write_concurrency, true}
, {read_concurrency, true}
]);
_ ->
?TOPK_TAB
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Conf]) ->
notice_tick(Conf),
expire_tick(Conf),
MaxSize = get_value(top_k_num, Conf),
load(MaxSize),
{ok, #{config => Conf,
last_tick_at => ?NOW,
enable => true}}.
handle_call({enable, Enable}, _From,
#{config := Cfg, enable := IsEnable} = State) ->
State2 = case Enable of
IsEnable ->
State;
true ->
MaxSize = get_value(max_topk_num, Cfg),
load(MaxSize),
State#{enable := true};
_ ->
unload(),
State#{enable := false}
end,
{reply, ok, State2};
handle_call(clear_history, _, State) ->
ets:delete_all_objects(?TOPK_TAB),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(expire_tick, #{config := Cfg} = State) ->
expire_tick(Cfg),
Logs = ets:tab2list(?TOPK_TAB),
do_clear(Cfg, Logs),
{noreply, State};
handle_info(notice_tick, #{config := Cfg} = State) ->
notice_tick(Cfg),
Logs = ets:tab2list(?TOPK_TAB),
do_notification(Logs, State),
{noreply, State#{last_tick_at := ?NOW}};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _) ->
unload(),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
expire_tick(_) ->
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
notice_tick(Cfg) ->
case get_value(notice_interval, Cfg) of
0 -> ok;
Interval ->
erlang:send_after(Interval, self(), ?FUNCTION_NAME),
ok
end.
-spec do_notification(list(), state()) -> ok.
do_notification([], _) ->
ok;
do_notification(Logs, #{last_tick_at := LastTickTime, config := Cfg}) ->
start_publish(Logs, LastTickTime, Cfg),
ok.
start_publish(Logs, TickTime, Cfg) ->
emqx_pool:async_submit({fun do_publish/4, [Logs, erlang:length(Logs), TickTime, Cfg]}).
do_publish([], _, _, _) ->
ok;
do_publish(Logs, Rank, TickTime, Cfg) ->
BatchSize = get_value(notice_batch_size, Cfg),
do_publish(Logs, BatchSize, Rank, TickTime, Cfg, []).
do_publish([Log | T], Size, Rank, TickTime, Cfg, Cache) when Size > 0 ->
Cache2 = [convert_to_notice(Rank, Log) | Cache],
do_publish(T, Size - 1, Rank - 1, TickTime, Cfg, Cache2);
do_publish(Logs, Size, Rank, TickTime, Cfg, Cache) when Size =:= 0 ->
publish(TickTime, Cfg, Cache),
do_publish(Logs, Rank, TickTime, Cfg);
do_publish([], _, _Rank, TickTime, Cfg, Cache) ->
publish(TickTime, Cfg, Cache),
ok.
convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId),
type = Type,
last_update_time = Ts}) ->
#{rank => Rank,
clientid => ClientId,
latency => Latency,
type => Type,
timestamp => Ts}.
publish(TickTime, Cfg, Notices) ->
WindowLog = #{last_tick_at => TickTime,
logs => lists:reverse(Notices)},
Payload = emqx_json:encode(WindowLog),
Msg = #message{ id = emqx_guid:gen()
, qos = get_value(notice_qos, Cfg)
, from = ?MODULE
, topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
, payload = Payload
, timestamp = ?NOW
},
_ = emqx_broker:safe_publish(Msg),
ok.
load(MaxSize) ->
_ = emqx:hook('message.slow_subs_stats',
fun ?MODULE:on_stats_update/2,
[#{max_size => MaxSize}]),
ok.
unload() ->
emqx:unhook('message.slow_subs_stats', fun ?MODULE:on_stats_update/2).
do_clear(Cfg, Logs) ->
Now = ?NOW,
Interval = get_value(expire_interval, Cfg),
Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
case Now - Ts >= Interval of
true ->
ets:delete(?TOPK_TAB, Index);
_ ->
true
end
end,
lists:foreach(Each, Logs).
try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) ->
case ets:info(?TOPK_TAB, size) of
Size when Size < MaxSize ->
%% if the size is under limit, insert it directly
ets:insert(?TOPK_TAB,
#top_k{index = Index, type = Type, last_update_time = Ts});
_Size ->
%% find the minimum value
?INDEX(Min, _) = First =
case ets:first(?TOPK_TAB) of
?INDEX(_, _) = I -> I;
_ -> ?INDEX(Latency - 1, <<>>)
end,
case Latency =< Min of
true -> true;
_ ->
ets:insert(?TOPK_TAB,
#top_k{index = Index, type = Type, last_update_time = Ts}),
ets:delete(?TOPK_TAB, First)
end
end.

View File

@ -0,0 +1,57 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_slow_subs_api).
-rest_api(#{name => clear_history,
method => 'DELETE',
path => "/slow_subscriptions",
func => clear_history,
descr => "Clear current data and re count slow topic"}).
-rest_api(#{name => get_history,
method => 'GET',
path => "/slow_subscriptions",
func => get_history,
descr => "Get slow topics statistics record data"}).
-export([ clear_history/2
, get_history/2
]).
-include("include/emqx_slow_subs.hrl").
-import(minirest, [return/1]).
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
clear_history(_Bindings, _Params) ->
ok = emqx_slow_subs:clear_history(),
return(ok).
get_history(_Bindings, Params) ->
RowFun = fun(#top_k{index = ?INDEX(Latency, ClientId),
type = Type,
last_update_time = Ts}) ->
[{clientid, ClientId},
{latency, Latency},
{type, Type},
{last_update_time, Ts}]
end,
Return = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, Params, RowFun),
return({ok, Return}).

View File

@ -1,379 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics).
-behaviour(gen_server).
-include_lib("include/emqx.hrl").
-include_lib("include/logger.hrl").
-include_lib("emqx_plugin_libs/include/emqx_st_statistics.hrl").
-logger_header("[SLOW TOPICS]").
-export([ start_link/1, on_publish_done/3, enable/0
, disable/0, clear_history/0
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-compile(nowarn_unused_type).
-type state() :: #{ config := proplist:proplist()
, period := pos_integer()
, last_tick_at := pos_integer()
, counter := counters:counters_ref()
, enable := boolean()
}.
-type log() :: #{ topic := emqx_types:topic()
, count := pos_integer()
, average := float()
}.
-type window_log() :: #{ last_tick_at := pos_integer()
, logs := [log()]
}.
-record(slow_log, { topic :: emqx_types:topic()
, count :: integer() %% 0 will be used in initial value
, elapsed :: integer()
}).
-type message() :: #message{}.
-import(proplists, [get_value/2]).
-define(NOW, erlang:system_time(millisecond)).
-define(QUOTA_IDX, 1).
-type slow_log() :: #slow_log{}.
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
-type publish_done_env() :: #{ ignore_before_create := boolean()
, threshold := pos_integer()
, counter := counters:counters_ref()
}.
-type publish_done_args() :: #{session_rebirth_time => pos_integer()}.
-ifdef(TEST).
-define(TOPK_ACCESS, public).
-else.
-define(TOPK_ACCESS, protected).
-endif.
%% erlang term order
%% number < atom < reference < fun < port < pid < tuple < list < bit string
%% ets ordered_set is ascending by term order
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Start the st_statistics
-spec(start_link(Env :: list()) -> emqx_types:startlink_ret()).
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok.
on_publish_done(#message{timestamp = Timestamp},
#{session_rebirth_time := Created},
#{ignore_before_create := IgnoreBeforeCreate})
when IgnoreBeforeCreate, Timestamp < Created ->
ok;
on_publish_done(#message{timestamp = Timestamp} = Msg,
_,
#{threshold := Threshold, counter := Counter}) ->
case ?NOW - Timestamp of
Elapsed when Elapsed > Threshold ->
case get_log_quota(Counter) of
true ->
update_log(Msg, Elapsed);
_ ->
ok
end;
_ ->
ok
end.
clear_history() ->
gen_server:call(?MODULE, ?FUNCTION_NAME).
enable() ->
gen_server:call(?MODULE, {enable, true}).
disable() ->
gen_server:call(?MODULE, {enable, false}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Env]) ->
erlang:process_flag(trap_exit, true),
init_log_tab(Env),
init_topk_tab(Env),
notification_tick(Env),
Counter = counters:new(1, [write_concurrency]),
set_log_quota(Env, Counter),
Threshold = get_value(threshold_time, Env),
IgnoreBeforeCreate = get_value(ignore_before_create, Env),
load(IgnoreBeforeCreate, Threshold, Counter),
{ok, #{config => Env,
period => 1,
last_tick_at => ?NOW,
counter => Counter,
enable => true}}.
handle_call({enable, Enable}, _From,
#{config := Cfg, counter := Counter, enable := IsEnable} = State) ->
State2 = case Enable of
IsEnable ->
State;
true ->
Threshold = get_value(threshold_time, Cfg),
IgnoreBeforeCreate = get_value(ignore_before_create, Cfg),
load(IgnoreBeforeCreate, Threshold, Counter),
State#{enable := true};
_ ->
unload(),
State#{enable := false}
end,
{reply, ok, State2};
handle_call(clear_history, _, State) ->
ets:delete_all_objects(?TOPK_TAB),
{reply, ok, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(notification_tick, #{config := Cfg, period := Period} = State) ->
notification_tick(Cfg),
do_notification(State),
{noreply, State#{last_tick_at := ?NOW,
period := Period + 1}};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _) ->
unload(),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
notification_tick(Env) ->
TimeWindow = get_value(time_window, Env),
erlang:send_after(TimeWindow, self(), ?FUNCTION_NAME).
init_log_tab(_) ->
?LOG_TAB = ets:new(?LOG_TAB, [ set, public, named_table
, {keypos, #slow_log.topic}, {write_concurrency, true}
, {read_concurrency, true}
]).
init_topk_tab(_) ->
?TOPK_TAB = ets:new(?TOPK_TAB, [ set, ?TOPK_ACCESS, named_table
, {keypos, #top_k.rank}, {write_concurrency, false}
, {read_concurrency, true}
]).
-spec get_log_quota(counters:counters_ref()) -> boolean().
get_log_quota(Counter) ->
case counters:get(Counter, ?QUOTA_IDX) of
Quota when Quota > 0 ->
counters:sub(Counter, ?QUOTA_IDX, 1),
true;
_ ->
false
end.
-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok.
set_log_quota(Cfg, Counter) ->
MaxLogNum = get_value(max_log_num, Cfg),
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
-spec update_log(message(), pos_integer()) -> ok.
update_log(#message{topic = Topic}, Elapsed) ->
_ = ets:update_counter(?LOG_TAB,
Topic,
[{#slow_log.count, 1}, {#slow_log.elapsed, Elapsed}],
#slow_log{topic = Topic,
count = 0,
elapsed = 0}),
ok.
-spec do_notification(state()) -> true.
do_notification(#{last_tick_at := TickTime,
config := Cfg,
period := Period,
counter := Counter}) ->
Logs = ets:tab2list(?LOG_TAB),
ets:delete_all_objects(?LOG_TAB),
start_publish(Logs, TickTime, Cfg),
set_log_quota(Cfg, Counter),
MaxRecord = get_value(top_k_num, Cfg),
update_topk(Logs, MaxRecord, Period).
-spec update_topk(list(slow_log()), pos_integer(), pos_integer()) -> true.
update_topk(Logs, MaxRecord, Period) ->
TopkMap = get_topk_map(Period),
TopkMap2 = update_topk_map(Logs, Period, TopkMap),
SortFun = fun(A, B) ->
A#top_k.average_count > B#top_k.average_count
end,
TopkL = lists:sort(SortFun, maps:values(TopkMap2)),
TopkL2 = lists:sublist(TopkL, 1, MaxRecord),
update_topk_tab(TopkL2).
-spec update_topk_map(list(slow_log()), pos_integer(), top_k_map()) -> top_k_map().
update_topk_map([#slow_log{topic = Topic,
count = LogTimes,
elapsed = LogElapsed} | T], Period, TopkMap) ->
case maps:get(Topic, TopkMap, undefined) of
undefined ->
Record = #top_k{rank = 1,
topic = Topic,
average_count = LogTimes,
average_elapsed = LogElapsed},
TopkMap2 = TopkMap#{Topic => Record},
update_topk_map(T, Period, TopkMap2);
#top_k{average_count = AvgCount,
average_elapsed = AvgElapsed} = Record ->
NewPeriod = Period + 1,
%% (a + b) / c = a / c + b / c
%% average_count(elapsed) dived NewPeriod in function get_topk_maps
AvgCount2 = AvgCount + LogTimes / NewPeriod,
AvgElapsed2 = AvgElapsed + LogElapsed / NewPeriod,
Record2 = Record#top_k{average_count = AvgCount2,
average_elapsed = AvgElapsed2},
update_topk_map(T, Period, TopkMap#{Topic := Record2})
end;
update_topk_map([], _, TopkMap) ->
TopkMap.
-spec update_topk_tab(list(top_k())) -> true.
update_topk_tab(Records) ->
Zip = fun(Rank, Item) -> Item#top_k{rank = Rank} end,
Len = erlang:length(Records),
RankedTopics = lists:zipwith(Zip, lists:seq(1, Len), Records),
ets:insert(?TOPK_TAB, RankedTopics).
start_publish(Logs, TickTime, Cfg) ->
emqx_pool:async_submit({fun do_publish/3, [Logs, TickTime, Cfg]}).
do_publish([], _, _) ->
ok;
do_publish(Logs, TickTime, Cfg) ->
BatchSize = get_value(notice_batch_size, Cfg),
do_publish(Logs, BatchSize, TickTime, Cfg, []).
do_publish([Log | T], Size, TickTime, Cfg, Cache) when Size > 0 ->
Cache2 = [convert_to_notice(Log) | Cache],
do_publish(T, Size - 1, TickTime, Cfg, Cache2);
do_publish(Logs, Size, TickTime, Cfg, Cache) when Size =:= 0 ->
publish(TickTime, Cfg, Cache),
do_publish(Logs, TickTime, Cfg);
do_publish([], _, TickTime, Cfg, Cache) ->
publish(TickTime, Cfg, Cache),
ok.
convert_to_notice(#slow_log{topic = Topic,
count = Count,
elapsed = Elapsed}) ->
#{topic => Topic,
count => Count,
average => Elapsed / Count}.
publish(TickTime, Cfg, Notices) ->
WindowLog = #{last_tick_at => TickTime,
logs => Notices},
Payload = emqx_json:encode(WindowLog),
_ = emqx:publish(#message{ id = emqx_guid:gen()
, qos = get_value(notice_qos, Cfg)
, from = ?MODULE
, topic = get_topic(Cfg)
, payload = Payload
, timestamp = ?NOW
}),
ok.
load(IgnoreBeforeCreate, Threshold, Counter) ->
_ = emqx:hook('message.publish_done',
fun ?MODULE:on_publish_done/3,
[#{ignore_before_create => IgnoreBeforeCreate,
threshold => Threshold,
counter => Counter}
]),
ok.
unload() ->
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
-spec get_topic(proplists:proplist()) -> binary().
get_topic(Cfg) ->
case get_value(notice_topic, Cfg) of
Topic when is_binary(Topic) ->
Topic;
Topic ->
erlang:list_to_binary(Topic)
end.
-spec get_topk_map(pos_integer()) -> top_k_map().
get_topk_map(Period) ->
Size = ets:info(?TOPK_TAB, size),
get_topk_map(1, Size, Period, #{}).
-spec get_topk_map(pos_integer(),
non_neg_integer(), pos_integer(), top_k_map()) -> top_k_map().
get_topk_map(Index, Size, _, TopkMap) when Index > Size ->
TopkMap;
get_topk_map(Index, Size, Period, TopkMap) ->
[#top_k{topic = Topic,
average_count = AvgCount,
average_elapsed = AvgElapsed} = R] = ets:lookup(?TOPK_TAB, Index),
NewPeriod = Period + 1,
TotalTimes = AvgCount * Period,
AvgCount2 = TotalTimes / NewPeriod,
AvgElapsed2 = TotalTimes * AvgElapsed / NewPeriod,
TopkMap2 = TopkMap#{Topic => R#top_k{average_count = AvgCount2,
average_elapsed = AvgElapsed2}},
get_topk_map(Index + 1, Size, Period, TopkMap2).

View File

@ -1,83 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_api).
-rest_api(#{name => clear_history,
method => 'DELETE',
path => "/slow_topic",
func => clear_history,
descr => "Clear current data and re count slow topic"}).
-rest_api(#{name => get_history,
method => 'GET',
path => "/slow_topic",
func => get_history,
descr => "Get slow topics statistics record data"}).
-export([ clear_history/2
, get_history/2
]).
-include("include/emqx_st_statistics.hrl").
-import(minirest, [return/1]).
%%--------------------------------------------------------------------
%% HTTP API
%%--------------------------------------------------------------------
clear_history(_Bindings, _Params) ->
ok = emqx_st_statistics:clear_history(),
return(ok).
get_history(_Bindings, Params) ->
PageT = proplists:get_value(<<"_page">>, Params),
LimitT = proplists:get_value(<<"_limit">>, Params),
Page = erlang:binary_to_integer(PageT),
Limit = erlang:binary_to_integer(LimitT),
Start = (Page - 1) * Limit + 1,
Size = ets:info(?TOPK_TAB, size),
End = Start + Limit - 1,
{HasNext, Count, Infos} = get_history(Start, End, Size),
return({ok, #{meta => #{page => Page,
limit => Limit,
hasnext => HasNext,
count => Count},
data => Infos}}).
get_history(Start, _End, Size) when Start > Size ->
{false, 0, []};
get_history(Start, End, Size) when End > Size ->
get_history(Start, Size, Size);
get_history(Start, End, Size) ->
Fold = fun(Rank, Acc) ->
[#top_k{topic = Topic
, average_count = Count
, average_elapsed = Elapsed}] = ets:lookup(?TOPK_TAB, Rank),
Info = [ {rank, Rank}
, {topic, Topic}
, {count, Count}
, {elapsed, Elapsed}],
[Info | Acc]
end,
Infos = lists:foldl(Fold, [], lists:seq(Start, End)),
{End < Size, End - Start + 1, Infos}.

View File

@ -2217,47 +2217,34 @@ module.presence.qos = 1
## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
##--------------------------------------------------------------------
## Slow Topic Statistics Module
## Slow Subscribers Statistics Module
## Threshold time of slow topics statistics
##
## Default: 10 seconds
#module.st_statistics.threshold_time = 10S
## ignore the messages that before than session created
##
## Default: true
#module.st_statistics.ignore_before_create = true
## Time window of slow topics statistics
## the expire time of the record which in topk
##
## Value: 5 minutes
#module.st_statistics.time_window = 5M
#module.slow_subs.expire_interval = 5m
## Maximum of slow topics log, log will clear when enter new time window
## maximum number of Top-K record
##
## Value: 500
#module.st_statistics.max_log_num = 500
## Value: 10
#module.slow_subs.top_k_num = 10
## Top-K record for slow topics, update from logs
## enable notification
## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval
## publish is disabled if set to 0s.
##
## Value: 500
#module.st_statistics.top_k_num = 500
## Topic of notification
##
## Defaut: $slow_topics
#module.st_statistics.notice_topic = $slow_topics
## Defaut: 0s
#module.slow_subs.notice_interval = 0s
## QoS of notification message in notice topic
##
## Defaut: 0
#module.st_statistics.notice_qos = 0
#module.slow_subs.notice_qos = 0
## Maximum information number in one notification
##
## Default: 500
#module.st_statistics.notice_batch_size = 500
## Default: 100
#module.slow_subs.notice_batch_size = 100
## CONFIG_SECTION_END=modules ==================================================

View File

@ -14,14 +14,14 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_st_statistics).
-module(emqx_mod_slow_subs).
-behaviour(emqx_gen_mod).
-include_lib("include/emqx.hrl").
-include_lib("include/logger.hrl").
-logger_header("[SLOW TOPICS]").
-logger_header("[SLOW Subs]").
%% emqx_gen_mod callbacks
-export([ load/1
@ -29,7 +29,7 @@
, description/0
]).
-define(LIB, emqx_st_statistics).
-define(LIB, emqx_slow_subs).
%%--------------------------------------------------------------------
%% Load/Unload
@ -46,4 +46,4 @@ unload(_Env) ->
ok.
description() ->
"EMQ X Slow Topic Statistics Module".
"EMQ X Slow Subscribers Statistics Module".

View File

@ -69,6 +69,7 @@ stop_child(ChildId) ->
init([]) ->
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
emqx_slow_subs:init_topk_tab(),
{ok, {{one_for_one, 10, 100}, []}}.
%%--------------------------------------------------------------------

View File

@ -14,18 +14,18 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_SUITE).
-module(emqx_slow_subs_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include("include/emqx_mqtt.hrl").
-include_lib("include/emqx.hrl").
%-define(LOGT(Format, Args), ct:pal(Format, Args)).
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(TOPK_TAB, emqx_slow_subs_topk).
-define(NOW, erlang:system_time(millisecond)).
all() -> emqx_ct:all(?MODULE).
@ -39,11 +39,11 @@ end_per_suite(Config) ->
Config.
init_per_testcase(_, Config) ->
emqx_mod_st_statistics:load(base_conf()),
emqx_mod_slow_subs:load(base_conf()),
Config.
end_per_testcase(_, _) ->
emqx_mod_st_statistics:unload(undefined),
emqx_mod_slow_subs:unload([]),
ok.
%%--------------------------------------------------------------------
@ -51,62 +51,74 @@ end_per_testcase(_, _) ->
%%--------------------------------------------------------------------
t_log_and_pub(_) ->
%% Sub topic first
SubBase = "/test",
emqx:subscribe("$slow_topics"),
Clients = start_client(SubBase),
Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
Clients = start_client(Subs),
emqx:subscribe("$SYS/brokers/+/slow_subs"),
timer:sleep(1000),
Now = ?NOW,
%% publish
?assert(ets:info(?LOG_TAB, size) =:= 0),
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("~s~p", [SubBase, I])),
Msg = emqx_message:make(Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 1000})
Topic = list_to_binary(io_lib:format("/test1/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
end,
lists:seq(1, 10)),
timer:sleep(2400),
lists:foreach(fun(I) ->
Topic = list_to_binary(io_lib:format("/test2/~p", [I])),
Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>),
emqx:publish(Msg#message{timestamp = Now - 500})
end,
lists:seq(1, 10)),
?assert(ets:info(?LOG_TAB, size) =:= 0),
?assert(ets:info(?TOPK_TAB, size) =:= 3),
try_receive(3),
try_receive(2),
timer:sleep(1000),
Size = ets:info(?TOPK_TAB, size),
%% some time record maybe delete due to it expired
?assert(Size =< 6 andalso Size >= 4),
timer:sleep(1500),
Recs = try_receive([]),
RecSum = lists:sum(Recs),
?assert(RecSum >= 5),
?assert(lists:all(fun(E) -> E =< 3 end, Recs)),
timer:sleep(2000),
?assert(ets:info(?TOPK_TAB, size) =:= 0),
[Client ! stop || Client <- Clients],
ok.
base_conf() ->
[{top_k_num, 3},
{threshold_time, 10},
{notice_qos, 0},
{notice_batch_size, 3},
{notice_topic,"$slow_topics"},
{time_window, 2000},
{max_log_num, 5}].
[ {top_k_num, 5}
, {expire_interval, timer:seconds(3)}
, {notice_interval, 1500}
, {notice_qos, 0}
, {notice_batch_size, 3}
].
start_client(Base) ->
[spawn(fun() ->
Topic = list_to_binary(io_lib:format("~s~p", [Base, I])),
client(Topic)
end)
|| I <- lists:seq(1, 10)].
start_client(Subs) ->
[spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)].
client(Topic) ->
client(I, Subs) ->
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, Topic},
{clientid, io_lib:format("slow_subs_~p", [I])},
{username, <<"plain">>},
{password, <<"plain">>}]),
{ok, _} = emqtt:connect(C),
{ok, _, _} = emqtt:subscribe(C, Topic),
Len = erlang:length(Subs),
Sub = lists:nth(I rem Len + 1, Subs),
_ = emqtt:subscribe(C, Sub),
receive
stop ->
ok
end.
try_receive(L) ->
try_receive(Acc) ->
receive
{deliver, _, #message{payload = Payload}} ->
#{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]),
?assertEqual(length(Logs), L)
try_receive([length(Logs) | Acc])
after 500 ->
?assert(false)
Acc
end.

View File

@ -12,9 +12,9 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------n
-module(emqx_st_statistics_api_SUITE).
-module(emqx_slow_subs_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -24,7 +24,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl").
-include_lib("emqx_plugin_libs/include/emqx_st_statistics.hrl").
-include_lib("emqx_plugin_libs/include/emqx_slow_subs.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
@ -33,6 +33,7 @@
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
-define(NOW, erlang:system_time(millisecond)).
all() ->
emqx_ct:all(?MODULE).
@ -48,78 +49,52 @@ end_per_suite(Config) ->
Config.
init_per_testcase(_, Config) ->
emqx_mod_st_statistics:load(emqx_st_statistics_SUITE:base_conf()),
emqx_mod_slow_subs:load(base_conf()),
Config.
end_per_testcase(_, Config) ->
emqx_mod_st_statistics:unload(undefined),
emqx_mod_slow_subs:unload([]),
Config.
base_conf() ->
[ {top_k_num, 5}
, {expire_interval, timer:seconds(60)}
, {notice_interval, 0}
, {notice_qos, 0}
, {notice_batch_size, 3}
].
t_get_history(_) ->
ets:insert(?TOPK_TAB, #top_k{rank = 1,
topic = <<"test">>,
average_count = 12,
average_elapsed = 1500}),
Now = ?NOW,
Each = fun(I) ->
ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])),
ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId),
type = average,
last_update_time = Now})
end,
{ok, Data} = request_api(get, api_path(["slow_topic"]), "_page=1&_limit=10",
lists:foreach(Each, lists:seq(1, 5)),
{ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10",
auth_header_()),
#{meta := Meta, data := [First | _]} = decode(Data),
ShouldRet = #{meta => #{page => 1,
limit => 10,
hasnext => false,
count => 1},
data => [#{topic => <<"test">>,
rank => 1,
elapsed => 1500,
count => 12}],
code => 0},
RMeta = #{page => 1, limit => 10, count => 5},
?assertEqual(RMeta, Meta),
Ret = decode(Data),
RFirst = #{clientid => <<"test_5">>,
latency => 5,
type => <<"average">>,
last_update_time => Now},
?assertEqual(ShouldRet, Ret).
t_rank_range(_) ->
Insert = fun(Rank) ->
ets:insert(?TOPK_TAB,
#top_k{rank = Rank,
topic = <<"test">>,
average_count = 12,
average_elapsed = 1500})
end,
lists:foreach(Insert, lists:seq(1, 15)),
timer:sleep(100),
{ok, Data} = request_api(get, api_path(["slow_topic"]), "_page=1&_limit=10",
auth_header_()),
Meta1 = #{page => 1, limit => 10, hasnext => true, count => 10},
Ret1 = decode(Data),
?assertEqual(Meta1, maps:get(meta, Ret1)),
%% End > Size
{ok, Data2} = request_api(get, api_path(["slow_topic"]), "_page=2&_limit=10",
auth_header_()),
Meta2 = #{page => 2, limit => 10, hasnext => false, count => 5},
Ret2 = decode(Data2),
?assertEqual(Meta2, maps:get(meta, Ret2)),
%% Start > Size
{ok, Data3} = request_api(get, api_path(["slow_topic"]), "_page=3&_limit=10",
auth_header_()),
Meta3 = #{page => 3, limit => 10, hasnext => false, count => 0},
Ret3 = decode(Data3),
?assertEqual(Meta3, maps:get(meta, Ret3)).
?assertEqual(RFirst, First).
t_clear(_) ->
ets:insert(?TOPK_TAB, #top_k{rank = 1,
topic = <<"test">>,
average_count = 12,
average_elapsed = 1500}),
ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>),
type = average,
last_update_time = ?NOW}),
{ok, _} = request_api(delete, api_path(["slow_topic"]), [],
{ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [],
auth_header_()),
?assertEqual(0, ets:info(?TOPK_TAB, size)).

View File

@ -1007,6 +1007,18 @@ end}.
{datatype, {duration, s}}
]}.
%% @doc the number of smaples for calculate the average latency of delivery
{mapping, "zone.$name.latency_samples", "emqx.zones", [
{default, 10},
{datatype, integer}
]}.
%% @doc Threshold for slow subscription statistics
{mapping, "zone.$name.latency_stats_threshold", "emqx.zones", [
{default, "100ms"},
{datatype, {duration, ms}}
]}.
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [
{default, 0},
@ -2218,43 +2230,28 @@ end}.
{datatype, string}
]}.
{mapping, "module.st_statistics.threshold_time", "emqx.modules", [
{default, "10s"},
{mapping, "module.slow_subs.expire_interval", "emqx.modules", [
{default, "5m"},
{datatype, {duration, ms}}
]}.
{mapping, "module.st_statistics.ignore_before_create", "emqx.modules", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "module.st_statistics.time_window", "emqx.modules", [
{default, "5M"},
{datatype, {duration, ms}}
]}.
{mapping, "module.st_statistics.max_log_num", "emqx.modules", [
{mapping, "module.slow_subs.top_k_num", "emqx.modules", [
{default, 500},
{datatype, integer}
]}.
{mapping, "module.st_statistics.top_k_num", "emqx.modules", [
{default, 500},
{datatype, integer}
{mapping, "module.slow_subs.notice_interval", "emqx.modules", [
{default, "0s"},
{datatype, {duration, ms}}
]}.
{mapping, "module.st_statistics.notice_topic", "emqx.modules", [
{default, "$slow_topics"},
{datatype, string}
]}.
{mapping, "module.st_statistics.notice_qos", "emqx.modules", [
{mapping, "module.slow_subs.notice_qos", "emqx.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-1"]}
]}.
{mapping, "module.st_statistics.notice_batch_size", "emqx.modules", [
{mapping, "module.slow_subs.notice_batch_size", "emqx.modules", [
{default, 500},
{datatype, integer}
]}.
@ -2283,8 +2280,8 @@ end}.
end, TotalRules)
end,
SlowTopic = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.st_statistics", Conf),
SlowSubs = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.slow_subs", Conf),
[{erlang:list_to_atom(Key), Value} || {[_, _, Key], Value} <- List]
end,
@ -2295,7 +2292,7 @@ end}.
[{emqx_mod_topic_metrics, []}],
[{emqx_mod_delayed, []}],
[{emqx_mod_trace, []}],
[{emqx_mod_st_statistics, SlowTopic()}],
[{emqx_mod_slow_subs, SlowSubs()}],
[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}]
])
end}.

View File

@ -106,7 +106,7 @@ test_plugins() ->
test_deps() ->
[ {bbmustache, "1.10.0"}
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.9"}}}
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.11"}}}
, meck
].

View File

@ -95,6 +95,8 @@
-import(emqx_zone, [get_env/3]).
-record(session, {
%% Client's id
clientid :: emqx_types:clientid(),
%% Clients Subscriptions.
subscriptions :: map(),
%% Max subscriptions allowed
@ -121,8 +123,15 @@
%% Awaiting PUBREL Timeout (Unit: millsecond)
await_rel_timeout :: timeout(),
%% Created at
created_at :: pos_integer()
}).
created_at :: pos_integer(),
%% Message deliver latency stats
latency_stats :: emqx_message_latency_stats:stats()
}).
%% in the previous code, we will replace the message record with the pubrel atom
%% in the pubrec function, this will lose the creation time of the message,
%% but now we need this time to calculate latency, so now pubrel atom is changed to this record
-record(pubrel_await, {timestamp :: non_neg_integer()}).
-type(session() :: #session{}).
@ -148,19 +157,26 @@
mqueue_dropped,
next_pkt_id,
awaiting_rel_cnt,
awaiting_rel_max
awaiting_rel_max,
latency_stats
]).
-define(DEFAULT_BATCH_N, 1000).
-ifdef(TEST).
-define(GET_CLIENT_ID(C), maps:get(clientid, C, <<>>)).
-else.
-define(GET_CLIENT_ID(C), maps:get(clientid, C)).
-endif.
%%--------------------------------------------------------------------
%% Init a Session
%%--------------------------------------------------------------------
-spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
init(#{zone := Zone} = CInfo, #{receive_maximum := MaxInflight}) ->
#session{clientid = ?GET_CLIENT_ID(CInfo),
max_subscriptions = get_env(Zone, max_subscriptions, 0),
subscriptions = #{},
upgrade_qos = get_env(Zone, upgrade_qos, false),
inflight = emqx_inflight:new(MaxInflight),
@ -170,7 +186,8 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
awaiting_rel = #{},
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)),
created_at = erlang:system_time(millisecond)
created_at = erlang:system_time(millisecond),
latency_stats = emqx_message_latency_stats:new(Zone)
}.
%% @private init mq
@ -227,7 +244,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout div 1000;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.
CreatedAt;
info(latency_stats, #session{latency_stats = Stats}) ->
emqx_message_latency_stats:latency(Stats).
%% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()).
@ -317,13 +336,12 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
-> {ok, emqx_types:message(), session()}
| {ok, emqx_types:message(), replies(), session()}
| {error, emqx_types:reason_code()}).
puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
puback(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => CreatedAt}]),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
Session2 = update_latency(Msg, Session),
return_with(Msg, dequeue(Session2#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -343,15 +361,13 @@ return_with(Msg, {ok, Publishes, Session}) ->
-spec(pubrec(emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()}
| {error, emqx_types:reason_code()}).
pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
%% execute hook here, because message record will be replaced by pubrel
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => CreatedAt}]),
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}),
Inflight1 = emqx_inflight:update(PacketId, Update, Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{value, {pubrel, _Ts}} ->
{value, {_PUBREL, _Ts}} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
@ -380,9 +396,10 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
| {error, emqx_types:reason_code()}).
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _Ts}} ->
{value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) ->
Session2 = update_latency(Pubrel, Session),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(Session#session{inflight = Inflight1});
dequeue(Session2#session{inflight = Inflight1});
{value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
@ -550,11 +567,16 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
%%--------------------------------------------------------------------
-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
retry(Session = #session{inflight = Inflight}) ->
retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) ->
case emqx_inflight:is_empty(Inflight) of
true -> {ok, Session};
false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
[], erlang:system_time(millisecond), Session)
false ->
Now = erlang:system_time(millisecond),
Session2 = check_expire_latency(Now, RetryInterval, Session),
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
[],
Now,
Session2)
end.
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
@ -581,8 +603,8 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -
{[{PacketId, Msg1} | Acc], Inflight1}
end;
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight),
retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) ->
Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight),
{[{pubrel, PacketId} | Acc], Inflight1}.
%%--------------------------------------------------------------------
@ -626,10 +648,10 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions =
-spec(replay(session()) -> {ok, replies(), session()}).
replay(Session = #session{inflight = Inflight}) ->
Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId};
Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) ->
{pubrel, PacketId};
({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}
{PacketId, emqx_message:set_flag(dup, true, Msg)}
end, emqx_inflight:to_list(Inflight)),
case dequeue(Session) of
{ok, NSession} -> {ok, Pubs, NSession};
@ -677,6 +699,35 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) ->
next_pkt_id(Session = #session{next_pkt_id = Id}) ->
Session#session{next_pkt_id = Id + 1}.
%%--------------------------------------------------------------------
%% Message Latency Stats
%%--------------------------------------------------------------------
update_latency(Msg,
#session{clientid = ClientId,
latency_stats = Stats,
created_at = CreateAt} = S) ->
case get_birth_timestamp(Msg, CreateAt) of
0 -> S;
Ts ->
Latency = erlang:system_time(millisecond) - Ts,
Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats),
S#session{latency_stats = Stats2}
end.
check_expire_latency(Now, Interval,
#session{clientid = ClientId, latency_stats = Stats} = S) ->
Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats),
S#session{latency_stats = Stats2}.
get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
Ts;
get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts ->
Ts;
get_birth_timestamp(_, _) ->
0.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@ -0,0 +1,105 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_message_latency_stats).
%% API
-export([ new/1, new/2, update/3
, check_expire/4, latency/1]).
-define(NOW, erlang:system_time(millisecond)).
-define(MINIMUM_INSERT_INTERVAL, 1000).
-define(MINIMUM_THRESHOLD, 100).
-opaque stats() :: #{ threshold := number()
, ema := emqx_moving_average:ema()
, last_update_time := timestamp()
, last_access_time := timestamp() %% timestamp of last access top-k
, last_insert_value := non_neg_integer()
}.
-type timestamp() :: non_neg_integer().
-type timespan() :: number().
-type latency_type() :: average
| expire.
-import(emqx_zone, [get_env/3]).
-export_type([stats/0, latency_type/0]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new(emqx_types:zone()) -> stats().
new(Zone) ->
Samples = get_env(Zone, latency_samples, 1),
Threshold = get_env(Zone, latency_stats_threshold, ?MINIMUM_THRESHOLD),
new(Samples, Threshold).
-spec new(non_neg_integer(), number()) -> stats().
new(SamplesT, ThresholdT) ->
Samples = erlang:max(1, SamplesT),
Threshold = erlang:max(?MINIMUM_THRESHOLD, ThresholdT),
#{ ema => emqx_moving_average:new(exponential, #{period => Samples})
, threshold => Threshold
, last_update_time => 0
, last_access_time => 0
, last_insert_value => 0
}.
-spec update(emqx_types:clientid(), number(), stats()) -> stats().
update(ClientId, Val, #{ema := EMA} = Stats) ->
Now = ?NOW,
#{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA),
Stats2 = call_hook(ClientId, Now, average, Latency, Stats),
Stats2#{ ema := EMA2
, last_update_time := ?NOW}.
-spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats().
check_expire(_, Now, Interval, #{last_update_time := LUT} = S)
when LUT >= Now - Interval ->
S;
check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) ->
Latency = Now - LUT,
call_hook(ClientId, Now, expire, Latency, S).
-spec latency(stats()) -> number().
latency(#{ema := #{average := Average}}) ->
Average.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats().
call_hook(_, Now, _, _, #{last_access_time := LIT} = S)
when LIT >= Now - ?MINIMUM_INSERT_INTERVAL ->
S;
call_hook(_, _, _, Latency, #{threshold := Threshold} = S)
when Latency =< Threshold ->
S;
call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) ->
Arg = #{clientid => ClientId,
latency => erlang:floor(Latency),
type => Type,
last_insert_value => LIV,
update_time => Now},
emqx:run_hook('message.slow_subs_stats', [Arg]),
Stats#{last_insert_value := Latency,
last_access_time := Now}.

View File

@ -0,0 +1,90 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @see https://en.wikipedia.org/wiki/Moving_average
-module(emqx_moving_average).
%% API
-export([new/0, new/1, new/2, update/2]).
-type type() :: cumulative
| exponential.
-type ema() :: #{ type := exponential
, average := 0 | float()
, coefficient := float()
}.
-type cma() :: #{ type := cumulative
, average := 0 | float()
, count := non_neg_integer()
}.
-type moving_average() :: ema()
| cma().
-define(DEF_EMA_ARG, #{period => 10}).
-define(DEF_AVG_TYPE, exponential).
-export_type([type/0, moving_average/0, ema/0, cma/0]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new() -> moving_average().
new() ->
new(?DEF_AVG_TYPE, #{}).
-spec new(type()) -> moving_average().
new(Type) ->
new(Type, #{}).
-spec new(type(), Args :: map()) -> moving_average().
new(cumulative, _) ->
#{ type => cumulative
, average => 0
, count => 0
};
new(exponential, Arg) ->
#{period := Period} = maps:merge(?DEF_EMA_ARG, Arg),
#{ type => exponential
, average => 0
%% coefficient = 2/(N+1) is a common convention, see the wiki link for details
, coefficient => 2 / (Period + 1)
}.
-spec update(number(), moving_average()) -> moving_average().
update(Val, #{average := 0} = Avg) ->
Avg#{average := Val};
update(Val, #{ type := cumulative
, average := Average
, count := Count} = CMA) ->
NewCount = Count + 1,
CMA#{average := (Count * Average + Val) / NewCount,
count := NewCount};
update(Val, #{ type := exponential
, average := Average
, coefficient := Coefficient} = EMA) ->
EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -180,7 +180,8 @@ t_puback_with_dequeue(_) ->
?assertEqual(<<"t2">>, emqx_message:topic(Msg3)).
t_puback_error_packet_id_in_use(_) ->
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
Now = ts(millisecond),
Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
emqx_session:puback(1, session(#{inflight => Inflight})).
@ -192,10 +193,11 @@ t_pubrec(_) ->
Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, Msg, Session1} = emqx_session:pubrec(2, Session),
?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))).
t_pubrec_packet_id_in_use_error(_) ->
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
Now = ts(millisecond),
Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()),
{error, ?RC_PACKET_IDENTIFIER_IN_USE} =
emqx_session:pubrec(1, session(#{inflight => Inflight})).
@ -211,7 +213,8 @@ t_pubrel_error_packetid_not_found(_) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()).
t_pubcomp(_) ->
Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()),
Now = ts(millisecond),
Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()),
Session = session(#{inflight => Inflight}),
{ok, Session1} = emqx_session:pubcomp(1, Session),
?assertEqual(0, emqx_session:info(inflight_cnt, Session1)).
@ -260,7 +263,7 @@ t_deliver_qos0(_) ->
t_deliver_qos1(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
{ok, Session} = emqx_session:subscribe(
clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()),
Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]],
{ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session),
?assertEqual(2, emqx_session:info(inflight_cnt, Session1)),
@ -373,7 +376,7 @@ mqueue(Opts) ->
session() -> session(#{}).
session(InitFields) when is_map(InitFields) ->
maps:fold(fun(Field, Value, Session) ->
emqx_session:set_field(Field, Value, Session)
emqx_session:set_field(Field, Value, Session)
end,
emqx_session:init(#{zone => channel}, #{receive_maximum => 0}),
InitFields).
@ -396,4 +399,3 @@ ts(second) ->
erlang:system_time(second);
ts(millisecond) ->
erlang:system_time(millisecond).