From 0c48bd92dba8d20a44ef76de4abd70efe24ffc28 Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 9 Feb 2022 16:51:21 +0800 Subject: [PATCH 1/5] feat: port slow subs from v4.4 --- apps/emqx/src/emqx_session.erl | 106 +++--- .../emqx_message_latency_stats.erl | 120 ------- .../emqx_slow_subs/emqx_moving_average.erl | 90 ----- apps/emqx_slow_subs/etc/emqx_slow_subs.conf | 23 +- .../emqx_slow_subs/include/emqx_slow_subs.hrl | 18 +- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 319 ++++++++---------- .../emqx_slow_subs/src/emqx_slow_subs_api.erl | 79 +++-- .../src/emqx_slow_subs_schema.erl | 19 +- 8 files changed, 279 insertions(+), 495 deletions(-) delete mode 100644 apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl delete mode 100644 apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 460e94014..5e65b67e5 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -134,10 +134,12 @@ latency_stats :: emqx_message_latency_stats:stats() }). -%% in the previous code, we will replace the message record with the pubrel atom -%% in the pubrec function, this will lose the creation time of the message, -%% but now we need this time to calculate latency, so now pubrel atom is changed to this record --record(pubrel_await, {timestamp :: non_neg_integer()}). + +-type inflight_data_phase() :: wait_ack | wait_comp. + +-record(inflight_data, { phase :: inflight_data_phase() + , message :: emqx_types:message() + , timestamp :: non_neg_integer()}). -type(session() :: #session{}). @@ -168,7 +170,6 @@ , next_pkt_id , awaiting_rel_cnt , awaiting_rel_max - , latency_stats ]). -define(DEFAULT_BATCH_N, 1000). @@ -380,11 +381,11 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, | {error, emqx_types:reason_code()}). puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Msg, _Ts}} when is_record(Msg, message) -> + {value, #inflight_data{phase = wait_ack, message = Msg}} -> + on_delivery_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - Session2 = update_latency(Msg, Session), - return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1})); - {value, {_Pubrel, _Ts}} -> + return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); + {value, _} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -404,11 +405,11 @@ return_with(Msg, {ok, Publishes, Session}) -> | {error, emqx_types:reason_code()}). pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Msg, _Ts}} when is_record(Msg, message) -> - Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), + {value, #inflight_data{phase = wait_ack, message = Msg} = Data} -> + Update = Data#inflight_data{phase = wait_comp}, Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; - {value, {_Pubrel, _Ts}} -> + {value, _} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -437,10 +438,10 @@ pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> | {error, emqx_types:reason_code()}). pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> - Session2 = update_latency(Pubrel, Session), + {value, #inflight_data{phase = wait_comp, message = Msg}} -> + on_delivery_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(ClientInfo, Session2#session{inflight = Inflight1}); + dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -504,6 +505,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) -> end. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> + on_delivery_completed(Msg, Session), % {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = @@ -518,7 +520,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = {ok, Session1}; false -> Publish = {PacketId, maybe_ack(Msg)}, - Session1 = await(PacketId, Msg, Session), + Msg2 = mark_begin_deliver(Msg), + Session1 = await(PacketId, Msg2, Session), {ok, [Publish], next_pkt_id(Session1)} end. @@ -621,29 +624,29 @@ retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = Retry true -> {ok, Session}; false -> Now = erlang:system_time(millisecond), - Session2 = check_expire_latency(Now, RetryInterval, Session), - retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now, - Session2, ClientInfo) + retry_delivery(emqx_inflight:to_list(fun sort_fun/2, Inflight), + [], Now, Session, ClientInfo) end. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> {ok, lists:reverse(Acc), Interval, Session}; -retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = - #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> +retry_delivery([{PacketId, #inflight_data{timestamp = Ts} = Data} | More], + Acc, Now, Session = #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> case (Age = age(Now, Ts)) >= Interval of true -> - {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo), + {Acc1, Inflight1} = do_retry_delivery(PacketId, Data, Now, Acc, Inflight, ClientInfo), retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo); false -> {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. -do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) -> - Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), - {[{pubrel, PacketId}|Acc], Inflight1}; +do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) -> + Update = Data#inflight_data{timestamp = Now}, + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), + {[{pubrel, PacketId} | Acc], Inflight1}; -do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) -> +do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight, ClientInfo) -> case emqx_message:is_expired(Msg) of true -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), @@ -651,8 +654,9 @@ do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) -> {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), - Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight), - {[{PacketId, Msg1}|Acc], Inflight1} + Update = Data#inflight_data{message = Msg1, timestamp = Now}, + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), + {[{PacketId, Msg1} | Acc], Inflight1} end. %%-------------------------------------------------------------------- @@ -697,9 +701,9 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}). replay(ClientInfo, Session = #session{inflight = Inflight}) -> - Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) -> + Pubs = lists:map(fun({PacketId, #inflight_data{phase = wait_comp}}) -> {pubrel, PacketId}; - ({PacketId, {Msg, _Ts}}) -> + ({PacketId, #inflight_data{message = Msg}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), case dequeue(ClientInfo, Session) of @@ -755,37 +759,23 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %%-------------------------------------------------------------------- %% Message Latency Stats %%-------------------------------------------------------------------- -update_latency(Msg, - #session{clientid = ClientId, - latency_stats = Stats, - created_at = CreateAt} = S) -> - case get_birth_timestamp(Msg, CreateAt) of - 0 -> S; - Ts -> - Latency = erlang:system_time(millisecond) - Ts, - Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats), - S#session{latency_stats = Stats2} - end. +on_delivery_completed(Msg, + #session{created_at = CreateAt, clientid = ClientId}) -> + emqx:run_hook('delivery.completed', + [Msg, + #{session_birth_time => CreateAt, clientid => ClientId}]). -check_expire_latency(Now, Interval, - #session{clientid = ClientId, latency_stats = Stats} = S) -> - Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats), - S#session{latency_stats = Stats2}. - -get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> - Ts; - -get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> - Ts; - -get_birth_timestamp(_, _) -> - 0. +mark_begin_deliver(Msg) -> + emqx_message:set_header(deliver_begin_at, erlang:system_time(second), Msg). %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- -sort_fun() -> - fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end. + +-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}). + +sort_fun(A, B) -> + A#inflight_data.timestamp =< B#inflight_data.timestamp. batch_n(Inflight) -> case emqx_inflight:max_size(Inflight) of @@ -794,7 +784,9 @@ batch_n(Inflight) -> end. with_ts(Msg) -> - {Msg, erlang:system_time(millisecond)}. + #inflight_data{phase = wait_ack, + message = Msg, + timestamp = erlang:system_time(millisecond)}. age(Now, Ts) -> Now - Ts. diff --git a/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl b/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl deleted file mode 100644 index 68927f5e0..000000000 --- a/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl +++ /dev/null @@ -1,120 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_message_latency_stats). - -%% API --export([new/1, update/3, check_expire/4, latency/1]). - --export([get_threshold/0, update_threshold/1]). - --define(NOW, erlang:system_time(millisecond)). --define(MINIMUM_INSERT_INTERVAL, 1000). --define(MINIMUM_THRESHOLD, 100). --define(DEFAULT_THRESHOLD, 500). --define(DEFAULT_SAMPLES, 10). --define(THRESHOLD_KEY, {?MODULE, threshold}). - --opaque stats() :: #{ ema := emqx_moving_average:ema() - , last_update_time := timestamp() - , last_access_time := timestamp() %% timestamp of last access top-k - , last_insert_value := non_neg_integer() - }. - --type timestamp() :: non_neg_integer(). --type timespan() :: number(). - --type latency_type() :: average - | expire. - --type create_options() :: #{samples => pos_integer()}. - --export_type([stats/0, latency_type/0, create_options/0]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- --spec new(non_neg_integer() | create_options()) -> stats(). -new(SamplesT) when is_integer(SamplesT) -> - Samples = erlang:max(1, SamplesT), - #{ ema => emqx_moving_average:new(exponential, #{period => Samples}) - , last_update_time => 0 - , last_access_time => 0 - , last_insert_value => 0 - }; - -new(OptsT) -> - Opts = maps:merge(#{samples => ?DEFAULT_SAMPLES}, OptsT), - #{samples := Samples} = Opts, - new(Samples). - --spec update(emqx_types:clientid(), number(), stats()) -> stats(). -update(ClientId, Val, #{ema := EMA} = Stats) -> - Now = ?NOW, - #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA), - Stats2 = call_hook(ClientId, Now, average, Latency, Stats), - Stats2#{ ema := EMA2 - , last_update_time := ?NOW}. - --spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats(). -check_expire(_, Now, Interval, #{last_update_time := LUT} = S) - when LUT >= Now - Interval -> - S; - -check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) -> - Latency = Now - LUT, - call_hook(ClientId, Now, expire, Latency, S). - --spec latency(stats()) -> number(). -latency(#{ema := #{average := Average}}) -> - Average. - --spec update_threshold(pos_integer()) -> pos_integer(). -update_threshold(Threshold) -> - Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD), - persistent_term:put(?THRESHOLD_KEY, Val), - Val. - -get_threshold() -> - persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- --spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats(). -call_hook(_, _, _, Latency, S) - when Latency =< ?MINIMUM_THRESHOLD -> - S; - -call_hook(_, Now, _, _, #{last_access_time := LIT} = S) - when Now =< LIT + ?MINIMUM_INSERT_INTERVAL -> - S; - -call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> - case Latency =< get_threshold() of - true -> - Stats#{last_access_time := Now}; - _ -> - ToInsert = erlang:floor(Latency), - Arg = #{clientid => ClientId, - latency => ToInsert, - type => Type, - last_insert_value => LIV, - update_time => Now}, - emqx:run_hook('message.slow_subs_stats', [Arg]), - Stats#{last_insert_value := ToInsert, - last_access_time := Now} - end. diff --git a/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl b/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl deleted file mode 100644 index 7f8b037f6..000000000 --- a/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl +++ /dev/null @@ -1,90 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @see https://en.wikipedia.org/wiki/Moving_average - --module(emqx_moving_average). - -%% API --export([new/0, new/1, new/2, update/2]). - --type type() :: cumulative - | exponential. - --type ema() :: #{ type := exponential - , average := 0 | float() - , coefficient := float() - }. - --type cma() :: #{ type := cumulative - , average := 0 | float() - , count := non_neg_integer() - }. - --type moving_average() :: ema() - | cma(). - --define(DEF_EMA_ARG, #{period => 10}). --define(DEF_AVG_TYPE, exponential). - --export_type([type/0, moving_average/0, ema/0, cma/0]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- --spec new() -> moving_average(). -new() -> - new(?DEF_AVG_TYPE, #{}). - --spec new(type()) -> moving_average(). -new(Type) -> - new(Type, #{}). - --spec new(type(), Args :: map()) -> moving_average(). -new(cumulative, _) -> - #{ type => cumulative - , average => 0 - , count => 0 - }; - -new(exponential, Arg) -> - #{period := Period} = maps:merge(?DEF_EMA_ARG, Arg), - #{ type => exponential - , average => 0 - %% coefficient = 2/(N+1) is a common convention, see the wiki link for details - , coefficient => 2 / (Period + 1) - }. - --spec update(number(), moving_average()) -> moving_average(). - -update(Val, #{average := 0} = Avg) -> - Avg#{average := Val}; - -update(Val, #{ type := cumulative - , average := Average - , count := Count} = CMA) -> - NewCount = Count + 1, - CMA#{average := (Count * Average + Val) / NewCount, - count := NewCount}; - -update(Val, #{ type := exponential - , average := Average - , coefficient := Coefficient} = EMA) -> - EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- diff --git a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf index 8378e971c..db7f1594d 100644 --- a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf +++ b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf @@ -20,21 +20,16 @@ slow_subs { ## Value: 10 top_k_num = 10 - ## The interval for pushing statistics table records to the system topic. When set to 0, push is disabled - ## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval - ## publish is disabled if set to 0s. + ## The ways to calculate the latency are as follows: ## - ## Value: 0s - notice_interval = 0s - - ## QoS of notification message + ## 1. whole + ## From the time the message arrives at EMQX until the message completes transmission ## - ## Default: 0 - notice_qos = 0 - - ## Maximum information number in one notification + ## 2.internal + ## From when the message arrives at EMQX until when EMQX starts delivering the message ## - ## Default: 100 - notice_batch_size = 100 - + ## 3.response + ## From the time EMQX starts delivering the message, until the message completes transmission + ## Default: whole + stats_type = whole } diff --git a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl index 2fb7cdfd3..f282037dc 100644 --- a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl +++ b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl @@ -15,16 +15,24 @@ %%-------------------------------------------------------------------- -define(TOPK_TAB, emqx_slow_subs_topk). +-define(INDEX_TAB, emqx_slow_subs_index). --define(INDEX(Latency, ClientId), {Latency, ClientId}). +-define(ID(ClientId, Topic), {ClientId, Topic}). +-define(INDEX(TimeSpan, Id), {Id, TimeSpan}). +-define(TOPK_INDEX(TimeSpan, Id), {TimeSpan, Id}). --define(MAX_TAB_SIZE, 1000). +-define(MAX_SIZE, 1000). --record(top_k, { index :: index() - , type :: emqx_message_latency_stats:latency_type() +-record(top_k, { index :: topk_index() , last_update_time :: pos_integer() , extra = [] }). +-record(index_tab, { index :: index()}). + -type top_k() :: #top_k{}. --type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()). +-type index_tab() :: #index_tab{}. + +-type id() :: {emqx_types:clientid(), emqx_types:topic()}. +-type index() :: ?INDEX(non_neg_integer(), id()). +-type topk_index() :: ?TOPK_INDEX(non_neg_integer(), id()). diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index c8ef8a5d2..2ae8c0b31 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -22,8 +22,8 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). --export([ start_link/0, on_stats_update/2, update_settings/1 - , clear_history/0, init_topk_tab/0, post_config_update/5 +-export([ start_link/0, on_delivery_completed/4, update_settings/1 + , clear_history/0, init_tab/0, post_config_update/5 ]). %% gen_server callbacks @@ -40,29 +40,19 @@ -type state() :: #{ enable := boolean() , last_tick_at := pos_integer() , expire_timer := undefined | reference() - , notice_timer := undefined | reference() }. --type log() :: #{ rank := pos_integer() - , clientid := emqx_types:clientid() - , latency := non_neg_integer() - , type := emqx_message_latency_stats:latency_type() - }. - --type window_log() :: #{ last_tick_at := pos_integer() - , logs := [log()] - }. - -type message() :: #message{}. --type stats_update_args() :: #{ clientid := emqx_types:clientid() - , latency := non_neg_integer() - , type := emqx_message_latency_stats:latency_type() - , last_insert_value := non_neg_integer() - , update_time := timer:time() - }. +-type stats_type() :: whole %% whole = internal + response + | internal %% timespan from message in to deliver + | response. %% timespan from delivery to client response --type stats_update_env() :: #{max_size := pos_integer()}. +-type stats_update_args() :: #{session_birth_time := pos_integer()}. + +-type stats_update_env() :: #{ threshold := non_neg_integer() + , stats_type := stats_type() + , max_size := pos_integer()}. -ifdef(TEST). -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). @@ -71,7 +61,6 @@ -endif. -define(NOW, erlang:system_time(millisecond)). --define(NOTICE_TOPIC_NAME, "slow_subs"). -define(DEF_CALL_TIMEOUT, timer:seconds(10)). %% erlang term order @@ -88,37 +77,32 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% XXX NOTE:pay attention to the performance here --spec on_stats_update(stats_update_args(), stats_update_env()) -> true. -on_stats_update(#{clientid := ClientId, - latency := Latency, - type := Type, - last_insert_value := LIV, - update_time := Ts}, - #{max_size := MaxSize}) -> +on_delivery_completed(_ClientInfo, + #message{timestamp = Ts}, + #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> + ok; - LastIndex = ?INDEX(LIV, ClientId), - Index = ?INDEX(Latency, ClientId), +on_delivery_completed(ClientInfo, Msg, Env, Cfg) -> + on_delivery_completed(ClientInfo, Msg, Env, erlang:system_time(millisecond), Cfg). - %% check whether the client is in the table - case ets:lookup(?TOPK_TAB, LastIndex) of - [#top_k{index = Index}] -> - %% if last value == the new value, update the type and last_update_time - %% XXX for clients whose latency are stable for a long time, is it - %% possible to reduce updates? - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}); - [_] -> - %% if Latency > minimum value, we should update it - %% if Latency < minimum value, maybe it can replace the minimum value - %% so always update at here - %% do we need check if Latency == minimum ??? - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}), - ets:delete(?TOPK_TAB, LastIndex); - [] -> - %% try to insert - try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) +on_delivery_completed(#{clientid := ClientId}, + #message{topic = Topic} = Msg, + _Env, + Now, + #{threshold := Threshold, + stats_type := StatsType, + max_size := MaxSize}) -> + TimeSpan = calc_timespan(StatsType, Msg, Now), + case TimeSpan =< Threshold of + true -> ok; + _ -> + Id = ?ID(ClientId, Topic), + LastUpdateValue = find_last_update_value(Id), + case TimeSpan =< LastUpdateValue of + true -> ok; + _ -> + try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) + end end. clear_history() -> @@ -127,21 +111,20 @@ clear_history() -> update_settings(Conf) -> emqx_conf:update([slow_subs], Conf, #{override_to => cluster}). -init_topk_tab() -> - case ets:whereis(?TOPK_TAB) of - undefined -> - ?TOPK_TAB = ets:new(?TOPK_TAB, - [ ordered_set, public, named_table - , {keypos, #top_k.index}, {write_concurrency, true} - , {read_concurrency, true} - ]); - _ -> - ?TOPK_TAB - end. - post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) -> gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT). +init_tab() -> + safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table + , {keypos, #top_k.index}, {write_concurrency, true} + , {read_concurrency, true} + ]), + + safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table + , {keypos, #index_tab.index}, {write_concurrency, true} + , {read_concurrency, true} + ]). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -151,8 +134,7 @@ init([]) -> InitState = #{enable => false, last_tick_at => 0, - expire_timer => undefined, - notice_timer => undefined + expire_timer => undefined }, Enable = emqx:get_config([slow_subs, enable]), @@ -164,7 +146,7 @@ handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) -> {reply, ok, State2}; handle_call(clear_history, _, State) -> - ets:delete_all_objects(?TOPK_TAB), + do_clear_history(), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -181,12 +163,6 @@ handle_info(expire_tick, State) -> State1 = start_timer(expire_timer, fun expire_tick/0, State), {noreply, State1}; -handle_info(notice_tick, State) -> - Logs = ets:tab2list(?TOPK_TAB), - do_notification(Logs, State), - State1 = start_timer(notice_timer, fun notice_tick/0, State), - {noreply, State1#{last_tick_at := ?NOW}}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -204,123 +180,127 @@ code_change(_OldVsn, State, _Extra) -> expire_tick() -> erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). -notice_tick() -> - case emqx:get_config([slow_subs, notice_interval]) of - 0 -> undefined; - Interval -> - erlang:send_after(Interval, self(), ?FUNCTION_NAME) - end. - --spec do_notification(list(), state()) -> ok. -do_notification([], _) -> - ok; - -do_notification(Logs, #{last_tick_at := LastTickTime}) -> - start_publish(Logs, LastTickTime), - ok. - -start_publish(Logs, TickTime) -> - emqx_pool:async_submit({fun do_publish/3, [Logs, erlang:length(Logs), TickTime]}). - -do_publish([], _, _) -> - ok; - -do_publish(Logs, Rank, TickTime) -> - BatchSize = emqx:get_config([slow_subs, notice_batch_size]), - do_publish(Logs, BatchSize, Rank, TickTime, []). - -do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 -> - Cache2 = [convert_to_notice(Rank, Log) | Cache], - do_publish(T, Size - 1, Rank - 1, TickTime, Cache2); - -do_publish(Logs, Size, Rank, TickTime, Cache) when Size =:= 0 -> - publish(TickTime, Cache), - do_publish(Logs, Rank, TickTime); - -do_publish([], _, _Rank, TickTime, Cache) -> - publish(TickTime, Cache), - ok. - -convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId), - type = Type, - last_update_time = Ts}) -> - #{rank => Rank, - clientid => ClientId, - latency => Latency, - type => Type, - timestamp => Ts}. - -publish(TickTime, Notices) -> - WindowLog = #{last_tick_at => TickTime, - logs => lists:reverse(Notices)}, - Payload = emqx_json:encode(WindowLog), - Msg = #message{ id = emqx_guid:gen() - , qos = emqx:get_config([slow_subs, notice_qos]) - , from = ?MODULE - , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME) - , payload = Payload - , timestamp = ?NOW - }, - _ = emqx_broker:safe_publish(Msg), - ok. - load(State) -> - MaxSizeT = emqx:get_config([slow_subs, top_k_num]), - MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE), - _ = emqx:hook('message.slow_subs_stats', - {?MODULE, on_stats_update, [#{max_size => MaxSize}]} - ), + #{top_k_num := MaxSizeT, + stats_type := StatsType, + threshold := Threshold} = emqx:get_config([slow_subs]), + MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE), + _ = emqx:hook('delivery.completed', + {?MODULE, on_delivery_completed, + [#{max_size => MaxSize, + stats_type => StatsType, + threshold => Threshold + }]}), - State1 = start_timer(notice_timer, fun notice_tick/0, State), - State2 = start_timer(expire_timer, fun expire_tick/0, State1), - State2#{enable := true, last_tick_at => ?NOW}. + State1 = start_timer(expire_timer, fun expire_tick/0, State), + State1#{enable := true, last_tick_at => ?NOW}. - -unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) -> - emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}), - State#{notice_timer := cancel_timer(NoticeTimer), - expire_timer := cancel_timer(ExpireTimer) - }. +unload(#{expire_timer := ExpireTimer} = State) -> + emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}), + State#{expire_timer := cancel_timer(ExpireTimer)}. do_clear(Logs) -> Now = ?NOW, Interval = emqx:get_config([slow_subs, expire_interval]), - Each = fun(#top_k{index = Index, last_update_time = Ts}) -> + Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) -> case Now - Ts >= Interval of true -> - ets:delete(?TOPK_TAB, Index); + delete_with_index(TimeSpan, Id); _ -> true - end + end end, lists:foreach(Each, Logs). -try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) -> +-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer(). +calc_timespan(whole, #message{timestamp = Ts}, Now) -> + Now - Ts; + +calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) -> + End = emqx_message:get_header(deliver_begin_at, Msg, Now), + End - Ts; + +calc_timespan(response, Msg, Now) -> + Begin = emqx_message:get_header(deliver_begin_at, Msg, Now), + Now - Begin. + +%% update_topk is safe, because each process has a unique clientid +%% insert or delete are bind to this clientid, so there is no race condition +%% +%% but, the delete_with_index in L249 may have a race condition +%% because the data belong to other clientid will be deleted here +%% (deleted the data written by other processes). +%% so it may appear that: +%% when deleting a record, the other process is performing an update operation on this recrod +%% in order to solve this race condition problem, the index table also uses the ordered_set type, +%% so that even if the above situation occurs, it will only cause the old data to be deleted twice +%% and the correctness of the data will not be affected + +try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) -> case ets:info(?TOPK_TAB, size) of Size when Size < MaxSize -> - %% if the size is under limit, insert it directly - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}); + update_topk(Now, LastUpdateValue, TimeSpan, Id); _Size -> - %% find the minimum value - ?INDEX(Min, _) = First = - case ets:first(?TOPK_TAB) of - ?INDEX(_, _) = I -> I; - _ -> ?INDEX(Latency - 1, <<>>) - end, - - case Latency =< Min of - true -> true; - _ -> - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}), - - ets:delete(?TOPK_TAB, First) + case ets:first(?TOPK_TAB) of + '$end_of_table' -> + update_topk(Now, LastUpdateValue, TimeSpan, Id); + ?TOPK_INDEX(_, Id) -> + update_topk(Now, LastUpdateValue, TimeSpan, Id); + ?TOPK_INDEX(Min, MinId) -> + case TimeSpan =< Min of + true -> false; + _ -> + update_topk(Now, LastUpdateValue, TimeSpan, Id), + delete_with_index(Min, MinId) + end end end. + +-spec find_last_update_value(id()) -> non_neg_integer(). +find_last_update_value(Id) -> + case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of + ?INDEX(LastUpdateValue, Id) -> + LastUpdateValue; + _ -> + 0 + end. + +-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true. +update_topk(Now, LastUpdateValue, TimeSpan, Id) -> + %% update record + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id), + last_update_time = Now, + extra = [] + }), + + %% update index + ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}), + + %% delete the old record & index + delete_with_index(LastUpdateValue, Id). + +-spec delete_with_index(non_neg_integer(), id()) -> true. +delete_with_index(0, _) -> + true; + +delete_with_index(TimeSpan, Id) -> + ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)), + ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)). + +safe_create_tab(Name, Opts) -> + case ets:whereis(Name) of + undefined -> + Name = ets:new(Name, Opts); + _ -> + Name + end. + +do_clear_history() -> + ets:delete_all_objects(?INDEX_TAB), + ets:delete_all_objects(?TOPK_TAB). + check_enable(Enable, #{enable := IsEnable} = State) -> - update_threshold(), case Enable of IsEnable -> State; @@ -330,11 +310,6 @@ check_enable(Enable, #{enable := IsEnable} = State) -> unload(State) end. -update_threshold() -> - Threshold = emqx:get_config([slow_subs, threshold]), - emqx_message_latency_stats:update_threshold(Threshold), - ok. - start_timer(Name, Fun, State) -> _ = cancel_timer(maps:get(Name, State)), State#{Name := Fun()}. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index ee2016268..271afa626 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -23,14 +23,14 @@ -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]). --export([slow_subs/2, encode_record/1, settings/2]). +-export([slow_subs/2, get_history/0, settings/2]). -import(hoconsc, [mk/2, ref/1]). -import(emqx_mgmt_util, [bad_request/0]). --define(FORMAT_FUN, {?MODULE, encode_record}). -define(APP, emqx_slow_subs). -define(APP_NAME, <<"emqx_slow_subs">>). +-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)). namespace() -> "slow_subscribers_statistics". @@ -74,12 +74,13 @@ schema("/slow_subscriptions/settings") -> fields(record) -> [ {clientid, mk(string(), #{desc => <<"the clientid">>})}, - {latency, + {node, + mk(string(), #{desc => <<"the node">>})}, + {topic, + mk(string(), #{desc => <<"the topic">>})}, + {timespan, mk(integer(), - #{desc => <<"average time for message delivery or time for message expire">>})}, - {type, - mk(string(), - #{desc => <<"type of the latency, could be average or expire">>})}, + #{desc => <<"timespan for message transmission">>})}, {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})} ]. @@ -89,24 +90,49 @@ conf_schema() -> hoconsc:mk(Ref, #{}). slow_subs(delete, _) -> - ok = emqx_slow_subs:clear_history(), + Nodes = mria_mnesia:running_nodes(), + _ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT) + || Node <- Nodes], {204}; -slow_subs(get, #{query_string := QST}) -> - LimitT = maps:get(<<"limit">>, QST, ?MAX_TAB_SIZE), - Limit = erlang:min(?MAX_TAB_SIZE, emqx_mgmt_api:b2i(LimitT)), - Page = maps:get(<<"page">>, QST, 1), - QS = QST#{<<"limit">> => Limit, <<"page">> => Page}, - Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN), - {200, Data}. +slow_subs(get, _) -> + Nodes = mria_mnesia:running_nodes(), + Fun = fun(Node, Acc) -> + NodeRankL = rpc_call(Node, + ?MODULE, + get_history, + [], + [], + ?DEFAULT_RPC_TIMEOUT), + NodeRankL ++ Acc + end, -encode_record(#top_k{index = ?INDEX(Latency, ClientId), - type = Type, - last_update_time = Ts}) -> - #{clientid => ClientId, - latency => Latency, - type => Type, - last_update_time => Ts}. + RankL = lists:foldl(Fun, [], Nodes), + + SortFun = fun(#{timespan := A}, #{timespan := B}) -> + A > B + end, + + SortedL = lists:sort(SortFun, RankL), + SortedL2 = lists:sublist(SortedL, ?MAX_SIZE), + + {200, SortedL2}. + +get_history() -> + Node = node(), + RankL = ets:tab2list(?TOPK_TAB), + ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)), + last_update_time = LastUpdateTime + }) -> + #{ clientid => ClientId + , node => Node + , topic => Topic + , timespan => TimeSpan + , last_update_time => LastUpdateTime + } + end, + + lists:map(ConvFun, RankL). settings(get, _) -> {200, emqx:get_raw_config([slow_subs], #{})}; @@ -114,3 +140,12 @@ settings(get, _) -> settings(put, #{body := Body}) -> _ = emqx_slow_subs:update_settings(Body), {200, emqx:get_raw_config([slow_subs], #{})}. + +rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() -> + erlang:apply(M, F, A); + +rpc_call(Node, M, F, A, ErrorR, T) -> + case rpc:call(Node, M, F, A, T) of + {badrpc, _} -> ErrorR; + Res -> Res + end. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 91beca4ae..d625c4f26 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -22,21 +22,10 @@ fields("slow_subs") -> sc(integer(), 10, "The maximum number of records in the slow subscription statistics record table")} - , {notice_interval, - sc(emqx_schema:duration_ms(), - "0s", - "The interval for pushing statistics table records to the system topic. " - "publish top-k list to $SYS/brokers/${node}/slow_subs per notice_interval. " - "publish is disabled if set to 0s." - )} - , {notice_qos, - sc(range(0, 2), - 0, - "QoS of notification message in notice topic")} - , {notice_batch_size, - sc(integer(), - 100, - "Maximum information number in one notification")} + , {stats_type, + sc(hoconsc:union([whole, internal, response]), + whole, + "The method to calculate the latency")} ]. %%-------------------------------------------------------------------- From b09683bfcd49454b629cde25b747133d000898aa Mon Sep 17 00:00:00 2001 From: lafirest Date: Wed, 9 Feb 2022 18:18:06 +0800 Subject: [PATCH 2/5] fix(emqx_slow_subs): fix some errors and test cases --- apps/emqx/src/emqx_cm.erl | 3 +- apps/emqx/src/emqx_schema.erl | 7 ---- apps/emqx/src/emqx_session.erl | 8 ++--- apps/emqx/src/emqx_zone_schema.erl | 2 +- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 16 ++++----- .../emqx_slow_subs/src/emqx_slow_subs_sup.erl | 2 +- .../test/emqx_slow_subs_SUITE.erl | 26 +++----------- .../test/emqx_slow_subs_api_SUITE.erl | 34 +++++++------------ 8 files changed, 29 insertions(+), 69 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 3c46c6b54..23c47660c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -322,8 +322,7 @@ get_session_confs(#{zone := Zone, clientid := ClientId}, #{receive_maximum := Ma %% TODO: Add conf for allowing/disallowing persistent sessions. %% Note that the connection info is already enriched to have %% default config values for session expiry. - is_persistent => EI > 0, - latency_stats => emqx_config:get_zone_conf(Zone, [latency_stats]) + is_persistent => EI > 0 }. mqueue_confs(Zone) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ad6d7c9ac..eba7d2617 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -182,9 +182,6 @@ this number of messages or bytes have passed through.""" , {"persistent_session_store", sc(ref("persistent_session_store"), #{})} - , {"latency_stats", - sc(ref("latency_stats"), - #{})} , {"trace", sc(ref("trace"), #{desc => """ @@ -1105,10 +1102,6 @@ when deactivated, but after the retention time. } ]; -fields("latency_stats") -> - [ {"samples", sc(integer(), #{default => 10, - desc => "the number of samples for calculate the average latency of delivery"})} - ]; fields("trace") -> [ {"payload_encode", sc(hoconsc:enum([hex, text, hidden]), #{ default => text, diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 5e65b67e5..7851ae56f 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -183,7 +183,6 @@ , mqueue => emqx_mqueue:options() , is_persistent => boolean() , clientid => emqx_types:clientid() - , latency_stats => emqx_message_latency_stats:create_options() }. %%-------------------------------------------------------------------- @@ -211,8 +210,7 @@ init(Opts) -> awaiting_rel = #{}, max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), - created_at = erlang:system_time(millisecond), - latency_stats = emqx_message_latency_stats:new(maps:get(latency_stats, Opts, #{})) + created_at = erlang:system_time(millisecond) }. %%-------------------------------------------------------------------- @@ -268,9 +266,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt; -info(latency_stats, #session{latency_stats = Stats}) -> - emqx_message_latency_stats:latency(Stats). + CreatedAt. %% @doc Get stats of the session. -spec(stats(session()) -> emqx_types:stats()). diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl index c2dfcf1a6..140cd1aca 100644 --- a/apps/emqx/src/emqx_zone_schema.erl +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -24,7 +24,7 @@ namespace() -> zone. %% roots are added only for document generation. roots() -> ["mqtt", "stats", "flapping_detect", "force_shutdown", "conn_congestion", "rate_limit", "quota", "force_gc", - "overload_protection", "latency_stats" + "overload_protection" ]. %% zone schemas are clones from the same name from root level diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 2ae8c0b31..bd61ff5a6 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -22,7 +22,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). --export([ start_link/0, on_delivery_completed/4, update_settings/1 +-export([ start_link/0, on_delivery_completed/3, update_settings/1 , clear_history/0, init_tab/0, post_config_update/5 ]). @@ -77,17 +77,15 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -on_delivery_completed(_ClientInfo, - #message{timestamp = Ts}, - #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> +on_delivery_completed(#message{timestamp = Ts}, + #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> ok; -on_delivery_completed(ClientInfo, Msg, Env, Cfg) -> - on_delivery_completed(ClientInfo, Msg, Env, erlang:system_time(millisecond), Cfg). +on_delivery_completed(Msg, Env, Cfg) -> + on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). -on_delivery_completed(#{clientid := ClientId}, - #message{topic = Topic} = Msg, - _Env, +on_delivery_completed(#message{topic = Topic} = Msg, + #{clientid := ClientId}, Now, #{threshold := Threshold, stats_type := StatsType, diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl index ce2c55a15..c4c5625e0 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_sup.erl @@ -26,7 +26,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - emqx_slow_subs:init_topk_tab(), + emqx_slow_subs:init_tab(), {ok, {{one_for_one, 10, 3600}, [#{id => st_statistics, start => {emqx_slow_subs, start_link, []}, diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl index 79c4bb600..21ca43e87 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -31,10 +31,8 @@ slow_subs { enable = true top_k_num = 5, expire_interval = 3000 - notice_interval = 1500 - notice_qos = 0 - notice_batch_size = 3 -}""">>). + stats_type = whole + }""">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -60,7 +58,6 @@ t_log_and_pub(_) -> %% Sub topic first Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}], Clients = start_client(Subs), - emqx:subscribe("$SYS/brokers/+/slow_subs"), timer:sleep(1000), Now = ?NOW, %% publish @@ -82,15 +79,9 @@ t_log_and_pub(_) -> timer:sleep(1000), Size = ets:info(?TOPK_TAB, size), %% some time record maybe delete due to it expired - ?assert(Size =< 6 andalso Size >= 4), + ?assert(Size =< 6 andalso Size > 3), - timer:sleep(1500), - Recs = try_receive([]), - RecSum = lists:sum(Recs), - ?assert(RecSum >= 5), - ?assert(lists:all(fun(E) -> E =< 3 end, Recs)), - - timer:sleep(3000), + timer:sleep(4000), ?assert(ets:info(?TOPK_TAB, size) =:= 0), [Client ! stop || Client <- Clients], ok. @@ -113,12 +104,3 @@ client(I, Subs) -> stop -> ok end. - -try_receive(Acc) -> - receive - {deliver, _, #message{payload = Payload}} -> - #{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]), - try_receive([length(Logs) | Acc]) - after 500 -> - Acc - end. diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index fc2162863..7b25ef634 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -40,9 +40,7 @@ slow_subs enable = true top_k_num = 5, expire_interval = 60000 - notice_interval = 0 - notice_qos = 0 - notice_batch_size = 3 + stats_type = whole }""">>). @@ -92,8 +90,7 @@ t_get_history(_) -> Now = ?NOW, Each = fun(I) -> ClientId = erlang:list_to_binary(io_lib:format("test_~p", [I])), - ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(I, ClientId), - type = average, + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(ClientId, <<"topic">>)), last_update_time = Now}) end, @@ -101,18 +98,16 @@ t_get_history(_) -> {ok, Data} = request_api(get, api_path(["slow_subscriptions"]), "_page=1&_limit=10", auth_header_()), - #{<<"data">> := [First | _]} = emqx_json:decode(Data, [return_maps]), + [First | _] = emqx_json:decode(Data, [return_maps]), - RFirst = #{<<"clientid">> => <<"test_5">>, - <<"latency">> => 5, - <<"type">> => <<"average">>, - <<"last_update_time">> => Now}, - - ?assertEqual(RFirst, First). + ?assertMatch(#{<<"clientid">> := <<"test_5">>, + <<"topic">> := <<"topic">>, + <<"last_update_time">> := Now, + <<"node">> := _, + <<"timespan">> := _}, First). t_clear(_) -> - ets:insert(?TOPK_TAB, #top_k{index = ?INDEX(1, <<"test">>), - type = average, + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(1, ?ID(<<"clientid">>, <<"topic">>)), last_update_time = ?NOW}), {ok, _} = request_api(delete, api_path(["slow_subscriptions"]), [], @@ -122,7 +117,7 @@ t_clear(_) -> t_settting(_) -> Conf = emqx:get_config([slow_subs]), - Conf2 = Conf#{threshold => 1000}, + Conf2 = Conf#{stats_type => internal}, {ok, Data} = request_api(put, api_path(["slow_subscriptions", "settings"]), [], @@ -131,22 +126,19 @@ t_settting(_) -> Return = decode_json(Data), - ?assertEqual(Conf2, Return), + ?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), {ok, GetData} = request_api(get, api_path(["slow_subscriptions", "settings"]), [], auth_header_() - ), + ), timer:sleep(1000), GetReturn = decode_json(GetData), - ?assertEqual(Conf2, GetReturn), - - ?assertEqual(1000, - emqx_message_latency_stats:get_threshold()). + ?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn). decode_json(Data) -> BinJosn = emqx_json:decode(Data, [return_maps]), From 2fcc24dea600699348f9a89d243da16b0afdfa34 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 10 Feb 2022 14:30:49 +0800 Subject: [PATCH 3/5] fix(emqx_slow_subs): fix test case error --- apps/emqx/src/emqx_session.erl | 22 +++++----- apps/emqx/test/emqx_session_SUITE.erl | 63 +++++++++++++++++++++------ 2 files changed, 61 insertions(+), 24 deletions(-) diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 7851ae56f..003a4cd27 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -129,9 +129,8 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer(), + created_at :: pos_integer() %% Message deliver latency stats - latency_stats :: emqx_message_latency_stats:stats() }). @@ -615,7 +614,7 @@ await(PacketId, Msg, Session = #session{inflight = Inflight}) -> -spec(retry(emqx_types:clientinfo(), session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). -retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = RetryInterval}) -> +retry(ClientInfo, Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> @@ -637,12 +636,8 @@ retry_delivery([{PacketId, #inflight_data{timestamp = Ts} = Data} | More], {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. -do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) -> - Update = Data#inflight_data{timestamp = Now}, - Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), - {[{pubrel, PacketId} | Acc], Inflight1}; - -do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight, ClientInfo) -> +do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, + Now, Acc, Inflight, ClientInfo) -> case emqx_message:is_expired(Msg) of true -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), @@ -653,7 +648,12 @@ do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Da Update = Data#inflight_data{message = Msg1, timestamp = Now}, Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {[{PacketId, Msg1} | Acc], Inflight1} - end. + end; + +do_retry_delivery(PacketId, Data, Now, Acc, Inflight, _) -> + Update = Data#inflight_data{timestamp = Now}, + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), + {[{pubrel, PacketId} | Acc], Inflight1}. %%-------------------------------------------------------------------- %% Expire Awaiting Rel @@ -770,7 +770,7 @@ mark_begin_deliver(Msg) -> -compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}). -sort_fun(A, B) -> +sort_fun({_, A}, {_, B}) -> A#inflight_data.timestamp =< B#inflight_data.timestamp. batch_n(Inflight) -> diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index 9932424d7..fe8b78c24 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -25,7 +25,12 @@ all() -> emqx_common_test_helpers:all(?MODULE). -define(NOW, erlang:system_time(millisecond)). --record(pubrel_await, {timestamp :: non_neg_integer()}). + +-type inflight_data_phase() :: wait_ack | wait_comp. + +-record(inflight_data, { phase :: inflight_data_phase() + , message :: emqx_types:message() + , timestamp :: non_neg_integer()}). %%-------------------------------------------------------------------- %% CT callbacks @@ -167,14 +172,14 @@ t_is_awaiting_full_true(_) -> t_puback(_) -> Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<>>), - Inflight = emqx_inflight:insert(1, {Msg, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg), emqx_inflight:new()), Session = session(#{inflight => Inflight, mqueue => mqueue()}), {ok, Msg, Session1} = emqx_session:puback(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). t_puback_with_dequeue(_) -> Msg1 = emqx_message:make(clientid, ?QOS_1, <<"t1">>, <<"payload1">>), - Inflight = emqx_inflight:insert(1, {Msg1, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_ack, Msg1), emqx_inflight:new()), Msg2 = emqx_message:make(clientid, ?QOS_1, <<"t2">>, <<"payload2">>), {_, Q} = emqx_mqueue:in(Msg2, mqueue(#{max_len => 10})), Session = session(#{inflight => Inflight, mqueue => Q}), @@ -184,7 +189,7 @@ t_puback_with_dequeue(_) -> ?assertEqual(<<"t2">>, emqx_message:topic(Msg3)). t_puback_error_packet_id_in_use(_) -> - Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:puback(clientinfo(), 1, session(#{inflight => Inflight})). @@ -193,13 +198,13 @@ t_puback_error_packet_id_not_found(_) -> t_pubrec(_) -> Msg = emqx_message:make(test, ?QOS_2, <<"t">>, <<>>), - Inflight = emqx_inflight:insert(2, {Msg, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(2, with_ts(wait_ack, Msg), emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Msg, Session1} = emqx_session:pubrec(clientinfo(), 2, Session), - ?assertMatch([{{pubrel_await, _}, _}], emqx_inflight:values(emqx_session:info(inflight, Session1))). + ?assertMatch([#inflight_data{phase = wait_comp}], emqx_inflight:values(emqx_session:info(inflight, Session1))). t_pubrec_packet_id_in_use_error(_) -> - Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), {error, ?RC_PACKET_IDENTIFIER_IN_USE} = emqx_session:pubrec(clientinfo(), 1, session(#{inflight => Inflight})). @@ -215,7 +220,7 @@ t_pubrel_error_packetid_not_found(_) -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} = emqx_session:pubrel(clientinfo(), 1, session()). t_pubcomp(_) -> - Inflight = emqx_inflight:insert(1, {#pubrel_await{timestamp = ?NOW}, ts(millisecond)}, emqx_inflight:new()), + Inflight = emqx_inflight:insert(1, with_ts(wait_comp, undefined), emqx_inflight:new()), Session = session(#{inflight => Inflight}), {ok, Session1} = emqx_session:pubcomp(clientinfo(), 1, Session), ?assertEqual(0, emqx_session:info(inflight_cnt, Session1)). @@ -272,9 +277,11 @@ t_deliver_qos1(_) -> ?assertEqual(2, emqx_session:info(inflight_cnt, Session1)), ?assertEqual(<<"t1">>, emqx_message:topic(Msg1)), ?assertEqual(<<"t2">>, emqx_message:topic(Msg2)), - {ok, Msg1, Session2} = emqx_session:puback(clientinfo(), 1, Session1), + {ok, Msg1T, Session2} = emqx_session:puback(clientinfo(), 1, Session1), + ?assertEqual(Msg1, remove_deliver_flag(Msg1T)), ?assertEqual(1, emqx_session:info(inflight_cnt, Session2)), - {ok, Msg2, Session3} = emqx_session:puback(clientinfo(), 2, Session2), + {ok, Msg2T, Session3} = emqx_session:puback(clientinfo(), 2, Session2), + ?assertEqual(Msg2, remove_deliver_flag(Msg2T)), ?assertEqual(0, emqx_session:info(inflight_cnt, Session3)). t_deliver_qos2(_) -> @@ -319,8 +326,9 @@ t_retry(_) -> {ok, Pubs, Session1} = emqx_session:deliver(clientinfo(), Delivers, Session), ElapseMs = 200, %% 0.2s ok = timer:sleep(ElapseMs), - Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], - {ok, Msgs1, 100, Session2} = emqx_session:retry(clientinfo(), Session1), + Msgs1 = [{I, with_ts(wait_ack, emqx_message:set_flag(dup, Msg))} || {I, Msg} <- Pubs], + {ok, Msgs1T, 100, Session2} = emqx_session:retry(clientinfo(), Session1), + ?assertEqual(inflight_data_to_msg(Msgs1), remove_deliver_flag(Msgs1T)), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). %%-------------------------------------------------------------------- @@ -344,7 +352,7 @@ t_replay(_) -> Session2 = emqx_session:enqueue(clientinfo(), Msg, Session1), Pubs1 = [{I, emqx_message:set_flag(dup, M)} || {I, M} <- Pubs], {ok, ReplayPubs, Session3} = emqx_session:replay(clientinfo(), Session2), - ?assertEqual(Pubs1 ++ [{3, Msg}], ReplayPubs), + ?assertEqual(Pubs1 ++ [{3, Msg}], remove_deliver_flag(ReplayPubs)), ?assertEqual(3, emqx_session:info(inflight_cnt, Session3)). t_expire_awaiting_rel(_) -> @@ -404,3 +412,32 @@ ts(second) -> erlang:system_time(second); ts(millisecond) -> erlang:system_time(millisecond). + +with_ts(Phase, Msg) -> + with_ts(Phase, Msg, erlang:system_time(millisecond)). + +with_ts(Phase, Msg, Ts) -> + #inflight_data{phase = Phase, + message = Msg, + timestamp = Ts}. + +remove_deliver_flag({Id, Data}) -> + {Id, remove_deliver_flag(Data)}; + +remove_deliver_flag(#inflight_data{message = Msg} = Data) -> + Data#inflight_data{message = remove_deliver_flag(Msg)}; + +remove_deliver_flag(List) when is_list(List) -> + lists:map(fun remove_deliver_flag/1, List); + +remove_deliver_flag(Msg) -> + emqx_message:remove_header(deliver_begin_at, Msg). + +inflight_data_to_msg({Id, Data}) -> + {Id, inflight_data_to_msg(Data)}; + +inflight_data_to_msg(#inflight_data{message = Msg}) -> + Msg; + +inflight_data_to_msg(List) when is_list(List) -> + lists:map(fun inflight_data_to_msg/1, List). From b9884de1d0afa7a3e244135b3b9eed60adaede64 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 10 Feb 2022 16:09:17 +0800 Subject: [PATCH 4/5] fix: fix proper test error --- apps/emqx/test/emqx_proper_types.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 55acf004c..5ddaa4d0d 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -113,8 +113,7 @@ sessioninfo() -> awaiting_rel(), % awaiting_rel non_neg_integer(), % max_awaiting_rel safty_timeout(), % await_rel_timeout - timestamp(), % created_at - latency_stats() + timestamp() % created_at }, emqx_session:info(Session)). From 361ca5be422add97e6d61ec401f7a8e8de26ed44 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 10 Feb 2022 17:40:19 +0800 Subject: [PATCH 5/5] fix(emqx_slow_subs): change rpc call to bpapi --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx/test/emqx_proper_types.erl | 24 ------------- .../emqx_slow_subs/src/emqx_slow_subs_api.erl | 30 +++++----------- .../src/proto/emqx_slow_subs_proto_v1.erl | 36 +++++++++++++++++++ 4 files changed, 45 insertions(+), 46 deletions(-) create mode 100644 apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 6b1926b30..1f040e4b3 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -16,3 +16,4 @@ {emqx_statsd,1}. {emqx_telemetry,1}. {emqx_topic_metrics,1}. +{emqx_slow_subs,1}. diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 5ddaa4d0d..4f54b9fed 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -337,30 +337,6 @@ normal_topic_filter() -> end end). -%% Type defined emqx_message_lantency_stats.erl - stats() -latency_stats() -> - Keys = [{threshold, number()}, - {ema, exp_moving_average()}, - {last_update_time, non_neg_integer()}, - {last_access_time, non_neg_integer()}, - {last_insert_value, non_neg_integer()} - ], - ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())}, - begin - maps:merge(maps:from_list(Ks), M) - end). - -%% Type defined emqx_moving_average.erl - ema() -exp_moving_average() -> - Keys = [{type, exponential}, - {average, number()}, - {coefficient, float()} - ], - ?LET({Ks, M}, {Keys, map(limited_atom(), limited_any_term())}, - begin - maps:merge(maps:from_list(Ks), M) - end). - %%-------------------------------------------------------------------- %% Basic Types %%-------------------------------------------------------------------- diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 271afa626..99adf1e53 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -90,24 +90,15 @@ conf_schema() -> hoconsc:mk(Ref, #{}). slow_subs(delete, _) -> - Nodes = mria_mnesia:running_nodes(), - _ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT) - || Node <- Nodes], + _ = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:clear_history(Nodes) end), {204}; slow_subs(get, _) -> - Nodes = mria_mnesia:running_nodes(), - Fun = fun(Node, Acc) -> - NodeRankL = rpc_call(Node, - ?MODULE, - get_history, - [], - [], - ?DEFAULT_RPC_TIMEOUT), - NodeRankL ++ Acc + NodeRankL = rpc_call(fun(Nodes) -> emqx_slow_subs_proto_v1:get_history(Nodes) end), + Fun = fun({ok, L}, Acc) -> L ++ Acc; + (_, Acc) -> Acc end, - - RankL = lists:foldl(Fun, [], Nodes), + RankL = lists:foldl(Fun, [], NodeRankL), SortFun = fun(#{timespan := A}, #{timespan := B}) -> A > B @@ -141,11 +132,6 @@ settings(put, #{body := Body}) -> _ = emqx_slow_subs:update_settings(Body), {200, emqx:get_raw_config([slow_subs], #{})}. -rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() -> - erlang:apply(M, F, A); - -rpc_call(Node, M, F, A, ErrorR, T) -> - case rpc:call(Node, M, F, A, T) of - {badrpc, _} -> ErrorR; - Res -> Res - end. +rpc_call(Fun) -> + Nodes = mria_mnesia:running_nodes(), + Fun(Nodes). diff --git a/apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl b/apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl new file mode 100644 index 000000000..2e6fc7044 --- /dev/null +++ b/apps/emqx_slow_subs/src/proto/emqx_slow_subs_proto_v1.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_slow_subs_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([clear_history/1, get_history/1]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.0.0". + +-spec clear_history([node()]) -> emqx_rpc:erpc_multicall(map()). +clear_history(Nodes) -> + erpc:multicall(Nodes, emqx_slow_subs, clear_history, []). + +-spec get_history([node()]) -> emqx_rpc:erpc_multicall(map()). +get_history(Nodes) -> + erpc:multicall(Nodes, emqx_slow_subs_api, get_history, []).