fix(kafka_consumer): avoid leaking atoms in bridge probe API

This commit is contained in:
Thales Macedo Garitezi 2023-03-08 11:30:58 -03:00
parent 03342923b9
commit c182a4053e
2 changed files with 58 additions and 7 deletions

View File

@ -101,8 +101,7 @@ on_start(InstanceId, Config) ->
BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0),
KafkaType = kafka_consumer,
%% Note: this is distinct per node.
ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName),
ClientID = binary_to_atom(ClientID0),
ClientID = make_client_id(InstanceId, KafkaType, BridgeName),
ClientOpts0 =
case Auth of
none -> [];
@ -217,9 +216,9 @@ add_ssl_opts(ClientOpts, #{enable := false}) ->
add_ssl_opts(ClientOpts, SSL) ->
[{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts].
-spec make_subscriber_id(atom()) -> emqx_ee_bridge_kafka_consumer_sup:child_id().
-spec make_subscriber_id(atom() | binary()) -> emqx_ee_bridge_kafka_consumer_sup:child_id().
make_subscriber_id(BridgeName) ->
BridgeNameBin = atom_to_binary(BridgeName),
BridgeNameBin = to_bin(BridgeName),
<<"kafka_subscriber:", BridgeNameBin/binary>>.
ensure_consumer_supervisor_started() ->
@ -398,7 +397,32 @@ log_when_error(Fun, Log) ->
})
end.
-spec consumer_group_id(atom()) -> binary().
-spec consumer_group_id(atom() | binary()) -> binary().
consumer_group_id(BridgeName0) ->
BridgeName = atom_to_binary(BridgeName0),
BridgeName = to_bin(BridgeName0),
<<"emqx-kafka-consumer:", BridgeName/binary>>.
-spec is_dry_run(manager_id()) -> boolean().
is_dry_run(InstanceId) ->
TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX),
case TestIdStart of
nomatch ->
false;
_ ->
string:equal(TestIdStart, InstanceId)
end.
-spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom().
make_client_id(InstanceId, KafkaType, KafkaName) ->
case is_dry_run(InstanceId) of
false ->
ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, KafkaName),
binary_to_atom(ClientID0);
true ->
%% It is a dry run and we don't want to leak too many
%% atoms.
probing_brod_consumers
end.
to_bin(B) when is_binary(B) -> B;
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).

View File

@ -594,12 +594,29 @@ update_bridge_api(Config, Overrides) ->
ct:pal("updating bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of
{ok, Res0} -> {ok, emqx_json:decode(Res0, [return_maps])};
{ok, {_Status, _Headers, Body0}} -> {ok, emqx_json:decode(Body0, [return_maps])};
Error -> Error
end,
ct:pal("bridge update result: ~p", [Res]),
Res.
probe_bridge_api(Config) ->
TypeBin = ?BRIDGE_TYPE_BIN,
Name = ?config(kafka_name, Config),
KafkaConfig = ?config(kafka_config, Config),
Params = KafkaConfig#{<<"type">> => TypeBin, <<"name">> => Name},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("probing bridge (via http): ~p", [Params]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
Error -> Error
end,
ct:pal("bridge probe result: ~p", [Res]),
Res.
send_message(Config, Payload) ->
Name = ?config(kafka_name, Config),
Type = ?BRIDGE_TYPE_BIN,
@ -866,6 +883,16 @@ t_start_and_consume_ok(Config) ->
#{?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _}},
20_000
),
%% Check that the bridge probe API doesn't leak atoms.
ProbeRes = probe_bridge_api(Config),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
Res
end,
fun({_Partition, OffsetReply}, Trace) ->