From aefa0e5ffb05ac2c66c1b48f91d6100e3c8a9eb5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 29 Mar 2023 15:06:15 +0200 Subject: [PATCH 1/6] docs: make clickhouse config label for Server URL more concise --- .../emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf index 1e07c29b4..b3c2f6532 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf @@ -7,8 +7,8 @@ emqx_ee_connector_clickhouse { zh: """你想连接到的Clickhouse服务器的HTTP URL(例如http://myhostname:8123)。""" } label: { - en: "URL to clickhouse server" - zh: "到clickhouse服务器的URL" + en: "Server URL" + zh: "服务器 URL" } } From 91a784134c15ea56bdd7dae04307187633364da4 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 29 Mar 2023 15:32:30 +0200 Subject: [PATCH 2/6] docs: update Chinese version of batch value separator label --- lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf index 6a28b371a..5096f8590 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf @@ -60,7 +60,7 @@ https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于 } label { en: "Batch Value Separator" - zh: "批量值分离器" + zh: "分隔符" } } config_enable { From 632bffd45168bfb5ab8ab1e4b5f020b312de7fc8 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 29 Mar 2023 11:59:29 -0300 Subject: [PATCH 3/6] fix: return friendly message when kafka producer fails to start (rv5.0) Fixes https://emqx.atlassian.net/browse/EMQX-9392 The returned information does not allow to diagnose the 
issue (e.g. a connection issue caused by a wrong host or port, or a wrong password failing authentication). However, such information is printed to the logs. This changes the error returned to the API so that the user is directed to the logs for further investigation of the error. --- apps/emqx_resource/src/emqx_resource.app.src | 2 +- apps/emqx_resource/src/emqx_resource.erl | 9 ++++++++- .../src/kafka/emqx_bridge_impl_kafka_consumer.erl | 6 +++++- .../src/kafka/emqx_bridge_impl_kafka_producer.erl | 6 +++++- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 12 +++++++++--- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 7be1bcb1c..fbfe8c1fa 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1ccb5ca71..a2fd9804b 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -356,7 +356,14 @@ is_buffer_supported(Module) -> -spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> - ?SAFE_CALL(Mod:on_start(MgrId, Config)). + try + Mod:on_start(MgrId, Config) + catch + throw:{error, Error} -> + {error, Error}; + Kind:Error:Stacktrace -> + {error, {Kind, Error, Stacktrace}} + end. 
-spec call_health_check(manager_id(), module(), resource_state()) -> resource_status() diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index a05f6ec13..44ea95f90 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -152,7 +152,11 @@ on_start(InstanceId, Config) -> kafka_hosts => BootstrapHosts, reason => emqx_misc:redact(Reason) }), - throw(failed_to_start_kafka_client) + throw( + {error, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters"} + ) end, start_consumer(Config, InstanceId, ClientID). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 5703c69f5..8f5988aaf 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -114,7 +114,11 @@ on_start(InstId, Config) -> client_id => ClientId } ), - throw(failed_to_start_kafka_producer) + throw( + {error, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters"} + ) end. on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 4b9642442..2fb6c6857 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -9,6 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). 
-define(PRODUCER, emqx_bridge_impl_kafka_producer). @@ -415,9 +416,14 @@ t_failed_creation_then_fix(Config) -> Type, erlang:list_to_atom(Name), WrongConf ), WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, - ?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), - %% before throwing, it should cleanup the client process. - ?assertEqual([], supervisor:which_children(wolff_client_sup)), + ?assertThrow( + {error, _}, + ?PRODUCER:on_start(ResourceId, WrongConfigAtom) + ), + %% before throwing, it should cleanup the client process. we + %% retry because the supervisor might need some time to really + %% remove it from its tree. + ?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))), %% must succeed with correct config {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( Type, erlang:list_to_atom(Name), ValidConf From bcde52383b7166e9bfc4683ec651122da1d42c0a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 31 Mar 2023 12:35:27 +0200 Subject: [PATCH 4/6] docs: fix max batch size desc --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index fb6b2eb06..600289b1d 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -149,11 +149,11 @@ When disabled the messages are buffered in RAM only.""" batch_size { desc { en: """Maximum batch count. 
If equal to 1, there's effectively no batching.""" - zh: """批量请求大小。如果设为1,则无批处理。""" + zh: """最大批量请求大小。如果设为1,则无批处理。""" } label { - en: """Batch size""" - zh: """批量请求大小""" + en: """Max batch size""" + zh: """最大批量请求大小""" } } @@ -163,7 +163,7 @@ When disabled the messages are buffered in RAM only.""" zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。""" } label { - en: """Max Batch Wait Time""" + en: """Max batch wait time""" zh: """批量等待最大间隔""" } } From abf0329b60a66a6bb77dfd799354731501071f37 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 31 Mar 2023 13:03:00 +0200 Subject: [PATCH 5/6] test(emqx_banned_SUITE): fix flaky test case --- apps/emqx/test/emqx_banned_SUITE.erl | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index 605c1de6d..80427ac47 100644 --- a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -154,7 +154,12 @@ t_session_taken(_) -> {clean_start, false}, {properties, #{'Session-Expiry-Interval' => 120}} ]), - {ok, _} = emqtt:connect(C), + case emqtt:connect(C) of + {ok, _} -> + ok; + {error, econnrefused} -> + throw(mqtt_listener_not_ready) + end, {ok, _, [0]} = emqtt:subscribe(C, Topic, []), C end, @@ -168,9 +173,21 @@ t_session_taken(_) -> lists:seq(1, MsgNum) ) end, - - C1 = Connect(), - ok = emqtt:disconnect(C1), + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + try + C = Connect(), + emqtt:disconnect(C), + true + catch + throw:mqtt_listener_not_ready -> + false + end + end, + 3000 + ), Publish(), From 5011486b187c881a96919dd88b11d943da03968e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 30 Mar 2023 14:44:04 -0300 Subject: [PATCH 6/6] fix(kafka_consumer): return better error messages when probing kafka consumer bridge Fixes https://emqx.atlassian.net/browse/EMQX-9422 --- apps/emqx_resource/src/emqx_resource.erl | 4 +- 
.../kafka/emqx_bridge_impl_kafka_consumer.erl | 56 +++++++++++++++---- .../kafka/emqx_bridge_impl_kafka_producer.erl | 5 +- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 5 +- 4 files changed, 49 insertions(+), 21 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a2fd9804b..0ed459c01 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -359,10 +359,10 @@ call_start(MgrId, Mod, Config) -> try Mod:on_start(MgrId, Config) catch - throw:{error, Error} -> + throw:Error -> {error, Error}; Kind:Error:Stacktrace -> - {error, {Kind, Error, Stacktrace}} + {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}} end. -spec call_health_check(manager_id(), module(), resource_state()) -> diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl index 44ea95f90..f4dc3456e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_consumer.erl @@ -95,6 +95,11 @@ commit_fun => brod_group_subscriber_v2:commit_fun() }. +-define(CLIENT_DOWN_MESSAGE, + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." +). + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -152,11 +157,7 @@ on_start(InstanceId, Config) -> kafka_hosts => BootstrapHosts, reason => emqx_misc:redact(Reason) }), - throw( - {error, - "Failed to start Kafka client. Please check the logs for errors and check" - " the connection parameters"} - ) + throw(?CLIENT_DOWN_MESSAGE) end, start_consumer(Config, InstanceId, ClientID). 
@@ -177,7 +178,7 @@ on_get_status(_InstanceID, State) -> kafka_client_id := ClientID, kafka_topics := KafkaTopics } = State, - do_get_status(ClientID, KafkaTopics, SubscriberId). + do_get_status(State, ClientID, KafkaTopics, SubscriberId). %%------------------------------------------------------------------------------------- %% `brod_group_subscriber' API @@ -374,22 +375,41 @@ stop_client(ClientID) -> ), ok. -do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> +do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) -> case brod:get_partitions_count(ClientID, KafkaTopic) of {ok, NPartitions} -> - case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of - connected -> do_get_status(ClientID, RestTopics, SubscriberId); + case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of + connected -> do_get_status(State, ClientID, RestTopics, SubscriberId); disconnected -> disconnected end; + {error, {client_down, Context}} -> + case infer_client_error(Context) of + auth_error -> + Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + {auth_error, Message0} -> + Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + connection_refused -> + Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE, + {disconnected, State, Message}; + _ -> + {disconnected, State, ?CLIENT_DOWN_MESSAGE} + end; + {error, leader_not_available} -> + Message = + "Leader connection not available. Please check the Kafka topic used," + " the connection parameters and Kafka cluster health", + {disconnected, State, Message}; _ -> disconnected end; -do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> +do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) -> connected. 
--spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> +-spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> connected | disconnected. -do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> +do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) -> Results = lists:map( fun(N) -> @@ -508,3 +528,15 @@ encode(Value, base64) -> to_bin(B) when is_binary(B) -> B; to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). + +infer_client_error(Error) -> + case Error of + [{_BrokerEndpoint, {econnrefused, _}} | _] -> + connection_refused; + [{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) -> + {auth_error, Message}; + [{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] -> + auth_error; + _ -> + undefined + end. diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 8f5988aaf..09713a431 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -115,9 +115,8 @@ on_start(InstId, Config) -> } ), throw( - {error, - "Failed to start Kafka client. Please check the logs for errors and check" - " the connection parameters"} + "Failed to start Kafka client. Please check the logs for errors and check" + " the connection parameters." ) end. 
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 2fb6c6857..9e32f818d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -416,10 +416,7 @@ t_failed_creation_then_fix(Config) -> Type, erlang:list_to_atom(Name), WrongConf ), WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, - ?assertThrow( - {error, _}, - ?PRODUCER:on_start(ResourceId, WrongConfigAtom) - ), + ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), %% before throwing, it should cleanup the client process. we %% retry because the supervisor might need some time to really %% remove it from its tree.