From 0aa9a872a3a392721c4b762321d25387bbbfaeaf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Feb 2024 09:53:23 -0300 Subject: [PATCH] feat: check `brod_sup` for client id --- .../src/emqx_bridge_kafka_impl_consumer.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index e4f2376ec..115266252 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -210,9 +210,15 @@ on_stop(ConnectorResId, State) -> ?tp(kafka_consumer_subcriber_and_client_stopped, #{instance_id => ConnectorResId}), ok. --spec on_get_status(resource_id(), connector_state()) -> connected | disconnected. -on_get_status(_ResourceID, _State) -> - ?status_connected. +-spec on_get_status(connector_resource_id(), connector_state()) -> + ?status_connected | ?status_disconnected. +on_get_status(_ConnectorResId, _State = #{kafka_client_id := ClientID}) -> + case brod_sup:find_client(ClientID) of + [_Pid] -> ?status_connected; + _ -> ?status_disconnected + end; +on_get_status(_ConnectorResId, _State) -> + ?status_disconnected. -spec on_add_channel( connector_resource_id(),