diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index bec3c49fa..25741b6cd 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -75,7 +75,16 @@ on_start(InstId, Config) -> }), throw(failed_to_start_kafka_client) end, - WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig), + %% Check if this is a dry run + TestIdStart = string:find(InstId, ?TEST_ID_PREFIX), + IsDryRun = + case TestIdStart of + nomatch -> + false; + _ -> + string:equal(TestIdStart, InstId) + end, + WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig, IsDryRun), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> {ok, #{ @@ -241,7 +250,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> []. -producers_config(BridgeName, ClientId, Input) -> +producers_config(BridgeName, ClientId, Input, IsDryRun) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -271,7 +280,7 @@ producers_config(BridgeName, ClientId, Input) -> BridgeType = kafka, ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ - name => make_producer_name(BridgeName), + name => make_producer_name(BridgeName, IsDryRun), partitioner => partitioner(PartitionStrategy), partition_count_refresh_interval_seconds => PCntRefreshInterval, replayq_dir => ReplayqDir, @@ -302,12 +311,20 @@ make_client_id(BridgeName) -> %% Producer name must be an atom which will be used as a ETS table name for %% partition worker lookup. -make_producer_name(BridgeName) when is_atom(BridgeName) -> - make_producer_name(atom_to_list(BridgeName)); -make_producer_name(BridgeName) -> - %% Woff needs atom for ets table name registration - %% The assumption here is bridge is not often re-created - binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName])). +make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) -> + make_producer_name(atom_to_list(BridgeName), IsDryRun); +make_producer_name(BridgeName, IsDryRun) -> + %% Woff needs an atom for ets table name registration. The assumption here is + %% that bridges with new names are not often created. + case IsDryRun of + true -> + %% It is a dry run and we don't want to leak too many atoms + %% so we use the default producer name instead of creating + %% an unique name. + probing_wolff_producers; + false -> + binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName])) + end. with_log_at_error(Fun, Log) -> try