From 9abdff60a142ff252d7cba186076872543141703 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 5 Aug 2024 17:47:52 +0800 Subject: [PATCH 1/2] feat(ruleengine): expose client_attrs to rule-engine --- .../emqx_rule_engine/src/emqx_rule_events.erl | 46 ++++--- .../test/emqx_rule_engine_SUITE.erl | 122 ++++++++++++++---- 2 files changed, 128 insertions(+), 40 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index a51f2bed5..edf70220e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -334,13 +334,14 @@ eventmsg_publish( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). eventmsg_connected( - _ClientInfo = #{ + ClientInfo = #{ clientid := ClientId, username := Username, is_bridge := IsBridge, @@ -375,13 +376,14 @@ eventmsg_connected( expiry_interval => ExpiryInterval div 1000, is_bridge => IsBridge, conn_props => printable_maps(ConnProps), - connected_at => ConnectedAt + connected_at => ConnectedAt, + client_attrs => maps:get(client_attrs, ClientInfo, #{}) }, #{} ). eventmsg_disconnected( - _ClientInfo = #{ + ClientInfo = #{ clientid := ClientId, username := Username }, @@ -405,7 +407,8 @@ eventmsg_disconnected( proto_name => ProtoName, proto_ver => ProtoVer, disconn_props => printable_maps(maps:get(disconn_props, ConnInfo, #{})), - disconnected_at => DisconnectedAt + disconnected_at => DisconnectedAt, + client_attrs => maps:get(client_attrs, ClientInfo, #{}) }, #{} ). @@ -444,7 +447,7 @@ eventmsg_connack( ). eventmsg_check_authz_complete( - _ClientInfo = #{ + ClientInfo = #{ clientid := ClientId, username := Username, peerhost := PeerHost, @@ -465,13 +468,14 @@ eventmsg_check_authz_complete( topic => Topic, action => PubSub, authz_source => AuthzSource, - result => Result + result => Result, + client_attrs => maps:get(client_attrs, ClientInfo, #{}) }, #{} ). eventmsg_check_authn_complete( - _ClientInfo = #{ + ClientInfo = #{ clientid := ClientId, username := Username, peername := PeerName @@ -493,14 +497,15 @@ eventmsg_check_authn_complete( peername => ntoa(PeerName), reason_code => force_to_bin(Reason), is_anonymous => IsAnonymous, - is_superuser => IsSuperuser + is_superuser => IsSuperuser, + client_attrs => maps:get(client_attrs, ClientInfo, #{}) }, #{} ). eventmsg_sub_or_unsub( Event, - _ClientInfo = #{ + ClientInfo = #{ clientid := ClientId, username := Username, peerhost := PeerHost, @@ -519,7 +524,8 @@ eventmsg_sub_or_unsub( peername => ntoa(PeerName), PropKey => printable_maps(maps:get(PropKey, SubOpts, #{})), topic => Topic, - qos => QoS + qos => QoS, + client_attrs => maps:get(client_attrs, ClientInfo, #{}) }, #{} ). @@ -551,7 +557,8 @@ eventmsg_dropped( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). @@ -583,7 +590,8 @@ eventmsg_transformation_failed( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). @@ -616,7 +624,8 @@ eventmsg_validation_failed( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). @@ -654,7 +663,8 @@ eventmsg_delivered( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). @@ -693,7 +703,8 @@ eventmsg_acked( flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), puback_props => printable_maps(emqx_message:get_header(puback_props, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). @@ -733,7 +744,8 @@ eventmsg_delivery_dropped( qos => QoS, flags => Flags, pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})), - publish_received_at => Timestamp + publish_received_at => Timestamp, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}) }, #{headers => Headers} ). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 230c6ec8a..6dcc0f59e 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -112,7 +112,8 @@ groups() -> t_sqlparse_undefined_variable, t_sqlparse_new_map, t_sqlparse_invalid_json, - t_sqlselect_as_put + t_sqlselect_as_put, + t_sqlselect_client_attr ]}, {events, [], [ t_events, @@ -3891,6 +3892,57 @@ t_trace_rule_id(_Config) -> ?assertEqual([], emqx_trace_handler:running()), emqtt:disconnect(T). +t_sqlselect_client_attr(_) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + {ok, Compiled} = emqx_variform:compile("user_property.group"), + emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [ + #{ + expression => Compiled, + set_as_attr => <<"group">> + }, + #{ + expression => Compiled, + set_as_attr => <<"group2">> + } + ]), + + SQL = + "SELECT client_attrs as payload FROM \"t/1\" ", + Repub = republish_action(<<"t/2">>), + {ok, _TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [Repub] + } + ), + + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'User-Property' => [{<<"group">>, <<"g1">>}]}} + ]), + {ok, _} = emqtt:connect(Client), + + {ok, _, _} = emqtt:subscribe(Client, <<"t/2">>, 0), + ct:sleep(100), + emqtt:publish(Client, <<"t/1">>, <<"Hello">>), + + receive + {publish, #{topic := Topic, payload := Payload}} -> + ?assertEqual(<<"t/2">>, Topic), + ?assertMatch( + #{<<"group">> := <<"g1">>, <<"group2">> := <<"g1">>}, + emqx_utils_json:decode(Payload) + ) + after 1000 -> + ct:fail(wait_for_t_2) + end, + + emqtt:disconnect(Client), + emqx_rule_engine:delete_rule(?TMP_RULEID), + emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ @@ -3990,7 +4042,8 @@ verify_event_fields('message.publish', Fields) -> flags := Flags, pub_props := Properties, timestamp := Timestamp, - publish_received_at := EventAt + publish_received_at := EventAt, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4007,7 +4060,8 @@ verify_event_fields('message.publish', Fields) -> ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields('client.connected', Fields) -> #{ clientid := ClientId, @@ -4023,7 +4077,8 @@ verify_event_fields('client.connected', Fields) -> is_bridge := IsBridge, conn_props := Properties, timestamp := Timestamp, - connected_at := EventAt + connected_at := EventAt, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4042,7 +4097,8 @@ verify_event_fields('client.connected', Fields) -> ?assertMatch(#{'Session-Expiry-Interval' := 60}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields('client.disconnected', Fields) -> #{ reason := Reason, @@ -4052,7 +4108,8 @@ verify_event_fields('client.disconnected', Fields) -> sockname := SockName, disconn_props := Properties, timestamp := Timestamp, - disconnected_at := EventAt + disconnected_at := EventAt, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4065,7 +4122,8 @@ verify_event_fields('client.disconnected', Fields) -> ?assertMatch(#{'User-Property' := #{<<"reason">> := <<"normal">>}}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields(SubUnsub, Fields) when SubUnsub == 'session.subscribed'; SubUnsub == 'session.unsubscribed' @@ -4077,7 +4135,8 @@ verify_event_fields(SubUnsub, Fields) when peername := PeerName, topic := Topic, qos := QoS, - timestamp := Timestamp + timestamp := Timestamp, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4097,7 +4156,8 @@ verify_event_fields(SubUnsub, Fields) when #{'User-Property' := #{<<"topic_name">> := <<"t1">>}}, maps:get(PropKey, Fields) ), - ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000); + ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), + ?assert(is_map(ClientAttrs)); verify_event_fields('delivery.dropped', Fields) -> #{ event := 'delivery.dropped', @@ -4117,7 +4177,8 @@ verify_event_fields('delivery.dropped', Fields) -> qos := QoS, flags := Flags, timestamp := Timestamp, - topic := Topic + topic := Topic, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4139,7 +4200,8 @@ verify_event_fields('delivery.dropped', Fields) -> ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields('message.dropped', Fields) -> #{ id := ID, @@ -4154,7 +4216,8 @@ verify_event_fields('message.dropped', Fields) -> flags := Flags, pub_props := Properties, timestamp := Timestamp, - publish_received_at := EventAt + publish_received_at := EventAt, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4172,7 +4235,8 @@ verify_event_fields('message.dropped', Fields) -> ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields('message.delivered', Fields) -> #{ id := ID, @@ -4188,7 +4252,8 @@ verify_event_fields('message.delivered', Fields) -> flags := Flags, pub_props := Properties, timestamp := Timestamp, - publish_received_at := EventAt + publish_received_at := EventAt, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4207,7 +4272,8 @@ verify_event_fields('message.delivered', Fields) -> ?assertMatch(#{'Message-Expiry-Interval' := 60}, Properties), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields('message.acked', Fields) -> #{ id := ID, @@ -4224,7 +4290,8 @@ verify_event_fields('message.acked', Fields) -> pub_props := PubProps, puback_props := PubAckProps, timestamp := Timestamp, - publish_received_at := EventAt + publish_received_at := EventAt, + client_attrs := ClientAttrs } = Fields, Now = erlang:system_time(millisecond), TimestampElapse = Now - Timestamp, @@ -4244,7 +4311,8 @@ verify_event_fields('message.acked', Fields) -> ?assert(is_map(PubAckProps)), ?assert(0 =< TimestampElapse andalso TimestampElapse =< 60 * 1000), ?assert(0 =< RcvdAtElapse andalso RcvdAtElapse =< 60 * 1000), - ?assert(EventAt =< Timestamp); + ?assert(EventAt =< Timestamp), + ?assert(is_map(ClientAttrs)); verify_event_fields('client.connack', Fields) -> #{ clientid := ClientId, @@ -4282,7 +4350,8 @@ verify_event_fields('client.check_authz_complete', Fields) -> peername := PeerName, topic := Topic, authz_source := AuthzSource, - username := Username + username := Username, + client_attrs := ClientAttrs } = Fields, ?assertEqual(<<"t1">>, Topic), ?assert(lists:member(Action, [subscribe, publish])), @@ -4302,20 +4371,23 @@ verify_event_fields('client.check_authz_complete', Fields) -> ]) ), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), - ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])); + ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + ?assert(is_map(ClientAttrs)); verify_event_fields('client.check_authn_complete', Fields) -> #{ clientid := ClientId, peername := PeerName, username := Username, is_anonymous := IsAnonymous, - is_superuser := IsSuperuser + is_superuser := IsSuperuser, + client_attrs := ClientAttrs } = Fields, verify_peername(PeerName), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), ?assert(erlang:is_boolean(IsAnonymous)), - ?assert(erlang:is_boolean(IsSuperuser)); + ?assert(erlang:is_boolean(IsSuperuser)), + ?assert(is_map(ClientAttrs)); verify_event_fields('schema.validation_failed', Fields) -> #{ validation := ValidationName, @@ -4327,12 +4399,14 @@ verify_event_fields('schema.validation_failed', Fields) -> topic := _Topic, flags := _Flags, pub_props := _PubProps, - publish_received_at := _PublishReceivedAt + publish_received_at := _PublishReceivedAt, + client_attrs := ClientAttrs } = Fields, ?assertEqual(<<"v1">>, ValidationName), verify_peername(PeerName), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + ?assert(is_map(ClientAttrs)), ok; verify_event_fields('message.transformation_failed', Fields) -> #{ @@ -4345,12 +4419,14 @@ verify_event_fields('message.transformation_failed', Fields) -> topic := _Topic, flags := _Flags, pub_props := _PubProps, - publish_received_at := _PublishReceivedAt + publish_received_at := _PublishReceivedAt, + client_attrs := ClientAttrs } = Fields, ?assertEqual(<<"t1">>, TransformationName), verify_peername(PeerName), ?assert(lists:member(ClientId, [<<"c_event">>, <<"c_event2">>])), ?assert(lists:member(Username, [<<"u_event">>, <<"u_event2">>])), + ?assert(is_map(ClientAttrs)), ok. verify_peername(PeerName) -> From c35661f4844ddff881d86547ae58c529eafee9c7 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 5 Aug 2024 22:43:08 +0800 Subject: [PATCH 2/2] chore: update changes --- changes/ce/feat-13573.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/feat-13573.en.md diff --git a/changes/ce/feat-13573.en.md b/changes/ce/feat-13573.en.md new file mode 100644 index 000000000..270b31cad --- /dev/null +++ b/changes/ce/feat-13573.en.md @@ -0,0 +1 @@ +Expose `client_attrs` to rule engine and rule events.