From 5011486b187c881a96919dd88b11d943da03968e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 30 Mar 2023 14:44:04 -0300 Subject: [PATCH] fix(kafka_consumer): return better error messages when probing kafka consumer bridge Fixes https://emqx.atlassian.net/browse/EMQX-9422 --- apps/emqx_resource/src/emqx_resource.erl | 4 +- .../kafka/emqx_bridge_impl_kafka_consumer.erl | 56 +++++++++++++++---- .../kafka/emqx_bridge_impl_kafka_producer.erl | 5 +- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 5 +- 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a2fd9804b..0ed459c01 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -359,10 +359,10 @@ call_start(MgrId, Mod, Config) -> try Mod:on_start(MgrId, Config) catch - throw:{error, Error} -> + throw:Error -> {error, Error}; Kind:Error:Stacktrace -> - {error, {Kind, Error, Stacktrace}} + {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} end. -spec call_health_check(manager_id(), module(), resource_state()) -> diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 44ea95f90..f4dc3456e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -95,6 +95,11 @@ commit_fun => brod_group_subscriber_v2:commit_fun() }. +-define(CLIENT_DOWN_MESSAGE, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." +). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -152,11 +157,7 @@ on_start(InstanceId, Config) -> kafka_hosts => BootstrapHosts, reason => emqx_misc:redact(Reason) }), - throw( - {error, - "Failed to start Kafka client. Please check the logs for errors and check" - " the connection parameters"} - ) + throw(?CLIENT_DOWN_MESSAGE) end, start_consumer(Config, InstanceId, ClientID). @@ -177,7 +178,7 @@ on_get_status(_InstanceID, State) -> kafka_client_id := ClientID, kafka_topics := KafkaTopics } = State, - do_get_status(ClientID, KafkaTopics, SubscriberId). + do_get_status(State, ClientID, KafkaTopics, SubscriberId). %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API @@ -374,22 +375,41 @@ stop_client(ClientID) -> ), ok. -do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> +do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) -> case brod:get_partitions_count(ClientID, KafkaTopic) of {ok, NPartitions} -> - case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of - connected -> do_get_status(ClientID, RestTopics, SubscriberId); + case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of + connected -> do_get_status(State, ClientID, RestTopics, SubscriberId); disconnected -> disconnected end; + {error, {client_down, Context}} -> + case infer_client_error(Context) of + auth_error -> + Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + {auth_error, Message0} -> + Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + connection_refused -> + Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + _ -> + {disconnected, State, ?CLIENT_DOWN_MESSAGE} + end; + {error, leader_not_available} -> + Message = + "Leader connection not available. Please check the Kafka topic used," + " the connection parameters and Kafka cluster health", + {disconnected, State, Message}; _ -> disconnected end; -do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> +do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) -> connected. --spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> +-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> connected | disconnected. -do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> +do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) -> Results = lists:map( fun(N) -> @@ -508,3 +528,15 @@ encode(Value, base64) -> to_bin(B) when is_binary(B) -> B; to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). + +infer_client_error(Error) -> + case Error of + [{_BrokerEndpoint, {econnrefused, _}} | _] -> + connection_refused; + [{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) -> + {auth_error, Message}; + [{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] -> + auth_error; + _ -> + undefined + end. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 8f5988aaf..09713a431 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -115,9 +115,8 @@ on_start(InstId, Config) -> } ), throw( - {error, - "Failed to start Kafka client. Please check the logs for errors and check" - " the connection parameters"} + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." ) end. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 2fb6c6857..9e32f818d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -416,10 +416,7 @@ t_failed_creation_then_fix(Config) -> Type, erlang:list_to_atom(Name), WrongConf ), WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, - ?assertThrow( - {error, _}, - ?PRODUCER:on_start(ResourceId, WrongConfigAtom) - ), + ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), %% before throwing, it should cleanup the client process. we %% retry because the supervisor might need some time to really %% remove it from its tree.