From 78c5a779d7c721663470609acc80c52efd3c7886 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 5 Sep 2023 14:34:18 -0300 Subject: [PATCH 1/8] feat(republish): allow templating mqtt properties Fixes https://emqx.atlassian.net/browse/EMQX-10912 --- .../test/emqx_bridge_mqtt_SUITE.erl | 2 +- .../emqx_dashboard/src/emqx_dashboard.app.src | 2 +- .../src/emqx_rule_actions.erl | 84 +++++++- .../src/emqx_rule_engine.app.src | 2 +- .../src/emqx_rule_engine_schema.erl | 54 ++++- .../test/emqx_rule_engine_SUITE.erl | 196 +++++++++++++++++- .../test/emqx_rule_engine_schema_tests.erl | 134 ++++++++++++ changes/ce/feat-11568.en.md | 1 + changes/ce/fix-11568.en.md | 1 + rel/i18n/emqx_rule_engine_schema.hocon | 10 + 10 files changed, 471 insertions(+), 15 deletions(-) create mode 100644 apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl create mode 100644 changes/ce/feat-11568.en.md create mode 100644 changes/ce/fix-11568.en.md diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index 29cfe976a..bc0f2450a 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -640,7 +640,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> #{ <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, <<"enable">> => true, - <<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}], + <<"actions">> => [#{<<"function">> => <<"emqx_bridge_mqtt_SUITE:inspect">>}], <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> } ), diff --git a/apps/emqx_dashboard/src/emqx_dashboard.app.src b/apps/emqx_dashboard/src/emqx_dashboard.app.src index f8395025e..42a379bf7 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.app.src +++ b/apps/emqx_dashboard/src/emqx_dashboard.app.src @@ -2,7 +2,7 @@ {application, emqx_dashboard, [ {description, "EMQX Web Dashboard"}, % strict semver, bump manually! - {vsn, "5.0.26"}, + {vsn, "5.0.27"}, {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index a5ad35066..93da7191a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -20,6 +20,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqtt/include/emqtt.hrl"). %% APIs -export([parse_action/1]). @@ -60,16 +61,23 @@ pre_process_action_args( qos := QoS, retain := Retain, payload := Payload, - user_properties := UserProperties + mqtt_properties := MQTTPropertiesTemplate0, + user_properties := UserPropertiesTemplate } = Args ) -> + MQTTPropertiesTemplate = + maps:map( + fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end, + MQTTPropertiesTemplate0 + ), Args#{ preprocessed_tmpl => #{ topic => emqx_placeholder:preproc_tmpl(Topic), qos => preproc_vars(QoS), retain => preproc_vars(Retain), payload => emqx_placeholder:preproc_tmpl(Payload), - user_properties => preproc_user_properties(UserProperties) + mqtt_properties => MQTTPropertiesTemplate, + user_properties => preproc_user_properties(UserPropertiesTemplate) } }; pre_process_action_args(_, Args) -> @@ -106,6 +114,7 @@ republish( retain := RetainTks, topic := TopicTks, payload := PayloadTks, + mqtt_properties := MQTTPropertiesTemplate, user_properties := UserPropertiesTks } } @@ -118,7 +127,9 @@ republish( %% events such as message.acked and message.dropped Flags0 = maps:get(flags, Env, #{}), Flags = Flags0#{retain => Retain}, - PubProps = format_pub_props(UserPropertiesTks, Selected, Env), + PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env), + MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env), + PubProps = maps:merge(PubProps0, MQTTProps), ?TRACE( "RULE", "republish_message", @@ -232,3 +243,70 @@ format_pub_props(UserPropertiesTks, Selected, Env) -> replace_simple_var(UserPropertiesTks, Selected, #{}) end, #{'User-Property' => UserProperties}. + +format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> + #{metadata := #{rule_id := RuleId}} = Env, + MQTTProperties0 = + maps:fold( + fun(K, Template, Acc) -> + try + V = emqx_placeholder:proc_tmpl(Template, Selected), + Acc#{K => V} + catch + Kind:Error -> + ?SLOG( + error, + #{ + msg => "bad_mqtt_prop_value", + rule_id => RuleId, + reason => {Kind, Error}, + property => K, + selected => Selected + } + ), + Acc + end + end, + #{}, + MQTTPropertiesTemplate + ), + coerce_properties_values(MQTTProperties0, Env). + +ensure_int(B) when is_binary(B) -> binary_to_integer(B); +ensure_int(I) when is_integer(I) -> I. + +coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> + PublishProperties = + maps:from_list([ + {K, T} + || {_Id, {K, T, Tags}} <- maps:to_list(emqtt_props:all()), + is_list(Tags) andalso lists:member(?PUBLISH, Tags) + ]), + maps:fold( + fun(K, V, Acc) -> + try + case maps:get(K, PublishProperties) of + 'Byte' -> Acc#{K => ensure_int(V)}; + 'Two-Byte-Integer' -> Acc#{K => ensure_int(V)}; + 'Four-Byte-Integer' -> Acc#{K => ensure_int(V)}; + 'Variable-Byte-Integer' -> Acc#{K => ensure_int(V)}; + _ -> Acc#{K => V} + end + catch + Kind:Error -> + ?SLOG( + error, + #{ + msg => "bad_mqtt_prop_value", + rule_id => RuleId, + reason => {Kind, Error}, + property => K, + value => V + } + ), + Acc + end + end, + #{}, + MQTTProperties + ). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index e6d00bcae..23e4a3f05 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.23"}, + {vsn, "5.0.24"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index 56da4ea41..bd7d44b69 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -63,7 +63,7 @@ fields("rules") -> )}, {"actions", ?HOCON( - ?ARRAY(?UNION(actions())), + ?ARRAY(hoconsc:union(actions())), #{ desc => ?DESC("rules_actions"), default => [], @@ -161,6 +161,14 @@ fields("republish_args") -> example => <<"${payload}">> } )}, + {mqtt_properties, + ?HOCON( + ?R_REF("republish_mqtt_properties"), + #{ + desc => ?DESC("republish_args_mqtt_properties"), + default => #{} + } + )}, {user_properties, ?HOCON( binary(), @@ -170,6 +178,19 @@ fields("republish_args") -> example => <<"${pub_props.'User-Property'}">> } )} + ]; +fields("republish_mqtt_properties") -> + [ + {'Payload-Format-Indicator', + ?HOCON(binary(), #{required => false, desc => ?DESC('Payload-Format-Indicator')})}, + {'Message-Expiry-Interval', + ?HOCON(binary(), #{required => false, desc => ?DESC('Message-Expiry-Interval')})}, + {'Content-Type', ?HOCON(binary(), #{required => false, desc => ?DESC('Content-Type')})}, + {'Response-Topic', ?HOCON(binary(), #{required => false, desc => ?DESC('Response-Topic')})}, + {'Correlation-Data', + ?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})}, + {'Subscription-Identifier', + ?HOCON(binary(), #{required => false, desc => ?DESC('Subscription-Identifier')})} ]. desc("rule_engine") -> @@ -200,12 +221,31 @@ rule_name() -> )}. actions() -> - [ - binary(), - ?R_REF("builtin_action_republish"), - ?R_REF("builtin_action_console"), - ?R_REF("user_provided_function") - ]. + fun + (all_union_members) -> + [ + binary(), + ?R_REF("builtin_action_republish"), + ?R_REF("builtin_action_console"), + ?R_REF("user_provided_function") + ]; + ({value, V}) -> + case V of + #{<<"function">> := <<"console">>} -> + [?R_REF("builtin_action_console")]; + #{<<"function">> := <<"republish">>} -> + [?R_REF("builtin_action_republish")]; + #{<<"function">> := <<_/binary>>} -> + [?R_REF("user_provided_function")]; + <<_/binary>> -> + [binary()]; + _ -> + throw(#{ + field_name => actions, + reason => <<"unknown action type">> + }) + end + end. qos() -> ?UNION([emqx_schema:qos(), binary()]). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 8c3bd0ebb..2e24ab3a2 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("emqx/include/emqx.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -74,6 +75,7 @@ groups() -> t_sqlselect_inject_props, t_sqlselect_01, t_sqlselect_02, + t_sqlselect_03, t_sqlselect_1, t_sqlselect_2, t_sqlselect_3, @@ -520,13 +522,16 @@ t_get_rule_ids_by_action(_) -> t_ensure_action_removed(_) -> Id = <<"t_ensure_action_removed">>, GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, - emqx:update_config( + {ok, _} = emqx:update_config( [rule_engine, rules, Id], #{ <<"actions">> => [ #{<<"function">> => GetSelectedData}, #{<<"function">> => <<"console">>}, - #{<<"function">> => <<"republish">>}, + #{ + <<"function">> => <<"republish">>, + <<"args">> => #{<<"topic">> => <<"some/topic">>} + }, <<"mysql:foo">>, <<"mqtt:bar">> ], @@ -1407,6 +1412,189 @@ t_sqlselect_02(_Config) -> emqtt:stop(Client), delete_rule(TopicRule1). +t_sqlselect_03(_Config) -> + init_events_counters(), + SQL = "SELECT * FROM \"t/r\" ", + Repub = republish_action( + <<"t/republish">>, + <<"${.}">>, + <<"${pub_props.'User-Property'}">>, + #{ + <<"Payload-Format-Indicator">> => <<"${.payload.pfi}">>, + <<"Message-Expiry-Interval">> => <<"${.payload.mei}">>, + <<"Content-Type">> => <<"${.payload.ct}">>, + <<"Response-Topic">> => <<"${.payload.rt}">>, + <<"Correlation-Data">> => <<"${.payload.cd}">>, + <<"Subscription-Identifier">> => <<"${.payload.si}">> + } + ), + RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}), + ct:pal("republish action raw:\n ~p", [RepubRaw]), + RuleRaw = #{ + <<"sql">> => SQL, + <<"actions">> => [RepubRaw] + }, + {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw, #{}), + on_exit(fun() -> emqx_rule_engine:delete_rule(?TMP_RULEID) end), + %% to check what republish is actually producing without loss of information + SQL1 = "select * from \"t/republish\" ", + RuleId0 = ?TMP_RULEID, + RuleId1 = <>, + {ok, _} = emqx_rule_engine:create_rule( + #{ + sql => SQL1, + id => RuleId1, + actions => [ + #{ + function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>, + args => #{} + } + ] + } + ), + on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId1) end), + + UserProps = maps:to_list(#{<<"mykey">> => <<"myval">>}), + Payload = + emqx_utils_json:encode( + #{ + pfi => 1, + mei => 2, + ct => <<"3">>, + rt => <<"4">>, + cd => <<"5">>, + si => 6, + ta => 7 + } + ), + {ok, Client} = emqtt:start_link([ + {username, <<"emqx">>}, + {proto_ver, v5}, + {properties, #{'Topic-Alias-Maximum' => 100}} + ]), + on_exit(fun() -> emqtt:stop(Client) end), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t/republish">>, 0), + PubProps = #{'User-Property' => UserProps}, + ExpectedMQTTProps0 = #{ + 'Payload-Format-Indicator' => 1, + 'Message-Expiry-Interval' => 2, + 'Content-Type' => <<"3">>, + 'Response-Topic' => <<"4">>, + 'Correlation-Data' => <<"5">>, + 'Subscription-Identifier' => 6, + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'User-Property' => UserProps + }, + emqtt:publish(Client, <<"t/r">>, PubProps, Payload, [{qos, 0}]), + receive + {publish, #{topic := <<"t/republish">>, properties := Props1}} -> + ?assertEqual(ExpectedMQTTProps0, Props1), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + ExpectedMQTTProps1 = #{ + 'Payload-Format-Indicator' => 1, + 'Message-Expiry-Interval' => 2, + 'Content-Type' => <<"3">>, + 'Response-Topic' => <<"4">>, + 'Correlation-Data' => <<"5">>, + 'Subscription-Identifier' => 6, + 'User-Property' => maps:from_list(UserProps), + 'User-Property-Pairs' => [ + #{key => K, value => V} + || {K, V} <- UserProps + ] + }, + ?assertMatch( + [ + {'message.publish', #{ + topic := <<"t/republish">>, + pub_props := ExpectedMQTTProps1 + }} + ], + ets:lookup(events_record_tab, 'message.publish'), + #{expected_props => ExpectedMQTTProps1} + ), + + ct:pal("testing payload that is not a json object"), + emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]), + ExpectedMQTTProps2 = #{ + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'User-Property' => UserProps + }, + receive + {publish, #{topic := T1, properties := Props2}} -> + ?assertEqual(ExpectedMQTTProps2, Props2), + %% empty this time, due to topic alias set before + ?assertEqual(<<>>, T1), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + + ct:pal("testing payload with some uncoercible keys"), + ets:delete_all_objects(events_record_tab), + Payload1 = + emqx_utils_json:encode(#{ + pfi => <<"bad_value1">>, + mei => <<"bad_value2">>, + ct => <<"some_value3">>, + rt => <<"some_value4">>, + cd => <<"some_value5">>, + si => <<"bad_value6">>, + ta => <<"bad_value7">> + }), + emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]), + ExpectedMQTTProps3 = #{ + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'Content-Type' => <<"some_value3">>, + 'Response-Topic' => <<"some_value4">>, + 'Correlation-Data' => <<"some_value5">>, + 'User-Property' => UserProps + }, + receive + {publish, #{topic := T2, properties := Props3}} -> + ?assertEqual(ExpectedMQTTProps3, Props3), + %% empty this time, due to topic alias set before + ?assertEqual(<<>>, T2), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + ExpectedMQTTProps4 = #{ + 'Content-Type' => <<"some_value3">>, + 'Response-Topic' => <<"some_value4">>, + 'Correlation-Data' => <<"some_value5">>, + 'User-Property' => maps:from_list(UserProps), + 'User-Property-Pairs' => [ + #{key => K, value => V} + || {K, V} <- UserProps + ] + }, + ?assertMatch( + [ + {'message.publish', #{ + topic := <<"t/republish">>, + pub_props := ExpectedMQTTProps4 + }} + ], + ets:lookup(events_record_tab, 'message.publish'), + #{expected_props => ExpectedMQTTProps4} + ), + + ok. + t_sqlselect_1(_Config) -> SQL = "SELECT json_decode(payload) as p, payload " @@ -3211,6 +3399,9 @@ republish_action(Topic, Payload) -> republish_action(Topic, Payload, <<"${user_properties}">>). republish_action(Topic, Payload, UserProperties) -> + republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}). + +republish_action(Topic, Payload, UserProperties, MQTTProperties) -> #{ function => republish, args => #{ @@ -3218,6 +3409,7 @@ republish_action(Topic, Payload, UserProperties) -> topic => Topic, qos => 0, retain => false, + mqtt_properties => MQTTProperties, user_properties => UserProperties } }. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl new file mode 100644 index 000000000..74b55e151 --- /dev/null +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl @@ -0,0 +1,134 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_rule_engine_schema_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%=========================================================================== +%% Data Section +%%=========================================================================== + +%% erlfmt-ignore +republish_hocon0() -> +""" +rule_engine.rules.my_rule { + description = \"some desc\" + metadata = {created_at = 1693918992079} + sql = \"select * from \\\"t/topic\\\" \" + actions = [ + {function = console} + { function = republish + args = { + payload = \"${.}\" + qos = 0 + retain = false + topic = \"t/repu\" + mqtt_properties { + \"Payload-Format-Indicator\" = \"${.payload.pfi}\" + \"Message-Expiry-Interval\" = \"${.payload.mei}\" + \"Content-Type\" = \"${.payload.ct}\" + \"Response-Topic\" = \"${.payload.rt}\" + \"Correlation-Data\" = \"${.payload.cd}\" + \"Subscription-Identifier\" = \"${.payload.si}\" + } + user_properties = \"${pub_props.'User-Property'}\" + } + }, + \"bridges:kafka:kprodu\", + { function = custom_fn + args = { + actually = not_republish + } + } + ] +} +""". + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_rule_engine_schema, Conf). + +-define(validation_error(Reason, Value), + {emqx_rule_engine_schema, [ + #{ + kind := validation_error, + reason := Reason, + value := Value + } + ]} +). + +-define(ok_config(Cfg), #{ + <<"rule_engine">> := + #{ + <<"rules">> := + #{ + <<"my_rule">> := + Cfg + } + } +}). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +republish_test_() -> + BaseConf = parse(republish_hocon0()), + [ + {"base config", + ?_assertMatch( + ?ok_config( + #{ + <<"actions">> := [ + #{<<"function">> := console}, + #{ + <<"function">> := republish, + <<"args">> := + #{ + <<"mqtt_properties">> := + #{ + <<"Payload-Format-Indicator">> := <<_/binary>>, + <<"Message-Expiry-Interval">> := <<_/binary>>, + <<"Content-Type">> := <<_/binary>>, + <<"Response-Topic">> := <<_/binary>>, + <<"Correlation-Data">> := <<_/binary>>, + <<"Subscription-Identifier">> := <<_/binary>> + } + } + }, + <<"bridges:kafka:kprodu">>, + #{ + <<"function">> := <<"custom_fn">>, + <<"args">> := + #{ + <<"actually">> := <<"not_republish">> + } + } + ] + } + ), + check(BaseConf) + )} + ]. diff --git a/changes/ce/feat-11568.en.md b/changes/ce/feat-11568.en.md new file mode 100644 index 000000000..8c5a70257 --- /dev/null +++ b/changes/ce/feat-11568.en.md @@ -0,0 +1 @@ +Added support for defining templates for MQTT publish properties in Republish rule action. diff --git a/changes/ce/fix-11568.en.md b/changes/ce/fix-11568.en.md new file mode 100644 index 000000000..b1a5461c5 --- /dev/null +++ b/changes/ce/fix-11568.en.md @@ -0,0 +1 @@ +Fixed an issue where an ill-defined builtin rule action config could be interpreted as a custom user function. diff --git a/rel/i18n/emqx_rule_engine_schema.hocon b/rel/i18n/emqx_rule_engine_schema.hocon index 9b1f1f802..c661c553a 100644 --- a/rel/i18n/emqx_rule_engine_schema.hocon +++ b/rel/i18n/emqx_rule_engine_schema.hocon @@ -103,6 +103,16 @@ You may also call map_put function like to inject user properties. NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.""" +republish_args_user_properties.label: +"""User Properties""" + +republish_args_mqtt_properties.desc: +"""From which variable should the MQTT Publish Properties of the message be taken from. +Placeholders like ${.payload.content_type} may be used.""" + +republish_args_mqtt_properties.label: +"""MQTT Properties""" + republish_function.desc: """Republish the message as a new MQTT message""" From c9100c75915f2d209c941c1cc5f0657ec0507ff3 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 13:33:59 -0300 Subject: [PATCH 2/8] refactor: improve logging information and descriptions Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_rule_engine/src/emqx_rule_actions.erl | 10 ++++++---- rel/i18n/emqx_rule_engine_schema.hocon | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 93da7191a..89aceb96e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -257,9 +257,10 @@ format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> ?SLOG( error, #{ - msg => "bad_mqtt_prop_value", + msg => "bad_mqtt_property_value_ignored", rule_id => RuleId, - reason => {Kind, Error}, + exception => Kind, + reason => Error, property => K, selected => Selected } @@ -297,9 +298,10 @@ coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> ?SLOG( error, #{ - msg => "bad_mqtt_prop_value", + msg => "bad_mqtt_property_value_ignored", rule_id => RuleId, - reason => {Kind, Error}, + reason => Error, + exception => Kind, property => K, value => V } diff --git a/rel/i18n/emqx_rule_engine_schema.hocon b/rel/i18n/emqx_rule_engine_schema.hocon index c661c553a..8a710a443 100644 --- a/rel/i18n/emqx_rule_engine_schema.hocon +++ b/rel/i18n/emqx_rule_engine_schema.hocon @@ -107,7 +107,7 @@ republish_args_user_properties.label: """User Properties""" republish_args_mqtt_properties.desc: -"""From which variable should the MQTT Publish Properties of the message be taken from. +"""From which variable should the MQTT Publish Properties of the message be taken. Placeholders like ${.payload.content_type} may be used.""" republish_args_mqtt_properties.label: From bb55b04d46ab20b855fff1532a2a6c8adc8d1d5f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 13:54:40 -0300 Subject: [PATCH 3/8] refactor: inline fn clauses, improve error logging --- .../src/emqx_rule_actions.erl | 56 ++++++++++++------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 89aceb96e..97e9225ca 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -273,37 +273,46 @@ format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> ), coerce_properties_values(MQTTProperties0, Env). -ensure_int(B) when is_binary(B) -> binary_to_integer(B); -ensure_int(I) when is_integer(I) -> I. +ensure_int(B) when is_binary(B) -> + try + binary_to_integer(B) + catch + error:badarg -> + throw(bad_integer) + end; +ensure_int(I) when is_integer(I) -> + I. coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> - PublishProperties = - maps:from_list([ - {K, T} - || {_Id, {K, T, Tags}} <- maps:to_list(emqtt_props:all()), - is_list(Tags) andalso lists:member(?PUBLISH, Tags) - ]), maps:fold( - fun(K, V, Acc) -> + fun(K, V0, Acc) -> try - case maps:get(K, PublishProperties) of - 'Byte' -> Acc#{K => ensure_int(V)}; - 'Two-Byte-Integer' -> Acc#{K => ensure_int(V)}; - 'Four-Byte-Integer' -> Acc#{K => ensure_int(V)}; - 'Variable-Byte-Integer' -> Acc#{K => ensure_int(V)}; - _ -> Acc#{K => V} - end + V = encode_mqtt_property(K, V0), + Acc#{K => V} catch - Kind:Error -> + throw:bad_integer -> ?SLOG( error, #{ msg => "bad_mqtt_property_value_ignored", rule_id => RuleId, - reason => Error, - exception => Kind, + reason => bad_integer, property => K, - value => V + value => V0 + } + ), + Acc; + Kind:Reason:Stacktrace -> + ?SLOG( + error, + #{ + msg => "bad_mqtt_property_value_ignored", + rule_id => RuleId, + exception => Kind, + reason => Reason, + property => K, + value => V0, + stacktrace => Stacktrace } ), Acc @@ -312,3 +321,10 @@ coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> #{}, MQTTProperties ). + +%% Note: currently we do not support `Topic-Alias', which would need to be encoded as an +%% int. +encode_mqtt_property('Payload-Format-Indicator', V) -> ensure_int(V); +encode_mqtt_property('Message-Expiry-Interval', V) -> ensure_int(V); +encode_mqtt_property('Subscription-Identifier', V) -> ensure_int(V); +encode_mqtt_property(_Prop, V) -> V. From 080cb73da15fe698185b8d371c8f73bf2d10919d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 14:14:46 -0300 Subject: [PATCH 4/8] fix: handle badmap inside `emqx_placeholder:proc_tmpl` --- apps/emqx_utils/src/emqx_placeholder.erl | 23 +++++++++++++------ apps/emqx_utils/src/emqx_utils.app.src | 2 +- .../test/emqx_placeholder_SUITE.erl | 22 ++++++++++++++++++ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/apps/emqx_utils/src/emqx_placeholder.erl b/apps/emqx_utils/src/emqx_placeholder.erl index 0f677236d..4d386840f 100644 --- a/apps/emqx_utils/src/emqx_placeholder.erl +++ b/apps/emqx_utils/src/emqx_placeholder.erl @@ -277,20 +277,29 @@ lookup_var([Prop | Rest], Data0) -> end. lookup(Prop, Data) when is_binary(Prop) -> - case maps:get(Prop, Data, undefined) of - undefined -> - try - {ok, maps:get(binary_to_existing_atom(Prop, utf8), Data)} + case do_one_lookup(Prop, Data) of + {error, undefined} -> + try binary_to_existing_atom(Prop, utf8) of + AtomKey -> + do_one_lookup(AtomKey, Data) catch - error:{badkey, _} -> - {error, undefined}; error:badarg -> {error, undefined} end; - Value -> + {ok, Value} -> {ok, Value} end. +do_one_lookup(Key, Data) -> + try + {ok, maps:get(Key, Data)} + catch + error:{badkey, _} -> + {error, undefined}; + error:{badmap, _} -> + {error, undefined} + end. + %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_utils/src/emqx_utils.app.src b/apps/emqx_utils/src/emqx_utils.app.src index 539bfd3b7..de7326a21 100644 --- a/apps/emqx_utils/src/emqx_utils.app.src +++ b/apps/emqx_utils/src/emqx_utils.app.src @@ -2,7 +2,7 @@ {application, emqx_utils, [ {description, "Miscellaneous utilities for EMQX apps"}, % strict semver, bump manually! - {vsn, "5.0.7"}, + {vsn, "5.0.8"}, {modules, [ emqx_utils, emqx_utils_api, diff --git a/apps/emqx_utils/test/emqx_placeholder_SUITE.erl b/apps/emqx_utils/test/emqx_placeholder_SUITE.erl index f813656f2..a1250269f 100644 --- a/apps/emqx_utils/test/emqx_placeholder_SUITE.erl +++ b/apps/emqx_utils/test/emqx_placeholder_SUITE.erl @@ -256,3 +256,25 @@ t_proc_tmpl_arbitrary_var_name_double_quote(_) -> <<"a:1,a:1-1,b:1,b:2,c:1.0,d:oo,d1:hi}">>, emqx_placeholder:proc_tmpl(Tks, Selected) ). + +t_proc_tmpl_badmap(_Config) -> + ThisTks = emqx_placeholder:preproc_tmpl(<<"${.}">>), + Tks = emqx_placeholder:preproc_tmpl(<<"${.a.b.c}">>), + BadMap = <<"not-a-map">>, + ?assertEqual( + <<"not-a-map">>, + emqx_placeholder:proc_tmpl(ThisTks, BadMap) + ), + ?assertEqual( + <<"undefined">>, + emqx_placeholder:proc_tmpl(Tks, #{<<"a">> => #{<<"b">> => BadMap}}) + ), + ?assertEqual( + <<"undefined">>, + emqx_placeholder:proc_tmpl(Tks, #{<<"a">> => BadMap}) + ), + ?assertEqual( + <<"undefined">>, + emqx_placeholder:proc_tmpl(Tks, BadMap) + ), + ok. From 47bd19dee4ade4a01a00aad1bcbab1bbb1de023b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 15:03:45 -0300 Subject: [PATCH 5/8] chore: lower error log messages down to debug --- apps/emqx_rule_engine/src/emqx_rule_actions.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 97e9225ca..a83902aad 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -255,7 +255,7 @@ format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) -> catch Kind:Error -> ?SLOG( - error, + debug, #{ msg => "bad_mqtt_property_value_ignored", rule_id => RuleId, @@ -292,7 +292,7 @@ coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> catch throw:bad_integer -> ?SLOG( - error, + debug, #{ msg => "bad_mqtt_property_value_ignored", rule_id => RuleId, @@ -304,7 +304,7 @@ coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> Acc; Kind:Reason:Stacktrace -> ?SLOG( - error, + debug, #{ msg => "bad_mqtt_property_value_ignored", rule_id => RuleId, From 02b8bbf76a9318029f6854542d71f854fef0a0ad Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 15:25:52 -0300 Subject: [PATCH 6/8] chore: drop templating support for `Subscription-Identifier` This is highly-dependent on the session state, as is `Topic-Alias`. --- .../src/emqx_rule_engine_schema.erl | 4 +--- .../test/emqx_rule_engine_SUITE.erl | 16 ++++++---------- .../test/emqx_rule_engine_schema_tests.erl | 4 +--- 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index bd7d44b69..ba4056421 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -188,9 +188,7 @@ fields("republish_mqtt_properties") -> {'Content-Type', ?HOCON(binary(), #{required => false, desc => ?DESC('Content-Type')})}, {'Response-Topic', ?HOCON(binary(), #{required => false, desc => ?DESC('Response-Topic')})}, {'Correlation-Data', - ?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})}, - {'Subscription-Identifier', - ?HOCON(binary(), #{required => false, desc => ?DESC('Subscription-Identifier')})} + ?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})} ]. desc("rule_engine") -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 2e24ab3a2..e23da39f8 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1424,8 +1424,7 @@ t_sqlselect_03(_Config) -> <<"Message-Expiry-Interval">> => <<"${.payload.mei}">>, <<"Content-Type">> => <<"${.payload.ct}">>, <<"Response-Topic">> => <<"${.payload.rt}">>, - <<"Correlation-Data">> => <<"${.payload.cd}">>, - <<"Subscription-Identifier">> => <<"${.payload.si}">> + <<"Correlation-Data">> => <<"${.payload.cd}">> } ), RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}), @@ -1462,9 +1461,7 @@ t_sqlselect_03(_Config) -> mei => 2, ct => <<"3">>, rt => <<"4">>, - cd => <<"5">>, - si => 6, - ta => 7 + cd => <<"5">> } ), {ok, Client} = emqtt:start_link([ @@ -1482,7 +1479,6 @@ t_sqlselect_03(_Config) -> 'Content-Type' => <<"3">>, 'Response-Topic' => <<"4">>, 'Correlation-Data' => <<"5">>, - 'Subscription-Identifier' => 6, %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', %% so the channel controls those aliases on its own, starting from 1. 'Topic-Alias' => 1, @@ -1503,7 +1499,6 @@ t_sqlselect_03(_Config) -> 'Content-Type' => <<"3">>, 'Response-Topic' => <<"4">>, 'Correlation-Data' => <<"5">>, - 'Subscription-Identifier' => 6, 'User-Property' => maps:from_list(UserProps), 'User-Property-Pairs' => [ #{key => K, value => V} @@ -1524,6 +1519,9 @@ t_sqlselect_03(_Config) -> ct:pal("testing payload that is not a json object"), emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]), ExpectedMQTTProps2 = #{ + 'Content-Type' => <<"undefined">>, + 'Correlation-Data' => <<"undefined">>, + 'Response-Topic' => <<"undefined">>, %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', %% so the channel controls those aliases on its own, starting from 1. 'Topic-Alias' => 1, @@ -1548,9 +1546,7 @@ t_sqlselect_03(_Config) -> mei => <<"bad_value2">>, ct => <<"some_value3">>, rt => <<"some_value4">>, - cd => <<"some_value5">>, - si => <<"bad_value6">>, - ta => <<"bad_value7">> + cd => <<"some_value5">> }), emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]), ExpectedMQTTProps3 = #{ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl index 74b55e151..27dbafdc8 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_schema_tests.erl @@ -43,7 +43,6 @@ rule_engine.rules.my_rule { \"Content-Type\" = \"${.payload.ct}\" \"Response-Topic\" = \"${.payload.rt}\" \"Correlation-Data\" = \"${.payload.cd}\" - \"Subscription-Identifier\" = \"${.payload.si}\" } user_properties = \"${pub_props.'User-Property'}\" } @@ -113,8 +112,7 @@ republish_test_() -> <<"Message-Expiry-Interval">> := <<_/binary>>, <<"Content-Type">> := <<_/binary>>, <<"Response-Topic">> := <<_/binary>>, - <<"Correlation-Data">> := <<_/binary>>, - <<"Subscription-Identifier">> := <<_/binary>> + <<"Correlation-Data">> := <<_/binary>> } } }, From 014bc64d3b30cb4e26b584f8ceb2224fefb3c64e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 16:12:18 -0300 Subject: [PATCH 7/8] chore: ensure general prop is a binary --- apps/emqx_rule_engine/src/emqx_rule_actions.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index a83902aad..ce74203a4 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -327,4 +327,5 @@ coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) -> encode_mqtt_property('Payload-Format-Indicator', V) -> ensure_int(V); encode_mqtt_property('Message-Expiry-Interval', V) -> ensure_int(V); encode_mqtt_property('Subscription-Identifier', V) -> ensure_int(V); -encode_mqtt_property(_Prop, V) -> V. +%% note: `emqx_placeholder:proc_tmpl/2' currently always return a binary. +encode_mqtt_property(_Prop, V) when is_binary(V) -> V. From 68293231b86b54357c0a570a541a9246e23453fb Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 6 Sep 2023 17:21:35 -0300 Subject: [PATCH 8/8] test: add cases for more complex placeholders --- .../test/emqx_rule_engine_SUITE.erl | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index e23da39f8..6aa5351ee 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -1589,6 +1589,81 @@ t_sqlselect_03(_Config) -> #{expected_props => ExpectedMQTTProps4} ), + ct:pal("testing a payload with a more complex placeholder"), + Repub1 = republish_action( + <<"t/republish">>, + <<"${.}">>, + <<"${pub_props.'User-Property'}">>, + #{ + %% Note: `Payload-Format-Indicator' is capped at 225. + <<"Payload-Format-Indicator">> => <<"1${.payload.pfi}3">>, + <<"Message-Expiry-Interval">> => <<"9${.payload.mei}6">> + } + ), + RepubRaw1 = emqx_utils_maps:binary_key_map(Repub1#{function => <<"republish">>}), + ct:pal("republish action raw:\n ~p", [RepubRaw1]), + RuleRaw1 = #{ + <<"sql">> => SQL, + <<"actions">> => [RepubRaw1] + }, + {ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw1, #{}), + + Payload2 = + emqx_utils_json:encode(#{ + pfi => <<"2">>, + mei => <<"87">> + }), + emqtt:publish(Client, <<"t/r">>, PubProps, Payload2, [{qos, 0}]), + ExpectedMQTTProps5 = #{ + %% Note: PFI should be 0 or 1 according to spec, but we don't validate this when + %% serializing nor parsing... + 'Payload-Format-Indicator' => 123, + 'Message-Expiry-Interval' => 9876, + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'User-Property' => UserProps + }, + receive + {publish, #{topic := T3, properties := Props4}} -> + ?assertEqual(ExpectedMQTTProps5, Props4), + %% empty this time, due to topic alias set before + ?assertEqual(<<>>, T3), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + + ct:pal("testing payload-format-indicator cap"), + Payload3 = + emqx_utils_json:encode(#{ + pfi => <<"999999">>, + mei => <<"87">> + }), + emqtt:publish(Client, <<"t/r">>, PubProps, Payload3, [{qos, 0}]), + ExpectedMQTTProps6 = #{ + %% Note: PFI should be 0 or 1 according to spec, but we don't validate this when + %% serializing nor parsing... + %% Note: PFI is capped at 16#FF + 'Payload-Format-Indicator' => 16#FF band 19999993, + 'Message-Expiry-Interval' => 9876, + %% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props', + %% so the channel controls those aliases on its own, starting from 1. + 'Topic-Alias' => 1, + 'User-Property' => UserProps + }, + receive + {publish, #{topic := T4, properties := Props5}} -> + ?assertEqual(ExpectedMQTTProps6, Props5), + %% empty this time, due to topic alias set before + ?assertEqual(<<>>, T4), + ok + after 2000 -> + ct:pal("mailbox:\n ~p", [?drainMailbox()]), + ct:fail("message not republished (l. ~b)", [?LINE]) + end, + ok. t_sqlselect_1(_Config) ->