diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 5d85d5990..fb7649e97 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -452,7 +452,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> }. test_columns('message.dropped') -> - test_columns('message.publish'); + [ {<<"reason">>, <<"no_subscribers">>} + ] ++ test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} @@ -471,6 +472,9 @@ test_columns('message.delivered') -> , {<<"qos">>, 1} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} ]; +test_columns('delivery.dropped') -> + [ {<<"reason">>, <<"queue_full">>} + ] ++ test_columns('message.delivered'); test_columns('client.connected') -> [ {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} @@ -657,6 +661,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) -> event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; +event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -666,6 +671,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{}; diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 6c2482075..35121c046 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -101,7 +101,7 @@ do_apply_rule(#rule{id = RuleId, {match_conditions_error, {_EXCLASS_,_EXCPTION_,_ST_}}) of true -> Collection2 = filter_collection(Input, InCase, DoEach, Collection), - case Collection2 of + case Collection2 of [] -> emqx_rule_metrics:inc_rules_no_result(RuleId); _ -> emqx_rule_metrics:inc_rules_passed(RuleId) end, diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 1bccf0c1a..442aa1db8 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -408,7 +408,8 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)}, inc_ping_counter(), case ClientIdPing of ClientId -> - case emqx_session:replay(emqx_channel:get_session(Channel)) of + case emqx_session:replay(emqx_channel:info(clientinfo, Channel), + emqx_channel:get_session(Channel)) of {ok, [], Session0} -> State0 = send_message(?SN_PINGRESP_MSG(), State), {keep_state, State0#state{ @@ -521,7 +522,8 @@ handle_event(info, {deliver, _Topic, Msg}, asleep, % section 6.14, Support of sleeping clients ?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p", [Msg, Pendings]), - Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), + Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel), + Msg, emqx_channel:get_session(Channel)), {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName, diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index c8aefb4aa..20257911b 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -342,7 +342,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> - case emqx_session:puback(PacketId, Session) of + case emqx_session:puback(ClientInfo, PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), {ok, Channel#channel{session = NSession}}; @@ -387,8 +387,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se handle_out(pubcomp, {PacketId, RC}, Channel) end; -handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> - case emqx_session:pubcomp(PacketId, Session) of +handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{ + clientinfo = ClientInfo, session = Session}) -> + case emqx_session:pubcomp(ClientInfo, PacketId, Session) of {ok, NSession} -> {ok, Channel#channel{session = NSession}}; {ok, Publishes, NSession} -> @@ -720,27 +721,33 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. -spec(handle_deliver(list(emqx_types:deliver()), channel()) -> {ok, channel()} | {ok, replies(), channel()}). -handle_deliver(Delivers, Channel = #channel{takeover = true, - pendings = Pendings, - session = Session, - clientinfo = #{clientid := ClientId}}) -> +handle_deliver(Delivers, Channel = #channel{ + takeover = true, + pendings = Pendings, + session = Session, + clientinfo = #{clientid := ClientId} = ClientInfo}) -> %% NOTE: Order is important here. While the takeover is in %% progress, the session cannot enqueue messages, since it already %% passed on the queue to the new connection in the session state. - NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)), + NPendings = lists:append(Pendings, + ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; -handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, - takeover = false, - session = Session, - clientinfo = #{clientid := ClientId}}) -> - NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session), +handle_deliver(Delivers, Channel = #channel{ + conn_state = disconnected, + takeover = false, + session = Session, + clientinfo = #{clientid := ClientId} = ClientInfo}) -> + NSession = emqx_session:enqueue(ClientInfo, + ignore_local(ClientInfo, maybe_nack(Delivers), ClientId, Session), Session), {ok, Channel#channel{session = NSession}}; -handle_deliver(Delivers, Channel = #channel{session = Session, - takeover = false, - clientinfo = #{clientid := ClientId}}) -> - case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of +handle_deliver(Delivers, Channel = #channel{ + session = Session, + takeover = false, + clientinfo = #{clientid := ClientId} = ClientInfo}) -> + case emqx_session:deliver(ClientInfo, + ignore_local(ClientInfo, Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); @@ -748,12 +755,12 @@ handle_deliver(Delivers, Channel = #channel{session = Session, {ok, Channel#channel{session = NSession}} end. -ignore_local(Delivers, Subscriber, Session) -> +ignore_local(ClientInfo, Delivers, Subscriber, Session) -> Subs = emqx_session:info(subscriptions, Session), lists:dropwhile(fun({deliver, Topic, #message{from = Publisher} = Msg}) -> case maps:find(Topic, Subs) of {ok, #{nl := 1}} when Subscriber =:= Publisher -> - ok = emqx_hooks:run('delivery.dropped', [Msg, #{node => node()}, no_local]), + ok = emqx_hooks:run('delivery.dropped', [ClientInfo, Msg, no_local]), ok = emqx_metrics:inc('delivery.dropped'), ok = emqx_metrics:inc('delivery.dropped.no_local'), true; @@ -1026,8 +1033,8 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{conn_state = disconnected}) -> {ok, Channel}; handle_timeout(_TRef, retry_delivery, - Channel = #channel{session = Session}) -> - case emqx_session:retry(Session) of + Channel = #channel{session = Session, clientinfo = ClientInfo}) -> + case emqx_session:retry(ClientInfo, Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, Publishes, Timeout, NSession} -> @@ -1589,9 +1596,10 @@ maybe_resume_session(#channel{resuming = false}) -> ignore; maybe_resume_session(#channel{session = Session, resuming = true, - pendings = Pendings}) -> - {ok, Publishes, Session1} = emqx_session:replay(Session), - case emqx_session:deliver(Pendings, Session1) of + pendings = Pendings, + clientinfo = ClientInfo}) -> + {ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), + case emqx_session:deliver(ClientInfo, Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; {ok, More, Session2} ->