feat: add client.check_acl_complete event
This commit is contained in:
parent
326b01968b
commit
e0d142c625
|
@ -39,6 +39,7 @@
|
|||
, on_message_delivered/3
|
||||
, on_message_acked/3
|
||||
, on_delivery_dropped/4
|
||||
, on_client_check_acl_complete/5
|
||||
]).
|
||||
|
||||
-export([ event_info/0
|
||||
|
@ -57,6 +58,7 @@
|
|||
, 'message.acked'
|
||||
, 'message.dropped'
|
||||
, 'delivery.dropped'
|
||||
, 'client.check_acl_complete'
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
@ -112,6 +114,13 @@ on_client_connack(ConnInfo, Reason, _, Env) ->
|
|||
may_publish_and_apply('client.connack',
|
||||
fun() -> eventmsg_connack(ConnInfo, Reason) end, Env).
|
||||
|
||||
on_client_check_acl_complete(ClientInfo, PubSub, Topic, Result, Env) ->
|
||||
may_publish_and_apply('client.check_acl_complete',
|
||||
fun() -> eventmsg_check_acl_complete(ClientInfo,
|
||||
PubSub,
|
||||
Topic,
|
||||
Result) end, Env).
|
||||
|
||||
on_session_subscribed(ClientInfo, Topic, SubOpts, Env) ->
|
||||
may_publish_and_apply('session.subscribed',
|
||||
fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env).
|
||||
|
@ -253,6 +262,19 @@ eventmsg_connack(_ConnInfo = #{
|
|||
connected_at => ConnectedAt,
|
||||
conn_props => printable_maps(ConnProps)
|
||||
}).
|
||||
eventmsg_check_acl_complete(_ClientInfo = #{
|
||||
clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := PeerHost
|
||||
}, PubSub, Topic, Result) ->
|
||||
with_basic_columns('client.check_acl_complete',
|
||||
#{clientid => ClientId,
|
||||
username => Username,
|
||||
peerhost => ntoa(PeerHost),
|
||||
topic => Topic,
|
||||
action => PubSub,
|
||||
result => Result
|
||||
}).
|
||||
|
||||
eventmsg_sub_or_unsub(Event, _ClientInfo = #{
|
||||
clientid := ClientId,
|
||||
|
@ -409,6 +431,7 @@ event_info() ->
|
|||
, event_info_client_connack()
|
||||
, event_info_session_subscribed()
|
||||
, event_info_session_unsubscribed()
|
||||
, event_info_client_check_acl_complete()
|
||||
].
|
||||
|
||||
event_info_message_publish() ->
|
||||
|
@ -483,6 +506,13 @@ event_info_session_unsubscribed() ->
|
|||
{<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>
|
||||
).
|
||||
event_info_client_check_acl_complete() ->
|
||||
event_info_common(
|
||||
'client.check_acl_complete',
|
||||
{<<"client check acl complete">>, <<"鉴权结果"/utf8>>},
|
||||
{<<"client check acl complete">>, <<"鉴权结果"/utf8>>},
|
||||
<<"SELECT * FROM \"$events/client_check_acl_complete\"">>
|
||||
).
|
||||
|
||||
event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
|
||||
#{event => event_topic(Event),
|
||||
|
@ -539,6 +569,13 @@ test_columns('session.subscribed') ->
|
|||
, {<<"username">>, <<"u_emqx">>}
|
||||
, {<<"topic">>, <<"t/a">>}
|
||||
, {<<"qos">>, 1}
|
||||
];
|
||||
test_columns('client.check_acl_complete') ->
|
||||
[ {<<"clientid">>, <<"c_emqx">>}
|
||||
, {<<"username">>, <<"u_emqx">>}
|
||||
, {<<"topic">>, <<"t/1">>}
|
||||
, {<<"action">>, <<"publish">>}
|
||||
, {<<"result">>, <<"allow">>}
|
||||
].
|
||||
|
||||
columns_with_exam('message.publish') ->
|
||||
|
@ -692,6 +729,17 @@ columns_with_exam('session.unsubscribed') ->
|
|||
, columns_example_props(unsub_props)
|
||||
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
||||
, {<<"node">>, node()}
|
||||
];
|
||||
columns_with_exam('client.check_acl_complete') ->
|
||||
[ {<<"event">>, 'client.check_acl_complete'}
|
||||
, {<<"clientid">>, <<"c_emqx">>}
|
||||
, {<<"username">>, <<"u_emqx">>}
|
||||
, {<<"peerhost">>, <<"192.168.0.10">>}
|
||||
, {<<"topic">>, <<"t/a">>}
|
||||
, {<<"action">>, <<"publish">>}
|
||||
, {<<"result">>, <<"allow">>}
|
||||
, {<<"timestamp">>, erlang:system_time(millisecond)}
|
||||
, {<<"node">>, node()}
|
||||
].
|
||||
|
||||
columns_example_props(PropType) ->
|
||||
|
@ -766,6 +814,7 @@ event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
|
|||
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
|
||||
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
|
||||
event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
|
||||
event_name(<<"$events/client_check_acl_complete", _/binary>>) -> 'client.check_acl_complete';
|
||||
event_name(_) -> 'message.publish'.
|
||||
|
||||
event_topic('client.connected') -> <<"$events/client_connected">>;
|
||||
|
@ -777,7 +826,8 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>;
|
|||
event_topic('message.acked') -> <<"$events/message_acked">>;
|
||||
event_topic('message.dropped') -> <<"$events/message_dropped">>;
|
||||
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
|
||||
event_topic('message.publish') -> <<"$events/message_publish">>.
|
||||
event_topic('message.publish') -> <<"$events/message_publish">>;
|
||||
event_topic('client.check_acl_complete') -> <<"$events/client_check_acl_complete">>.
|
||||
|
||||
printable_maps(undefined) -> #{};
|
||||
printable_maps(Headers) ->
|
||||
|
|
|
@ -101,7 +101,6 @@ groups() ->
|
|||
t_sqlselect_2_2,
|
||||
t_sqlselect_2_3,
|
||||
t_sqlselect_3,
|
||||
t_sqlselect_3_1,
|
||||
t_sqlparse_event_1,
|
||||
t_sqlparse_event_2,
|
||||
t_sqlparse_event_3,
|
||||
|
@ -198,6 +197,8 @@ init_per_testcase(t_events, Config) ->
|
|||
description = #{en => <<"Hook metrics action">>}}),
|
||||
SQL = "SELECT * FROM \"$events/client_connected\", "
|
||||
"\"$events/client_disconnected\", "
|
||||
"\"$events/client_connack\", "
|
||||
"\"$events/client_check_acl_complete\", "
|
||||
"\"$events/session_subscribed\", "
|
||||
"\"$events/session_unsubscribed\", "
|
||||
"\"$events/message_acked\", "
|
||||
|
@ -1014,7 +1015,7 @@ t_events(_Config) ->
|
|||
, {proto_ver, v5}
|
||||
, {properties, #{'Session-Expiry-Interval' => 60}}
|
||||
]),
|
||||
ct:pal("====== verify $events/client_connected"),
|
||||
ct:pal("====== verify $events/client_connected, $events/client_connack"),
|
||||
client_connected(Client, Client2),
|
||||
ct:pal("====== verify $events/session_subscribed"),
|
||||
session_subscribed(Client2),
|
||||
|
@ -1040,8 +1041,10 @@ message_publish(Client) ->
|
|||
client_connected(Client, Client2) ->
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
{ok, _} = emqtt:connect(Client2),
|
||||
verify_event('client.connack'),
|
||||
verify_event('client.connected'),
|
||||
ok.
|
||||
|
||||
client_disconnected(Client, Client2) ->
|
||||
ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}),
|
||||
ok = emqtt:disconnect(Client2, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}),
|
||||
|
@ -1054,6 +1057,7 @@ session_subscribed(Client2) ->
|
|||
, 1
|
||||
),
|
||||
verify_event('session.subscribed'),
|
||||
verify_event('client.check_acl_complete'),
|
||||
ok.
|
||||
session_unsubscribed(Client2) ->
|
||||
{ok, _, _} = emqtt:unsubscribe( Client2
|
||||
|
@ -1443,31 +1447,6 @@ t_sqlselect_3(_Config) ->
|
|||
emqtt:stop(Client),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_sqlselect_3_1(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
%% republish the client.connected msg
|
||||
TopicRule = create_simple_repub_rule(
|
||||
<<"t2">>,
|
||||
"SELECT * "
|
||||
"FROM \"$events/client_connack\" "
|
||||
"WHERE username = 'emqx1'",
|
||||
<<"clientid=${clientid}">>),
|
||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
|
||||
ct:sleep(200),
|
||||
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
|
||||
{ok, _} = emqtt:connect(Client1),
|
||||
receive {publish, #{topic := T, payload := Payload}} ->
|
||||
?assertEqual(<<"t2">>, T),
|
||||
?assertEqual(<<"clientid=c_emqx1">>, Payload)
|
||||
after 1000 ->
|
||||
ct:fail(wait_for_t2)
|
||||
end,
|
||||
|
||||
emqtt:stop(Client),
|
||||
emqx_rule_registry:remove_rule(TopicRule).
|
||||
|
||||
t_metrics(_Config) ->
|
||||
ok = emqx_rule_engine:load_providers(),
|
||||
TopicRule = create_simple_repub_rule(
|
||||
|
@ -2670,6 +2649,37 @@ verify_event_fields('client.disconnected', Fields) ->
|
|||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
||||
?assert(EventAt =< Timestamp);
|
||||
|
||||
verify_event_fields('client.connack', Fields) ->
|
||||
#{clientid := ClientId,
|
||||
clean_start := CleanStart,
|
||||
username := Username,
|
||||
peername := PeerName,
|
||||
sockname := SockName,
|
||||
proto_name := ProtoName,
|
||||
proto_ver := ProtoVer,
|
||||
keepalive := Keepalive,
|
||||
expiry_interval := ExpiryInterval,
|
||||
conn_props := Properties,
|
||||
timestamp := Timestamp,
|
||||
connected_at := EventAt
|
||||
} = Fields,
|
||||
Now = erlang:system_time(millisecond),
|
||||
TimestampElapse = Now - Timestamp,
|
||||
RcvdAtElapse = Now - EventAt,
|
||||
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
||||
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])),
|
||||
verify_peername(PeerName),
|
||||
verify_peername(SockName),
|
||||
?assertEqual(<<"MQTT">>, ProtoName),
|
||||
?assertEqual(5, ProtoVer),
|
||||
?assert(is_integer(Keepalive)),
|
||||
?assert(is_boolean(CleanStart)),
|
||||
?assertEqual(60, ExpiryInterval),
|
||||
?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties),
|
||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
||||
?assert(EventAt =< Timestamp);
|
||||
|
||||
verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed'
|
||||
; SubUnsub == 'session.unsubscribed' ->
|
||||
#{clientid := ClientId,
|
||||
|
@ -2793,7 +2803,20 @@ verify_event_fields('message.acked', Fields) ->
|
|||
?assert(is_map(PubAckProps)),
|
||||
?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000),
|
||||
?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000),
|
||||
?assert(EventAt =< Timestamp).
|
||||
?assert(EventAt =< Timestamp);
|
||||
|
||||
verify_event_fields('client.check_acl_complete', Fields) ->
|
||||
#{clientid := ClientId,
|
||||
action := Action,
|
||||
result := Result,
|
||||
topic := Topic,
|
||||
username := Username
|
||||
} = Fields,
|
||||
?assertEqual(<<"t1">>, Topic),
|
||||
?assert(lists:member(Action, [subscribe, publish])),
|
||||
?assert(lists:member(Result, [allow, deny])),
|
||||
?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])),
|
||||
?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])).
|
||||
|
||||
verify_peername(PeerName) ->
|
||||
case string:split(PeerName, ":") of
|
||||
|
|
|
@ -49,7 +49,10 @@ check_acl(ClientInfo, PubSub, Topic) ->
|
|||
true -> check_acl_cache(ClientInfo, PubSub, Topic);
|
||||
false -> do_check_acl(ClientInfo, PubSub, Topic)
|
||||
end,
|
||||
inc_acl_metrics(Result), Result.
|
||||
inc_acl_metrics(Result),
|
||||
emqx:run_hook('client.check_acl_complete', [ClientInfo, PubSub, Topic, Result]),
|
||||
%% io:format(standard_error, "~p, ~p, ~p, ~p, ~n", [ClientInfo, PubSub, Topic, Result]),
|
||||
Result.
|
||||
|
||||
check_acl_cache(ClientInfo, PubSub, Topic) ->
|
||||
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
|
||||
|
|
Loading…
Reference in New Issue