diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index ec3caff7d..1bf156ed4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -225,6 +225,7 @@ t_mqtt_conn_bridge_egress(_) -> <<"name">> := ?BRIDGE_NAME_EGRESS } = jsx:decode(Bridge), BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -238,8 +239,10 @@ t_mqtt_conn_bridge_egress(_) -> %% we should receive a message on the "remote" broker, with specified topic ?assert( receive - {deliver, RemoteTopic, #message{payload = Payload}} -> + {deliver, RemoteTopic, #message{payload = Payload, from = From}} -> ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + Size = byte_size(ResourceID), + ?assertMatch(<>, From), true; Msg -> ct:pal("Msg: ~p", [Msg]), @@ -271,6 +274,45 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. +t_egress_custom_clientid_prefix(_Config) -> + User1 = <<"user1">>, + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"clientid_prefix">> => <<"my-custom-prefix">>, + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + LocalTopic = <>, + RemoteTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + receive + {deliver, RemoteTopic, #message{from = From}} -> + Size = byte_size(ResourceID), + ?assertMatch(<<"my-custom-prefix:", ResouceID:Size/binary, _/binary>>, From), + ok + after 1000 -> + ct:fail("should have published message") + end, + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + t_mqtt_conn_bridge_ingress_and_egress(_) -> User1 = <<"user1">>, %% create an MQTT bridge, using POST diff --git a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf index d7e6cc033..6f573bb73 100644 --- a/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf +++ b/apps/emqx_connector/i18n/emqx_connector_mqtt_schema.conf @@ -337,4 +337,15 @@ Template with variables is allowed. } } + clientid_prefix { + desc { + en: """Optional prefix to prepend to the clientid used by egress bridges.""" + zh: """可选的前缀,用于在出口网桥使用的clientid前加上前缀。""" + } + label: { + en: "Clientid Prefix" + zh: "客户ID前缀" + } + } + } diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 438c7b87e..f22563960 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -152,7 +152,7 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(InstId), + clientid => clientid(InstId, Conf), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, @@ -246,7 +246,7 @@ basic_config( ssl := #{enable := EnableSsl} = Ssl } = Conf ) -> - BaiscConf = #{ + BasicConf = #{ %% connection opts server => Server, %% 30s @@ -268,7 +268,7 @@ basic_config( ssl_opts => maps:to_list(maps:remove(enable, Ssl)), if_record_metrics => true }, - maybe_put_fields([username, password], Conf, BaiscConf). + maybe_put_fields([username, password], Conf, BasicConf). maybe_put_fields(Fields, Conf, Acc0) -> lists:foldl( @@ -285,5 +285,7 @@ maybe_put_fields(Fields, Conf, Acc0) -> ms_to_s(Ms) -> erlang:ceil(Ms / 1000). -clientid(Id) -> +clientid(Id, _Conf = #{clientid_prefix := Prefix = <<_/binary>>}) -> + iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]); +clientid(Id, _Conf) -> iolist_to_binary([Id, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 4c6d9cb84..1c9f66d21 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -75,6 +75,7 @@ fields("server_configs") -> desc => ?DESC("server") } )}, + {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, {reconnect_interval, mk_duration( "Reconnect interval. Delay for the MQTT bridge to retry establishing the connection " diff --git a/changes/v5.0.13-en.md b/changes/v5.0.13-en.md index 38ca0a967..3381f3a4a 100644 --- a/changes/v5.0.13-en.md +++ b/changes/v5.0.13-en.md @@ -14,6 +14,8 @@ - Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/). +- Added the option to customize the clientid prefix of egress MQTT bridges. [#9609](https://github.com/emqx/emqx/pull/9609) + ## Bug fixes - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487). diff --git a/changes/v5.0.13-zh.md b/changes/v5.0.13-zh.md index 619803dfc..4a5b0f67b 100644 --- a/changes/v5.0.13-zh.md +++ b/changes/v5.0.13-zh.md @@ -14,6 +14,8 @@ - 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。 +- 增加了自定义出口MQTT桥的clientid前缀的选项。[#9609](https://github.com/emqx/emqx/pull/9609) + ## 修复 - 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。