From b2f027bcf7c18070e9cc23938e301f626d5e5dba Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 8 Feb 2022 10:32:25 +0800 Subject: [PATCH] feat(rule): add 'delivery.dropped' hook for rules --- .../emqx_rule_engine/src/emqx_rule_events.erl | 75 ++++++++++++++++++- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 1dd64582e..a0a33470b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -39,6 +39,7 @@ , on_message_dropped/4 , on_message_delivered/3 , on_message_acked/3 + , on_delivery_dropped/4 , on_bridge_message_received/2 ]). @@ -63,6 +64,7 @@ event_names() -> , 'message.delivered' , 'message.acked' , 'message.dropped' + , 'delivery.dropped' ]. reload() -> @@ -153,6 +155,15 @@ on_message_acked(ClientInfo, Message, Env) -> end, {ok, Message}. +on_delivery_dropped(ClientInfo, Message, Reason, Env) -> + case ignore_sys_message(Message) of + true -> ok; + false -> + apply_event('delivery.dropped', + fun() -> eventmsg_delivery_dropped(ClientInfo, Message, Reason) end, Env) + end, + {ok, Message}. + %%-------------------------------------------------------------------- %% Event Messages %%-------------------------------------------------------------------- @@ -311,6 +322,32 @@ eventmsg_acked(_ClientInfo = #{ publish_received_at => Timestamp }). +eventmsg_delivery_dropped(_ClientInfo = #{ + peerhost := PeerHost, + clientid := ReceiverCId, + username := ReceiverUsername + }, + Message = #message{id = Id, from = ClientId, qos = QoS, flags = Flags, topic = Topic, + headers = Headers, payload = Payload, timestamp = Timestamp}, + Reason) -> + with_basic_columns('delivery.dropped', + #{id => emqx_guid:to_hexstr(Id), + reason => Reason, + from_clientid => ClientId, + from_username => emqx_message:get_header(username, Message, undefined), + clientid => ReceiverCId, + username => ReceiverUsername, + payload => Payload, + peerhost => ntoa(PeerHost), + topic => Topic, + qos => QoS, + flags => Flags, + %% the column 'headers' will be removed in the next major release + headers => printable_maps(Headers), + pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), + publish_received_at => Timestamp + }). + sub_unsub_prop_key('session.subscribed') -> sub_props; sub_unsub_prop_key('session.unsubscribed') -> unsub_props. @@ -345,6 +382,7 @@ event_info() -> , event_info_client_disconnected() , event_info_session_subscribed() , event_info_session_unsubscribed() + , event_info_delivery_dropped() ]. event_info_message_publish() -> @@ -371,10 +409,18 @@ event_info_message_acked() -> event_info_message_dropped() -> event_info_common( 'message.dropped', - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, - {<<"message dropped">>, <<"消息丢弃"/utf8>>}, + {<<"message routing-drop">>, <<"消息转发丢弃"/utf8>>}, + {<<"messages are discarded during routing, usually because there are no subscribers">>, <<"消息在转发的过程中被丢弃,一般是由于没有订阅者"/utf8>>}, <<"SELECT * FROM \"$events/message_dropped\" WHERE topic =~ 't/#'">> ). +event_info_delivery_dropped() -> + event_info_common( + 'delivery.dropped', + {<<"message delivery-drop">>, <<"消息投递丢弃"/utf8>>}, + {<<"messages are discarded during delivery, i.e. because the message queue is full">>, + <<"消息在投递的过程中被丢弃,比如由于消息队列已满"/utf8>>}, + <<"SELECT * FROM \"$events/delivery_dropped\" WHERE topic =~ 't/#'">> + ). event_info_client_connected() -> event_info_common( 'client.connected', @@ -414,7 +460,8 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> }. test_columns('message.dropped') -> - test_columns('message.publish'); + [ {<<"reason">>, <<"no_subscribers">>} + ] ++ test_columns('message.publish'); test_columns('message.publish') -> [ {<<"clientid">>, <<"c_emqx">>} , {<<"username">>, <<"u_emqx">>} @@ -422,6 +469,9 @@ test_columns('message.publish') -> , {<<"qos">>, 1} , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} ]; +test_columns('delivery.dropped') -> + [ {<<"reason">>, <<"queue_full">>} + ] ++ test_columns('message.delivered'); test_columns('message.acked') -> test_columns('message.delivered'); test_columns('message.delivered') -> @@ -486,6 +536,23 @@ columns_with_exam('message.dropped') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('delivery.dropped') -> + [ {<<"event">>, 'delivery.dropped'} + , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())} + , {<<"reason">>, queue_full} + , {<<"from_clientid">>, <<"c_emqx_1">>} + , {<<"from_username">>, <<"u_emqx_1">>} + , {<<"clientid">>, <<"c_emqx_2">>} + , {<<"username">>, <<"u_emqx_2">>} + , {<<"payload">>, <<"{\"msg\": \"hello\"}">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"qos">>, 1} + , {<<"flags">>, #{}} + , {<<"publish_received_at">>, erlang:system_time(millisecond)} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('client.connected') -> [ {<<"event">>, 'client.connected'} , {<<"clientid">>, <<"c_emqx">>} @@ -578,6 +645,7 @@ event_name(<<"$events/session_unsubscribed", _/binary>>) -> event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; +event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -587,6 +655,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; +event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; event_topic('message.publish') -> <<"$events/message_publish">>. printable_maps(undefined) -> #{};