From 3410e20fbe09081e4010cc5b68bd5da9c100f7b4 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Thu, 24 Mar 2022 14:49:48 +0800 Subject: [PATCH 1/7] feat(emqx_rule_engine_events): add client_connack event --- .../emqx_rule_engine/src/emqx_rule_events.erl | 66 +++++++++++++++++++ .../test/emqx_rule_engine_SUITE.erl | 26 ++++++++ 2 files changed, 92 insertions(+) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 540b1cbbd..0ef00ea85 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -31,6 +31,7 @@ -export([ on_client_connected/3 , on_client_disconnected/4 + , on_client_connack/4 , on_session_subscribed/4 , on_session_unsubscribed/4 , on_message_publish/2 @@ -48,6 +49,7 @@ -define(SUPPORTED_HOOK, [ 'client.connected' , 'client.disconnected' + , 'client.connack' , 'session.subscribed' , 'session.unsubscribed' , 'message.publish' @@ -106,6 +108,10 @@ on_client_disconnected(ClientInfo, Reason, ConnInfo, Env) -> may_publish_and_apply('client.disconnected', fun() -> eventmsg_disconnected(ClientInfo, ConnInfo, Reason) end, Env). +on_client_connack(ConnInfo, Reason, _, Env) -> + may_publish_and_apply('client.connack', + fun() -> eventmsg_connack(ConnInfo, Reason) end, Env). + on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> may_publish_and_apply('session.subscribed', fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env). @@ -220,6 +226,34 @@ eventmsg_disconnected(_ClientInfo = #{ disconnected_at => DisconnectedAt }). +eventmsg_connack(_ConnInfo = #{ + clientid := ClientId, + clean_start := CleanStart, + username := Username, + peername := PeerName, + sockname := SockName, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + connected_at := ConnectedAt, + conn_props := ConnProps, + expiry_interval := ExpiryInterval + }, Reason) -> + with_basic_columns('client.connack', + #{reason_code => reason(Reason), + clientid => ClientId, + clean_start => CleanStart, + username => Username, + peername => ntoa(PeerName), + sockname => ntoa(SockName), + proto_name => ProtoName, + proto_ver => ProtoVer, + keepalive => Keepalive, + expiry_interval => ExpiryInterval, + connected_at => ConnectedAt, + conn_props => printable_maps(ConnProps) + }). + eventmsg_sub_or_unsub(Event, _ClientInfo = #{ clientid := ClientId, username := Username, @@ -372,6 +406,7 @@ event_info() -> , event_info_delivery_dropped() , event_info_client_connected() , event_info_client_disconnected() + , event_info_client_connack() , event_info_session_subscribed() , event_info_session_unsubscribed() ]. @@ -427,6 +462,13 @@ event_info_client_disconnected() -> {<<"client disconnected">>, <<"连接断开"/utf8>>}, <<"SELECT * FROM \"$events/client_disconnected\" WHERE topic =~ 't/#'">> ). +event_info_client_connack() -> + event_info_common( + 'client.connack', + {<<"client connack">>, <<"连接确认"/utf8>>}, + {<<"client connack">>, <<"连接确认"/utf8>>}, + <<"SELECT * FROM \"$events/client_connack\"">> + ). event_info_session_subscribed() -> event_info_common( 'session.subscribed', @@ -485,6 +527,11 @@ test_columns('client.disconnected') -> , {<<"username">>, <<"u_emqx">>} , {<<"reason">>, <<"normal">>} ]; +test_columns('client.connack') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"reason_code">>, <<"sucess">>} + ]; test_columns('session.unsubscribed') -> test_columns('session.subscribed'); test_columns('session.subscribed') -> @@ -607,6 +654,23 @@ columns_with_exam('client.disconnected') -> , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} ]; +columns_with_exam('client.connack') -> + [ {<<"event">>, 'client.connected'} + , {<<"reason_code">>, success} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peername">>, <<"192.168.0.10:56431">>} + , {<<"sockname">>, <<"0.0.0.0:1883">>} + , {<<"proto_name">>, <<"MQTT">>} + , {<<"proto_ver">>, 5} + , {<<"keepalive">>, 60} + , {<<"clean_start">>, true} + , {<<"expiry_interval">>, 3600} + , {<<"connected_at">>, erlang:system_time(millisecond)} + , columns_example_props(conn_props) + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} + ]; columns_with_exam('session.subscribed') -> [ {<<"event">>, 'session.subscribed'} , {<<"clientid">>, <<"c_emqx">>} @@ -694,6 +758,7 @@ ntoa(IpAddr) -> event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected'; event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected'; +event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack'; event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed'; event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed'; @@ -705,6 +770,7 @@ event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; event_topic('client.disconnected') -> <<"$events/client_disconnected">>; +event_topic('client.connack') -> <<"$events/client_connack">>; event_topic('session.subscribed') -> <<"$events/session_subscribed">>; event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>; event_topic('message.delivered') -> <<"$events/message_delivered">>; diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 02e0f607c..ca4b8db0c 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -101,6 +101,7 @@ groups() -> t_sqlselect_2_2, t_sqlselect_2_3, t_sqlselect_3, + t_sqlselect_3_1, t_sqlparse_event_1, t_sqlparse_event_2, t_sqlparse_event_3, @@ -1442,6 +1443,31 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). +t_sqlselect_3_1(_Config) -> + ok = emqx_rule_engine:load_providers(), + %% republish the client.connected msg + TopicRule = create_simple_repub_rule( + <<"t2">>, + "SELECT * " + "FROM \"$events/client_connack\" " + "WHERE username = 'emqx1'", + <<"clientid=${clientid}">>), + {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + ct:sleep(200), + {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]), + {ok, _} = emqtt:connect(Client1), + receive {publish, #{topic := T, payload := Payload}} -> + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=c_emqx1">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + + emqtt:stop(Client), + emqx_rule_registry:remove_rule(TopicRule). + t_metrics(_Config) -> ok = emqx_rule_engine:load_providers(), TopicRule = create_simple_repub_rule( From 326b01968bfdf61c712322870b1a10b79426d1d4 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Thu, 24 Mar 2022 16:52:01 +0800 Subject: [PATCH 2/7] fix(appup): load_module emqx_rule_events --- apps/emqx_rule_engine/src/emqx_rule_engine.appup.src | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 0028cc988..d5b2c6319 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.1", - [{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -18,7 +19,8 @@ {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.1", - [{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, From e0d142c625497e0e782cca86253cb1461c513fff Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 25 Mar 2022 11:08:30 +0800 Subject: [PATCH 3/7] feat: add client.check_acl_complete event --- .../emqx_rule_engine/src/emqx_rule_events.erl | 52 +++++++++++- .../test/emqx_rule_engine_SUITE.erl | 79 ++++++++++++------- src/emqx_access_control.erl | 5 +- 3 files changed, 106 insertions(+), 30 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 0ef00ea85..1a03d01fc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -39,6 +39,7 @@ , on_message_delivered/3 , on_message_acked/3 , on_delivery_dropped/4 + , on_client_check_acl_complete/5 ]). -export([ event_info/0 @@ -57,6 +58,7 @@ , 'message.acked' , 'message.dropped' , 'delivery.dropped' + , 'client.check_acl_complete' ]). -ifdef(TEST). @@ -112,6 +114,13 @@ on_client_connack(ConnInfo, Reason, _, Env) -> may_publish_and_apply('client.connack', fun() -> eventmsg_connack(ConnInfo, Reason) end, Env). +on_client_check_acl_complete(ClientInfo, PubSub, Topic, Result, Env) -> + may_publish_and_apply('client.check_acl_complete', + fun() -> eventmsg_check_acl_complete(ClientInfo, + PubSub, + Topic, + Result) end, Env). + on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> may_publish_and_apply('session.subscribed', fun() -> eventmsg_sub_or_unsub('session.subscribed', ClientInfo, Topic, SubOpts) end, Env). @@ -253,6 +262,19 @@ eventmsg_connack(_ConnInfo = #{ connected_at => ConnectedAt, conn_props => printable_maps(ConnProps) }). +eventmsg_check_acl_complete(_ClientInfo = #{ + clientid := ClientId, + username := Username, + peerhost := PeerHost + }, PubSub, Topic, Result) -> + with_basic_columns('client.check_acl_complete', + #{clientid => ClientId, + username => Username, + peerhost => ntoa(PeerHost), + topic => Topic, + action => PubSub, + result => Result + }). eventmsg_sub_or_unsub(Event, _ClientInfo = #{ clientid := ClientId, @@ -409,6 +431,7 @@ event_info() -> , event_info_client_connack() , event_info_session_subscribed() , event_info_session_unsubscribed() + , event_info_client_check_acl_complete() ]. event_info_message_publish() -> @@ -483,6 +506,13 @@ event_info_session_unsubscribed() -> {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>}, <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">> ). +event_info_client_check_acl_complete() -> + event_info_common( + 'client.check_acl_complete', + {<<"client check acl complete">>, <<"鉴权结果"/utf8>>}, + {<<"client check acl complete">>, <<"鉴权结果"/utf8>>}, + <<"SELECT * FROM \"$events/client_check_acl_complete\"">> + ). event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) -> #{event => event_topic(Event), @@ -539,6 +569,13 @@ test_columns('session.subscribed') -> , {<<"username">>, <<"u_emqx">>} , {<<"topic">>, <<"t/a">>} , {<<"qos">>, 1} + ]; +test_columns('client.check_acl_complete') -> + [ {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"topic">>, <<"t/1">>} + , {<<"action">>, <<"publish">>} + , {<<"result">>, <<"allow">>} ]. columns_with_exam('message.publish') -> @@ -692,6 +729,17 @@ columns_with_exam('session.unsubscribed') -> , columns_example_props(unsub_props) , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} + ]; +columns_with_exam('client.check_acl_complete') -> + [ {<<"event">>, 'client.check_acl_complete'} + , {<<"clientid">>, <<"c_emqx">>} + , {<<"username">>, <<"u_emqx">>} + , {<<"peerhost">>, <<"192.168.0.10">>} + , {<<"topic">>, <<"t/a">>} + , {<<"action">>, <<"publish">>} + , {<<"result">>, <<"allow">>} + , {<<"timestamp">>, erlang:system_time(millisecond)} + , {<<"node">>, node()} ]. columns_example_props(PropType) -> @@ -766,6 +814,7 @@ event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered'; event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked'; event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped'; event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped'; +event_name(<<"$events/client_check_acl_complete", _/binary>>) -> 'client.check_acl_complete'; event_name(_) -> 'message.publish'. event_topic('client.connected') -> <<"$events/client_connected">>; @@ -777,7 +826,8 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>; event_topic('message.acked') -> <<"$events/message_acked">>; event_topic('message.dropped') -> <<"$events/message_dropped">>; event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>; -event_topic('message.publish') -> <<"$events/message_publish">>. +event_topic('message.publish') -> <<"$events/message_publish">>; +event_topic('client.check_acl_complete') -> <<"$events/client_check_acl_complete">>. printable_maps(undefined) -> #{}; printable_maps(Headers) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index ca4b8db0c..136db6695 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -101,7 +101,6 @@ groups() -> t_sqlselect_2_2, t_sqlselect_2_3, t_sqlselect_3, - t_sqlselect_3_1, t_sqlparse_event_1, t_sqlparse_event_2, t_sqlparse_event_3, @@ -198,6 +197,8 @@ init_per_testcase(t_events, Config) -> description = #{en => <<"Hook metrics action">>}}), SQL = "SELECT * FROM \"$events/client_connected\", " "\"$events/client_disconnected\", " + "\"$events/client_connack\", " + "\"$events/client_check_acl_complete\", " "\"$events/session_subscribed\", " "\"$events/session_unsubscribed\", " "\"$events/message_acked\", " @@ -1014,7 +1015,7 @@ t_events(_Config) -> , {proto_ver, v5} , {properties, #{'Session-Expiry-Interval' => 60}} ]), - ct:pal("====== verify $events/client_connected"), + ct:pal("====== verify $events/client_connected, $events/client_connack"), client_connected(Client, Client2), ct:pal("====== verify $events/session_subscribed"), session_subscribed(Client2), @@ -1040,8 +1041,10 @@ message_publish(Client) -> client_connected(Client, Client2) -> {ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client2), + verify_event('client.connack'), verify_event('client.connected'), ok. + client_disconnected(Client, Client2) -> ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), ok = emqtt:disconnect(Client2, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), @@ -1054,6 +1057,7 @@ session_subscribed(Client2) -> , 1 ), verify_event('session.subscribed'), + verify_event('client.check_acl_complete'), ok. session_unsubscribed(Client2) -> {ok, _, _} = emqtt:unsubscribe( Client2 @@ -1443,31 +1447,6 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), emqx_rule_registry:remove_rule(TopicRule). -t_sqlselect_3_1(_Config) -> - ok = emqx_rule_engine:load_providers(), - %% republish the client.connected msg - TopicRule = create_simple_repub_rule( - <<"t2">>, - "SELECT * " - "FROM \"$events/client_connack\" " - "WHERE username = 'emqx1'", - <<"clientid=${clientid}">>), - {ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]), - {ok, _} = emqtt:connect(Client), - {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - ct:sleep(200), - {ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]), - {ok, _} = emqtt:connect(Client1), - receive {publish, #{topic := T, payload := Payload}} -> - ?assertEqual(<<"t2">>, T), - ?assertEqual(<<"clientid=c_emqx1">>, Payload) - after 1000 -> - ct:fail(wait_for_t2) - end, - - emqtt:stop(Client), - emqx_rule_registry:remove_rule(TopicRule). - t_metrics(_Config) -> ok = emqx_rule_engine:load_providers(), TopicRule = create_simple_repub_rule( @@ -2670,6 +2649,37 @@ verify_event_fields('client.disconnected', Fields) -> ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), ?assert(EventAt =< Timestamp); +verify_event_fields('client.connack', Fields) -> + #{clientid := ClientId, + clean_start := CleanStart, + username := Username, + peername := PeerName, + sockname := SockName, + proto_name := ProtoName, + proto_ver := ProtoVer, + keepalive := Keepalive, + expiry_interval := ExpiryInterval, + conn_props := Properties, + timestamp := Timestamp, + connected_at := EventAt + } = Fields, + Now = erlang:system_time(millisecond), + TimestampElapse = Now - Timestamp, + RcvdAtElapse = Now - EventAt, + ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + verify_peername(PeerName), + verify_peername(SockName), + ?assertEqual(<<"MQTT">>, ProtoName), + ?assertEqual(5, ProtoVer), + ?assert(is_integer(Keepalive)), + ?assert(is_boolean(CleanStart)), + ?assertEqual(60, ExpiryInterval), + ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties), + ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), + ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), + ?assert(EventAt =< Timestamp); + verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed' ; SubUnsub == 'session.unsubscribed' -> #{clientid := ClientId, @@ -2793,7 +2803,20 @@ verify_event_fields('message.acked', Fields) -> ?assert(is_map(PubAckProps)), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60*1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60*1000), - ?assert(EventAt =< Timestamp). + ?assert(EventAt =< Timestamp); + +verify_event_fields('client.check_acl_complete', Fields) -> + #{clientid := ClientId, + action := Action, + result := Result, + topic := Topic, + username := Username + } = Fields, + ?assertEqual(<<"t1">>, Topic), + ?assert(lists:member(Action, [subscribe, publish])), + ?assert(lists:member(Result, [allow, deny])), + ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])). verify_peername(PeerName) -> case string:split(PeerName, ":") of diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index fb0741c0c..8de296a6d 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -49,7 +49,10 @@ check_acl(ClientInfo, PubSub, Topic) -> true -> check_acl_cache(ClientInfo, PubSub, Topic); false -> do_check_acl(ClientInfo, PubSub, Topic) end, - inc_acl_metrics(Result), Result. + inc_acl_metrics(Result), + emqx:run_hook('client.check_acl_complete', [ClientInfo, PubSub, Topic, Result]), + %% io:format(standard_error, "~p, ~p, ~p, ~p, ~n", [ClientInfo, PubSub, Topic, Result]), + Result. check_acl_cache(ClientInfo, PubSub, Topic) -> case emqx_acl_cache:get_acl_cache(PubSub, Topic) of From 0ffb66ee7fa57f721f36a990febad32ac7eaec26 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 25 Mar 2022 11:12:55 +0800 Subject: [PATCH 4/7] fix(CHANGES): update CHANGES-4.4.md --- CHANGES-4.4.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES-4.4.md b/CHANGES-4.4.md index 59c62d9c5..6ac86a0f1 100644 --- a/CHANGES-4.4.md +++ b/CHANGES-4.4.md @@ -1,5 +1,8 @@ # EMQ X 4.4 Changes +### Enhancements +* Add rule events: client.connack, client.check_acl_complete + ## v4.4.2 **NOTE**: v4.4.2 is in sync with: v4.3.13 From 3eab6b436bdf44ee267e07d9b7384864620f4e54 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 25 Mar 2022 11:15:23 +0800 Subject: [PATCH 5/7] fix(appup): load_module emqx_access_control --- apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl | 3 +-- src/emqx.appup.src | 6 ++++-- src/emqx_access_control.erl | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 136db6695..bc4f968aa 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1017,7 +1017,7 @@ t_events(_Config) -> ]), ct:pal("====== verify $events/client_connected, $events/client_connack"), client_connected(Client, Client2), - ct:pal("====== verify $events/session_subscribed"), + ct:pal("====== verify $events/session_subscribed, $events/client_check_acl_complete"), session_subscribed(Client2), ct:pal("====== verify t1"), message_publish(Client), @@ -1044,7 +1044,6 @@ client_connected(Client, Client2) -> verify_event('client.connack'), verify_event('client.connected'), ok. - client_disconnected(Client, Client2) -> ok = emqtt:disconnect(Client, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), ok = emqtt:disconnect(Client2, 0, #{'User-Property' => {<<"reason">>, <<"normal">>}}), diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 383db43f1..372e26df7 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -2,7 +2,8 @@ %% Unless you know what you are doing, DO NOT edit manually!! {VSN, [{"4.4.1", - [{load_module,emqx_frame,brutal_purge,soft_purge,[]}, + [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, @@ -44,7 +45,8 @@ {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{"4.4.1", - [{load_module,emqx_frame,brutal_purge,soft_purge,[]}, + [{load_module,emqx_access_control,brutal_purge,soft_purge,[]}, + {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_session,brutal_purge,soft_purge,[]}, diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 8de296a6d..4598bea38 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -51,7 +51,6 @@ check_acl(ClientInfo, PubSub, Topic) -> end, inc_acl_metrics(Result), emqx:run_hook('client.check_acl_complete', [ClientInfo, PubSub, Topic, Result]), - %% io:format(standard_error, "~p, ~p, ~p, ~p, ~n", [ClientInfo, PubSub, Topic, Result]), Result. check_acl_cache(ClientInfo, PubSub, Topic) -> From 44f4dfa49854d51d9abd342b60dcd1f683e1b648 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 25 Mar 2022 13:40:29 +0800 Subject: [PATCH 6/7] fix(CHANGES): update CHANGES-4.4.md --- CHANGES-4.4.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES-4.4.md b/CHANGES-4.4.md index 6ac86a0f1..3b1c4c911 100644 --- a/CHANGES-4.4.md +++ b/CHANGES-4.4.md @@ -2,6 +2,8 @@ ### Enhancements * Add rule events: client.connack, client.check_acl_complete +- client.connack The rule event is triggered when the server sends a CONNACK packet to the client. reason_code contains the error reason code. +- client.check_acl_complete The rule event is triggered when the client check acl complete. ## v4.4.2 From 059fc6e3c7dd5bda9a518bbc802153cabc13d58b Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 25 Mar 2022 18:05:25 +0800 Subject: [PATCH 7/7] feat(emqx_rule_events): add field 'is_cache' --- apps/emqx_rule_engine/src/emqx_rule_events.erl | 11 +++++++---- .../emqx_rule_engine/test/emqx_rule_engine_SUITE.erl | 2 ++ src/emqx_access_control.erl | 12 +++++++----- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 1a03d01fc..1cef282c8 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -39,7 +39,7 @@ , on_message_delivered/3 , on_message_acked/3 , on_delivery_dropped/4 - , on_client_check_acl_complete/5 + , on_client_check_acl_complete/6 ]). -export([ event_info/0 @@ -114,12 +114,13 @@ on_client_connack(ConnInfo, Reason, _, Env) -> may_publish_and_apply('client.connack', fun() -> eventmsg_connack(ConnInfo, Reason) end, Env). -on_client_check_acl_complete(ClientInfo, PubSub, Topic, Result, Env) -> +on_client_check_acl_complete(ClientInfo, PubSub, Topic, Result, IsCache, Env) -> may_publish_and_apply('client.check_acl_complete', fun() -> eventmsg_check_acl_complete(ClientInfo, PubSub, Topic, - Result) end, Env). + Result, + IsCache) end, Env). on_session_subscribed(ClientInfo, Topic, SubOpts, Env) -> may_publish_and_apply('session.subscribed', @@ -266,13 +267,14 @@ eventmsg_check_acl_complete(_ClientInfo = #{ clientid := ClientId, username := Username, peerhost := PeerHost - }, PubSub, Topic, Result) -> + }, PubSub, Topic, Result, IsCache) -> with_basic_columns('client.check_acl_complete', #{clientid => ClientId, username => Username, peerhost => ntoa(PeerHost), topic => Topic, action => PubSub, + is_cache => IsCache, result => Result }). @@ -737,6 +739,7 @@ columns_with_exam('client.check_acl_complete') -> , {<<"peerhost">>, <<"192.168.0.10">>} , {<<"topic">>, <<"t/a">>} , {<<"action">>, <<"publish">>} + , {<<"is_cache">>, <<"false">>} , {<<"result">>, <<"allow">>} , {<<"timestamp">>, erlang:system_time(millisecond)} , {<<"node">>, node()} diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index bc4f968aa..2a0498d2f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -2809,11 +2809,13 @@ verify_event_fields('client.check_acl_complete', Fields) -> action := Action, result := Result, topic := Topic, + is_cache := IsCache, username := Username } = Fields, ?assertEqual(<<"t1">>, Topic), ?assert(lists:member(Action, [subscribe, publish])), ?assert(lists:member(Result, [allow, deny])), + ?assert(lists:member(IsCache, [true, false])), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])). diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 4598bea38..11eb5efb2 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -50,7 +50,6 @@ check_acl(ClientInfo, PubSub, Topic) -> false -> do_check_acl(ClientInfo, PubSub, Topic) end, inc_acl_metrics(Result), - emqx:run_hook('client.check_acl_complete', [ClientInfo, PubSub, Topic, Result]), Result. check_acl_cache(ClientInfo, PubSub, Topic) -> @@ -61,15 +60,18 @@ check_acl_cache(ClientInfo, PubSub, Topic) -> AclResult; AclResult -> inc_acl_metrics(cache_hit), + emqx:run_hook('client.check_acl_complete', [ClientInfo, PubSub, Topic, AclResult, true]), AclResult end. do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> Default = emqx_zone:get_env(Zone, acl_nomatch, deny), - case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of - allow -> allow; - _Other -> deny - end. + Result = case run_hooks('client.check_acl', [ClientInfo, PubSub, Topic], Default) of + allow -> allow; + _Other -> deny + end, + emqx:run_hook('client.check_acl_complete', [ClientInfo, PubSub, Topic, Result, false]), + Result. default_auth_result(Zone) -> case emqx_zone:get_env(Zone, allow_anonymous, false) of