feat(kafka producer): add health check topic option

Fixes https://emqx.atlassian.net/browse/EMQX-12241

This allows more accurate health checking for Kafka Producers.  Without a topic, it's not
possible to actually probe the connection to partition leaders, so the connector might not
be reported as `disconnected` without testing a concrete topic.
This commit is contained in:
Thales Macedo Garitezi 2024-04-29 17:14:08 -03:00
parent 437e7968b1
commit 6f3da6b131
11 changed files with 90 additions and 7 deletions

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -656,6 +656,11 @@ kafka_connector_config_fields() ->
default => none, desc => ?DESC("authentication")
})},
{socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
{health_check_topic,
mk(binary(), #{
required => false,
desc => ?DESC(producer_health_check_topic)
})},
{ssl, mk(ref(ssl_client_opts), #{})}
] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).

View File

@ -89,10 +89,13 @@ on_start(InstId, Config) ->
%% only when its producers start.
case check_client_connectivity(ClientId) of
ok ->
{ok, #{
HealthCheckTopic = maps:get(health_check_topic, Config, undefined),
ConnectorState = #{
client_id => ClientId,
health_check_topic => HealthCheckTopic,
installed_bridge_v2s => #{}
}};
},
{ok, ConnectorState};
{error, {find_client, Reason}} ->
%% Race condition? Crash? We just checked it with `ensure_client'...
{error, Reason};
@ -508,7 +511,7 @@ on_get_status(
%% held in wolff producer's replayq.
case check_client_connectivity(ClientId) of
ok ->
?status_connected;
maybe_check_health_check_topic(State);
{error, {find_client, _Error}} ->
?status_connecting;
{error, {connectivity, Error}} ->
@ -572,6 +575,24 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}}
end.
maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when
is_binary(Topic)
->
#{client_id := ClientId} = ConnectorState,
MaxPartitions = all_partitions,
try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of
ok ->
?status_connected
catch
throw:#{reason := {connection_down, _} = Reason} ->
{?status_disconnected, ConnectorState, Reason};
throw:#{reason := Reason} ->
{?status_connecting, ConnectorState, Reason}
end;
maybe_check_health_check_topic(_) ->
%% Cannot infer further information. Maybe upgraded from older version.
?status_connected.
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of

View File

@ -638,3 +638,43 @@ t_ancient_v1_config_migration_without_local_topic(Config) ->
erpc:call(Node, fun emqx_bridge_v2:list/0)
),
ok.
t_connector_health_check_topic(_Config) ->
?check_trace(
begin
%% We create a connector pointing to a broker that expects authentication, but
%% we don't provide it in the config.
%% Without a health check topic, we're unable to probe any topic leaders to
%% check the actual connection parameters, so the status is "connected".
Type = ?TYPE,
Name = ?FUNCTION_NAME,
PlainAuthBootstrapHost = <<"kafka-1.emqx.net:9093">>,
ConnectorConfig0 = connector_config(#{
<<"bootstrap_hosts">> => PlainAuthBootstrapHost
}),
?assertMatch(
{ok, {{_, 201, _}, _, #{<<"status">> := <<"connected">>}}},
emqx_bridge_v2_testlib:create_connector_api([
{connector_type, Type},
{connector_name, Name},
{connector_config, ConnectorConfig0}
])
),
%% By providing a health check topic, we should detect it's disconnected
%% without the need for an action.
ConnectorConfig1 = connector_config(#{
<<"bootstrap_hosts">> => PlainAuthBootstrapHost,
<<"health_check_topic">> =>
emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
}),
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"status">> := <<"disconnected">>}}},
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1)
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1,3 @@
Added a new option to configure a topic solely for health check purposes in Kafka Producer connectors.
By configuring this option, it's now possible to more accurately detect connection issues towards partition leaders, such as wrong or missing credentials that prevent establishing the connection.

View File

@ -206,7 +206,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.2"},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.3"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},

View File

@ -350,4 +350,9 @@ Setting this to a value which is greater than the total number of partitions in
partitions_limit.label:
"""Max Partitions"""
producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""
}

View File

@ -350,4 +350,9 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""
producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""
}

View File

@ -446,5 +446,9 @@ server_name_indication.desc:
server_name_indication.label:
"""SNI"""
producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""
}