From 4bcfbea056719be91158be4d7eb2eff6299c83f4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Apr 2023 16:26:03 -0300 Subject: [PATCH] refactor(kafka_consumer): follow up refactoring requested from previous pull request Follow up from https://github.com/emqx/emqx/pull/10273 --- .../src/emqx_bridge_kafka_impl_consumer.erl | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index fdfa3300c..c549b3467 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -179,7 +179,12 @@ on_get_status(_InstanceID, State) -> kafka_client_id := ClientID, kafka_topics := KafkaTopics } = State, - do_get_status(State, ClientID, KafkaTopics, SubscriberId). + case do_get_status(ClientID, KafkaTopics, SubscriberId) of + {disconnected, Message} -> + {disconnected, State, Message}; + Res -> + Res + end. %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API @@ -376,41 +381,41 @@ stop_client(ClientID) -> ), ok. -do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) -> +do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> case brod:get_partitions_count(ClientID, KafkaTopic) of {ok, NPartitions} -> - case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of - connected -> do_get_status(State, ClientID, RestTopics, SubscriberId); + case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of + connected -> do_get_status(ClientID, RestTopics, SubscriberId); disconnected -> disconnected end; {error, {client_down, Context}} -> case infer_client_error(Context) of auth_error -> Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE, - {disconnected, State, Message}; + {disconnected, Message}; {auth_error, Message0} -> Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE, - {disconnected, State, Message}; + {disconnected, Message}; connection_refused -> Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE, - {disconnected, State, Message}; + {disconnected, Message}; _ -> - {disconnected, State, ?CLIENT_DOWN_MESSAGE} + {disconnected, ?CLIENT_DOWN_MESSAGE} end; {error, leader_not_available} -> Message = "Leader connection not available. Please check the Kafka topic used," " the connection parameters and Kafka cluster health", - {disconnected, State, Message}; + {disconnected, Message}; _ -> disconnected end; -do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) -> +do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> connected. --spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> +-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> connected | disconnected. -do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) -> +do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> Results = lists:map( fun(N) ->