diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 1e582a81c..e66d97a07 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index f133bf334..92491991f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -57,6 +57,7 @@ -define(HEALTH_CHECK_TIMEOUT, 1000). -define(INGRESS, "I"). -define(EGRESS, "E"). +-define(IS_NO_PREFIX(P), (P =:= undefined orelse P =:= <<>>)). %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received will be called @@ -441,9 +442,9 @@ ms_to_s(Ms) -> clientid(Name, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) andalso Prefix =/= <<>> -> - emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name]); + {Prefix, emqx_bridge_mqtt_lib:clientid_base(Name)}; clientid(Name, _Conf) -> - emqx_bridge_mqtt_lib:clientid_base([Name]). + {undefined, emqx_bridge_mqtt_lib:clientid_base(Name)}. %% @doc Start an ingress bridge worker. -spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> @@ -481,8 +482,17 @@ mk_client_opts( msg_handler => mk_client_event_handler(Name, TopicToHandlerIndex) }. -mk_clientid(WorkerId, ClientId) -> - emqx_bridge_mqtt_lib:bytes23([ClientId], WorkerId). +mk_clientid(WorkerId, {Prefix, ClientId}) when ?IS_NO_PREFIX(Prefix) -> + %% When there is no prefix, try to keep the client ID length within 23 bytes + emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId); +mk_clientid(WorkerId, {Prefix, ClientId}) when size(Prefix) < 20 -> + %% Try to respect client ID prefix when it's less than 20 bytes + %% meaning there is at least 3 bytes to randomize + %% Must add $: for backward compatibility + emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, ClientId, WorkerId); +mk_clientid(WorkerId, {Prefix, ClientId}) -> + %% There is no other option but to use a long client ID + iolist_to_binary([Prefix, ClientId, $:, integer_to_binary(WorkerId)]). mk_client_event_handler(Name, TopicToHandlerIndex) -> #{ diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl index 740775192..12f445cb1 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl @@ -16,7 +16,7 @@ -module(emqx_bridge_mqtt_lib). --export([clientid_base/1, bytes23/2]). +-export([clientid_base/1, bytes23/2, bytes23_with_prefix/3]). %% @doc Make the base ID of client IDs. %% A base ID is used to concatenate with pool worker ID to build a @@ -28,18 +28,29 @@ clientid_base(Name) -> bin([Name, shortener(atom_to_list(node()), 8)]). %% @doc Limit the number of bytes for client ID under 23 bytes. -%% If Prefix and suffix concatenated is longer than 23 bytes +%% If ClientID base and suffix concatenated is longer than 23 bytes %% it hashes the concatenation and replace the non-random suffix. -bytes23(Prefix, SeqNo) -> +bytes23(ClientId, SeqNo) -> + bytes_n(ClientId, SeqNo, 23). + +bytes_n(ClientId, SeqNo, N) -> Suffix = integer_to_binary(SeqNo), - Concat = bin([Prefix, $:, Suffix]), - case size(Concat) =< 23 of + Concat = bin([ClientId, $:, Suffix]), + case size(Concat) =< N of true -> Concat; false -> - shortener(Concat, 23) + shortener(Concat, N) end. +%% @doc Limit the number of bytes for client ID under 23 bytes. +%% If Prefix, ClientID base and suffix concatenated is longer than 23 bytes +%% it hashes the ClientID and SeqNo before appended to the Prefix +bytes23_with_prefix(Prefix, ClientId, SeqNo) when Prefix =/= <<>> -> + SuffixLen = 23 - size(Prefix), + true = (SuffixLen > 0), + bin([Prefix, bytes_n(ClientId, SeqNo, SuffixLen)]). + %% @private SHA hash a string and return the prefix of %% the given length as hex string in binary format. shortener(Str, Length) when is_list(Str) -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index d784a5acb..5d4c82ca6 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -568,6 +568,7 @@ t_egress_short_clientid(_Config) -> Name = <<"abc01234">>, BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]), ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]), + ?assertMatch(<<"abc01234", _/binary>>, ExpectedClientId), test_egress_clientid(Name, ExpectedClientId). t_egress_long_clientid(_Config) -> @@ -578,11 +579,34 @@ t_egress_long_clientid(_Config) -> ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1), test_egress_clientid(Name, ExpectedClientId). +t_egress_with_short_prefix(_Config) -> + %% Expect the actual client ID in use is hashed from + %% head(sha1(:), 16) + Prefix = <<"012-">>, + Name = <<"345">>, + BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]), + ExpectedClientId = emqx_bridge_mqtt_lib:bytes23_with_prefix(Prefix, BaseId, 1), + ?assertMatch(<<"012-", _/binary>>, ExpectedClientId), + test_egress_clientid(Name, Prefix, ExpectedClientId). + +t_egress_with_long_prefix(_Config) -> + %% Expect the actual client ID in use is hashed from + %% : + Prefix = <<"0123456789abcdef01234-">>, + Name = <<"345">>, + BaseId = emqx_bridge_mqtt_lib:clientid_base([Name]), + ExpectedClientId = iolist_to_binary([Prefix, BaseId, <<":1">>]), + test_egress_clientid(Name, Prefix, ExpectedClientId). + test_egress_clientid(Name, ExpectedClientId) -> + test_egress_clientid(Name, <<>>, ExpectedClientId). + +test_egress_clientid(Name, ClientIdPrefix, ExpectedClientId) -> BridgeIDEgress = create_bridge( ?SERVER_CONF#{ <<"name">> => Name, - <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1} + <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1}, + <<"clientid_prefix">> => ClientIdPrefix } ), LocalTopic = <>,