feat(mqtt): add option to customize clientid prefix for egress bridges

https://emqx.atlassian.net/browse/EMQX-8445

Currently the bridge client’s client ID is prefixed with the resource
ID.

Sometimes it’s useful for users to have control of this prefix,
e.g. prefix based ACL rules in the target broker.
This commit is contained in:
Thales Macedo Garitezi 2022-12-23 09:48:25 -03:00
parent 700c64fcf0
commit 35dc75b7ed
6 changed files with 65 additions and 5 deletions

View File

@ -225,6 +225,7 @@ t_mqtt_conn_bridge_egress(_) ->
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@ -238,8 +239,10 @@ t_mqtt_conn_bridge_egress(_) ->
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
{deliver, RemoteTopic, #message{payload = Payload, from = From}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
Size = byte_size(ResourceID),
?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
@ -271,6 +274,45 @@ t_mqtt_conn_bridge_egress(_) ->
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_egress_custom_clientid_prefix(_Config) ->
User1 = <<"user1">>,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(User1)#{
<<"clientid_prefix">> => <<"my-custom-prefix">>,
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF
}
),
#{
<<"type">> := ?TYPE_MQTT,
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
emqx:publish(emqx_message:make(LocalTopic, Payload)),
receive
{deliver, RemoteTopic, #message{from = From}} ->
Size = byte_size(ResourceID),
?assertMatch(<<"my-custom-prefix:", ResouceID:Size/binary, _/binary>>, From),
ok
after 1000 ->
ct:fail("should have published message")
end,
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_mqtt_conn_bridge_ingress_and_egress(_) ->
User1 = <<"user1">>,
%% create an MQTT bridge, using POST

View File

@ -337,4 +337,15 @@ Template with variables is allowed.
}
}
clientid_prefix {
desc {
en: """Optional prefix to prepend to the clientid used by egress bridges."""
zh: """可选的前缀用于在出口网桥使用的clientid前加上前缀。"""
}
label: {
en: "Clientid Prefix"
zh: "客户ID前缀"
}
}
}

View File

@ -152,7 +152,7 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
clientid => clientid(InstId),
clientid => clientid(InstId, Conf),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
@ -246,7 +246,7 @@ basic_config(
ssl := #{enable := EnableSsl} = Ssl
} = Conf
) ->
BaiscConf = #{
BasicConf = #{
%% connection opts
server => Server,
%% 30s
@ -268,7 +268,7 @@ basic_config(
ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
if_record_metrics => true
},
maybe_put_fields([username, password], Conf, BaiscConf).
maybe_put_fields([username, password], Conf, BasicConf).
maybe_put_fields(Fields, Conf, Acc0) ->
lists:foldl(
@ -285,5 +285,7 @@ maybe_put_fields(Fields, Conf, Acc0) ->
ms_to_s(Ms) ->
erlang:ceil(Ms / 1000).
clientid(Id) ->
clientid(Id, _Conf = #{clientid_prefix := Prefix = <<_/binary>>}) ->
iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]);
clientid(Id, _Conf) ->
iolist_to_binary([Id, ":", atom_to_list(node())]).

View File

@ -75,6 +75,7 @@ fields("server_configs") ->
desc => ?DESC("server")
}
)},
{clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})},
{reconnect_interval,
mk_duration(
"Reconnect interval. Delay for the MQTT bridge to retry establishing the connection "

View File

@ -14,6 +14,8 @@
- Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/).
- Added the option to customize the clientid prefix of egress MQTT bridges. [#9609](https://github.com/emqx/emqx/pull/9609)
## Bug fixes
- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487).

View File

@ -14,6 +14,8 @@
- 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。
- 增加了自定义出口MQTT桥的clientid前缀的选项。[#9609](https://github.com/emqx/emqx/pull/9609)
## 修复
- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。