fix(bridge/mqtt): ensure short clientid
Some mqtt brokers do not allow long client IDs. To make it compatible with this limitation, this commit tries to limit the number of bytes under 23 with a best-effort attempt to derive it from the bridge name.
This commit is contained in:
parent
2d209ec576
commit
4669c4d552
|
@ -32,10 +32,10 @@
|
|||
%% `apps/emqx/src/bpapi/README.md'
|
||||
|
||||
%% Opensource edition
|
||||
-define(EMQX_RELEASE_CE, "5.4.0").
|
||||
-define(EMQX_RELEASE_CE, "5.4.1").
|
||||
|
||||
%% Enterprise edition
|
||||
-define(EMQX_RELEASE_EE, "5.4.0").
|
||||
-define(EMQX_RELEASE_EE, "5.4.1").
|
||||
|
||||
%% The HTTP API version
|
||||
-define(EMQX_API_VERSION, "5.0").
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
{application, emqx, [
|
||||
{id, "emqx"},
|
||||
{description, "EMQX Core"},
|
||||
{vsn, "5.1.17"},
|
||||
{vsn, "5.1.18"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
|
|
|
@ -141,4 +141,5 @@ pbkdf2(MacFun, Password, Salt, Iterations, DKLength) ->
|
|||
pbkdf2:pbkdf2(MacFun, Password, Salt, Iterations, DKLength).
|
||||
|
||||
hex(X) when is_binary(X) ->
|
||||
pbkdf2:to_hex(X).
|
||||
%% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
|
||||
string:lowercase(binary:encode_hex(X)).
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
{deps, [
|
||||
{emqx, {path, "../../apps/emqx"}}
|
||||
{emqx, {path, "../../apps/emqx"}},
|
||||
{emqx_resource, {path, "../../apps/emqx_resource"}}
|
||||
]}.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_bridge_mqtt, [
|
||||
{description, "EMQX MQTT Broker Bridge"},
|
||||
{vsn, "0.1.6"},
|
||||
{vsn, "0.1.7"},
|
||||
{registered, []},
|
||||
{applications, [
|
||||
kernel,
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx_resource/include/emqx_resource.hrl").
|
||||
|
||||
-behaviour(emqx_resource).
|
||||
|
||||
|
@ -35,6 +36,8 @@
|
|||
-export([on_async_result/2]).
|
||||
|
||||
-define(HEALTH_CHECK_TIMEOUT, 1000).
|
||||
-define(INGRESS, "I").
|
||||
-define(EGRESS, "E").
|
||||
|
||||
%% ===================================================================
|
||||
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
|
||||
|
@ -66,7 +69,7 @@ on_start(ResourceId, Conf) ->
|
|||
end.
|
||||
|
||||
start_ingress(ResourceId, Conf) ->
|
||||
ClientOpts = mk_client_opts(ResourceId, "ingress", Conf),
|
||||
ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf),
|
||||
case mk_ingress_config(ResourceId, Conf) of
|
||||
Ingress = #{} ->
|
||||
start_ingress(ResourceId, Ingress, ClientOpts);
|
||||
|
@ -91,6 +94,8 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
choose_ingress_pool_size(<<?TEST_ID_PREFIX, _/binary>>, _) ->
|
||||
1;
|
||||
choose_ingress_pool_size(
|
||||
ResourceId,
|
||||
#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
|
||||
|
@ -119,7 +124,7 @@ start_egress(ResourceId, Conf) ->
|
|||
% NOTE
|
||||
% We are ignoring the user configuration here because there's currently no reliable way
|
||||
% to ensure proper session recovery according to the MQTT spec.
|
||||
ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)),
|
||||
ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)),
|
||||
case mk_egress_config(Conf) of
|
||||
Egress = #{} ->
|
||||
start_egress(ResourceId, Egress, ClientOpts);
|
||||
|
@ -326,9 +331,10 @@ mk_client_opts(
|
|||
],
|
||||
Config
|
||||
),
|
||||
Name = parse_id_to_name(ResourceId),
|
||||
mk_client_opt_password(Options#{
|
||||
hosts => [HostPort],
|
||||
clientid => clientid(ResourceId, ClientScope, Config),
|
||||
clientid => clientid(Name, ClientScope, Config),
|
||||
connect_timeout => 30,
|
||||
keepalive => ms_to_s(KeepAlive),
|
||||
force_ping => true,
|
||||
|
@ -336,6 +342,12 @@ mk_client_opts(
|
|||
ssl_opts => maps:to_list(maps:remove(enable, Ssl))
|
||||
}).
|
||||
|
||||
parse_id_to_name(<<?TEST_ID_PREFIX, Name/binary>>) ->
|
||||
Name;
|
||||
parse_id_to_name(Id) ->
|
||||
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}),
|
||||
Name.
|
||||
|
||||
mk_client_opt_password(Options = #{password := Secret}) ->
|
||||
%% TODO: Teach `emqtt` to accept 0-arity closures as passwords.
|
||||
Options#{password := emqx_secret:unwrap(Secret)};
|
||||
|
@ -345,7 +357,9 @@ mk_client_opt_password(Options) ->
|
|||
ms_to_s(Ms) ->
|
||||
erlang:ceil(Ms / 1000).
|
||||
|
||||
clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) ->
|
||||
iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]);
|
||||
clientid(Id, ClientScope, _Conf) ->
|
||||
iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]).
|
||||
clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when
|
||||
is_binary(Prefix) andalso Prefix =/= <<>>
|
||||
->
|
||||
emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]);
|
||||
clientid(Name, ClientScope, _Conf) ->
|
||||
emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]).
|
||||
|
|
|
@ -81,7 +81,7 @@ mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) ->
|
|||
ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}.
|
||||
|
||||
mk_clientid(WorkerId, ClientId) ->
|
||||
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
|
||||
emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
|
||||
|
||||
connect(Pid, Name) ->
|
||||
case emqtt:connect(Pid) of
|
||||
|
|
|
@ -81,7 +81,7 @@ mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
|
|||
}.
|
||||
|
||||
mk_clientid(WorkerId, ClientId) ->
|
||||
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
|
||||
emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
|
||||
|
||||
mk_client_event_handler(Name, Ingress = #{}) ->
|
||||
IngressVars = maps:with([server], Ingress),
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_mqtt_lib).
|
||||
|
||||
-export([clientid_base/1, bytes23/2]).
|
||||
|
||||
%% @doc Make the base ID of client IDs.
|
||||
%% A base ID is used to concatenate with pool worker ID to build a
|
||||
%% full ID.
|
||||
%% In order to avoid client ID clashing when EMQX is clustered,
|
||||
%% the base ID is the resource name concatenated with
|
||||
%% broker node name SHA-hash and truncated to 8 hex characters.
|
||||
clientid_base(Name) ->
|
||||
bin([Name, shortener(atom_to_list(node()), 8)]).
|
||||
|
||||
%% @doc Limit the number of bytes for client ID under 23 bytes.
|
||||
%% If Prefix and suffix concatenated is longer than 23 bytes
|
||||
%% it hashes the concatenation and replace the non-random suffix.
|
||||
bytes23(Prefix, SeqNo) ->
|
||||
Suffix = integer_to_binary(SeqNo),
|
||||
Concat = bin([Prefix, $:, Suffix]),
|
||||
case size(Concat) =< 23 of
|
||||
true ->
|
||||
Concat;
|
||||
false ->
|
||||
shortener(Concat, 23)
|
||||
end.
|
||||
|
||||
%% @private SHA hash a string and return the prefix of
|
||||
%% the given length as hex string in binary format.
|
||||
shortener(Str, Length) when is_list(Str) ->
|
||||
shortener(bin(Str), Length);
|
||||
shortener(Str, Length) when is_binary(Str) ->
|
||||
true = size(Str) > 0,
|
||||
true = (Length > 0 andalso Length =< 40),
|
||||
Sha = crypto:hash(sha, Str),
|
||||
%% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
|
||||
Hex = string:lowercase(binary:encode_hex(Sha)),
|
||||
<<UniqueEnough:Length/binary, _/binary>> = Hex,
|
||||
UniqueEnough.
|
||||
|
||||
bin(IoList) ->
|
||||
iolist_to_binary(IoList).
|
|
@ -495,7 +495,6 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
<<"egress">> => ?EGRESS_CONF
|
||||
}
|
||||
),
|
||||
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
|
@ -510,11 +509,6 @@ t_mqtt_conn_bridge_egress(_) ->
|
|||
#{?snk_kind := buffer_worker_flush_ack}
|
||||
),
|
||||
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
|
||||
Size = byte_size(ResourceID),
|
||||
?assertMatch(<<ResourceID:Size/binary, _/binary>>, Msg#message.from),
|
||||
|
||||
%% verify the metrics of the bridge
|
||||
?retry(
|
||||
_Interval = 200,
|
||||
|
@ -538,7 +532,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
|||
<<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
|
||||
}
|
||||
),
|
||||
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||
|
||||
%% we now test if the bridge works as expected
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
|
@ -555,8 +548,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
|||
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
Msg = assert_mqtt_msg_received(RemoteTopic),
|
||||
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
|
||||
?assertMatch(<<ResourceID:(byte_size(ResourceID))/binary, _/binary>>, Msg#message.from),
|
||||
?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)),
|
||||
|
||||
%% verify the metrics of the bridge
|
||||
|
@ -575,15 +566,29 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
|
|||
|
||||
ok.
|
||||
|
||||
t_egress_custom_clientid_prefix(_Config) ->
|
||||
t_egress_short_clientid(_Config) ->
|
||||
%% Name is short, expect the actual client ID in use is hashed from
|
||||
%% <name>E<nodename-hash>:<pool_worker_id>
|
||||
Name = "abc01234",
|
||||
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
|
||||
ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]),
|
||||
test_egress_clientid(Name, ExpectedClientId).
|
||||
|
||||
t_egress_long_clientid(_Config) ->
|
||||
%% Expect the actual client ID in use is hashed from
|
||||
%% <name>E<nodename-hash>:<pool_worker_id>
|
||||
Name = "abc01234567890123456789",
|
||||
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
|
||||
ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1),
|
||||
test_egress_clientid(Name, ExpectedClientId).
|
||||
|
||||
test_egress_clientid(Name, ExpectedClientId) ->
|
||||
BridgeIDEgress = create_bridge(
|
||||
?SERVER_CONF#{
|
||||
<<"clientid_prefix">> => <<"my-custom-prefix">>,
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||
<<"egress">> => ?EGRESS_CONF
|
||||
<<"name">> => Name,
|
||||
<<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1}
|
||||
}
|
||||
),
|
||||
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
|
||||
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
|
||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
|
||||
Payload = <<"hello">>,
|
||||
|
@ -592,8 +597,7 @@ t_egress_custom_clientid_prefix(_Config) ->
|
|||
emqx:publish(emqx_message:make(LocalTopic, Payload)),
|
||||
|
||||
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
|
||||
Size = byte_size(ResourceID),
|
||||
?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from),
|
||||
?assertEqual(ExpectedClientId, Msg#message.from),
|
||||
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
|
||||
ok.
|
||||
|
@ -843,7 +847,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
|
|||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||
<<"egress">> => ?EGRESS_CONF,
|
||||
<<"resource_opts">> => #{
|
||||
<<"worker_pool_size">> => 2,
|
||||
<<"query_mode">> => <<"sync">>,
|
||||
%% using a long time so we can test recovery
|
||||
<<"request_ttl">> => <<"15s">>,
|
||||
|
@ -949,7 +952,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
|
|||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||
<<"egress">> => ?EGRESS_CONF,
|
||||
<<"resource_opts">> => #{
|
||||
<<"worker_pool_size">> => 2,
|
||||
<<"query_mode">> => <<"async">>,
|
||||
%% using a long time so we can test recovery
|
||||
<<"request_ttl">> => <<"15s">>,
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Ensure short client ID for MQTT bridges.
|
Loading…
Reference in New Issue