diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 460e94014..5e65b67e5 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -134,10 +134,12 @@ latency_stats :: emqx_message_latency_stats:stats() }). -%% in the previous code, we will replace the message record with the pubrel atom -%% in the pubrec function, this will lose the creation time of the message, -%% but now we need this time to calculate latency, so now pubrel atom is changed to this record --record(pubrel_await, {timestamp :: non_neg_integer()}). + +-type inflight_data_phase() :: wait_ack | wait_comp. + +-record(inflight_data, { phase :: inflight_data_phase() + , message :: emqx_types:message() + , timestamp :: non_neg_integer()}). -type(session() :: #session{}). @@ -168,7 +170,6 @@ , next_pkt_id , awaiting_rel_cnt , awaiting_rel_max - , latency_stats ]). -define(DEFAULT_BATCH_N, 1000). @@ -380,11 +381,11 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, | {error, emqx_types:reason_code()}). puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Msg, _Ts}} when is_record(Msg, message) -> + {value, #inflight_data{phase = wait_ack, message = Msg}} -> + on_delivery_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - Session2 = update_latency(Msg, Session), - return_with(Msg, dequeue(ClientInfo, Session2#session{inflight = Inflight1})); - {value, {_Pubrel, _Ts}} -> + return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); + {value, _} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -404,11 +405,11 @@ return_with(Msg, {ok, Publishes, Session}) -> | {error, emqx_types:reason_code()}). 
pubrec(_ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Msg, _Ts}} when is_record(Msg, message) -> - Update = with_ts(#pubrel_await{timestamp = Msg#message.timestamp}), + {value, #inflight_data{phase = wait_ack, message = Msg} = Data} -> + Update = Data#inflight_data{phase = wait_comp}, Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; - {value, {_Pubrel, _Ts}} -> + {value, _} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} @@ -437,10 +438,10 @@ pubrel(_ClientInfo, PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> | {error, emqx_types:reason_code()}). pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of - {value, {Pubrel, _Ts}} when is_record(Pubrel, pubrel_await) -> - Session2 = update_latency(Pubrel, Session), + {value, #inflight_data{phase = wait_comp, message = Msg}} -> + on_delivery_completed(Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(ClientInfo, Session2#session{inflight = Inflight1}); + dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> @@ -504,6 +505,7 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) -> end. deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> + on_delivery_completed(Msg, Session), % {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = @@ -518,7 +520,8 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = {ok, Session1}; false -> Publish = {PacketId, maybe_ack(Msg)}, - Session1 = await(PacketId, Msg, Session), + Msg2 = mark_begin_deliver(Msg), + Session1 = await(PacketId, Msg2, Session), {ok, [Publish], next_pkt_id(Session1)} end. 
@@ -621,29 +624,29 @@ retry(ClientInfo, Session = #session{inflight = Inflight, retry_interval = Retry true -> {ok, Session}; false -> Now = erlang:system_time(millisecond), - Session2 = check_expire_latency(Now, RetryInterval, Session), - retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), [], Now, - Session2, ClientInfo) + retry_delivery(emqx_inflight:to_list(fun sort_fun/2, Inflight), + [], Now, Session, ClientInfo) end. retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}, _ClientInfo) -> {ok, lists:reverse(Acc), Interval, Session}; -retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = - #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> +retry_delivery([{PacketId, #inflight_data{timestamp = Ts} = Data} | More], + Acc, Now, Session = #session{retry_interval = Interval, inflight = Inflight}, ClientInfo) -> case (Age = age(Now, Ts)) >= Interval of true -> - {Acc1, Inflight1} = do_retry_delivery(PacketId, Msg, Now, Acc, Inflight, ClientInfo), + {Acc1, Inflight1} = do_retry_delivery(PacketId, Data, Now, Acc, Inflight, ClientInfo), retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}, ClientInfo); false -> {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. 
-do_retry_delivery(PacketId, pubrel, Now, Acc, Inflight, _) -> - Inflight1 = emqx_inflight:update(PacketId, {pubrel, Now}, Inflight), - {[{pubrel, PacketId}|Acc], Inflight1}; +do_retry_delivery(PacketId, #inflight_data{phase = wait_comp} = Data, Now, Acc, Inflight, _) -> + Update = Data#inflight_data{timestamp = Now}, + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), + {[{pubrel, PacketId} | Acc], Inflight1}; -do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) -> +do_retry_delivery(PacketId, #inflight_data{phase = wait_ack, message = Msg} = Data, Now, Acc, Inflight, ClientInfo) -> case emqx_message:is_expired(Msg) of true -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), @@ -651,8 +654,9 @@ do_retry_delivery(PacketId, #message{} = Msg, Now, Acc, Inflight, ClientInfo) -> {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), - Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight), - {[{PacketId, Msg1}|Acc], Inflight1} + Update = Data#inflight_data{message = Msg1, timestamp = Now}, + Inflight1 = emqx_inflight:update(PacketId, Update, Inflight), + {[{PacketId, Msg1} | Acc], Inflight1} end. %%-------------------------------------------------------------------- @@ -697,9 +701,9 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(emqx_types:clientinfo(), session()) -> {ok, replies(), session()}). 
replay(ClientInfo, Session = #session{inflight = Inflight}) -> - Pubs = lists:map(fun({PacketId, {Pubrel, _Ts}}) when is_record(Pubrel, pubrel_await) -> + Pubs = lists:map(fun({PacketId, #inflight_data{phase = wait_comp}}) -> {pubrel, PacketId}; - ({PacketId, {Msg, _Ts}}) -> + ({PacketId, #inflight_data{message = Msg}}) -> {PacketId, emqx_message:set_flag(dup, true, Msg)} end, emqx_inflight:to_list(Inflight)), case dequeue(ClientInfo, Session) of @@ -755,37 +759,23 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %%-------------------------------------------------------------------- %% Message Latency Stats %%-------------------------------------------------------------------- -update_latency(Msg, - #session{clientid = ClientId, - latency_stats = Stats, - created_at = CreateAt} = S) -> - case get_birth_timestamp(Msg, CreateAt) of - 0 -> S; - Ts -> - Latency = erlang:system_time(millisecond) - Ts, - Stats2 = emqx_message_latency_stats:update(ClientId, Latency, Stats), - S#session{latency_stats = Stats2} - end. +on_delivery_completed(Msg, + #session{created_at = CreateAt, clientid = ClientId}) -> + emqx:run_hook('delivery.completed', + [Msg, + #{session_birth_time => CreateAt, clientid => ClientId}]). -check_expire_latency(Now, Interval, - #session{clientid = ClientId, latency_stats = Stats} = S) -> - Stats2 = emqx_message_latency_stats:check_expire(ClientId, Now, Interval, Stats), - S#session{latency_stats = Stats2}. - -get_birth_timestamp(#message{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> - Ts; - -get_birth_timestamp(#pubrel_await{timestamp = Ts}, CreateAt) when CreateAt =< Ts -> - Ts; - -get_birth_timestamp(_, _) -> - 0. +mark_begin_deliver(Msg) -> %% milliseconds, same unit as the timestamps it is later subtracted from + emqx_message:set_header(deliver_begin_at, erlang:system_time(millisecond), Msg). 
%%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- -sort_fun() -> - fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 =< Ts2 end. + +-compile({inline, [sort_fun/2, batch_n/1, with_ts/1, age/2]}). + +sort_fun({_, A}, {_, B}) -> %% elements are {PacketId, #inflight_data{}} pairs + A#inflight_data.timestamp =< B#inflight_data.timestamp. batch_n(Inflight) -> case emqx_inflight:max_size(Inflight) of @@ -794,7 +784,9 @@ batch_n(Inflight) -> end. with_ts(Msg) -> - {Msg, erlang:system_time(millisecond)}. + #inflight_data{phase = wait_ack, + message = Msg, + timestamp = erlang:system_time(millisecond)}. age(Now, Ts) -> Now - Ts. diff --git a/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl b/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl deleted file mode 100644 index 68927f5e0..000000000 --- a/apps/emqx/src/emqx_slow_subs/emqx_message_latency_stats.erl +++ /dev/null @@ -1,120 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_message_latency_stats). - -%% API --export([new/1, update/3, check_expire/4, latency/1]). - --export([get_threshold/0, update_threshold/1]). - --define(NOW, erlang:system_time(millisecond)). --define(MINIMUM_INSERT_INTERVAL, 1000). 
--define(MINIMUM_THRESHOLD, 100). --define(DEFAULT_THRESHOLD, 500). --define(DEFAULT_SAMPLES, 10). --define(THRESHOLD_KEY, {?MODULE, threshold}). - --opaque stats() :: #{ ema := emqx_moving_average:ema() - , last_update_time := timestamp() - , last_access_time := timestamp() %% timestamp of last access top-k - , last_insert_value := non_neg_integer() - }. - --type timestamp() :: non_neg_integer(). --type timespan() :: number(). - --type latency_type() :: average - | expire. - --type create_options() :: #{samples => pos_integer()}. - --export_type([stats/0, latency_type/0, create_options/0]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- --spec new(non_neg_integer() | create_options()) -> stats(). -new(SamplesT) when is_integer(SamplesT) -> - Samples = erlang:max(1, SamplesT), - #{ ema => emqx_moving_average:new(exponential, #{period => Samples}) - , last_update_time => 0 - , last_access_time => 0 - , last_insert_value => 0 - }; - -new(OptsT) -> - Opts = maps:merge(#{samples => ?DEFAULT_SAMPLES}, OptsT), - #{samples := Samples} = Opts, - new(Samples). - --spec update(emqx_types:clientid(), number(), stats()) -> stats(). -update(ClientId, Val, #{ema := EMA} = Stats) -> - Now = ?NOW, - #{average := Latency} = EMA2 = emqx_moving_average:update(Val, EMA), - Stats2 = call_hook(ClientId, Now, average, Latency, Stats), - Stats2#{ ema := EMA2 - , last_update_time := ?NOW}. - --spec check_expire(emqx_types:clientid(), timestamp(), timespan(), stats()) -> stats(). -check_expire(_, Now, Interval, #{last_update_time := LUT} = S) - when LUT >= Now - Interval -> - S; - -check_expire(ClientId, Now, _Interval, #{last_update_time := LUT} = S) -> - Latency = Now - LUT, - call_hook(ClientId, Now, expire, Latency, S). - --spec latency(stats()) -> number(). -latency(#{ema := #{average := Average}}) -> - Average. - --spec update_threshold(pos_integer()) -> pos_integer(). 
-update_threshold(Threshold) -> - Val = erlang:max(Threshold, ?MINIMUM_THRESHOLD), - persistent_term:put(?THRESHOLD_KEY, Val), - Val. - -get_threshold() -> - persistent_term:get(?THRESHOLD_KEY, ?DEFAULT_THRESHOLD). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- --spec call_hook(emqx_types:clientid(), timestamp(), latency_type(), timespan(), stats()) -> stats(). -call_hook(_, _, _, Latency, S) - when Latency =< ?MINIMUM_THRESHOLD -> - S; - -call_hook(_, Now, _, _, #{last_access_time := LIT} = S) - when Now =< LIT + ?MINIMUM_INSERT_INTERVAL -> - S; - -call_hook(ClientId, Now, Type, Latency, #{last_insert_value := LIV} = Stats) -> - case Latency =< get_threshold() of - true -> - Stats#{last_access_time := Now}; - _ -> - ToInsert = erlang:floor(Latency), - Arg = #{clientid => ClientId, - latency => ToInsert, - type => Type, - last_insert_value => LIV, - update_time => Now}, - emqx:run_hook('message.slow_subs_stats', [Arg]), - Stats#{last_insert_value := ToInsert, - last_access_time := Now} - end. diff --git a/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl b/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl deleted file mode 100644 index 7f8b037f6..000000000 --- a/apps/emqx/src/emqx_slow_subs/emqx_moving_average.erl +++ /dev/null @@ -1,90 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @see https://en.wikipedia.org/wiki/Moving_average - --module(emqx_moving_average). - -%% API --export([new/0, new/1, new/2, update/2]). - --type type() :: cumulative - | exponential. - --type ema() :: #{ type := exponential - , average := 0 | float() - , coefficient := float() - }. - --type cma() :: #{ type := cumulative - , average := 0 | float() - , count := non_neg_integer() - }. - --type moving_average() :: ema() - | cma(). - --define(DEF_EMA_ARG, #{period => 10}). --define(DEF_AVG_TYPE, exponential). - --export_type([type/0, moving_average/0, ema/0, cma/0]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- --spec new() -> moving_average(). -new() -> - new(?DEF_AVG_TYPE, #{}). - --spec new(type()) -> moving_average(). -new(Type) -> - new(Type, #{}). - --spec new(type(), Args :: map()) -> moving_average(). -new(cumulative, _) -> - #{ type => cumulative - , average => 0 - , count => 0 - }; - -new(exponential, Arg) -> - #{period := Period} = maps:merge(?DEF_EMA_ARG, Arg), - #{ type => exponential - , average => 0 - %% coefficient = 2/(N+1) is a common convention, see the wiki link for details - , coefficient => 2 / (Period + 1) - }. - --spec update(number(), moving_average()) -> moving_average(). - -update(Val, #{average := 0} = Avg) -> - Avg#{average := Val}; - -update(Val, #{ type := cumulative - , average := Average - , count := Count} = CMA) -> - NewCount = Count + 1, - CMA#{average := (Count * Average + Val) / NewCount, - count := NewCount}; - -update(Val, #{ type := exponential - , average := Average - , coefficient := Coefficient} = EMA) -> - EMA#{average := Coefficient * Val + (1 - Coefficient) * Average}. 
- -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- diff --git a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf index 8378e971c..db7f1594d 100644 --- a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf +++ b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf @@ -20,21 +20,16 @@ slow_subs { ## Value: 10 top_k_num = 10 - ## The interval for pushing statistics table records to the system topic. When set to 0, push is disabled - ## publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval - ## publish is disabled if set to 0s. + ## The ways to calculate the latency are as follows: ## - ## Value: 0s - notice_interval = 0s - - ## QoS of notification message + ## 1. whole + ## From the time the message arrives at EMQX until the message completes transmission ## - ## Default: 0 - notice_qos = 0 - - ## Maximum information number in one notification + ## 2.internal + ## From when the message arrives at EMQX until when EMQX starts delivering the message ## - ## Default: 100 - notice_batch_size = 100 - + ## 3.response + ## From the time EMQX starts delivering the message, until the message completes transmission + ## Default: whole + stats_type = whole } diff --git a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl index 2fb7cdfd3..f282037dc 100644 --- a/apps/emqx_slow_subs/include/emqx_slow_subs.hrl +++ b/apps/emqx_slow_subs/include/emqx_slow_subs.hrl @@ -15,16 +15,24 @@ %%-------------------------------------------------------------------- -define(TOPK_TAB, emqx_slow_subs_topk). +-define(INDEX_TAB, emqx_slow_subs_index). --define(INDEX(Latency, ClientId), {Latency, ClientId}). +-define(ID(ClientId, Topic), {ClientId, Topic}). +-define(INDEX(TimeSpan, Id), {Id, TimeSpan}). +-define(TOPK_INDEX(TimeSpan, Id), {TimeSpan, Id}). --define(MAX_TAB_SIZE, 1000). 
+-define(MAX_SIZE, 1000). --record(top_k, { index :: index() - , type :: emqx_message_latency_stats:latency_type() +-record(top_k, { index :: topk_index() , last_update_time :: pos_integer() , extra = [] }). +-record(index_tab, { index :: index()}). + -type top_k() :: #top_k{}. --type index() :: ?INDEX(non_neg_integer(), emqx_types:clientid()). +-type index_tab() :: #index_tab{}. + +-type id() :: {emqx_types:clientid(), emqx_types:topic()}. +-type index() :: ?INDEX(non_neg_integer(), id()). +-type topk_index() :: ?TOPK_INDEX(non_neg_integer(), id()). diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index c8ef8a5d2..2ae8c0b31 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -22,8 +22,8 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl"). --export([ start_link/0, on_stats_update/2, update_settings/1 - , clear_history/0, init_topk_tab/0, post_config_update/5 +-export([ start_link/0, on_delivery_completed/3, update_settings/1 + , clear_history/0, init_tab/0, post_config_update/5 ]). %% gen_server callbacks @@ -40,29 +40,19 @@ -type state() :: #{ enable := boolean() , last_tick_at := pos_integer() , expire_timer := undefined | reference() - , notice_timer := undefined | reference() }. --type log() :: #{ rank := pos_integer() - , clientid := emqx_types:clientid() - , latency := non_neg_integer() - , type := emqx_message_latency_stats:latency_type() - }. - --type window_log() :: #{ last_tick_at := pos_integer() - , logs := [log()] - }. - -type message() :: #message{}. --type stats_update_args() :: #{ clientid := emqx_types:clientid() - , latency := non_neg_integer() - , type := emqx_message_latency_stats:latency_type() - , last_insert_value := non_neg_integer() - , update_time := timer:time() - }. 
+-type stats_type() :: whole %% whole = internal + response + | internal %% timespan from message in to deliver + | response. %% timespan from delivery to client response --type stats_update_env() :: #{max_size := pos_integer()}. +-type stats_update_args() :: #{session_birth_time := pos_integer()}. + +-type stats_update_env() :: #{ threshold := non_neg_integer() + , stats_type := stats_type() + , max_size := pos_integer()}. -ifdef(TEST). -define(EXPIRE_CHECK_INTERVAL, timer:seconds(1)). @@ -71,7 +61,6 @@ -endif. -define(NOW, erlang:system_time(millisecond)). --define(NOTICE_TOPIC_NAME, "slow_subs"). -define(DEF_CALL_TIMEOUT, timer:seconds(10)). %% erlang term order @@ -88,37 +77,32 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% XXX NOTE:pay attention to the performance here --spec on_stats_update(stats_update_args(), stats_update_env()) -> true. -on_stats_update(#{clientid := ClientId, - latency := Latency, - type := Type, - last_insert_value := LIV, - update_time := Ts}, - #{max_size := MaxSize}) -> +on_delivery_completed(#message{timestamp = Ts}, + #{session_birth_time := BirthTime}, + _Cfg) when Ts =< BirthTime -> + ok; - LastIndex = ?INDEX(LIV, ClientId), - Index = ?INDEX(Latency, ClientId), +on_delivery_completed(Msg, Env, Cfg) -> %% the hook is run with [Msg, Env] plus the registered Cfg + on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). - %% check whether the client is in the table - case ets:lookup(?TOPK_TAB, LastIndex) of - [#top_k{index = Index}] -> - %% if last value == the new value, update the type and last_update_time - %% XXX for clients whose latency are stable for a long time, is it - %% possible to reduce updates? - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}); - [_] -> - %% if Latency > minimum value, we should update it - %% if Latency < minimum value, maybe it can replace the minimum value - %% so always update at here - %% do we need check if Latency == minimum ??? 
- ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}), - ets:delete(?TOPK_TAB, LastIndex); - [] -> - %% try to insert - try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) +on_delivery_completed(#message{topic = Topic} = Msg, + #{clientid := ClientId}, + Now, + #{threshold := Threshold, + stats_type := StatsType, + max_size := MaxSize + }) -> + TimeSpan = calc_timespan(StatsType, Msg, Now), + case TimeSpan =< Threshold of + true -> ok; + _ -> + Id = ?ID(ClientId, Topic), + LastUpdateValue = find_last_update_value(Id), + case TimeSpan =< LastUpdateValue of + true -> ok; + _ -> + try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) + end end. clear_history() -> @@ -127,21 +111,20 @@ clear_history() -> update_settings(Conf) -> emqx_conf:update([slow_subs], Conf, #{override_to => cluster}). -init_topk_tab() -> - case ets:whereis(?TOPK_TAB) of - undefined -> - ?TOPK_TAB = ets:new(?TOPK_TAB, - [ ordered_set, public, named_table - , {keypos, #top_k.index}, {write_concurrency, true} - , {read_concurrency, true} - ]); - _ -> - ?TOPK_TAB - end. - post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) -> gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT). +init_tab() -> + safe_create_tab(?TOPK_TAB, [ ordered_set, public, named_table + , {keypos, #top_k.index}, {write_concurrency, true} + , {read_concurrency, true} + ]), + + safe_create_tab(?INDEX_TAB, [ ordered_set, public, named_table + , {keypos, #index_tab.index}, {write_concurrency, true} + , {read_concurrency, true} + ]). 
+ %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -151,8 +134,7 @@ init([]) -> InitState = #{enable => false, last_tick_at => 0, - expire_timer => undefined, - notice_timer => undefined + expire_timer => undefined }, Enable = emqx:get_config([slow_subs, enable]), @@ -164,7 +146,7 @@ handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) -> {reply, ok, State2}; handle_call(clear_history, _, State) -> - ets:delete_all_objects(?TOPK_TAB), + do_clear_history(), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -181,12 +163,6 @@ handle_info(expire_tick, State) -> State1 = start_timer(expire_timer, fun expire_tick/0, State), {noreply, State1}; -handle_info(notice_tick, State) -> - Logs = ets:tab2list(?TOPK_TAB), - do_notification(Logs, State), - State1 = start_timer(notice_timer, fun notice_tick/0, State), - {noreply, State1#{last_tick_at := ?NOW}}; - handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -204,123 +180,127 @@ code_change(_OldVsn, State, _Extra) -> expire_tick() -> erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). -notice_tick() -> - case emqx:get_config([slow_subs, notice_interval]) of - 0 -> undefined; - Interval -> - erlang:send_after(Interval, self(), ?FUNCTION_NAME) - end. - --spec do_notification(list(), state()) -> ok. -do_notification([], _) -> - ok; - -do_notification(Logs, #{last_tick_at := LastTickTime}) -> - start_publish(Logs, LastTickTime), - ok. - -start_publish(Logs, TickTime) -> - emqx_pool:async_submit({fun do_publish/3, [Logs, erlang:length(Logs), TickTime]}). - -do_publish([], _, _) -> - ok; - -do_publish(Logs, Rank, TickTime) -> - BatchSize = emqx:get_config([slow_subs, notice_batch_size]), - do_publish(Logs, BatchSize, Rank, TickTime, []). 
- -do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 -> - Cache2 = [convert_to_notice(Rank, Log) | Cache], - do_publish(T, Size - 1, Rank - 1, TickTime, Cache2); - -do_publish(Logs, Size, Rank, TickTime, Cache) when Size =:= 0 -> - publish(TickTime, Cache), - do_publish(Logs, Rank, TickTime); - -do_publish([], _, _Rank, TickTime, Cache) -> - publish(TickTime, Cache), - ok. - -convert_to_notice(Rank, #top_k{index = ?INDEX(Latency, ClientId), - type = Type, - last_update_time = Ts}) -> - #{rank => Rank, - clientid => ClientId, - latency => Latency, - type => Type, - timestamp => Ts}. - -publish(TickTime, Notices) -> - WindowLog = #{last_tick_at => TickTime, - logs => lists:reverse(Notices)}, - Payload = emqx_json:encode(WindowLog), - Msg = #message{ id = emqx_guid:gen() - , qos = emqx:get_config([slow_subs, notice_qos]) - , from = ?MODULE - , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME) - , payload = Payload - , timestamp = ?NOW - }, - _ = emqx_broker:safe_publish(Msg), - ok. - load(State) -> - MaxSizeT = emqx:get_config([slow_subs, top_k_num]), - MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE), - _ = emqx:hook('message.slow_subs_stats', - {?MODULE, on_stats_update, [#{max_size => MaxSize}]} - ), + #{top_k_num := MaxSizeT, + stats_type := StatsType, + threshold := Threshold} = emqx:get_config([slow_subs]), + MaxSize = erlang:min(MaxSizeT, ?MAX_SIZE), + _ = emqx:hook('delivery.completed', + {?MODULE, on_delivery_completed, + [#{max_size => MaxSize, + stats_type => StatsType, + threshold => Threshold + }]}), - State1 = start_timer(notice_timer, fun notice_tick/0, State), - State2 = start_timer(expire_timer, fun expire_tick/0, State1), - State2#{enable := true, last_tick_at => ?NOW}. + State1 = start_timer(expire_timer, fun expire_tick/0, State), + State1#{enable := true, last_tick_at => ?NOW}. 
- -unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) -> - emqx:unhook('message.slow_subs_stats', {?MODULE, on_stats_update}), - State#{notice_timer := cancel_timer(NoticeTimer), - expire_timer := cancel_timer(ExpireTimer) - }. +unload(#{expire_timer := ExpireTimer} = State) -> + emqx:unhook('delivery.completed', {?MODULE, on_delivery_completed}), + State#{expire_timer := cancel_timer(ExpireTimer)}. do_clear(Logs) -> Now = ?NOW, Interval = emqx:get_config([slow_subs, expire_interval]), - Each = fun(#top_k{index = Index, last_update_time = Ts}) -> + Each = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, Id), last_update_time = Ts}) -> case Now - Ts >= Interval of true -> - ets:delete(?TOPK_TAB, Index); + delete_with_index(TimeSpan, Id); _ -> true - end + end end, lists:foreach(Each, Logs). -try_insert_to_topk(MaxSize, Index, Latency, Type, Ts) -> +-spec calc_timespan(stats_type(), emqx_types:message(), non_neg_integer()) -> non_neg_integer(). +calc_timespan(whole, #message{timestamp = Ts}, Now) -> + Now - Ts; + +calc_timespan(internal, #message{timestamp = Ts} = Msg, Now) -> + End = emqx_message:get_header(deliver_begin_at, Msg, Now), + End - Ts; + +calc_timespan(response, Msg, Now) -> + Begin = emqx_message:get_header(deliver_begin_at, Msg, Now), + Now - Begin. + +%% update_topk is safe, because each process has a unique clientid and its +%% inserts and deletes are bound to that clientid, so there is no race condition +%% +%% however, the delete_with_index(Min, MinId) call in try_insert_to_topk may race, +%% because it deletes data that belongs to another clientid +%% (i.e. data written by another process). 
+%% so the following may happen: +%% while a record is being deleted, the other process is performing an update on that same record +%% to cope with this race condition, the index table also uses the ordered_set type, +%% so that even if the above situation occurs, it will only cause the old data to be deleted twice +%% and the correctness of the data will not be affected + +try_insert_to_topk(MaxSize, Now, LastUpdateValue, TimeSpan, Id) -> case ets:info(?TOPK_TAB, size) of Size when Size < MaxSize -> - %% if the size is under limit, insert it directly - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}); + update_topk(Now, LastUpdateValue, TimeSpan, Id); _Size -> - %% find the minimum value - ?INDEX(Min, _) = First = - case ets:first(?TOPK_TAB) of - ?INDEX(_, _) = I -> I; - _ -> ?INDEX(Latency - 1, <<>>) - end, - - case Latency =< Min of - true -> true; - _ -> - ets:insert(?TOPK_TAB, - #top_k{index = Index, type = Type, last_update_time = Ts}), - - ets:delete(?TOPK_TAB, First) + case ets:first(?TOPK_TAB) of + '$end_of_table' -> + update_topk(Now, LastUpdateValue, TimeSpan, Id); + ?TOPK_INDEX(_, Id) -> + update_topk(Now, LastUpdateValue, TimeSpan, Id); + ?TOPK_INDEX(Min, MinId) -> + case TimeSpan =< Min of + true -> false; + _ -> + update_topk(Now, LastUpdateValue, TimeSpan, Id), + delete_with_index(Min, MinId) + end end end. + +-spec find_last_update_value(id()) -> non_neg_integer(). +find_last_update_value(Id) -> + case ets:next(?INDEX_TAB, ?INDEX(0, Id)) of + ?INDEX(LastUpdateValue, Id) -> + LastUpdateValue; + _ -> + 0 + end. + +-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true. 
+update_topk(Now, LastUpdateValue, TimeSpan, Id) -> + %% update record + ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id), + last_update_time = Now, + extra = [] + }), + + %% update index + ets:insert(?INDEX_TAB, #index_tab{index = ?INDEX(TimeSpan, Id)}), + + %% delete the old record & index + delete_with_index(LastUpdateValue, Id). + +-spec delete_with_index(non_neg_integer(), id()) -> true. +delete_with_index(0, _) -> + true; + +delete_with_index(TimeSpan, Id) -> + ets:delete(?INDEX_TAB, ?INDEX(TimeSpan, Id)), + ets:delete(?TOPK_TAB, ?TOPK_INDEX(TimeSpan, Id)). + +safe_create_tab(Name, Opts) -> + case ets:whereis(Name) of + undefined -> + Name = ets:new(Name, Opts); + _ -> + Name + end. + +do_clear_history() -> + ets:delete_all_objects(?INDEX_TAB), + ets:delete_all_objects(?TOPK_TAB). + check_enable(Enable, #{enable := IsEnable} = State) -> - update_threshold(), case Enable of IsEnable -> State; @@ -330,11 +310,6 @@ check_enable(Enable, #{enable := IsEnable} = State) -> unload(State) end. -update_threshold() -> - Threshold = emqx:get_config([slow_subs, threshold]), - emqx_message_latency_stats:update_threshold(Threshold), - ok. - start_timer(Name, Fun, State) -> _ = cancel_timer(maps:get(Name, State)), State#{Name := Fun()}. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index ee2016268..271afa626 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -23,14 +23,14 @@ -export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]). --export([slow_subs/2, encode_record/1, settings/2]). +-export([slow_subs/2, get_history/0, settings/2]). -import(hoconsc, [mk/2, ref/1]). -import(emqx_mgmt_util, [bad_request/0]). --define(FORMAT_FUN, {?MODULE, encode_record}). -define(APP, emqx_slow_subs). -define(APP_NAME, <<"emqx_slow_subs">>). +-define(DEFAULT_RPC_TIMEOUT, timer:seconds(5)). namespace() -> "slow_subscribers_statistics". 
@@ -74,12 +74,13 @@ schema("/slow_subscriptions/settings") -> fields(record) -> [ {clientid, mk(string(), #{desc => <<"the clientid">>})}, - {latency, + {node, + mk(string(), #{desc => <<"the node">>})}, + {topic, + mk(string(), #{desc => <<"the topic">>})}, + {timespan, mk(integer(), - #{desc => <<"average time for message delivery or time for message expire">>})}, - {type, - mk(string(), - #{desc => <<"type of the latency, could be average or expire">>})}, + #{desc => <<"timespan for message transmission">>})}, {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})} ]. @@ -89,24 +90,49 @@ conf_schema() -> hoconsc:mk(Ref, #{}). slow_subs(delete, _) -> - ok = emqx_slow_subs:clear_history(), + Nodes = mria_mnesia:running_nodes(), + _ = [rpc_call(Node, emqx_slow_subs, clear_history, [], ok, ?DEFAULT_RPC_TIMEOUT) + || Node <- Nodes], {204}; -slow_subs(get, #{query_string := QST}) -> - LimitT = maps:get(<<"limit">>, QST, ?MAX_TAB_SIZE), - Limit = erlang:min(?MAX_TAB_SIZE, emqx_mgmt_api:b2i(LimitT)), - Page = maps:get(<<"page">>, QST, 1), - QS = QST#{<<"limit">> => Limit, <<"page">> => Page}, - Data = emqx_mgmt_api:paginate({?TOPK_TAB, [{traverse, last_prev}]}, QS, ?FORMAT_FUN), - {200, Data}. +slow_subs(get, _) -> + Nodes = mria_mnesia:running_nodes(), + Fun = fun(Node, Acc) -> + NodeRankL = rpc_call(Node, + ?MODULE, + get_history, + [], + [], + ?DEFAULT_RPC_TIMEOUT), + NodeRankL ++ Acc + end, -encode_record(#top_k{index = ?INDEX(Latency, ClientId), - type = Type, - last_update_time = Ts}) -> - #{clientid => ClientId, - latency => Latency, - type => Type, - last_update_time => Ts}. + RankL = lists:foldl(Fun, [], Nodes), + + SortFun = fun(#{timespan := A}, #{timespan := B}) -> + A > B + end, + + SortedL = lists:sort(SortFun, RankL), + SortedL2 = lists:sublist(SortedL, ?MAX_SIZE), + + {200, SortedL2}. 
+ +get_history() -> + Node = node(), + RankL = ets:tab2list(?TOPK_TAB), + ConvFun = fun(#top_k{index = ?TOPK_INDEX(TimeSpan, ?ID(ClientId, Topic)), + last_update_time = LastUpdateTime + }) -> + #{ clientid => ClientId + , node => Node + , topic => Topic + , timespan => TimeSpan + , last_update_time => LastUpdateTime + } + end, + + lists:map(ConvFun, RankL). settings(get, _) -> {200, emqx:get_raw_config([slow_subs], #{})}; @@ -114,3 +140,12 @@ settings(get, _) -> settings(put, #{body := Body}) -> _ = emqx_slow_subs:update_settings(Body), {200, emqx:get_raw_config([slow_subs], #{})}. + +rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() -> + erlang:apply(M, F, A); + +rpc_call(Node, M, F, A, ErrorR, T) -> + case rpc:call(Node, M, F, A, T) of + {badrpc, _} -> ErrorR; + Res -> Res + end. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 91beca4ae..d625c4f26 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -22,21 +22,10 @@ fields("slow_subs") -> sc(integer(), 10, "The maximum number of records in the slow subscription statistics record table")} - , {notice_interval, - sc(emqx_schema:duration_ms(), - "0s", - "The interval for pushing statistics table records to the system topic. " - "publish top-k list to $SYS/brokers/${node}/slow_subs per notice_interval. " - "publish is disabled if set to 0s." - )} - , {notice_qos, - sc(range(0, 2), - 0, - "QoS of notification message in notice topic")} - , {notice_batch_size, - sc(integer(), - 100, - "Maximum information number in one notification")} + , {stats_type, + sc(hoconsc:union([whole, internal, response]), + whole, + "The method to calculate the latency")} ]. %%--------------------------------------------------------------------