refactor(kafka_consumer): follow up refactoring requested from previous pull request

Follow up from https://github.com/emqx/emqx/pull/10273
This commit is contained in:
Thales Macedo Garitezi 2023-04-13 16:26:03 -03:00
parent 26883eec02
commit 4bcfbea056
1 changed files with 17 additions and 12 deletions

View File

@ -179,7 +179,12 @@ on_get_status(_InstanceID, State) ->
kafka_client_id := ClientID,
kafka_topics := KafkaTopics
} = State,
do_get_status(State, ClientID, KafkaTopics, SubscriberId).
case do_get_status(ClientID, KafkaTopics, SubscriberId) of
{disconnected, Message} ->
{disconnected, State, Message};
Res ->
Res
end.
%%-------------------------------------------------------------------------------------
%% `brod_group_subscriber' API
@ -376,41 +381,41 @@ stop_client(ClientID) ->
),
ok.
do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
case brod:get_partitions_count(ClientID, KafkaTopic) of
{ok, NPartitions} ->
case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of
connected -> do_get_status(State, ClientID, RestTopics, SubscriberId);
case do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of
connected -> do_get_status(ClientID, RestTopics, SubscriberId);
disconnected -> disconnected
end;
{error, {client_down, Context}} ->
case infer_client_error(Context) of
auth_error ->
Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{disconnected, Message};
{auth_error, Message0} ->
Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{disconnected, Message};
connection_refused ->
Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{disconnected, Message};
_ ->
{disconnected, State, ?CLIENT_DOWN_MESSAGE}
{disconnected, ?CLIENT_DOWN_MESSAGE}
end;
{error, leader_not_available} ->
Message =
"Leader connection not available. Please check the Kafka topic used,"
" the connection parameters and Kafka cluster health",
{disconnected, State, Message};
{disconnected, Message};
_ ->
disconnected
end;
do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) ->
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) ->
connected.
-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
-spec do_get_topic_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
connected | disconnected.
do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
do_get_topic_status(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
Results =
lists:map(
fun(N) ->