diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl
index 674465e6a..9683b1a8b 100644
--- a/apps/emqx/src/emqx_misc.erl
+++ b/apps/emqx/src/emqx_misc.erl
@@ -54,7 +54,8 @@
pmap/3,
readable_error_msg/1,
safe_to_existing_atom/1,
- safe_to_existing_atom/2
+ safe_to_existing_atom/2,
+ pub_props_to_packet/1
]).
-export([
@@ -568,3 +569,17 @@ ipv6_probe_test() ->
end.
-endif.
+
+pub_props_to_packet(Properties) ->
+ F = fun
+ ('User-Property', M) ->
+ case is_map(M) andalso map_size(M) > 0 of
+ true -> {true, maps:to_list(M)};
+ false -> false
+ end;
+ ('User-Property-Pairs', _) ->
+ false;
+ (_, _) ->
+ true
+ end,
+ maps:filtermap(F, Properties).
diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src
index 64003209f..547a37b8e 100644
--- a/apps/emqx_connector/src/emqx_connector.app.src
+++ b/apps/emqx_connector/src/emqx_connector.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "An OTP application"},
- {vsn, "0.1.7"},
+ {vsn, "0.1.8"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
index 43700506b..469dd952b 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
@@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
+ PubProps = maps:get(pub_props, MapMsg, #{}),
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
- props = #{},
+ props = emqx_misc:pub_props_to_packet(PubProps),
payload = Payload
};
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection
+to_broker_msg(Msg, Vars, undefined) ->
+ to_broker_msg(Msg, Vars, #{});
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
@@ -103,8 +106,9 @@ to_broker_msg(
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
+ PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
- Props,
+ Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
@@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) ->
estimate_size(Term) ->
erlang:external_size(Term).
-set_headers(undefined, Msg) ->
- Msg;
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).
topic(undefined, Topic) -> Topic;
diff --git a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
index c0009a040..bc5735c67 100644
--- a/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
+++ b/apps/emqx_rule_engine/i18n/emqx_rule_engine_schema.conf
@@ -218,7 +218,7 @@ Defaults to ${payload}. If variable ${payload} is not found from the selected re
of the rule, then the string "undefined" is used.
"""
zh: """
-要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。。
+要重新发布的消息的有效负载。允许使用带有变量的模板,请参阅“republish_args”的描述。
默认为 ${payload}。 如果从所选结果中未找到变量 ${payload},则使用字符串 "undefined"。
"""
}
@@ -227,6 +227,30 @@ of the rule, then the string "undefined" is used.
zh: "消息负载"
}
}
+ republish_args_user_properties {
+ desc {
+ en: """
+From which variable should the MQTT message's User-Property pairs be taken from.
+The value must be a map.
+You may configure it to ${pub_props.'User-Property'}
or
+use SELECT *,pub_props.'User-Property' as user_properties
+to forward the original user properties to the republished message.
+You may also call map_put
function like
+map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties
+to inject user properties.
+NOTE: MQTT spec allows duplicated user property names, but EMQX Rule-Engine does not.
+"""
+ zh: """
+指定使用哪个变量来填充 MQTT 消息的 User-Property 列表。这个变量的值必须是一个 map 类型。
+可以设置成 ${pub_props.'User-Property'}
或者
+使用 SELECT *,pub_props.'User-Property' as user_properties
来把源 MQTT 消息
+的 User-Property 列表用于填充。
+也可以使用 map_put
函数来添加新的 User-Property,
+map_put('my-prop-name', 'my-prop-value', user_properties) as user_properties
+注意:MQTT 协议允许一个消息中出现多次同一个 property 名,但是 EMQX 的规则引擎不允许。
+"""
+ }
+ }
rule_engine_ignore_sys_message {
desc {
diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl
index 998fc1a5e..8971159e7 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl
@@ -37,6 +37,8 @@
-callback pre_process_action_args(FuncName :: atom(), action_fun_args()) -> action_fun_args().
+-define(ORIGINAL_USER_PROPERTIES, original).
+
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
@@ -57,7 +59,8 @@ pre_process_action_args(
topic := Topic,
qos := QoS,
retain := Retain,
- payload := Payload
+ payload := Payload,
+ user_properties := UserProperties
} = Args
) ->
Args#{
@@ -65,7 +68,8 @@ pre_process_action_args(
topic => emqx_plugin_libs_rule:preproc_tmpl(Topic),
qos => preproc_vars(QoS),
retain => preproc_vars(Retain),
- payload => emqx_plugin_libs_rule:preproc_tmpl(Payload)
+ payload => emqx_plugin_libs_rule:preproc_tmpl(Payload),
+ user_properties => preproc_user_properties(UserProperties)
}
};
pre_process_action_args(_, Args) ->
@@ -93,16 +97,16 @@ republish(
_Args
) ->
?SLOG(error, #{msg => "recursive_republish_detected", topic => Topic});
-%% republish a PUBLISH message
republish(
Selected,
- #{flags := Flags, metadata := #{rule_id := RuleId}},
+ #{metadata := #{rule_id := RuleId}} = Env,
#{
preprocessed_tmpl := #{
qos := QoSTks,
retain := RetainTks,
topic := TopicTks,
- payload := PayloadTks
+ payload := PayloadTks,
+ user_properties := UserPropertiesTks
}
}
) ->
@@ -110,27 +114,22 @@ republish(
Payload = format_msg(PayloadTks, Selected),
QoS = replace_simple_var(QoSTks, Selected, 0),
Retain = replace_simple_var(RetainTks, Selected, false),
- ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}),
- safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload);
-%% in case this is a "$events/" event
-republish(
- Selected,
- #{metadata := #{rule_id := RuleId}},
- #{
- preprocessed_tmpl := #{
- qos := QoSTks,
- retain := RetainTks,
- topic := TopicTks,
- payload := PayloadTks
+ %% 'flags' is set for message re-publishes or message related
+ %% events such as message.acked and message.dropped
+ Flags0 = maps:get(flags, Env, #{}),
+ Flags = Flags0#{retain => Retain},
+ PubProps = format_pub_props(UserPropertiesTks, Selected, Env),
+ ?TRACE(
+ "RULE",
+ "republish_message",
+ #{
+ flags => Flags,
+ topic => Topic,
+ payload => Payload,
+ pub_props => PubProps
}
- }
-) ->
- Topic = emqx_plugin_libs_rule:proc_tmpl(TopicTks, Selected),
- Payload = format_msg(PayloadTks, Selected),
- QoS = replace_simple_var(QoSTks, Selected, 0),
- Retain = replace_simple_var(RetainTks, Selected, false),
- ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}),
- safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload).
+ ),
+ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
%%--------------------------------------------------------------------
%% internal functions
@@ -168,13 +167,16 @@ pre_process_args(Mod, Func, Args) ->
false -> Args
end.
-safe_publish(RuleId, Topic, QoS, Flags, Payload) ->
+safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
Msg = #message{
id = emqx_guid:gen(),
qos = QoS,
from = RuleId,
flags = Flags,
- headers = #{republish_by => RuleId},
+ headers = #{
+ republish_by => RuleId,
+ properties => emqx_misc:pub_props_to_packet(PubProps)
+ },
topic = Topic,
payload = Payload,
timestamp = erlang:system_time(millisecond)
@@ -187,6 +189,19 @@ preproc_vars(Data) when is_binary(Data) ->
preproc_vars(Data) ->
Data.
+preproc_user_properties(<<"${pub_props.'User-Property'}">>) ->
+ %% keep the original
+ %% avoid processing this special variable because
+ %% we do not want to force users to select the value
+ %% the value will be taken from Env.pub_props directly
+ ?ORIGINAL_USER_PROPERTIES;
+preproc_user_properties(<<"${", _/binary>> = V) ->
+ %% use a variable
+ emqx_plugin_libs_rule:preproc_tmpl(V);
+preproc_user_properties(_) ->
+ %% invalid, discard
+ undefined.
+
replace_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
case Var of
@@ -201,3 +216,15 @@ format_msg([], Selected) ->
emqx_json:encode(Selected);
format_msg(Tokens, Selected) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected).
+
+format_pub_props(UserPropertiesTks, Selected, Env) ->
+ UserProperties =
+ case UserPropertiesTks of
+ ?ORIGINAL_USER_PROPERTIES ->
+ maps:get('User-Property', maps:get(pub_props, Env, #{}), #{});
+ undefined ->
+ #{};
+ _ ->
+ replace_simple_var(UserPropertiesTks, Selected, #{})
+ end,
+ #{'User-Property' => UserProperties}.
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
index 6bb9ad010..6419e4184 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src
@@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
- {vsn, "5.0.3"},
+ {vsn, "5.0.4"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt]},
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index eec41bde8..d299a6bb4 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -173,6 +173,15 @@ fields("republish_args") ->
default => <<"${payload}">>,
example => <<"${payload}">>
}
+ )},
+ {user_properties,
+ ?HOCON(
+ binary(),
+ #{
+ desc => ?DESC("republish_args_user_properties"),
+ default => <<"${user_properties}">>,
+ example => <<"${pub_props.'User-Property'}">>
+ }
)}
].
diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl
index 2935aeeb9..b80b9777a 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_events.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl
@@ -1060,7 +1060,7 @@ printable_maps(Headers) ->
(K, V, AccIn) ->
AccIn#{K => V}
end,
- #{},
+ #{'User-Property' => #{}},
Headers
).
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
index 17fe1a36c..50bb55fe1 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
@@ -59,11 +59,14 @@ groups() ->
t_sqlselect_0,
t_sqlselect_00,
t_sqlselect_001,
+ t_sqlselect_inject_props,
t_sqlselect_01,
t_sqlselect_02,
t_sqlselect_1,
t_sqlselect_2,
t_sqlselect_3,
+ t_sqlselect_message_publish_event_keep_original_props_1,
+ t_sqlselect_message_publish_event_keep_original_props_2,
t_sqlparse_event_1,
t_sqlparse_event_2,
t_sqlparse_event_3,
@@ -936,9 +939,10 @@ t_sqlselect_001(_Config) ->
)
).
-t_sqlselect_01(_Config) ->
+t_sqlselect_inject_props(_Config) ->
SQL =
- "SELECT json_decode(payload) as p, payload "
+ "SELECT json_decode(payload) as p, payload, "
+ "map_put('inject_key', 'inject_val', user_properties) as user_properties "
"FROM \"t3/#\", \"t1\" "
"WHERE p.x = 1",
Repub = republish_action(<<"t2">>),
@@ -949,34 +953,64 @@ t_sqlselect_01(_Config) ->
actions => [Repub]
}
),
- {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
+ Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}),
+ {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
- emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
- ct:sleep(100),
+ emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]),
+ receive
+ {publish, #{topic := T, payload := Payload, properties := Props2}} ->
+ ?assertEqual(Props, Props2),
+ ?assertEqual(<<"t2">>, T),
+ ?assertEqual(<<"{\"x\":1}">>, Payload)
+ after 2000 ->
+ ct:fail(wait_for_t2)
+ end,
+ emqtt:stop(Client),
+ delete_rule(TopicRule1).
+
+t_sqlselect_01(_Config) ->
+ SQL =
+ "SELECT json_decode(payload) as p, payload "
+ "FROM \"t3/#\", \"t1\" "
+ "WHERE p.x = 1",
+ Repub = republish_action(<<"t2">>, <<"${payload}">>, <<"${pub_props.'User-Property'}">>),
+ {ok, TopicRule1} = emqx_rule_engine:create_rule(
+ #{
+ sql => SQL,
+ id => ?TMP_RULEID,
+ actions => [Repub]
+ }
+ ),
+ Props = user_properties(#{<<"mykey">> => <<"myval">>}),
+ {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client),
+ {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
+ emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
receive
{publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1}">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
- emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0),
+ emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]),
receive
{publish, #{topic := <<"t2">>, payload := _}} ->
ct:fail(unexpected_t2)
- after 1000 ->
+ after 2000 ->
ok
end,
- emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0),
+ emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]),
receive
- {publish, #{topic := T3, payload := Payload3}} ->
+ {publish, #{topic := T3, payload := Payload3, properties := Props2}} ->
+ ?assertEqual(Props, Props2),
?assertEqual(<<"t2">>, T3),
?assertEqual(<<"{\"x\":1}">>, Payload3)
- after 1000 ->
- ct:fail(wait_for_t2)
+ after 2000 ->
+ ct:fail(wait_for_t3)
end,
emqtt:stop(Client),
@@ -1044,13 +1078,12 @@ t_sqlselect_1(_Config) ->
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
- ct:sleep(200),
emqtt:publish(Client, <<"t1">>, <<"{\"x\":1,\"y\":2}">>, 0),
receive
{publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"{\"x\":1,\"y\":2}">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
@@ -1113,14 +1146,13 @@ t_sqlselect_3(_Config) ->
{ok, Client} = emqtt:start_link([{clientid, <<"emqx0">>}, {username, <<"emqx0">>}]),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
- ct:sleep(200),
{ok, Client1} = emqtt:start_link([{clientid, <<"c_emqx1">>}, {username, <<"emqx1">>}]),
{ok, _} = emqtt:connect(Client1),
receive
{publish, #{topic := T, payload := Payload}} ->
?assertEqual(<<"t2">>, T),
?assertEqual(<<"clientid=c_emqx1">>, Payload)
- after 1000 ->
+ after 2000 ->
ct:fail(wait_for_t2)
end,
@@ -1135,6 +1167,82 @@ t_sqlselect_3(_Config) ->
emqtt:stop(Client),
delete_rule(TopicRule).
+t_sqlselect_message_publish_event_keep_original_props_1(_Config) ->
+ %% republish the client.connected msg
+ Topic = <<"foo/bar/1">>,
+ SQL = <<
+ "SELECT clientid "
+ "FROM \"$events/message_dropped\" "
+ >>,
+
+ %"WHERE topic = \"", Topic/binary, "\"">>,
+ Repub = republish_action(
+ <<"t2">>,
+ <<"clientid=${clientid}">>,
+ <<"${pub_props.'User-Property'}">>
+ ),
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
+ #{
+ sql => SQL,
+ id => ?TMP_RULEID,
+ actions => [Repub]
+ }
+ ),
+ {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client1),
+ {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
+ {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client2),
+ Props = user_properties(#{<<"mykey">> => <<"111111">>}),
+ emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
+ receive
+ {publish, #{topic := T, payload := Payload, properties := Props1}} ->
+ ?assertEqual(Props1, Props),
+ ?assertEqual(<<"t2">>, T),
+ ?assertEqual(<<"clientid=pub-02">>, Payload)
+ after 2000 ->
+ ct:fail(wait_for_t2)
+ end,
+ emqtt:stop(Client2),
+ emqtt:stop(Client1),
+ delete_rule(TopicRule).
+
+t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
+ %% republish the client.connected msg
+ Topic = <<"foo/bar/1">>,
+ SQL = <<
+ "SELECT clientid, pub_props.'User-Property' as user_properties "
+ "FROM \"$events/message_dropped\" "
+ >>,
+
+ %"WHERE topic = \"", Topic/binary, "\"">>,
+ Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>),
+ {ok, TopicRule} = emqx_rule_engine:create_rule(
+ #{
+ sql => SQL,
+ id => ?TMP_RULEID,
+ actions => [Repub]
+ }
+ ),
+ {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client1),
+ {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1),
+ {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]),
+ {ok, _} = emqtt:connect(Client2),
+ Props = user_properties(#{<<"mykey">> => <<"222222222222">>}),
+ emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]),
+ receive
+ {publish, #{topic := T, payload := Payload, properties := Props1}} ->
+ ?assertEqual(Props1, Props),
+ ?assertEqual(<<"t2">>, T),
+ ?assertEqual(<<"clientid=pub-02">>, Payload)
+ after 2000 ->
+ ct:fail(wait_for_t2)
+ end,
+ emqtt:stop(Client2),
+ emqtt:stop(Client1),
+ delete_rule(TopicRule).
+
t_sqlparse_event_1(_Config) ->
Sql =
"select topic as tp "
@@ -2480,10 +2588,20 @@ t_get_basic_usage_info_1(_Config) ->
republish_action(Topic) ->
republish_action(Topic, <<"${payload}">>).
+
republish_action(Topic, Payload) ->
+ republish_action(Topic, Payload, <<"${user_properties}">>).
+
+republish_action(Topic, Payload, UserProperties) ->
#{
function => republish,
- args => #{payload => Payload, topic => Topic, qos => 0, retain => false}
+ args => #{
+ payload => Payload,
+ topic => Topic,
+ qos => 0,
+ retain => false,
+ user_properties => UserProperties
+ }
}.
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
@@ -2869,6 +2987,9 @@ verify_ipaddr(IPAddrS) ->
init_events_counters() ->
ets:new(events_record_tab, [named_table, bag, public]).
+user_properties(PairsMap) ->
+ #{'User-Property' => maps:to_list(PairsMap)}.
+
%%------------------------------------------------------------------------------
%% Start Apps
%%------------------------------------------------------------------------------
diff --git a/changes/v5.0.11-en.md b/changes/v5.0.11-en.md
index def22e17c..e53c5785e 100644
--- a/changes/v5.0.11-en.md
+++ b/changes/v5.0.11-en.md
@@ -21,6 +21,8 @@
- Set the default value for the maximum level of a topic to 128 [#9406](https://github.com/emqx/emqx/pull/9406).
+- Keep MQTT v5 User-Property pairs from bridge ingested MQTT messsages to bridge target [#9398](https://github.com/emqx/emqx/pull/9398).
+
## Bug fixes
- Fix `ssl.existingName` option of helm chart not working [#9307](https://github.com/emqx/emqx/issues/9307).
diff --git a/changes/v5.0.11-zh.md b/changes/v5.0.11-zh.md
index 7a4e770f9..3ea516dad 100644
--- a/changes/v5.0.11-zh.md
+++ b/changes/v5.0.11-zh.md
@@ -19,6 +19,8 @@
- 将主题的最大层级限制的默认值设置为128 [#9406](https://github.com/emqx/emqx/pull/9406)。
+- 为桥接收到的 MQTT v5 消息再转发时保留 User-Property 列表 [#9398](https://github.com/emqx/emqx/pull/9398)。
+
## 修复
- 修复 helm chart 的 `ssl.existingName` 选项不起作用 [#9307](https://github.com/emqx/emqx/issues/9307)。