From 4265ef66cc37df29b8938a80d06ea402043b2500 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 3 Nov 2023 09:47:22 -0300 Subject: [PATCH] fix(kafka_producer): don't return `disconnected` when there are connection issues while starting the bridge Fixes https://emqx.atlassian.net/browse/EMQX-11284 Fixex https://emqx.atlassian.net/browse/EMQX-11298 We don't enforce the connection to be up when starting/creating the bridge, otherwise the status will be `disconnected` for a possibly transient reason such as network issues or Kafka broker restart. Same applies for Azure Event Hub Producer bridge, as they share the same module. --- .../src/emqx_bridge_kafka_impl_producer.erl | 26 ++----------------- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 6 ++++- 2 files changed, 7 insertions(+), 25 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 2c6e6bc1c..42585c3e3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -194,18 +194,11 @@ on_stop(InstanceId, _State) -> ensure_client(ClientId, Hosts, ClientConfig) -> case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - case wolff_client:check_connectivity(Pid) of - ok -> - ok; - {error, Error} -> - deallocate_client(ClientId), - throw({failed_to_connect, Error}) - end; + {ok, _Pid} -> + ok; {error, no_such_client} -> case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> - ok = ensure_connectivity(ClientId), ?SLOG(info, #{ msg => "kafka_client_started", client_id => ClientId, @@ -225,21 +218,6 @@ ensure_client(ClientId, Hosts, ClientConfig) -> throw({failed_to_find_created_client, Reason}) end. -ensure_connectivity(ClientId) -> - case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - case wolff_client:check_connectivity(Pid) of - ok -> - ok; - {error, Error} -> - deallocate_client(ClientId), - throw({failed_to_connect, Error}) - end; - {error, Reason} -> - deallocate_client(ClientId), - throw({failed_to_find_created_client, Reason}) - end. - deallocate_client(ClientId) -> _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 9a28ed26a..9915582ac 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -474,7 +474,11 @@ t_failed_creation_then_fix(Config) -> %% before throwing, it should cleanup the client process. we %% retry because the supervisor might need some time to really %% remove it from its tree. - ?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))), + ?retry( + _Sleep0 = 50, + _Attempts0 = 10, + ?assertEqual([], supervisor:which_children(wolff_producers_sup)) + ), %% must succeed with correct config {ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create( list_to_atom(Type), list_to_atom(Name), ValidConf