diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index 86ad02c52..de3da25cc 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -25,7 +25,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2, + on_get_status/2, connect/1 ]). @@ -70,16 +70,15 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) -> emqx_resource:query_success(AfterQuery), ok. -on_health_check(_InstId, State = #{pool_name := PoolName}) -> - emqx_plugin_libs_pool:health_check( +on_get_status(_InstId, #{pool_name := PoolName}) -> + emqx_plugin_libs_pool:get_status( PoolName, fun(Pid) -> case emqx_authn_jwks_client:get_jwks(Pid) of {ok, _} -> true; _ -> false end - end, - State + end ). connect(Opts) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 38ddd2c23..956cdb840 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -29,7 +29,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). -type url() :: emqx_http_lib:uri_map(). @@ -306,13 +306,21 @@ on_query( end, Result. -on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> - case do_health_check(Host, Port, Timeout) of - ok -> {ok, State}; - {error, Reason} -> {error, {http_health_check_failed, Reason}, State} +on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) -> + case do_get_status(Host, Port, Timeout) of + ok -> + connected; + {error, Reason} -> + ?SLOG(error, #{ + msg => "http_connector_get_status_failed", + reason => Reason, + host => Host, + port => Port + }), + disconnected end. -do_health_check(Host, Port, Timeout) -> +do_get_status(Host, Port, Timeout) -> case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of {ok, Sock} -> gen_tcp:close(Sock), diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 25798fae5..195aa89a9 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -28,11 +28,9 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). --export([do_health_check/1]). - -export([connect/1]). -export([search/4]). @@ -90,7 +88,7 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of - ok -> {ok, #{poolname => PoolName}}; + ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; {error, Reason} -> {error, Reason} end. @@ -128,11 +126,7 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P end, Result. -on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). - -do_health_check(_Conn) -> - {ok, true}. +on_get_status(_InstId, _State) -> connected. reconn_interval(true) -> 15; reconn_interval(false) -> false. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 170fa096b..f2ecab8b1 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -28,7 +28,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). %% ecpool callback @@ -222,21 +222,21 @@ on_query( Result end. --dialyzer({nowarn_function, [on_health_check/2]}). -on_health_check(InstId, #{poolname := PoolName} = State) -> +-dialyzer({nowarn_function, [on_get_status/2]}). +on_get_status(InstId, #{poolname := PoolName} = _State) -> case health_check(PoolName) of true -> ?tp(debug, emqx_connector_mongo_health_check, #{ instance_id => InstId, status => ok }), - {ok, State}; + connected; false -> ?tp(warning, emqx_connector_mongo_health_check, #{ instance_id => InstId, status => failed }), - {error, health_check_failed, State} + disconnected end. health_check(PoolName) -> @@ -253,7 +253,7 @@ check_worker_health(Worker) -> try do_test_query(Conn) of {error, Reason} -> ?SLOG(warning, #{ - msg => "mongo_connection_health_check_error", + msg => "mongo_connection_get_status_error", worker => Worker, reason => Reason }), @@ -263,7 +263,7 @@ check_worker_health(Worker) -> catch Class:Error -> ?SLOG(warning, #{ - msg => "mongo_connection_health_check_exception", + msg => "mongo_connection_get_status_exception", worker => Worker, class => Class, error => Error @@ -272,7 +272,7 @@ check_worker_health(Worker) -> end; _ -> ?SLOG(warning, #{ - msg => "mongo_connection_health_check_error", + msg => "mongo_connection_get_status_error", worker => Worker, reason => worker_not_found }), diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5f228027f..6182881e2 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -38,7 +38,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). -behaviour(hocon_schema). @@ -188,10 +188,10 @@ on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_resource:query_success(AfterQuery). -on_health_check(_InstId, #{name := InstanceId} = State) -> - case emqx_connector_mqtt_worker:ping(InstanceId) of - pong -> {ok, State}; - _ -> {error, {connector_down, InstanceId}, State} +on_get_status(_InstId, #{name := InstanceId}) -> + case emqx_connector_mqtt_worker:status(InstanceId) of + connected -> connected; + _ -> disconnected end. ensure_mqtt_worker_started(InstanceId) -> diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 5e42a2ee2..802fdec5f 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -27,7 +27,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). %% ecpool connect & reconnect @@ -37,7 +37,7 @@ -export([roots/0, fields/1]). --export([do_health_check/1]). +-export([do_get_status/1]). -define(MYSQL_HOST_OPTIONS, #{ host_type => inet_addr, @@ -98,9 +98,9 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = maps:get(prepare_statement, Config, #{}), - State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}), + State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn}, case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, State}; + ok -> {ok, init_prepare(State)}; {error, Reason} -> {error, Reason} end. @@ -169,27 +169,34 @@ on_query( mysql_function(sql) -> query; mysql_function(prepared_query) -> execute. -on_health_check(_InstId, #{poolname := PoolName} = State) -> - case emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State) of - {ok, State} -> - case do_health_check_prepares(State) of +on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) -> + case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of + connected -> + case do_check_prepares(State) of ok -> - {ok, State}; + connected; {ok, NState} -> - {ok, NState}; + %% return new state with prepared statements + {connected, NState}; {error, _Reason} -> - {error, health_check_failed, State} + %% do not log error, it is logged in prepare_sql_to_conn + case AutoReconn of + true -> + connecting; + false -> + disconnected + end end; - {error, health_check_failed, State} -> - {error, health_check_failed, State} + ConnectStatus -> + ConnectStatus end. -do_health_check(Conn) -> +do_get_status(Conn) -> ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). -do_health_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> +do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> ok; -do_health_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> +do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> %% retry to prepare case prepare_sql(Prepares, PoolName) of ok -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 00ec31849..85d3d327e 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -30,7 +30,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). -export([connect/1]). @@ -40,7 +40,7 @@ prepared_query/3 ]). --export([do_health_check/1]). +-export([do_get_status/1]). -define(PGSQL_HOST_OPTIONS, #{ host_type => inet_addr, @@ -105,7 +105,7 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, #{poolname => PoolName}}; + ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; {error, Reason} -> {error, Reason} end. @@ -139,10 +139,10 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} end, Result. -on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). +on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> + emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). -do_health_check(Conn) -> +do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). %% =================================================================== diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 189d5e8c2..b8350dfd1 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -29,10 +29,10 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). --export([do_health_check/1]). +-export([do_get_status/1]). -export([connect/1]). @@ -146,18 +146,19 @@ on_start( [{ssl, false}] end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn}, case Type of cluster -> case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of - {ok, _} -> {ok, #{poolname => PoolName, type => Type}}; - {ok, _, _} -> {ok, #{poolname => PoolName, type => Type}}; + {ok, _} -> {ok, State}; + {ok, _, _} -> {ok, State}; {error, Reason} -> {error, Reason} end; _ -> case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of - ok -> {ok, #{poolname => PoolName, type => Type}}; + ok -> {ok, State}; {error, Reason} -> {error, Reason} end end. @@ -212,26 +213,28 @@ eredis_cluster_workers_exist_and_are_connected(Workers) -> Workers ). -on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) -> +on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) -> case eredis_cluster:pool_exists(PoolName) of true -> Workers = extract_eredis_cluster_workers(PoolName), - case eredis_cluster_workers_exist_and_are_connected(Workers) of - true -> {ok, State}; - false -> {error, health_check_failed, State} - end; + Health = eredis_cluster_workers_exist_and_are_connected(Workers), + status_result(Health, AutoReconn); false -> - {error, health_check_failed, State} + disconnect end; -on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). +on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> + emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). -do_health_check(Conn) -> +do_get_status(Conn) -> case eredis:q(Conn, ["PING"]) of {ok, _} -> true; _ -> false end. +status_result(_Status = true, _AutoReconn) -> connected; +status_result(_Status = false, _AutoReconn = true) -> connecting; +status_result(_Status = false, _AutoReconn = false) -> disconnected. + reconn_interval(true) -> 15; reconn_interval(false) -> false. diff --git a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl index 33a0397de..e1b99942c 100644 --- a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl @@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index c039da168..f419a08c4 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -102,7 +102,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index b7044ea38..c3620c63c 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index 64dd9e683..fad5c030b 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -109,7 +109,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index edbd28242..e6a7ad366 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -20,7 +20,8 @@ start_pool/3, stop_pool/1, pool_name/1, - health_check/3 + get_status/2, + get_status/3 ]). -include_lib("emqx/include/logger.hrl"). @@ -60,7 +61,10 @@ stop_pool(Name) -> error({stop_pool_failed, Name, Reason}) end. -health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> +get_status(PoolName, CheckFunc) -> + get_status(PoolName, CheckFunc, false). + +get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) -> Status = [ begin case ecpool_worker:client(Worker) of @@ -71,6 +75,13 @@ health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> || {_WorkerName, Worker} <- ecpool:workers(PoolName) ], case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of - true -> {ok, State}; - false -> {error, health_check_failed, State} + true -> + connected; + false -> + case AutoReconn of + true -> + connecting; + false -> + disconnect + end end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index f10f4b440..41cbc872e 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -20,12 +20,13 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). +-type resource_connection_status() :: connected | disconnected | connecting. -type resource_data() :: #{ id := instance_id(), mod := module(), config := resource_config(), state := resource_state(), - status := connected | disconnected | connecting, + status := resource_connection_status(), metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 48615a6e3..12e02ce0a 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -110,7 +110,7 @@ -optional_callbacks([ on_query/4, - on_health_check/2 + on_get_status/2 ]). %% when calling emqx_resource:start/1 @@ -124,8 +124,9 @@ -callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term(). %% when calling emqx_resource:health_check/2 --callback on_health_check(instance_id(), resource_state()) -> - {ok, resource_state()} | {error, Reason :: term(), resource_state()}. +-callback on_get_status(instance_id(), resource_state()) -> + resource_connection_status() + | {resource_connection_status(), resource_state()}. -spec list_types() -> [module()]. list_types() -> @@ -314,11 +315,10 @@ call_start(InstId, Mod, Config) -> ?SAFE_CALL(Mod:on_start(InstId, Config)). -spec call_health_check(instance_id(), module(), resource_state()) -> - {ok, resource_state()} - | {error, Reason :: term()} - | {error, Reason :: term(), resource_state()}. + resource_connection_status() + | {resource_connection_status(), resource_state()}. call_health_check(InstId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_health_check(InstId, ResourceState)). + ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). -spec call_stop(instance_id(), module(), resource_state()) -> term(). call_stop(InstId, Mod, ResourceState) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 97014e413..04592159c 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -213,14 +213,18 @@ do_create_dry_run(ResourceType, Config) -> InstId = make_test_id(), case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> - case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of - {ok, _} -> - case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of - {error, _} = Error -> Error; - _ -> ok - end; - {error, Reason, _} -> - {error, Reason} + Health = + case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of + connected -> + ok; + {connected, _N} -> + ok; + ConnectStatus -> + {error, ConnectStatus} + end, + case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of + {error, _} = Error -> Error; + _ -> Health end; {error, Reason} -> {error, Reason} @@ -262,7 +266,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) -> state => undefined }, %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Group, InitData}), + update_resource(InstId, Group, InitData), spawn(fun() -> start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) end), @@ -273,10 +277,10 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> Data2 = Data#{state => ResourceState, status => connected}, - ets:insert(emqx_resource_instance, {InstId, Group, Data2}), + update_resource(InstId, Group, Data2), create_default_checker(InstId, Opts); {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), + update_resource(InstId, Group, Data#{status => disconnected}), {error, Reason} end. @@ -295,7 +299,7 @@ do_stop(_Group, #{state := undefined}) -> do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = emqx_resource_health_check:delete_checker(InstId), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), + update_resource(InstId, Group, Data#{status => disconnected}), ok. do_health_check(InstId) when is_binary(InstId) -> @@ -303,35 +307,34 @@ do_health_check(InstId) when is_binary(InstId) -> do_health_check(_Group, #{state := undefined}) -> {error, resource_not_initialized}; -do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Data) -> - case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of - {ok, ResourceState1} -> - ets:insert( - emqx_resource_instance, - {InstId, Group, Data#{status => connected, state => ResourceState1}} - ), +do_health_check( + Group, + #{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data +) -> + case emqx_resource:call_health_check(InstId, Mod, ResourceState) of + {NewConnStatus, NewResourceState} -> + NData = Data#{status => NewConnStatus, state => NewResourceState}, + update_resource(InstId, Group, NData), + maybe_log_health_check_result(InstId, NewConnStatus); + NewConnStatus -> + NData = Data#{status => NewConnStatus}, + NewConnStatus /= OldStatus andalso update_resource(InstId, Group, NData), + maybe_log_health_check_result(InstId, NewConnStatus) + end. + +maybe_log_health_check_result(InstId, Result) -> + case Result of + connected -> ok; - {error, Reason} -> - logger:error("health check for ~p failed: ~p", [InstId, Reason]), - ets:insert( - emqx_resource_instance, - {InstId, Group, Data#{status => connecting}} - ), - {error, Reason}; - {error, Reason, ResourceState1} -> - logger:error("health check for ~p failed: ~p", [InstId, Reason]), - ets:insert( - emqx_resource_instance, - {InstId, Group, Data#{status => connecting, state => ResourceState1}} - ), - {error, Reason} + ConnectStatus -> + logger:error("health check for ~p failed: ~p", [InstId, ConnectStatus]) end. do_set_resource_status_connecting(InstId) -> case emqx_resource_instance:lookup(InstId) of {ok, Group, #{id := InstId} = Data} -> logger:error("health check for ~p failed: timeout", [InstId]), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}}); + update_resource(InstId, Group, Data#{status => connecting}); Error -> {error, Error} end. @@ -340,6 +343,9 @@ do_set_resource_status_connecting(InstId) -> %% internal functions %%------------------------------------------------------------------------------ +update_resource(InstId, Group, Data) -> + ets:insert(emqx_resource_instance, {InstId, Group, Data}). + do_with_group_and_instance_data(InstId, Do, Args) -> case lookup(InstId) of {ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]); diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 973cf0ab7..865917128 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -192,10 +192,7 @@ t_healthy(_) -> erlang:exit(Pid, shutdown), - ?assertEqual( - {error, dead}, - emqx_resource:health_check(?ID) - ), + ?assertEqual(ok, emqx_resource:health_check(?ID)), ?assertMatch( [#{status := connecting}], diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_test_resource.erl index 783393e74..c23f87d50 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_test_resource.erl @@ -25,7 +25,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2 + on_get_status/2 ]). %% callbacks for emqx_resource config schema @@ -85,13 +85,13 @@ on_query(_InstId, get_state_failed, AfterQuery, State) -> emqx_resource:query_failed(AfterQuery), State. -on_health_check(_InstId, State = #{health_check_error := true}) -> - {error, dead, State}; -on_health_check(_InstId, State = #{pid := Pid}) -> +on_get_status(_InstId, #{health_check_error := true}) -> + disconnected; +on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of - true -> {ok, State}; - false -> {error, dead, State} + true -> connected; + false -> connecting end. spawn_dummy_process(Name, Register) ->