diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
index c0009a040..bc5735c67 100644
--- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
+++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
@@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string "undefined" is used.
"""
zh: """
-要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。。
+要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
"""
}
@@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used.
zh: "消息负载"
}
}
+ republish_args_user_properties {
+ desc {
+ en: """
+From which variable should the MQTT message's User-Property pairs be taken from.
+The value must be a map.
+You may configure it to ${pub_props.'User-Property'}
or
+use SELECT *,pub_props.'User-Property' as user_properties
+to forward the original user properties to the republished message.
+You may also call map_put
function like
+map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties
+to inject user properties.
+NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.
+"""
+ zh: """
+指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。
+可以设置成 ${pub_props.'User-Property'}
或者
+使用 SELECT *,pub_props.'User-Property' as user_properties
来把源 MQTT 消息
+的 User-Property 列表用于填充。
+也可以使用 map_put
函数来添加新的 User-Property,
+map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties
+注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。
+"""
+ }
+ }
rule_engine_ignore_sys_message {
desc {
diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl
index dd26f0a29..8971159e7 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl
@@ -37,6 +37,8 @@
-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
+-define(ORIGINAL_USER_PROPERTIES, original).
+
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
@@ -57,7 +59,8 @@ pre_process_action_args(
topic := Topic,
qos := QoS,
retain := Retain,
- payload := Payload
+ payload := Payload,
+ user_properties := UserProperties
} = Args
) ->
Args#{
@@ -65,7 +68,8 @@ pre_process_action_args(
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
- payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
+ payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
+ user_properties => preproc_user_properties(UserProperties)
}
};
pre_process_action_args(_, Args) ->
@@ -93,16 +97,16 @@ republish(
_Args
) ->
?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
-%% republish a PUBLISH message
republish(
Selected,
- #{flags := Flags, metadata := #{rule_id := RuleId}},
+ #{metadata := #{rule_id := RuleId}} = Env,
#{
preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
- payload := PayloadTks
+ payload := PayloadTks,
+ user_properties := UserPropertiesTks
}
}
) ->
@@ -110,29 +114,22 @@ republish(
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
- PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})),
- ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
- safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps);
-%% in case this is a "$events/" event
-republish(
- Selected,
- #{metadata := #{rule_id := RuleId}},
- #{
- preprocessed_tmpl := #{
- qos := QoSTks,
- retain := RetainTks,
- topic := TopicTks,
- payload := PayloadTks
+ %% 'flags' is set for message re-publishes or message related
+ %% events such as message.acked and message.dropped
+ Flags0 = maps:get(flags, Env, #{}),
+ Flags = Flags0#{retain => Retain},
+ PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
+ ?TRACE(
+ "RULE",
+ "republish_message",
+ #{
+ flags => Flags,
+ topic => Topic,
+ payload => Payload,
+ pub_props => PubProps
}
- }
-) ->
- Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
- Payload = format_msg(PayloadTks, Selected),
- QoS = replace_simple_var(QoSTks, Selected, 0),
- Retain = replace_simple_var(RetainTks, Selected, false),
- PubProps = maps:get(pub_props, Selected, #{}),
- ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
- safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps).
+ ),
+ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
%%--------------------------------------------------------------------
%% internal functions
@@ -192,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) ->
preproc_vars(Data) ->
Data.
+preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
+ %% keep the original
+ %% avoid processing this special variable because
+ %% we do not want to force users to select the value
+ %% the value will be taken from Env.pub_props directly
+ ?ORIGINAL_USER_PROPERTIES;
+preproc_user_properties(<<"${", _/binary>> = V) ->
+ %% use a variable
+ emqx_plugin_libs_rule:preproc_tmpl(V);
+preproc_user_properties(_) ->
+ %% invalid, discard
+ undefined.
+
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
case Var of
@@ -207,16 +217,14 @@ format_msg([], Selected) ->
format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
-format_pub_props(Props) ->
- maps:fold(fun format_pub_prop/3, #{}, Props).
-
-format_pub_prop(K, V, Acc) when is_atom(K) ->
- Acc#{K => V};
-format_pub_prop(K, V, Acc) when is_binary(K) ->
- try
- K1 = erlang:binary_to_existing_atom(K),
- format_pub_prop(K1, V, Acc)
- catch
- _:_ ->
- Acc#{K => V}
- end.
+format_pub_props(UserPropertiesTks, Selected, Env) ->
+ UserProperties =
+ case UserPropertiesTks of
+ ?ORIGINAL_USER_PROPERTIES ->
+ maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
+ undefined ->
+ #{};
+ _ ->
+ replace_simple_var(UserPropertiesTks, Selected, #{})
+ end,
+ #{'User-Property' => UserProperties}.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index eec41bde8..d299a6bb4 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -173,6 +173,15 @@ fields("republish_args") ->
default => <<"${payload}">>,
example => <<"${payload}">>
}
+ )},
+ {user_properties,
+ ?HOCON(
+ binary(),
+ #{
+ desc => ?DESC("republish_args_user_properties"),
+ default => <<"${user_properties}">>,
+ example => <<"${pub_props.'User-Property'}">>
+ }
)}
].
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
index 087a73c34..50bb55fe1 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
@@ -65,7 +65,8 @@ groups() ->
t_sqlselect_1,
t_sqlselect_2,
t_sqlselect_3,
- t_sqlselect_message_publish_event,
+ t_sqlselect_message_publish_event_keep_original_props_1,
+ t_sqlselect_message_publish_event_keep_original_props_2,
t_sqlparse_event_1,
t_sqlparse_event_2,
t_sqlparse_event_3,
@@ -941,8 +942,7 @@ t_sqlselect_001(_Config) ->
t_sqlselect_inject_props(_Config) ->
SQL =
"SELECT json_decode(payload) as p, payload, "
- "map_put('discard', 'discard', pub_props) as pub_props, "
- "map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' "
+ "map_put('inject_key', 'inject_val', user_properties) as user_properties "
"FROM \"t3/#\", \"t1\" "
"WHERE p.x = 1",
Repub = republish_action(<<"t2">>),
@@ -958,13 +958,12 @@ t_sqlselect_inject_props(_Config) ->
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
- ct:sleep(100),
receive
{publish, #{topic := T, payload := Payload, properties := Props2}} ->
?assertEqual(Props, Props2),
?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1}">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
emqtt:stop(Client),
@@ -972,10 +971,10 @@ t_sqlselect_inject_props(_Config) ->
t_sqlselect_01(_Config) ->
SQL =
- "SELECT json_decode(payload) as p, payload, pub_props "
+ "SELECT json_decode(payload) as p, payload "
"FROM \"t3/#\", \"t1\" "
"WHERE p.x = 1",
- Repub = republish_action(<<"t2">>),
+ Repub = republish_action(<<"t2">>, <<"${payload}">>, <<"${pub_props.'User-Property'}">>),
{ok, TopicRule1} = emqx_rule_engine:create_rule(
#{
sql => SQL,
@@ -988,12 +987,11 @@ t_sqlselect_01(_Config) ->
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
- ct:sleep(100),
receive
{publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1}">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
@@ -1001,7 +999,7 @@ t_sqlselect_01(_Config) ->
receive
{publish, #{topic := <<"t2">>, payload := _}} ->
ct:fail(unexpected_t2)
- after 1000 ->
+ after 2000 ->
ok
end,
@@ -1011,8 +1009,8 @@ t_sqlselect_01(_Config) ->
?assertEqual(Props, Props2),
?assertEqual(<<"t2">>, T3),
?assertEqual(<<"{\"x\":1}">>, Payload3)
- after 1000 ->
- ct:fail(wait_for_t2)
+ after 2000 ->
+ ct:fail(wait_for_t3)
end,
emqtt:stop(Client),
@@ -1080,13 +1078,12 @@ t_sqlselect_1(_Config) ->
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
- ct:sleep(200),
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
receive
{publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
@@ -1149,14 +1146,13 @@ t_sqlselect_3(_Config) ->
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
- ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
{ok, _} = emqtt:connect(Client1),
receive
{publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"clientid=c_emqx1">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
@@ -1171,11 +1167,51 @@ t_sqlselect_3(_Config) ->
emqtt:stop(Client),
delete_rule(TopicRule).
-t_sqlselect_message_publish_event(_Config) ->
+t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
%% republish the client.connected msg
Topic = <<"foo/bar/1">>,
SQL = <<
- "SELECT clientid, pub_props "
+ "SELECT clientid "
+ "FROM \"$events/message_dropped\" "
+ >>,
+
+ %"WHERE topic = \"", Topic/binary, "\"">>,
+ Repub = republish_action(
+ <<"t2">>,
+ <<"clientid=${clientid}">>,
+ <<"${pub_props.'User-Property'}">>
+ ),
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
+ #{
+ sql => SQL,
+ id => ?TMP_RULEID,
+ actions => [Repub]
+ }
+ ),
+ {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client1),
+ {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
+ {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client2),
+ Props = user_properties(#{<<"mykey">> => <<"111111">>}),
+ emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
+ receive
+ {publish, #{topic := T, payload := Payload, properties := Props1}} ->
+ ?assertEqual(Props1, Props),
+ ?assertEqual(<<"t2">>, T),
+ ?assertEqual(<<"clientid=pub-02">>, Payload)
+ after 2000 ->
+ ct:fail(wait_for_t2)
+ end,
+ emqtt:stop(Client2),
+ emqtt:stop(Client1),
+ delete_rule(TopicRule).
+
+t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
+ %% republish the client.connected msg
+ Topic = <<"foo/bar/1">>,
+ SQL = <<
+ "SELECT clientid, pub_props.'User-Property' as user_properties "
"FROM \"$events/message_dropped\" "
>>,
@@ -1191,7 +1227,6 @@ t_sqlselect_message_publish_event(_Config) ->
{ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1),
{ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
- ct:sleep(200),
{ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client2),
Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
@@ -1201,7 +1236,7 @@ t_sqlselect_message_publish_event(_Config) ->
?assertEqual(Props1, Props),
?assertEqual(<<"t2">>, T),
?assertEqual(<<"clientid=pub-02">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
emqtt:stop(Client2),
@@ -2553,10 +2588,20 @@ t_get_basic_usage_info_1(_Config) ->
republish_action(Topic) ->
republish_action(Topic, <<"${payload}">>).
+
republish_action(Topic, Payload) ->
+ republish_action(Topic, Payload, <<"${user_properties}">>).
+
+republish_action(Topic, Payload, UserProperties) ->
#{
function => republish,
- args => #{payload => Payload, topic => Topic, qos => 0, retain => false}
+ args => #{
+ payload => Payload,
+ topic => Topic,
+ qos => 0,
+ retain => false,
+ user_properties => UserProperties
+ }
}.
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->