Merge pull request #12236 from zmstone/1227-ensure-short-mqtt-bridge-clientid

fix(bridge/mqtt): ensure short clientid
This commit is contained in:
Zaiming (Stone) Shi 2023-12-28 16:06:09 +01:00 committed by GitHub
commit 2b61f5290d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 108 additions and 32 deletions

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md' %% `apps/emqx/src/bpapi/README.md'
%% Opensource edition %% Opensource edition
-define(EMQX_RELEASE_CE, "5.4.0"). -define(EMQX_RELEASE_CE, "5.4.1").
%% Enterprise edition %% Enterprise edition
-define(EMQX_RELEASE_EE, "5.4.0"). -define(EMQX_RELEASE_EE, "5.4.1").
%% The HTTP API version %% The HTTP API version
-define(EMQX_API_VERSION, "5.0"). -define(EMQX_API_VERSION, "5.0").

View File

@ -141,4 +141,5 @@ pbkdf2(MacFun, Password, Salt, Iterations, DKLength) ->
pbkdf2:pbkdf2(MacFun, Password, Salt, Iterations, DKLength). pbkdf2:pbkdf2(MacFun, Password, Salt, Iterations, DKLength).
hex(X) when is_binary(X) -> hex(X) when is_binary(X) ->
pbkdf2:to_hex(X). %% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
string:lowercase(binary:encode_hex(X)).

View File

@ -1,3 +1,4 @@
{deps, [ {deps, [
{emqx, {path, "../../apps/emqx"}} {emqx, {path, "../../apps/emqx"}},
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}. ]}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [ {application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"}, {description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -17,6 +17,7 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -35,6 +36,8 @@
-export([on_async_result/2]). -export([on_async_result/2]).
-define(HEALTH_CHECK_TIMEOUT, 1000). -define(HEALTH_CHECK_TIMEOUT, 1000).
-define(INGRESS, "I").
-define(EGRESS, "E").
%% =================================================================== %% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received will be called %% When use this bridge as a data source, ?MODULE:on_message_received will be called
@ -66,7 +69,7 @@ on_start(ResourceId, Conf) ->
end. end.
start_ingress(ResourceId, Conf) -> start_ingress(ResourceId, Conf) ->
ClientOpts = mk_client_opts(ResourceId, "ingress", Conf), ClientOpts = mk_client_opts(ResourceId, ?INGRESS, Conf),
case mk_ingress_config(ResourceId, Conf) of case mk_ingress_config(ResourceId, Conf) of
Ingress = #{} -> Ingress = #{} ->
start_ingress(ResourceId, Ingress, ClientOpts); start_ingress(ResourceId, Ingress, ClientOpts);
@ -91,6 +94,8 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
{error, Reason} {error, Reason}
end. end.
choose_ingress_pool_size(<<?TEST_ID_PREFIX, _/binary>>, _) ->
1;
choose_ingress_pool_size( choose_ingress_pool_size(
ResourceId, ResourceId,
#{remote := #{topic := RemoteTopic}, pool_size := PoolSize} #{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
@ -119,7 +124,7 @@ start_egress(ResourceId, Conf) ->
% NOTE % NOTE
% We are ignoring the user configuration here because there's currently no reliable way % We are ignoring the user configuration here because there's currently no reliable way
% to ensure proper session recovery according to the MQTT spec. % to ensure proper session recovery according to the MQTT spec.
ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)), ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, ?EGRESS, Conf)),
case mk_egress_config(Conf) of case mk_egress_config(Conf) of
Egress = #{} -> Egress = #{} ->
start_egress(ResourceId, Egress, ClientOpts); start_egress(ResourceId, Egress, ClientOpts);
@ -326,9 +331,10 @@ mk_client_opts(
], ],
Config Config
), ),
Name = parse_id_to_name(ResourceId),
mk_client_opt_password(Options#{ mk_client_opt_password(Options#{
hosts => [HostPort], hosts => [HostPort],
clientid => clientid(ResourceId, ClientScope, Config), clientid => clientid(Name, ClientScope, Config),
connect_timeout => 30, connect_timeout => 30,
keepalive => ms_to_s(KeepAlive), keepalive => ms_to_s(KeepAlive),
force_ping => true, force_ping => true,
@ -336,6 +342,12 @@ mk_client_opts(
ssl_opts => maps:to_list(maps:remove(enable, Ssl)) ssl_opts => maps:to_list(maps:remove(enable, Ssl))
}). }).
parse_id_to_name(<<?TEST_ID_PREFIX, Name/binary>>) ->
Name;
parse_id_to_name(Id) ->
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}),
Name.
mk_client_opt_password(Options = #{password := Secret}) -> mk_client_opt_password(Options = #{password := Secret}) ->
%% TODO: Teach `emqtt` to accept 0-arity closures as passwords. %% TODO: Teach `emqtt` to accept 0-arity closures as passwords.
Options#{password := emqx_secret:unwrap(Secret)}; Options#{password := emqx_secret:unwrap(Secret)};
@ -345,7 +357,9 @@ mk_client_opt_password(Options) ->
ms_to_s(Ms) -> ms_to_s(Ms) ->
erlang:ceil(Ms / 1000). erlang:ceil(Ms / 1000).
clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) -> clientid(Name, ClientScope, _Conf = #{clientid_prefix := Prefix}) when
iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]); is_binary(Prefix) andalso Prefix =/= <<>>
clientid(Id, ClientScope, _Conf) -> ->
iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]). emqx_bridge_mqtt_lib:clientid_base([Prefix, $:, Name, ClientScope]);
clientid(Name, ClientScope, _Conf) ->
emqx_bridge_mqtt_lib:clientid_base([Name, ClientScope]).

