fix(bridge/mqtt): respect client ID prefix

This commit is contained in:
zmstone 2024-06-10 10:27:45 +02:00
parent 5da5ce1aae
commit 4347f3de3e
4 changed files with 57 additions and 12 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{registered, []},
{applications, [
kernel,

View File

@ -57,6 +57,7 @@
-define(HEALTH_CHECK_TIMEOUT, 1000).
-define(INGRESS, "I").
-define(EGRESS, "E").
-define(IS_NO_PREFIX(P), (P =:= undefined orelse P =:= <<>>)).
%% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
@ -441,9 +442,9 @@ ms_to_s(Ms) ->
clientid(Name, _Conf = #{clientid_prefix := Prefix}) when
is_binary(Prefix) andalso Prefix =/= <<>>
->
emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name]);
{Prefix, emqx_bridge_mqtt_lib:clientid_base(Name)};
clientid(Name, _Conf) ->
emqx_bridge_mqtt_lib:clientid_base([Name]).
{undefined, emqx_bridge_mqtt_lib:clientid_base(Name)}.
%% @doc Start an ingress bridge worker.
-spec connect([option() | {ecpool_worker_id, pos_integer()}]) ->
@ -481,8 +482,17 @@ mk_client_opts(
msg_handler => mk_client_event_handler(Name, TopicToHandlerIndex)
}.
mk_clientid(WorkerId, ClientId) ->
emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId).
mk_clientid(WorkerId, {Prefix, ClientId}) when ?IS_NO_PREFIX(Prefix) ->
%% When there is no prefix, try to keep the client ID length within 23 bytes
emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId);
mk_clientid(WorkerId, {Prefix, ClientId}) when size(Prefix) < 20 ->
%% Try to respect client ID prefix when it's less than 20 bytes
%% meaning there is at least 3 bytes to randomize
%% Must add $: for backward compatibility
emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, ClientId, WorkerId);
mk_clientid(WorkerId, {Prefix, ClientId}) ->
%% There is no other option but to use a long client ID
iolist_to_binary([Prefix, ClientId, $:, integer_to_binary(WorkerId)]).
mk_client_event_handler(Name, TopicToHandlerIndex) ->
#{

View File

@ -16,7 +16,7 @@
-module(emqx_bridge_mqtt_lib).
-export([clientid_base/1, bytes23/2]).
-export([clientid_base/1, bytes23/2, bytes23_with_prefix/3]).
%% @doc Make the base ID of client IDs.
%% A base ID is used to concatenate with pool worker ID to build a
@ -28,18 +28,29 @@ clientid_base(Name) ->
bin([Name, shortener(atom_to_list(node()), 8)]).
%% @doc Limit the number of bytes for client ID under 23 bytes.
%% If Prefix and suffix concatenated is longer than 23 bytes
%% If ClientID base and suffix concatenated is longer than 23 bytes
%% it hashes the concatenation and replace the non-random suffix.
bytes23(Prefix, SeqNo) ->
bytes23(ClientId, SeqNo) ->
bytes_n(ClientId, SeqNo, 23).
bytes_n(ClientId, SeqNo, N) ->
Suffix = integer_to_binary(SeqNo),
Concat = bin([Prefix, $:, Suffix]),
case size(Concat) =< 23 of
Concat = bin([ClientId, $:, Suffix]),
case size(Concat) =< N of
true ->
Concat;
false ->
shortener(Concat, 23)
shortener(Concat, N)
end.
%% @doc Limit the number of bytes for client ID under 23 bytes.
%% If Prefix, ClientID base and suffix concatenated is longer than 23 bytes
%% it hashes the ClientID and SeqNo before appended to the Prefix
bytes23_with_prefix(Prefix, ClientId, SeqNo) when Prefix =/= <<>> ->
SuffixLen = 23 - size(Prefix),
true = (SuffixLen > 0),
bin([Prefix, bytes_n(ClientId, SeqNo, SuffixLen)]).
%% @private SHA hash a string and return the prefix of
%% the given length as hex string in binary format.
shortener(Str, Length) when is_list(Str) ->

View File

@ -568,6 +568,7 @@ t_egress_short_clientid(_Config) ->
Name = <<"abc01234">>,
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]),
?assertMatch(<<"abc01234", _/binary>>, ExpectedClientId),
test_egress_clientid(Name, ExpectedClientId).
t_egress_long_clientid(_Config) ->
@ -578,11 +579,34 @@ t_egress_long_clientid(_Config) ->
ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1),
test_egress_clientid(Name, ExpectedClientId).
t_egress_with_short_prefix(_Config) ->
%% Expect the actual client ID in use is hashed from
%% <prefix>head(sha1(<name><nodename-hash>:<pool_worker_id>), 16)
Prefix = <<"012-">>,
Name = <<"345">>,
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
ExpectedClientId = emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, BaseId, 1),
?assertMatch(<<"012-", _/binary>>, ExpectedClientId),
test_egress_clientid(Name, Prefix, ExpectedClientId).
t_egress_with_long_prefix(_Config) ->
%% Expect the actual client ID in use is hashed from
%% <prefix><name><nodename-hash>:<pool_worker_id>
Prefix = <<"0123456789abcdef01234-">>,
Name = <<"345">>,
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]),
ExpectedClientId = iolist_to_binary([Prefix, BaseId, <<":1">>]),
test_egress_clientid(Name, Prefix, ExpectedClientId).
test_egress_clientid(Name, ExpectedClientId) ->
test_egress_clientid(Name, <<>>, ExpectedClientId).
test_egress_clientid(Name, ClientIdPrefix, ExpectedClientId) ->
BridgeIDEgress = create_bridge(
?SERVER_CONF#{
<<"name">> => Name,
<<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1}
<<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1},
<<"clientid_prefix">> => ClientIdPrefix
}
),
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,