From 632bffd45168bfb5ab8ab1e4b5f020b312de7fc8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 29 Mar 2023 11:59:29 -0300 Subject: [PATCH 1/2] fix: return friendly message when kafka producer fails to start (rv5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9392 The returned information does not allow to diagnose the issue (i.e.: a connection issue due to the wrong host and port, the wrong password failing authn). However, such information is printed to the logs. This changes the returned error to the API so that the user is hinted at looking at the logs for further investigation of the error. --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- apps/emqx_resource/src/emqx_resource.erl | 9 ++++++++- .../src/kafka/emqx_bridge_impl_kafka_consumer.erl | 6 +++++- .../src/kafka/emqx_bridge_impl_kafka_producer.erl | 6 +++++- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 12 +++++++++--- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 7be1bcb1c..fbfe8c1fa 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1ccb5ca71..a2fd9804b 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -356,7 +356,14 @@ is_buffer_supported(Module) -> -spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> - ?SAFE_CALL(Mod:on_start(MgrId, Config)). + try + Mod:on_start(MgrId, Config) + catch + throw:{error, Error} -> + {error, Error}; + Kind:Error:Stacktrace -> + {error, {Kind, Error, Stacktrace}} + end. -spec call_health_check(manager_id(), module(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index a05f6ec13..44ea95f90 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -152,7 +152,11 @@ on_start(InstanceId, Config) -> kafka_hosts => BootstrapHosts, reason => emqx_misc:redact(Reason) }), - throw(failed_to_start_kafka_client) + throw( + {error, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters"} + ) end, start_consumer(Config, InstanceId, ClientID). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 5703c69f5..8f5988aaf 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -114,7 +114,11 @@ on_start(InstId, Config) -> client_id => ClientId } ), - throw(failed_to_start_kafka_producer) + throw( + {error, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters"} + ) end. on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 4b9642442..2fb6c6857 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). -define(PRODUCER, emqx_bridge_impl_kafka_producer). @@ -415,9 +416,14 @@ t_failed_creation_then_fix(Config) -> Type, erlang:list_to_atom(Name), WrongConf ), WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, - ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), - %% before throwing, it should cleanup the client process. - ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertThrow( + {error, _}, + ?PRODUCER:on_start(ResourceId, WrongConfigAtom) + ), + %% before throwing, it should cleanup the client process. we + %% retry because the supervisor might need some time to really + %% remove it from its tree. + ?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))), %% must succeed with correct config {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), ValidConf From 5011486b187c881a96919dd88b11d943da03968e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 30 Mar 2023 14:44:04 -0300 Subject: [PATCH 2/2] fix(kafka_consumer): return better error messages when probing kafka consumer bridge Fixes https://emqx.atlassian.net/browse/EMQX-9422 --- apps/emqx_resource/src/emqx_resource.erl | 4 +- .../kafka/emqx_bridge_impl_kafka_consumer.erl | 56 +++++++++++++++---- .../kafka/emqx_bridge_impl_kafka_producer.erl | 5 +- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 5 +- 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a2fd9804b..0ed459c01 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -359,10 +359,10 @@ call_start(MgrId, Mod, Config) -> try Mod:on_start(MgrId, Config) catch - throw:{error, Error} -> + throw:Error -> {error, Error}; Kind:Error:Stacktrace -> - {error, {Kind, Error, Stacktrace}} + {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} end. -spec call_health_check(manager_id(), module(), resource_state()) -> diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 44ea95f90..f4dc3456e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -95,6 +95,11 @@ commit_fun => brod_group_subscriber_v2:commit_fun() }. +-define(CLIENT_DOWN_MESSAGE, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." +). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -152,11 +157,7 @@ on_start(InstanceId, Config) -> kafka_hosts => BootstrapHosts, reason => emqx_misc:redact(Reason) }), - throw( - {error, - "Failed to start Kafka client. Please check the logs for errors and check" - " the connection parameters"} - ) + throw(?CLIENT_DOWN_MESSAGE) end, start_consumer(Config, InstanceId, ClientID). @@ -177,7 +178,7 @@ on_get_status(_InstanceID, State) -> kafka_client_id := ClientID, kafka_topics := KafkaTopics } = State, - do_get_status(ClientID, KafkaTopics, SubscriberId). + do_get_status(State, ClientID, KafkaTopics, SubscriberId). %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API @@ -374,22 +375,41 @@ stop_client(ClientID) -> ), ok. -do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> +do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) -> case brod:get_partitions_count(ClientID, KafkaTopic) of {ok, NPartitions} -> - case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of - connected -> do_get_status(ClientID, RestTopics, SubscriberId); + case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of + connected -> do_get_status(State, ClientID, RestTopics, SubscriberId); disconnected -> disconnected end; + {error, {client_down, Context}} -> + case infer_client_error(Context) of + auth_error -> + Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + {auth_error, Message0} -> + Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + connection_refused -> + Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + _ -> + {disconnected, State, ?CLIENT_DOWN_MESSAGE} + end; + {error, leader_not_available} -> + Message = + "Leader connection not available. Please check the Kafka topic used," + " the connection parameters and Kafka cluster health", + {disconnected, State, Message}; _ -> disconnected end; -do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> +do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) -> connected. --spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> +-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> connected | disconnected. -do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> +do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) -> Results = lists:map( fun(N) -> @@ -508,3 +528,15 @@ encode(Value, base64) -> to_bin(B) when is_binary(B) -> B; to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). + +infer_client_error(Error) -> + case Error of + [{_BrokerEndpoint, {econnrefused, _}} | _] -> + connection_refused; + [{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) -> + {auth_error, Message}; + [{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] -> + auth_error; + _ -> + undefined + end. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 8f5988aaf..09713a431 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -115,9 +115,8 @@ on_start(InstId, Config) -> } ), throw( - {error, - "Failed to start Kafka client. Please check the logs for errors and check" - " the connection parameters"} + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." ) end. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 2fb6c6857..9e32f818d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -416,10 +416,7 @@ t_failed_creation_then_fix(Config) -> Type, erlang:list_to_atom(Name), WrongConf ), WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, - ?assertThrow( - {error, _}, - ?PRODUCER:on_start(ResourceId, WrongConfigAtom) - ), + ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), %% before throwing, it should cleanup the client process. we %% retry because the supervisor might need some time to really %% remove it from its tree.