diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 269239620..c820cee06 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 0519e39c9..2c109298c 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 7c98bf571..69ace9289 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 83bc33266..9d15a26ee 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -656,6 +656,11 @@ kafka_connector_config_fields() -> default => none, desc => ?DESC("authentication") })}, {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, + {health_check_topic, + mk(binary(), #{ + required => false, + desc => ?DESC(producer_health_check_topic) + })}, {ssl, mk(ref(ssl_client_opts), #{})} ] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 16bca153a..88cd06a3b 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -89,10 +89,13 @@ on_start(InstId, Config) -> %% only when its producers start. case check_client_connectivity(ClientId) of ok -> - {ok, #{ + HealthCheckTopic = maps:get(health_check_topic, Config, undefined), + ConnectorState = #{ client_id => ClientId, + health_check_topic => HealthCheckTopic, installed_bridge_v2s => #{} - }}; + }, + {ok, ConnectorState}; {error, {find_client, Reason}} -> %% Race condition? Crash? We just checked it with `ensure_client'... {error, Reason}; @@ -508,7 +511,7 @@ on_get_status( %% held in wolff producer's replayq. case check_client_connectivity(ClientId) of ok -> - ?status_connected; + maybe_check_health_check_topic(State); {error, {find_client, _Error}} -> ?status_connecting; {error, {connectivity, Error}} -> @@ -572,6 +575,24 @@ check_client_connectivity(ClientId) -> {error, {find_client, Reason}} end. +maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when + is_binary(Topic) +-> + #{client_id := ClientId} = ConnectorState, + MaxPartitions = all_partitions, + try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of + ok -> + ?status_connected + catch + throw:#{reason := {connection_down, _} = Reason} -> + {?status_disconnected, ConnectorState, Reason}; + throw:#{reason := Reason} -> + {?status_connecting, ConnectorState, Reason} + end; +maybe_check_health_check_topic(_) -> + %% Cannot infer further information. Maybe upgraded from older version. + ?status_connected. + check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> Leaders = case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index b51bd196c..05f73684b 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -638,3 +638,43 @@ t_ancient_v1_config_migration_without_local_topic(Config) -> erpc:call(Node, fun emqx_bridge_v2:list/0) ), ok. + +t_connector_health_check_topic(_Config) -> + ?check_trace( + begin + %% We create a connector pointing to a broker that expects authentication, but + %% we don't provide it in the config. + %% Without a health check topic, we're unable to probe any topic leaders to + %% check the actual connection parameters, so the status is "connected". + Type = ?TYPE, + Name = ?FUNCTION_NAME, + PlainAuthBootstrapHost = <<"kafka-1.emqx.net:9093">>, + ConnectorConfig0 = connector_config(#{ + <<"bootstrap_hosts">> => PlainAuthBootstrapHost + }), + ?assertMatch( + {ok, {{_, 201, _}, _, #{<<"status">> := <<"connected">>}}}, + emqx_bridge_v2_testlib:create_connector_api([ + {connector_type, Type}, + {connector_name, Name}, + {connector_config, ConnectorConfig0} + ]) + ), + + %% By providing a health check topic, we should detect it's disconnected + %% without the need for an action. + ConnectorConfig1 = connector_config(#{ + <<"bootstrap_hosts">> => PlainAuthBootstrapHost, + <<"health_check_topic">> => + emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + }), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"status">> := <<"disconnected">>}}}, + emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1) + ), + + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-12959.en.md b/changes/ee/feat-12959.en.md new file mode 100644 index 000000000..e82900a2f --- /dev/null +++ b/changes/ee/feat-12959.en.md @@ -0,0 +1,3 @@ +Added a new option to configure a topic solely for health check purposes in Kafka Producer connectors. + +By configuring this option, it's now possible to more accurately detect connection issues towards partition leaders, such as wrong or missing credentials that prevent establishing the connection. diff --git a/mix.exs b/mix.exs index d77bfc99c..e78016fd6 100644 --- a/mix.exs +++ b/mix.exs @@ -206,7 +206,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.10.3"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"}, diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon index 3b96e23e6..2a3071f2c 100644 --- a/rel/i18n/emqx_bridge_azure_event_hub.hocon +++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon @@ -350,4 +350,9 @@ Setting this to a value which is greater than the total number of partitions in partitions_limit.label: """Max Partitions""" +producer_health_check_topic.desc: +"""Topic name used exclusively for more accurate connector health checks.""" +producer_health_check_topic.label: +"""Connector health check topic""" + } diff --git a/rel/i18n/emqx_bridge_confluent_producer.hocon b/rel/i18n/emqx_bridge_confluent_producer.hocon index 748373691..234da3e5f 100644 --- a/rel/i18n/emqx_bridge_confluent_producer.hocon +++ b/rel/i18n/emqx_bridge_confluent_producer.hocon @@ -350,4 +350,9 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_health_check_topic.desc: +"""Topic name used exclusively for more accurate connector health checks.""" +producer_health_check_topic.label: +"""Connector health check topic""" + } diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 6e0074ddd..f63e6f3eb 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -446,5 +446,9 @@ server_name_indication.desc: server_name_indication.label: """SNI""" +producer_health_check_topic.desc: +"""Topic name used exclusively for more accurate connector health checks.""" +producer_health_check_topic.label: +"""Connector health check topic""" }