Merge pull request #9778 from kjellwinblad/kjell/fix/kafka_atom_leak/EMQX-8739
fix: atom leak when doing Kafka bridge dry-run
This commit is contained in:
commit
cd093b6b9f
|
@ -75,7 +75,16 @@ on_start(InstId, Config) ->
|
||||||
}),
|
}),
|
||||||
throw(failed_to_start_kafka_client)
|
throw(failed_to_start_kafka_client)
|
||||||
end,
|
end,
|
||||||
WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig),
|
%% Check if this is a dry run
|
||||||
|
TestIdStart = string:find(InstId, ?TEST_ID_PREFIX),
|
||||||
|
IsDryRun =
|
||||||
|
case TestIdStart of
|
||||||
|
nomatch ->
|
||||||
|
false;
|
||||||
|
_ ->
|
||||||
|
string:equal(TestIdStart, InstId)
|
||||||
|
end,
|
||||||
|
WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig, IsDryRun),
|
||||||
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
|
case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
|
||||||
{ok, Producers} ->
|
{ok, Producers} ->
|
||||||
{ok, #{
|
{ok, #{
|
||||||
|
@ -241,7 +250,7 @@ ssl(#{enable := true} = SSL) ->
|
||||||
ssl(_) ->
|
ssl(_) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
producers_config(BridgeName, ClientId, Input) ->
|
producers_config(BridgeName, ClientId, Input, IsDryRun) ->
|
||||||
#{
|
#{
|
||||||
max_batch_bytes := MaxBatchBytes,
|
max_batch_bytes := MaxBatchBytes,
|
||||||
compression := Compression,
|
compression := Compression,
|
||||||
|
@ -271,7 +280,7 @@ producers_config(BridgeName, ClientId, Input) ->
|
||||||
BridgeType = kafka,
|
BridgeType = kafka,
|
||||||
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||||
#{
|
#{
|
||||||
name => make_producer_name(BridgeName),
|
name => make_producer_name(BridgeName, IsDryRun),
|
||||||
partitioner => partitioner(PartitionStrategy),
|
partitioner => partitioner(PartitionStrategy),
|
||||||
partition_count_refresh_interval_seconds => PCntRefreshInterval,
|
partition_count_refresh_interval_seconds => PCntRefreshInterval,
|
||||||
replayq_dir => ReplayqDir,
|
replayq_dir => ReplayqDir,
|
||||||
|
@ -302,12 +311,20 @@ make_client_id(BridgeName) ->
|
||||||
|
|
||||||
%% Producer name must be an atom which will be used as a ETS table name for
|
%% Producer name must be an atom which will be used as a ETS table name for
|
||||||
%% partition worker lookup.
|
%% partition worker lookup.
|
||||||
make_producer_name(BridgeName) when is_atom(BridgeName) ->
|
make_producer_name(BridgeName, IsDryRun) when is_atom(BridgeName) ->
|
||||||
make_producer_name(atom_to_list(BridgeName));
|
make_producer_name(atom_to_list(BridgeName), IsDryRun);
|
||||||
make_producer_name(BridgeName) ->
|
make_producer_name(BridgeName, IsDryRun) ->
|
||||||
%% Woff needs atom for ets table name registration
|
%% Woff needs an atom for ets table name registration. The assumption here is
|
||||||
%% The assumption here is bridge is not often re-created
|
%% that bridges with new names are not often created.
|
||||||
binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName])).
|
case IsDryRun of
|
||||||
|
true ->
|
||||||
|
%% It is a dry run and we don't want to leak too many atoms
|
||||||
|
%% so we use the default producer name instead of creating
|
||||||
|
%% an unique name.
|
||||||
|
probing_wolff_producers;
|
||||||
|
false ->
|
||||||
|
binary_to_atom(iolist_to_binary(["kafka_producer_", BridgeName]))
|
||||||
|
end.
|
||||||
|
|
||||||
with_log_at_error(Fun, Log) ->
|
with_log_at_error(Fun, Log) ->
|
||||||
try
|
try
|
||||||
|
|
Loading…
Reference in New Issue