Merge pull request #9291 from JimMoen/fix/repub-or-mqtt-bridge-topic-validate
fix: mqtt_bridge and republish validate msg topic
This commit is contained in:
commit
1887ae735b
|
@ -40,6 +40,7 @@
|
||||||
|
|
||||||
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
|
-define(RESOURCE_TYPE_MQTT, 'bridge_mqtt').
|
||||||
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
|
-define(RESOURCE_TYPE_RPC, 'bridge_rpc').
|
||||||
|
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
|
||||||
|
|
||||||
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
|
-define(RESOURCE_CONFIG_SPEC_MQTT, #{
|
||||||
address => #{
|
address => #{
|
||||||
|
@ -494,7 +495,7 @@ on_action_create_data_to_mqtt_broker(ActId, Opts = #{<<"pool">> := PoolName,
|
||||||
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
||||||
TopicTks = case ForwardTopic == <<"">> of
|
TopicTks = case ForwardTopic == <<"">> of
|
||||||
true -> undefined;
|
true -> undefined;
|
||||||
false -> emqx_rule_utils:preproc_tmpl(ForwardTopic)
|
false -> emqx_rule_utils:preproc_tmpl(assert_topic_valid(ForwardTopic))
|
||||||
end,
|
end,
|
||||||
Opts.
|
Opts.
|
||||||
|
|
||||||
|
@ -515,7 +516,7 @@ on_action_data_to_mqtt_broker(Msg, _Env =
|
||||||
qos = QoS,
|
qos = QoS,
|
||||||
from = From,
|
from = From,
|
||||||
flags = Flags,
|
flags = Flags,
|
||||||
topic = Topic1,
|
topic = assert_topic_valid(Topic1),
|
||||||
payload = format_data(PayloadTks, Msg),
|
payload = format_data(PayloadTks, Msg),
|
||||||
timestamp = TimeStamp},
|
timestamp = TimeStamp},
|
||||||
ecpool:with_client(PoolName,
|
ecpool:with_client(PoolName,
|
||||||
|
@ -583,7 +584,7 @@ options(Options, PoolName, ResId) ->
|
||||||
Get = fun(Key) -> GetD(Key, undefined) end,
|
Get = fun(Key) -> GetD(Key, undefined) end,
|
||||||
Address = Get(<<"address">>),
|
Address = Get(<<"address">>),
|
||||||
[{max_inflight_batches, 32},
|
[{max_inflight_batches, 32},
|
||||||
{forward_mountpoint, str(Get(<<"mountpoint">>))},
|
{forward_mountpoint, str(assert_topic_valid(Get(<<"mountpoint">>)))},
|
||||||
{disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))},
|
{disk_cache, cuttlefish_flag:parse(str(GetD(<<"disk_cache">>, "off")))},
|
||||||
{start_type, auto},
|
{start_type, auto},
|
||||||
{reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)},
|
{reconnect_delay_ms, cuttlefish_duration:parse(str(Get(<<"reconnect_interval">>)), ms)},
|
||||||
|
@ -610,6 +611,12 @@ options(Options, PoolName, ResId) ->
|
||||||
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
|
| maybe_ssl(Options, Get(<<"ssl">>), ResId)]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
assert_topic_valid(T) ->
|
||||||
|
case emqx_topic:wildcard(T) of
|
||||||
|
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
|
||||||
|
false -> T
|
||||||
|
end.
|
||||||
|
|
||||||
maybe_ssl(_Options, false, _ResId) ->
|
maybe_ssl(_Options, false, _ResId) ->
|
||||||
[];
|
[];
|
||||||
maybe_ssl(Options, true, ResId) ->
|
maybe_ssl(Options, true, ResId) ->
|
||||||
|
|
|
@ -22,6 +22,8 @@
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-define(BAD_TOPIC_WITH_WILDCARD, wildcard_topic_not_allowed_for_publish).
|
||||||
|
|
||||||
-define(REPUBLISH_PARAMS_SPEC, #{
|
-define(REPUBLISH_PARAMS_SPEC, #{
|
||||||
target_topic => #{
|
target_topic => #{
|
||||||
order => 1,
|
order => 1,
|
||||||
|
@ -163,7 +165,7 @@ on_action_create_republish(Id, Params = #{
|
||||||
}) ->
|
}) ->
|
||||||
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
|
TargetRetain = to_retain(maps:get(<<"target_retain">>, Params, <<"false">>)),
|
||||||
TargetQoS = to_qos(TargetQoS0),
|
TargetQoS = to_qos(TargetQoS0),
|
||||||
TopicTks = emqx_rule_utils:preproc_tmpl(TargetTopic),
|
TopicTks = emqx_rule_utils:preproc_tmpl(assert_topic_valid(TargetTopic)),
|
||||||
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
PayloadTks = emqx_rule_utils:preproc_tmpl(PayloadTmpl),
|
||||||
Params.
|
Params.
|
||||||
|
|
||||||
|
@ -201,7 +203,7 @@ on_action_republish(Selected, _Envs = #{
|
||||||
from = ActId,
|
from = ActId,
|
||||||
flags = Flags#{retain => get_retain(TargetRetain, Selected)},
|
flags = Flags#{retain => get_retain(TargetRetain, Selected)},
|
||||||
headers = #{republish_by => ActId},
|
headers = #{republish_by => ActId},
|
||||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
|
||||||
payload = format_msg(PayloadTks, Selected),
|
payload = format_msg(PayloadTks, Selected),
|
||||||
timestamp = Timestamp
|
timestamp = Timestamp
|
||||||
},
|
},
|
||||||
|
@ -226,7 +228,7 @@ on_action_republish(Selected, _Envs = #{
|
||||||
from = ActId,
|
from = ActId,
|
||||||
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
|
flags = #{dup => false, retain => get_retain(TargetRetain, Selected)},
|
||||||
headers = #{republish_by => ActId},
|
headers = #{republish_by => ActId},
|
||||||
topic = emqx_rule_utils:proc_tmpl(TopicTks, Selected),
|
topic = assert_topic_valid(emqx_rule_utils:proc_tmpl(TopicTks, Selected)),
|
||||||
payload = format_msg(PayloadTks, Selected),
|
payload = format_msg(PayloadTks, Selected),
|
||||||
timestamp = erlang:system_time(millisecond)
|
timestamp = erlang:system_time(millisecond)
|
||||||
},
|
},
|
||||||
|
@ -270,6 +272,12 @@ get_qos(-1, _Data, Default) -> Default;
|
||||||
get_qos(TargetQoS, Data, _Default) ->
|
get_qos(TargetQoS, Data, _Default) ->
|
||||||
qos(emqx_rule_utils:replace_var(TargetQoS, Data)).
|
qos(emqx_rule_utils:replace_var(TargetQoS, Data)).
|
||||||
|
|
||||||
|
assert_topic_valid(T) ->
|
||||||
|
case emqx_topic:wildcard(T) of
|
||||||
|
true -> throw({?BAD_TOPIC_WITH_WILDCARD, T});
|
||||||
|
false -> T
|
||||||
|
end.
|
||||||
|
|
||||||
qos(<<"0">>) -> 0;
|
qos(<<"0">>) -> 0;
|
||||||
qos(<<"1">>) -> 1;
|
qos(<<"1">>) -> 1;
|
||||||
qos(<<"2">>) -> 2;
|
qos(<<"2">>) -> 2;
|
||||||
|
|
|
@ -69,3 +69,5 @@
|
||||||
- Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190).
|
- Make sure Rule-Engine API supports Percent-encoding `rule_id` and `resource_id` in HTTP request path [#9190](https://github.com/emqx/emqx/pull/9190).
|
||||||
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
|
Note that the `id` in `POST /api/v4/rules` should be literals (not encoded) when creating a `rule` or `resource`.
|
||||||
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).
|
See docs [Create Rule](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) [Create Resource](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources).
|
||||||
|
|
||||||
|
- When republishing messages or bridge messages to other brokers, check the validity of the topic and make sure it does not have topic wildcards [#9291](https://github.com/emqx/emqx/pull/9291).
|
||||||
|
|
|
@ -63,3 +63,5 @@
|
||||||
- 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id` 及 `resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。
|
- 使规则引擎 API 在 HTTP 请求路径中支持百分号编码的 `rule_id` 及 `resource_id` [#9190](https://github.com/emqx/emqx/pull/9190)。
|
||||||
注意在创建规则或资源时,HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
|
注意在创建规则或资源时,HTTP body 中的 `id` 字段仍为字面值,而不是编码之后的值。
|
||||||
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。
|
详情请参考 [创建规则](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-rules) 和 [创建资源](https://www.emqx.io/docs/zh/v4.3/advanced/http-api.html#post-api-v4-resources)。
|
||||||
|
|
||||||
|
- 在进行消息重发布或桥接消息到其他 mqtt broker 时,检查 topic 合法性,确定其不带有主题通配符 [#9291](https://github.com/emqx/emqx/pull/9291)。
|
||||||
|
|
Loading…
Reference in New Issue