Merge pull request #6790 from lafirest/fix/slow_subs
fix(emqx_slow_subs): add ClientInfo into the args of the delivery.com…
This commit is contained in:
commit
e82c73988b
|
@ -24,7 +24,7 @@
|
||||||
|
|
||||||
-logger_header("[SLOW Subs]").
|
-logger_header("[SLOW Subs]").
|
||||||
|
|
||||||
-export([ start_link/1, on_delivery_completed/3, enable/0
|
-export([ start_link/1, on_delivery_completed/4, enable/0
|
||||||
, disable/0, clear_history/0, init_tab/0
|
, disable/0, clear_history/0, init_tab/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@
|
||||||
| internal %% timespan from message in to deliver
|
| internal %% timespan from message in to deliver
|
||||||
| response. %% timespan from delivery to client response
|
| response. %% timespan from delivery to client response
|
||||||
|
|
||||||
-type stats_update_args() :: #{ clientid := emqx_types:clientid()}.
|
-type stats_update_args() :: #{session_birth_time := pos_integer()}.
|
||||||
|
|
||||||
-type stats_update_env() :: #{ threshold := non_neg_integer()
|
-type stats_update_env() :: #{ threshold := non_neg_integer()
|
||||||
, stats_type := stats_type()
|
, stats_type := stats_type()
|
||||||
|
@ -81,18 +81,20 @@
|
||||||
start_link(Env) ->
|
start_link(Env) ->
|
||||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
|
||||||
|
|
||||||
on_delivery_completed(#message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg)
|
on_delivery_completed(_ClientInfo, #message{timestamp = Ts}, #{session_birth_time := BirthTime}, _Cfg)
|
||||||
when Ts =< BirthTime ->
|
when Ts =< BirthTime ->
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
on_delivery_completed(Msg, Env, Cfg) ->
|
on_delivery_completed(ClientInfo, Msg, Env, Cfg) ->
|
||||||
on_delivery_completed(Msg, Env, erlang:system_time(millisecond), Cfg).
|
on_delivery_completed(ClientInfo, Msg, Env, erlang:system_time(millisecond), Cfg).
|
||||||
|
|
||||||
on_delivery_completed(#message{topic = Topic} = Msg,
|
on_delivery_completed(#{clientid := ClientId},
|
||||||
#{clientid := ClientId},
|
#message{topic = Topic} = Msg,
|
||||||
Now, #{threshold := Threshold,
|
_Env,
|
||||||
stats_type := StatsType,
|
Now,
|
||||||
max_size := MaxSize}) ->
|
#{threshold := Threshold,
|
||||||
|
stats_type := StatsType,
|
||||||
|
max_size := MaxSize}) ->
|
||||||
TimeSpan = calc_timespan(StatsType, Msg, Now),
|
TimeSpan = calc_timespan(StatsType, Msg, Now),
|
||||||
case TimeSpan =< Threshold of
|
case TimeSpan =< Threshold of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
|
@ -191,7 +193,7 @@ load(Cfg) ->
|
||||||
StatsType = get_value(stats_type, Cfg),
|
StatsType = get_value(stats_type, Cfg),
|
||||||
Threshold = get_value(threshold, Cfg),
|
Threshold = get_value(threshold, Cfg),
|
||||||
_ = emqx:hook('delivery.completed',
|
_ = emqx:hook('delivery.completed',
|
||||||
fun ?MODULE:on_delivery_completed/3,
|
fun ?MODULE:on_delivery_completed/4,
|
||||||
[#{max_size => MaxSize,
|
[#{max_size => MaxSize,
|
||||||
stats_type => StatsType,
|
stats_type => StatsType,
|
||||||
threshold => Threshold
|
threshold => Threshold
|
||||||
|
@ -199,7 +201,7 @@ load(Cfg) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
emqx:unhook('delivery.completed', fun ?MODULE:on_delivery_completed/3 ),
|
emqx:unhook('delivery.completed', fun ?MODULE:on_delivery_completed/4 ),
|
||||||
do_clear_history().
|
do_clear_history().
|
||||||
|
|
||||||
do_clear(Cfg, Logs) ->
|
do_clear(Cfg, Logs) ->
|
||||||
|
@ -266,7 +268,7 @@ find_last_update_value(Id) ->
|
||||||
0
|
0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec update_topk(non_neg_integer(), non_neg_integer(), non_neg_integer(), integer()) -> true.
|
-spec update_topk(pos_integer(), non_neg_integer(), non_neg_integer(), id()) -> true.
|
||||||
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
|
update_topk(Now, LastUpdateValue, TimeSpan, Id) ->
|
||||||
%% update record
|
%% update record
|
||||||
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id),
|
ets:insert(?TOPK_TAB, #top_k{index = ?TOPK_INDEX(TimeSpan, Id),
|
||||||
|
|
|
@ -93,6 +93,6 @@ rpc_call(Node, M, F, A, _ErrorR, _T) when Node =:= node() ->
|
||||||
|
|
||||||
rpc_call(Node, M, F, A, ErrorR, T) ->
|
rpc_call(Node, M, F, A, ErrorR, T) ->
|
||||||
case rpc:call(Node, M, F, A, T) of
|
case rpc:call(Node, M, F, A, T) of
|
||||||
{badrpc, _} -> ErrorR;
|
{badrpc, _} -> ErrorR;
|
||||||
Res -> Res
|
Res -> Res
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -335,7 +335,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
|
||||||
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
puback(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
||||||
on_delivery_completed(Msg, Session),
|
on_delivery_completed(ClientInfo, Msg, Session),
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
|
return_with(Msg, dequeue(ClientInfo, Session#session{inflight = Inflight1}));
|
||||||
{value, _Other} ->
|
{value, _Other} ->
|
||||||
|
@ -393,7 +393,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
|
||||||
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
pubcomp(ClientInfo, PacketId, Session = #session{inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
{value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) ->
|
{value, {Pubrel, Msg}} when is_record(Pubrel, pubrel_await) ->
|
||||||
on_delivery_completed(Msg, Session),
|
on_delivery_completed(ClientInfo, Msg, Session),
|
||||||
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
|
||||||
dequeue(ClientInfo, Session#session{inflight = Inflight1});
|
dequeue(ClientInfo, Session#session{inflight = Inflight1});
|
||||||
{value, _Other} ->
|
{value, _Other} ->
|
||||||
|
@ -459,8 +459,8 @@ do_deliver(ClientInfo, [Msg | More], Acc, Session) ->
|
||||||
do_deliver(ClientInfo, More, [Publish | Acc], Session1)
|
do_deliver(ClientInfo, More, [Publish | Acc], Session1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
deliver_msg(_ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
|
deliver_msg(ClientInfo, Msg = #message{qos = ?QOS_0}, Session) ->
|
||||||
on_delivery_completed(Msg, Session),
|
on_delivery_completed(ClientInfo, Msg, Session),
|
||||||
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
{ok, [{undefined, maybe_ack(Msg)}], Session};
|
||||||
|
|
||||||
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
||||||
|
@ -712,16 +712,16 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Message Latency Stats
|
%% Message Latency Stats
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
on_delivery_completed(Msg,
|
on_delivery_completed(ClientInfo,
|
||||||
#session{clientid = ClientId,
|
Msg,
|
||||||
created_at = CreateAt}) when is_record(Msg, message) ->
|
#session{created_at = CreateAt}) when is_record(Msg, message) ->
|
||||||
emqx:run_hook('delivery.completed',
|
emqx:run_hook('delivery.completed',
|
||||||
[Msg, #{ session_birth_time => CreateAt
|
[ClientInfo, Msg, #{session_birth_time => CreateAt}]);
|
||||||
, clientid => ClientId
|
|
||||||
}]);
|
|
||||||
|
|
||||||
%% in 4.4.0, timestamp are stored in pubrel_await, not message
|
%% Hot upgrade compatibility clause.
|
||||||
on_delivery_completed(_Ts, _Session) ->
|
%% In the 4.4.0, timestamp are stored in pubrel_await, not a message record.
|
||||||
|
%% This clause should be kept in all 4.4.x versions.
|
||||||
|
on_delivery_completed(_ClientInfo, _Ts, _Session) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
mark_begin_deliver(Msg) ->
|
mark_begin_deliver(Msg) ->
|
||||||
|
|
Loading…
Reference in New Issue