From f9276edaf84ab5b3ca560b71c8cae4d5ef0f8d5d Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 2 Nov 2022 10:03:47 +0800 Subject: [PATCH] fix: mqtt_bridge and republish validate msg topic --- .../src/emqx_bridge_mqtt_actions.erl | 13 ++++++++++--- apps/emqx_rule_engine/src/emqx_rule_actions.erl | 14 +++++++++++--- changes/v4.3.22-en.md | 2 ++ changes/v4.3.22-zh.md | 2 ++ 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 1739bd47a..e45903113 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -40,6 +40,7 @@ -define(RESOURCE_TYPE_MQTT, 'bridge_mqtt'). -define(RESOURCE_TYPE_RPC, 'bridge_rpc'). +-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish). -define(RESOURCE_CONFIG_SPEC_MQTT, #{ address => #{ @@ -494,7 +495,7 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName, PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), TopicTks = case ForwardTopic == <<"">> of true -> undefined; - false -> emqx_rule_utils:preproc_tmpl(ForwardTopic) + false -> emqx_rule_utils:preproc_tmpl(assert_topic_valid(ForwardTopic)) end, Opts. @@ -515,7 +516,7 @@ on_action_data_to_mqtt_broker(Msg, _Env = qos = QoS, from = From, flags = Flags, - topic = Topic1, + topic = assert_topic_valid(Topic1), payload = format_data(PayloadTks, Msg), timestamp = TimeStamp}, ecpool:with_client(PoolName, @@ -583,7 +584,7 @@ options(Options, PoolName, ResId) -> Get = fun(Key) -> GetD(Key, undefined) end, Address = Get(<<"address">>), [{max_inflight_batches, 32}, - {forward_mountpoint, str(Get(<<"mountpoint">>))}, + {forward_mountpoint, str(assert_topic_valid(Get(<<"mountpoint">>)))}, {disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))}, {start_type, auto}, {reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)}, @@ -610,6 +611,12 @@ options(Options, PoolName, ResId) -> | maybe_ssl(Options, Get(<<"ssl">>), ResId)] end. +assert_topic_valid(T) -> + case emqx_topic:wildcard(T) of + true -> throw({?BAD_TOPIC_WITH_WILDCARD, T}); + false -> T + end. + maybe_ssl(_Options, false, _ResId) -> []; maybe_ssl(Options, true, ResId) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 557ddf423..67f68c99b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -22,6 +22,8 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). +-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish). + -define(REPUBLISH_PARAMS_SPEC, #{ target_topic => #{ order => 1, @@ -163,7 +165,7 @@ on_action_create_republish(Id, Params = #{ }) -> TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)), TargetQoS = to_qos(TargetQoS0), - TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic), + TopicTks = emqx_rule_utils:preproc_tmpl(assert_topic_valid(TargetTopic)), PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl), Params. @@ -201,7 +203,7 @@ on_action_republish(Selected, _Envs = #{ from = ActId, flags = Flags#{retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)), payload = format_msg(PayloadTks, Selected), timestamp = Timestamp }, @@ -226,7 +228,7 @@ on_action_republish(Selected, _Envs = #{ from = ActId, flags = #{dup => false, retain => get_retain(TargetRetain, Selected)}, headers = #{republish_by => ActId}, - topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected), + topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)), payload = format_msg(PayloadTks, Selected), timestamp = erlang:system_time(millisecond) }, @@ -270,6 +272,12 @@ get_qos(-1, _Data, Default) -> Default; get_qos(TargetQoS, Data, _Default) -> qos(emqx_rule_utils:replace_var(TargetQoS, Data)). +assert_topic_valid(T) -> + case emqx_topic:wildcard(T) of + true -> throw({?BAD_TOPIC_WITH_WILDCARD, T}); + false -> T + end. + qos(<<"0">>) -> 0; qos(<<"1">>) -> 1; qos(<<"2">>) -> 2; diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index ba4b7ddbd..0004ce400 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -69,3 +69,5 @@ - Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190). Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`. See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources). + +- When republishing messages or bridge messages to other brokers, check the validity of the topic and make sure it does not have topic wildcards [#9291](https://github.com/emqx/emqx/pull/9291). diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index 0b245d698..cb1bf8e67 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -63,3 +63,5 @@ - 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id` 及 `resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。 注意在创建规则或资源时,HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。 详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。 + +- 在进行消息重发布或桥接消息到其他 mqtt broker 时,检查 topic 合法性,确定其不带有主题通配符 [#9291](https://github.com/emqx/emqx/pull/9291)。