diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 2c6e6bc1c..42585c3e3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -194,18 +194,11 @@ on_stop(InstanceId, _State) -> ensure_client(ClientId, Hosts, ClientConfig) -> case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - case wolff_client:check_connectivity(Pid) of - ok -> - ok; - {error, Error} -> - deallocate_client(ClientId), - throw({failed_to_connect, Error}) - end; + {ok, _Pid} -> + ok; {error, no_such_client} -> case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> - ok = ensure_connectivity(ClientId), ?SLOG(info, #{ msg => "kafka_client_started", client_id => ClientId, @@ -225,21 +218,6 @@ ensure_client(ClientId, Hosts, ClientConfig) -> throw({failed_to_find_created_client, Reason}) end. -ensure_connectivity(ClientId) -> - case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - case wolff_client:check_connectivity(Pid) of - ok -> - ok; - {error, Error} -> - deallocate_client(ClientId), - throw({failed_to_connect, Error}) - end; - {error, Reason} -> - deallocate_client(ClientId), - throw({failed_to_find_created_client, Reason}) - end. - deallocate_client(ClientId) -> _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 9a28ed26a..9915582ac 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -474,7 +474,11 @@ t_failed_creation_then_fix(Config) -> %% before throwing, it should cleanup the client process. we %% retry because the supervisor might need some time to really %% remove it from its tree. - ?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))), + ?retry( + _Sleep0 = 50, + _Attempts0 = 10, + ?assertEqual([], supervisor:which_children(wolff_producers_sup)) + ), %% must succeed with correct config {ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create( list_to_atom(Type), list_to_atom(Name), ValidConf