diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl
index e068c5384..d42d2a486 100644
--- a/apps/emqx_management/src/emqx_mgmt_api.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api.erl
@@ -49,17 +49,46 @@ paginate(Tables, Params, RowFun) ->
 
 query_handle(Table) when is_atom(Table) ->
     qlc:q([R|| R <- ets:table(Table)]);
+
+query_handle({Table, Opts}) when is_atom(Table) ->
+    qlc:q([R|| R <- ets:table(Table, Opts)]);
+
 query_handle([Table]) when is_atom(Table) ->
     qlc:q([R|| R <- ets:table(Table)]);
+
+query_handle([{Table, Opts}]) when is_atom(Table) ->
+    qlc:q([R|| R <- ets:table(Table, Opts)]);
+
 query_handle(Tables) ->
-    qlc:append([qlc:q([E || E <- ets:table(T)]) || T <- Tables]).
+    Fold = fun({Table, Opts}, Acc) ->
+                   Handle = qlc:q([R|| R <- ets:table(Table, Opts)]),
+                   [Handle | Acc]; % prepend here, reversed below to keep table order
+              (Table, Acc) ->
+                   Handle = qlc:q([R|| R <- ets:table(Table)]),
+                   [Handle | Acc]
+           end,
+    Handles = lists:foldl(Fold, [], Tables),
+    qlc:append(lists:reverse(Handles)).
 
 count(Table) when is_atom(Table) ->
     ets:info(Table, size);
+
+count({Table, _Opts}) when is_atom(Table) ->
+    ets:info(Table, size);
+
 count([Table]) when is_atom(Table) ->
     ets:info(Table, size);
+
+count([{Table, _Opts}]) when is_atom(Table) ->
+    ets:info(Table, size);
+
 count(Tables) ->
-    lists:sum([count(T) || T <- Tables]).
+    Fold = fun({Table, _Opts}, Acc) ->
+                   count(Table) + Acc; % sizes are integers: add them, '++' would be a list append
+              (Table, Acc) ->
+                   count(Table) + Acc
+           end,
+    lists:foldl(Fold, 0, Tables). % total number of rows across all listed tables
 
 count(Table, Nodes) ->
     lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]).
