From 35dc75b7ed8046787d914ba2fdbd3436af4a2270 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 23 Dec 2022 09:48:25 -0300 Subject: [PATCH] feat(mqtt): add option to customize clientid prefix for egress bridges MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://emqx.atlassian.net/browse/EMQX-8445 Currently the bridge client’s client ID is prefixed with the resource ID. Sometimes it’s useful for users to have control of this prefix, e.g. prefix based ACL rules in the target broker. --- .../test/emqx_bridge_mqtt_SUITE.erl | 44 ++++++++++++++++++- .../i18n/emqx_connector_mqtt_schema.conf | 11 +++++ .../src/emqx_connector_mqtt.erl | 10 +++-- .../src/mqtt/emqx_connector_mqtt_schema.erl | 1 + changes/v5.0.13-en.md | 2 + changes/v5.0.13-zh.md | 2 + 6 files changed, 65 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index ec3caff7d..1bf156ed4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -225,6 +225,7 @@ t_mqtt_conn_bridge_egress(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -238,8 +239,10 @@ t_mqtt_conn_bridge_egress(_) -> %% we should receive a message on the "remote" broker, with specified topic ?assert( receive - {deliver, RemoteTopic, #message{payload = Payload}} -> + {deliver, RemoteTopic, #message{payload = Payload, from = From}} -> ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + Size = byte_size(ResourceID), + ?assertMatch(<>, From), true; Msg -> ct:pal("Msg: ~p", [Msg]), @@ -271,6 +274,45 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. +t_egress_custom_clientid_prefix(_Config) -> + User1 = <<"user1">>, + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"clientid_prefix">> => <<"my-custom-prefix">>, + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + LocalTopic = <>, + RemoteTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + receive + {deliver, RemoteTopic, #message{from = From}} -> + Size = byte_size(ResourceID), + ?assertMatch(<<"my-custom-prefix:", ResouceID:Size/binary, _/binary>>, From), + ok + after 1000 -> + ct:fail("should have published message") + end, + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + t_mqtt_conn_bridge_ingress_and_egress(_) -> User1 = <<"user1">>, %% create an MQTT bridge, using POST diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf index d7e6cc033..6f573bb73 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf @@ -337,4 +337,15 @@ Template with variables is allowed. } } + clientid_prefix { + desc { + en: """Optional prefix to prepend to the clientid used by egress bridges.""" + zh: """可选的前缀,用于在出口网桥使用的clientid前加上前缀。""" + } + label: { + en: "Clientid Prefix" + zh: "客户ID前缀" + } + } + } diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 438c7b87e..f22563960 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -152,7 +152,7 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(InstId), + clientid => clientid(InstId, Conf), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, @@ -246,7 +246,7 @@ basic_config( ssl := #{enable := EnableSsl} = Ssl } = Conf ) -> - BaiscConf = #{ + BasicConf = #{ %% connection opts server => Server, %% 30s @@ -268,7 +268,7 @@ basic_config( ssl_opts => maps:to_list(maps:remove(enable, Ssl)), if_record_metrics => true }, - maybe_put_fields([username, password], Conf, BaiscConf). + maybe_put_fields([username, password], Conf, BasicConf). maybe_put_fields(Fields, Conf, Acc0) -> lists:foldl( @@ -285,5 +285,7 @@ maybe_put_fields(Fields, Conf, Acc0) -> ms_to_s(Ms) -> erlang:ceil(Ms / 1000). -clientid(Id) -> +clientid(Id, _Conf = #{clientid_prefix := Prefix = <<_/binary>>}) -> + iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]); +clientid(Id, _Conf) -> iolist_to_binary([Id, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 4c6d9cb84..1c9f66d21 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -75,6 +75,7 @@ fields("server_configs") -> desc => ?DESC("server") } )}, + {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, {reconnect_interval, mk_duration( "Reconnect interval. Delay for the MQTT bridge to retry establishing the connection " diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 38ca0a967..3381f3a4a 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -14,6 +14,8 @@ - Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/). +- Added the option to customize the clientid prefix of egress MQTT bridges. [#9609](https://github.com/emqx/emqx/pull/9609) + ## Bug fixes - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index 619803dfc..4a5b0f67b 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -14,6 +14,8 @@ - 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。 +- 增加了自定义出口MQTT桥的clientid前缀的选项。[#9609](https://github.com/emqx/emqx/pull/9609) + ## 修复 - 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。