Merge pull request #12959 from thalesmg/kprodu-connector-hc-m-20240429

feat(kafka producer): add health check topic option
This commit is contained in:
Thales Macedo Garitezi 2024-05-03 12:48:48 -03:00 committed by GitHub
commit 3818b75188
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 90 additions and 7 deletions

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.3"}}},
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@ -656,6 +656,11 @@ kafka_connector_config_fields() ->
default => none, desc => ?DESC("authentication") default => none, desc => ?DESC("authentication")
})}, })},
{socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
{health_check_topic,
mk(binary(), #{
required => false,
desc => ?DESC(producer_health_check_topic)
})},
{ssl, mk(ref(ssl_client_opts), #{})} {ssl, mk(ref(ssl_client_opts), #{})}
] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts). ] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).

View File

@ -89,10 +89,13 @@ on_start(InstId, Config) ->
%% only when its producers start. %% only when its producers start.
case check_client_connectivity(ClientId) of case check_client_connectivity(ClientId) of
ok -> ok ->
{ok, #{ HealthCheckTopic = maps:get(health_check_topic, Config, undefined),
ConnectorState = #{
client_id => ClientId, client_id => ClientId,
health_check_topic => HealthCheckTopic,
installed_bridge_v2s => #{} installed_bridge_v2s => #{}
}}; },
{ok, ConnectorState};
{error, {find_client, Reason}} -> {error, {find_client, Reason}} ->
%% Race condition? Crash? We just checked it with `ensure_client'... %% Race condition? Crash? We just checked it with `ensure_client'...
{error, Reason}; {error, Reason};
@ -508,7 +511,7 @@ on_get_status(
%% held in wolff producer's replayq. %% held in wolff producer's replayq.
case check_client_connectivity(ClientId) of case check_client_connectivity(ClientId) of
ok -> ok ->
?status_connected; maybe_check_health_check_topic(State);
{error, {find_client, _Error}} -> {error, {find_client, _Error}} ->
?status_connecting; ?status_connecting;
{error, {connectivity, Error}} -> {error, {connectivity, Error}} ->
@ -572,6 +575,24 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}} {error, {find_client, Reason}}
end. end.
maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when
is_binary(Topic)
->
#{client_id := ClientId} = ConnectorState,
MaxPartitions = all_partitions,
try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of
ok ->
?status_connected
catch
throw:#{reason := {connection_down, _} = Reason} ->
{?status_disconnected, ConnectorState, Reason};
throw:#{reason := Reason} ->
{?status_connecting, ConnectorState, Reason}
end;
maybe_check_health_check_topic(_) ->
%% Cannot infer further information. Maybe upgraded from older version.
?status_connected.
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) -> check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders = Leaders =
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of

View File

@ -638,3 +638,43 @@ t_ancient_v1_config_migration_without_local_topic(Config) ->
erpc:call(Node, fun emqx_bridge_v2:list/0) erpc:call(Node, fun emqx_bridge_v2:list/0)
), ),
ok. ok.
t_connector_health_check_topic(_Config) ->
?check_trace(
begin
%% We create a connector pointing to a broker that expects authentication, but
%% we don't provide it in the config.
%% Without a health check topic, we're unable to probe any topic leaders to
%% check the actual connection parameters, so the status is "connected".
Type = ?TYPE,
Name = ?FUNCTION_NAME,
PlainAuthBootstrapHost = <<"kafka-1.emqx.net:9093">>,
ConnectorConfig0 = connector_config(#{
<<"bootstrap_hosts">> => PlainAuthBootstrapHost
}),
?assertMatch(
{ok, {{_, 201, _}, _, #{<<"status">> := <<"connected">>}}},
emqx_bridge_v2_testlib:create_connector_api([
{connector_type, Type},
{connector_name, Name},
{connector_config, ConnectorConfig0}
])
),
%% By providing a health check topic, we should detect it's disconnected
%% without the need for an action.
ConnectorConfig1 = connector_config(#{
<<"bootstrap_hosts">> => PlainAuthBootstrapHost,
<<"health_check_topic">> =>
emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
}),
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"status">> := <<"disconnected">>}}},
emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1)
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1,3 @@
Added a new option to configure a topic solely for health check purposes in Kafka Producer connectors.
By configuring this option, it's now possible to more accurately detect connection issues towards partition leaders, such as wrong or missing credentials that prevent establishing the connection.

View File

@ -208,7 +208,7 @@ defmodule EMQXUmbrella.MixProject do
{:hstreamdb_erl, {:hstreamdb_erl,
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.3"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},

View File

@ -350,4 +350,9 @@ Setting this to a value which is greater than the total number of partitions in
partitions_limit.label: partitions_limit.label:
"""Max Partitions""" """Max Partitions"""
producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""
} }

View File

@ -350,4 +350,9 @@ server_name_indication.desc:
server_name_indication.label: server_name_indication.label:
"""SNI""" """SNI"""
producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""
} }

View File

@ -446,5 +446,9 @@ server_name_indication.desc:
server_name_indication.label: server_name_indication.label:
"""SNI""" """SNI"""
producer_health_check_topic.desc:
"""Topic name used exclusively for more accurate connector health checks."""
producer_health_check_topic.label:
"""Connector health check topic"""
} }