diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 7f6b40da8..93e38c44d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.3", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}]}, {"4.4.2", [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, @@ -41,7 +42,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.3", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}]}, {"4.4.2", [{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index c1f019692..6c2ff97e4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -236,19 +236,18 @@ eventmsg_disconnected(_ClientInfo = #{ disconnected_at => DisconnectedAt }). -eventmsg_connack(_ConnInfo = #{ +eventmsg_connack(ConnInfo = #{ clientid := ClientId, clean_start := CleanStart, username := Username, peername := PeerName, sockname := SockName, proto_name := ProtoName, - proto_ver := ProtoVer, - keepalive := Keepalive, - connected_at := ConnectedAt, - conn_props := ConnProps, - expiry_interval := ExpiryInterval + proto_ver := ProtoVer }, Reason) -> + Keepalive = maps:get(keepalive, ConnInfo, 0), + ConnProps = maps:get(conn_props, ConnInfo, #{}), + ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0), with_basic_columns('client.connack', #{reason_code => reason(Reason), clientid => ClientId, @@ -260,7 +259,6 @@ eventmsg_connack(_ConnInfo = #{ proto_ver => ProtoVer, keepalive => Keepalive, expiry_interval => ExpiryInterval, - connected_at => ConnectedAt, conn_props => printable_maps(ConnProps) }). eventmsg_check_acl_complete(_ClientInfo = #{ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index df9ebbb78..11f470e3b 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1058,6 +1058,7 @@ t_events(_Config) -> , {proto_ver, v5} , {properties, #{'Session-Expiry-Interval' => 60}} ]), + ct:pal("====== verify $events/client_connected, $events/client_connack"), client_connected(Client, Client2), ct:pal("====== verify $events/session_subscribed, $events/client_check_acl_complete"), @@ -1074,6 +1075,8 @@ t_events(_Config) -> message_dropped(Client), ct:pal("====== verify $events/client_disconnected"), client_disconnected(Client, Client2), + ct:pal("====== verify $events/client_connack"), + client_connack_failed(), ok. message_publish(Client) -> @@ -1081,12 +1084,33 @@ message_publish(Client) -> <<"{\"id\": 1, \"name\": \"ha\"}">>, [{qos, 1}]), verify_event('message.publish'), ok. + client_connected(Client, Client2) -> {ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client2), verify_event('client.connack'), verify_event('client.connected'), ok. + +client_connack_failed() -> + {ok, Client} = emqtt:start_link( + [ {username, <<"u_event3">>} + , {clientid, <<"c_event3">>} + , {proto_ver, v5} + , {properties, #{'Session-Expiry-Interval' => 60}} + ]), + try + meck:new(emqx_access_control, [non_strict, passthrough]), + meck:expect(emqx_access_control, authenticate, + fun(_) -> {error, bad_username_or_password} end), + process_flag(trap_exit, true), + ?assertMatch({error, _}, emqtt:connect(Client)), + timer:sleep(300), + verify_event('client.connack') + after + meck:unload(emqx_access_control) + end, + ok. client_disconnected(Client, Client2) -> ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), ok = emqtt:disconnect(Client2, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), @@ -2726,14 +2750,14 @@ verify_event_fields('client.connack', Fields) -> keepalive := Keepalive, expiry_interval := ExpiryInterval, conn_props := Properties, - timestamp := Timestamp, - connected_at := EventAt + reason_code := Reason, + timestamp := Timestamp } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, - RcvdAtElapse = Now - EventAt, - ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), - ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + ?assert(lists:member(Reason, [success, bad_username_or_password])), + ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>, <<"c_event3">>])), + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>, <<"u_event3">>])), verify_peername(PeerName), verify_peername(SockName), ?assertEqual(<<"MQTT">>, ProtoName), @@ -2742,9 +2766,7 @@ verify_event_fields('client.connack', Fields) -> ?assert(is_boolean(CleanStart)), ?assertEqual(60, ExpiryInterval), ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties), - ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), - ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), - ?assert(EventAt =< Timestamp); + ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000); verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed' ; SubUnsub == 'session.unsubscribed' ->