diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 6c103f73b..64811c91c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index f7958af81..225f90c18 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -101,6 +101,10 @@ " the connection parameters." ). +%% Allocatable resources +-define(kafka_client_id, kafka_client_id). +-define(kafka_subscriber_id, kafka_subscriber_id). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -140,6 +144,7 @@ on_start(ResourceId, Config) -> Auth -> [{sasl, emqx_bridge_kafka_impl:sasl(Auth)}] end, ClientOpts = add_ssl_opts(ClientOpts0, SSL), + ok = emqx_resource:allocate_resource(ResourceId, ?kafka_client_id, ClientID), case brod:start_client(BootstrapHosts, ClientID, ClientOpts) of ok -> ?tp( @@ -163,7 +168,21 @@ on_start(ResourceId, Config) -> start_consumer(Config, ResourceId, ClientID). -spec on_stop(resource_id(), state()) -> ok. -on_stop(_ResourceID, State) -> +on_stop(ResourceId, _State = undefined) -> + case emqx_resource:get_allocated_resources(ResourceId) of + #{?kafka_client_id := ClientID, ?kafka_subscriber_id := SubscriberId} -> + stop_subscriber(SubscriberId), + stop_client(ClientID), + ?tp(kafka_consumer_subcriber_and_client_stopped, #{}), + ok; + #{?kafka_client_id := ClientID} -> + stop_client(ClientID), + ?tp(kafka_consumer_just_client_stopped, #{}), + ok; + _ -> + ok + end; +on_stop(_ResourceId, State) -> #{ subscriber_id := SubscriberId, kafka_client_id := ClientID @@ -333,6 +352,9 @@ start_consumer(Config, ResourceId, ClientID) -> %% spawns one worker for each assigned topic-partition %% automatically, so we should not spawn duplicate workers. SubscriberId = make_subscriber_id(BridgeName), + ?tp(kafka_consumer_about_to_start_subscriber, #{}), + ok = emqx_resource:allocate_resource(ResourceId, ?kafka_subscriber_id, SubscriberId), + ?tp(kafka_consumer_subscriber_allocated, #{}), case emqx_bridge_kafka_consumer_sup:start_child(SubscriberId, GroupSubscriberConfig) of {ok, _ConsumerPid} -> ?tp( diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 7bee2c70d..8b8337b09 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -23,6 +23,11 @@ -include_lib("emqx/include/logger.hrl"). +%% Allocatable resources +-define(kafka_resource_id, kafka_resource_id). +-define(kafka_client_id, kafka_client_id). +-define(kafka_producers, kafka_producers). + %% TODO: rename this to `kafka_producer' after alias support is added %% to hocon; keeping this as just `kafka' for backwards compatibility. -define(BRIDGE_TYPE, kafka). @@ -46,9 +51,11 @@ on_start(InstId, Config) -> } = Config, BridgeType = ?BRIDGE_TYPE, ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), + ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), _ = maybe_install_wolff_telemetry_handlers(ResourceId), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), + ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, connect_timeout => ConnTimeout, @@ -86,6 +93,7 @@ on_start(InstId, Config) -> WolffProducerConfig = producers_config(BridgeName, ClientId, KafkaConfig, IsDryRun), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> + ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers), {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, @@ -120,28 +128,63 @@ on_start(InstId, Config) -> ) end. -on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, +on_stop(InstanceId, _State) -> + case emqx_resource:get_allocated_resources(InstanceId) of #{ - msg => "failed_to_delete_kafka_producer", - client_id => ClientID - } - ), - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientID - } - ), - with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceID) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - client_id => ClientID - } - ). + ?kafka_client_id := ClientId, + ?kafka_producers := Producers, + ?kafka_resource_id := ResourceId + } -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + #{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ), + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + #{?kafka_resource_id := ResourceId} -> + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(ResourceId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => ResourceId + } + ), + ok; + _ -> + ok + end. on_query( _InstId, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index c17d21635..bffd4caa4 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -59,7 +59,9 @@ only_once_tests() -> t_cluster_group, t_node_joins_existing_cluster, t_cluster_node_down, - t_multiple_topic_mappings + t_multiple_topic_mappings, + t_resource_manager_crash_after_subscriber_started, + t_resource_manager_crash_before_subscriber_started ]. init_per_suite(Config) -> @@ -1118,6 +1120,24 @@ stop_async_publisher(Pid) -> end, ok. +kill_resource_managers() -> + ct:pal("gonna kill resource managers"), + lists:foreach( + fun({_, Pid, _, _}) -> + ct:pal("terminating resource manager ~p", [Pid]), + Ref = monitor(process, Pid), + exit(Pid, kill), + receive + {'DOWN', Ref, process, Pid, killed} -> + ok + after 500 -> + ct:fail("pid ~p didn't die!", [Pid]) + end, + ok + end, + supervisor:which_children(emqx_resource_manager_sup) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -2024,3 +2044,65 @@ t_begin_offset_earliest(Config) -> end ), ok. + +t_resource_manager_crash_after_subscriber_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := kafka_consumer_subscriber_allocated}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := kafka_consumer_subscriber_started} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kafka_consumer_subcriber_and_client_stopped}, + 10_000 + ), + ok + end, + [] + ), + ok. + +t_resource_manager_crash_before_subscriber_started(Config) -> + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := kafka_consumer_client_started}, + #{?snk_kind := will_kill_resource_manager} + ), + ?force_ordering( + #{?snk_kind := resource_manager_killed}, + #{?snk_kind := kafka_consumer_about_to_start_subscriber} + ), + spawn_link(fun() -> + ?tp(will_kill_resource_manager, #{}), + kill_resource_managers(), + ?tp(resource_manager_killed, #{}), + ok + end), + %% even if the resource manager is dead, we can still + %% clear the allocated resources. + {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := kafka_consumer_just_client_stopped}, + 10_000 + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index d1a29fffe..ad41c9904 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -446,6 +446,8 @@ t_failed_creation_then_fix(Config) -> ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), %% TODO: refactor those into init/end per testcase ok = ?PRODUCER:on_stop(ResourceId, State), + ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertEqual([], supervisor:which_children(wolff_producers_sup)), ok = emqx_bridge_resource:remove(BridgeId), delete_all_bridges(), ok. diff --git a/changes/ee/feat-10813.en.md b/changes/ee/feat-10813.en.md new file mode 100644 index 000000000..b36039f4c --- /dev/null +++ b/changes/ee/feat-10813.en.md @@ -0,0 +1 @@ +Refactored Kafka Producer and Consumer bridges to avoid leaking resources during crashes at creation.