refactor(sessmem): pass log context as part of session event
Co-Authored-By: Thales Macedo Garitezi <thalesmg@gmail.com>
This commit is contained in:
parent
adc29e15cc
commit
9d145890cc
|
@ -21,8 +21,10 @@
|
|||
|
||||
-export([handle_event/2]).
|
||||
|
||||
-type event_expired() :: {expired, emqx_types:message()}.
|
||||
-type event_dropped() :: {dropped, emqx_types:message(), _Reason :: atom()}.
|
||||
-type message() :: emqx_types:message().
|
||||
|
||||
-type event_expired() :: {expired, message()}.
|
||||
-type event_dropped() :: {dropped, message(), _Reason :: atom() | #{reason := atom(), _ => _}}.
|
||||
-type event_expire_rel() :: {expired_rel, non_neg_integer()}.
|
||||
|
||||
-type event() ::
|
||||
|
@ -37,22 +39,24 @@
|
|||
handle_event(ClientInfo, {expired, Msg}) ->
|
||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, expired]),
|
||||
ok = inc_delivery_expired_cnt(1);
|
||||
handle_event(ClientInfo, {dropped, Msg, qos0_msg}) ->
|
||||
handle_event(ClientInfo, {dropped, Msg, no_local}) ->
|
||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = emqx_metrics:inc('delivery.dropped.no_local');
|
||||
handle_event(ClientInfo, {dropped, Msg, #{reason := qos0_msg, logctx := Ctx}}) ->
|
||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, qos0_msg]),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = emqx_metrics:inc('delivery.dropped.qos0_msg'),
|
||||
ok = inc_pd('send_msg.dropped', 1),
|
||||
?SLOG(
|
||||
warning,
|
||||
#{
|
||||
Ctx#{
|
||||
msg => "dropped_qos0_msg",
|
||||
% FIXME
|
||||
% queue => QueueInfo,
|
||||
payload => Msg#message.payload
|
||||
},
|
||||
#{topic => Msg#message.topic}
|
||||
);
|
||||
handle_event(ClientInfo, {dropped, Msg, queue_full}) ->
|
||||
handle_event(ClientInfo, {dropped, Msg, #{reason := queue_full, logctx := Ctx}}) ->
|
||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, queue_full]),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = emqx_metrics:inc('delivery.dropped.queue_full'),
|
||||
|
@ -60,18 +64,12 @@ handle_event(ClientInfo, {dropped, Msg, queue_full}) ->
|
|||
ok = inc_pd('send_msg.dropped.queue_full', 1),
|
||||
?SLOG(
|
||||
warning,
|
||||
#{
|
||||
Ctx#{
|
||||
msg => "dropped_msg_due_to_mqueue_is_full",
|
||||
% FIXME
|
||||
% queue => QueueInfo,
|
||||
payload => Msg#message.payload
|
||||
},
|
||||
#{topic => Msg#message.topic}
|
||||
);
|
||||
handle_event(ClientInfo, {dropped, Msg, no_local}) ->
|
||||
ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]),
|
||||
ok = emqx_metrics:inc('delivery.dropped'),
|
||||
ok = emqx_metrics:inc('delivery.dropped.no_local');
|
||||
handle_event(_ClientInfo, {expired_rel, 0}) ->
|
||||
ok;
|
||||
handle_event(_ClientInfo, {expired_rel, ExpiredCnt}) ->
|
||||
|
|
|
@ -519,17 +519,21 @@ enqueue(ClientInfo, Msgs, Session) when is_list(Msgs) ->
|
|||
).
|
||||
|
||||
enqueue_msg(ClientInfo, #message{qos = QOS} = Msg, Session = #session{mqueue = Q}) ->
|
||||
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
||||
{Dropped, NQ} = emqx_mqueue:in(Msg, Q),
|
||||
case Dropped of
|
||||
undefined ->
|
||||
Session#session{mqueue = NewQ};
|
||||
Session#session{mqueue = NQ};
|
||||
_Msg ->
|
||||
NQInfo = emqx_mqueue:info(NQ),
|
||||
Reason =
|
||||
case emqx_mqueue:info(store_qos0, Q) of
|
||||
false when QOS =:= ?QOS_0 -> qos0_msg;
|
||||
case NQInfo of
|
||||
#{store_qos0 := false} when QOS =:= ?QOS_0 -> qos0_msg;
|
||||
_ -> queue_full
|
||||
end,
|
||||
_ = emqx_session_events:handle_event(ClientInfo, {dropped, Dropped, Reason}),
|
||||
_ = emqx_session_events:handle_event(
|
||||
ClientInfo,
|
||||
{dropped, Dropped, #{reason => Reason, logctx => #{queue => NQInfo}}}
|
||||
),
|
||||
Session
|
||||
end.
|
||||
|
||||
|
|
Loading…
Reference in New Issue