Fix message dropped
Prior to this change, when store_qos0 option is set to false, the hook `message.dropped` would not be triggered. It is wrong design and when the ignore_loop is enabled or nl is set to 1, the hook `message.dropped` also would not be triggered.
This commit is contained in:
parent
2306789755
commit
15f8d3208f
|
@ -124,8 +124,8 @@ stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
|
||||||
|
|
||||||
%% @doc Enqueue a message.
|
%% @doc Enqueue a message.
|
||||||
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
||||||
in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
|
in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
|
||||||
{_Dropped = undefined, MQ};
|
{_Dropped = Msg, MQ};
|
||||||
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
||||||
p_table = PTab,
|
p_table = PTab,
|
||||||
q = Q,
|
q = Q,
|
||||||
|
|
|
@ -812,7 +812,11 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel,
|
||||||
%% Dispatch messages
|
%% Dispatch messages
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) ->
|
handle_dispatch(Msgs, State = #state{inflight = Inflight,
|
||||||
|
client_id = ClientId,
|
||||||
|
username = Username,
|
||||||
|
subscriptions = SubMap}) ->
|
||||||
|
SessProps = #{client_id => ClientId, username => Username},
|
||||||
%% Drain the mailbox and batch deliver
|
%% Drain the mailbox and batch deliver
|
||||||
Msgs1 = drain_m(batch_n(Inflight), Msgs),
|
Msgs1 = drain_m(batch_n(Inflight), Msgs),
|
||||||
%% Ack the messages for shared subscription
|
%% Ack the messages for shared subscription
|
||||||
|
@ -823,7 +827,9 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap
|
||||||
SubOpts = find_subopts(Topic, SubMap),
|
SubOpts = find_subopts(Topic, SubMap),
|
||||||
case process_subopts(SubOpts, Msg, State) of
|
case process_subopts(SubOpts, Msg, State) of
|
||||||
{ok, Msg1} -> [Msg1|Acc];
|
{ok, Msg1} -> [Msg1|Acc];
|
||||||
ignore -> Acc
|
ignore ->
|
||||||
|
emqx_hooks:run('message.dropped', [SessProps, Msg]),
|
||||||
|
Acc
|
||||||
end
|
end
|
||||||
end, [], Msgs2),
|
end, [], Msgs2),
|
||||||
NState = batch_process(Msgs3, State),
|
NState = batch_process(Msgs3, State),
|
||||||
|
@ -957,7 +963,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use
|
||||||
if
|
if
|
||||||
Dropped =/= undefined ->
|
Dropped =/= undefined ->
|
||||||
SessProps = #{client_id => ClientId, username => Username},
|
SessProps = #{client_id => ClientId, username => Username},
|
||||||
ok = emqx_hooks:run('message.dropped', [SessProps, Msg]);
|
ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]);
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
State#state{mqueue = NewQ}.
|
State#state{mqueue = NewQ}.
|
||||||
|
|
Loading…
Reference in New Issue