fix(mqtt-bridge): allow to configure `clean_start` for ingresses

This commit is contained in:
Andrew Mayorov 2023-02-10 16:17:55 +03:00
parent 7002fe2ef4
commit cbb2885499
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 29 additions and 19 deletions

View File

@ -242,26 +242,26 @@ t_mqtt_conn_bridge_ingress(_) ->
ok.
t_mqtt_conn_bridge_ignores_clean_start(_) ->
t_mqtt_egress_bridge_ignores_clean_start(_) ->
BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge(
?SERVER_CONF(<<"user1">>)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName,
<<"ingress">> => ?INGRESS_CONF,
<<"egress">> => ?EGRESS_CONF,
<<"clean_start">> => false
}
),
{ok, 200, BridgeJSON} = request(get, uri(["bridges", BridgeID]), []),
Bridge = jsx:decode(BridgeJSON),
%% verify that there's no `clean_start` in response
?assertEqual(#{}, maps:with([<<"clean_start">>], Bridge)),
{ok, _, #{state := #{name := WorkerName}}} =
emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)),
?assertMatch(
#{clean_start := true},
maps:from_list(emqx_connector_mqtt_worker:info(WorkerName))
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.

View File

@ -159,8 +159,8 @@ broker MUST support this feature."""
clean_start {
desc {
en: "The clean-start or the clean-session of the MQTT protocol"
zh: "MQTT 清除会话"
en: "Whether or not to start a clean session when reconnecting a remote broker for ingress bridge"
zh: "与 ingress MQTT 桥的远程服务器重连时是否清除老的 MQTT 会话"
}
label: {
en: "Clean Session"

View File

@ -251,6 +251,7 @@ basic_config(
server := Server,
proto_ver := ProtoVer,
bridge_mode := BridgeMode,
clean_start := CleanStart,
keepalive := KeepAlive,
retry_interval := RetryIntv,
max_inflight := MaxInflight,
@ -270,11 +271,8 @@ basic_config(
%% non-standard mqtt connection packets will be filtered out by LB.
%% So let's disable bridge_mode.
bridge_mode => BridgeMode,
%% NOTE
%% We are ignoring the user configuration here because there's currently no reliable way
%% to ensure proper session recovery according to the MQTT spec.
clean_start => true,
keepalive => ms_to_s(KeepAlive),
clean_start => CleanStart,
retry_interval => RetryIntv,
max_inflight => MaxInflight,
ssl => EnableSsl,

View File

@ -112,9 +112,7 @@ fields("server_configs") ->
boolean(),
#{
default => true,
desc => ?DESC("clean_start"),
hidden => true,
deprecated => {since, "v5.0.16"}
desc => ?DESC("clean_start")
}
)},
{keepalive, mk_duration("MQTT Keepalive.", #{default => "300s"})},

View File

@ -75,6 +75,7 @@
connect/1,
status/1,
ping/1,
info/1,
send_to_remote/2,
send_to_remote_async/3
]).
@ -145,6 +146,16 @@ mk_client_options(Conf, BridgeOpts) ->
Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
Subscriptions = maps:get(subscriptions, Conf),
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
CleanStart =
case Subscriptions of
#{remote := _} ->
maps:get(clean_start, BridgeOpts);
undefined ->
%% NOTE
%% We are ignoring the user configuration here because there's currently no reliable way
%% to ensure proper session recovery according to the MQTT spec.
true
end,
Opts = maps:without(
[
address,
@ -160,6 +171,7 @@ mk_client_options(Conf, BridgeOpts) ->
Opts#{
msg_handler => mk_client_event_handler(Vars, #{server => Server}),
hosts => [HostPort],
clean_start => CleanStart,
force_ping => true,
proto_ver => maps:get(proto_ver, BridgeOpts, v4)
}.
@ -205,10 +217,12 @@ subscribe_remote_topics(_Ref, undefined) ->
stop(Ref) ->
emqtt:stop(ref(Ref)).
info(Ref) ->
emqtt:info(ref(Ref)).
status(Ref) ->
try
Info = emqtt:info(ref(Ref)),
case proplists:get_value(socket, Info) of
case proplists:get_value(socket, info(Ref)) of
Socket when Socket /= undefined ->
connected;
undefined ->