From 7002fe2ef415975fb75adf7ceda8fc0e396ea6c9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 Feb 2023 16:15:46 +0300 Subject: [PATCH] fix(mqtt-bridge): disallow QoS 2 on ingress bridges --- .../test/emqx_bridge_mqtt_SUITE.erl | 32 +++++++++++++- .../src/mqtt/emqx_connector_mqtt_schema.erl | 6 +-- .../src/mqtt/emqx_connector_mqtt_worker.erl | 42 +++++++++++++++---- 3 files changed, 68 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 49b333216..40a8c0bf2 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -52,7 +52,7 @@ -define(INGRESS_CONF, #{ <<"remote">> => #{ <<"topic">> => <>, - <<"qos">> => 2 + <<"qos">> => 1 }, <<"local">> => #{ <<"topic">> => <>, @@ -77,7 +77,7 @@ -define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{ <<"remote">> => #{ <<"topic">> => <>, - <<"qos">> => 2 + <<"qos">> => 1 }, <<"local">> => #{ <<"topic">> => <>, @@ -265,6 +265,34 @@ t_mqtt_conn_bridge_ignores_clean_start(_) -> ok. +t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) -> + BridgeName = atom_to_binary(?FUNCTION_NAME), + BridgeID = create_bridge( + ?SERVER_CONF(<<"user1">>)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => BridgeName, + <<"ingress">> => emqx_map_lib:deep_merge( + ?INGRESS_CONF, + #{<<"remote">> => #{<<"qos">> => 2}} + ) + } + ), + + RemoteTopic = <>, + LocalTopic = <>, + Payload = <<"whatqos">>, + emqx:subscribe(LocalTopic), + emqx:publish(emqx_message:make(undefined, _QoS = 2, RemoteTopic, Payload)), + + %% we should receive a message on the local broker, with specified topic + Msg = assert_mqtt_msg_received(LocalTopic, Payload), + ?assertMatch(#message{qos = 1}, Msg), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + + ok. + t_mqtt_conn_bridge_ingress_no_payload_template(_) -> User1 = <<"user1">>, BridgeIDIngress = create_bridge( diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index f2163d952..365b082f2 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -18,6 +18,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -behaviour(hocon_schema). @@ -143,8 +144,7 @@ fields("ingress") -> mk( ref(?MODULE, "ingress_local"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"), - is_required => false + desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local") } )} ]; @@ -161,7 +161,7 @@ fields("ingress_remote") -> )}, {qos, mk( - qos(), + emqx_schema:qos(), #{ default => 1, desc => ?DESC("ingress_remote_qos") diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 6da63f99a..7240151a5 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -114,7 +114,7 @@ start_link(Name, BridgeOpts) -> name => Name, options => BridgeOpts }), - Conf = init_config(BridgeOpts), + Conf = init_config(Name, BridgeOpts), Options = mk_client_options(Conf, BridgeOpts), case emqtt:start_link(Options) of {ok, Pid} -> @@ -129,13 +129,13 @@ start_link(Name, BridgeOpts) -> Error end. -init_config(Opts) -> +init_config(Name, Opts) -> Mountpoint = maps:get(forward_mountpoint, Opts, undefined), Subscriptions = maps:get(subscriptions, Opts, undefined), Forwards = maps:get(forwards, Opts, undefined), #{ mountpoint => format_mountpoint(Mountpoint), - subscriptions => pre_process_subscriptions(Subscriptions), + subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts), forwards => pre_process_forwards(Forwards) }. @@ -282,11 +282,18 @@ format_mountpoint(undefined) -> format_mountpoint(Prefix) -> binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). -pre_process_subscriptions(undefined) -> +pre_process_subscriptions(undefined, _, _) -> undefined; -pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) -> - Conf#{local => pre_process_in_out_common(LC)}; -pre_process_subscriptions(Conf) when is_map(Conf) -> +pre_process_subscriptions( + #{remote := RC, local := LC} = Conf, + BridgeName, + BridgeOpts +) when is_map(Conf) -> + Conf#{ + remote => pre_process_in_remote(RC, BridgeName, BridgeOpts), + local => pre_process_in_out_common(LC) + }; +pre_process_subscriptions(Conf, _, _) when is_map(Conf) -> %% have no 'local' field in the config undefined. @@ -314,6 +321,27 @@ pre_process_conf(Key, Conf) -> Conf#{Key => Val} end. +pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) -> + QoS = downgrade_ingress_qos(QoSIn), + case QoS of + QoSIn -> + ok; + _ -> + ?SLOG(warning, #{ + msg => "downgraded_unsupported_ingress_qos", + qos_configured => QoSIn, + qos_used => QoS, + name => BridgeName, + options => BridgeOpts + }) + end, + Conf#{qos => QoS}. + +downgrade_ingress_qos(2) -> + 1; +downgrade_ingress_qos(QoS) -> + QoS. + get_pid(Name) -> case gproc:where(?NAME(Name)) of Pid when is_pid(Pid) ->