diff --git a/apps/emqx_plugin_libs/include/emqx_st_statistics.hrl b/apps/emqx_plugin_libs/include/emqx_slow_subs.hrl similarity index 68% rename from apps/emqx_plugin_libs/include/emqx_st_statistics.hrl rename to apps/emqx_plugin_libs/include/emqx_slow_subs.hrl index 9184c3194..0b5e3a035 100644 --- a/apps/emqx_plugin_libs/include/emqx_st_statistics.hrl +++ b/apps/emqx_plugin_libs/include/emqx_slow_subs.hrl @@ -14,12 +14,15 @@ %% limitations under the License. %%-------------------------------------------------------------------- --define(LOG_TAB, emqx_st_statistics_log). --define(TOPK_TAB, emqx_st_statistics_topk). +-define(TOPK_TAB, emqx_slow_subs_topk). --record(top_k, { rank :: pos_integer() - , topic :: emqx_types:topic() - , average_count :: number() - , average_elapsed :: number()}). +-define(INDEX(Latency, ClientId), {Latency, ClientId}). + +-record(top_k, { index :: index() + , type :: emqx_message_latency_stats:latency_type() + , last_update_time :: pos_integer() + , extra = [] + }). -type top_k() :: #top_k{}. +-type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()). diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl new file mode 100644 index 000000000..a50b5eb0e --- /dev/null +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl @@ -0,0 +1,319 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs). + +-behaviour(gen_server). + +-include_lib("include/emqx.hrl"). +-include_lib("include/logger.hrl"). +-include_lib("emqx_plugin_libs/include/emqx_slow_subs.hrl"). + +-logger_header("[SLOW Subs]"). + +-export([ start_link/1, on_stats_update/2, enable/0 + , disable/0, clear_history/0, init_topk_tab/0 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-compile(nowarn_unused_type). + +-type state() :: #{ config := proplist:proplist() + , enable := boolean() + , last_tick_at := pos_integer() + }. + +-type log() :: #{ rank := pos_integer() + , clientid := emqx_types:clientid() + , latency := non_neg_integer() + , type := emqx_message_latency_stats:latency_type() + }. + +-type window_log() :: #{ last_tick_at := pos_integer() + , logs := [log()] + }. + +-type message() :: #message{}. + +-import(proplists, [get_value/2]). + +-type stats_update_args() :: #{ clientid := emqx_types:clientid() + , latency := non_neg_integer() + , type := emqx_message_latency_stats:latency_type() + , last_insert_value := non_neg_integer() + , update_time := timer:time() + }. + +-type stats_update_env() :: #{max_size := pos_integer()}. + +-ifdef(TEST). +-define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). +-else. +-define(EXPIRE_CHECK_INTERVAL, timer:seconds(10)). +-endif. + +-define(NOW, erlang:system_time(millisecond)). +-define(NOTICE_TOPIC_NAME, "slow_subs"). +-define(DEF_CALL_TIMEOUT, timer:seconds(10)). 
+ +%% erlang term order +%% number < atom < reference < fun < port < pid < tuple < list < bit string + +%% ets ordered_set is ascending by term order + +%%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- +%% @doc Start the st_statistics +-spec(start_link(Env :: list()) -> emqx_types:startlink_ret()). +start_link(Env) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). + +%% XXX NOTE:pay attention to the performance here +-spec on_stats_update(stats_update_args(), stats_update_env()) -> true. +on_stats_update(#{clientid := ClientId, + latency := Latency, + type := Type, + last_insert_value := LIV, + update_time := Ts}, + #{max_size := MaxSize}) -> + + LastIndex = ?INDEX(LIV, ClientId), + Index = ?INDEX(Latency, ClientId), + + %% check whether the client is in the table + case ets:lookup(?TOPK_TAB, LastIndex) of + [#top_k{index = Index}] -> + %% if last value == the new value, return + true; + [_] -> + %% if Latency > minimum value, we should update it + %% if Latency < minimum value, maybe it can replace the minimum value + %% so alwyas update at here + %% do we need check if Latency == minimum ??? + ets:insert(?TOPK_TAB, + #top_k{index = Index, type = Type, last_update_time = Ts}), + ets:delete(?TOPK_TAB, LastIndex); + [] -> + %% try to insert + try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) + end. + +clear_history() -> + gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT). + +enable() -> + gen_server:call(?MODULE, {enable, true}, ?DEF_CALL_TIMEOUT). + +disable() -> + gen_server:call(?MODULE, {enable, false}, ?DEF_CALL_TIMEOUT). 
+
+init_topk_tab() ->
+    case ets:whereis(?TOPK_TAB) of
+        undefined ->
+            ?TOPK_TAB = ets:new(?TOPK_TAB,
+                                [ ordered_set, public, named_table
+                                , {keypos, #top_k.index}, {write_concurrency, true}
+                                , {read_concurrency, true}
+                                ]);
+        _ ->
+            ?TOPK_TAB
+    end.
+
+%%--------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------
+
+init([Conf]) ->
+    notice_tick(Conf),
+    expire_tick(Conf),
+    MaxSize = get_value(top_k_num, Conf),
+    load(MaxSize),
+    {ok, #{config => Conf,
+           last_tick_at => ?NOW,
+           enable => true}}.
+
+handle_call({enable, Enable}, _From,
+            #{config := Cfg, enable := IsEnable} = State) ->
+    State2 = case Enable of
+                 IsEnable -> % requested state equals current state: nothing to do
+                     State;
+                 true ->
+                     MaxSize = get_value(top_k_num, Cfg), % same config key as init/1
+                     load(MaxSize),
+                     State#{enable := true};
+                 _ ->
+                     unload(),
+                     State#{enable := false}
+             end,
+    {reply, ok, State2};
+
+handle_call(clear_history, _, State) ->
+    ets:delete_all_objects(?TOPK_TAB),
+    {reply, ok, State};
+
+handle_call(Req, _From, State) ->
+    ?LOG(error, "Unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
+
+handle_cast(Msg, State) ->
+    ?LOG(error, "Unexpected cast: ~p", [Msg]),
+    {noreply, State}.
+
+handle_info(expire_tick, #{config := Cfg} = State) ->
+    expire_tick(Cfg), % re-arm the timer before doing the sweep
+    Logs = ets:tab2list(?TOPK_TAB),
+    do_clear(Cfg, Logs),
+    {noreply, State};
+
+handle_info(notice_tick, #{config := Cfg} = State) ->
+    notice_tick(Cfg), % re-arm the timer before publishing
+    Logs = ets:tab2list(?TOPK_TAB),
+    do_notification(Logs, State),
+    {noreply, State#{last_tick_at := ?NOW}};
+
+handle_info(Info, State) ->
+    ?LOG(error, "Unexpected info: ~p", [Info]),
+    {noreply, State}.
+
+terminate(_Reason, _) ->
+    unload(),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+ +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +expire_tick(_) -> + erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). + +notice_tick(Cfg) -> + case get_value(notice_interval, Cfg) of + 0 -> ok; + Interval -> + erlang:send_after(Interval, self(), ?FUNCTION_NAME), + ok + end. + +-spec do_notification(list(), state()) -> ok. +do_notification([], _) -> + ok; + +do_notification(Logs, #{last_tick_at := LastTickTime, config := Cfg}) -> + start_publish(Logs, LastTickTime, Cfg), + ok. + +start_publish(Logs, TickTime, Cfg) -> + emqx_pool:async_submit({fun do_publish/4, [Logs, erlang:length(Logs), TickTime, Cfg]}). + +do_publish([], _, _, _) -> + ok; + +do_publish(Logs, Rank, TickTime, Cfg) -> + BatchSize = get_value(notice_batch_size, Cfg), + do_publish(Logs, BatchSize, Rank, TickTime, Cfg, []). + +do_publish([Log | T], Size, Rank, TickTime, Cfg, Cache) when Size > 0 -> + Cache2 = [convert_to_notice(Rank, Log) | Cache], + do_publish(T, Size - 1, Rank - 1, TickTime, Cfg, Cache2); + +do_publish(Logs, Size, Rank, TickTime, Cfg, Cache) when Size =:= 0 -> + publish(TickTime, Cfg, Cache), + do_publish(Logs, Rank, TickTime, Cfg); + +do_publish([], _, _Rank, TickTime, Cfg, Cache) -> + publish(TickTime, Cfg, Cache), + ok. + +convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId), + type = Type, + last_update_time = Ts}) -> + #{rank => Rank, + clientid => ClientId, + latency => Latency, + type => Type, + timestamp => Ts}. + +publish(TickTime, Cfg, Notices) -> + WindowLog = #{last_tick_at => TickTime, + logs => lists:reverse(Notices)}, + Payload = emqx_json:encode(WindowLog), + Msg = #message{ id = emqx_guid:gen() + , qos = get_value(notice_qos, Cfg) + , from = ?MODULE + , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME) + , payload = Payload + , timestamp = ?NOW + }, + _ = emqx_broker:safe_publish(Msg), + ok. 
+ +load(MaxSize) -> + _ = emqx:hook('message.slow_subs_stats', + fun ?MODULE:on_stats_update/2, + [#{max_size => MaxSize}]), + ok. + +unload() -> + emqx:unhook('message.slow_subs_stats', fun ?MODULE:on_stats_update/2). + +do_clear(Cfg, Logs) -> + Now = ?NOW, + Interval = get_value(expire_interval, Cfg), + Each = fun(#top_k{index = Index, last_update_time = Ts}) -> + case Now - Ts >= Interval of + true -> + ets:delete(?TOPK_TAB, Index); + _ -> + true + end + end, + lists:foreach(Each, Logs). + +try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) -> + case ets:info(?TOPK_TAB, size) of + Size when Size < MaxSize -> + %% if the size is under limit, insert it directly + ets:insert(?TOPK_TAB, + #top_k{index = Index, type = Type, last_update_time = Ts}); + _Size -> + %% find the minimum value + ?INDEX(Min, _) = First = + case ets:first(?TOPK_TAB) of + ?INDEX(_, _) = I -> I; + _ -> ?INDEX(Latency - 1, <<>>) + end, + + case Latency =< Min of + true -> true; + _ -> + ets:insert(?TOPK_TAB, + #top_k{index = Index, type = Type, last_update_time = Ts}), + + ets:delete(?TOPK_TAB, First) + end + end. diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl new file mode 100644 index 000000000..a7c9fc050 --- /dev/null +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_api). + +-rest_api(#{name => clear_history, + method => 'DELETE', + path => "/slow_subscriptions", + func => clear_history, + descr => "Clear current data and re count slow topic"}). + +-rest_api(#{name => get_history, + method => 'GET', + path => "/slow_subscriptions", + func => get_history, + descr => "Get slow topics statistics record data"}). + +-export([ clear_history/2 + , get_history/2 + ]). + +-include("include/emqx_slow_subs.hrl"). + +-import(minirest, [return/1]). + +%%-------------------------------------------------------------------- +%% HTTP API +%%-------------------------------------------------------------------- + +clear_history(_Bindings, _Params) -> + ok = emqx_slow_subs:clear_history(), + return(ok). + +get_history(_Bindings, Params) -> + RowFun = fun(#top_k{index = ?INDEX(Latency, ClientId), + type = Type, + last_update_time = Ts}) -> + [{clientid, ClientId}, + {latency, Latency}, + {type, Type}, + {last_update_time, Ts}] + end, + Return = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, Params, RowFun), + return({ok, Return}). diff --git a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl deleted file mode 100644 index 668cb3926..000000000 --- a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics.erl +++ /dev/null @@ -1,379 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. 
-%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_st_statistics). - --behaviour(gen_server). - --include_lib("include/emqx.hrl"). --include_lib("include/logger.hrl"). --include_lib("emqx_plugin_libs/include/emqx_st_statistics.hrl"). - --logger_header("[SLOW TOPICS]"). - --export([ start_link/1, on_publish_done/3, enable/0 - , disable/0, clear_history/0 - ]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --compile(nowarn_unused_type). - --type state() :: #{ config := proplist:proplist() - , period := pos_integer() - , last_tick_at := pos_integer() - , counter := counters:counters_ref() - , enable := boolean() - }. - --type log() :: #{ topic := emqx_types:topic() - , count := pos_integer() - , average := float() - }. - --type window_log() :: #{ last_tick_at := pos_integer() - , logs := [log()] - }. - --record(slow_log, { topic :: emqx_types:topic() - , count :: integer() %% 0 will be used in initial value - , elapsed :: integer() - }). - --type message() :: #message{}. - --import(proplists, [get_value/2]). - --define(NOW, erlang:system_time(millisecond)). --define(QUOTA_IDX, 1). - --type slow_log() :: #slow_log{}. --type top_k_map() :: #{emqx_types:topic() => top_k()}. - --type publish_done_env() :: #{ ignore_before_create := boolean() - , threshold := pos_integer() - , counter := counters:counters_ref() - }. - --type publish_done_args() :: #{session_rebirth_time => pos_integer()}. - --ifdef(TEST). 
--define(TOPK_ACCESS, public). --else. --define(TOPK_ACCESS, protected). --endif. - -%% erlang term order -%% number < atom < reference < fun < port < pid < tuple < list < bit string - -%% ets ordered_set is ascending by term order - -%%-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- -%% @doc Start the st_statistics --spec(start_link(Env :: list()) -> emqx_types:startlink_ret()). -start_link(Env) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). - --spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok. -on_publish_done(#message{timestamp = Timestamp}, - #{session_rebirth_time := Created}, - #{ignore_before_create := IgnoreBeforeCreate}) - when IgnoreBeforeCreate, Timestamp < Created -> - ok; - -on_publish_done(#message{timestamp = Timestamp} = Msg, - _, - #{threshold := Threshold, counter := Counter}) -> - case ?NOW - Timestamp of - Elapsed when Elapsed > Threshold -> - case get_log_quota(Counter) of - true -> - update_log(Msg, Elapsed); - _ -> - ok - end; - _ -> - ok - end. - -clear_history() -> - gen_server:call(?MODULE, ?FUNCTION_NAME). - -enable() -> - gen_server:call(?MODULE, {enable, true}). - -disable() -> - gen_server:call(?MODULE, {enable, false}). 
- -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Env]) -> - erlang:process_flag(trap_exit, true), - init_log_tab(Env), - init_topk_tab(Env), - notification_tick(Env), - Counter = counters:new(1, [write_concurrency]), - set_log_quota(Env, Counter), - Threshold = get_value(threshold_time, Env), - IgnoreBeforeCreate = get_value(ignore_before_create, Env), - load(IgnoreBeforeCreate, Threshold, Counter), - {ok, #{config => Env, - period => 1, - last_tick_at => ?NOW, - counter => Counter, - enable => true}}. - -handle_call({enable, Enable}, _From, - #{config := Cfg, counter := Counter, enable := IsEnable} = State) -> - State2 = case Enable of - IsEnable -> - State; - true -> - Threshold = get_value(threshold_time, Cfg), - IgnoreBeforeCreate = get_value(ignore_before_create, Cfg), - load(IgnoreBeforeCreate, Threshold, Counter), - State#{enable := true}; - _ -> - unload(), - State#{enable := false} - end, - {reply, ok, State2}; - -handle_call(clear_history, _, State) -> - ets:delete_all_objects(?TOPK_TAB), - {reply, ok, State}; - -handle_call(Req, _From, State) -> - ?LOG(error, "Unexpected call: ~p", [Req]), - {reply, ignored, State}. - -handle_cast(Msg, State) -> - ?LOG(error, "Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info(notification_tick, #{config := Cfg, period := Period} = State) -> - notification_tick(Cfg), - do_notification(State), - {noreply, State#{last_tick_at := ?NOW, - period := Period + 1}}; - -handle_info(Info, State) -> - ?LOG(error, "Unexpected info: ~p", [Info]), - {noreply, State}. - -terminate(_Reason, _) -> - unload(), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. 
- -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- -notification_tick(Env) -> - TimeWindow = get_value(time_window, Env), - erlang:send_after(TimeWindow, self(), ?FUNCTION_NAME). - -init_log_tab(_) -> - ?LOG_TAB = ets:new(?LOG_TAB, [ set, public, named_table - , {keypos, #slow_log.topic}, {write_concurrency, true} - , {read_concurrency, true} - ]). - -init_topk_tab(_) -> - ?TOPK_TAB = ets:new(?TOPK_TAB, [ set, ?TOPK_ACCESS, named_table - , {keypos, #top_k.rank}, {write_concurrency, false} - , {read_concurrency, true} - ]). - --spec get_log_quota(counters:counters_ref()) -> boolean(). -get_log_quota(Counter) -> - case counters:get(Counter, ?QUOTA_IDX) of - Quota when Quota > 0 -> - counters:sub(Counter, ?QUOTA_IDX, 1), - true; - _ -> - false - end. - --spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok. -set_log_quota(Cfg, Counter) -> - MaxLogNum = get_value(max_log_num, Cfg), - counters:put(Counter, ?QUOTA_IDX, MaxLogNum). - --spec update_log(message(), pos_integer()) -> ok. -update_log(#message{topic = Topic}, Elapsed) -> - _ = ets:update_counter(?LOG_TAB, - Topic, - [{#slow_log.count, 1}, {#slow_log.elapsed, Elapsed}], - #slow_log{topic = Topic, - count = 0, - elapsed = 0}), - ok. - --spec do_notification(state()) -> true. -do_notification(#{last_tick_at := TickTime, - config := Cfg, - period := Period, - counter := Counter}) -> - Logs = ets:tab2list(?LOG_TAB), - ets:delete_all_objects(?LOG_TAB), - start_publish(Logs, TickTime, Cfg), - set_log_quota(Cfg, Counter), - MaxRecord = get_value(top_k_num, Cfg), - update_topk(Logs, MaxRecord, Period). - --spec update_topk(list(slow_log()), pos_integer(), pos_integer()) -> true. 
-update_topk(Logs, MaxRecord, Period) -> - TopkMap = get_topk_map(Period), - TopkMap2 = update_topk_map(Logs, Period, TopkMap), - SortFun = fun(A, B) -> - A#top_k.average_count > B#top_k.average_count - end, - TopkL = lists:sort(SortFun, maps:values(TopkMap2)), - TopkL2 = lists:sublist(TopkL, 1, MaxRecord), - update_topk_tab(TopkL2). - --spec update_topk_map(list(slow_log()), pos_integer(), top_k_map()) -> top_k_map(). -update_topk_map([#slow_log{topic = Topic, - count = LogTimes, - elapsed = LogElapsed} | T], Period, TopkMap) -> - case maps:get(Topic, TopkMap, undefined) of - undefined -> - Record = #top_k{rank = 1, - topic = Topic, - average_count = LogTimes, - average_elapsed = LogElapsed}, - TopkMap2 = TopkMap#{Topic => Record}, - update_topk_map(T, Period, TopkMap2); - #top_k{average_count = AvgCount, - average_elapsed = AvgElapsed} = Record -> - NewPeriod = Period + 1, - %% (a + b) / c = a / c + b / c - %% average_count(elapsed) dived NewPeriod in function get_topk_maps - AvgCount2 = AvgCount + LogTimes / NewPeriod, - AvgElapsed2 = AvgElapsed + LogElapsed / NewPeriod, - Record2 = Record#top_k{average_count = AvgCount2, - average_elapsed = AvgElapsed2}, - update_topk_map(T, Period, TopkMap#{Topic := Record2}) - end; - -update_topk_map([], _, TopkMap) -> - TopkMap. - --spec update_topk_tab(list(top_k())) -> true. -update_topk_tab(Records) -> - Zip = fun(Rank, Item) -> Item#top_k{rank = Rank} end, - Len = erlang:length(Records), - RankedTopics = lists:zipwith(Zip, lists:seq(1, Len), Records), - ets:insert(?TOPK_TAB, RankedTopics). - -start_publish(Logs, TickTime, Cfg) -> - emqx_pool:async_submit({fun do_publish/3, [Logs, TickTime, Cfg]}). - -do_publish([], _, _) -> - ok; - -do_publish(Logs, TickTime, Cfg) -> - BatchSize = get_value(notice_batch_size, Cfg), - do_publish(Logs, BatchSize, TickTime, Cfg, []). 
- -do_publish([Log | T], Size, TickTime, Cfg, Cache) when Size > 0 -> - Cache2 = [convert_to_notice(Log) | Cache], - do_publish(T, Size - 1, TickTime, Cfg, Cache2); - -do_publish(Logs, Size, TickTime, Cfg, Cache) when Size =:= 0 -> - publish(TickTime, Cfg, Cache), - do_publish(Logs, TickTime, Cfg); - -do_publish([], _, TickTime, Cfg, Cache) -> - publish(TickTime, Cfg, Cache), - ok. - -convert_to_notice(#slow_log{topic = Topic, - count = Count, - elapsed = Elapsed}) -> - #{topic => Topic, - count => Count, - average => Elapsed / Count}. - -publish(TickTime, Cfg, Notices) -> - WindowLog = #{last_tick_at => TickTime, - logs => Notices}, - Payload = emqx_json:encode(WindowLog), - _ = emqx:publish(#message{ id = emqx_guid:gen() - , qos = get_value(notice_qos, Cfg) - , from = ?MODULE - , topic = get_topic(Cfg) - , payload = Payload - , timestamp = ?NOW - }), - ok. - -load(IgnoreBeforeCreate, Threshold, Counter) -> - _ = emqx:hook('message.publish_done', - fun ?MODULE:on_publish_done/3, - [#{ignore_before_create => IgnoreBeforeCreate, - threshold => Threshold, - counter => Counter} - ]), - ok. - -unload() -> - emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3). - --spec get_topic(proplists:proplist()) -> binary(). -get_topic(Cfg) -> - case get_value(notice_topic, Cfg) of - Topic when is_binary(Topic) -> - Topic; - Topic -> - erlang:list_to_binary(Topic) - end. - --spec get_topk_map(pos_integer()) -> top_k_map(). -get_topk_map(Period) -> - Size = ets:info(?TOPK_TAB, size), - get_topk_map(1, Size, Period, #{}). - --spec get_topk_map(pos_integer(), - non_neg_integer(), pos_integer(), top_k_map()) -> top_k_map(). 
-get_topk_map(Index, Size, _, TopkMap) when Index > Size -> - TopkMap; -get_topk_map(Index, Size, Period, TopkMap) -> - [#top_k{topic = Topic, - average_count = AvgCount, - average_elapsed = AvgElapsed} = R] = ets:lookup(?TOPK_TAB, Index), - NewPeriod = Period + 1, - TotalTimes = AvgCount * Period, - AvgCount2 = TotalTimes / NewPeriod, - AvgElapsed2 = TotalTimes * AvgElapsed / NewPeriod, - TopkMap2 = TopkMap#{Topic => R#top_k{average_count = AvgCount2, - average_elapsed = AvgElapsed2}}, - get_topk_map(Index + 1, Size, Period, TopkMap2). diff --git a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics_api.erl b/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics_api.erl deleted file mode 100644 index dc74dcf36..000000000 --- a/apps/emqx_plugin_libs/src/emqx_st_statistics/emqx_st_statistics_api.erl +++ /dev/null @@ -1,83 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_st_statistics_api). - --rest_api(#{name => clear_history, - method => 'DELETE', - path => "/slow_topic", - func => clear_history, - descr => "Clear current data and re count slow topic"}). 
- --rest_api(#{name => get_history, - method => 'GET', - path => "/slow_topic", - func => get_history, - descr => "Get slow topics statistics record data"}). - --export([ clear_history/2 - , get_history/2 - ]). - --include("include/emqx_st_statistics.hrl"). - --import(minirest, [return/1]). - -%%-------------------------------------------------------------------- -%% HTTP API -%%-------------------------------------------------------------------- - -clear_history(_Bindings, _Params) -> - ok = emqx_st_statistics:clear_history(), - return(ok). - -get_history(_Bindings, Params) -> - PageT = proplists:get_value(<<"_page">>, Params), - LimitT = proplists:get_value(<<"_limit">>, Params), - Page = erlang:binary_to_integer(PageT), - Limit = erlang:binary_to_integer(LimitT), - Start = (Page - 1) * Limit + 1, - Size = ets:info(?TOPK_TAB, size), - End = Start + Limit - 1, - {HasNext, Count, Infos} = get_history(Start, End, Size), - return({ok, #{meta => #{page => Page, - limit => Limit, - hasnext => HasNext, - count => Count}, - data => Infos}}). - - -get_history(Start, _End, Size) when Start > Size -> - {false, 0, []}; - -get_history(Start, End, Size) when End > Size -> - get_history(Start, Size, Size); - -get_history(Start, End, Size) -> - Fold = fun(Rank, Acc) -> - [#top_k{topic = Topic - , average_count = Count - , average_elapsed = Elapsed}] = ets:lookup(?TOPK_TAB, Rank), - - Info = [ {rank, Rank} - , {topic, Topic} - , {count, Count} - , {elapsed, Elapsed}], - - [Info | Acc] - end, - Infos = lists:foldl(Fold, [], lists:seq(Start, End)), - {End < Size, End - Start + 1, Infos}. 
diff --git a/etc/emqx.conf b/etc/emqx.conf index 78d383930..de5c062a4 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2217,47 +2217,34 @@ module.presence.qos = 1 ## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 ##-------------------------------------------------------------------- -## Slow Topic Statistics Module +## Slow Subscribers Statistics Module -## Threshold time of slow topics statistics -## -## Default: 10 seconds -#module.st_statistics.threshold_time = 10S - -## ignore the messages that before than session created -## -## Default: true -#module.st_statistics.ignore_before_create = true - -## Time window of slow topics statistics +## the expiration time of a record in the Top-K list ## ## Value: 5 minutes -#module.st_statistics.time_window = 5M +#module.slow_subs.expire_interval = 5m -## Maximum of slow topics log, log will clear when enter new time window +## maximum number of Top-K records ## -## Value: 500 -#module.st_statistics.max_log_num = 500 +## Value: 10 +#module.slow_subs.top_k_num = 10 -## Top-K record for slow topics, update from logs +## enable notification +## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval +## publish is disabled if set to 0s. 
## -## Value: 500 -#module.st_statistics.top_k_num = 500 - -## Topic of notification -## -## Defaut: $slow_topics -#module.st_statistics.notice_topic = $slow_topics +## Default: 0s +#module.slow_subs.notice_interval = 0s ## QoS of notification message in notice topic ## ## Defaut: 0 -#module.st_statistics.notice_qos = 0 +#module.slow_subs.notice_qos = 0 ## Maximum information number in one notification ## -## Default: 500 -#module.st_statistics.notice_batch_size = 500 +## Default: 100 +#module.slow_subs.notice_batch_size = 100 ## CONFIG_SECTION_END=modules ================================================== diff --git a/lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl b/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl similarity index 90% rename from lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl rename to lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl index d6796122c..b9117fe8b 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_st_statistics.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_slow_subs.erl @@ -14,14 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_st_statistics). +-module(emqx_mod_slow_subs). -behaviour(emqx_gen_mod). -include_lib("include/emqx.hrl"). -include_lib("include/logger.hrl"). --logger_header("[SLOW TOPICS]"). +-logger_header("[SLOW Subs]"). %% emqx_gen_mod callbacks -export([ load/1 @@ -29,7 +29,7 @@ , description/0 ]). --define(LIB, emqx_st_statistics). +-define(LIB, emqx_slow_subs). %%-------------------------------------------------------------------- %% Load/Unload @@ -46,4 +46,4 @@ unload(_Env) -> ok. description() -> - "EMQ X Slow Topic Statistics Module". + "EMQ X Slow Subscribers Statistics Module". 
diff --git a/lib-ce/emqx_modules/src/emqx_mod_sup.erl b/lib-ce/emqx_modules/src/emqx_mod_sup.erl index 109564f65..4c8f54679 100644 --- a/lib-ce/emqx_modules/src/emqx_mod_sup.erl +++ b/lib-ce/emqx_modules/src/emqx_mod_sup.erl @@ -69,6 +69,7 @@ stop_child(ChildId) -> init([]) -> ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), + emqx_slow_subs:init_topk_tab(), {ok, {{one_for_one, 10, 100}, []}}. %%-------------------------------------------------------------------- diff --git a/lib-ce/emqx_modules/test/emqx_st_statistics_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl similarity index 53% rename from lib-ce/emqx_modules/test/emqx_st_statistics_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl index 8df07f8b8..46f144a57 100644 --- a/lib-ce/emqx_modules/test/emqx_st_statistics_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_SUITE.erl @@ -14,18 +14,18 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_st_statistics_SUITE). +-module(emqx_slow_subs_SUITE). -compile(export_all). -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include("include/emqx_mqtt.hrl"). -include_lib("include/emqx.hrl"). %-define(LOGT(Format, Args), ct:pal(Format, Args)). --define(LOG_TAB, emqx_st_statistics_log). --define(TOPK_TAB, emqx_st_statistics_topk). +-define(TOPK_TAB, emqx_slow_subs_topk). -define(NOW, erlang:system_time(millisecond)). all() -> emqx_ct:all(?MODULE). @@ -39,11 +39,11 @@ end_per_suite(Config) -> Config. init_per_testcase(_, Config) -> - emqx_mod_st_statistics:load(base_conf()), + emqx_mod_slow_subs:load(base_conf()), Config. end_per_testcase(_, _) -> - emqx_mod_st_statistics:unload(undefined), + emqx_mod_slow_subs:unload([]), ok. 
%%-------------------------------------------------------------------- @@ -51,62 +51,74 @@ end_per_testcase(_, _) -> %%-------------------------------------------------------------------- t_log_and_pub(_) -> %% Sub topic first - SubBase = "/test", - emqx:subscribe("$slow_topics"), - Clients = start_client(SubBase), + Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], + Clients = start_client(Subs), + emqx:subscribe("$SYS/brokers/+/slow_subs"), timer:sleep(1000), Now = ?NOW, %% publish - ?assert(ets:info(?LOG_TAB, size) =:= 0), + lists:foreach(fun(I) -> - Topic = list_to_binary(io_lib:format("~s~p", [SubBase, I])), - Msg = emqx_message:make(Topic, <<"Hello">>), - emqx:publish(Msg#message{timestamp = Now - 1000}) + Topic = list_to_binary(io_lib:format("/test1/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_1, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}) end, lists:seq(1, 10)), - timer:sleep(2400), + lists:foreach(fun(I) -> + Topic = list_to_binary(io_lib:format("/test2/~p", [I])), + Msg = emqx_message:make(undefined, ?QOS_2, Topic, <<"Hello">>), + emqx:publish(Msg#message{timestamp = Now - 500}) + end, + lists:seq(1, 10)), - ?assert(ets:info(?LOG_TAB, size) =:= 0), - ?assert(ets:info(?TOPK_TAB, size) =:= 3), - try_receive(3), - try_receive(2), + timer:sleep(1000), + Size = ets:info(?TOPK_TAB, size), + %% some time record maybe delete due to it expired + ?assert(Size =< 6 andalso Size >= 4), + + timer:sleep(1500), + Recs = try_receive([]), + RecSum = lists:sum(Recs), + ?assert(RecSum >= 5), + ?assert(lists:all(fun(E) -> E =< 3 end, Recs)), + + timer:sleep(2000), + ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. - base_conf() -> - [{top_k_num, 3}, - {threshold_time, 10}, - {notice_qos, 0}, - {notice_batch_size, 3}, - {notice_topic,"$slow_topics"}, - {time_window, 2000}, - {max_log_num, 5}]. 
+ [ {top_k_num, 5} + , {expire_interval, timer:seconds(3)} + , {notice_interval, 1500} + , {notice_qos, 0} + , {notice_batch_size, 3} + ]. -start_client(Base) -> - [spawn(fun() -> - Topic = list_to_binary(io_lib:format("~s~p", [Base, I])), - client(Topic) - end) - || I <- lists:seq(1, 10)]. +start_client(Subs) -> + [spawn(fun() -> client(I, Subs) end) || I <- lists:seq(1, 10)]. -client(Topic) -> +client(I, Subs) -> {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, Topic}, + {clientid, io_lib:format("slow_subs_~p", [I])}, {username, <<"plain">>}, {password, <<"plain">>}]), {ok, _} = emqtt:connect(C), - {ok, _, _} = emqtt:subscribe(C, Topic), + + Len = erlang:length(Subs), + Sub = lists:nth(I rem Len + 1, Subs), + _ = emqtt:subscribe(C, Sub), + receive stop -> ok end. -try_receive(L) -> +try_receive(Acc) -> receive {deliver, _, #message{payload = Payload}} -> #{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]), - ?assertEqual(length(Logs), L) + try_receive([length(Logs) | Acc]) after 500 -> - ?assert(false) + Acc end. diff --git a/lib-ce/emqx_modules/test/emqx_st_statistics_api_SUITE.erl b/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl similarity index 59% rename from lib-ce/emqx_modules/test/emqx_st_statistics_api_SUITE.erl rename to lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl index 0fb9e904b..2efc7230a 100644 --- a/lib-ce/emqx_modules/test/emqx_st_statistics_api_SUITE.erl +++ b/lib-ce/emqx_modules/test/emqx_slow_subs_api_SUITE.erl @@ -12,9 +12,9 @@ %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. -%%-------------------------------------------------------------------- +%%--------------------------------------------------------------------n --module(emqx_st_statistics_api_SUITE). +-module(emqx_slow_subs_api_SUITE). -compile(export_all). -compile(nowarn_export_all). 
@@ -24,7 +24,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx_management/include/emqx_mgmt.hrl"). --include_lib("emqx_plugin_libs/include/emqx_st_statistics.hrl"). +-include_lib("emqx_plugin_libs/include/emqx_slow_subs.hrl"). -define(CONTENT_TYPE, "application/x-www-form-urlencoded"). @@ -33,6 +33,7 @@ -define(API_VERSION, "v4"). -define(BASE_PATH, "api"). +-define(NOW, erlang:system_time(millisecond)). all() -> emqx_ct:all(?MODULE). @@ -48,78 +49,52 @@ end_per_suite(Config) -> Config. init_per_testcase(_, Config) -> - emqx_mod_st_statistics:load(emqx_st_statistics_SUITE:base_conf()), + emqx_mod_slow_subs:load(base_conf()), Config. end_per_testcase(_, Config) -> - emqx_mod_st_statistics:unload(undefined), + emqx_mod_slow_subs:unload([]), Config. +base_conf() -> + [ {top_k_num, 5} + , {expire_interval, timer:seconds(60)} + , {notice_interval, 0} + , {notice_qos, 0} + , {notice_batch_size, 3} + ]. + t_get_history(_) -> - ets:insert(?TOPK_TAB, #top_k{rank = 1, - topic = <<"test">>, - average_count = 12, - average_elapsed = 1500}), + Now = ?NOW, + Each = fun(I) -> + ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), + ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), + type = average, + last_update_time = Now}) + end, - {ok, Data} = request_api(get, api_path(["slow_topic"]), "_page=1&_limit=10", + lists:foreach(Each, lists:seq(1, 5)), + + {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", auth_header_()), + #{meta := Meta, data := [First | _]} = decode(Data), - ShouldRet = #{meta => #{page => 1, - limit => 10, - hasnext => false, - count => 1}, - data => [#{topic => <<"test">>, - rank => 1, - elapsed => 1500, - count => 12}], - code => 0}, + RMeta = #{page => 1, limit => 10, count => 5}, + ?assertEqual(RMeta, Meta), - Ret = decode(Data), + RFirst = #{clientid => <<"test_5">>, + latency => 5, + type => <<"average">>, + last_update_time => Now}, - 
?assertEqual(ShouldRet, Ret). - -t_rank_range(_) -> - Insert = fun(Rank) -> - ets:insert(?TOPK_TAB, - #top_k{rank = Rank, - topic = <<"test">>, - average_count = 12, - average_elapsed = 1500}) - end, - lists:foreach(Insert, lists:seq(1, 15)), - - timer:sleep(100), - - {ok, Data} = request_api(get, api_path(["slow_topic"]), "_page=1&_limit=10", - auth_header_()), - - Meta1 = #{page => 1, limit => 10, hasnext => true, count => 10}, - Ret1 = decode(Data), - ?assertEqual(Meta1, maps:get(meta, Ret1)), - - %% End > Size - {ok, Data2} = request_api(get, api_path(["slow_topic"]), "_page=2&_limit=10", - auth_header_()), - - Meta2 = #{page => 2, limit => 10, hasnext => false, count => 5}, - Ret2 = decode(Data2), - ?assertEqual(Meta2, maps:get(meta, Ret2)), - - %% Start > Size - {ok, Data3} = request_api(get, api_path(["slow_topic"]), "_page=3&_limit=10", - auth_header_()), - - Meta3 = #{page => 3, limit => 10, hasnext => false, count => 0}, - Ret3 = decode(Data3), - ?assertEqual(Meta3, maps:get(meta, Ret3)). + ?assertEqual(RFirst, First). t_clear(_) -> - ets:insert(?TOPK_TAB, #top_k{rank = 1, - topic = <<"test">>, - average_count = 12, - average_elapsed = 1500}), + ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>), + type = average, + last_update_time = ?NOW}), - {ok, _} = request_api(delete, api_path(["slow_topic"]), [], + {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], auth_header_()), ?assertEqual(0, ets:info(?TOPK_TAB, size)). diff --git a/priv/emqx.schema b/priv/emqx.schema index 15f0324cd..61a98f824 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1007,6 +1007,18 @@ end}. {datatype, {duration, s}} ]}. +%% @doc the number of smaples for calculate the average latency of delivery +{mapping, "zone.$name.latency_samples", "emqx.zones", [ + {default, 10}, + {datatype, integer} +]}. 
+ +%% @doc Threshold for slow subscription statistics +{mapping, "zone.$name.latency_stats_threshold", "emqx.zones", [ + {default, "100ms"}, + {datatype, {duration, ms}} +]}. + %% @doc Max Packets that Awaiting PUBREL, 0 means no limit {mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [ {default, 0}, @@ -2218,43 +2230,28 @@ end}. {datatype, string} ]}. -{mapping, "module.st_statistics.threshold_time", "emqx.modules", [ - {default, "10s"}, +{mapping, "module.slow_subs.expire_interval", "emqx.modules", [ + {default, "5m"}, {datatype, {duration, ms}} ]}. -{mapping, "module.st_statistics.ignore_before_create", "emqx.modules", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{mapping, "module.st_statistics.time_window", "emqx.modules", [ - {default, "5M"}, - {datatype, {duration, ms}} -]}. - -{mapping, "module.st_statistics.max_log_num", "emqx.modules", [ +{mapping, "module.slow_subs.top_k_num", "emqx.modules", [ {default, 500}, {datatype, integer} ]}. -{mapping, "module.st_statistics.top_k_num", "emqx.modules", [ - {default, 500}, - {datatype, integer} +{mapping, "module.slow_subs.notice_interval", "emqx.modules", [ + {default, "0s"}, + {datatype, {duration, ms}} ]}. -{mapping, "module.st_statistics.notice_topic", "emqx.modules", [ - {default, "$slow_topics"}, - {datatype, string} -]}. - -{mapping, "module.st_statistics.notice_qos", "emqx.modules", [ +{mapping, "module.slow_subs.notice_qos", "emqx.modules", [ {default, 0}, {datatype, integer}, {validators, ["range:0-1"]} ]}. -{mapping, "module.st_statistics.notice_batch_size", "emqx.modules", [ +{mapping, "module.slow_subs.notice_batch_size", "emqx.modules", [ {default, 500}, {datatype, integer} ]}. @@ -2283,8 +2280,8 @@ end}. 
end, TotalRules) end, - SlowTopic = fun() -> - List = cuttlefish_variable:filter_by_prefix("module.st_statistics", Conf), + SlowSubs = fun() -> + List = cuttlefish_variable:filter_by_prefix("module.slow_subs", Conf), [{erlang:list_to_atom(Key), Value} || {[_, _, Key], Value} <- List] end, @@ -2295,7 +2292,7 @@ end}. [{emqx_mod_topic_metrics, []}], [{emqx_mod_delayed, []}], [{emqx_mod_trace, []}], - [{emqx_mod_st_statistics, SlowTopic()}], + [{emqx_mod_slow_subs, SlowSubs()}], [{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] ]) end}. diff --git a/rebar.config.erl b/rebar.config.erl index 1000a2c92..6c14b6ae8 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -106,7 +106,7 @@ test_plugins() -> test_deps() -> [ {bbmustache, "1.10.0"} - , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.9"}}} + , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.11"}}} , meck ]. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 6122982ae..3e1e48a13 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -95,6 +95,8 @@ -import(emqx_zone, [get_env/3]). -record(session, { + %% Client's id + clientid :: emqx_types:clientid(), %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed @@ -121,8 +123,15 @@ %% Awaiting PUBREL Timeout (Unit: millsecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer() - }). + created_at :: pos_integer(), + %% Message deliver latency stats + latency_stats :: emqx_message_latency_stats:stats() + }). + +%% in the previous code, we will replace the message record with the pubrel atom +%% in the pubrec function, this will lose the creation time of the message, +%% but now we need this time to calculate latency, so now pubrel atom is changed to this record +-record(pubrel_await, {timestamp :: non_neg_integer()}). -type(session() :: #session{}). 
@@ -148,19 +157,26 @@ mqueue_dropped, next_pkt_id, awaiting_rel_cnt, - awaiting_rel_max + awaiting_rel_max, + latency_stats ]). -define(DEFAULT_BATCH_N, 1000). +-ifdef(TEST). +-define(GET_CLIENT_ID(C), maps:get(clientid, C, <<>>)). +-else. +-define(GET_CLIENT_ID(C), maps:get(clientid, C)). +-endif. %%-------------------------------------------------------------------- %% Init a Session %%-------------------------------------------------------------------- -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). -init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> - #session{max_subscriptions = get_env(Zone, max_subscriptions, 0), +init(#{zone := Zone} = CInfo, #{receive_maximum := MaxInflight}) -> + #session{clientid = ?GET_CLIENT_ID(CInfo), + max_subscriptions = get_env(Zone, max_subscriptions, 0), subscriptions = #{}, upgrade_qos = get_env(Zone, upgrade_qos, false), inflight = emqx_inflight:new(MaxInflight), @@ -170,7 +186,8 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)), - created_at = erlang:system_time(millisecond) + created_at = erlang:system_time(millisecond), + latency_stats = emqx_message_latency_stats:new(Zone) }. %% @private init mq @@ -227,7 +244,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout div 1000; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +info(latency_stats, #session{latency_stats = Stats}) -> + emqx_message_latency_stats:latency(Stats). %% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). @@ -317,13 +336,12 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, -> {ok, emqx_types:message(), session()} | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). 
-puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> +puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - emqx:run_hook('message.publish_done', - [Msg, #{session_rebirth_time => CreatedAt}]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - return_with(Msg, dequeue(Session#session{inflight = Inflight1})); + Session2 = update_latency(Msg, Session), + return_with(Msg, dequeue(Session2#session{inflight = Inflight1})); {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -343,15 +361,13 @@ return_with(Msg, {ok, Publishes, Session}) -> -spec(pubrec(emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}). -pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) -> +pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - %% execute hook here, because message record will be replaced by pubrel - emqx:run_hook('message.publish_done', - [Msg, #{session_rebirth_time => CreatedAt}]), - Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight), + Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; - {value, {pubrel, _Ts}} -> + {value, {_PUBREL, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -380,9 +396,10 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> | {error, emqx_types:reason_code()}). 
pubcomp(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {pubrel, _Ts}} -> + {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> + Session2 = update_latency(Pubrel, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(Session#session{inflight = Inflight1}); + dequeue(Session2#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -550,11 +567,16 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> %%-------------------------------------------------------------------- -spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(Session = #session{inflight = Inflight}) -> +retry(Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; - false -> retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), - [], erlang:system_time(millisecond), Session) + false -> + Now = erlang:system_time(millisecond), + Session2 = check_expire_latency(Now, RetryInterval, Session), + retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), + [], + Now, + Session2) end. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> @@ -581,8 +603,8 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) - {[{PacketId, Msg1} | Acc], Inflight1} end; -retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> - Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), +retry_delivery(PacketId, Pubrel, Now, Acc, Inflight) -> + Inflight1 = emqx_inflight:update(PacketId, {Pubrel, Now}, Inflight), {[{pubrel, PacketId} | Acc], Inflight1}. %%-------------------------------------------------------------------- @@ -626,10 +648,10 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(session()) -> {ok, replies(), session()}). 
replay(Session = #session{inflight = Inflight}) -> - Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> - {pubrel, PacketId}; + Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) -> + {pubrel, PacketId}; ({PacketId, {Msg, _Ts}}) -> - {PacketId, emqx_message:set_flag(dup, true, Msg)} + {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), case dequeue(Session) of {ok, NSession} -> {ok, Pubs, NSession}; @@ -677,6 +699,35 @@ next_pkt_id(Session = #session{next_pkt_id = ?MAX_PACKET_ID}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. +%%-------------------------------------------------------------------- +%% Message Latency Stats +%%-------------------------------------------------------------------- +update_latency(Msg, + #session{clientid = ClientId, + latency_stats = Stats, + created_at = CreateAt} = S) -> + case get_birth_timestamp(Msg, CreateAt) of + 0 -> S; + Ts -> + Latency = erlang:system_time(millisecond) - Ts, + Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats), + S#session{latency_stats = Stats2} + end. + +check_expire_latency(Now, Interval, + #session{clientid = ClientId, latency_stats = Stats} = S) -> + Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats), + S#session{latency_stats = Stats2}. + +get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> + Ts; + +get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> + Ts; + +get_birth_timestamp(_, _) -> + 0. 
+ %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- diff --git a/src/emqx_slow_subs/emqx_message_latency_stats.erl b/src/emqx_slow_subs/emqx_message_latency_stats.erl new file mode 100644 index 000000000..f9661ab91 --- /dev/null +++ b/src/emqx_slow_subs/emqx_message_latency_stats.erl @@ -0,0 +1,105 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_message_latency_stats). + +%% API +-export([ new/1, new/2, update/3 + , check_expire/4, latency/1]). + +-define(NOW, erlang:system_time(millisecond)). +-define(MINIMUM_INSERT_INTERVAL, 1000). +-define(MINIMUM_THRESHOLD, 100). + +-opaque stats() :: #{ threshold := number() + , ema := emqx_moving_average:ema() + , last_update_time := timestamp() + , last_access_time := timestamp() %% timestamp of last access top-k + , last_insert_value := non_neg_integer() + }. + +-type timestamp() :: non_neg_integer(). +-type timespan() :: number(). + +-type latency_type() :: average + | expire. + +-import(emqx_zone, [get_env/3]). + +-export_type([stats/0, latency_type/0]). 
+ +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec new(emqx_types:zone()) -> stats(). +new(Zone) -> + Samples = get_env(Zone, latency_samples, 1), + Threshold = get_env(Zone, latency_stats_threshold, ?MINIMUM_THRESHOLD), + new(Samples, Threshold). + +-spec new(non_neg_integer(), number()) -> stats(). +new(SamplesT, ThresholdT) -> + Samples = erlang:max(1, SamplesT), + Threshold = erlang:max(?MINIMUM_THRESHOLD, ThresholdT), + #{ ema => emqx_moving_average:new(exponential, #{period => Samples}) + , threshold => Threshold + , last_update_time => 0 + , last_access_time => 0 + , last_insert_value => 0 + }. + +-spec update(emqx_types:clientid(), number(), stats()) -> stats(). +update(ClientId, Val, #{ema := EMA} = Stats) -> + Now = ?NOW, + #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA), + Stats2 = call_hook(ClientId, Now, average, Latency, Stats), + Stats2#{ ema := EMA2 + , last_update_time := ?NOW}. + +-spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats(). +check_expire(_, Now, Interval, #{last_update_time := LUT} = S) + when LUT >= Now - Interval -> + S; + +check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) -> + Latency = Now - LUT, + call_hook(ClientId, Now, expire, Latency, S). + +-spec latency(stats()) -> number(). +latency(#{ema := #{average := Average}}) -> + Average. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +-spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats(). 
+call_hook(_, Now, _, _, #{last_access_time := LIT} = S) + when LIT >= Now - ?MINIMUM_INSERT_INTERVAL -> + S; + +call_hook(_, _, _, Latency, #{threshold := Threshold} = S) + when Latency =< Threshold -> + S; + +call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> + Arg = #{clientid => ClientId, + latency => erlang:floor(Latency), + type => Type, + last_insert_value => LIV, + update_time => Now}, + emqx:run_hook('message.slow_subs_stats', [Arg]), + Stats#{last_insert_value := Latency, + last_access_time := Now}. diff --git a/src/emqx_slow_subs/emqx_moving_average.erl b/src/emqx_slow_subs/emqx_moving_average.erl new file mode 100644 index 000000000..64c73f987 --- /dev/null +++ b/src/emqx_slow_subs/emqx_moving_average.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @see https://en.wikipedia.org/wiki/Moving_average + +-module(emqx_moving_average). + +%% API +-export([new/0, new/1, new/2, update/2]). + +-type type() :: cumulative + | exponential. + +-type ema() :: #{ type := exponential + , average := 0 | float() + , coefficient := float() + }. + +-type cma() :: #{ type := cumulative + , average := 0 | float() + , count := non_neg_integer() + }. + +-type moving_average() :: ema() + | cma(). 
+ +-define(DEF_EMA_ARG, #{period => 10}). +-define(DEF_AVG_TYPE, exponential). + +-export_type([type/0, moving_average/0, ema/0, cma/0]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- +-spec new() -> moving_average(). +new() -> + new(?DEF_AVG_TYPE, #{}). + +-spec new(type()) -> moving_average(). +new(Type) -> + new(Type, #{}). + +-spec new(type(), Args :: map()) -> moving_average(). +new(cumulative, _) -> + #{ type => cumulative + , average => 0 + , count => 0 + }; + +new(exponential, Arg) -> + #{period := Period} = maps:merge(?DEF_EMA_ARG, Arg), + #{ type => exponential + , average => 0 + %% coefficient = 2/(N+1) is a common convention, see the wiki link for details + , coefficient => 2 / (Period + 1) + }. + +-spec update(number(), moving_average()) -> moving_average(). + +update(Val, #{average := 0} = Avg) -> + Avg#{average := Val}; + +update(Val, #{ type := cumulative + , average := Average + , count := Count} = CMA) -> + NewCount = Count + 1, + CMA#{average := (Count * Average + Val) / NewCount, + count := NewCount}; + +update(Val, #{ type := exponential + , average := Average + , coefficient := Coefficient} = EMA) -> + EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index cb7c10cae..f838d052a 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -180,7 +180,8 @@ t_puback_with_dequeue(_) -> ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). 
t_puback_error_packet_id_in_use(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Now = ts(millisecond), + Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(1, session(#{inflight => Inflight})). @@ -192,10 +193,11 @@ t_pubrec(_) -> Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Msg, Session1} = emqx_session:pubrec(2, Session), - ?assertMatch([{pubrel, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). + ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Now = ts(millisecond), + Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubrec(1, session(#{inflight => Inflight})). @@ -211,7 +213,8 @@ t_pubrel_error_packetid_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(1, session()). t_pubcomp(_) -> - Inflight = emqx_inflight:insert(1, {pubrel, ts(millisecond)}, emqx_inflight:new()), + Now = ts(millisecond), + Inflight = emqx_inflight:insert(1, {{pubrel_await, Now}, Now}, emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Session1} = emqx_session:pubcomp(1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). 
@@ -260,7 +263,7 @@ t_deliver_qos0(_) -> t_deliver_qos1(_) -> ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end), {ok, Session} = emqx_session:subscribe( - clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), + clientinfo(), <<"t1">>, subopts(#{qos => ?QOS_1}), session()), Delivers = [delivery(?QOS_1, T) || T <- [<<"t1">>, <<"t2">>]], {ok, [{1, Msg1}, {2, Msg2}], Session1} = emqx_session:deliver(Delivers, Session), ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), @@ -373,7 +376,7 @@ mqueue(Opts) -> session() -> session(#{}). session(InitFields) when is_map(InitFields) -> maps:fold(fun(Field, Value, Session) -> - emqx_session:set_field(Field, Value, Session) + emqx_session:set_field(Field, Value, Session) end, emqx_session:init(#{zone => channel}, #{receive_maximum => 0}), InitFields). @@ -396,4 +399,3 @@ ts(second) -> erlang:system_time(second); ts(millisecond) -> erlang:system_time(millisecond). -