From 6e0ef893f4ec2ec45b6405893db0a7aebb5cafa0 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 10 Jun 2024 14:38:16 -0300 Subject: [PATCH] feat: pass along client attributes down to message transformation context --- apps/emqx/src/emqx_channel.erl | 17 ++++---- .../src/emqx_message_transformation.erl | 2 + ..._message_transformation_http_api_SUITE.erl | 40 ++++++++++++++++++- 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index eb54f6ba1..4b708e15a 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -685,20 +685,23 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) -> packet_to_message(Packet, #channel{ conninfo = #{proto_ver := ProtoVer}, - clientinfo = #{ - protocol := Protocol, - clientid := ClientId, - username := Username, - peerhost := PeerHost, - mountpoint := MountPoint - } + clientinfo = + #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost, + mountpoint := MountPoint + } = ClientInfo }) -> + ClientAttrs = maps:get(client_attrs, ClientInfo, #{}), emqx_mountpoint:mount( MountPoint, emqx_packet:to_message( Packet, ClientId, #{ + client_attrs => ClientAttrs, proto_ver => ProtoVer, protocol => Protocol, username => Username, diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 0ffb9f606..612a30f78 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -45,6 +45,7 @@ -type rendered_value() :: qos() | boolean() | binary(). -type eval_context() :: #{ + client_attrs := map(), payload := _, qos := _, retain := _, @@ -309,6 +310,7 @@ message_to_context(#message{} = Message, Payload, Transformation) -> end, #{ dirty => Dirty, + client_attrs => emqx_message:get_header(client_attrs, Message, #{}), payload => Payload, qos => Message#message.qos, retain => emqx_message:get_flag(retain, Message, false), diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index 60779911c..58efa69e0 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -250,7 +250,11 @@ connect(ClientId) -> connect(ClientId, _IsPersistent = false). connect(ClientId, IsPersistent) -> - Properties = emqx_utils_maps:put_if(#{}, 'Session-Expiry-Interval', 30, IsPersistent), + connect(ClientId, IsPersistent, _Opts = #{}). + +connect(ClientId, IsPersistent, Opts) -> + Properties0 = maps:get(properties, Opts, #{}), + Properties = emqx_utils_maps:put_if(Properties0, 'Session-Expiry-Interval', 30, IsPersistent), {ok, Client} = emqtt:start_link([ {clean_start, true}, {clientid, ClientId}, @@ -1441,3 +1445,37 @@ t_json_encode_decode_smoke_test(_Config) -> [] ), ok. + +%% Simple smoke test for client attributes support. +t_client_attrs(_Config) -> + {ok, Compiled} = emqx_variform:compile(<<"user_property.tenant">>), + ok = emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], [ + #{ + expression => Compiled, + set_as_attr => <<"tenant">> + } + ]), + on_exit(fun() -> ok = emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], []) end), + ?check_trace( + begin + Name1 = <<"foo">>, + Operation1 = operation(topic, <<"concat([client_attrs.tenant, '/', topic])">>), + Transformation1 = transformation(Name1, [Operation1]), + {201, _} = insert(Transformation1), + + Tenant = <<"mytenant">>, + C = connect( + <<"c1">>, + _IsPersistent = false, + #{properties => #{'User-Property' => [{<<"tenant">>, Tenant}]}} + ), + {ok, _, [_]} = emqtt:subscribe(C, emqx_topic:join([Tenant, <<"#">>])), + + ok = publish(C, <<"t/1">>, #{x => 1, y => 2}), + ?assertReceive({publish, #{topic := <<"mytenant/t/1">>}}), + + ok + end, + [] + ), + ok.