From 9ba454a63d3cc75686112251f6f553d2d323f67c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 12:00:43 +0800 Subject: [PATCH] fix(bridge): filter the topic of received msgs got from remote MQTT broker --- .../src/mqtt/emqx_connector_mqtt_mod.erl | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index d7abcda84..30a1ccb30 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -173,15 +173,27 @@ handle_publish(Msg, Vars) -> _ = erlang:apply(Mod, Func, [Msg | Args]); _ -> ok end, - case maps:get(local_topic, Vars, undefined) of - undefined -> ok; - _Topic -> - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) - end. + maybe_publish_to_local_broker(Msg, Vars). handle_disconnected(Reason, Parent) -> Parent ! {disconnected, self(), Reason}. +maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) -> + case maps:get(local_topic, Vars, undefined) of + undefined -> + %% local topic is not set, discard it + ok; + _ -> + case emqx_topic:match(Topic, SubTopic) of + true -> + _ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)), + ok; + false -> + ?SLOG(warning, #{msg => "discard_message_as_topic_not_matched", + message => Msg, subscribed => SubTopic, got_topic => Topic}) + end + end. + make_hdlr(Parent, Vars) -> #{puback => {fun ?MODULE:handle_puback/2, [Parent]}, publish => {fun ?MODULE:handle_publish/2, [Vars]},