fix(kafka_consumer): return better error messages when probing kafka consumer bridge

Fixes https://emqx.atlassian.net/browse/EMQX-9422
This commit is contained in:
Thales Macedo Garitezi 2023-03-30 14:44:04 -03:00
parent 632bffd451
commit 5011486b18
4 changed files with 49 additions and 21 deletions

View File

@ -359,10 +359,10 @@ call_start(MgrId, Mod, Config) ->
try try
Mod:on_start(MgrId, Config) Mod:on_start(MgrId, Config)
catch catch
throw:{error, Error} -> throw:Error ->
{error, Error}; {error, Error};
Kind:Error:Stacktrace -> Kind:Error:Stacktrace ->
{error, {Kind, Error, Stacktrace}} {error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}}
end. end.
-spec call_health_check(manager_id(), module(), resource_state()) -> -spec call_health_check(manager_id(), module(), resource_state()) ->

View File

@ -95,6 +95,11 @@
commit_fun => brod_group_subscriber_v2:commit_fun() commit_fun => brod_group_subscriber_v2:commit_fun()
}. }.
%% Generic, user-facing hint returned when the Kafka client cannot be started.
%% It is thrown from on_start and is also appended to the more specific
%% messages built in do_get_status when brod reports `client_down'.
-define(CLIENT_DOWN_MESSAGE,
"Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters."
).
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
@ -152,11 +157,7 @@ on_start(InstanceId, Config) ->
kafka_hosts => BootstrapHosts, kafka_hosts => BootstrapHosts,
reason => emqx_misc:redact(Reason) reason => emqx_misc:redact(Reason)
}), }),
throw( throw(?CLIENT_DOWN_MESSAGE)
{error,
"Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters"}
)
end, end,
start_consumer(Config, InstanceId, ClientID). start_consumer(Config, InstanceId, ClientID).
@ -177,7 +178,7 @@ on_get_status(_InstanceID, State) ->
kafka_client_id := ClientID, kafka_client_id := ClientID,
kafka_topics := KafkaTopics kafka_topics := KafkaTopics
} = State, } = State,
do_get_status(ClientID, KafkaTopics, SubscriberId). do_get_status(State, ClientID, KafkaTopics, SubscriberId).
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `brod_group_subscriber' API %% `brod_group_subscriber' API
@ -374,22 +375,41 @@ stop_client(ClientID) ->
), ),
ok. ok.
%% Walks the list of Kafka topics and reports the bridge status. For each topic
%% the partition count is requested via brod:get_partitions_count/2; on success
%% the per-partition check runs and the walk continues with the remaining
%% topics only while that check yields `connected'. An empty topic list yields
%% `connected'.
%% NOTE(review): every line in this span carries two renderings of the same
%% statement (it looks like a side-by-side diff); the comments below describe
%% the State-carrying variant. Confirm against the real module before relying
%% on them.
do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
case brod:get_partitions_count(ClientID, KafkaTopic) of case brod:get_partitions_count(ClientID, KafkaTopic) of
{ok, NPartitions} -> {ok, NPartitions} ->
case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of
connected -> do_get_status(ClientID, RestTopics, SubscriberId); connected -> do_get_status(State, ClientID, RestTopics, SubscriberId);
disconnected -> disconnected disconnected -> disconnected
end; end;
%% brod reported `client_down': classify the attached context with
%% infer_client_error/1 and prefix the generic ?CLIENT_DOWN_MESSAGE with a
%% more specific hint when one is available.
{error, {client_down, Context}} ->
case infer_client_error(Context) of
auth_error ->
Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{auth_error, Message0} ->
Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
connection_refused ->
Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
_ ->
{disconnected, State, ?CLIENT_DOWN_MESSAGE}
end;
%% `leader_not_available' gets its own hint pointing at the topic, the
%% connection parameters and the cluster health.
{error, leader_not_available} ->
Message =
"Leader connection not available. Please check the Kafka topic used,"
" the connection parameters and Kafka cluster health",
{disconnected, State, Message};
%% Any other result is reported as a bare `disconnected' without a message.
_ -> _ ->
disconnected disconnected
end; end;
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) ->
connected. connected.
-spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> -spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
connected | disconnected. connected | disconnected.
do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
Results = Results =
lists:map( lists:map(
fun(N) -> fun(N) ->
@ -508,3 +528,15 @@ encode(Value, base64) ->
to_bin(B) when is_binary(B) -> B; to_bin(B) when is_binary(B) -> B;
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
%% Classifies the error context attached to a `client_down' result.
%%
%% Only the first entry of the list is inspected. Returns:
%%   * `connection_refused'   - first entry is `{_, {econnrefused, _}}';
%%   * `{auth_error, Message}' - first entry is a `sasl_auth_error' whose
%%                               detail is a binary;
%%   * `auth_error'           - first entry is a `sasl_auth_error' with any
%%                               other detail;
%%   * `undefined'            - anything else (including non-list input).
infer_client_error([{_Endpoint, {econnrefused, _}} | _]) ->
    connection_refused;
infer_client_error([{_Endpoint, {{sasl_auth_error, Detail}, _}} | _]) when is_binary(Detail) ->
    {auth_error, Detail};
infer_client_error([{_Endpoint, {{sasl_auth_error, _}, _}} | _]) ->
    auth_error;
infer_client_error(_Unrecognized) ->
    undefined.

View File

@ -115,9 +115,8 @@ on_start(InstId, Config) ->
} }
), ),
throw( throw(
{error,
"Failed to start Kafka client. Please check the logs for errors and check" "Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters"} " the connection parameters."
) )
end. end.

View File

@ -416,10 +416,7 @@ t_failed_creation_then_fix(Config) ->
Type, erlang:list_to_atom(Name), WrongConf Type, erlang:list_to_atom(Name), WrongConf
), ),
WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name},
?assertThrow( ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)),
{error, _},
?PRODUCER:on_start(ResourceId, WrongConfigAtom)
),
%% before throwing, it should cleanup the client process. we %% before throwing, it should cleanup the client process. we
%% retry because the supervisor might need some time to really %% retry because the supervisor might need some time to really
%% remove it from its tree. %% remove it from its tree.