fix(kafka): improve shutdown and health check logs during shutdown

This commit is contained in:
Thales Macedo Garitezi 2023-05-24 17:47:26 -03:00
parent 5df7314255
commit 0ca3f51503
2 changed files with 24 additions and 11 deletions

View File

@@ -381,7 +381,13 @@ start_consumer(Config, ResourceId, ClientID) ->
stop_subscriber(SubscriberId) -> stop_subscriber(SubscriberId) ->
_ = log_when_error( _ = log_when_error(
fun() -> fun() ->
emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId) try
emqx_bridge_kafka_consumer_sup:ensure_child_deleted(SubscriberId)
catch
exit:{noproc, _} ->
%% may happen when node is shutting down
ok
end
end, end,
#{ #{
msg => "failed_to_delete_kafka_subscriber", msg => "failed_to_delete_kafka_subscriber",
@@ -465,16 +471,22 @@ do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
end. end.
are_subscriber_workers_alive(SubscriberId) -> are_subscriber_workers_alive(SubscriberId) ->
Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup), try
case lists:keyfind(SubscriberId, 1, Children) of Children = supervisor:which_children(emqx_bridge_kafka_consumer_sup),
false -> case lists:keyfind(SubscriberId, 1, Children) of
false; false ->
{_, Pid, _, _} -> false;
Workers = brod_group_subscriber_v2:get_workers(Pid), {_, Pid, _, _} ->
%% we can't enforce the number of partitions on a single Workers = brod_group_subscriber_v2:get_workers(Pid),
%% node, as the group might be spread across an emqx %% we can't enforce the number of partitions on a single
%% cluster. %% node, as the group might be spread across an emqx
lists:all(fun is_process_alive/1, maps:values(Workers)) %% cluster.
lists:all(fun is_process_alive/1, maps:values(Workers))
end
catch
exit:{shutdown, _} ->
%% may happen if node is shutting down
false
end. end.
log_when_error(Fun, Log) -> log_when_error(Fun, Log) ->

View File

@@ -335,6 +335,7 @@ init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config0) -> common_init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(60)), ct:timetrap(timer:seconds(60)),
delete_all_bridges(), delete_all_bridges(),
emqx_config:delete_override_conf_files(),
KafkaTopic = KafkaTopic =
<< <<
(atom_to_binary(TestCase))/binary, (atom_to_binary(TestCase))/binary,