fix: atom leak when doing Kafka bridge dry-run

A new atom was created every time one did a dry run of a Kafka bridge
(that is, clicking the Test button in the settings dialog for the
bridge).

After this fix, we will only create a new atom when a bridge with a new
name is created. This should be acceptable as bridges with new names are
created relatively rarely and it seems to be useful to have an unique
atom for every Wolff producer.

Fixes: https://emqx.atlassian.net/browse/EMQX-8739
This commit is contained in:
Kjell Winblad 2023-01-16 13:01:51 +01:00
parent 0363f12f2f
commit 8c52264c41
1 changed files with 26 additions and 9 deletions

View File

@ -75,7 +75,16 @@ on_start(InstId, Config) ->
}),
throw(failed_to_start_kafka_client)
end,
WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig),
%% Check if this is a dry run
TestIdStart = string:find(InstId, ?TEST_ID_PREFIX),
IsDryRun =
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstId)
end,
WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig, IsDryRun),
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
{ok, Producers} ->
{ok, #{
@ -241,7 +250,7 @@ ssl(#{enable := true} = SSL) ->
ssl(_) ->
[].
producers_config(BridgeName, ClientId, Input) ->
producers_config(BridgeName, ClientId, Input, IsDryRun) ->
#{
max_batch_bytes := MaxBatchBytes,
compression := Compression,
@ -271,7 +280,7 @@ producers_config(BridgeName, ClientId, Input) ->
BridgeType = kafka,
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
#{
name => make_producer_name(BridgeName),
name => make_producer_name(BridgeName, IsDryRun),
partitioner => partitioner(PartitionStrategy),
partition_count_refresh_interval_seconds => PCntRefreshInterval,
replayq_dir => ReplayqDir,
@ -302,12 +311,20 @@ make_client_id(BridgeName) ->
%% Producer name must be an atom which will be used as a ETS table name for
%% partition worker lookup.
make_producer_name(BridgeName) when is_atom(BridgeName) ->
make_producer_name(atom_to_list(BridgeName));
make_producer_name(BridgeName) ->
%% Woff needs atom for ets table name registration
%% The assumption here is bridge is not often re-created
binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName])).
make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) ->
make_producer_name(atom_to_list(BridgeName), IsDryRun);
make_producer_name(BridgeName, IsDryRun) ->
%% Woff needs an atom for ets table name registration. The assumption here is
%% that bridges with new names are not often created.
case IsDryRun of
true ->
%% It is a dry run and we don't want to leak too many atoms
%% so we use the default producer name instead of creating
%% an unique name.
probing_wolff_producers;
false ->
binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName]))
end.
with_log_at_error(Fun, Log) ->
try