diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 49b333216..1bb477dad 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -52,7 +52,7 @@ -define(INGRESS_CONF, #{ <<"remote">> => #{ <<"topic">> => <>, - <<"qos">> => 2 + <<"qos">> => 1 }, <<"local">> => #{ <<"topic">> => <>, @@ -77,7 +77,7 @@ -define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{ <<"remote">> => #{ <<"topic">> => <>, - <<"qos">> => 2 + <<"qos">> => 1 }, <<"local">> => #{ <<"topic">> => <>, @@ -242,26 +242,54 @@ t_mqtt_conn_bridge_ingress(_) -> ok. -t_mqtt_conn_bridge_ignores_clean_start(_) -> +t_mqtt_egress_bridge_ignores_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( ?SERVER_CONF(<<"user1">>)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => BridgeName, - <<"ingress">> => ?INGRESS_CONF, + <<"egress">> => ?EGRESS_CONF, <<"clean_start">> => false } ), - {ok, 200, BridgeJSON} = request(get, uri(["bridges", BridgeID]), []), - Bridge = jsx:decode(BridgeJSON), - - %% verify that there's no `clean_start` in response - ?assertEqual(#{}, maps:with([<<"clean_start">>], Bridge)), + {ok, _, #{state := #{name := WorkerName}}} = + emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)), + ?assertMatch( + #{clean_start := true}, + maps:from_list(emqx_connector_mqtt_worker:info(WorkerName)) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + + ok. + +t_mqtt_conn_bridge_ingress_downgrades_qos_2(_) -> + BridgeName = atom_to_binary(?FUNCTION_NAME), + BridgeID = create_bridge( + ?SERVER_CONF(<<"user1">>)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => BridgeName, + <<"ingress">> => emqx_map_lib:deep_merge( + ?INGRESS_CONF, + #{<<"remote">> => #{<<"qos">> => 2}} + ) + } + ), + + RemoteTopic = <>, + LocalTopic = <>, + Payload = <<"whatqos">>, + emqx:subscribe(LocalTopic), + emqx:publish(emqx_message:make(undefined, _QoS = 2, RemoteTopic, Payload)), + + %% we should receive a message on the local broker, with specified topic + Msg = assert_mqtt_msg_received(LocalTopic, Payload), + ?assertMatch(#message{qos = 1}, Msg), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf index b0d1e0821..b075681f3 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf @@ -159,8 +159,8 @@ broker MUST support this feature.""" clean_start { desc { - en: "The clean-start or the clean-session of the MQTT protocol" - zh: "MQTT 清除会话" + en: "Whether or not to start a clean session when reconnecting a remote broker for ingress bridge" + zh: "与 ingress MQTT 桥的远程服务器重连时是否清除老的 MQTT 会话。" } label: { en: "Clean Session" diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index cffd138b5..5c62e7086 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -251,6 +251,7 @@ basic_config( server := Server, proto_ver := ProtoVer, bridge_mode := BridgeMode, + clean_start := CleanStart, keepalive := KeepAlive, retry_interval := RetryIntv, max_inflight := MaxInflight, @@ -270,11 +271,8 @@ basic_config( %% non-standard mqtt connection packets will be filtered out by LB. %% So let's disable bridge_mode. bridge_mode => BridgeMode, - %% NOTE - %% We are ignoring the user configuration here because there's currently no reliable way - %% to ensure proper session recovery according to the MQTT spec. - clean_start => true, keepalive => ms_to_s(KeepAlive), + clean_start => CleanStart, retry_interval => RetryIntv, max_inflight => MaxInflight, ssl => EnableSsl, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index f2163d952..073b75ae8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -18,6 +18,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -behaviour(hocon_schema). @@ -111,9 +112,7 @@ fields("server_configs") -> boolean(), #{ default => true, - desc => ?DESC("clean_start"), - hidden => true, - deprecated => {since, "v5.0.16"} + desc => ?DESC("clean_start") } )}, {keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})}, @@ -143,8 +142,7 @@ fields("ingress") -> mk( ref(?MODULE, "ingress_local"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"), - is_required => false + desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local") } )} ]; @@ -161,7 +159,7 @@ fields("ingress_remote") -> )}, {qos, mk( - qos(), + emqx_schema:qos(), #{ default => 1, desc => ?DESC("ingress_remote_qos") diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 6da63f99a..8e41a9f0f 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -75,6 +75,7 @@ connect/1, status/1, ping/1, + info/1, send_to_remote/2, send_to_remote_async/3 ]). @@ -114,7 +115,7 @@ start_link(Name, BridgeOpts) -> name => Name, options => BridgeOpts }), - Conf = init_config(BridgeOpts), + Conf = init_config(Name, BridgeOpts), Options = mk_client_options(Conf, BridgeOpts), case emqtt:start_link(Options) of {ok, Pid} -> @@ -129,13 +130,13 @@ start_link(Name, BridgeOpts) -> Error end. -init_config(Opts) -> +init_config(Name, Opts) -> Mountpoint = maps:get(forward_mountpoint, Opts, undefined), Subscriptions = maps:get(subscriptions, Opts, undefined), Forwards = maps:get(forwards, Opts, undefined), #{ mountpoint => format_mountpoint(Mountpoint), - subscriptions => pre_process_subscriptions(Subscriptions), + subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts), forwards => pre_process_forwards(Forwards) }. @@ -145,6 +146,16 @@ mk_client_options(Conf, BridgeOpts) -> Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined), Subscriptions = maps:get(subscriptions, Conf), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), + CleanStart = + case Subscriptions of + #{remote := _} -> + maps:get(clean_start, BridgeOpts); + undefined -> + %% NOTE + %% We are ignoring the user configuration here because there's currently no reliable way + %% to ensure proper session recovery according to the MQTT spec. + true + end, Opts = maps:without( [ address, @@ -160,6 +171,7 @@ mk_client_options(Conf, BridgeOpts) -> Opts#{ msg_handler => mk_client_event_handler(Vars, #{server => Server}), hosts => [HostPort], + clean_start => CleanStart, force_ping => true, proto_ver => maps:get(proto_ver, BridgeOpts, v4) }. @@ -205,10 +217,12 @@ subscribe_remote_topics(_Ref, undefined) -> stop(Ref) -> emqtt:stop(ref(Ref)). +info(Ref) -> + emqtt:info(ref(Ref)). + status(Ref) -> try - Info = emqtt:info(ref(Ref)), - case proplists:get_value(socket, Info) of + case proplists:get_value(socket, info(Ref)) of Socket when Socket /= undefined -> connected; undefined -> @@ -282,11 +296,18 @@ format_mountpoint(undefined) -> format_mountpoint(Prefix) -> binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). -pre_process_subscriptions(undefined) -> +pre_process_subscriptions(undefined, _, _) -> undefined; -pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) -> - Conf#{local => pre_process_in_out_common(LC)}; -pre_process_subscriptions(Conf) when is_map(Conf) -> +pre_process_subscriptions( + #{remote := RC, local := LC} = Conf, + BridgeName, + BridgeOpts +) when is_map(Conf) -> + Conf#{ + remote => pre_process_in_remote(RC, BridgeName, BridgeOpts), + local => pre_process_in_out_common(LC) + }; +pre_process_subscriptions(Conf, _, _) when is_map(Conf) -> %% have no 'local' field in the config undefined. @@ -314,6 +335,27 @@ pre_process_conf(Key, Conf) -> Conf#{Key => Val} end. +pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) -> + QoS = downgrade_ingress_qos(QoSIn), + case QoS of + QoSIn -> + ok; + _ -> + ?SLOG(warning, #{ + msg => "downgraded_unsupported_ingress_qos", + qos_configured => QoSIn, + qos_used => QoS, + name => BridgeName, + options => BridgeOpts + }) + end, + Conf#{qos => QoS}. + +downgrade_ingress_qos(2) -> + 1; +downgrade_ingress_qos(QoS) -> + QoS. + get_pid(Name) -> case gproc:where(?NAME(Name)) of Pid when is_pid(Pid) -> diff --git a/changes/v5.0.17/fix-9952.en.md b/changes/v5.0.17/fix-9952.en.md new file mode 100644 index 000000000..3a951b423 --- /dev/null +++ b/changes/v5.0.17/fix-9952.en.md @@ -0,0 +1,2 @@ +Disallow subscribing with QoS 2 for ingress MQTT bridges. +Allow user to configure `clean_start` option for ingress MQTT bridges however. diff --git a/changes/v5.0.17/fix-9952.zh.md b/changes/v5.0.17/fix-9952.zh.md new file mode 100644 index 000000000..1b9064c7d --- /dev/null +++ b/changes/v5.0.17/fix-9952.zh.md @@ -0,0 +1,2 @@ +不允许对 ingress MQTT 网桥的 QoS 2 进行订阅。 +但允许用户为 ingress MQTT 桥配置 "clean_start" 选项。