View File

@ -81,7 +81,7 @@ mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) ->
ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}. ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}.
mk_clientid(WorkerId, ClientId) -> mk_clientid(WorkerId, ClientId) ->
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
connect(Pid, Name) -> connect(Pid, Name) ->
case emqtt:connect(Pid) of case emqtt:connect(Pid) of

View File

@ -81,7 +81,7 @@ mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
}. }.
mk_clientid(WorkerId, ClientId) -> mk_clientid(WorkerId, ClientId) ->
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). emqx_bridge_mqtt_lib:bytes23(ClientId, WorkerId).
mk_client_event_handler(Name, Ingress = #{}) -> mk_client_event_handler(Name, Ingress = #{}) ->
IngressVars = maps:with([server], Ingress), IngressVars = maps:with([server], Ingress),

View File

@ -0,0 +1,57 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_mqtt_lib).
-export([clientid_base/1, bytes23/2]).
%% @doc Make the base ID of client IDs.
%% A base ID is used to concatenate with pool worker ID to build a
%% full ID.
%% In order to avoid client ID clashing when EMQX is clustered,
%% the base ID is the resource name concatenated with
%% broker node name SHA-hash and truncated to 8 hex characters.
clientid_base(Name) ->
bin([Name, shortener(atom_to_list(node()), 8)]).
%% @doc Limit the number of bytes for client ID under 23 bytes.
%% If Prefix and suffix concatenated is longer than 23 bytes
%% it hashes the concatenation and replace the non-random suffix.
bytes23(Prefix, SeqNo) ->
Suffix = integer_to_binary(SeqNo),
Concat = bin([Prefix, $:, Suffix]),
case size(Concat) =< 23 of
true ->
Concat;
false ->
shortener(Concat, 23)
end.
%% @private SHA hash a string and return the prefix of
%% the given length as hex string in binary format.
shortener(Str, Length) when is_list(Str) ->
shortener(bin(Str), Length);
shortener(Str, Length) when is_binary(Str) ->
true = size(Str) > 0,
true = (Length > 0 andalso Length =< 40),
Sha = crypto:hash(sha, Str),
%% TODO: change to binary:encode_hex(X, lowercase) when OTP version is always > 25
Hex = string:lowercase(binary:encode_hex(Sha)),
<<UniqueEnough:Length/binary, _/binary>> = Hex,
UniqueEnough.
bin(IoList) ->
iolist_to_binary(IoList).

