diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index 605c1de6d..80427ac47 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -154,7 +154,12 @@ t_session_taken(_) -> {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 120}} ]), - {ok, _} = emqtt:connect(C), + case emqtt:connect(C) of + {ok, _} -> + ok; + {error, econnrefused} -> + throw(mqtt_listener_not_ready) + end, {ok, _, [0]} = emqtt:subscribe(C, Topic, []), C end, @@ -168,9 +173,21 @@ t_session_taken(_) -> lists:seq(1, MsgNum) ) end, - - C1 = Connect(), - ok = emqtt:disconnect(C1), + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + try + C = Connect(), + emqtt:disconnect(C), + true + catch + throw:mqtt_listener_not_ready -> + false + end + end, + 3000 + ), Publish(), diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 341ba2463..933aa009b 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -160,11 +160,11 @@ When disabled the messages are buffered in RAM only.""" batch_size { desc { en: """Maximum batch count. If equal to 1, there's effectively no batching.""" - zh: """批量请求大小。如果设为1,则无批处理。""" + zh: """最大批量请求大小。如果设为1,则无批处理。""" } label { - en: """Batch size""" - zh: """批量请求大小""" + en: """Max batch size""" + zh: """最大批量请求大小""" } } @@ -174,7 +174,7 @@ When disabled the messages are buffered in RAM only.""" zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。""" } label { - en: """Max Batch Wait Time""" + en: """Max batch wait time""" zh: """批量等待最大间隔""" } } diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 2e72c2a28..0f7e93a9b 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -356,7 +356,14 @@ is_buffer_supported(Module) -> -spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> - ?SAFE_CALL(Mod:on_start(MgrId, Config)). + try + Mod:on_start(MgrId, Config) + catch + throw:Error -> + {error, Error}; + Kind:Error:Stacktrace -> + {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} + end. -spec call_health_check(manager_id(), module(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf index 6a28b371a..5096f8590 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf @@ -60,7 +60,7 @@ https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于 } label { en: "Batch Value Separator" - zh: "批量值分离器" + zh: "分隔符" } } config_enable { diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index a05f6ec13..f4dc3456e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -95,6 +95,11 @@ commit_fun => brod_group_subscriber_v2:commit_fun() }. +-define(CLIENT_DOWN_MESSAGE, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." +). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -152,7 +157,7 @@ on_start(InstanceId, Config) -> kafka_hosts => BootstrapHosts, reason => emqx_misc:redact(Reason) }), - throw(failed_to_start_kafka_client) + throw(?CLIENT_DOWN_MESSAGE) end, start_consumer(Config, InstanceId, ClientID). @@ -173,7 +178,7 @@ on_get_status(_InstanceID, State) -> kafka_client_id := ClientID, kafka_topics := KafkaTopics } = State, - do_get_status(ClientID, KafkaTopics, SubscriberId). + do_get_status(State, ClientID, KafkaTopics, SubscriberId). %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API @@ -370,22 +375,41 @@ stop_client(ClientID) -> ), ok. -do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> +do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) -> case brod:get_partitions_count(ClientID, KafkaTopic) of {ok, NPartitions} -> - case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of - connected -> do_get_status(ClientID, RestTopics, SubscriberId); + case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of + connected -> do_get_status(State, ClientID, RestTopics, SubscriberId); disconnected -> disconnected end; + {error, {client_down, Context}} -> + case infer_client_error(Context) of + auth_error -> + Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + {auth_error, Message0} -> + Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + connection_refused -> + Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + _ -> + {disconnected, State, ?CLIENT_DOWN_MESSAGE} + end; + {error, leader_not_available} -> + Message = + "Leader connection not available. Please check the Kafka topic used," + " the connection parameters and Kafka cluster health", + {disconnected, State, Message}; _ -> disconnected end; -do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> +do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) -> connected. --spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> +-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> connected | disconnected. -do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> +do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) -> Results = lists:map( fun(N) -> @@ -504,3 +528,15 @@ encode(Value, base64) -> to_bin(B) when is_binary(B) -> B; to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). + +infer_client_error(Error) -> + case Error of + [{_BrokerEndpoint, {econnrefused, _}} | _] -> + connection_refused; + [{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) -> + {auth_error, Message}; + [{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] -> + auth_error; + _ -> + undefined + end. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 5703c69f5..09713a431 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -114,7 +114,10 @@ on_start(InstId, Config) -> client_id => ClientId } ), - throw(failed_to_start_kafka_producer) + throw( + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." + ) end. on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 4b9642442..9e32f818d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). -define(PRODUCER, emqx_bridge_impl_kafka_producer). @@ -415,9 +416,11 @@ t_failed_creation_then_fix(Config) -> Type, erlang:list_to_atom(Name), WrongConf ), WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, - ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), - %% before throwing, it should cleanup the client process. - ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), + %% before throwing, it should cleanup the client process. we + %% retry because the supervisor might need some time to really + %% remove it from its tree. + ?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))), %% must succeed with correct config {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), ValidConf diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf index 533d100bf..069505a69 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf @@ -7,8 +7,8 @@ emqx_ee_connector_clickhouse { zh: """你想连接到的Clickhouse服务器的HTTP URL(例如http://myhostname:8123)。""" } label: { - en: "URL to clickhouse server" - zh: "到clickhouse服务器的URL" + en: "Server URL" + zh: "服务器 URL" } }