diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl index 50769a8fc..e8d18fca7 100644 --- a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs.erl @@ -24,7 +24,7 @@ -logger_header("[SLOW Subs]"). --export([ start_link/1, on_delivery_completed/3, enable/0 +-export([ start_link/1, on_delivery_completed/4, enable/0 , disable/0, clear_history/0, init_tab/0 ]). @@ -52,7 +52,7 @@ | internal %% timespan from message in to deliver | response. %% timespan from delivery to client response --type stats_update_args() :: #{ clientid := emqx_types:clientid()}. +-type stats_update_args() :: #{session_birth_time := pos_integer()}. -type stats_update_env() :: #{ threshold := non_neg_integer() , stats_type := stats_type() @@ -81,18 +81,20 @@ start_link(Env) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). -on_delivery_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg) +on_delivery_completed(_ClientInfo, #message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg) when Ts =< BirthTime -> ok; -on_delivery_completed(Msg, Env, Cfg) -> - on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg). +on_delivery_completed(ClientInfo, Msg, Env, Cfg) -> + on_delivery_completed(ClientInfo, Msg, Env, erlang:system_time(millisecond), Cfg). -on_delivery_completed(#message{topic = Topic} = Msg, - #{clientid := ClientId}, - Now, #{threshold := Threshold, - stats_type := StatsType, - max_size := MaxSize}) -> +on_delivery_completed(#{clientid := ClientId}, + #message{topic = Topic} = Msg, + _Env, + Now, + #{threshold := Threshold, + stats_type := StatsType, + max_size := MaxSize}) -> TimeSpan = calc_timespan(StatsType, Msg, Now), case TimeSpan =< Threshold of true -> ok; @@ -191,7 +193,7 @@ load(Cfg) -> StatsType = get_value(stats_type, Cfg), Threshold = get_value(threshold, Cfg), _ = emqx:hook('delivery.completed', - fun ?MODULE:on_delivery_completed/3, + fun ?MODULE:on_delivery_completed/4, [#{max_size => MaxSize, stats_type => StatsType, threshold => Threshold @@ -199,7 +201,7 @@ load(Cfg) -> ok. unload() -> - emqx:unhook('delivery.completed', fun ?MODULE:on_delivery_completed/3 ), + emqx:unhook('delivery.completed', fun ?MODULE:on_delivery_completed/4 ), do_clear_history(). do_clear(Cfg, Logs) -> @@ -266,7 +268,7 @@ find_last_update_value(Id) -> 0 end. --spec update_topk(non_neg_integer(), non_neg_integer(), non_neg_integer(), integer()) -> true. +-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true. update_topk(Now, LastUpdateValue, TimeSpan, Id) -> %% update record ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id), diff --git a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl index 23d39273d..26b70cb71 100644 --- a/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl +++ b/apps/emqx_plugin_libs/src/emqx_slow_subs/emqx_slow_subs_api.erl @@ -93,6 +93,6 @@ rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() -> rpc_call(Node, M, F, A, ErrorR, T) -> case rpc:call(Node, M, F, A, T) of - {badrpc, _} -> ErrorR; - Res -> Res + {badrpc, _} -> ErrorR; + Res -> Res end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 7d903d77d..2e2a4dafe 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -335,7 +335,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - on_delivery_completed(Msg, Session), + on_delivery_completed(ClientInfo, Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1})); {value, _Other} -> @@ -393,7 +393,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) -> - on_delivery_completed(Msg, Session), + on_delivery_completed(ClientInfo, Msg, Session), Inflight1 = emqx_inflight:delete(PacketId, Inflight), dequeue(ClientInfo, Session#session{inflight = Inflight1}); {value, _Other} -> @@ -459,8 +459,8 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) -> do_deliver(ClientInfo, More, [Publish | Acc], Session1) end. -deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> - on_delivery_completed(Msg, Session), +deliver_msg(ClientInfo, Msg = #message{qos = ?QOS_0}, Session) -> + on_delivery_completed(ClientInfo, Msg, Session), {ok, [{undefined, maybe_ack(Msg)}], Session}; deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session = @@ -712,16 +712,16 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %%-------------------------------------------------------------------- %% Message Latency Stats %%-------------------------------------------------------------------- -on_delivery_completed(Msg, - #session{clientid = ClientId, - created_at = CreateAt}) when is_record(Msg, message) -> +on_delivery_completed(ClientInfo, + Msg, + #session{created_at = CreateAt}) when is_record(Msg, message) -> emqx:run_hook('delivery.completed', - [Msg, #{ session_birth_time => CreateAt - , clientid => ClientId - }]); + [ClientInfo, Msg, #{session_birth_time => CreateAt}]); -%% in 4.4.0, timestamp are stored in pubrel_await, not message -on_delivery_completed(_Ts, _Session) -> +%% Hot upgrade compatibility clause. +%% In the 4.4.0, timestamp are stored in pubrel_await, not a message record. +%% This clause should be kept in all 4.4.x versions. +on_delivery_completed(_ClientInfo, _Ts, _Session) -> ok. mark_begin_deliver(Msg) ->