diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx/src/emqx_misc.erl index 674465e6a..9683b1a8b 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx/src/emqx_misc.erl @@ -54,7 +54,8 @@ pmap/3, readable_error_msg/1, safe_to_existing_atom/1, - safe_to_existing_atom/2 + safe_to_existing_atom/2, + pub_props_to_packet/1 ]). -export([ @@ -568,3 +569,17 @@ ipv6_probe_test() -> end. -endif. + +pub_props_to_packet(Properties) -> + F = fun + ('User-Property', M) -> + case is_map(M) andalso map_size(M) > 0 of + true -> {true, maps:to_list(M)}; + false -> false + end; + ('User-Property-Pairs', _) -> + false; + (_, _) -> + true + end, + maps:filtermap(F, Properties). diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 64003209f..547a37b8e 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "An OTP application"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 43700506b..469dd952b 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -77,17 +77,20 @@ to_remote_msg(MapMsg, #{ Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), + PubProps = maps:get(pub_props, MapMsg, #{}), #mqtt_msg{ qos = QoS, retain = Retain, topic = topic(Mountpoint, Topic), - props = #{}, + props = emqx_misc:pub_props_to_packet(PubProps), payload = Payload }; to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> Msg#message{topic = topic(Mountpoint, Topic)}. %% published from remote node over a MQTT connection +to_broker_msg(Msg, Vars, undefined) -> + to_broker_msg(Msg, Vars, #{}); to_broker_msg( #{dup := Dup} = MapMsg, #{ @@ -103,8 +106,9 @@ to_broker_msg( Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), + PubProps = maps:get(pub_props, MapMsg, #{}), set_headers( - Props, + Props#{properties => emqx_misc:pub_props_to_packet(PubProps)}, emqx_message:set_flags( #{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload) @@ -151,8 +155,6 @@ estimate_size(#{topic := Topic, payload := Payload}) -> estimate_size(Term) -> erlang:external_size(Term). -set_headers(undefined, Msg) -> - Msg; set_headers(Val, Msg) -> emqx_message:set_headers(Val, Msg). topic(undefined, Topic) -> Topic; diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 998fc1a5e..dd26f0a29 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -110,8 +110,9 @@ republish( Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), + PubProps = format_pub_props(maps:get(<<"pub_props">>, Selected, #{})), ?TRACE("RULE", "republish_message", #{topic => Topic, payload => Payload}), - safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload); + safe_publish(RuleId, Topic, QoS, Flags#{retain => Retain}, Payload, PubProps); %% in case this is a "$events/" event republish( Selected, @@ -129,8 +130,9 @@ republish( Payload = format_msg(PayloadTks, Selected), QoS = replace_simple_var(QoSTks, Selected, 0), Retain = replace_simple_var(RetainTks, Selected, false), + PubProps = maps:get(pub_props, Selected, #{}), ?TRACE("RULE", "republish_message_with_flags", #{topic => Topic, payload => Payload}), - safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload). + safe_publish(RuleId, Topic, QoS, #{retain => Retain}, Payload, PubProps). %%-------------------------------------------------------------------- %% internal functions @@ -168,13 +170,16 @@ pre_process_args(Mod, Func, Args) -> false -> Args end. -safe_publish(RuleId, Topic, QoS, Flags, Payload) -> +safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> Msg = #message{ id = emqx_guid:gen(), qos = QoS, from = RuleId, flags = Flags, - headers = #{republish_by => RuleId}, + headers = #{ + republish_by => RuleId, + properties => emqx_misc:pub_props_to_packet(PubProps) + }, topic = Topic, payload = Payload, timestamp = erlang:system_time(millisecond) @@ -201,3 +206,17 @@ format_msg([], Selected) -> emqx_json:encode(Selected); format_msg(Tokens, Selected) -> emqx_plugin_libs_rule:proc_tmpl(Tokens, Selected). + +format_pub_props(Props) -> + maps:fold(fun format_pub_prop/3, #{}, Props). + +format_pub_prop(K, V, Acc) when is_atom(K) -> + Acc#{K => V}; +format_pub_prop(K, V, Acc) when is_binary(K) -> + try + K1 = erlang:binary_to_existing_atom(K), + format_pub_prop(K1, V, Acc) + catch + _:_ -> + Acc#{K => V} + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index 6bb9ad010..6419e4184 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -2,7 +2,7 @@ {application, emqx_rule_engine, [ {description, "EMQX Rule Engine"}, % strict semver, bump manually! - {vsn, "5.0.3"}, + {vsn, "5.0.4"}, {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_engine]}, {applications, [kernel, stdlib, rulesql, getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 2935aeeb9..b80b9777a 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -1060,7 +1060,7 @@ printable_maps(Headers) -> (K, V, AccIn) -> AccIn#{K => V} end, - #{}, + #{'User-Property' => #{}}, Headers ). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 17fe1a36c..f292afbcb 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -59,11 +59,13 @@ groups() -> t_sqlselect_0, t_sqlselect_00, t_sqlselect_001, + t_sqlselect_inject_props, t_sqlselect_01, t_sqlselect_02, t_sqlselect_1, t_sqlselect_2, t_sqlselect_3, + t_sqlselect_message_publish_event, t_sqlparse_event_1, t_sqlparse_event_2, t_sqlparse_event_3, @@ -936,6 +938,38 @@ t_sqlselect_001(_Config) -> ) ). +t_sqlselect_inject_props(_Config) -> + SQL = + "SELECT json_decode(payload) as p, payload, " + "map_put('discard', 'discard', pub_props) as pub_props, " + "map_put('inject_key', 'inject_val', pub_props.'User-Property') as pub_props.'User-Property' " + "FROM \"t3/#\", \"t1\" " + "WHERE p.x = 1", + Repub = republish_action(<<"t2">>), + {ok, TopicRule1} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [Repub] + } + ), + Props = user_properties(#{<<"inject_key">> => <<"inject_val">>}), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), + emqtt:publish(Client, <<"t1">>, #{}, <<"{\"x\":1}">>, [{qos, 0}]), + ct:sleep(100), + receive + {publish, #{topic := T, payload := Payload, properties := Props2}} -> + ?assertEqual(Props, Props2), + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"{\"x\":1}">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + emqtt:stop(Client), + delete_rule(TopicRule1). + t_sqlselect_01(_Config) -> SQL = "SELECT json_decode(payload) as p, payload " @@ -949,10 +983,11 @@ t_sqlselect_01(_Config) -> actions => [Repub] } ), - {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + Props = user_properties(#{<<"mykey">> => <<"myval">>}), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}, {proto_ver, v5}]), {ok, _} = emqtt:connect(Client), {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0), - emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0), + emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":1}">>, [{qos, 0}]), ct:sleep(100), receive {publish, #{topic := T, payload := Payload}} -> @@ -962,7 +997,7 @@ t_sqlselect_01(_Config) -> ct:fail(wait_for_t2) end, - emqtt:publish(Client, <<"t1">>, <<"{\"x\":2}">>, 0), + emqtt:publish(Client, <<"t1">>, Props, <<"{\"x\":2}">>, [{qos, 0}]), receive {publish, #{topic := <<"t2">>, payload := _}} -> ct:fail(unexpected_t2) @@ -970,9 +1005,10 @@ t_sqlselect_01(_Config) -> ok end, - emqtt:publish(Client, <<"t3/a">>, <<"{\"x\":1}">>, 0), + emqtt:publish(Client, <<"t3/a">>, Props, <<"{\"x\":1}">>, [{qos, 0}]), receive - {publish, #{topic := T3, payload := Payload3}} -> + {publish, #{topic := T3, payload := Payload3, properties := Props2}} -> + ?assertEqual(Props, Props2), ?assertEqual(<<"t2">>, T3), ?assertEqual(<<"{\"x\":1}">>, Payload3) after 1000 -> @@ -1135,6 +1171,43 @@ t_sqlselect_3(_Config) -> emqtt:stop(Client), delete_rule(TopicRule). +t_sqlselect_message_publish_event(_Config) -> + %% republish the client.connected msg + Topic = <<"foo/bar/1">>, + SQL = << + "SELECT clientid, pub_props " + "FROM \"$events/message_dropped\" " + >>, + + %"WHERE topic = \"", Topic/binary, "\"">>, + Repub = republish_action(<<"t2">>, <<"clientid=${clientid}">>), + {ok, TopicRule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [Repub] + } + ), + {ok, Client1} = emqtt:start_link([{clientid, <<"sub-01">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client1), + {ok, _, _} = emqtt:subscribe(Client1, <<"t2">>, 1), + ct:sleep(200), + {ok, Client2} = emqtt:start_link([{clientid, <<"pub-02">>}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(Client2), + Props = user_properties(#{<<"mykey">> => <<"222222222222">>}), + emqtt:publish(Client2, Topic, Props, <<"{\"x\":1}">>, [{qos, 1}]), + receive + {publish, #{topic := T, payload := Payload, properties := Props1}} -> + ?assertEqual(Props1, Props), + ?assertEqual(<<"t2">>, T), + ?assertEqual(<<"clientid=pub-02">>, Payload) + after 1000 -> + ct:fail(wait_for_t2) + end, + emqtt:stop(Client2), + emqtt:stop(Client1), + delete_rule(TopicRule). + t_sqlparse_event_1(_Config) -> Sql = "select topic as tp " @@ -2869,6 +2942,9 @@ verify_ipaddr(IPAddrS) -> init_events_counters() -> ets:new(events_record_tab, [named_table, bag, public]). +user_properties(PairsMap) -> + #{'User-Property' => maps:to_list(PairsMap)}. + %%------------------------------------------------------------------------------ %% Start Apps %%------------------------------------------------------------------------------ diff --git a/changes/v5.0.11-en.md b/changes/v5.0.11-en.md index 59ef42b4d..1e4cf8f01 100644 --- a/changes/v5.0.11-en.md +++ b/changes/v5.0.11-en.md @@ -19,6 +19,7 @@ - Improve node name generation rules to avoid potential atom table overflow risk [#9387](https://github.com/emqx/emqx/pull/9387). +- Keep MQTT v5 User-Properties from bridge ingested MQTT messsages to bridge target [#9240](https://github.com/emqx/emqx/pull/9240). ## Bug fixes diff --git a/changes/v5.0.11-zh.md b/changes/v5.0.11-zh.md index b48c09e7a..256022c53 100644 --- a/changes/v5.0.11-zh.md +++ b/changes/v5.0.11-zh.md @@ -17,6 +17,8 @@ - 改进了节点名称生成规则,以避免潜在的原子表溢出风险 [#9387](https://github.com/emqx/emqx/pull/9387)。 +- 为桥接收到的 MQTT v5 消息再转发时保留 User-Properties [#9240](https://github.com/emqx/emqx/pull/9240)。 + ## 修复 - 修复创建追踪日志时偶尔会报`end_at time has already passed`错误,导致创建失败。[#9303](https://github.com/emqx/emqx/pull/9303)