diff --git a/apps/emqx/src/emqx_session_events.erl b/apps/emqx/src/emqx_session_events.erl index 754707f52..b04dd2044 100644 --- a/apps/emqx/src/emqx_session_events.erl +++ b/apps/emqx/src/emqx_session_events.erl @@ -21,8 +21,10 @@ -export([handle_event/2]). --type event_expired() :: {expired, emqx_types:message()}. --type event_dropped() :: {dropped, emqx_types:message(), _Reason :: atom()}. +-type message() :: emqx_types:message(). + +-type event_expired() :: {expired, message()}. +-type event_dropped() :: {dropped, message(), _Reason :: atom() | #{reason := atom(), _ => _}}. -type event_expire_rel() :: {expired_rel, non_neg_integer()}. -type event() :: @@ -37,22 +39,24 @@ handle_event(ClientInfo, {expired, Msg}) -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]), ok = inc_delivery_expired_cnt(1); -handle_event(ClientInfo, {dropped, Msg, qos0_msg}) -> +handle_event(ClientInfo, {dropped, Msg, no_local}) -> + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'); +handle_event(ClientInfo, {dropped, Msg, #{reason := qos0_msg, logctx := Ctx}}) -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.qos0_msg'), ok = inc_pd('send_msg.dropped', 1), ?SLOG( warning, - #{ + Ctx#{ msg => "dropped_qos0_msg", - % FIXME - % queue => QueueInfo, payload => Msg#message.payload }, #{topic => Msg#message.topic} ); -handle_event(ClientInfo, {dropped, Msg, queue_full}) -> +handle_event(ClientInfo, {dropped, Msg, #{reason := queue_full, logctx := Ctx}}) -> ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.queue_full'), @@ -60,18 +64,12 @@ handle_event(ClientInfo, {dropped, Msg, queue_full}) -> ok = inc_pd('send_msg.dropped.queue_full', 1), ?SLOG( warning, - #{ + Ctx#{ msg => "dropped_msg_due_to_mqueue_is_full", - % FIXME - % queue => QueueInfo, payload => Msg#message.payload }, #{topic => Msg#message.topic} ); -handle_event(ClientInfo, {dropped, Msg, no_local}) -> - ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), - ok = emqx_metrics:inc('delivery.dropped'), - ok = emqx_metrics:inc('delivery.dropped.no_local'); handle_event(_ClientInfo, {expired_rel, 0}) -> ok; handle_event(_ClientInfo, {expired_rel, ExpiredCnt}) -> diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 0c909296e..586196d36 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -519,17 +519,21 @@ enqueue(ClientInfo, Msgs, Session) when is_list(Msgs) -> ). enqueue_msg(ClientInfo, #message{qos = QOS} = Msg, Session = #session{mqueue = Q}) -> - {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), + {Dropped, NQ} = emqx_mqueue:in(Msg, Q), case Dropped of undefined -> - Session#session{mqueue = NewQ}; + Session#session{mqueue = NQ}; _Msg -> + NQInfo = emqx_mqueue:info(NQ), Reason = - case emqx_mqueue:info(store_qos0, Q) of - false when QOS =:= ?QOS_0 -> qos0_msg; + case NQInfo of + #{store_qos0 := false} when QOS =:= ?QOS_0 -> qos0_msg; _ -> queue_full end, - _ = emqx_session_events:handle_event(ClientInfo, {dropped, Dropped, Reason}), + _ = emqx_session_events:handle_event( + ClientInfo, + {dropped, Dropped, #{reason => Reason, logctx => #{queue => NQInfo}}} + ), Session end.