diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 4b8afea40..8f0bc0beb 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -144,6 +144,7 @@ init_per_testcase(t_events, Config) -> "\"$events/message_acked\", " "\"$events/message_delivered\", " "\"$events/message_dropped\", " + "\"$events/delivery_dropped\", " "\"t1\"", {ok, Rule} = emqx_rule_engine:create_rule( #{id => <<"rule:t_events">>, @@ -322,18 +323,20 @@ t_events(_Config) -> ]), ct:pal("====== verify $events/client_connected"), client_connected(Client, Client2), + ct:pal("====== verify $events/message_dropped"), + message_dropped(Client), ct:pal("====== verify $events/session_subscribed"), session_subscribed(Client2), ct:pal("====== verify t1"), message_publish(Client), + ct:pal("====== verify $events/delivery_dropped"), + delivery_dropped(Client), ct:pal("====== verify $events/message_delivered"), message_delivered(Client), ct:pal("====== verify $events/message_acked"), message_acked(Client), ct:pal("====== verify $events/session_unsubscribed"), session_unsubscribed(Client2), - ct:pal("====== verify $events/message_dropped"), - message_dropped(Client), ct:pal("====== verify $events/client_disconnected"), client_disconnected(Client, Client2), ok. @@ -365,6 +368,15 @@ session_unsubscribed(Client2) -> message_delivered(_Client) -> verify_event('message.delivered'), ok. +delivery_dropped(Client) -> + %% subscribe "t1" and then publish to "t1", the message will not be received by itself + %% because we have set the subscribe flag 'nl' = true + {ok, _, _} = emqtt:subscribe(Client, #{}, <<"t1">>, [{nl, true}, {qos, 1}]), + ct:sleep(50), + message_publish(Client), + ct:pal("--- current emqx hooks: ~p", [ets:tab2list(emqx_hooks)]), + verify_event('delivery.dropped'), + ok. message_dropped(Client) -> message_publish(Client), verify_event('message.dropped'), @@ -1490,6 +1502,45 @@ verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed' maps:get(PropKey, Fields)), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000); +verify_event_fields('delivery.dropped', Fields) -> + #{event := 'delivery.dropped', + id := ID, + metadata := #{rule_id := RuleId}, + reason := Reason, + clientid := ClientId, + username := Username, + from_clientid := FromClientId, + from_username := FromUsername, + node := Node, + payload := Payload, + peerhost := PeerHost, + pub_props := Properties, + publish_received_at := EventAt, + qos := QoS, + flags := Flags, + timestamp := Timestamp, + topic := Topic} = Fields, + Now = erlang:system_time(millisecond), + TimestampElapse = Now - Timestamp, + RcvdAtElapse = Now - EventAt, + ?assert(is_binary(ID)), + ?assertEqual(<<"rule:t_events">>, RuleId), + ?assertEqual(no_local, Reason), + ?assertEqual(node(), Node), + ?assertEqual(<<"c_event">>, ClientId), + ?assertEqual(<<"u_event">>, Username), + ?assertEqual(<<"c_event">>, FromClientId), + ?assertEqual(<<"u_event">>, FromUsername), + ?assertEqual(<<"{\"id\": 1, \"name\": \"ha\"}">>, Payload), + verify_ipaddr(PeerHost), + ?assertEqual(<<"t1">>, Topic), + ?assertEqual(1, QoS), + ?assert(is_map(Flags)), + ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), + ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), + ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), + ?assert(EventAt =< Timestamp); + verify_event_fields('message.dropped', Fields) -> #{id := ID, reason := Reason,