refactor: improve the health_check for webhook bridges

This commit is contained in:
Shawn 2022-07-07 17:32:15 +08:00
parent 71f642518a
commit 841f816e39
9 changed files with 97 additions and 99 deletions

View File

@ -71,15 +71,17 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
ok. ok.
on_get_status(_InstId, #{pool_name := PoolName}) -> on_get_status(_InstId, #{pool_name := PoolName}) ->
emqx_plugin_libs_pool:get_status( Func =
PoolName, fun(Conn) ->
fun(Pid) -> case emqx_authn_jwks_client:get_jwks(Conn) of
case emqx_authn_jwks_client:get_jwks(Pid) of
{ok, _} -> true; {ok, _} -> true;
_ -> false _ -> false
end end
end end,
). case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of
true -> connecting;
false -> disconnected
end.
connect(Opts) -> connect(Opts) ->
ConnectorOpts = proplists:get_value(connector_opts, Opts), ConnectorOpts = proplists:get_value(connector_opts, Opts),

View File

@ -309,27 +309,42 @@ on_query(
end, end,
Result. Result.
on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
case do_get_status(Host, Port, Timeout) of case do_get_status(PoolName, Timeout) of
ok -> true ->
connected; connected;
{error, Reason} -> false ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "http_connector_get_status_failed", msg => "http_connector_get_status_failed",
reason => Reason, state => State
host => Host,
port => Port
}), }),
{disconnected, State, Reason} disconnected
end. end.
do_get_status(Host, Port, Timeout) -> do_get_status(PoolName, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
{ok, Sock} -> DoPerWorker =
gen_tcp:close(Sock), fun(Worker) ->
ok; case ehttpc:health_check(Worker, Timeout) of
{error, Reason} -> ok ->
{error, Reason} true;
{error, Reason} ->
?SLOG(error, #{
msg => "ehttpc_health_check_failed",
reason => Reason,
worker => Worker
}),
false
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -240,52 +240,29 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
end. end.
health_check(PoolName) -> health_check(PoolName) ->
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], emqx_plugin_libs_pool:health_check_ecpool_workers(
try PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
emqx_misc:pmap( ).
fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
)
of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->
false
catch
exit:timeout ->
false
end.
%% =================================================================== %% ===================================================================
check_worker_health(Worker) -> check_worker_health(Conn) ->
case ecpool_worker:client(Worker) of %% we don't care if this returns something or not, we just to test the connection
{ok, Conn} -> try do_test_query(Conn) of
%% we don't care if this returns something or not, we just to test the connection {error, Reason} ->
try do_test_query(Conn) of
{error, Reason} ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_error",
worker => Worker,
reason => Reason
}),
false;
_ ->
true
catch
Class:Error ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_exception",
worker => Worker,
class => Class,
error => Error
}),
false
end;
_ ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "mongo_connection_get_status_error", msg => "mongo_connection_get_status_error",
worker => Worker, reason => Reason
reason => worker_not_found }),
false;
_ ->
true
catch
Class:Error ->
?SLOG(warning, #{
msg => "mongo_connection_get_status_exception",
class => Class,
error => Error
}), }),
false false
end. end.

View File

@ -169,9 +169,9 @@ on_query(
mysql_function(sql) -> query; mysql_function(sql) -> query;
mysql_function(prepared_query) -> execute. mysql_function(prepared_query) -> execute.
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) -> on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
connected -> true ->
case do_check_prepares(State) of case do_check_prepares(State) of
ok -> ok ->
connected; connected;
@ -180,15 +180,10 @@ on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = S
{connected, NState}; {connected, NState};
{error, _Reason} -> {error, _Reason} ->
%% do not log error, it is logged in prepare_sql_to_conn %% do not log error, it is logged in prepare_sql_to_conn
case AutoReconn of conn_status(AutoReconn)
true ->
connecting;
false ->
disconnected
end
end; end;
ConnectStatus -> false ->
ConnectStatus conn_status(AutoReconn)
end. end.
do_get_status(Conn) -> do_get_status(Conn) ->
@ -207,6 +202,9 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
end. end.
%% =================================================================== %% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15; reconn_interval(true) -> 15;
reconn_interval(false) -> false. reconn_interval(false) -> false.

View File

@ -139,13 +139,19 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
end, end,
Result. Result.
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
true -> connected;
false -> conn_status(AutoReconn)
end.
do_get_status(Conn) -> do_get_status(Conn) ->
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
%% =================================================================== %% ===================================================================
conn_status(_AutoReconn = true) -> connecting;
conn_status(_AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15; reconn_interval(true) -> 15;
reconn_interval(false) -> false. reconn_interval(false) -> false.

View File

@ -225,8 +225,9 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect :
false -> false ->
disconnect disconnect
end; end;
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
status_result(Health, AutoReconn).
do_get_status(Conn) -> do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of case eredis:q(Conn, ["PING"]) of

View File

@ -20,12 +20,14 @@
start_pool/3, start_pool/3,
stop_pool/1, stop_pool/1,
pool_name/1, pool_name/1,
get_status/2, health_check_ecpool_workers/2,
get_status/3 health_check_ecpool_workers/3
]). ]).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-define(HEALTH_CHECK_TIMEOUT, 15000).
pool_name(ID) when is_binary(ID) -> pool_name(ID) when is_binary(ID) ->
list_to_atom(binary_to_list(ID)). list_to_atom(binary_to_list(ID)).
@ -61,29 +63,26 @@ stop_pool(Name) ->
error({stop_pool_failed, Name, Reason}) error({stop_pool_failed, Name, Reason})
end. end.
get_status(PoolName, CheckFunc) -> health_check_ecpool_workers(PoolName, CheckFunc) ->
get_status(PoolName, CheckFunc, false). health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) -> health_check_ecpool_workers(PoolName, CheckFunc, Timeout) when is_function(CheckFunc) ->
Status = [ Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
begin DoPerWorker =
fun(Worker) ->
case ecpool_worker:client(Worker) of case ecpool_worker:client(Worker) of
{ok, Conn} -> {ok, Conn} ->
erlang:is_process_alive(Conn) andalso CheckFunc(Conn); erlang:is_process_alive(Conn) andalso CheckFunc(Conn);
_ -> _ ->
false false
end end
end end,
|| {_WorkerName, Worker} <- ecpool:workers(PoolName) try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
], [_ | _] = Status ->
case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of lists:all(fun(St) -> St =:= true end, Status);
true -> [] ->
connected; false
false -> catch
case AutoReconn of exit:timeout ->
true -> false
connecting;
false ->
disconnect
end
end. end.

View File

@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
{:lc, github: "emqx/lc", tag: "0.3.1"}, {:lc, github: "emqx/lc", tag: "0.3.1"},
{:redbug, "2.0.7"}, {:redbug, "2.0.7"},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.2.0"}, {:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"},
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},

View File

@ -49,7 +49,7 @@
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}