feat(republish): allow templating mqtt properties

Fixes https://emqx.atlassian.net/browse/EMQX-10912
This commit is contained in:
Thales Macedo Garitezi 2023-09-05 14:34:18 -03:00
parent 954b73a9aa
commit 78c5a779d7
10 changed files with 471 additions and 15 deletions

View File

@ -640,7 +640,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
#{ #{
<<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>,
<<"enable">> => true, <<"enable">> => true,
<<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}], <<"actions">> => [#{<<"function">> => <<"emqx_bridge_mqtt_SUITE:inspect">>}],
<<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">>
} }
), ),

View File

@ -2,7 +2,7 @@
{application, emqx_dashboard, [ {application, emqx_dashboard, [
{description, "EMQX Web Dashboard"}, {description, "EMQX Web Dashboard"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.26"}, {vsn, "5.0.27"},
{modules, []}, {modules, []},
{registered, [emqx_dashboard_sup]}, {registered, [emqx_dashboard_sup]},
{applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]}, {applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl, emqx_bridge_http]},

View File

@ -20,6 +20,7 @@
-include("rule_engine.hrl"). -include("rule_engine.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqtt/include/emqtt.hrl").
%% APIs %% APIs
-export([parse_action/1]). -export([parse_action/1]).
@ -60,16 +61,23 @@ pre_process_action_args(
qos := QoS, qos := QoS,
retain := Retain, retain := Retain,
payload := Payload, payload := Payload,
user_properties := UserProperties mqtt_properties := MQTTPropertiesTemplate0,
user_properties := UserPropertiesTemplate
} = Args } = Args
) -> ) ->
MQTTPropertiesTemplate =
maps:map(
fun(_Key, V) -> emqx_placeholder:preproc_tmpl(V) end,
MQTTPropertiesTemplate0
),
Args#{ Args#{
preprocessed_tmpl => #{ preprocessed_tmpl => #{
topic => emqx_placeholder:preproc_tmpl(Topic), topic => emqx_placeholder:preproc_tmpl(Topic),
qos => preproc_vars(QoS), qos => preproc_vars(QoS),
retain => preproc_vars(Retain), retain => preproc_vars(Retain),
payload => emqx_placeholder:preproc_tmpl(Payload), payload => emqx_placeholder:preproc_tmpl(Payload),
user_properties => preproc_user_properties(UserProperties) mqtt_properties => MQTTPropertiesTemplate,
user_properties => preproc_user_properties(UserPropertiesTemplate)
} }
}; };
pre_process_action_args(_, Args) -> pre_process_action_args(_, Args) ->
@ -106,6 +114,7 @@ republish(
retain := RetainTks, retain := RetainTks,
topic := TopicTks, topic := TopicTks,
payload := PayloadTks, payload := PayloadTks,
mqtt_properties := MQTTPropertiesTemplate,
user_properties := UserPropertiesTks user_properties := UserPropertiesTks
} }
} }
@ -118,7 +127,9 @@ republish(
%% events such as message.acked and message.dropped %% events such as message.acked and message.dropped
Flags0 = maps:get(flags, Env, #{}), Flags0 = maps:get(flags, Env, #{}),
Flags = Flags0#{retain => Retain}, Flags = Flags0#{retain => Retain},
PubProps = format_pub_props(UserPropertiesTks, Selected, Env), PubProps0 = format_pub_props(UserPropertiesTks, Selected, Env),
MQTTProps = format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
PubProps = maps:merge(PubProps0, MQTTProps),
?TRACE( ?TRACE(
"RULE", "RULE",
"republish_message", "republish_message",
@ -232,3 +243,70 @@ format_pub_props(UserPropertiesTks, Selected, Env) ->
replace_simple_var(UserPropertiesTks, Selected, #{}) replace_simple_var(UserPropertiesTks, Selected, #{})
end, end,
#{'User-Property' => UserProperties}. #{'User-Property' => UserProperties}.
format_mqtt_properties(MQTTPropertiesTemplate, Selected, Env) ->
#{metadata := #{rule_id := RuleId}} = Env,
MQTTProperties0 =
maps:fold(
fun(K, Template, Acc) ->
try
V = emqx_placeholder:proc_tmpl(Template, Selected),
Acc#{K => V}
catch
Kind:Error ->
?SLOG(
error,
#{
msg => "bad_mqtt_prop_value",
rule_id => RuleId,
reason => {Kind, Error},
property => K,
selected => Selected
}
),
Acc
end
end,
#{},
MQTTPropertiesTemplate
),
coerce_properties_values(MQTTProperties0, Env).
ensure_int(B) when is_binary(B) -> binary_to_integer(B);
ensure_int(I) when is_integer(I) -> I.
coerce_properties_values(MQTTProperties, #{metadata := #{rule_id := RuleId}}) ->
PublishProperties =
maps:from_list([
{K, T}
|| {_Id, {K, T, Tags}} <- maps:to_list(emqtt_props:all()),
is_list(Tags) andalso lists:member(?PUBLISH, Tags)
]),
maps:fold(
fun(K, V, Acc) ->
try
case maps:get(K, PublishProperties) of
'Byte' -> Acc#{K => ensure_int(V)};
'Two-Byte-Integer' -> Acc#{K => ensure_int(V)};
'Four-Byte-Integer' -> Acc#{K => ensure_int(V)};
'Variable-Byte-Integer' -> Acc#{K => ensure_int(V)};
_ -> Acc#{K => V}
end
catch
Kind:Error ->
?SLOG(
error,
#{
msg => "bad_mqtt_prop_value",
rule_id => RuleId,
reason => {Kind, Error},
property => K,
value => V
}
),
Acc
end
end,
#{},
MQTTProperties
).

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [ {application, emqx_rule_engine, [
{description, "EMQX Rule Engine"}, {description, "EMQX Rule Engine"},
% strict semver, bump manually! % strict semver, bump manually!
{vsn, "5.0.23"}, {vsn, "5.0.24"},
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]}, {applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]},

View File

@ -63,7 +63,7 @@ fields("rules") ->
)}, )},
{"actions", {"actions",
?HOCON( ?HOCON(
?ARRAY(?UNION(actions())), ?ARRAY(hoconsc:union(actions())),
#{ #{
desc => ?DESC("rules_actions"), desc => ?DESC("rules_actions"),
default => [], default => [],
@ -161,6 +161,14 @@ fields("republish_args") ->
example => <<"${payload}">> example => <<"${payload}">>
} }
)}, )},
{mqtt_properties,
?HOCON(
?R_REF("republish_mqtt_properties"),
#{
desc => ?DESC("republish_args_mqtt_properties"),
default => #{}
}
)},
{user_properties, {user_properties,
?HOCON( ?HOCON(
binary(), binary(),
@ -170,6 +178,19 @@ fields("republish_args") ->
example => <<"${pub_props.'User-Property'}">> example => <<"${pub_props.'User-Property'}">>
} }
)} )}
];
fields("republish_mqtt_properties") ->
[
{'Payload-Format-Indicator',
?HOCON(binary(), #{required => false, desc => ?DESC('Payload-Format-Indicator')})},
{'Message-Expiry-Interval',
?HOCON(binary(), #{required => false, desc => ?DESC('Message-Expiry-Interval')})},
{'Content-Type', ?HOCON(binary(), #{required => false, desc => ?DESC('Content-Type')})},
{'Response-Topic', ?HOCON(binary(), #{required => false, desc => ?DESC('Response-Topic')})},
{'Correlation-Data',
?HOCON(binary(), #{required => false, desc => ?DESC('Correlation-Data')})},
{'Subscription-Identifier',
?HOCON(binary(), #{required => false, desc => ?DESC('Subscription-Identifier')})}
]. ].
desc("rule_engine") -> desc("rule_engine") ->
@ -200,12 +221,31 @@ rule_name() ->
)}. )}.
actions() -> actions() ->
fun
(all_union_members) ->
[ [
binary(), binary(),
?R_REF("builtin_action_republish"), ?R_REF("builtin_action_republish"),
?R_REF("builtin_action_console"), ?R_REF("builtin_action_console"),
?R_REF("user_provided_function") ?R_REF("user_provided_function")
]. ];
({value, V}) ->
case V of
#{<<"function">> := <<"console">>} ->
[?R_REF("builtin_action_console")];
#{<<"function">> := <<"republish">>} ->
[?R_REF("builtin_action_republish")];
#{<<"function">> := <<_/binary>>} ->
[?R_REF("user_provided_function")];
<<_/binary>> ->
[binary()];
_ ->
throw(#{
field_name => actions,
reason => <<"unknown action type">>
})
end
end.
qos() -> qos() ->
?UNION([emqx_schema:qos(), binary()]). ?UNION([emqx_schema:qos(), binary()]).

View File

@ -23,6 +23,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -74,6 +75,7 @@ groups() ->
t_sqlselect_inject_props, t_sqlselect_inject_props,
t_sqlselect_01, t_sqlselect_01,
t_sqlselect_02, t_sqlselect_02,
t_sqlselect_03,
t_sqlselect_1, t_sqlselect_1,
t_sqlselect_2, t_sqlselect_2,
t_sqlselect_3, t_sqlselect_3,
@ -520,13 +522,16 @@ t_get_rule_ids_by_action(_) ->
t_ensure_action_removed(_) -> t_ensure_action_removed(_) ->
Id = <<"t_ensure_action_removed">>, Id = <<"t_ensure_action_removed">>,
GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>, GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
emqx:update_config( {ok, _} = emqx:update_config(
[rule_engine, rules, Id], [rule_engine, rules, Id],
#{ #{
<<"actions">> => [ <<"actions">> => [
#{<<"function">> => GetSelectedData}, #{<<"function">> => GetSelectedData},
#{<<"function">> => <<"console">>}, #{<<"function">> => <<"console">>},
#{<<"function">> => <<"republish">>}, #{
<<"function">> => <<"republish">>,
<<"args">> => #{<<"topic">> => <<"some/topic">>}
},
<<"mysql:foo">>, <<"mysql:foo">>,
<<"mqtt:bar">> <<"mqtt:bar">>
], ],
@ -1407,6 +1412,189 @@ t_sqlselect_02(_Config) ->
emqtt:stop(Client), emqtt:stop(Client),
delete_rule(TopicRule1). delete_rule(TopicRule1).
t_sqlselect_03(_Config) ->
init_events_counters(),
SQL = "SELECT * FROM \"t/r\" ",
Repub = republish_action(
<<"t/republish">>,
<<"${.}">>,
<<"${pub_props.'User-Property'}">>,
#{
<<"Payload-Format-Indicator">> => <<"${.payload.pfi}">>,
<<"Message-Expiry-Interval">> => <<"${.payload.mei}">>,
<<"Content-Type">> => <<"${.payload.ct}">>,
<<"Response-Topic">> => <<"${.payload.rt}">>,
<<"Correlation-Data">> => <<"${.payload.cd}">>,
<<"Subscription-Identifier">> => <<"${.payload.si}">>
}
),
RepubRaw = emqx_utils_maps:binary_key_map(Repub#{function => <<"republish">>}),
ct:pal("republish action raw:\n ~p", [RepubRaw]),
RuleRaw = #{
<<"sql">> => SQL,
<<"actions">> => [RepubRaw]
},
{ok, _} = emqx_conf:update([rule_engine, rules, ?TMP_RULEID], RuleRaw, #{}),
on_exit(fun() -> emqx_rule_engine:delete_rule(?TMP_RULEID) end),
%% to check what republish is actually producing without loss of information
SQL1 = "select * from \"t/republish\" ",
RuleId0 = ?TMP_RULEID,
RuleId1 = <<RuleId0/binary, "2">>,
{ok, _} = emqx_rule_engine:create_rule(
#{
sql => SQL1,
id => RuleId1,
actions => [
#{
function => <<"emqx_rule_engine_SUITE:action_record_triggered_events">>,
args => #{}
}
]
}
),
on_exit(fun() -> emqx_rule_engine:delete_rule(RuleId1) end),
UserProps = maps:to_list(#{<<"mykey">> => <<"myval">>}),
Payload =
emqx_utils_json:encode(
#{
pfi => 1,
mei => 2,
ct => <<"3">>,
rt => <<"4">>,
cd => <<"5">>,
si => 6,
ta => 7
}
),
{ok, Client} = emqtt:start_link([
{username, <<"emqx">>},
{proto_ver, v5},
{properties, #{'Topic-Alias-Maximum' => 100}}
]),
on_exit(fun() -> emqtt:stop(Client) end),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t/republish">>, 0),
PubProps = #{'User-Property' => UserProps},
ExpectedMQTTProps0 = #{
'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 2,
'Content-Type' => <<"3">>,
'Response-Topic' => <<"4">>,
'Correlation-Data' => <<"5">>,
'Subscription-Identifier' => 6,
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'User-Property' => UserProps
},
emqtt:publish(Client, <<"t/r">>, PubProps, Payload, [{qos, 0}]),
receive
{publish, #{topic := <<"t/republish">>, properties := Props1}} ->
?assertEqual(ExpectedMQTTProps0, Props1),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ExpectedMQTTProps1 = #{
'Payload-Format-Indicator' => 1,
'Message-Expiry-Interval' => 2,
'Content-Type' => <<"3">>,
'Response-Topic' => <<"4">>,
'Correlation-Data' => <<"5">>,
'Subscription-Identifier' => 6,
'User-Property' => maps:from_list(UserProps),
'User-Property-Pairs' => [
#{key => K, value => V}
|| {K, V} <- UserProps
]
},
?assertMatch(
[
{'message.publish', #{
topic := <<"t/republish">>,
pub_props := ExpectedMQTTProps1
}}
],
ets:lookup(events_record_tab, 'message.publish'),
#{expected_props => ExpectedMQTTProps1}
),
ct:pal("testing payload that is not a json object"),
emqtt:publish(Client, <<"t/r">>, PubProps, <<"not-a-map">>, [{qos, 0}]),
ExpectedMQTTProps2 = #{
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'User-Property' => UserProps
},
receive
{publish, #{topic := T1, properties := Props2}} ->
?assertEqual(ExpectedMQTTProps2, Props2),
%% empty this time, due to topic alias set before
?assertEqual(<<>>, T1),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ct:pal("testing payload with some uncoercible keys"),
ets:delete_all_objects(events_record_tab),
Payload1 =
emqx_utils_json:encode(#{
pfi => <<"bad_value1">>,
mei => <<"bad_value2">>,
ct => <<"some_value3">>,
rt => <<"some_value4">>,
cd => <<"some_value5">>,
si => <<"bad_value6">>,
ta => <<"bad_value7">>
}),
emqtt:publish(Client, <<"t/r">>, PubProps, Payload1, [{qos, 0}]),
ExpectedMQTTProps3 = #{
%% currently, `Topic-Alias' is dropped `emqx_message:filter_pub_props',
%% so the channel controls those aliases on its own, starting from 1.
'Topic-Alias' => 1,
'Content-Type' => <<"some_value3">>,
'Response-Topic' => <<"some_value4">>,
'Correlation-Data' => <<"some_value5">>,
'User-Property' => UserProps
},
receive
{publish, #{topic := T2, properties := Props3}} ->
?assertEqual(ExpectedMQTTProps3, Props3),
%% empty this time, due to topic alias set before
?assertEqual(<<>>, T2),
ok
after 2000 ->
ct:pal("mailbox:\n ~p", [?drainMailbox()]),
ct:fail("message not republished (l. ~b)", [?LINE])
end,
ExpectedMQTTProps4 = #{
'Content-Type' => <<"some_value3">>,
'Response-Topic' => <<"some_value4">>,
'Correlation-Data' => <<"some_value5">>,
'User-Property' => maps:from_list(UserProps),
'User-Property-Pairs' => [
#{key => K, value => V}
|| {K, V} <- UserProps
]
},
?assertMatch(
[
{'message.publish', #{
topic := <<"t/republish">>,
pub_props := ExpectedMQTTProps4
}}
],
ets:lookup(events_record_tab, 'message.publish'),
#{expected_props => ExpectedMQTTProps4}
),
ok.
t_sqlselect_1(_Config) -> t_sqlselect_1(_Config) ->
SQL = SQL =
"SELECT json_decode(payload) as p, payload " "SELECT json_decode(payload) as p, payload "
@ -3211,6 +3399,9 @@ republish_action(Topic, Payload) ->
republish_action(Topic, Payload, <<"${user_properties}">>). republish_action(Topic, Payload, <<"${user_properties}">>).
republish_action(Topic, Payload, UserProperties) -> republish_action(Topic, Payload, UserProperties) ->
republish_action(Topic, Payload, UserProperties, _MQTTProperties = #{}).
republish_action(Topic, Payload, UserProperties, MQTTProperties) ->
#{ #{
function => republish, function => republish,
args => #{ args => #{
@ -3218,6 +3409,7 @@ republish_action(Topic, Payload, UserProperties) ->
topic => Topic, topic => Topic,
qos => 0, qos => 0,
retain => false, retain => false,
mqtt_properties => MQTTProperties,
user_properties => UserProperties user_properties => UserProperties
} }
}. }.

