Merge pull request #9609 from thalesmg/mqtt-conn-clientid-v50

feat(mqtt): add option to customize clientid prefix for egress bridges
This commit is contained in:
Thales Macedo Garitezi 2022-12-23 12:31:44 -03:00 committed by GitHub
commit 1e3b0c3777
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 5 deletions

View File

@ -225,6 +225,7 @@ t_mqtt_conn_bridge_egress(_) ->
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@ -238,8 +239,10 @@ t_mqtt_conn_bridge_egress(_) ->
%% we should receive a message on the "remote" broker, with specified topic
?assert(
receive
{deliver, RemoteTopic, #message{payload = Payload}} ->
{deliver, RemoteTopic, #message{payload = Payload, from = From}} ->
ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]),
Size = byte_size(ResourceID),
?assertMatch(<<ResourceID:Size/binary, _/binary>>, From),
true;
Msg ->
ct:pal("Msg: ~p", [Msg]),
@ -271,6 +274,45 @@ t_mqtt_conn_bridge_egress(_) ->
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_egress_custom_clientid_prefix(_Config) ->
User1 = <<"user1">>,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(User1)#{
<<"clientid_prefix">> => <<"my-custom-prefix">>,
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF
}
),
#{
<<"type">> := ?TYPE_MQTT,
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
emqx:publish(emqx_message:make(LocalTopic, Payload)),
receive
{deliver, RemoteTopic, #message{from = From}} ->
Size = byte_size(ResourceID),
?assertMatch(<<"my-custom-prefix:", ResouceID:Size/binary, _/binary>>, From),
ok
after 1000 ->
ct:fail("should have published message")
end,
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
t_mqtt_conn_bridge_ingress_and_egress(_) ->
User1 = <<"user1">>,
%% create an MQTT bridge, using POST

View File

@ -337,4 +337,15 @@ Template with variables is allowed.
}
}
clientid_prefix {
desc {
en: """Optional prefix to prepend to the clientid used by egress bridges."""
zh: """可选的前缀用于在出口网桥使用的clientid前加上前缀。"""
}
label: {
en: "Clientid Prefix"
zh: "客户ID前缀"
}
}
}

View File

@ -152,7 +152,7 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
clientid => clientid(InstId),
clientid => clientid(InstId, Conf),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
@ -246,7 +246,7 @@ basic_config(
ssl := #{enable := EnableSsl} = Ssl
} = Conf
) ->
BaiscConf = #{
BasicConf = #{
%% connection opts
server => Server,
%% 30s
@ -268,7 +268,7 @@ basic_config(
ssl_opts => maps:to_list(maps:remove(enable, Ssl)),
if_record_metrics => true
},
maybe_put_fields([username, password], Conf, BaiscConf).
maybe_put_fields([username, password], Conf, BasicConf).
maybe_put_fields(Fields, Conf, Acc0) ->
lists:foldl(
@ -285,5 +285,7 @@ maybe_put_fields(Fields, Conf, Acc0) ->
ms_to_s(Ms) ->
erlang:ceil(Ms / 1000).
clientid(Id) ->
clientid(Id, _Conf = #{clientid_prefix := Prefix = <<_/binary>>}) ->
iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]);
clientid(Id, _Conf) ->
iolist_to_binary([Id, ":", atom_to_list(node())]).

View File

@ -75,6 +75,7 @@ fields("server_configs") ->
desc => ?DESC("server")
}
)},
{clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})},
{reconnect_interval,
mk_duration(
"Reconnect interval. Delay for the MQTT bridge to retry establishing the connection "

View File

@ -14,6 +14,8 @@
- Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/).
- Added the option to customize the clientid prefix of egress MQTT bridges. [#9609](https://github.com/emqx/emqx/pull/9609)
## Bug fixes
- Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487).

View File

@ -14,6 +14,8 @@
- 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。
- 增加了自定义出口MQTT桥的clientid前缀的选项。[#9609](https://github.com/emqx/emqx/pull/9609)
## 修复
- 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。