diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 2774ac264..c8aefb4aa 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -750,9 +750,10 @@ handle_deliver(Delivers, Channel = #channel{session = Session, ignore_local(Delivers, Subscriber, Session) -> Subs = emqx_session:info(subscriptions, Session), - lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> + lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) -> case maps:find(Topic, Subs) of {ok, #{nl := 1}} when Subscriber =:= Publisher -> + ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, no_local]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.no_local'), true; diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index c3ce14d83..6cd4d05b1 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -146,7 +146,7 @@ %% PubSub Metrics {counter, 'messages.publish'}, % Messages Publish {counter, 'messages.dropped'}, % Messages dropped due to no subscribers - {counter, 'messages.dropped.expired'}, % QoS2 Messages expired + {counter, 'messages.dropped.await_pubrel_timeout'}, % QoS2 await PUBREL timeout {counter, 'messages.dropped.no_subscribers'}, % Messages dropped {counter, 'messages.forward'}, % Messages forward {counter, 'messages.retained'}, % Messages retained @@ -542,7 +542,8 @@ reserved_idx('messages.qos2.received') -> 106; reserved_idx('messages.qos2.sent') -> 107; reserved_idx('messages.publish') -> 108; reserved_idx('messages.dropped') -> 109; -reserved_idx('messages.dropped.expired') -> 110; +reserved_idx('messages.dropped.expired') -> 110; %% To be removed in 5.0 +reserved_idx('messages.dropped.await_pubrel_timeout') -> 110; reserved_idx('messages.dropped.no_subscribers') -> 111; reserved_idx('messages.forward') -> 112; reserved_idx('messages.retained') -> 113; diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9463345d4..346098add 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -403,8 +403,10 @@ dequeue(Cnt, Msgs, Q) -> {empty, _Q} -> dequeue(0, Msgs, Q); {{value, Msg}, Q1} -> case emqx_message:is_expired(Msg) of - true -> ok = inc_expired_cnt(delivery), - dequeue(Cnt, Msgs, Q1); + true -> + ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = inc_delivery_expired_cnt(), + dequeue(Cnt, Msgs, Q1); false -> dequeue(acc_cnt(Msg, Cnt), [Msg|Msgs], Q1) end end. @@ -565,7 +567,8 @@ retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> - ok = inc_expired_cnt(delivery), + ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, expired]), + ok = inc_delivery_expired_cnt(), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), @@ -593,7 +596,7 @@ expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, AwaitingRel1 = maps:filter(NotExpired, AwaitingRel), ExpiredCnt = maps:size(AwaitingRel) - maps:size(AwaitingRel1), - (ExpiredCnt > 0) andalso inc_expired_cnt(message, ExpiredCnt), + (ExpiredCnt > 0) andalso inc_await_pubrel_timeout(ExpiredCnt), NSession = Session#session{awaiting_rel = AwaitingRel1}, case maps:size(AwaitingRel1) of 0 -> {ok, NSession}; @@ -644,18 +647,16 @@ run_hook(Name, Args) -> %%-------------------------------------------------------------------- %% Inc message/delivery expired counter %%-------------------------------------------------------------------- +inc_delivery_expired_cnt() -> + inc_delivery_expired_cnt(1). --compile({inline, [inc_expired_cnt/1, inc_expired_cnt/2]}). - -inc_expired_cnt(K) -> inc_expired_cnt(K, 1). - -inc_expired_cnt(delivery, N) -> +inc_delivery_expired_cnt(N) -> ok = emqx_metrics:inc('delivery.dropped', N), - emqx_metrics:inc('delivery.dropped.expired', N); + emqx_metrics:inc('delivery.dropped.expired', N). -inc_expired_cnt(message, N) -> +inc_await_pubrel_timeout(N) -> ok = emqx_metrics:inc('messages.dropped', N), - emqx_metrics:inc('messages.dropped.expired', N). + emqx_metrics:inc('messages.dropped.await_pubrel_timeout', N). %%-------------------------------------------------------------------- %% Next Packet Id