fix(kafka): avoid ClientId collision between Kafka and Azure bridges

Fixes https://emqx.atlassian.net/browse/EMQX-10860
This commit is contained in:
Paulo Zulato 2023-08-24 12:55:13 -03:00
parent 3aa15556a9
commit 535c7f8b43
5 changed files with 50 additions and 15 deletions

View File

@ -12,6 +12,7 @@
-define(BRIDGE_TYPE, azure_event_hub_producer). -define(BRIDGE_TYPE, azure_event_hub_producer).
-define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>). -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>).
-define(KAFKA_BRIDGE_TYPE, kafka).
-define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]). -define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
@ -281,3 +282,38 @@ t_sync_query(Config) ->
emqx_bridge_kafka_impl_producer_sync_query emqx_bridge_kafka_impl_producer_sync_query
), ),
ok. ok.
t_same_name_azure_kafka_bridges(AehConfig) ->
ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
BridgeName = ?config(bridge_name, AehConfig),
AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
%% creates the AEH bridge and check it's working
ok = emqx_bridge_testlib:t_sync_query(
AehConfig,
fun make_message/0,
fun(Res) -> ?assertEqual(ok, Res) end,
TracePoint
),
%% than creates a Kafka bridge with same name and delete it after creation
ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka),
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_bridge:disable_enable(disable, ?KAFKA_BRIDGE_TYPE, BridgeName),
#{?snk_kind := kafka_producer_stopped},
5_000
)
),
% check that AEH bridge is still working
?check_trace(
begin
Message = {send_message, make_message()},
?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)),
ok
end,
fun(Trace) ->
?assertMatch([#{instance_id := AehResourceId}], ?of_kind(TracePoint, Trace))
end
),
ok.

View File

@ -7,7 +7,7 @@
-export([ -export([
hosts/1, hosts/1,
make_client_id/2, make_client_id/1,
sasl/1, sasl/1,
socket_opts/1 socket_opts/1
]). ]).
@ -24,10 +24,11 @@ hosts(Hosts) when is_list(Hosts) ->
kpro:parse_endpoints(Hosts). kpro:parse_endpoints(Hosts).
%% Client ID is better to be unique to make it easier for Kafka side trouble shooting. %% Client ID is better to be unique to make it easier for Kafka side trouble shooting.
make_client_id(KafkaType0, BridgeName0) -> make_client_id(InstanceId) ->
KafkaType = to_bin(KafkaType0), InstanceIdBin0 = to_bin(InstanceId),
BridgeName = to_bin(BridgeName0), % Removing the <<"bridge:">> from beginning for backward compatibility
iolist_to_binary([KafkaType, ":", BridgeName, ":", atom_to_list(node())]). InstanceIdBin = binary:replace(InstanceIdBin0, <<"bridge:">>, <<>>),
iolist_to_binary([InstanceIdBin, ":", atom_to_list(node())]).
sasl(none) -> sasl(none) ->
undefined; undefined;

View File

@ -121,7 +121,6 @@ on_start(ResourceId, Config) ->
#{ #{
authentication := Auth, authentication := Auth,
bootstrap_hosts := BootstrapHosts0, bootstrap_hosts := BootstrapHosts0,
bridge_name := BridgeName,
hookpoint := _, hookpoint := _,
kafka := #{ kafka := #{
max_batch_bytes := _, max_batch_bytes := _,
@ -134,9 +133,8 @@ on_start(ResourceId, Config) ->
topic_mapping := _ topic_mapping := _
} = Config, } = Config,
BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
KafkaType = kafka_consumer,
%% Note: this is distinct per node. %% Note: this is distinct per node.
ClientID = make_client_id(ResourceId, KafkaType, BridgeName), ClientID = make_client_id(ResourceId),
ClientOpts0 = ClientOpts0 =
case Auth of case Auth of
none -> []; none -> [];
@ -517,11 +515,11 @@ is_dry_run(ResourceId) ->
string:equal(TestIdStart, ResourceId) string:equal(TestIdStart, ResourceId)
end. end.
-spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom(). -spec make_client_id(resource_id()) -> atom().
make_client_id(ResourceId, KafkaType, KafkaName) -> make_client_id(InstanceId) ->
case is_dry_run(ResourceId) of case is_dry_run(InstanceId) of
false -> false ->
ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName), ClientID0 = emqx_bridge_kafka_impl:make_client_id(InstanceId),
binary_to_atom(ClientID0); binary_to_atom(ClientID0);
true -> true ->
%% It is a dry run and we don't want to leak too many %% It is a dry run and we don't want to leak too many

View File

@ -65,7 +65,7 @@ on_start(InstId, Config) ->
ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId),
_ = maybe_install_wolff_telemetry_handlers(ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId),
Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0),
ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), ClientId = emqx_bridge_kafka_impl:make_client_id(InstId),
ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId),
ClientConfig = #{ ClientConfig = #{
min_metadata_refresh_interval => MinMetaRefreshInterval, min_metadata_refresh_interval => MinMetaRefreshInterval,

View File

@ -898,8 +898,8 @@ ensure_connected(Config) ->
ok. ok.
consumer_clientid(Config) -> consumer_clientid(Config) ->
KafkaName = ?config(kafka_name, Config), ResourceId = resource_id(Config),
binary_to_atom(emqx_bridge_kafka_impl:make_client_id(kafka_consumer, KafkaName)). binary_to_atom(emqx_bridge_kafka_impl:make_client_id(ResourceId)).
get_client_connection(Config) -> get_client_connection(Config) ->
KafkaHost = ?config(kafka_host, Config), KafkaHost = ?config(kafka_host, Config),