fix(bridge): filter the topic of received msgs got from remote MQTT broker

This commit is contained in:
Shawn 2021-12-31 12:00:43 +08:00
parent 14ee053a0e
commit 9ba454a63d
1 changed files with 17 additions and 5 deletions

View File

@ -173,15 +173,27 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok _ -> ok
end, end,
case maps:get(local_topic, Vars, undefined) of maybe_publish_to_local_broker(Msg, Vars).
undefined -> ok;
_Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
end.
handle_disconnected(Reason, Parent) -> handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}. Parent ! {disconnected, self(), Reason}.
maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopic} = Vars) ->
case maps:get(local_topic, Vars, undefined) of
undefined ->
%% local topic is not set, discard it
ok;
_ ->
case emqx_topic:match(Topic, SubTopic) of
true ->
_ = emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)),
ok;
false ->
?SLOG(warning, #{msg => "discard_message_as_topic_not_matched",
message => Msg, subscribed => SubTopic, got_topic => Topic})
end
end.
make_hdlr(Parent, Vars) -> make_hdlr(Parent, Vars) ->
#{puback => {fun ?MODULE:handle_puback/2, [Parent]}, #{puback => {fun ?MODULE:handle_puback/2, [Parent]},
publish => {fun ?MODULE:handle_publish/2, [Vars]}, publish => {fun ?MODULE:handle_publish/2, [Vars]},