Merge pull request #9728 from sstrigler/EMQX-8625-add-bridges-probe-api-for-kafka-and-my-sql

EMQX-8625: add bridges probe api for kafka and mysql
This commit is contained in:
Stefan Strigler 2023-01-13 09:58:37 +01:00 committed by GitHub
commit a725cdd815
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 27 deletions

View File

@ -384,14 +384,10 @@ on_query_async(
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(PoolName, Timeout) of
true ->
connected;
false ->
?SLOG(error, #{
msg => "http_connector_get_status_failed",
state => State
}),
disconnected
ok ->
{connected, State};
{error, Reason} ->
{disconnected, State, Reason}
end.
do_get_status(PoolName, Timeout) ->
@ -400,24 +396,28 @@ do_get_status(PoolName, Timeout) ->
fun(Worker) ->
case ehttpc:health_check(Worker, Timeout) of
ok ->
true;
{error, Reason} ->
ok;
{error, Reason} = Error ->
?SLOG(error, #{
msg => "ehttpc_health_check_failed",
msg => "http_connector_get_status_failed",
reason => Reason,
worker => Worker
}),
false
Error
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
% we crash in case of non-empty lists since we don't know what to do in that case
[_ | _] = Results ->
case [E || {error, _} = E <- Results] of
[] ->
false
ok;
Errors ->
hd(Errors)
end
catch
exit:timeout ->
false
{error, timeout}
end.
%%--------------------------------------------------------------------

View File

@ -66,10 +66,21 @@ roots() ->
fields(config) ->
[{server, server()}] ++
emqx_connector_schema_lib:relational_db_fields() ++
add_default_username(emqx_connector_schema_lib:relational_db_fields(), []) ++
emqx_connector_schema_lib:ssl_fields() ++
emqx_connector_schema_lib:prepare_statement_fields().
add_default_username([{username, OrigUsernameFn} | Tail], Head) ->
Head ++ [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Tail];
add_default_username([Field | Tail], Head) ->
add_default_username(Tail, Head ++ [Field]).
add_default_fn(OrigFn, Default) ->
fun
(default) -> Default;
(Field) -> OrigFn(Field)
end.
server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS).
@ -83,8 +94,7 @@ on_start(
#{
server := Server,
database := DB,
username := User,
password := Password,
username := Username,
auto_reconnect := AutoReconn,
pool_size := PoolSize,
ssl := SSL
@ -104,13 +114,15 @@ on_start(
[]
end,
Options = [
maybe_password_opt(maps:get(password, Config, undefined))
| [
{host, Host},
{port, Port},
{user, User},
{password, Password},
{user, Username},
{database, DB},
{auto_reconnect, reconn_interval(AutoReconn)},
{pool_size, PoolSize}
]
],
PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = parse_prepare_sql(Config),
@ -126,6 +138,11 @@ on_start(
{error, Reason}
end.
maybe_password_opt(undefined) ->
[];
maybe_password_opt(Password) ->
{password, Password}.
on_stop(InstId, #{poolname := PoolName}) ->
?SLOG(info, #{
msg => "stopping_mysql_connector",