diff --git a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl index af4b87718..65b0d54aa 100644 --- a/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl +++ b/apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl @@ -12,6 +12,7 @@ -define(BRIDGE_TYPE, azure_event_hub_producer). -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>). +-define(KAFKA_BRIDGE_TYPE, kafka). -define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]). -import(emqx_common_test_helpers, [on_exit/1]). @@ -281,3 +282,38 @@ t_sync_query(Config) -> emqx_bridge_kafka_impl_producer_sync_query ), ok. + +t_same_name_azure_kafka_bridges(AehConfig) -> + ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}), + BridgeName = ?config(bridge_name, AehConfig), + AehResourceId = emqx_bridge_testlib:resource_id(AehConfig), + TracePoint = emqx_bridge_kafka_impl_producer_sync_query, + %% creates the AEH bridge and check it's working + ok = emqx_bridge_testlib:t_sync_query( + AehConfig, + fun make_message/0, + fun(Res) -> ?assertEqual(ok, Res) end, + TracePoint + ), + %% than creates a Kafka bridge with same name and delete it after creation + ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka), + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge:disable_enable(disable, ?KAFKA_BRIDGE_TYPE, BridgeName), + #{?snk_kind := kafka_producer_stopped}, + 5_000 + ) + ), + % check that AEH bridge is still working + ?check_trace( + begin + Message = {send_message, make_message()}, + ?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)), + ok + end, + fun(Trace) -> + ?assertMatch([#{instance_id := AehResourceId}], ?of_kind(TracePoint, Trace)) + end + ), + ok. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl index bbdb4f3c7..64ac8cf99 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl.erl @@ -7,7 +7,7 @@ -export([ hosts/1, - make_client_id/2, + make_client_id/1, sasl/1, socket_opts/1 ]). @@ -24,10 +24,11 @@ hosts(Hosts) when is_list(Hosts) -> kpro:parse_endpoints(Hosts). %% Client ID is better to be unique to make it easier for Kafka side trouble shooting. -make_client_id(KafkaType0, BridgeName0) -> - KafkaType = to_bin(KafkaType0), - BridgeName = to_bin(BridgeName0), - iolist_to_binary([KafkaType, ":", BridgeName, ":", atom_to_list(node())]). +make_client_id(InstanceId) -> + InstanceIdBin0 = to_bin(InstanceId), + % Removing the <<"bridge:">> from beginning for backward compatibility + InstanceIdBin = binary:replace(InstanceIdBin0, <<"bridge:">>, <<>>), + iolist_to_binary([InstanceIdBin, ":", atom_to_list(node())]). sasl(none) -> undefined; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index b8abb928c..06176ac93 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -121,7 +121,6 @@ on_start(ResourceId, Config) -> #{ authentication := Auth, bootstrap_hosts := BootstrapHosts0, - bridge_name := BridgeName, hookpoint := _, kafka := #{ max_batch_bytes := _, @@ -134,9 +133,8 @@ on_start(ResourceId, Config) -> topic_mapping := _ } = Config, BootstrapHosts = emqx_bridge_kafka_impl:hosts(BootstrapHosts0), - KafkaType = kafka_consumer, %% Note: this is distinct per node. - ClientID = make_client_id(ResourceId, KafkaType, BridgeName), + ClientID = make_client_id(ResourceId), ClientOpts0 = case Auth of none -> []; @@ -517,11 +515,11 @@ is_dry_run(ResourceId) -> string:equal(TestIdStart, ResourceId) end. --spec make_client_id(resource_id(), kafka_consumer, atom() | binary()) -> atom(). -make_client_id(ResourceId, KafkaType, KafkaName) -> - case is_dry_run(ResourceId) of +-spec make_client_id(resource_id()) -> atom(). +make_client_id(InstanceId) -> + case is_dry_run(InstanceId) of false -> - ClientID0 = emqx_bridge_kafka_impl:make_client_id(KafkaType, KafkaName), + ClientID0 = emqx_bridge_kafka_impl:make_client_id(InstanceId), binary_to_atom(ClientID0); true -> %% It is a dry run and we don't want to leak too many diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index ea6666ea0..c25c849e5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -65,7 +65,7 @@ on_start(InstId, Config) -> ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), - ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), + ClientId = emqx_bridge_kafka_impl:make_client_id(InstId), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 2d8355e8e..756fa8b3b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -898,8 +898,8 @@ ensure_connected(Config) -> ok. consumer_clientid(Config) -> - KafkaName = ?config(kafka_name, Config), - binary_to_atom(emqx_bridge_kafka_impl:make_client_id(kafka_consumer, KafkaName)). + ResourceId = resource_id(Config), + binary_to_atom(emqx_bridge_kafka_impl:make_client_id(ResourceId)). get_client_connection(Config) -> KafkaHost = ?config(kafka_host, Config),