feat(emqx_rule_engine_events): add client_connack event

This commit is contained in:
EMQ-YangM 2022-03-24 14:49:48 +08:00
parent 37d1ba88e4
commit 3410e20fbe
2 changed files with 92 additions and 0 deletions

View File

@ -31,6 +31,7 @@
-export([ on_client_connected/3 -export([ on_client_connected/3
, on_client_disconnected/4 , on_client_disconnected/4
, on_client_connack/4
, on_session_subscribed/4 , on_session_subscribed/4
, on_session_unsubscribed/4 , on_session_unsubscribed/4
, on_message_publish/2 , on_message_publish/2
@ -48,6 +49,7 @@
-define(SUPPORTED_HOOK, -define(SUPPORTED_HOOK,
[ 'client.connected' [ 'client.connected'
, 'client.disconnected' , 'client.disconnected'
, 'client.connack'
, 'session.subscribed' , 'session.subscribed'
, 'session.unsubscribed' , 'session.unsubscribed'
, 'message.publish' , 'message.publish'
@ -106,6 +108,10 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
may_publish_and_apply('client.disconnected', may_publish_and_apply('client.disconnected',
fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env). fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env).
on_client_connack(ConnInfo, Reason, _, Env) ->
may_publish_and_apply('client.connack',
fun() -> eventmsg_connack(ConnInfo, Reason) end, Env).
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
may_publish_and_apply('session.subscribed', may_publish_and_apply('session.subscribed',
fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env). fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env).
@ -220,6 +226,34 @@ eventmsg_disconnected(_ClientInfo = #{
disconnected_at => DisconnectedAt disconnected_at => DisconnectedAt
}). }).
eventmsg_connack(_ConnInfo = #{
clientid := ClientId,
clean_start := CleanStart,
username := Username,
peername := PeerName,
sockname := SockName,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
connected_at := ConnectedAt,
conn_props := ConnProps,
expiry_interval := ExpiryInterval
}, Reason) ->
with_basic_columns('client.connack',
#{reason_code => reason(Reason),
clientid => ClientId,
clean_start => CleanStart,
username => Username,
peername => ntoa(PeerName),
sockname => ntoa(SockName),
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
expiry_interval => ExpiryInterval,
connected_at => ConnectedAt,
conn_props => printable_maps(ConnProps)
}).
eventmsg_sub_or_unsub(Event, _ClientInfo = #{ eventmsg_sub_or_unsub(Event, _ClientInfo = #{
clientid := ClientId, clientid := ClientId,
username := Username, username := Username,
@ -372,6 +406,7 @@ event_info() ->
, event_info_delivery_dropped() , event_info_delivery_dropped()
, event_info_client_connected() , event_info_client_connected()
, event_info_client_disconnected() , event_info_client_disconnected()
, event_info_client_connack()
, event_info_session_subscribed() , event_info_session_subscribed()
, event_info_session_unsubscribed() , event_info_session_unsubscribed()
]. ].
@ -427,6 +462,13 @@ event_info_client_disconnected() ->
{<<"client disconnected">>, <<"连接断开"/utf8>>}, {<<"client disconnected">>, <<"连接断开"/utf8>>},
<<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">> <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">>
). ).
event_info_client_connack() ->
event_info_common(
'client.connack',
{<<"client connack">>, <<"连接确认"/utf8>>},
{<<"client connack">>, <<"连接确认"/utf8>>},
<<"SELECT * FROM \"$events/client_connack\"">>
).
event_info_session_subscribed() -> event_info_session_subscribed() ->
event_info_common( event_info_common(
'session.subscribed', 'session.subscribed',
@ -485,6 +527,11 @@ test_columns('client.disconnected') ->
, {<<"username">>, <<"u_emqx">>} , {<<"username">>, <<"u_emqx">>}
, {<<"reason">>, <<"normal">>} , {<<"reason">>, <<"normal">>}
]; ];
test_columns('client.connack') ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"reason_code">>, <<"sucess">>}
];
test_columns('session.unsubscribed') -> test_columns('session.unsubscribed') ->
test_columns('session.subscribed'); test_columns('session.subscribed');
test_columns('session.subscribed') -> test_columns('session.subscribed') ->
@ -607,6 +654,23 @@ columns_with_exam('client.disconnected') ->
, {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()} , {<<"node">>, node()}
]; ];
columns_with_exam('client.connack') ->
[ {<<"event">>, 'client.connected'}
, {<<"reason_code">>, success}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peername">>, <<"192.168.0.10:56431">>}
, {<<"sockname">>, <<"0.0.0.0:1883">>}
, {<<"proto_name">>, <<"MQTT">>}
, {<<"proto_ver">>, 5}
, {<<"keepalive">>, 60}
, {<<"clean_start">>, true}
, {<<"expiry_interval">>, 3600}
, {<<"connected_at">>, erlang:system_time(millisecond)}
, columns_example_props(conn_props)
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('session.subscribed') -> columns_with_exam('session.subscribed') ->
[ {<<"event">>, 'session.subscribed'} [ {<<"event">>, 'session.subscribed'}
, {<<"clientid">>, <<"c_emqx">>} , {<<"clientid">>, <<"c_emqx">>}
@ -694,6 +758,7 @@ ntoa(IpAddr) ->
event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack';
event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) -> event_name(<<"$events/session_unsubscribed", _/binary>>) ->
'session.unsubscribed'; 'session.unsubscribed';
@ -705,6 +770,7 @@ event_name(_) -> 'message.publish'.
event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.connected') -> <<"$events/client_connected">>;
event_topic('client.disconnected') -> <<"$events/client_disconnected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
event_topic('client.connack') -> <<"$events/client_connack">>;
event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.subscribed') -> <<"$events/session_subscribed">>;
event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.delivered') -> <<"$events/message_delivered">>;

View File

@ -101,6 +101,7 @@ groups() ->
t_sqlselect_2_2, t_sqlselect_2_2,
t_sqlselect_2_3, t_sqlselect_2_3,
t_sqlselect_3, t_sqlselect_3,
t_sqlselect_3_1,
t_sqlparse_event_1, t_sqlparse_event_1,
t_sqlparse_event_2, t_sqlparse_event_2,
t_sqlparse_event_3, t_sqlparse_event_3,
@ -1442,6 +1443,31 @@ t_sqlselect_3(_Config) ->
emqtt:stop(Client), emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule). emqx_rule_registry:remove_rule(TopicRule).
t_sqlselect_3_1(_Config) ->
ok = emqx_rule_engine:load_providers(),
%% republish the client.connected msg
TopicRule = create_simple_repub_rule(
<<"t2">>,
"SELECT * "
"FROM \"$events/client_connack\" "
"WHERE username = 'emqx1'",
<<"clientid=${clientid}">>),
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
{ok, _} = emqtt:connect(Client1),
receive {publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"clientid=c_emqx1">>, Payload)
after 1000 ->
ct:fail(wait_for_t2)
end,
emqtt:stop(Client),
emqx_rule_registry:remove_rule(TopicRule).
t_metrics(_Config) -> t_metrics(_Config) ->
ok = emqx_rule_engine:load_providers(), ok = emqx_rule_engine:load_providers(),
TopicRule = create_simple_repub_rule( TopicRule = create_simple_repub_rule(