From 85cff5e7ebf7c25d2d4298995a5dd2c614cd333b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 31 Jul 2024 09:14:29 -0300 Subject: [PATCH] fix: merge conflicts --- .../src/emqx_bridge_kafka_impl_producer.erl | 12 +++++++----- .../test/emqx_bridge_v2_kafka_producer_SUITE.erl | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index c886b8d58..1b18a1767 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -564,7 +564,7 @@ on_kafka_ack(_Partition, message_too_large, {ReplyFn, Args}) -> %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. on_get_status( - _InstId, + ConnResId, #{client_id := ClientId} = State ) -> %% Note: we must avoid returning `?status_disconnected' here if the connector ever was @@ -574,7 +574,7 @@ on_get_status( %% held in wolff producer's replayq. case check_client_connectivity(ClientId) of ok -> - maybe_check_health_check_topic(State); + maybe_check_health_check_topic(ConnResId, State); {error, {find_client, _Error}} -> ?status_connecting; {error, {connectivity, Error}} -> @@ -648,21 +648,23 @@ check_client_connectivity(ClientId) -> {error, {find_client, Reason}} end. -maybe_check_health_check_topic(#{health_check_topic := Topic} = ConnectorState) when +maybe_check_health_check_topic(ConnResId, #{health_check_topic := Topic} = ConnectorState) when is_binary(Topic) -> #{client_id := ClientId} = ConnectorState, MaxPartitions = all_partitions, - try check_topic_and_leader_connections(ClientId, Topic, MaxPartitions) of + try check_topic_and_leader_connections(ConnResId, ClientId, Topic, MaxPartitions) of ok -> ?status_connected catch + throw:{unhealthy_target, Msg} -> + {?status_disconnected, ConnectorState, Msg}; throw:#{reason := {connection_down, _} = Reason} -> {?status_disconnected, ConnectorState, Reason}; throw:#{reason := Reason} -> {?status_connecting, ConnectorState, Reason} end; -maybe_check_health_check_topic(_) -> +maybe_check_health_check_topic(_ConnResId, _ConnState) -> %% Cannot infer further information. Maybe upgraded from older version. ?status_connected. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 5e9d53fc5..1db3c1725 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -740,6 +740,21 @@ t_connector_health_check_topic(_Config) -> emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig1) ), + %% By providing an inexistent health check topic, we should detect it's + %% disconnected without the need for an action. + ConnectorConfig2 = connector_config(#{ + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"health_check_topic">> => <<"i-dont-exist-999">> + }), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Unknown topic or partition", _/binary>> + }}}, + emqx_bridge_v2_testlib:update_connector_api(Name, Type, ConnectorConfig2) + ), + ok end, []