From 75770f2842d7417a59fb836dd824cff72c3866dc Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Fri, 30 Dec 2022 10:18:52 +0100 Subject: [PATCH 1/2] fix(kafka): detect connectivity in on_get_status --- .../src/kafka/emqx_bridge_impl_kafka_producer.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index b46bdb486..e0a37bed9 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -173,8 +173,11 @@ on_kafka_ack(_Partition, _Offset, _Extra) -> %% Maybe need to bump some counters? ok. -on_get_status(_InstId, _State) -> - connected. +on_get_status(_InstId, #{client_id := ClientID}) -> + case wolff:check_connectivity(ClientID) of + ok -> connected; + _ -> disconnected + end. %% Parse comma separated host:port list into a [{Host,Port}] list hosts(Hosts) when is_binary(Hosts) -> From f410201dc33b453e348d458290f2cd32b2296e20 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Fri, 30 Dec 2022 11:43:24 +0100 Subject: [PATCH 2/2] chore: fix flaky test case in emqx_connector_jwt_worker_SUITE --- apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl index 74075917e..fa88c8010 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl @@ -81,7 +81,7 @@ t_create_success(_Config) -> receive {Ref, token_created} -> ok - after 1_000 -> + after 5_000 -> ct:fail( "should have confirmed token creation; msgs: ~0p", [process_info(self(), messages)]