fix(message transformation): correctly read from user properties in context

Port of https://github.com/emqx/emqx/pull/13316 to release-57

Fixes https://emqx.atlassian.net/browse/EMQX-12582
This commit is contained in:
Thales Macedo Garitezi 2024-06-21 09:39:22 -03:00
parent 6dbb561944
commit 59084dbfbe
3 changed files with 39 additions and 14 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_message_transformation, [ {application, emqx_message_transformation, [
{description, "EMQX Message Transformation"}, {description, "EMQX Message Transformation"},
{vsn, "0.1.0"}, {vsn, "0.1.1"},
{registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]}, {registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
{mod, {emqx_message_transformation_app, []}}, {mod, {emqx_message_transformation_app, []}},
{applications, [ {applications, [

View File

@ -219,7 +219,7 @@ put_value([<<"user_property">>, Key], Rendered, Context0) ->
Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0), Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0),
maps:update_with( maps:update_with(
user_property, user_property,
fun(Ps) -> lists:keystore(Key, 1, Ps, {Key, Rendered}) end, fun(Ps) -> maps:put(Key, Rendered, Ps) end,
Context Context
); );
put_value([<<"qos">>], Rendered, Context0) -> put_value([<<"qos">>], Rendered, Context0) ->
@ -323,6 +323,12 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
true -> #{}; true -> #{};
false -> #{payload => true} false -> #{payload => true}
end, end,
UserProperties0 = maps:get(
'User-Property',
emqx_message:get_header(properties, Message, #{}),
[]
),
UserProperties = maps:from_list(UserProperties0),
#{ #{
dirty => Dirty, dirty => Dirty,
client_attrs => emqx_message:get_header(client_attrs, Message, #{}), client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
@ -330,9 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
qos => Message#message.qos, qos => Message#message.qos,
retain => emqx_message:get_flag(retain, Message, false), retain => emqx_message:get_flag(retain, Message, false),
topic => Message#message.topic, topic => Message#message.topic,
user_property => maps:get( user_property => UserProperties
'User-Property', emqx_message:get_header(properties, Message, #{}), []
)
}. }.
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) -> -spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
@ -363,7 +367,9 @@ take_from_context(Context, Message) ->
emqx_message:set_flag(retain, maps:get(retain, Context), Acc); emqx_message:set_flag(retain, maps:get(retain, Context), Acc);
(user_property, _, Acc) -> (user_property, _, Acc) ->
Props0 = emqx_message:get_header(properties, Acc, #{}), Props0 = emqx_message:get_header(properties, Acc, #{}),
Props = maps:merge(Props0, #{'User-Property' => maps:get(user_property, Context)}), UserProperties0 = maps:to_list(maps:get(user_property, Context)),
UserProperties = lists:keysort(1, UserProperties0),
Props = maps:merge(Props0, #{'User-Property' => UserProperties}),
emqx_message:set_header(properties, Props, Acc) emqx_message:set_header(properties, Props, Acc)
end, end,
Message, Message,

View File

@ -300,14 +300,19 @@ connect(ClientId, IsPersistent, Opts) ->
publish(Client, Topic, Payload) -> publish(Client, Topic, Payload) ->
publish(Client, Topic, Payload, _QoS = 0). publish(Client, Topic, Payload, _QoS = 0).
publish(Client, Topic, {raw, Payload}, QoS) -> publish(Client, Topic, Payload, QoS) ->
case emqtt:publish(Client, Topic, Payload, QoS) of publish(Client, Topic, Payload, QoS, _Opts = #{}).
publish(Client, Topic, {raw, Payload}, QoS, Opts) ->
Props = maps:get(props, Opts, #{}),
case emqtt:publish(Client, Topic, Props, Payload, [{qos, QoS}]) of
ok -> ok; ok -> ok;
{ok, _} -> ok; {ok, _} -> ok;
Err -> Err Err -> Err
end; end;
publish(Client, Topic, Payload, QoS) -> publish(Client, Topic, Payload, QoS, Opts) ->
case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of Props = maps:get(props, Opts, #{}),
case emqtt:publish(Client, Topic, Props, emqx_utils_json:encode(Payload), [{qos, QoS}]) of
ok -> ok; ok -> ok;
{ok, _} -> ok; {ok, _} -> ok;
Err -> Err Err -> Err
@ -503,6 +508,7 @@ t_smoke_test(_Config) ->
operation(topic, <<"concat([topic, '/', payload.t])">>), operation(topic, <<"concat([topic, '/', payload.t])">>),
operation(retain, <<"payload.r">>), operation(retain, <<"payload.r">>),
operation(<<"user_property.a">>, <<"payload.u.a">>), operation(<<"user_property.a">>, <<"payload.u.a">>),
operation(<<"user_property.copy">>, <<"user_property.original">>),
operation(<<"payload">>, <<"payload.p.hello">>) operation(<<"payload">>, <<"payload.p.hello">>)
], ],
Transformation1 = transformation(Name1, Operations), Transformation1 = transformation(Name1, Operations),
@ -527,7 +533,8 @@ t_smoke_test(_Config) ->
t => <<"t">>, t => <<"t">>,
u => #{a => <<"b">>} u => #{a => <<"b">>}
}, },
_QosPub = 0 _QosPub = 0,
#{props => #{'User-Property' => [{<<"original">>, <<"user_prop">>}]}}
), ),
?assertReceive( ?assertReceive(
{publish, #{ {publish, #{
@ -535,7 +542,13 @@ t_smoke_test(_Config) ->
qos := QoS, qos := QoS,
retain := true, retain := true,
topic := <<"t/1/t">>, topic := <<"t/1/t">>,
properties := #{'User-Property' := [{<<"a">>, <<"b">>}]} properties := #{
'User-Property' := [
{<<"a">>, <<"b">>},
{<<"copy">>, <<"user_prop">>},
{<<"original">>, <<"user_prop">>}
]
}
}} }}
), ),
%% remember to clear retained message %% remember to clear retained message
@ -1534,6 +1547,7 @@ t_dryrun_transformation(_Config) ->
operation(topic, <<"concat([topic, '/', payload.t])">>), operation(topic, <<"concat([topic, '/', payload.t])">>),
operation(retain, <<"payload.r">>), operation(retain, <<"payload.r">>),
operation(<<"user_property.a">>, <<"payload.u.a">>), operation(<<"user_property.a">>, <<"payload.u.a">>),
operation(<<"user_property.copy">>, <<"user_property.original">>),
operation(<<"payload">>, <<"payload.p.hello">>) operation(<<"payload">>, <<"payload.p.hello">>)
], ],
Transformation1 = transformation(Name1, Operations), Transformation1 = transformation(Name1, Operations),
@ -1546,7 +1560,8 @@ t_dryrun_transformation(_Config) ->
r => true, r => true,
t => <<"t">>, t => <<"t">>,
u => #{a => <<"b">>} u => #{a => <<"b">>}
} },
user_property => #{<<"original">> => <<"user_prop">>}
}), }),
?assertMatch( ?assertMatch(
{200, #{ {200, #{
@ -1554,7 +1569,11 @@ t_dryrun_transformation(_Config) ->
<<"qos">> := 1, <<"qos">> := 1,
<<"retain">> := true, <<"retain">> := true,
<<"topic">> := <<"t/u/v/t">>, <<"topic">> := <<"t/u/v/t">>,
<<"user_property">> := #{<<"a">> := <<"b">>} <<"user_property">> := #{
<<"a">> := <<"b">>,
<<"original">> := <<"user_prop">>,
<<"copy">> := <<"user_prop">>
}
}}, }},
dryrun_transformation(Transformation1, Message1) dryrun_transformation(Transformation1, Message1)
), ),