Merge pull request #13381 from thalesmg/20240701-r57-fix-mt-read-user-prop
fix(message transformation): correctly read from user properties in context (r57)
This commit is contained in:
commit
51a8d3b041
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_message_transformation, [
|
{application, emqx_message_transformation, [
|
||||||
{description, "EMQX Message Transformation"},
|
{description, "EMQX Message Transformation"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
|
{registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]},
|
||||||
{mod, {emqx_message_transformation_app, []}},
|
{mod, {emqx_message_transformation_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -219,7 +219,7 @@ put_value([<<"user_property">>, Key], Rendered, Context0) ->
|
||||||
Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0),
|
Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0),
|
||||||
maps:update_with(
|
maps:update_with(
|
||||||
user_property,
|
user_property,
|
||||||
fun(Ps) -> lists:keystore(Key, 1, Ps, {Key, Rendered}) end,
|
fun(Ps) -> maps:put(Key, Rendered, Ps) end,
|
||||||
Context
|
Context
|
||||||
);
|
);
|
||||||
put_value([<<"qos">>], Rendered, Context0) ->
|
put_value([<<"qos">>], Rendered, Context0) ->
|
||||||
|
@ -323,6 +323,12 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
||||||
true -> #{};
|
true -> #{};
|
||||||
false -> #{payload => true}
|
false -> #{payload => true}
|
||||||
end,
|
end,
|
||||||
|
UserProperties0 = maps:get(
|
||||||
|
'User-Property',
|
||||||
|
emqx_message:get_header(properties, Message, #{}),
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
UserProperties = maps:from_list(UserProperties0),
|
||||||
#{
|
#{
|
||||||
dirty => Dirty,
|
dirty => Dirty,
|
||||||
client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
|
client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
|
||||||
|
@ -330,9 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
||||||
qos => Message#message.qos,
|
qos => Message#message.qos,
|
||||||
retain => emqx_message:get_flag(retain, Message, false),
|
retain => emqx_message:get_flag(retain, Message, false),
|
||||||
topic => Message#message.topic,
|
topic => Message#message.topic,
|
||||||
user_property => maps:get(
|
user_property => UserProperties
|
||||||
'User-Property', emqx_message:get_header(properties, Message, #{}), []
|
|
||||||
)
|
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
|
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
|
||||||
|
@ -363,7 +367,9 @@ take_from_context(Context, Message) ->
|
||||||
emqx_message:set_flag(retain, maps:get(retain, Context), Acc);
|
emqx_message:set_flag(retain, maps:get(retain, Context), Acc);
|
||||||
(user_property, _, Acc) ->
|
(user_property, _, Acc) ->
|
||||||
Props0 = emqx_message:get_header(properties, Acc, #{}),
|
Props0 = emqx_message:get_header(properties, Acc, #{}),
|
||||||
Props = maps:merge(Props0, #{'User-Property' => maps:get(user_property, Context)}),
|
UserProperties0 = maps:to_list(maps:get(user_property, Context)),
|
||||||
|
UserProperties = lists:keysort(1, UserProperties0),
|
||||||
|
Props = maps:merge(Props0, #{'User-Property' => UserProperties}),
|
||||||
emqx_message:set_header(properties, Props, Acc)
|
emqx_message:set_header(properties, Props, Acc)
|
||||||
end,
|
end,
|
||||||
Message,
|
Message,
|
||||||
|
|
|
@ -300,14 +300,19 @@ connect(ClientId, IsPersistent, Opts) ->
|
||||||
publish(Client, Topic, Payload) ->
|
publish(Client, Topic, Payload) ->
|
||||||
publish(Client, Topic, Payload, _QoS = 0).
|
publish(Client, Topic, Payload, _QoS = 0).
|
||||||
|
|
||||||
publish(Client, Topic, {raw, Payload}, QoS) ->
|
publish(Client, Topic, Payload, QoS) ->
|
||||||
case emqtt:publish(Client, Topic, Payload, QoS) of
|
publish(Client, Topic, Payload, QoS, _Opts = #{}).
|
||||||
|
|
||||||
|
publish(Client, Topic, {raw, Payload}, QoS, Opts) ->
|
||||||
|
Props = maps:get(props, Opts, #{}),
|
||||||
|
case emqtt:publish(Client, Topic, Props, Payload, [{qos, QoS}]) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
Err -> Err
|
Err -> Err
|
||||||
end;
|
end;
|
||||||
publish(Client, Topic, Payload, QoS) ->
|
publish(Client, Topic, Payload, QoS, Opts) ->
|
||||||
case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of
|
Props = maps:get(props, Opts, #{}),
|
||||||
|
case emqtt:publish(Client, Topic, Props, emqx_utils_json:encode(Payload), [{qos, QoS}]) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{ok, _} -> ok;
|
{ok, _} -> ok;
|
||||||
Err -> Err
|
Err -> Err
|
||||||
|
@ -503,6 +508,7 @@ t_smoke_test(_Config) ->
|
||||||
operation(topic, <<"concat([topic, '/', payload.t])">>),
|
operation(topic, <<"concat([topic, '/', payload.t])">>),
|
||||||
operation(retain, <<"payload.r">>),
|
operation(retain, <<"payload.r">>),
|
||||||
operation(<<"user_property.a">>, <<"payload.u.a">>),
|
operation(<<"user_property.a">>, <<"payload.u.a">>),
|
||||||
|
operation(<<"user_property.copy">>, <<"user_property.original">>),
|
||||||
operation(<<"payload">>, <<"payload.p.hello">>)
|
operation(<<"payload">>, <<"payload.p.hello">>)
|
||||||
],
|
],
|
||||||
Transformation1 = transformation(Name1, Operations),
|
Transformation1 = transformation(Name1, Operations),
|
||||||
|
@ -527,7 +533,8 @@ t_smoke_test(_Config) ->
|
||||||
t => <<"t">>,
|
t => <<"t">>,
|
||||||
u => #{a => <<"b">>}
|
u => #{a => <<"b">>}
|
||||||
},
|
},
|
||||||
_QosPub = 0
|
_QosPub = 0,
|
||||||
|
#{props => #{'User-Property' => [{<<"original">>, <<"user_prop">>}]}}
|
||||||
),
|
),
|
||||||
?assertReceive(
|
?assertReceive(
|
||||||
{publish, #{
|
{publish, #{
|
||||||
|
@ -535,7 +542,13 @@ t_smoke_test(_Config) ->
|
||||||
qos := QoS,
|
qos := QoS,
|
||||||
retain := true,
|
retain := true,
|
||||||
topic := <<"t/1/t">>,
|
topic := <<"t/1/t">>,
|
||||||
properties := #{'User-Property' := [{<<"a">>, <<"b">>}]}
|
properties := #{
|
||||||
|
'User-Property' := [
|
||||||
|
{<<"a">>, <<"b">>},
|
||||||
|
{<<"copy">>, <<"user_prop">>},
|
||||||
|
{<<"original">>, <<"user_prop">>}
|
||||||
|
]
|
||||||
|
}
|
||||||
}}
|
}}
|
||||||
),
|
),
|
||||||
%% remember to clear retained message
|
%% remember to clear retained message
|
||||||
|
@ -1534,6 +1547,7 @@ t_dryrun_transformation(_Config) ->
|
||||||
operation(topic, <<"concat([topic, '/', payload.t])">>),
|
operation(topic, <<"concat([topic, '/', payload.t])">>),
|
||||||
operation(retain, <<"payload.r">>),
|
operation(retain, <<"payload.r">>),
|
||||||
operation(<<"user_property.a">>, <<"payload.u.a">>),
|
operation(<<"user_property.a">>, <<"payload.u.a">>),
|
||||||
|
operation(<<"user_property.copy">>, <<"user_property.original">>),
|
||||||
operation(<<"payload">>, <<"payload.p.hello">>)
|
operation(<<"payload">>, <<"payload.p.hello">>)
|
||||||
],
|
],
|
||||||
Transformation1 = transformation(Name1, Operations),
|
Transformation1 = transformation(Name1, Operations),
|
||||||
|
@ -1546,7 +1560,8 @@ t_dryrun_transformation(_Config) ->
|
||||||
r => true,
|
r => true,
|
||||||
t => <<"t">>,
|
t => <<"t">>,
|
||||||
u => #{a => <<"b">>}
|
u => #{a => <<"b">>}
|
||||||
}
|
},
|
||||||
|
user_property => #{<<"original">> => <<"user_prop">>}
|
||||||
}),
|
}),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{200, #{
|
{200, #{
|
||||||
|
@ -1554,7 +1569,11 @@ t_dryrun_transformation(_Config) ->
|
||||||
<<"qos">> := 1,
|
<<"qos">> := 1,
|
||||||
<<"retain">> := true,
|
<<"retain">> := true,
|
||||||
<<"topic">> := <<"t/u/v/t">>,
|
<<"topic">> := <<"t/u/v/t">>,
|
||||||
<<"user_property">> := #{<<"a">> := <<"b">>}
|
<<"user_property">> := #{
|
||||||
|
<<"a">> := <<"b">>,
|
||||||
|
<<"original">> := <<"user_prop">>,
|
||||||
|
<<"copy">> := <<"user_prop">>
|
||||||
|
}
|
||||||
}},
|
}},
|
||||||
dryrun_transformation(Transformation1, Message1)
|
dryrun_transformation(Transformation1, Message1)
|
||||||
),
|
),
|
||||||
|
|
Loading…
Reference in New Issue