From c182a4053e85027ec63a6b8f1264cebe0a98591a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 8 Mar 2023 11:30:58 -0300 Subject: [PATCH] fix(kafka_consumer): avoid leaking atoms in bridge probe API --- .../kafka/emqx_bridge_impl_kafka_consumer.erl | 36 +++++++++++++++---- .../emqx_bridge_impl_kafka_consumer_SUITE.erl | 29 ++++++++++++++- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index f89b63d7b..43717dd89 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -101,8 +101,7 @@ on_start(InstanceId, Config) -> BootstrapHosts = emqx_bridge_impl_kafka:hosts(BootstrapHosts0), KafkaType = kafka_consumer, %% Note: this is distinct per node. - ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, BridgeName), - ClientID = binary_to_atom(ClientID0), + ClientID = make_client_id(InstanceId, KafkaType, BridgeName), ClientOpts0 = case Auth of none -> []; @@ -217,9 +216,9 @@ add_ssl_opts(ClientOpts, #{enable := false}) -> add_ssl_opts(ClientOpts, SSL) -> [{ssl, emqx_tls_lib:to_client_opts(SSL)} | ClientOpts]. --spec make_subscriber_id(atom()) -> emqx_ee_bridge_kafka_consumer_sup:child_id(). +-spec make_subscriber_id(atom() | binary()) -> emqx_ee_bridge_kafka_consumer_sup:child_id(). make_subscriber_id(BridgeName) -> - BridgeNameBin = atom_to_binary(BridgeName), + BridgeNameBin = to_bin(BridgeName), <<"kafka_subscriber:", BridgeNameBin/binary>>. ensure_consumer_supervisor_started() -> @@ -398,7 +397,32 @@ log_when_error(Fun, Log) -> }) end. --spec consumer_group_id(atom()) -> binary(). +-spec consumer_group_id(atom() | binary()) -> binary(). consumer_group_id(BridgeName0) -> - BridgeName = atom_to_binary(BridgeName0), + BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer:", BridgeName/binary>>. + +-spec is_dry_run(manager_id()) -> boolean(). +is_dry_run(InstanceId) -> + TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), + case TestIdStart of + nomatch -> + false; + _ -> + string:equal(TestIdStart, InstanceId) + end. + +-spec make_client_id(manager_id(), kafka_consumer, atom() | binary()) -> atom(). +make_client_id(InstanceId, KafkaType, KafkaName) -> + case is_dry_run(InstanceId) of + false -> + ClientID0 = emqx_bridge_impl_kafka:make_client_id(KafkaType, KafkaName), + binary_to_atom(ClientID0); + true -> + %% It is a dry run and we don't want to leak too many + %% atoms. + probing_brod_consumers + end. + +to_bin(B) when is_binary(B) -> B; +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 086699a07..e0be06e1a 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -594,12 +594,29 @@ update_bridge_api(Config, Overrides) -> ct:pal("updating bridge (via http): ~p", [Params]), Res = case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Params, Opts) of - {ok, Res0} -> {ok, emqx_json:decode(Res0, [return_maps])}; + {ok, {_Status, _Headers, Body0}} -> {ok, emqx_json:decode(Body0, [return_maps])}; Error -> Error end, ct:pal("bridge update result: ~p", [Res]), Res. +probe_bridge_api(Config) -> + TypeBin = ?BRIDGE_TYPE_BIN, + Name = ?config(kafka_name, Config), + KafkaConfig = ?config(kafka_config, Config), + Params = KafkaConfig#{<<"type">> => TypeBin, <<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("probing bridge (via http): ~p", [Params]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0}; + Error -> Error + end, + ct:pal("bridge probe result: ~p", [Res]), + Res. + send_message(Config, Payload) -> Name = ?config(kafka_name, Config), Type = ?BRIDGE_TYPE_BIN, @@ -866,6 +883,16 @@ t_start_and_consume_ok(Config) -> #{?snk_kind := kafka_consumer_handle_message, ?snk_span := {complete, _}}, 20_000 ), + + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes = probe_bridge_api(Config), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + Res end, fun({_Partition, OffsetReply}, Trace) ->