feat: add rule event 'client.connack'

This commit is contained in:
EMQ-YangM 2022-03-28 11:32:07 +08:00
parent 659658a1ed
commit 65164fb046
2 changed files with 100 additions and 1 deletions

View File

@ -33,6 +33,7 @@
-export([ on_client_connected/3
, on_client_disconnected/4
, on_client_connack/4
, on_session_subscribed/4
, on_session_unsubscribed/4
, on_message_publish/2
@ -60,6 +61,7 @@
event_names() ->
[ 'client.connected'
, 'client.disconnected'
, 'client.connack'
, 'session.subscribed'
, 'session.unsubscribed'
, 'message.publish'
@ -108,6 +110,10 @@ on_client_connected(ClientInfo, ConnInfo, Env) ->
apply_event('client.connected',
fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env).
on_client_connack(ConnInfo, Reason, _, Env) ->
apply_event('client.connack',
fun() -> eventmsg_connack(ConnInfo, Reason) end, Env).
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) ->
apply_event('client.disconnected',
fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env).
@ -235,6 +241,34 @@ eventmsg_disconnected(_ClientInfo = #{
disconnected_at => DisconnectedAt
}).
eventmsg_connack(_ConnInfo = #{
clientid := ClientId,
clean_start := CleanStart,
username := Username,
peername := PeerName,
sockname := SockName,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
connected_at := ConnectedAt,
conn_props := ConnProps,
expiry_interval := ExpiryInterval
}, Reason) ->
with_basic_columns('client.connack',
#{reason_code => reason(Reason),
clientid => ClientId,
clean_start => CleanStart,
username => Username,
peername => ntoa(PeerName),
sockname => ntoa(SockName),
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
expiry_interval => ExpiryInterval,
connected_at => ConnectedAt,
conn_props => printable_maps(ConnProps)
}).
eventmsg_sub_or_unsub(Event, _ClientInfo = #{
clientid := ClientId,
username := Username,
@ -378,6 +412,7 @@ event_info() ->
, event_info_message_dropped()
, event_info_client_connected()
, event_info_client_disconnected()
, event_info_client_connack()
, event_info_session_subscribed()
, event_info_session_unsubscribed()
, event_info_delivery_dropped()
@ -435,6 +470,13 @@ event_info_client_disconnected() ->
{<<"client disconnected">>, <<"连接断开"/utf8>>},
<<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">>
).
event_info_client_connack() ->
event_info_common(
'client.connack',
{<<"client connack">>, <<"连接确认"/utf8>>},
{<<"client connack">>, <<"连接确认"/utf8>>},
<<"SELECT * FROM \"$events/client_connack\"">>
).
event_info_session_subscribed() ->
event_info_common(
'session.subscribed',
@ -500,6 +542,11 @@ test_columns('client.disconnected') ->
, {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
, {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]}
];
test_columns('client.connack') ->
[ {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"reason_code">>, <<"sucess">>}
];
test_columns('session.unsubscribed') ->
test_columns('session.subscribed');
test_columns('session.subscribed') ->
@ -600,6 +647,23 @@ columns_with_exam('client.disconnected') ->
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('client.connack') ->
[ {<<"event">>, 'client.connected'}
, {<<"reason_code">>, success}
, {<<"clientid">>, <<"c_emqx">>}
, {<<"username">>, <<"u_emqx">>}
, {<<"peername">>, <<"192.168.0.10:56431">>}
, {<<"sockname">>, <<"0.0.0.0:1883">>}
, {<<"proto_name">>, <<"MQTT">>}
, {<<"proto_ver">>, 5}
, {<<"keepalive">>, 60}
, {<<"clean_start">>, true}
, {<<"expiry_interval">>, 3600}
, {<<"connected_at">>, erlang:system_time(millisecond)}
, columns_example_props(conn_props)
, {<<"timestamp">>, erlang:system_time(millisecond)}
, {<<"node">>, node()}
];
columns_with_exam('session.subscribed') ->
[ columns_example_props(sub_props)
] ++ columns_message_sub_unsub('session.subscribed');
@ -710,6 +774,7 @@ ntoa(IpAddr) ->
event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack';
event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) ->
'session.unsubscribed';
@ -722,6 +787,7 @@ event_name(_) -> 'message.publish'.
event_topic('client.connected') -> <<"$events/client_connected">>;
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
event_topic('client.connack') -> <<"$events/client_connack">>;
event_topic('session.subscribed') -> <<"$events/session_subscribed">>;
event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
event_topic('message.delivered') -> <<"$events/message_delivered">>;

View File

@ -139,6 +139,7 @@ init_per_testcase(t_events, Config) ->
init_events_counters(),
SQL = "SELECT * FROM \"$events/client_connected\", "
"\"$events/client_disconnected\", "
"\"$events/client_connack\", "
"\"$events/session_subscribed\", "
"\"$events/session_unsubscribed\", "
"\"$events/message_acked\", "
@ -321,7 +322,7 @@ t_events(_Config) ->
, {proto_ver, v5}
, {properties, #{'Session-Expiry-Interval' => 60}}
]),
ct:pal("====== verify $events/client_connected"),
ct:pal("====== verify $events/client_connected, $events/client_connack"),
client_connected(Client, Client2),
ct:pal("====== verify $events/message_dropped"),
message_dropped(Client),
@ -349,6 +350,7 @@ message_publish(Client) ->
client_connected(Client, Client2) ->
{ok, _} = emqtt:connect(Client),
{ok, _} = emqtt:connect(Client2),
verify_event('client.connack'),
verify_event('client.connected'),
ok.
client_disconnected(Client, Client2) ->
@ -1638,6 +1640,37 @@ verify_event_fields('message.acked', Fields) ->
?assert(is_map(PubAckProps)),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
?assert(EventAt =< Timestamp);
verify_event_fields('client.connack', Fields) ->
#{clientid := ClientId,
clean_start := CleanStart,
username := Username,
peername := PeerName,
sockname := SockName,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
expiry_interval := ExpiryInterval,
conn_props := Properties,
timestamp := Timestamp,
connected_at := EventAt
} = Fields,
Now = erlang:system_time(millisecond),
TimestampElapse = Now - Timestamp,
RcvdAtElapse = Now - EventAt,
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
verify_peername(PeerName),
verify_peername(SockName),
?assertEqual(<<"MQTT">>, ProtoName),
?assertEqual(5, ProtoVer),
?assert(is_integer(Keepalive)),
?assert(is_boolean(CleanStart)),
?assertEqual(60000, ExpiryInterval),
?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
?assert(EventAt =< Timestamp).
verify_peername(PeerName) ->