diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index 86ad02c52..de3da25cc 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -25,7 +25,7 @@ on_start/2, on_stop/2, on_query/4, - on_health_check/2, + on_get_status/2, connect/1 ]). @@ -70,16 +70,15 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) -> emqx_resource:query_success(AfterQuery), ok. -on_health_check(_InstId, State = #{pool_name := PoolName}) -> - emqx_plugin_libs_pool:health_check( +on_get_status(_InstId, #{pool_name := PoolName}) -> + emqx_plugin_libs_pool:get_status( PoolName, fun(Pid) -> case emqx_authn_jwks_client:get_jwks(Pid) of {ok, _} -> true; _ -> false end - end, - State + end ). connect(Opts) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 38ddd2c23..d2edc5fca 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -25,12 +25,11 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). -type url() :: emqx_http_lib:uri_map(). -reflect_type([url/0]). @@ -306,13 +305,17 @@ on_query( end, Result. -on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> - case do_health_check(Host, Port, Timeout) of - ok -> {ok, State}; - {error, Reason} -> {error, {http_health_check_failed, Reason}, State} +on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) -> + case do_get_status(Host, Port, Timeout) of + ok -> connected; + {error, Reason} -> + ?SLOG(error, #{msg => "http_connector_get_status_failed", + reason => Reason, + host => Host, port => Port}), + disconnected end. -do_health_check(Host, Port, Timeout) -> +do_get_status(Host, Port, Timeout) -> case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of {ok, Sock} -> gen_tcp:close(Sock), diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 25798fae5..4214380b3 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -24,14 +24,11 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). - --export([do_health_check/1]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). -export([connect/1]). @@ -90,7 +87,7 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of - ok -> {ok, #{poolname => PoolName}}; + ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; {error, Reason} -> {error, Reason} end. @@ -128,11 +125,7 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P end, Result. -on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). - -do_health_check(_Conn) -> - {ok, true}. +on_get_status(_InstId, _State) -> connected. reconn_interval(true) -> 15; reconn_interval(false) -> false. diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 170fa096b..a9d660545 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -24,12 +24,11 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). %% ecpool callback -export([connect/1]). @@ -222,21 +221,17 @@ on_query( Result end. --dialyzer({nowarn_function, [on_health_check/2]}). -on_health_check(InstId, #{poolname := PoolName} = State) -> +-dialyzer({nowarn_function, [on_get_status/2]}). +on_get_status(InstId, #{poolname := PoolName} = _State) -> case health_check(PoolName) of true -> - ?tp(debug, emqx_connector_mongo_health_check, #{ - instance_id => InstId, - status => ok - }), - {ok, State}; + ?tp(debug, emqx_connector_mongo_health_check, #{instance_id => InstId, + status => ok}), + connected; false -> - ?tp(warning, emqx_connector_mongo_health_check, #{ - instance_id => InstId, - status => failed - }), - {error, health_check_failed, State} + ?tp(warning, emqx_connector_mongo_health_check, #{instance_id => InstId, + status => failed}), + disconnected end. health_check(PoolName) -> @@ -252,30 +247,24 @@ check_worker_health(Worker) -> %% we don't care if this returns something or not, we just to test the connection try do_test_query(Conn) of {error, Reason} -> - ?SLOG(warning, #{ - msg => "mongo_connection_health_check_error", - worker => Worker, - reason => Reason - }), + ?SLOG(warning, #{msg => "mongo_connection_get_status_error", + worker => Worker, + reason => Reason}), false; _ -> true catch Class:Error -> - ?SLOG(warning, #{ - msg => "mongo_connection_health_check_exception", - worker => Worker, - class => Class, - error => Error - }), + ?SLOG(warning, #{msg => "mongo_connection_get_status_exception", + worker => Worker, + class => Class, + error => Error}), false end; _ -> - ?SLOG(warning, #{ - msg => "mongo_connection_health_check_error", - worker => Worker, - reason => worker_not_found - }), + ?SLOG(warning, #{msg => "mongo_connection_get_status_error", + worker => Worker, + reason => worker_not_found}), false end. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 5f228027f..ab95285f4 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -34,12 +34,11 @@ -export([on_message_received/3]). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). -behaviour(hocon_schema). @@ -188,10 +187,10 @@ on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_resource:query_success(AfterQuery). -on_health_check(_InstId, #{name := InstanceId} = State) -> - case emqx_connector_mqtt_worker:ping(InstanceId) of - pong -> {ok, State}; - _ -> {error, {connector_down, InstanceId}, State} +on_get_status(_InstId, #{name := InstanceId}) -> + case emqx_connector_mqtt_worker:status(InstanceId) of + connected -> connected; + _ -> disconnected end. ensure_mqtt_worker_started(InstanceId) -> diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 5e42a2ee2..8aba5ec00 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -23,12 +23,11 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). %% ecpool connect & reconnect -export([connect/1, prepare_sql_to_conn/2]). @@ -37,7 +36,7 @@ -export([roots/0, fields/1]). --export([do_health_check/1]). +-export([do_get_status/1]). -define(MYSQL_HOST_OPTIONS, #{ host_type => inet_addr, @@ -98,9 +97,9 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), Prepares = maps:get(prepare_statement, Config, #{}), - State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}), + State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn}, case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, State}; + ok -> {ok, init_prepare(State)}; {error, Reason} -> {error, Reason} end. @@ -169,27 +168,34 @@ on_query( mysql_function(sql) -> query; mysql_function(prepared_query) -> execute. -on_health_check(_InstId, #{poolname := PoolName} = State) -> - case emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State) of - {ok, State} -> - case do_health_check_prepares(State) of +on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) -> + case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of + connected -> + case do_check_prepares(State) of ok -> - {ok, State}; + connected; {ok, NState} -> - {ok, NState}; + %% return new state with prepared statements + {connected, NState}; {error, _Reason} -> - {error, health_check_failed, State} + %% do not log error, it is logged in prepare_sql_to_conn + case AutoReconn of + true -> + connecting; + false -> + disconnected + end end; - {error, health_check_failed, State} -> - {error, health_check_failed, State} + ConnectStatus -> + ConnectStatus end. -do_health_check(Conn) -> +do_get_status(Conn) -> ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). -do_health_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> +do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> ok; -do_health_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> +do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> %% retry to prepare case prepare_sql(Prepares, PoolName) of ok -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 00ec31849..72f85955f 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -26,12 +26,11 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). -export([connect/1]). @@ -40,7 +39,7 @@ prepared_query/3 ]). --export([do_health_check/1]). +-export([do_get_status/1]). -define(PGSQL_HOST_OPTIONS, #{ host_type => inet_addr, @@ -105,7 +104,7 @@ on_start( ], PoolName = emqx_plugin_libs_pool:pool_name(InstId), case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of - ok -> {ok, #{poolname => PoolName}}; + ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}}; {error, Reason} -> {error, Reason} end. @@ -139,10 +138,10 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName} end, Result. -on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). +on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> + emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). -do_health_check(Conn) -> +do_get_status(Conn) -> ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). %% =================================================================== diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 189d5e8c2..ca7bb12ea 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -19,20 +19,20 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -export([roots/0, fields/1]). -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). --export([do_health_check/1]). +-export([do_get_status/1]). -export([connect/1]). @@ -146,18 +146,17 @@ on_start( [{ssl, false}] end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], PoolName = emqx_plugin_libs_pool:pool_name(InstId), + State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn}, case Type of cluster -> case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of - {ok, _} -> {ok, #{poolname => PoolName, type => Type}}; - {ok, _, _} -> {ok, #{poolname => PoolName, type => Type}}; + {ok, _} -> {ok, State}; + {ok, _, _} -> {ok, State}; {error, Reason} -> {error, Reason} end; _ -> - case - emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) - of - ok -> {ok, #{poolname => PoolName, type => Type}}; + case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of + ok -> {ok, State}; {error, Reason} -> {error, Reason} end end. @@ -212,26 +211,30 @@ eredis_cluster_workers_exist_and_are_connected(Workers) -> Workers ). -on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) -> +on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) -> case eredis_cluster:pool_exists(PoolName) of true -> Workers = extract_eredis_cluster_workers(PoolName), - case eredis_cluster_workers_exist_and_are_connected(Workers) of - true -> {ok, State}; - false -> {error, health_check_failed, State} - end; + Health = eredis_cluster_workers_exist_and_are_connected(Workers), + status_result(Health, AutoReconn); false -> - {error, health_check_failed, State} + disconnect end; -on_health_check(_InstId, #{poolname := PoolName} = State) -> - emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). -do_health_check(Conn) -> + +on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) -> + emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn). + +do_get_status(Conn) -> case eredis:q(Conn, ["PING"]) of {ok, _} -> true; _ -> false end. +status_result(_Status = true, _AutoReconn) -> connected; +status_result(_Status = false, _AutoReconn = true) -> connecting; +status_result(_Status = false, _AutoReconn = false) -> disconnected. + reconn_interval(true) -> 15; reconn_interval(false) -> false. diff --git a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl index 33a0397de..e1b99942c 100644 --- a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl @@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index c039da168..c576e3de7 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -77,11 +77,9 @@ perform_lifecycle_check(PoolName, InitialConfig) -> ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource - {ok, ?CONNECTOR_RESOURCE_GROUP, #{ - state := State, - status := InitialStatus - }} = - emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := InitialStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), % % Perform query as further check that the resource is working as expected ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), @@ -102,7 +100,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index b7044ea38..c3620c63c 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index 64dd9e683..fad5c030b 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -109,7 +109,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index edbd28242..f331e028f 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -20,7 +20,8 @@ start_pool/3, stop_pool/1, pool_name/1, - health_check/3 + get_status/2, + get_status/3 ]). -include_lib("emqx/include/logger.hrl"). @@ -60,7 +61,10 @@ stop_pool(Name) -> error({stop_pool_failed, Name, Reason}) end. -health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> +get_status(PoolName, CheckFunc) -> + get_status(PoolName, CheckFunc, false). + +get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) -> Status = [ begin case ecpool_worker:client(Worker) of @@ -71,6 +75,12 @@ health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> || {_WorkerName, Worker} <- ecpool:workers(PoolName) ], case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of - true -> {ok, State}; - false -> {error, health_check_failed, State} + true -> connected; + false -> + case AutoReconn of + true -> + connecting; + false -> + disconnect + end end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index f10f4b440..41cbc872e 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -20,12 +20,13 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). +-type resource_connection_status() :: connected | disconnected | connecting. -type resource_data() :: #{ id := instance_id(), mod := module(), config := resource_config(), state := resource_state(), - status := connected | disconnected | connecting, + status := resource_connection_status(), metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 48615a6e3..b24213eae 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -85,33 +85,22 @@ ]). %% Direct calls to the callback module +-export([ call_start/3 %% start the instance + , call_health_check/3 %% verify if the resource is working normally + , call_stop/3 %% stop the instance + ]). -%% start the instance --export([ - call_start/3, - %% verify if the resource is working normally - call_health_check/3, - %% stop the instance - call_stop/3 -]). +-export([ list_instances/0 %% list all the instances, id only. + , list_instances_verbose/0 %% list all the instances + , get_instance/1 %% return the data of the instance + , list_instances_by_type/1 %% return all the instances of the same resource type + , generate_id/1 + , list_group_instances/1 + ]). -%% list all the instances, id only. --export([ - list_instances/0, - %% list all the instances - list_instances_verbose/0, - %% return the data of the instance - get_instance/1, - %% return all the instances of the same resource type - list_instances_by_type/1, - generate_id/1, - list_group_instances/1 -]). - --optional_callbacks([ - on_query/4, - on_health_check/2 -]). +-optional_callbacks([ on_query/4 + , on_get_status/2 + ]). %% when calling emqx_resource:start/1 -callback on_start(instance_id(), resource_config()) -> @@ -124,8 +113,9 @@ -callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term(). %% when calling emqx_resource:health_check/2 --callback on_health_check(instance_id(), resource_state()) -> - {ok, resource_state()} | {error, Reason :: term(), resource_state()}. +-callback on_get_status(instance_id(), resource_state()) -> + resource_connection_status() | + {resource_connection_status(), resource_state()}. -spec list_types() -> [module()]. list_types() -> @@ -314,11 +304,10 @@ call_start(InstId, Mod, Config) -> ?SAFE_CALL(Mod:on_start(InstId, Config)). -spec call_health_check(instance_id(), module(), resource_state()) -> - {ok, resource_state()} - | {error, Reason :: term()} - | {error, Reason :: term(), resource_state()}. + resource_connection_status() | + {resource_connection_status(), resource_state()}. call_health_check(InstId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_health_check(InstId, ResourceState)). + ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). -spec call_stop(instance_id(), module(), resource_state()) -> term(). call_stop(InstId, Mod, ResourceState) -> diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 351c34eeb..504afbe3d 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -214,13 +214,12 @@ do_create_dry_run(ResourceType, Config) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of - {ok, _} -> + connected -> case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of {error, _} = Error -> Error; _ -> ok end; - {error, Reason, _} -> - {error, Reason} + ConnectStatus -> {error, ConnectStatus} end; {error, Reason} -> {error, Reason} @@ -262,7 +261,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) -> state => undefined }, %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, Group, InitData}), + update_resource(InstId, Group, InitData), spawn(fun() -> start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) end), @@ -273,10 +272,10 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> Data2 = Data#{state => ResourceState, status => connected}, - ets:insert(emqx_resource_instance, {InstId, Group, Data2}), + update_resource(InstId, Group, Data2), create_default_checker(InstId, Opts); {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), + update_resource(InstId, Group, Data#{status => disconnected}), {error, Reason} end. @@ -295,7 +294,7 @@ do_stop(_Group, #{state := undefined}) -> do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = emqx_resource_health_check:delete_checker(InstId), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), + update_resource(InstId, Group, Data#{status => disconnected}), ok. do_health_check(InstId) when is_binary(InstId) -> @@ -304,44 +303,41 @@ do_health_check(InstId) when is_binary(InstId) -> do_health_check(_Group, #{state := undefined}) -> {error, resource_not_initialized}; do_health_check(Group, - #{id := InstId, mod := Mod, state := ResourceState0, config := Config} = Data) -> - FailedConnectStatus = - case maps:get(auto_reconnect, Config, true) of - true -> connecting; - false -> disconnected - end, - case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of - {ok, ResourceState1} -> - ets:insert( - emqx_resource_instance, - {InstId, Group, Data#{status => connected, state => ResourceState1}} - ), + #{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data) -> + case emqx_resource:call_health_check(InstId, Mod, ResourceState) of + {NewConnStatus, NewResourceState} -> + NData = Data#{status => NewConnStatus, state => NewResourceState}, + update_resource(InstId, Group, NData), + maybe_log_health_check_result(InstId, NewConnStatus); + NewConnStatus -> + NData = Data#{status => NewConnStatus}, + NewConnStatus /= OldStatus andalso update_resource(InstId, Group, NData), + maybe_log_health_check_result(InstId, NewConnStatus) + end. + +maybe_log_health_check_result(InstId, Result) -> + case Result of + connected -> ok; - {error, Reason} -> - logger:error("health check for ~p failed: ~p", [InstId, Reason]), - ets:insert(emqx_resource_instance, - {InstId, Group, Data#{status => FailedConnectStatus}}), - {error, Reason}; - {error, Reason, ResourceState1} -> - logger:error("health check for ~p failed: ~p", [InstId, Reason]), - ets:insert(emqx_resource_instance, - {InstId, Group, Data#{status => FailedConnectStatus, state => ResourceState1}}), - {error, Reason} + ConnectStatus -> + logger:error("health check for ~p failed: ~p", [InstId, ConnectStatus]) end. do_set_resource_status_connecting(InstId) -> case emqx_resource_instance:lookup(InstId) of {ok, Group, #{id := InstId} = Data} -> logger:error("health check for ~p failed: timeout", [InstId]), - ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}}); - Error -> - {error, Error} + update_resource(InstId, Group, Data#{status => connecting}); + Error -> {error, Error} end. %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ +update_resource(InstId, Group, Data) -> + ets:insert(emqx_resource_instance, {InstId, Group, Data}). + do_with_group_and_instance_data(InstId, Do, Args) -> case lookup(InstId) of {ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]); diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 973cf0ab7..865917128 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -192,10 +192,7 @@ t_healthy(_) -> erlang:exit(Pid, shutdown), - ?assertEqual( - {error, dead}, - emqx_resource:health_check(?ID) - ), + ?assertEqual(ok, emqx_resource:health_check(?ID)), ?assertMatch( [#{status := connecting}], diff --git a/apps/emqx_resource/test/emqx_test_resource.erl b/apps/emqx_resource/test/emqx_test_resource.erl index 783393e74..a1148f94e 100644 --- a/apps/emqx_resource/test/emqx_test_resource.erl +++ b/apps/emqx_resource/test/emqx_test_resource.erl @@ -21,12 +21,11 @@ -behaviour(emqx_resource). %% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/4, - on_health_check/2 -]). +-export([ on_start/2 + , on_stop/2 + , on_query/4 + , on_get_status/2 + ]). %% callbacks for emqx_resource config schema -export([roots/0]). @@ -85,13 +84,13 @@ on_query(_InstId, get_state_failed, AfterQuery, State) -> emqx_resource:query_failed(AfterQuery), State. -on_health_check(_InstId, State = #{health_check_error := true}) -> - {error, dead, State}; -on_health_check(_InstId, State = #{pid := Pid}) -> +on_get_status(_InstId, #{health_check_error := true}) -> + disconnected; +on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of - true -> {ok, State}; - false -> {error, dead, State} + true -> connected; + false -> connecting end. spawn_dummy_process(Name, Register) ->