diff --git a/apps/emqx/src/emqx_access_control.erl b/apps/emqx/src/emqx_access_control.erl index 565644cf8..b853949f5 100644 --- a/apps/emqx/src/emqx_access_control.erl +++ b/apps/emqx/src/emqx_access_control.erl @@ -60,6 +60,10 @@ check_authorization_cache(ClientInfo, PubSub, Topic) -> emqx_authz_cache:put_authz_cache(PubSub, Topic, AuthzResult), AuthzResult; AuthzResult -> + emqx:run_hook( + 'client.check_authz_complete', + [ClientInfo, PubSub, Topic, AuthzResult, cache] + ), inc_acl_metrics(cache_hit), AuthzResult end. diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index 00309d560..916b08632 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -283,14 +283,18 @@ authorize(#{username := Username, peerhost := IpAddress } = Client, PubSub, Topic, DefaultResult, Sources) -> case do_authorize(Client, PubSub, Topic, Sources) of - {matched, allow} -> + {{matched, allow}, AuthzSource}-> + emqx:run_hook('client.check_authz_complete', + [Client, PubSub, Topic, allow, AuthzSource]), ?SLOG(info, #{msg => "authorization_permission_allowed", username => Username, ipaddr => IpAddress, topic => Topic}), emqx_metrics:inc(?METRIC_ALLOW), {stop, allow}; - {matched, deny} -> + {{matched, deny}, AuthzSource}-> + emqx:run_hook('client.check_authz_complete', + [Client, PubSub, Topic, deny, AuthzSource]), ?SLOG(info, #{msg => "authorization_permission_denied", username => Username, ipaddr => IpAddress, @@ -298,6 +302,8 @@ authorize(#{username := Username, emqx_metrics:inc(?METRIC_DENY), {stop, deny}; nomatch -> + emqx:run_hook('client.check_authz_complete', + [Client, PubSub, Topic, DefaultResult, default]), ?SLOG(info, #{msg => "authorization_failed_nomatch", username => Username, ipaddr => IpAddress, @@ -316,7 +322,7 @@ do_authorize(Client, PubSub, Topic, Module = authz_module(Type), case Module:authorize(Client, PubSub, Topic, Connector) of nomatch -> do_authorize(Client, PubSub, Topic, Tail); - Matched -> Matched + Matched -> {Matched, Type} end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl index d676767f2..32c53d5cd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl @@ -73,6 +73,8 @@ fields("rule_test") -> , ref("ctx_dropped") , ref("ctx_connected") , ref("ctx_disconnected") + , ref("ctx_connack") + , ref("ctx_check_authz_complete") , ref("ctx_bridge_mqtt") ]), #{desc => "The context of the event for testing", @@ -208,6 +210,31 @@ fields("ctx_disconnected") -> desc => "The Time that this Client is Disconnected"})} ]; +fields("ctx_connack") -> + [ {"event_type", sc(client_connack, #{desc => "Event Type", required => true})} + , {"reason_code", sc(binary(), #{desc => "The reason code"})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"clean_start", sc(boolean(), #{desc => "Clean Start", default => true})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"peername", sc(binary(), #{desc => "The IP Address and Port of the Peer Client"})} + , {"sockname", sc(binary(), #{desc => "The IP Address and Port of the Local Listener"})} + , {"proto_name", sc(binary(), #{desc => "Protocol Name"})} + , {"proto_ver", sc(binary(), #{desc => "Protocol Version"})} + , {"keepalive", sc(integer(), #{desc => "KeepAlive"})} + , {"expiry_interval", sc(integer(), #{desc => "Expiry Interval"})} + , {"connected_at", sc(integer(), #{ + desc => "The Time that this Client is Connected"})} + ]; +fields("ctx_check_authz_complete") -> + [ {"event_type", sc(client_check_authz_complete, #{desc => "Event Type", required => true})} + , {"clientid", sc(binary(), #{desc => "The Client ID"})} + , {"username", sc(binary(), #{desc => "The User Name"})} + , {"peerhost", sc(binary(), #{desc => "The IP Address of the Peer Client"})} + , {"topic", sc(binary(), #{desc => "Message Topic"})} + , {"action", sc(binary(), #{desc => "Publish or Subscribe"})} + , {"authz_source", sc(binary(), #{desc => "Cache, Plugs or Default"})} + , {"result", sc(binary(), #{desc => "Allow or Deny"})} + ]; fields("ctx_bridge_mqtt") -> [ {"event_type", sc('$bridges/mqtt:*', #{desc => "Event Type", required => true})} , {"id", sc(binary(), #{desc => "Message ID"})} diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 89731e0bd..b0c27cf6a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -33,6 +33,8 @@ -export([ on_client_connected/3 , on_client_disconnected/4 + , on_client_connack/4 + , on_client_check_authz_complete/6 , on_session_subscribed/4 , on_session_unsubscribed/4 , on_message_publish/2 @@ -60,6 +62,8 @@ event_names() -> [ 'client.connected' , 'client.disconnected' + , 'client.connack' + , 'client.check_authz_complete' , 'session.subscribed' , 'session.unsubscribed' , 'message.publish' @@ -108,6 +112,18 @@ on_client_connected(ClientInfo, ConnInfo, Env) -> apply_event('client.connected', fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env). +on_client_connack(ConnInfo, Reason, _, Env) -> + apply_event('client.connack', + fun() -> eventmsg_connack(ConnInfo, Reason) end, Env). + +on_client_check_authz_complete(ClientInfo, PubSub, Topic, Result, AuthzSource, Env) -> + apply_event('client.check_authz_complete', + fun() -> eventmsg_check_authz_complete(ClientInfo, + PubSub, + Topic, + Result, + AuthzSource) end, Env). + on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> apply_event('client.disconnected', fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env). @@ -235,6 +251,49 @@ eventmsg_disconnected(_ClientInfo = #{ disconnected_at => DisconnectedAt }). +eventmsg_connack(_ConnInfo = #{ + clientid := ClientId, + clean_start := CleanStart, + username := Username, + peername := PeerName, + sockname := SockName, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + connected_at := ConnectedAt, + conn_props := ConnProps, + expiry_interval := ExpiryInterval + }, Reason) -> + with_basic_columns('client.connack', + #{reason_code => reason(Reason), + clientid => ClientId, + clean_start => CleanStart, + username => Username, + peername => ntoa(PeerName), + sockname => ntoa(SockName), + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + expiry_interval => ExpiryInterval, + connected_at => ConnectedAt, + conn_props => printable_maps(ConnProps) + }). + +eventmsg_check_authz_complete(_ClientInfo = #{ + clientid := ClientId, + username := Username, + peerhost := PeerHost + }, PubSub, Topic, Result, AuthzSource) -> + with_basic_columns('client.check_authz_complete', + #{clientid => ClientId, + username => Username, + peerhost => ntoa(PeerHost), + topic => Topic, + action => PubSub, + authz_source => AuthzSource, + result => Result + }). + eventmsg_sub_or_unsub(Event, _ClientInfo = #{ clientid := ClientId, username := Username, @@ -378,6 +437,8 @@ event_info() -> , event_info_message_dropped() , event_info_client_connected() , event_info_client_disconnected() + , event_info_client_connack() + , event_info_client_check_authz_complete() , event_info_session_subscribed() , event_info_session_unsubscribed() , event_info_delivery_dropped() @@ -435,6 +496,20 @@ event_info_client_disconnected() -> {<<"client disconnected">>, <<"连接断开"/utf8>>}, <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">> ). +event_info_client_connack() -> + event_info_common( + 'client.connack', + {<<"client connack">>, <<"连接确认"/utf8>>}, + {<<"client connack">>, <<"连接确认"/utf8>>}, + <<"SELECT * FROM \"$events/client_connack\"">> + ). +event_info_client_check_authz_complete() -> + event_info_common( + 'client.check_authz_complete', + {<<"client check authz complete">>, <<"鉴权结果"/utf8>>}, + {<<"client check authz complete">>, <<"鉴权结果"/utf8>>}, + <<"SELECT * FROM \"$events/client_check_authz_complete\"">> + ). event_info_session_subscribed() -> event_info_common( 'session.subscribed', @@ -500,6 +575,18 @@ test_columns('client.disconnected') -> , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} , {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]} ]; +test_columns('client.connack') -> + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"reason_code">>, [<<"sucess">>, <<"the reason code">>]} + ]; +test_columns('client.check_authz_complete') -> + [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]} + , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]} + , {<<"topic">>, [<<"t/1">>, <<"the topic of the MQTT message">>]} + , {<<"action">>, [<<"publish">>, <<"the action of publish or subscribe">>]} + , {<<"result">>, [<<"allow">>,<<"the authz check complete result">>]} + ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> @@ -600,6 +687,35 @@ columns_with_exam('client.disconnected') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('client.connack') -> + [ {<<"event">>, 'client.connected'} + , {<<"reason_code">>, success} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peername">>, <<"192.168.0.10:56431">>} + , {<<"sockname">>, <<"0.0.0.0:1883">>} + , {<<"proto_name">>, <<"MQTT">>} + , {<<"proto_ver">>, 5} + , {<<"keepalive">>, 60} + , {<<"clean_start">>, true} + , {<<"expiry_interval">>, 3600} + , {<<"connected_at">>, erlang:system_time(millisecond)} + , columns_example_props(conn_props) + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; +columns_with_exam('client.check_authz_complete') -> + [ {<<"event">>, 'client.check_authz_complete'} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"action">>, <<"publish">>} + , {<<"authz_source">>, <<"cache">>} + , {<<"result">>, <<"allow">>} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('session.subscribed') -> [ columns_example_props(sub_props) ] ++ columns_message_sub_unsub('session.subscribed'); @@ -710,6 +826,9 @@ ntoa(IpAddr) -> event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; +event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack'; +event_name(<<"$events/client_check_authz_complete", _/binary>>) -> + 'client.check_authz_complete'; event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed'; @@ -722,6 +841,9 @@ event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; +event_topic('client.connack') -> <<"$events/client_connack">>; +event_topic('client.check_authz_complete') -> + <<"$events/client_check_authz_complete">>; event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 8f0bc0beb..7700e305d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -105,13 +105,24 @@ groups() -> init_per_suite(Config) -> application:load(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]), + ok = emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_rule_engine, emqx_authz], + fun set_special_configs/1), Config. end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), ok. +set_special_configs(emqx_authz) -> + {ok, _} = emqx:update_config( + [authorization], + #{<<"no_match">> => atom_to_binary(allow), + <<"cache">> => #{<<"enable">> => atom_to_binary(true)}, + <<"sources">> => []}), + ok; +set_special_configs(_) -> + ok. on_resource_create(_id, _) -> #{}. on_resource_destroy(_id, _) -> ok. on_get_resource_status(_id, _) -> #{}. @@ -139,6 +150,8 @@ init_per_testcase(t_events, Config) -> init_events_counters(), SQL = "SELECT * FROM \"$events/client_connected\", " "\"$events/client_disconnected\", " + "\"$events/client_connack\", " + "\"$events/client_check_authz_complete\", " "\"$events/session_subscribed\", " "\"$events/session_unsubscribed\", " "\"$events/message_acked\", " @@ -321,7 +334,7 @@ t_events(_Config) -> , {proto_ver, v5} , {properties, #{'Session-Expiry-Interval' => 60}} ]), - ct:pal("====== verify $events/client_connected"), + ct:pal("====== verify $events/client_connected, $events/client_connack"), client_connected(Client, Client2), ct:pal("====== verify $events/message_dropped"), message_dropped(Client), @@ -349,6 +362,7 @@ message_publish(Client) -> client_connected(Client, Client2) -> {ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client2), + verify_event('client.connack'), verify_event('client.connected'), ok. client_disconnected(Client, Client2) -> @@ -359,6 +373,7 @@ client_disconnected(Client, Client2) -> session_subscribed(Client2) -> {ok, _, _} = emqtt:subscribe(Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>, 1), verify_event('session.subscribed'), + verify_event('client.check_authz_complete'), ok. session_unsubscribed(Client2) -> {ok, _, _} = emqtt:unsubscribe(Client2, #{'User-Property' => {<<"topic_name">>, <<"t1">>}}, <<"t1">>), @@ -1638,7 +1653,55 @@ verify_event_fields('message.acked', Fields) -> ?assert(is_map(PubAckProps)), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), - ?assert(EventAt =< Timestamp). + ?assert(EventAt =< Timestamp); + +verify_event_fields('client.connack', Fields) -> + #{clientid := ClientId, + clean_start := CleanStart, + username := Username, + peername := PeerName, + sockname := SockName, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + expiry_interval := ExpiryInterval, + conn_props := Properties, + timestamp := Timestamp, + connected_at := EventAt + } = Fields, + Now = erlang:system_time(millisecond), + TimestampElapse = Now - Timestamp, + RcvdAtElapse = Now - EventAt, + ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + verify_peername(PeerName), + verify_peername(SockName), + ?assertEqual(<<"MQTT">>, ProtoName), + ?assertEqual(5, ProtoVer), + ?assert(is_integer(Keepalive)), + ?assert(is_boolean(CleanStart)), + ?assertEqual(60000, ExpiryInterval), + ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties), + ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), + ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), + ?assert(EventAt =< Timestamp); + +verify_event_fields('client.check_authz_complete', Fields) -> + #{clientid := ClientId, + action := Action, + result := Result, + topic := Topic, + authz_source := AuthzSource, + username := Username + } = Fields, + ?assertEqual(<<"t1">>, Topic), + ?assert(lists:member(Action, [subscribe, publish])), + ?assert(lists:member(Result, [allow, deny])), + ?assert(lists:member(AuthzSource, [cache, default, file, + http, mongodb, mysql, redis, + postgresql, built_in_database])), + ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])). verify_peername(PeerName) -> case string:split(PeerName, ":") of