Merge pull request #13316 from thalesmg/fix-mt-read-user-prop-m-20240621
fix(message transformation): correctly read from user properties in context
This commit is contained in:
commit
debf1e6cd5
|
@ -219,7 +219,7 @@ put_value([<<"user_property">>, Key], Rendered, Context0) ->
|
|||
Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0),
|
||||
maps:update_with(
|
||||
user_property,
|
||||
fun(Ps) -> lists:keystore(Key, 1, Ps, {Key, Rendered}) end,
|
||||
fun(Ps) -> maps:put(Key, Rendered, Ps) end,
|
||||
Context
|
||||
);
|
||||
put_value([<<"qos">>], Rendered, Context0) ->
|
||||
|
@ -323,6 +323,12 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
|||
true -> #{};
|
||||
false -> #{payload => true}
|
||||
end,
|
||||
UserProperties0 = maps:get(
|
||||
'User-Property',
|
||||
emqx_message:get_header(properties, Message, #{}),
|
||||
[]
|
||||
),
|
||||
UserProperties = maps:from_list(UserProperties0),
|
||||
#{
|
||||
dirty => Dirty,
|
||||
client_attrs => emqx_message:get_header(client_attrs, Message, #{}),
|
||||
|
@ -330,9 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) ->
|
|||
qos => Message#message.qos,
|
||||
retain => emqx_message:get_flag(retain, Message, false),
|
||||
topic => Message#message.topic,
|
||||
user_property => maps:get(
|
||||
'User-Property', emqx_message:get_header(properties, Message, #{}), []
|
||||
)
|
||||
user_property => UserProperties
|
||||
}.
|
||||
|
||||
-spec context_to_message(emqx_types:message(), eval_context(), transformation()) ->
|
||||
|
@ -363,7 +367,9 @@ take_from_context(Context, Message) ->
|
|||
emqx_message:set_flag(retain, maps:get(retain, Context), Acc);
|
||||
(user_property, _, Acc) ->
|
||||
Props0 = emqx_message:get_header(properties, Acc, #{}),
|
||||
Props = maps:merge(Props0, #{'User-Property' => maps:get(user_property, Context)}),
|
||||
UserProperties0 = maps:to_list(maps:get(user_property, Context)),
|
||||
UserProperties = lists:keysort(1, UserProperties0),
|
||||
Props = maps:merge(Props0, #{'User-Property' => UserProperties}),
|
||||
emqx_message:set_header(properties, Props, Acc)
|
||||
end,
|
||||
Message,
|
||||
|
|
|
@ -300,14 +300,19 @@ connect(ClientId, IsPersistent, Opts) ->
|
|||
publish(Client, Topic, Payload) ->
|
||||
publish(Client, Topic, Payload, _QoS = 0).
|
||||
|
||||
publish(Client, Topic, {raw, Payload}, QoS) ->
|
||||
case emqtt:publish(Client, Topic, Payload, QoS) of
|
||||
publish(Client, Topic, Payload, QoS) ->
|
||||
publish(Client, Topic, Payload, QoS, _Opts = #{}).
|
||||
|
||||
publish(Client, Topic, {raw, Payload}, QoS, Opts) ->
|
||||
Props = maps:get(props, Opts, #{}),
|
||||
case emqtt:publish(Client, Topic, Props, Payload, [{qos, QoS}]) of
|
||||
ok -> ok;
|
||||
{ok, _} -> ok;
|
||||
Err -> Err
|
||||
end;
|
||||
publish(Client, Topic, Payload, QoS) ->
|
||||
case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of
|
||||
publish(Client, Topic, Payload, QoS, Opts) ->
|
||||
Props = maps:get(props, Opts, #{}),
|
||||
case emqtt:publish(Client, Topic, Props, emqx_utils_json:encode(Payload), [{qos, QoS}]) of
|
||||
ok -> ok;
|
||||
{ok, _} -> ok;
|
||||
Err -> Err
|
||||
|
@ -503,6 +508,7 @@ t_smoke_test(_Config) ->
|
|||
operation(topic, <<"concat([topic, '/', payload.t])">>),
|
||||
operation(retain, <<"payload.r">>),
|
||||
operation(<<"user_property.a">>, <<"payload.u.a">>),
|
||||
operation(<<"user_property.copy">>, <<"user_property.original">>),
|
||||
operation(<<"payload">>, <<"payload.p.hello">>)
|
||||
],
|
||||
Transformation1 = transformation(Name1, Operations),
|
||||
|
@ -527,7 +533,8 @@ t_smoke_test(_Config) ->
|
|||
t => <<"t">>,
|
||||
u => #{a => <<"b">>}
|
||||
},
|
||||
_QosPub = 0
|
||||
_QosPub = 0,
|
||||
#{props => #{'User-Property' => [{<<"original">>, <<"user_prop">>}]}}
|
||||
),
|
||||
?assertReceive(
|
||||
{publish, #{
|
||||
|
@ -535,7 +542,13 @@ t_smoke_test(_Config) ->
|
|||
qos := QoS,
|
||||
retain := true,
|
||||
topic := <<"t/1/t">>,
|
||||
properties := #{'User-Property' := [{<<"a">>, <<"b">>}]}
|
||||
properties := #{
|
||||
'User-Property' := [
|
||||
{<<"a">>, <<"b">>},
|
||||
{<<"copy">>, <<"user_prop">>},
|
||||
{<<"original">>, <<"user_prop">>}
|
||||
]
|
||||
}
|
||||
}}
|
||||
),
|
||||
%% remember to clear retained message
|
||||
|
@ -1534,6 +1547,7 @@ t_dryrun_transformation(_Config) ->
|
|||
operation(topic, <<"concat([topic, '/', payload.t])">>),
|
||||
operation(retain, <<"payload.r">>),
|
||||
operation(<<"user_property.a">>, <<"payload.u.a">>),
|
||||
operation(<<"user_property.copy">>, <<"user_property.original">>),
|
||||
operation(<<"payload">>, <<"payload.p.hello">>)
|
||||
],
|
||||
Transformation1 = transformation(Name1, Operations),
|
||||
|
@ -1546,7 +1560,8 @@ t_dryrun_transformation(_Config) ->
|
|||
r => true,
|
||||
t => <<"t">>,
|
||||
u => #{a => <<"b">>}
|
||||
}
|
||||
},
|
||||
user_property => #{<<"original">> => <<"user_prop">>}
|
||||
}),
|
||||
?assertMatch(
|
||||
{200, #{
|
||||
|
@ -1554,7 +1569,11 @@ t_dryrun_transformation(_Config) ->
|
|||
<<"qos">> := 1,
|
||||
<<"retain">> := true,
|
||||
<<"topic">> := <<"t/u/v/t">>,
|
||||
<<"user_property">> := #{<<"a">> := <<"b">>}
|
||||
<<"user_property">> := #{
|
||||
<<"a">> := <<"b">>,
|
||||
<<"original">> := <<"user_prop">>,
|
||||
<<"copy">> := <<"user_prop">>
|
||||
}
|
||||
}},
|
||||
dryrun_transformation(Transformation1, Message1)
|
||||
),
|
||||
|
|
Loading…
Reference in New Issue