Merge pull request #11875 from thalesmg/fix-kafka-connecting-r53-20231103
fix(kafka_producer): don't return `disconnected` when there are connections issues while starting the bridge
This commit is contained in:
commit
0ff4465c78
|
@ -194,18 +194,11 @@ on_stop(InstanceId, _State) ->
|
||||||
|
|
||||||
ensure_client(ClientId, Hosts, ClientConfig) ->
|
ensure_client(ClientId, Hosts, ClientConfig) ->
|
||||||
case wolff_client_sup:find_client(ClientId) of
|
case wolff_client_sup:find_client(ClientId) of
|
||||||
{ok, Pid} ->
|
{ok, _Pid} ->
|
||||||
case wolff_client:check_connectivity(Pid) of
|
ok;
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
{error, Error} ->
|
|
||||||
deallocate_client(ClientId),
|
|
||||||
throw({failed_to_connect, Error})
|
|
||||||
end;
|
|
||||||
{error, no_such_client} ->
|
{error, no_such_client} ->
|
||||||
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
|
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
ok = ensure_connectivity(ClientId),
|
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "kafka_client_started",
|
msg => "kafka_client_started",
|
||||||
client_id => ClientId,
|
client_id => ClientId,
|
||||||
|
@ -225,21 +218,6 @@ ensure_client(ClientId, Hosts, ClientConfig) ->
|
||||||
throw({failed_to_find_created_client, Reason})
|
throw({failed_to_find_created_client, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ensure_connectivity(ClientId) ->
|
|
||||||
case wolff_client_sup:find_client(ClientId) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
case wolff_client:check_connectivity(Pid) of
|
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
{error, Error} ->
|
|
||||||
deallocate_client(ClientId),
|
|
||||||
throw({failed_to_connect, Error})
|
|
||||||
end;
|
|
||||||
{error, Reason} ->
|
|
||||||
deallocate_client(ClientId),
|
|
||||||
throw({failed_to_find_created_client, Reason})
|
|
||||||
end.
|
|
||||||
|
|
||||||
deallocate_client(ClientId) ->
|
deallocate_client(ClientId) ->
|
||||||
_ = with_log_at_error(
|
_ = with_log_at_error(
|
||||||
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
|
fun() -> wolff:stop_and_delete_supervised_client(ClientId) end,
|
||||||
|
|
|
@ -474,7 +474,11 @@ t_failed_creation_then_fix(Config) ->
|
||||||
%% before throwing, it should cleanup the client process. we
|
%% before throwing, it should cleanup the client process. we
|
||||||
%% retry because the supervisor might need some time to really
|
%% retry because the supervisor might need some time to really
|
||||||
%% remove it from its tree.
|
%% remove it from its tree.
|
||||||
?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))),
|
?retry(
|
||||||
|
_Sleep0 = 50,
|
||||||
|
_Attempts0 = 10,
|
||||||
|
?assertEqual([], supervisor:which_children(wolff_producers_sup))
|
||||||
|
),
|
||||||
%% must succeed with correct config
|
%% must succeed with correct config
|
||||||
{ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create(
|
{ok, #{config := _ValidConfigAtom1}} = emqx_bridge:create(
|
||||||
list_to_atom(Type), list_to_atom(Name), ValidConf
|
list_to_atom(Type), list_to_atom(Name), ValidConf
|
||||||
|
|
Loading…
Reference in New Issue