From 78c5a779d7c721663470609acc80c52efd3c7886 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 5 Sep 2023 14:34:18 -0300 Subject: [PATCH] feat(republish): allow templating mqtt properties Fixes https://emqx.atlassian.net/browse/EMQX-10912 --- .../test/emqx_bridge_mqtt_SUITE.erl | 2 +- .../emqx_dashboard/src/emqx_dashboard.app.src | 2 +- .../src/emqx_rule_actions.erl | 84 +++++++- .../src/emqx_rule_engine.app.src | 2 +- .../src/emqx_rule_engine_schema.erl | 54 ++++- .../test/emqx_rule_engine_SUITE.erl | 196 +++++++++++++++++- .../test/emqx_rule_engine_schema_tests.erl | 134 ++++++++++++ changes/ce/feat-11568.en.md | 1 + changes/ce/fix-11568.en.md | 1 + rel/i18n/emqx_rule_engine_schema.hocon | 10 + 10 files changed, 471 insertions(+), 15 deletions(-) create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl create mode 100644 changes/ce/feat-11568.en.md create mode 100644 changes/ce/fix-11568.en.md diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 29cfe976a..bc0f2450a 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -640,7 +640,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> #{ <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, <<"enable">> => true, - <<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}], + <<"actions">> => [#{<<"function">> => <<"emqx_bridge_mqtt_SUITE:inspect">>}], <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> } ), diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src index f8395025e..42a379bf7 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.app.src +++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src @@ -2,7 +2,7 @@ {application, emqx_dashboard, [ {description, "EMQX Web Dashboard"}, % strict semver, bump manually! - {vsn, "5.0.26"}, + {vsn, "5.0.27"}, {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index a5ad35066..93da7191a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -20,6 +20,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). %% APIs -export([parse_action/1]). @@ -60,16 +61,23 @@ pre_process_action_args( qos := QoS, retain := Retain, payload := Payload, - user_properties := UserProperties + mqtt_properties := MQTTPropertiesTemplate0, + user_properties := UserPropertiesTemplate } = Args ) -> + MQTTPropertiesTemplate = + maps:map( + fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end, + MQTTPropertiesTemplate0 + ), Args#{ preprocessed_tmpl => #{ topic => emqx_placeholder:preproc_tmpl(Topic), qos => preproc_vars(QoS), retain => preproc_vars(Retain), payload => emqx_placeholder:preproc_tmpl(Payload), - user_properties => preproc_user_properties(UserProperties) + mqtt_properties => MQTTPropertiesTemplate, + user_properties => preproc_user_properties(UserPropertiesTemplate) } }; pre_process_action_args(_, Args) -> @@ -106,6 +114,7 @@ republish( retain := RetainTks, topic := TopicTks, payload := PayloadTks, + mqtt_properties := MQTTPropertiesTemplate, user_properties := UserPropertiesTks } } @@ -118,7 +127,9 @@ republish( %% events such as message.acked and message.dropped Flags0 = maps:get(flags, Env, #{}), Flags = Flags0#{retain => Retain}, - PubProps = format_pub_props(UserPropertiesTks, Selected, Env), + PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env), + MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), + PubProps = maps:merge(PubProps0, MQTTProps), ?TRACE( "RULE", "republish_message", @@ -232,3 +243,70 @@ format_pub_props(UserPropertiesTks, Selected, Env) -> replace_simple_var(UserPropertiesTks, Selected, #{}) end, #{'User-Property' => UserProperties}. + +format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> + #{metadata := #{rule_id := RuleId}} = Env, + MQTTProperties0 = + maps:fold( + fun(K, Template, Acc) -> + try + V = emqx_placeholder:proc_tmpl(Template, Selected), + Acc#{K => V} + catch + Kind:Error -> + ?SLOG( + error, + #{ + msg => "bad_mqtt_prop_value", + rule_id => RuleId, + reason => {Kind, Error}, + property => K, + selected => Selected + } + ), + Acc + end + end, + #{}, + MQTTPropertiesTemplate + ), + coerce_properties_values(MQTTProperties0, Env). + +ensure_int(B) when is_binary(B) -> binary_to_integer(B); +ensure_int(I) when is_integer(I) -> I. + +coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> + PublishProperties = + maps:from_list([ + {K, T} + || {_Id, {K, T, Tags}} <- maps:to_list(emqtt_props:all()), + is_list(Tags) andalso lists:member(?PUBLISH, Tags) + ]), + maps:fold( + fun(K, V, Acc) -> + try + case maps:get(K, PublishProperties) of + 'Byte' -> Acc#{K => ensure_int(V)}; + 'Two-Byte-Integer' -> Acc#{K => ensure_int(V)}; + 'Four-Byte-Integer' -> Acc#{K => ensure_int(V)}; + 'Variable-Byte-Integer' -> Acc#{K => ensure_int(V)}; + _ -> Acc#{K => V} + end + catch + Kind:Error -> + ?SLOG( + error, + #{ + msg => "bad_mqtt_prop_value", + rule_id => RuleId, + reason => {Kind, Error}, + property => K, + value => V + } + ), + Acc + end + end, + #{}, + MQTTProperties + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index e6d00bcae..23e4a3f05 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 56da4ea41..bd7d44b69 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -63,7 +63,7 @@ fields("rules") -> )}, {"actions", ?HOCON( - ?ARRAY(?UNION(actions())), + ?ARRAY(hoconsc:union(actions())), #{ desc => ?DESC("rules_actions"), default => [], @@ -161,6 +161,14 @@ fields("republish_args") -> example => <<"${payload}">> } )}, + {mqtt_properties, + ?HOCON( + ?R_REF("republish_mqtt_properties"), + #{ + desc => ?DESC("republish_args_mqtt_properties"), + default => #{} + } + )}, {user_properties, ?HOCON( binary(), @@ -170,6 +178,19 @@ fields("republish_args") -> example => <<"${pub_props.'User-Property'}">> } )} + ]; +fields("republish_mqtt_properties") -> + [ + {'Payload-Format-Indicator', + ?HOCON(binary(), #{required => false, desc => ?DESC('Payload-Format-Indicator')})}, + {'Message-Expiry-Interval', + ?HOCON(binary(), #{required => false, desc => ?DESC('Message-Expiry-Interval')})}, + {'Content-Type', ?HOCON(binary(), #{required => false, desc => ?DESC('Content-Type')})}, + {'Response-Topic', ?HOCON(binary(), #{required => false, desc => ?DESC('Response-Topic')})}, + {'Correlation-Data', + ?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})}, + {'Subscription-Identifier', + ?HOCON(binary(), #{required => false, desc => ?DESC('Subscription-Identifier')})} ]. desc("rule_engine") -> @@ -200,12 +221,31 @@ rule_name() -> )}. actions() -> - [ - binary(), - ?R_REF("builtin_action_republish"), - ?R_REF("builtin_action_console"), - ?R_REF("user_provided_function") - ]. + fun + (all_union_members) -> + [ + binary(), + ?R_REF("builtin_action_republish"), + ?R_REF("builtin_action_console"), + ?R_REF("user_provided_function") + ]; + ({value, V}) -> + case V of + #{<<"function">> := <<"console">>} -> + [?R_REF("builtin_action_console")]; + #{<<"function">> := <<"republish">>} -> + [?R_REF("builtin_action_republish")]; + #{<<"function">> := <<_/binary>>} -> + [?R_REF("user_provided_function")]; + <<_/binary>> -> + [binary()]; + _ -> + throw(#{ + field_name => actions, + reason => <<"unknown action type">> + }) + end + end. qos() -> ?UNION([emqx_schema:qos(), binary()]). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 8c3bd0ebb..2e24ab3a2 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -74,6 +75,7 @@ groups() -> t_sqlselect_inject_props, t_sqlselect_01, t_sqlselect_02, + t_sqlselect_03, t_sqlselect_1, t_sqlselect_2, t_sqlselect_3, @@ -520,13 +522,16 @@ t_get_rule_ids_by_action(_) -> t_ensure_action_removed(_) -> Id = <<"t_ensure_action_removed">>, GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, - emqx:update_config( + {ok, _} = emqx:update_config( [rule_engine, rules, Id], #{ <<"actions">> => [ #{<<"function">> => GetSelectedData}, #{<<"function">> => <<"console">>}, - #{<<"function">> => <<"republish">>}, + #{ + <<"function">> => <<"republish">>, + <<"args">> => #{<<"topic">> => <<"some/topic">>} + }, <<"mysql:foo">>, <<"mqtt:bar">> ], @@ -1407,6 +1412,189 @@ t_sqlselect_02(_Config) -> emqtt:stop(Client), delete_rule(TopicRule1). +t_sqlselect_03(_Config) -> + init_events_counters(), + SQL = "SELECT * FROM \"t/r\" ", + Repub = republish_action( + <<"t/republish">>, + <<"${.}">>, + <<"${pub_props.'User-Property'}">>, + #{ + <<"Payload-Format-Indicator">> => <<"${.payload.pfi}">>, + <<"Message-Expiry-Interval">> => <<"${.payload.mei}">>, + <<"Content-Type">> => <<"${.payload.ct}">>, + <<"Response-Topic">> => <<"${.payload.rt}">>, + <<"Correlation-Data">> => <<"${.payload.cd}">>, + <<"Subscription-Identifier">> => <<"${.payload.si}">> + } + ), + RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}), + ct:pal("republish action raw:\n ~p", [RepubRaw]), + RuleRaw = #{ + <<"sql">> => SQL, + <<"actions">> => [RepubRaw] + }, + {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw, #{}), + on_exit(fun() -> emqx_rule_engine:delete_rule(?TMP_RULEID) end), + %% to check what republish is actually producing without loss of information + SQL1 = "select * from \"t/republish\" ", + RuleId0 = ?TMP_RULEID, + RuleId1 = <>, + {ok, _} = emqx_rule_engine:create_rule( + #{ + sql => SQL1, + id => RuleId1, + actions => [ + #{ + function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>, + args => #{} + } + ] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId1) end), + + UserProps = maps:to_list(#{<<"mykey">> => <<"myval">>}), + Payload = + emqx_utils_json:encode( + #{ + pfi => 1, + mei => 2, + ct => <<"3">>, + rt => <<"4">>, + cd => <<"5">>, + si => 6, + ta => 7 + } + ), + {ok, Client} = emqtt:start_link([ + {username, <<"emqx">>}, + {proto_ver, v5}, + {properties, #{'Topic-Alias-Maximum' => 100}} + ]), + on_exit(fun() -> emqtt:stop(Client) end), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t/republish">>, 0), + PubProps = #{'User-Property' => UserProps}, + ExpectedMQTTProps0 = #{ + 'Payload-Format-Indicator' => 1, + 'Message-Expiry-Interval' => 2, + 'Content-Type' => <<"3">>, + 'Response-Topic' => <<"4">>, + 'Correlation-Data' => <<"5">>, + 'Subscription-Identifier' => 6, + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'User-Property' => UserProps + }, + emqtt:publish(Client, <<"t/r">>, PubProps, Payload, [{qos, 0}]), + receive + {publish, #{topic := <<"t/republish">>, properties := Props1}} -> + ?assertEqual(ExpectedMQTTProps0, Props1), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + ExpectedMQTTProps1 = #{ + 'Payload-Format-Indicator' => 1, + 'Message-Expiry-Interval' => 2, + 'Content-Type' => <<"3">>, + 'Response-Topic' => <<"4">>, + 'Correlation-Data' => <<"5">>, + 'Subscription-Identifier' => 6, + 'User-Property' => maps:from_list(UserProps), + 'User-Property-Pairs' => [ + #{key => K, value => V} + || {K, V} <- UserProps + ] + }, + ?assertMatch( + [ + {'message.publish', #{ + topic := <<"t/republish">>, + pub_props := ExpectedMQTTProps1 + }} + ], + ets:lookup(events_record_tab, 'message.publish'), + #{expected_props => ExpectedMQTTProps1} + ), + + ct:pal("testing payload that is not a json object"), + emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]), + ExpectedMQTTProps2 = #{ + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'User-Property' => UserProps + }, + receive + {publish, #{topic := T1, properties := Props2}} -> + ?assertEqual(ExpectedMQTTProps2, Props2), + %% empty this time, due to topic alias set before + ?assertEqual(<<>>, T1), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + + ct:pal("testing payload with some uncoercible keys"), + ets:delete_all_objects(events_record_tab), + Payload1 = + emqx_utils_json:encode(#{ + pfi => <<"bad_value1">>, + mei => <<"bad_value2">>, + ct => <<"some_value3">>, + rt => <<"some_value4">>, + cd => <<"some_value5">>, + si => <<"bad_value6">>, + ta => <<"bad_value7">> + }), + emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]), + ExpectedMQTTProps3 = #{ + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'Content-Type' => <<"some_value3">>, + 'Response-Topic' => <<"some_value4">>, + 'Correlation-Data' => <<"some_value5">>, + 'User-Property' => UserProps + }, + receive + {publish, #{topic := T2, properties := Props3}} -> + ?assertEqual(ExpectedMQTTProps3, Props3), + %% empty this time, due to topic alias set before + ?assertEqual(<<>>, T2), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + ExpectedMQTTProps4 = #{ + 'Content-Type' => <<"some_value3">>, + 'Response-Topic' => <<"some_value4">>, + 'Correlation-Data' => <<"some_value5">>, + 'User-Property' => maps:from_list(UserProps), + 'User-Property-Pairs' => [ + #{key => K, value => V} + || {K, V} <- UserProps + ] + }, + ?assertMatch( + [ + {'message.publish', #{ + topic := <<"t/republish">>, + pub_props := ExpectedMQTTProps4 + }} + ], + ets:lookup(events_record_tab, 'message.publish'), + #{expected_props => ExpectedMQTTProps4} + ), + + ok. + t_sqlselect_1(_Config) -> SQL = "SELECT json_decode(payload) as p, payload " @@ -3211,6 +3399,9 @@ republish_action(Topic, Payload) -> republish_action(Topic, Payload, <<"${user_properties}">>). republish_action(Topic, Payload, UserProperties) -> + republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}). + +republish_action(Topic, Payload, UserProperties, MQTTProperties) -> #{ function => republish, args => #{ @@ -3218,6 +3409,7 @@ republish_action(Topic, Payload, UserProperties) -> topic => Topic, qos => 0, retain => false, + mqtt_properties => MQTTProperties, user_properties => UserProperties } }. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl new file mode 100644 index 000000000..74b55e151 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl @@ -0,0 +1,134 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_schema_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%=========================================================================== +%% Data Section +%%=========================================================================== + +%% erlfmt-ignore +republish_hocon0() -> +""" +rule_engine.rules.my_rule { + description = \"some desc\" + metadata = {created_at = 1693918992079} + sql = \"select * from \\\"t/topic\\\" \" + actions = [ + {function = console} + { function = republish + args = { + payload = \"${.}\" + qos = 0 + retain = false + topic = \"t/repu\" + mqtt_properties { + \"Payload-Format-Indicator\" = \"${.payload.pfi}\" + \"Message-Expiry-Interval\" = \"${.payload.mei}\" + \"Content-Type\" = \"${.payload.ct}\" + \"Response-Topic\" = \"${.payload.rt}\" + \"Correlation-Data\" = \"${.payload.cd}\" + \"Subscription-Identifier\" = \"${.payload.si}\" + } + user_properties = \"${pub_props.'User-Property'}\" + } + }, + \"bridges:kafka:kprodu\", + { function = custom_fn + args = { + actually = not_republish + } + } + ] +} +""". + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_rule_engine_schema, Conf). + +-define(validation_error(Reason, Value), + {emqx_rule_engine_schema, [ + #{ + kind := validation_error, + reason := Reason, + value := Value + } + ]} +). + +-define(ok_config(Cfg), #{ + <<"rule_engine">> := + #{ + <<"rules">> := + #{ + <<"my_rule">> := + Cfg + } + } +}). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +republish_test_() -> + BaseConf = parse(republish_hocon0()), + [ + {"base config", + ?_assertMatch( + ?ok_config( + #{ + <<"actions">> := [ + #{<<"function">> := console}, + #{ + <<"function">> := republish, + <<"args">> := + #{ + <<"mqtt_properties">> := + #{ + <<"Payload-Format-Indicator">> := <<_/binary>>, + <<"Message-Expiry-Interval">> := <<_/binary>>, + <<"Content-Type">> := <<_/binary>>, + <<"Response-Topic">> := <<_/binary>>, + <<"Correlation-Data">> := <<_/binary>>, + <<"Subscription-Identifier">> := <<_/binary>> + } + } + }, + <<"bridges:kafka:kprodu">>, + #{ + <<"function">> := <<"custom_fn">>, + <<"args">> := + #{ + <<"actually">> := <<"not_republish">> + } + } + ] + } + ), + check(BaseConf) + )} + ]. diff --git a/changes/ce/feat-11568.en.md b/changes/ce/feat-11568.en.md new file mode 100644 index 000000000..8c5a70257 --- /dev/null +++ b/changes/ce/feat-11568.en.md @@ -0,0 +1 @@ +Added support for defining templates for MQTT publish properties in Republish rule action. diff --git a/changes/ce/fix-11568.en.md b/changes/ce/fix-11568.en.md new file mode 100644 index 000000000..b1a5461c5 --- /dev/null +++ b/changes/ce/fix-11568.en.md @@ -0,0 +1 @@ +Fixed an issue where an ill-defined builtin rule action config could be interpreted as a custom user function. diff --git a/rel/i18n/emqx_rule_engine_schema.hocon b/rel/i18n/emqx_rule_engine_schema.hocon index 9b1f1f802..c661c553a 100644 --- a/rel/i18n/emqx_rule_engine_schema.hocon +++ b/rel/i18n/emqx_rule_engine_schema.hocon @@ -103,6 +103,16 @@ You may also call map_put function like to inject user properties. NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.""" +republish_args_user_properties.label: +"""User Properties""" + +republish_args_mqtt_properties.desc: +"""From which variable should the MQTT Publish Properties of the message be taken from. +Placeholders like ${.payload.content_type} may be used.""" + +republish_args_mqtt_properties.label: +"""MQTT Properties""" + republish_function.desc: """Republish the message as a new MQTT message"""