From 15f8d3208f4013da61291f53ea536d6150e9d91b Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Wed, 3 Apr 2019 20:22:05 +0800 Subject: [PATCH] Fix message dropped Prior to this change, when store_qos0 option is set to false, the hook `message.dropped` would not be triggered. It is wrong design and when the ignore_loop is enabled or nl is set to 1, the hook `message.dropped` also would not be triggered. --- src/emqx_mqueue.erl | 4 ++-- src/emqx_session.erl | 12 +++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 594599daf..f13ccdd34 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -124,8 +124,8 @@ stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). -in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> - {_Dropped = undefined, MQ}; +in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + {_Dropped = Msg, MQ}; in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, p_table = PTab, q = Q, diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 3ef9107a7..2ef64c9aa 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -812,7 +812,11 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, %% Dispatch messages %%------------------------------------------------------------------------------ -handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) -> +handle_dispatch(Msgs, State = #state{inflight = Inflight, + client_id = ClientId, + username = Username, + subscriptions = SubMap}) -> + SessProps = #{client_id => ClientId, username => Username}, %% Drain the mailbox and batch deliver Msgs1 = drain_m(batch_n(Inflight), Msgs), %% Ack the messages for shared subscription @@ -823,7 +827,9 @@ handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap SubOpts = find_subopts(Topic, SubMap), case process_subopts(SubOpts, Msg, State) of {ok, Msg1} -> [Msg1|Acc]; - ignore -> Acc + ignore -> + emqx_hooks:run('message.dropped', [SessProps, Msg]), + Acc end end, [], Msgs2), NState = batch_process(Msgs3, State), @@ -957,7 +963,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Use if Dropped =/= undefined -> SessProps = #{client_id => ClientId, username => Username}, - ok = emqx_hooks:run('message.dropped', [SessProps, Msg]); + ok = emqx_hooks:run('message.dropped', [SessProps, Dropped]); true -> ok end, State#state{mqueue = NewQ}.