From 841f816e3901030252ef3727e318d9a079856028 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 7 Jul 2022 17:32:15 +0800 Subject: [PATCH] refactor: improve the health_check for webhook bridges --- .../emqx_authn_jwks_connector.erl | 14 +++-- .../src/emqx_connector_http.erl | 45 +++++++++----- .../src/emqx_connector_mongo.erl | 59 ++++++------------- .../src/emqx_connector_mysql.erl | 20 +++---- .../src/emqx_connector_pgsql.erl | 10 +++- .../src/emqx_connector_redis.erl | 5 +- .../src/emqx_plugin_libs_pool.erl | 39 ++++++------ mix.exs | 2 +- rebar.config | 2 +- 9 files changed, 97 insertions(+), 99 deletions(-) diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index de3da25cc..add797f2e 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -71,15 +71,17 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) -> ok. on_get_status(_InstId, #{pool_name := PoolName}) -> - emqx_plugin_libs_pool:get_status( - PoolName, - fun(Pid) -> - case emqx_authn_jwks_client:get_jwks(Pid) of + Func = + fun(Conn) -> + case emqx_authn_jwks_client:get_jwks(Conn) of {ok, _} -> true; _ -> false end - end - ). + end, + case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of + true -> connecting; + false -> disconnected + end. connect(Opts) -> ConnectorOpts = proplists:get_value(connector_opts, Opts), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 128e5b810..f9e63dc57 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -309,27 +309,42 @@ on_query( end, Result. -on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> - case do_get_status(Host, Port, Timeout) of - ok -> +on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) -> + case do_get_status(PoolName, Timeout) of + true -> connected; - {error, Reason} -> + false -> ?SLOG(error, #{ msg => "http_connector_get_status_failed", - reason => Reason, - host => Host, - port => Port + state => State }), - {disconnected, State, Reason} + disconnected end. -do_get_status(Host, Port, Timeout) -> - case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of - {ok, Sock} -> - gen_tcp:close(Sock), - ok; - {error, Reason} -> - {error, Reason} +do_get_status(PoolName, Timeout) -> + Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], + DoPerWorker = + fun(Worker) -> + case ehttpc:health_check(Worker, Timeout) of + ok -> + true; + {error, Reason} -> + ?SLOG(error, #{ + msg => "ehttpc_health_check_failed", + reason => Reason, + worker => Worker + }), + false + end + end, + try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + [_ | _] = Status -> + lists:all(fun(St) -> St =:= true end, Status); + [] -> + false + catch + exit:timeout -> + false end. %%-------------------------------------------------------------------- diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 4396db933..5b07c5003 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -240,52 +240,29 @@ on_get_status(InstId, #{poolname := PoolName} = _State) -> end. health_check(PoolName) -> - Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], - try - emqx_misc:pmap( - fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) - ) - of - [_ | _] = Status -> - lists:all(fun(St) -> St =:= true end, Status); - [] -> - false - catch - exit:timeout -> - false - end. + emqx_plugin_libs_pool:health_check_ecpool_workers( + PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) + ). %% =================================================================== -check_worker_health(Worker) -> - case ecpool_worker:client(Worker) of - {ok, Conn} -> - %% we don't care if this returns something or not, we just to test the connection - try do_test_query(Conn) of - {error, Reason} -> - ?SLOG(warning, #{ - msg => "mongo_connection_get_status_error", - worker => Worker, - reason => Reason - }), - false; - _ -> - true - catch - Class:Error -> - ?SLOG(warning, #{ - msg => "mongo_connection_get_status_exception", - worker => Worker, - class => Class, - error => Error - }), - false - end; - _ -> +check_worker_health(Conn) -> + %% we don't care if this returns something or not, we just to test the connection + try do_test_query(Conn) of + {error, Reason} -> ?SLOG(warning, #{ msg => "mongo_connection_get_status_error", - worker => Worker, - reason => worker_not_found + reason => Reason + }), + false; + _ -> + true + catch + Class:Error -> + ?SLOG(warning, #{ + msg => "mongo_connection_get_status_exception", + class => Class, + error => Error }), false end. diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 802fdec5f..d6963d04e 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -169,9 +169,9 @@ on_query( mysql_function(sql) -> query; mysql_function(prepared_query) -> execute. -on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) -> - case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of - connected -> +on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) -> + case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of + true -> case do_check_prepares(State) of ok -> connected; @@ -180,15 +180,10 @@ on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = S {connected, NState}; {error, _Reason} -> %% do not log error, it is logged in prepare_sql_to_conn - case AutoReconn of - true -> - connecting; - false -> - disconnected - end + conn_status(AutoReconn) end; - ConnectStatus -> - ConnectStatus + false -> + conn_status(AutoReconn) end. do_get_status(Conn) -> @@ -207,6 +202,9 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P end. %% =================================================================== +conn_status(_AutoReconn = true) -> connecting; +conn_status(_AutoReconn = false) -> disconnected. + reconn_interval(true) -> 15; reconn_interval(false) -> false. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 448cbb209..6f89e7ff1 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -139,13 +139,19 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} end, Result. -on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> - emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). +on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> + case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of + true -> connected; + false -> conn_status(AutoReconn) + end. do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). %% =================================================================== +conn_status(_AutoReconn = true) -> connecting; +conn_status(_AutoReconn = false) -> disconnected. + reconn_interval(true) -> 15; reconn_interval(false) -> false. diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 0de7fc312..f70093c4e 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -225,8 +225,9 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect : false -> disconnect end; -on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> - emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). +on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> + Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), + status_result(Health, AutoReconn). do_get_status(Conn) -> case eredis:q(Conn, ["PING"]) of diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index 4358999a5..cce45fa4a 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -20,12 +20,14 @@ start_pool/3, stop_pool/1, pool_name/1, - get_status/2, - get_status/3 + health_check_ecpool_workers/2, + health_check_ecpool_workers/3 ]). -include_lib("emqx/include/logger.hrl"). +-define(HEALTH_CHECK_TIMEOUT, 15000). + pool_name(ID) when is_binary(ID) -> list_to_atom(binary_to_list(ID)). @@ -61,29 +63,26 @@ stop_pool(Name) -> error({stop_pool_failed, Name, Reason}) end. -get_status(PoolName, CheckFunc) -> - get_status(PoolName, CheckFunc, false). +health_check_ecpool_workers(PoolName, CheckFunc) -> + health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT). -get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) -> - Status = [ - begin +health_check_ecpool_workers(PoolName, CheckFunc, Timeout) when is_function(CheckFunc) -> + Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], + DoPerWorker = + fun(Worker) -> case ecpool_worker:client(Worker) of {ok, Conn} -> erlang:is_process_alive(Conn) andalso CheckFunc(Conn); _ -> false end - end - || {_WorkerName, Worker} <- ecpool:workers(PoolName) - ], - case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of - true -> - connected; - false -> - case AutoReconn of - true -> - connecting; - false -> - disconnect - end + end, + try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + [_ | _] = Status -> + lists:all(fun(St) -> St =:= true end, Status); + [] -> + false + catch + exit:timeout -> + false end. diff --git a/mix.exs b/mix.exs index bf5ce811a..0636b7a07 100644 --- a/mix.exs +++ b/mix.exs @@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do {:lc, github: "emqx/lc", tag: "0.3.1"}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.2.0"}, + {:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"}, {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, diff --git a/rebar.config b/rebar.config index eb96dc2ef..d7d168f7f 100644 --- a/rebar.config +++ b/rebar.config @@ -49,7 +49,7 @@ , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} , {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}} - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}