From 4669c4d552a43f90434f81053ecde23534ea83ed Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 27 Dec 2023 17:47:51 +0100 Subject: [PATCH] fix(bridge/mqtt): ensure short clientid Some mqtt brokers do not allow long client IDs. To make it compatible with this limitation, this commit tries to limit the number of bytes under 23 with a best-effort attempt to derive it from the bridge name. --- apps/emqx/include/emqx_release.hrl | 4 +- apps/emqx/src/emqx.app.src | 2 +- apps/emqx/src/emqx_passwd.erl | 3 +- apps/emqx_bridge_mqtt/rebar.config | 3 +- .../src/emqx_bridge_mqtt.app.src | 2 +- .../src/emqx_bridge_mqtt_connector.erl | 28 ++++++--- .../src/emqx_bridge_mqtt_egress.erl | 2 +- .../src/emqx_bridge_mqtt_ingress.erl | 2 +- .../src/emqx_bridge_mqtt_lib.erl | 57 +++++++++++++++++++ .../test/emqx_bridge_mqtt_SUITE.erl | 38 +++++++------ changes/ce/fix-12236.en.md | 1 + 11 files changed, 109 insertions(+), 33 deletions(-) create mode 100644 apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl create mode 100644 changes/ce/fix-12236.en.md diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index c96f93d8e..0115edc91 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.4.0"). +-define(EMQX_RELEASE_CE, "5.4.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.4.0"). +-define(EMQX_RELEASE_EE, "5.4.1"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/src/emqx.app.src b/apps/emqx/src/emqx.app.src index f837161be..4dd2fca24 100644 --- a/apps/emqx/src/emqx.app.src +++ b/apps/emqx/src/emqx.app.src @@ -2,7 +2,7 @@ {application, emqx, [ {id, "emqx"}, {description, "EMQX Core"}, - {vsn, "5.1.17"}, + {vsn, "5.1.18"}, {modules, []}, {registered, []}, {applications, [ diff --git a/apps/emqx/src/emqx_passwd.erl b/apps/emqx/src/emqx_passwd.erl index 1232dfcb4..29067244f 100644 --- a/apps/emqx/src/emqx_passwd.erl +++ b/apps/emqx/src/emqx_passwd.erl @@ -141,4 +141,5 @@ pbkdf2(MacFun, Password, Salt, Iterations, DKLength) -> pbkdf2:pbkdf2(MacFun, Password, Salt, Iterations, DKLength). hex(X) when is_binary(X) -> - pbkdf2:to_hex(X). + %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25 + string:lowercase(binary:encode_hex(X)). diff --git a/apps/emqx_bridge_mqtt/rebar.config b/apps/emqx_bridge_mqtt/rebar.config index 35ccc1a37..189ba970d 100644 --- a/apps/emqx_bridge_mqtt/rebar.config +++ b/apps/emqx_bridge_mqtt/rebar.config @@ -1,3 +1,4 @@ {deps, [ - {emqx, {path, "../../apps/emqx"}} + {emqx, {path, "../../apps/emqx"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}} ]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 716626bdf..e6fe78ab8 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 61e9353ce..cc2296d3c 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -17,6 +17,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -behaviour(emqx_resource). @@ -35,6 +36,8 @@ -export([on_async_result/2]). -define(HEALTH_CHECK_TIMEOUT, 1000). +-define(INGRESS, "I"). +-define(EGRESS, "E"). %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received will be called @@ -66,7 +69,7 @@ on_start(ResourceId, Conf) -> end. start_ingress(ResourceId, Conf) -> - ClientOpts = mk_client_opts(ResourceId, "ingress", Conf), + ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf), case mk_ingress_config(ResourceId, Conf) of Ingress = #{} -> start_ingress(ResourceId, Ingress, ClientOpts); @@ -91,6 +94,8 @@ start_ingress(ResourceId, Ingress, ClientOpts) -> {error, Reason} end. +choose_ingress_pool_size(<>, _) -> + 1; choose_ingress_pool_size( ResourceId, #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} @@ -119,7 +124,7 @@ start_egress(ResourceId, Conf) -> % NOTE % We are ignoring the user configuration here because there's currently no reliable way % to ensure proper session recovery according to the MQTT spec. - ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)), + ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)), case mk_egress_config(Conf) of Egress = #{} -> start_egress(ResourceId, Egress, ClientOpts); @@ -326,9 +331,10 @@ mk_client_opts( ], Config ), + Name = parse_id_to_name(ResourceId), mk_client_opt_password(Options#{ hosts => [HostPort], - clientid => clientid(ResourceId, ClientScope, Config), + clientid => clientid(Name, ClientScope, Config), connect_timeout => 30, keepalive => ms_to_s(KeepAlive), force_ping => true, @@ -336,6 +342,12 @@ mk_client_opts( ssl_opts => maps:to_list(maps:remove(enable, Ssl)) }). +parse_id_to_name(<>) -> + Name; +parse_id_to_name(Id) -> + {_Type, Name} = emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}), + Name. + mk_client_opt_password(Options = #{password := Secret}) -> %% TODO: Teach `emqtt` to accept 0-arity closures as passwords. Options#{password := emqx_secret:unwrap(Secret)}; @@ -345,7 +357,9 @@ mk_client_opt_password(Options) -> ms_to_s(Ms) -> erlang:ceil(Ms / 1000). -clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) -> - iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]); -clientid(Id, ClientScope, _Conf) -> - iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]). +clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when + is_binary(Prefix) andalso Prefix =/= <<>> +-> + emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]); +clientid(Name, ClientScope, _Conf) -> + emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index a9415294c..2573cad8b 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -81,7 +81,7 @@ mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) -> ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}. mk_clientid(WorkerId, ClientId) -> - iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId). connect(Pid, Name) -> case emqtt:connect(Pid) of diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 3174cdb6f..a051ffbd8 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -81,7 +81,7 @@ mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> }. mk_clientid(WorkerId, ClientId) -> - iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId). mk_client_event_handler(Name, Ingress = #{}) -> IngressVars = maps:with([server], Ingress), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl new file mode 100644 index 000000000..c5f763e6c --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_lib). + +-export([clientid_base/1, bytes23/2]). + +%% @doc Make the base ID of client IDs. +%% A base ID is used to concatenate with pool worker ID to build a +%% full ID. +%% In order to avoid client ID clashing when EMQX is clustered, +%% the base ID is the resource name concatenated with +%% broker node name SHA-hash and truncated to 8 hex characters. +clientid_base(Name) -> + bin([Name, shortener(atom_to_list(node()), 8)]). + +%% @doc Limit the number of bytes for client ID under 23 bytes. +%% If Prefix and suffix concatenated is longer than 23 bytes +%% it hashes the concatenation and replace the non-random suffix. +bytes23(Prefix, SeqNo) -> + Suffix = integer_to_binary(SeqNo), + Concat = bin([Prefix, $:, Suffix]), + case size(Concat) =< 23 of + true -> + Concat; + false -> + shortener(Concat, 23) + end. + +%% @private SHA hash a string and return the prefix of +%% the given length as hex string in binary format. +shortener(Str, Length) when is_list(Str) -> + shortener(bin(Str), Length); +shortener(Str, Length) when is_binary(Str) -> + true = size(Str) > 0, + true = (Length > 0 andalso Length =< 40), + Sha = crypto:hash(sha, Str), + %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25 + Hex = string:lowercase(binary:encode_hex(Sha)), + <> = Hex, + UniqueEnough. + +bin(IoList) -> + iolist_to_binary(IoList). diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index bde546bd0..e887a98f4 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -495,7 +495,6 @@ t_mqtt_conn_bridge_egress(_) -> <<"egress">> => ?EGRESS_CONF } ), - ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, @@ -510,11 +509,6 @@ t_mqtt_conn_bridge_egress(_) -> #{?snk_kind := buffer_worker_flush_ack} ), - %% we should receive a message on the "remote" broker, with specified topic - Msg = assert_mqtt_msg_received(RemoteTopic, Payload), - Size = byte_size(ResourceID), - ?assertMatch(<>, Msg#message.from), - %% verify the metrics of the bridge ?retry( _Interval = 200, @@ -538,7 +532,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE } ), - ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, @@ -555,8 +548,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> %% we should receive a message on the "remote" broker, with specified topic Msg = assert_mqtt_msg_received(RemoteTopic), - %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. - ?assertMatch(<>, Msg#message.from), ?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)), %% verify the metrics of the bridge @@ -575,15 +566,29 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> ok. -t_egress_custom_clientid_prefix(_Config) -> +t_egress_short_clientid(_Config) -> + %% Name is short, expect the actual client ID in use is hashed from + %% E: + Name = "abc01234", + BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]), + ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]), + test_egress_clientid(Name, ExpectedClientId). + +t_egress_long_clientid(_Config) -> + %% Expect the actual client ID in use is hashed from + %% E: + Name = "abc01234567890123456789", + BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]), + ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1), + test_egress_clientid(Name, ExpectedClientId). + +test_egress_clientid(Name, ExpectedClientId) -> BridgeIDEgress = create_bridge( ?SERVER_CONF#{ - <<"clientid_prefix">> => <<"my-custom-prefix">>, - <<"name">> => ?BRIDGE_NAME_EGRESS, - <<"egress">> => ?EGRESS_CONF + <<"name">> => Name, + <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1} } ), - ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), LocalTopic = <>, RemoteTopic = <>, Payload = <<"hello">>, @@ -592,8 +597,7 @@ t_egress_custom_clientid_prefix(_Config) -> emqx:publish(emqx_message:make(LocalTopic, Payload)), Msg = assert_mqtt_msg_received(RemoteTopic, Payload), - Size = byte_size(ResourceID), - ?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from), + ?assertEqual(ExpectedClientId, Msg#message.from), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), ok. @@ -843,7 +847,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, <<"resource_opts">> => #{ - <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery <<"request_ttl">> => <<"15s">>, @@ -949,7 +952,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, <<"resource_opts">> => #{ - <<"worker_pool_size">> => 2, <<"query_mode">> => <<"async">>, %% using a long time so we can test recovery <<"request_ttl">> => <<"15s">>, diff --git a/changes/ce/fix-12236.en.md b/changes/ce/fix-12236.en.md new file mode 100644 index 000000000..1cb763488 --- /dev/null +++ b/changes/ce/fix-12236.en.md @@ -0,0 +1 @@ +Ensure short client ID for MQTT bridges.