From 3410e20fbe09081e4010cc5b68bd5da9c100f7b4 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Thu, 24 Mar 2022 14:49:48 +0800 Subject: [PATCH] feat(emqx_rule_engine_events): add client_connack event --- .../emqx_rule_engine/src/emqx_rule_events.erl | 66 +++++++++++++++++++ .../test/emqx_rule_engine_SUITE.erl | 26 ++++++++ 2 files changed, 92 insertions(+) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 540b1cbbd..0ef00ea85 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -31,6 +31,7 @@ -export([ on_client_connected/3 , on_client_disconnected/4 + , on_client_connack/4 , on_session_subscribed/4 , on_session_unsubscribed/4 , on_message_publish/2 @@ -48,6 +49,7 @@ -define(SUPPORTED_HOOK, [ 'client.connected' , 'client.disconnected' + , 'client.connack' , 'session.subscribed' , 'session.unsubscribed' , 'message.publish' @@ -106,6 +108,10 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> may_publish_and_apply('client.disconnected', fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env). +on_client_connack(ConnInfo, Reason, _, Env) -> + may_publish_and_apply('client.connack', + fun() -> eventmsg_connack(ConnInfo, Reason) end, Env). + on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> may_publish_and_apply('session.subscribed', fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env). @@ -220,6 +226,34 @@ eventmsg_disconnected(_ClientInfo = #{ disconnected_at => DisconnectedAt }). +eventmsg_connack(_ConnInfo = #{ + clientid := ClientId, + clean_start := CleanStart, + username := Username, + peername := PeerName, + sockname := SockName, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + connected_at := ConnectedAt, + conn_props := ConnProps, + expiry_interval := ExpiryInterval + }, Reason) -> + with_basic_columns('client.connack', + #{reason_code => reason(Reason), + clientid => ClientId, + clean_start => CleanStart, + username => Username, + peername => ntoa(PeerName), + sockname => ntoa(SockName), + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + expiry_interval => ExpiryInterval, + connected_at => ConnectedAt, + conn_props => printable_maps(ConnProps) + }). + eventmsg_sub_or_unsub(Event, _ClientInfo = #{ clientid := ClientId, username := Username, @@ -372,6 +406,7 @@ event_info() -> , event_info_delivery_dropped() , event_info_client_connected() , event_info_client_disconnected() + , event_info_client_connack() , event_info_session_subscribed() , event_info_session_unsubscribed() ]. @@ -427,6 +462,13 @@ event_info_client_disconnected() -> {<<"client disconnected">>, <<"连接断开"/utf8>>}, <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">> ). +event_info_client_connack() -> + event_info_common( + 'client.connack', + {<<"client connack">>, <<"连接确认"/utf8>>}, + {<<"client connack">>, <<"连接确认"/utf8>>}, + <<"SELECT * FROM \"$events/client_connack\"">> + ). event_info_session_subscribed() -> event_info_common( 'session.subscribed', @@ -485,6 +527,11 @@ test_columns('client.disconnected') -> , {<<"username">>, <<"u_emqx">>} , {<<"reason">>, <<"normal">>} ]; +test_columns('client.connack') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"reason_code">>, <<"sucess">>} + ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> @@ -607,6 +654,23 @@ columns_with_exam('client.disconnected') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('client.connack') -> + [ {<<"event">>, 'client.connected'} + , {<<"reason_code">>, success} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peername">>, <<"192.168.0.10:56431">>} + , {<<"sockname">>, <<"0.0.0.0:1883">>} + , {<<"proto_name">>, <<"MQTT">>} + , {<<"proto_ver">>, 5} + , {<<"keepalive">>, 60} + , {<<"clean_start">>, true} + , {<<"expiry_interval">>, 3600} + , {<<"connected_at">>, erlang:system_time(millisecond)} + , columns_example_props(conn_props) + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('session.subscribed') -> [ {<<"event">>, 'session.subscribed'} , {<<"clientid">>, <<"c_emqx">>} @@ -694,6 +758,7 @@ ntoa(IpAddr) -> event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; +event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack'; event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed'; @@ -705,6 +770,7 @@ event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; +event_topic('client.connack') -> <<"$events/client_connack">>; event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 02e0f607c..ca4b8db0c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -101,6 +101,7 @@ groups() -> t_sqlselect_2_2, t_sqlselect_2_3, t_sqlselect_3, + t_sqlselect_3_1, t_sqlparse_event_1, t_sqlparse_event_2, t_sqlparse_event_3, @@ -1442,6 +1443,31 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). +t_sqlselect_3_1(_Config) -> + ok = emqx_rule_engine:load_providers(), + %% republish the client.connected msg + TopicRule = create_simple_repub_rule( + <<"t2">>, + "SELECT * " + "FROM \"$events/client_connack\" " + "WHERE username = 'emqx1'", + <<"clientid=${clientid}">>), + {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]), + {ok, _} = emqtt:connect(Client1), + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=c_emqx1">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + + emqtt:stop(Client), + emqx_rule_registry:remove_rule(TopicRule). + t_metrics(_Config) -> ok = emqx_rule_engine:load_providers(), TopicRule = create_simple_repub_rule(