View File

@ -495,7 +495,6 @@ t_mqtt_conn_bridge_egress(_) ->
<<"egress">> => ?EGRESS_CONF <<"egress">> => ?EGRESS_CONF
} }
), ),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>, LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
@ -510,11 +509,6 @@ t_mqtt_conn_bridge_egress(_) ->
#{?snk_kind := buffer_worker_flush_ack} #{?snk_kind := buffer_worker_flush_ack}
), ),
%% we should receive a message on the "remote" broker, with specified topic
Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
Size = byte_size(ResourceID),
?assertMatch(<<ResourceID:Size/binary, _/binary>>, Msg#message.from),
%% verify the metrics of the bridge %% verify the metrics of the bridge
?retry( ?retry(
_Interval = 200, _Interval = 200,
@ -538,7 +532,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
<<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE
} }
), ),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>, LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
@ -555,8 +548,6 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
%% we should receive a message on the "remote" broker, with specified topic %% we should receive a message on the "remote" broker, with specified topic
Msg = assert_mqtt_msg_received(RemoteTopic), Msg = assert_mqtt_msg_received(RemoteTopic),
%% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here.
?assertMatch(<<ResourceID:(byte_size(ResourceID))/binary, _/binary>>, Msg#message.from),
?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)), ?assertMatch(#{<<"payload">> := Payload}, emqx_utils_json:decode(Msg#message.payload)),
%% verify the metrics of the bridge %% verify the metrics of the bridge
@ -575,15 +566,29 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) ->
ok. ok.
t_egress_custom_clientid_prefix(_Config) -> t_egress_short_clientid(_Config) ->
%% Name is short, expect the actual client ID in use is hashed from
%% <name>E<nodename-hash>:<pool_worker_id>
Name = "abc01234",
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
ExpectedClientId = iolist_to_binary([BaseId, $:, "1"]),
test_egress_clientid(Name, ExpectedClientId).
t_egress_long_clientid(_Config) ->
%% Expect the actual client ID in use is hashed from
%% <name>E<nodename-hash>:<pool_worker_id>
Name = "abc01234567890123456789",
BaseId = emqx_bridge_mqtt_lib:clientid_base([Name, "E"]),
ExpectedClientId = emqx_bridge_mqtt_lib:bytes23(BaseId, 1),
test_egress_clientid(Name, ExpectedClientId).
test_egress_clientid(Name, ExpectedClientId) ->
BridgeIDEgress = create_bridge( BridgeIDEgress = create_bridge(
?SERVER_CONF#{ ?SERVER_CONF#{
<<"clientid_prefix">> => <<"my-custom-prefix">>, <<"name">> => Name,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => (?EGRESS_CONF)#{<<"pool_size">> => 1}
<<"egress">> => ?EGRESS_CONF
} }
), ),
ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>, LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>, RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
Payload = <<"hello">>, Payload = <<"hello">>,
@ -592,8 +597,7 @@ t_egress_custom_clientid_prefix(_Config) ->
emqx:publish(emqx_message:make(LocalTopic, Payload)), emqx:publish(emqx_message:make(LocalTopic, Payload)),
Msg = assert_mqtt_msg_received(RemoteTopic, Payload), Msg = assert_mqtt_msg_received(RemoteTopic, Payload),
Size = byte_size(ResourceID), ?assertEqual(ExpectedClientId, Msg#message.from),
?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
ok. ok.
@ -843,7 +847,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF, <<"egress">> => ?EGRESS_CONF,
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
%% using a long time so we can test recovery %% using a long time so we can test recovery
<<"request_ttl">> => <<"15s">>, <<"request_ttl">> => <<"15s">>,
@ -949,7 +952,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF, <<"egress">> => ?EGRESS_CONF,
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"query_mode">> => <<"async">>, <<"query_mode">> => <<"async">>,
%% using a long time so we can test recovery %% using a long time so we can test recovery
<<"request_ttl">> => <<"15s">>, <<"request_ttl">> => <<"15s">>,

View File

@ -0,0 +1 @@
Ensure short client ID for MQTT bridges.