From 0ca3f51503f6381dcaac69442d9a2e67c5e2a4ca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 24 May 2023 17:47:26 -0300 Subject: [PATCH] fix(kafka): improve shutdown and health check logs during shutdown --- .../src/emqx_bridge_kafka_impl_consumer.erl | 34 +++++++++++++------ .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 1 + 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 225f90c18..c0de23d94 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -381,7 +381,13 @@ start_consumer(Config, ResourceId, ClientID) -> stop_subscriber(SubscriberId) -> _ = log_when_error( fun() -> - emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + try + emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) + catch + exit:{noproc, _} -> + %% may happen when node is shutting down + ok + end end, #{ msg => "failed_to_delete_kafka_subscriber", @@ -465,16 +471,22 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> end. are_subscriber_workers_alive(SubscriberId) -> - Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), - case lists:keyfind(SubscriberId, 1, Children) of - false -> - false; - {_, Pid, _, _} -> - Workers = brod_group_subscriber_v2:get_workers(Pid), - %% we can't enforce the number of partitions on a single - %% node, as the group might be spread across an emqx - %% cluster. - lists:all(fun is_process_alive/1, maps:values(Workers)) + try + Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), + case lists:keyfind(SubscriberId, 1, Children) of + false -> + false; + {_, Pid, _, _} -> + Workers = brod_group_subscriber_v2:get_workers(Pid), + %% we can't enforce the number of partitions on a single + %% node, as the group might be spread across an emqx + %% cluster. + lists:all(fun is_process_alive/1, maps:values(Workers)) + end + catch + exit:{shutdown, _} -> + %% may happen if node is shutting down + false end. log_when_error(Fun, Log) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index bffd4caa4..194ca95d6 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -335,6 +335,7 @@ init_per_testcase(TestCase, Config) -> common_init_per_testcase(TestCase, Config0) -> ct:timetrap(timer:seconds(60)), delete_all_bridges(), + emqx_config:delete_override_conf_files(), KafkaTopic = << (atom_to_binary(TestCase))/binary,