From cbb2885499d40903438b67154c83aa0c112fc5db Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 10 Feb 2023 16:17:55 +0300 Subject: [PATCH] fix(mqtt-bridge): allow to configure `clean_start` for ingresses --- .../test/emqx_bridge_mqtt_SUITE.erl | 16 ++++++++-------- .../i18n/emqx_connector_mqtt_schema.conf | 4 ++-- .../emqx_connector/src/emqx_connector_mqtt.erl | 6 ++---- .../src/mqtt/emqx_connector_mqtt_schema.erl | 4 +--- .../src/mqtt/emqx_connector_mqtt_worker.erl | 18 ++++++++++++++++-- 5 files changed, 29 insertions(+), 19 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 40a8c0bf2..1bb477dad 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -242,26 +242,26 @@ t_mqtt_conn_bridge_ingress(_) -> ok. -t_mqtt_conn_bridge_ignores_clean_start(_) -> +t_mqtt_egress_bridge_ignores_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( ?SERVER_CONF(<<"user1">>)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => BridgeName, - <<"ingress">> => ?INGRESS_CONF, + <<"egress">> => ?EGRESS_CONF, <<"clean_start">> => false } ), - {ok, 200, BridgeJSON} = request(get, uri(["bridges", BridgeID]), []), - Bridge = jsx:decode(BridgeJSON), - - %% verify that there's no `clean_start` in response - ?assertEqual(#{}, maps:with([<<"clean_start">>], Bridge)), + {ok, _, #{state := #{name := WorkerName}}} = + emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)), + ?assertMatch( + #{clean_start := true}, + maps:from_list(emqx_connector_mqtt_worker:info(WorkerName)) + ), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf index b0d1e0821..b075681f3 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf @@ -159,8 +159,8 @@ broker MUST support this feature.""" clean_start { desc { - en: "The clean-start or the clean-session of the MQTT protocol" - zh: "MQTT 清除会话" + en: "Whether or not to start a clean session when reconnecting a remote broker for ingress bridge" + zh: "与 ingress MQTT 桥的远程服务器重连时是否清除老的 MQTT 会话。" } label: { en: "Clean Session" diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index cffd138b5..5c62e7086 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -251,6 +251,7 @@ basic_config( server := Server, proto_ver := ProtoVer, bridge_mode := BridgeMode, + clean_start := CleanStart, keepalive := KeepAlive, retry_interval := RetryIntv, max_inflight := MaxInflight, @@ -270,11 +271,8 @@ basic_config( %% non-standard mqtt connection packets will be filtered out by LB. %% So let's disable bridge_mode. bridge_mode => BridgeMode, - %% NOTE - %% We are ignoring the user configuration here because there's currently no reliable way - %% to ensure proper session recovery according to the MQTT spec. - clean_start => true, keepalive => ms_to_s(KeepAlive), + clean_start => CleanStart, retry_interval => RetryIntv, max_inflight => MaxInflight, ssl => EnableSsl, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 365b082f2..073b75ae8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -112,9 +112,7 @@ fields("server_configs") -> boolean(), #{ default => true, - desc => ?DESC("clean_start"), - hidden => true, - deprecated => {since, "v5.0.16"} + desc => ?DESC("clean_start") } )}, {keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})}, diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 7240151a5..8e41a9f0f 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -75,6 +75,7 @@ connect/1, status/1, ping/1, + info/1, send_to_remote/2, send_to_remote_async/3 ]). @@ -145,6 +146,16 @@ mk_client_options(Conf, BridgeOpts) -> Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined), Subscriptions = maps:get(subscriptions, Conf), Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), + CleanStart = + case Subscriptions of + #{remote := _} -> + maps:get(clean_start, BridgeOpts); + undefined -> + %% NOTE + %% We are ignoring the user configuration here because there's currently no reliable way + %% to ensure proper session recovery according to the MQTT spec. + true + end, Opts = maps:without( [ address, @@ -160,6 +171,7 @@ mk_client_options(Conf, BridgeOpts) -> Opts#{ msg_handler => mk_client_event_handler(Vars, #{server => Server}), hosts => [HostPort], + clean_start => CleanStart, force_ping => true, proto_ver => maps:get(proto_ver, BridgeOpts, v4) }. @@ -205,10 +217,12 @@ subscribe_remote_topics(_Ref, undefined) -> stop(Ref) -> emqtt:stop(ref(Ref)). +info(Ref) -> + emqtt:info(ref(Ref)). + status(Ref) -> try - Info = emqtt:info(ref(Ref)), - case proplists:get_value(socket, Info) of + case proplists:get_value(socket, info(Ref)) of Socket when Socket /= undefined -> connected; undefined ->