diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index c96f93d8e..0115edc91 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -32,10 +32,10 @@ %% `apps/emqx/src/bpapi/README.md' %% Opensource edition --define(EMQX_RELEASE_CE, "5.4.0"). +-define(EMQX_RELEASE_CE, "5.4.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.4.0"). +-define(EMQX_RELEASE_EE, "5.4.1"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/src/emqx_passwd.erl b/apps/emqx/src/emqx_passwd.erl index 1232dfcb4..29067244f 100644 --- a/apps/emqx/src/emqx_passwd.erl +++ b/apps/emqx/src/emqx_passwd.erl @@ -141,4 +141,5 @@ pbkdf2(MacFun, Password, Salt, Iterations, DKLength) -> pbkdf2:pbkdf2(MacFun, Password, Salt, Iterations, DKLength). hex(X) when is_binary(X) -> - pbkdf2:to_hex(X). + %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25 + string:lowercase(binary:encode_hex(X)). diff --git a/apps/emqx_bridge_mqtt/rebar.config b/apps/emqx_bridge_mqtt/rebar.config index 35ccc1a37..189ba970d 100644 --- a/apps/emqx_bridge_mqtt/rebar.config +++ b/apps/emqx_bridge_mqtt/rebar.config @@ -1,3 +1,4 @@ {deps, [ - {emqx, {path, "../../apps/emqx"}} + {emqx, {path, "../../apps/emqx"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}} ]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 716626bdf..e6fe78ab8 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 61e9353ce..cc2296d3c 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -17,6 +17,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -behaviour(emqx_resource). @@ -35,6 +36,8 @@ -export([on_async_result/2]). -define(HEALTH_CHECK_TIMEOUT, 1000). +-define(INGRESS, "I"). +-define(EGRESS, "E"). %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received will be called @@ -66,7 +69,7 @@ on_start(ResourceId, Conf) -> end. start_ingress(ResourceId, Conf) -> - ClientOpts = mk_client_opts(ResourceId, "ingress", Conf), + ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf), case mk_ingress_config(ResourceId, Conf) of Ingress = #{} -> start_ingress(ResourceId, Ingress, ClientOpts); @@ -91,6 +94,8 @@ start_ingress(ResourceId, Ingress, ClientOpts) -> {error, Reason} end. +choose_ingress_pool_size(<>, _) -> + 1; choose_ingress_pool_size( ResourceId, #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} @@ -119,7 +124,7 @@ start_egress(ResourceId, Conf) -> % NOTE % We are ignoring the user configuration here because there's currently no reliable way % to ensure proper session recovery according to the MQTT spec. - ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)), + ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)), case mk_egress_config(Conf) of Egress = #{} -> start_egress(ResourceId, Egress, ClientOpts); @@ -326,9 +331,10 @@ mk_client_opts( ], Config ), + Name = parse_id_to_name(ResourceId), mk_client_opt_password(Options#{ hosts => [HostPort], - clientid => clientid(ResourceId, ClientScope, Config), + clientid => clientid(Name, ClientScope, Config), connect_timeout => 30, keepalive => ms_to_s(KeepAlive), force_ping => true, @@ -336,6 +342,12 @@ mk_client_opts( ssl_opts => maps:to_list(maps:remove(enable, Ssl)) }). +parse_id_to_name(<>) -> + Name; +parse_id_to_name(Id) -> + {_Type, Name} = emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}), + Name. + mk_client_opt_password(Options = #{password := Secret}) -> %% TODO: Teach `emqtt` to accept 0-arity closures as passwords. Options#{password := emqx_secret:unwrap(Secret)}; @@ -345,7 +357,9 @@ mk_client_opt_password(Options) -> ms_to_s(Ms) -> erlang:ceil(Ms / 1000). -clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) -> - iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]); -clientid(Id, ClientScope, _Conf) -> - iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]). +clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when + is_binary(Prefix) andalso Prefix =/= <<>> +-> + emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]); +clientid(Name, ClientScope, _Conf) -> + emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl index a9415294c..2573cad8b 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -81,7 +81,7 @@ mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) -> ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}. mk_clientid(WorkerId, ClientId) -> - iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId). connect(Pid, Name) -> case emqtt:connect(Pid) of diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 3174cdb6f..a051ffbd8 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -81,7 +81,7 @@ mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> }. mk_clientid(WorkerId, ClientId) -> - iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId). mk_client_event_handler(Name, Ingress = #{}) -> IngressVars = maps:with([server], Ingress), diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl new file mode 100644 index 000000000..c5f763e6c --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_lib.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_lib). + +-export([clientid_base/1, bytes23/2]). + +%% @doc Make the base ID of client IDs. +%% A base ID is used to concatenate with pool worker ID to build a +%% full ID. +%% In order to avoid client ID clashing when EMQX is clustered, +%% the base ID is the resource name concatenated with +%% broker node name SHA-hash and truncated to 8 hex characters. +clientid_base(Name) -> + bin([Name, shortener(atom_to_list(node()), 8)]). + +%% @doc Limit the number of bytes for client ID under 23 bytes. +%% If Prefix and suffix concatenated is longer than 23 bytes +%% it hashes the concatenation and replace the non-random suffix. +bytes23(Prefix, SeqNo) -> + Suffix = integer_to_binary(SeqNo), + Concat = bin([Prefix, $:, Suffix]), + case size(Concat) =< 23 of + true -> + Concat; + false -> + shortener(Concat, 23) + end. + +%% @private SHA hash a string and return the prefix of +%% the given length as hex string in binary format. +shortener(Str, Length) when is_list(Str) -> + shortener(bin(Str), Length); +shortener(Str, Length) when is_binary(Str) -> + true = size(Str) > 0, + true = (Length > 0 andalso Length =< 40), + Sha = crypto:hash(sha, Str), + %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25 + Hex = string:lowercase(binary:encode_hex(Sha)), + <> = Hex, + UniqueEnough. + +bin(IoList) -> + iolist_to_binary(IoList). diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index bde546bd0..e887a98f4 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -495,7 +495,6 @@ t_mqtt_conn_bridge_egress(_) -> <<"egress">> => ?EGRESS_CONF } ), - ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, @@ -510,11 +509,6 @@ t_mqtt_conn_bridge_egress(_) -> #{?snk_kind := buffer_worker_flush_ack} ), - %% we should receive a message on the "remote" broker, with specified topic - Msg = assert_mqtt_msg_received(RemoteTopic, Payload), - Size = byte_size(ResourceID), - ?assertMatch(<>, Msg#message.from), - %% verify the metrics of the bridge ?retry( _Interval = 200, @@ -538,7 +532,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE } ), - ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, @@ -555,8 +548,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> %% we should receive a message on the "remote" broker, with specified topic Msg = assert_mqtt_msg_received(RemoteTopic), - %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. - ?assertMatch(<>, Msg#message.from), ?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)), %% verify the metrics of the bridge @@ -575,15 +566,29 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> ok. -t_egress_custom_clientid_prefix(_Config) -> +t_egress_short_clientid(_Config) -> + %% Name is short, expect the actual client ID in use is hashed from + %% E: + Name = "abc01234", + BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]), + ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]), + test_egress_clientid(Name, ExpectedClientId). + +t_egress_long_clientid(_Config) -> + %% Expect the actual client ID in use is hashed from + %% E: + Name = "abc01234567890123456789", + BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]), + ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1), + test_egress_clientid(Name, ExpectedClientId). + +test_egress_clientid(Name, ExpectedClientId) -> BridgeIDEgress = create_bridge( ?SERVER_CONF#{ - <<"clientid_prefix">> => <<"my-custom-prefix">>, - <<"name">> => ?BRIDGE_NAME_EGRESS, - <<"egress">> => ?EGRESS_CONF + <<"name">> => Name, + <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1} } ), - ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), LocalTopic = <>, RemoteTopic = <>, Payload = <<"hello">>, @@ -592,8 +597,7 @@ t_egress_custom_clientid_prefix(_Config) -> emqx:publish(emqx_message:make(LocalTopic, Payload)), Msg = assert_mqtt_msg_received(RemoteTopic, Payload), - Size = byte_size(ResourceID), - ?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from), + ?assertEqual(ExpectedClientId, Msg#message.from), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), ok. @@ -843,7 +847,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, <<"resource_opts">> => #{ - <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery <<"request_ttl">> => <<"15s">>, @@ -949,7 +952,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, <<"resource_opts">> => #{ - <<"worker_pool_size">> => 2, <<"query_mode">> => <<"async">>, %% using a long time so we can test recovery <<"request_ttl">> => <<"15s">>, diff --git a/changes/ce/fix-12236.en.md b/changes/ce/fix-12236.en.md new file mode 100644 index 000000000..1cb763488 --- /dev/null +++ b/changes/ce/fix-12236.en.md @@ -0,0 +1 @@ +Ensure short client ID for MQTT bridges.