fix(kafka_producer): don't return `disconnected` when there are connection issues while starting the bridge

Fixes https://emqx.atlassian.net/browse/EMQX-11284
Fixes https://emqx.atlassian.net/browse/EMQX-11298

We don't enforce the connection to be up when starting/creating the bridge, otherwise the
status will be `disconnected` for a possibly transient reason such as network issues or
Kafka broker restart.

The same applies to the Azure Event Hub Producer bridge, as they share the same module.
This commit is contained in:
Thales Macedo Garitezi 2023-11-03 09:47:22 -03:00
parent 1b56216104
commit 4265ef66cc
2 changed files with 7 additions and 25 deletions

View File

@ -194,18 +194,11 @@ on_stop(InstanceId, _State) ->
ensure_client(ClientId, Hosts, ClientConfig) -> ensure_client(ClientId, Hosts, ClientConfig) ->
case wolff_client_sup:find_client(ClientId) of case wolff_client_sup:find_client(ClientId) of
{ok, Pid} -> {ok, _Pid} ->
case wolff_client:check_connectivity(Pid) of ok;
ok ->
ok;
{error, Error} ->
deallocate_client(ClientId),
throw({failed_to_connect, Error})
end;
{error, no_such_client} -> {error, no_such_client} ->
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
{ok, _} -> {ok, _} ->
ok = ensure_connectivity(ClientId),
?SLOG(info, #{ ?SLOG(info, #{
msg => "kafka_client_started", msg => "kafka_client_started",
client_id => ClientId, client_id => ClientId,
@ -225,21 +218,6 @@ ensure_client(ClientId, Hosts, ClientConfig) ->
throw({failed_to_find_created_client, Reason}) throw({failed_to_find_created_client, Reason})
end. end.
ensure_connectivity(ClientId) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
case wolff_client:check_connectivity(Pid) of
ok ->
ok;
{error, Error} ->
deallocate_client(ClientId),
throw({failed_to_connect, Error})
end;
{error, Reason} ->
deallocate_client(ClientId),
throw({failed_to_find_created_client, Reason})
end.
deallocate_client(ClientId) -> deallocate_client(ClientId) ->
_ = with_log_at_error( _ = with_log_at_error(
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,

View File

@ -474,7 +474,11 @@ t_failed_creation_then_fix(Config) ->
%% before throwing, it should cleanup the client process. we %% before throwing, it should cleanup the client process. we
%% retry because the supervisor might need some time to really %% retry because the supervisor might need some time to really
%% remove it from its tree. %% remove it from its tree.
?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))), ?retry(
_Sleep0 = 50,
_Attempts0 = 10,
?assertEqual([], supervisor:which_children(wolff_producers_sup))
),
%% must succeed with correct config %% must succeed with correct config
{ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create( {ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create(
list_to_atom(Type), list_to_atom(Name), ValidConf list_to_atom(Type), list_to_atom(Name), ValidConf