View File

@ -0,0 +1,134 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_schema_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Data Section
%%===========================================================================
%% erlfmt-ignore
republish_hocon0() ->
"""
rule_engine.rules.my_rule {
description = \"some desc\"
metadata = {created_at = 1693918992079}
sql = \"select * from \\\"t/topic\\\" \"
actions = [
{function = console}
{ function = republish
args = {
payload = \"${.}\"
qos = 0
retain = false
topic = \"t/repu\"
mqtt_properties {
\"Payload-Format-Indicator\" = \"${.payload.pfi}\"
\"Message-Expiry-Interval\" = \"${.payload.mei}\"
\"Content-Type\" = \"${.payload.ct}\"
\"Response-Topic\" = \"${.payload.rt}\"
\"Correlation-Data\" = \"${.payload.cd}\"
\"Subscription-Identifier\" = \"${.payload.si}\"
}
user_properties = \"${pub_props.'User-Property'}\"
}
},
\"bridges:kafka:kprodu\",
{ function = custom_fn
args = {
actually = not_republish
}
}
]
}
""".
%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_rule_engine_schema, Conf).
-define(validation_error(Reason, Value),
{emqx_rule_engine_schema, [
#{
kind := validation_error,
reason := Reason,
value := Value
}
]}
).
-define(ok_config(Cfg), #{
<<"rule_engine">> :=
#{
<<"rules">> :=
#{
<<"my_rule">> :=
Cfg
}
}
}).
%%===========================================================================
%% Test cases
%%===========================================================================
republish_test_() ->
BaseConf = parse(republish_hocon0()),
[
{"base config",
?_assertMatch(
?ok_config(
#{
<<"actions">> := [
#{<<"function">> := console},
#{
<<"function">> := republish,
<<"args">> :=
#{
<<"mqtt_properties">> :=
#{
<<"Payload-Format-Indicator">> := <<_/binary>>,
<<"Message-Expiry-Interval">> := <<_/binary>>,
<<"Content-Type">> := <<_/binary>>,
<<"Response-Topic">> := <<_/binary>>,
<<"Correlation-Data">> := <<_/binary>>,
<<"Subscription-Identifier">> := <<_/binary>>
}
}
},
<<"bridges:kafka:kprodu">>,
#{
<<"function">> := <<"custom_fn">>,
<<"args">> :=
#{
<<"actually">> := <<"not_republish">>
}
}
]
}
),
check(BaseConf)
)}
].

View File

@ -0,0 +1 @@
Added support for defining templates for MQTT publish properties in Republish rule action.

View File

@ -0,0 +1 @@
Fixed an issue where an ill-defined builtin rule action config could be interpreted as a custom user function.

View File

@ -103,6 +103,16 @@ You may also call <code>map_put</code> function like
to inject user properties. to inject user properties.
NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.""" NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not."""
republish_args_user_properties.label:
"""User Properties"""
republish_args_mqtt_properties.desc:
"""From which variable should the MQTT Publish Properties of the message be taken from.
Placeholders like <code>${.payload.content_type}</code> may be used."""
republish_args_mqtt_properties.label:
"""MQTT Properties"""
republish_function.desc: republish_function.desc:
"""Republish the message as a new MQTT message""" """Republish the message as a new MQTT message"""