diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.app.src b/apps/emqx_message_transformation/src/emqx_message_transformation.app.src index 7643cbb9f..dcc94db15 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.app.src +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.app.src @@ -1,6 +1,6 @@ {application, emqx_message_transformation, [ {description, "EMQX Message Transformation"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, [emqx_message_transformation_sup, emqx_message_transformation_registry]}, {mod, {emqx_message_transformation_app, []}}, {applications, [ diff --git a/apps/emqx_message_transformation/src/emqx_message_transformation.erl b/apps/emqx_message_transformation/src/emqx_message_transformation.erl index 14f44ec1f..cff7e3133 100644 --- a/apps/emqx_message_transformation/src/emqx_message_transformation.erl +++ b/apps/emqx_message_transformation/src/emqx_message_transformation.erl @@ -219,7 +219,7 @@ put_value([<<"user_property">>, Key], Rendered, Context0) -> Context = maps:update_with(dirty, fun(D) -> D#{user_property => true} end, Context0), maps:update_with( user_property, - fun(Ps) -> lists:keystore(Key, 1, Ps, {Key, Rendered}) end, + fun(Ps) -> maps:put(Key, Rendered, Ps) end, Context ); put_value([<<"qos">>], Rendered, Context0) -> @@ -323,6 +323,12 @@ message_to_context(#message{} = Message, Payload, Transformation) -> true -> #{}; false -> #{payload => true} end, + UserProperties0 = maps:get( + 'User-Property', + emqx_message:get_header(properties, Message, #{}), + [] + ), + UserProperties = maps:from_list(UserProperties0), #{ dirty => Dirty, client_attrs => emqx_message:get_header(client_attrs, Message, #{}), @@ -330,9 +336,7 @@ message_to_context(#message{} = Message, Payload, Transformation) -> qos => Message#message.qos, retain => emqx_message:get_flag(retain, Message, false), topic => Message#message.topic, - user_property => maps:get( - 'User-Property', emqx_message:get_header(properties, Message, #{}), [] - ) + user_property => UserProperties }. -spec context_to_message(emqx_types:message(), eval_context(), transformation()) -> @@ -363,7 +367,9 @@ take_from_context(Context, Message) -> emqx_message:set_flag(retain, maps:get(retain, Context), Acc); (user_property, _, Acc) -> Props0 = emqx_message:get_header(properties, Acc, #{}), - Props = maps:merge(Props0, #{'User-Property' => maps:get(user_property, Context)}), + UserProperties0 = maps:to_list(maps:get(user_property, Context)), + UserProperties = lists:keysort(1, UserProperties0), + Props = maps:merge(Props0, #{'User-Property' => UserProperties}), emqx_message:set_header(properties, Props, Acc) end, Message, diff --git a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl index 5e4d9a0cb..9728ca71d 100644 --- a/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl +++ b/apps/emqx_message_transformation/test/emqx_message_transformation_http_api_SUITE.erl @@ -300,14 +300,19 @@ connect(ClientId, IsPersistent, Opts) -> publish(Client, Topic, Payload) -> publish(Client, Topic, Payload, _QoS = 0). -publish(Client, Topic, {raw, Payload}, QoS) -> - case emqtt:publish(Client, Topic, Payload, QoS) of +publish(Client, Topic, Payload, QoS) -> + publish(Client, Topic, Payload, QoS, _Opts = #{}). + +publish(Client, Topic, {raw, Payload}, QoS, Opts) -> + Props = maps:get(props, Opts, #{}), + case emqtt:publish(Client, Topic, Props, Payload, [{qos, QoS}]) of ok -> ok; {ok, _} -> ok; Err -> Err end; -publish(Client, Topic, Payload, QoS) -> - case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of +publish(Client, Topic, Payload, QoS, Opts) -> + Props = maps:get(props, Opts, #{}), + case emqtt:publish(Client, Topic, Props, emqx_utils_json:encode(Payload), [{qos, QoS}]) of ok -> ok; {ok, _} -> ok; Err -> Err @@ -503,6 +508,7 @@ t_smoke_test(_Config) -> operation(topic, <<"concat([topic, '/', payload.t])">>), operation(retain, <<"payload.r">>), operation(<<"user_property.a">>, <<"payload.u.a">>), + operation(<<"user_property.copy">>, <<"user_property.original">>), operation(<<"payload">>, <<"payload.p.hello">>) ], Transformation1 = transformation(Name1, Operations), @@ -527,7 +533,8 @@ t_smoke_test(_Config) -> t => <<"t">>, u => #{a => <<"b">>} }, - _QosPub = 0 + _QosPub = 0, + #{props => #{'User-Property' => [{<<"original">>, <<"user_prop">>}]}} ), ?assertReceive( {publish, #{ @@ -535,7 +542,13 @@ t_smoke_test(_Config) -> qos := QoS, retain := true, topic := <<"t/1/t">>, - properties := #{'User-Property' := [{<<"a">>, <<"b">>}]} + properties := #{ + 'User-Property' := [ + {<<"a">>, <<"b">>}, + {<<"copy">>, <<"user_prop">>}, + {<<"original">>, <<"user_prop">>} + ] + } }} ), %% remember to clear retained message @@ -1534,6 +1547,7 @@ t_dryrun_transformation(_Config) -> operation(topic, <<"concat([topic, '/', payload.t])">>), operation(retain, <<"payload.r">>), operation(<<"user_property.a">>, <<"payload.u.a">>), + operation(<<"user_property.copy">>, <<"user_property.original">>), operation(<<"payload">>, <<"payload.p.hello">>) ], Transformation1 = transformation(Name1, Operations), @@ -1546,7 +1560,8 @@ t_dryrun_transformation(_Config) -> r => true, t => <<"t">>, u => #{a => <<"b">>} - } + }, + user_property => #{<<"original">> => <<"user_prop">>} }), ?assertMatch( {200, #{ @@ -1554,7 +1569,11 @@ t_dryrun_transformation(_Config) -> <<"qos">> := 1, <<"retain">> := true, <<"topic">> := <<"t/u/v/t">>, - <<"user_property">> := #{<<"a">> := <<"b">>} + <<"user_property">> := #{ + <<"a">> := <<"b">>, + <<"original">> := <<"user_prop">>, + <<"copy">> := <<"user_prop">> + } }}, dryrun_transformation(Transformation1, Message1) ),