refactor: resource check & connector status

This commit is contained in:
DDDHuang 2022-04-27 16:56:51 +08:00
parent a50980c496
commit 2a2308bbf8
18 changed files with 201 additions and 220 deletions

View File

@ -25,7 +25,7 @@
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/4, on_query/4,
on_health_check/2, on_get_status/2,
connect/1 connect/1
]). ]).
@ -70,16 +70,15 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
emqx_resource:query_success(AfterQuery), emqx_resource:query_success(AfterQuery),
ok. ok.
on_health_check(_InstId, State = #{pool_name := PoolName}) -> on_get_status(_InstId, #{pool_name := PoolName}) ->
emqx_plugin_libs_pool:health_check( emqx_plugin_libs_pool:get_status(
PoolName, PoolName,
fun(Pid) -> fun(Pid) ->
case emqx_authn_jwks_client:get_jwks(Pid) of case emqx_authn_jwks_client:get_jwks(Pid) of
{ok, _} -> true; {ok, _} -> true;
_ -> false _ -> false
end end
end, end
State
). ).
connect(Opts) -> connect(Opts) ->

View File

@ -25,12 +25,11 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
-type url() :: emqx_http_lib:uri_map(). -type url() :: emqx_http_lib:uri_map().
-reflect_type([url/0]). -reflect_type([url/0]).
@ -306,13 +305,17 @@ on_query(
end, end,
Result. Result.
on_health_check(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) ->
case do_health_check(Host, Port, Timeout) of case do_get_status(Host, Port, Timeout) of
ok -> {ok, State}; ok -> connected;
{error, Reason} -> {error, {http_health_check_failed, Reason}, State} {error, Reason} ->
?SLOG(error, #{msg => "http_connector_get_status_failed",
reason => Reason,
host => Host, port => Port}),
disconnected
end. end.
do_health_check(Host, Port, Timeout) -> do_get_status(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
{ok, Sock} -> {ok, Sock} ->
gen_tcp:close(Sock), gen_tcp:close(Sock),

View File

@ -24,14 +24,11 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
-export([do_health_check/1]).
-export([connect/1]). -export([connect/1]).
@ -90,7 +87,7 @@ on_start(
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{poolname => PoolName}}; ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
@ -128,11 +125,7 @@ on_query(InstId, {search, Base, Filter, Attributes}, AfterQuery, #{poolname := P
end, end,
Result. Result.
on_health_check(_InstId, #{poolname := PoolName} = State) -> on_get_status(_InstId, _State) -> connected.
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
do_health_check(_Conn) ->
{ok, true}.
reconn_interval(true) -> 15; reconn_interval(true) -> 15;
reconn_interval(false) -> false. reconn_interval(false) -> false.

View File

@ -24,12 +24,11 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
%% ecpool callback %% ecpool callback
-export([connect/1]). -export([connect/1]).
@ -222,21 +221,17 @@ on_query(
Result Result
end. end.
-dialyzer({nowarn_function, [on_health_check/2]}). -dialyzer({nowarn_function, [on_get_status/2]}).
on_health_check(InstId, #{poolname := PoolName} = State) -> on_get_status(InstId, #{poolname := PoolName} = _State) ->
case health_check(PoolName) of case health_check(PoolName) of
true -> true ->
?tp(debug, emqx_connector_mongo_health_check, #{ ?tp(debug, emqx_connector_mongo_health_check, #{instance_id => InstId,
instance_id => InstId, status => ok}),
status => ok connected;
}),
{ok, State};
false -> false ->
?tp(warning, emqx_connector_mongo_health_check, #{ ?tp(warning, emqx_connector_mongo_health_check, #{instance_id => InstId,
instance_id => InstId, status => failed}),
status => failed disconnected
}),
{error, health_check_failed, State}
end. end.
health_check(PoolName) -> health_check(PoolName) ->
@ -252,30 +247,24 @@ check_worker_health(Worker) ->
%% we don't care if this returns something or not, we just to test the connection %% we don't care if this returns something or not, we just to test the connection
try do_test_query(Conn) of try do_test_query(Conn) of
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "mongo_connection_get_status_error",
msg => "mongo_connection_health_check_error", worker => Worker,
worker => Worker, reason => Reason}),
reason => Reason
}),
false; false;
_ -> _ ->
true true
catch catch
Class:Error -> Class:Error ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "mongo_connection_get_status_exception",
msg => "mongo_connection_health_check_exception", worker => Worker,
worker => Worker, class => Class,
class => Class, error => Error}),
error => Error
}),
false false
end; end;
_ -> _ ->
?SLOG(warning, #{ ?SLOG(warning, #{msg => "mongo_connection_get_status_error",
msg => "mongo_connection_health_check_error", worker => Worker,
worker => Worker, reason => worker_not_found}),
reason => worker_not_found
}),
false false
end. end.

View File

@ -34,12 +34,11 @@
-export([on_message_received/3]). -export([on_message_received/3]).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
-behaviour(hocon_schema). -behaviour(hocon_schema).
@ -188,10 +187,10 @@ on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
emqx_resource:query_success(AfterQuery). emqx_resource:query_success(AfterQuery).
on_health_check(_InstId, #{name := InstanceId} = State) -> on_get_status(_InstId, #{name := InstanceId}) ->
case emqx_connector_mqtt_worker:ping(InstanceId) of case emqx_connector_mqtt_worker:status(InstanceId) of
pong -> {ok, State}; connected -> connected;
_ -> {error, {connector_down, InstanceId}, State} _ -> disconnected
end. end.
ensure_mqtt_worker_started(InstanceId) -> ensure_mqtt_worker_started(InstanceId) ->

View File

@ -23,12 +23,11 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
%% ecpool connect & reconnect %% ecpool connect & reconnect
-export([connect/1, prepare_sql_to_conn/2]). -export([connect/1, prepare_sql_to_conn/2]).
@ -37,7 +36,7 @@
-export([roots/0, fields/1]). -export([roots/0, fields/1]).
-export([do_health_check/1]). -export([do_get_status/1]).
-define(MYSQL_HOST_OPTIONS, #{ -define(MYSQL_HOST_OPTIONS, #{
host_type => inet_addr, host_type => inet_addr,
@ -98,9 +97,9 @@ on_start(
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
Prepares = maps:get(prepare_statement, Config, #{}), Prepares = maps:get(prepare_statement, Config, #{}),
State = init_prepare(#{poolname => PoolName, prepare_statement => Prepares}), State = #{poolname => PoolName, prepare_statement => Prepares, auto_reconnect => AutoReconn},
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> {ok, State}; ok -> {ok, init_prepare(State)};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
@ -169,27 +168,34 @@ on_query(
mysql_function(sql) -> query; mysql_function(sql) -> query;
mysql_function(prepared_query) -> execute. mysql_function(prepared_query) -> execute.
on_health_check(_InstId, #{poolname := PoolName} = State) -> on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) ->
case emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State) of case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of
{ok, State} -> connected ->
case do_health_check_prepares(State) of case do_check_prepares(State) of
ok -> ok ->
{ok, State}; connected;
{ok, NState} -> {ok, NState} ->
{ok, NState}; %% return new state with prepared statements
{connected, NState};
{error, _Reason} -> {error, _Reason} ->
{error, health_check_failed, State} %% do not log error, it is logged in prepare_sql_to_conn
case AutoReconn of
true ->
connecting;
false ->
disconnected
end
end; end;
{error, health_check_failed, State} -> ConnectStatus ->
{error, health_check_failed, State} ConnectStatus
end. end.
do_health_check(Conn) -> do_get_status(Conn) ->
ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)). ok == element(1, mysql:query(Conn, <<"SELECT count(1) AS T">>)).
do_health_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) ->
ok; ok;
do_health_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) ->
%% retry to prepare %% retry to prepare
case prepare_sql(Prepares, PoolName) of case prepare_sql(Prepares, PoolName) of
ok -> ok ->

View File

@ -26,12 +26,11 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
-export([connect/1]). -export([connect/1]).
@ -40,7 +39,7 @@
prepared_query/3 prepared_query/3
]). ]).
-export([do_health_check/1]). -export([do_get_status/1]).
-define(PGSQL_HOST_OPTIONS, #{ -define(PGSQL_HOST_OPTIONS, #{
host_type => inet_addr, host_type => inet_addr,
@ -105,7 +104,7 @@ on_start(
], ],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of
ok -> {ok, #{poolname => PoolName}}; ok -> {ok, #{poolname => PoolName, auto_reconnect => AutoReconn}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
@ -139,10 +138,10 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
end, end,
Result. Result.
on_health_check(_InstId, #{poolname := PoolName} = State) -> on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
do_health_check(Conn) -> do_get_status(Conn) ->
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")). ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
%% =================================================================== %% ===================================================================

View File

@ -19,20 +19,20 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([roots/0, fields/1]). -export([roots/0, fields/1]).
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
-export([do_health_check/1]). -export([do_get_status/1]).
-export([connect/1]). -export([connect/1]).
@ -146,18 +146,17 @@ on_start(
[{ssl, false}] [{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
State = #{poolname => PoolName, type => Type, auto_reconnect => AutoReconn},
case Type of case Type of
cluster -> cluster ->
case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of
{ok, _} -> {ok, #{poolname => PoolName, type => Type}}; {ok, _} -> {ok, State};
{ok, _, _} -> {ok, #{poolname => PoolName, type => Type}}; {ok, _, _} -> {ok, State};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end; end;
_ -> _ ->
case case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) of
emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) ok -> {ok, State};
of
ok -> {ok, #{poolname => PoolName, type => Type}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end end
end. end.
@ -212,26 +211,30 @@ eredis_cluster_workers_exist_and_are_connected(Workers) ->
Workers Workers
). ).
on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) -> on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect := AutoReconn}) ->
case eredis_cluster:pool_exists(PoolName) of case eredis_cluster:pool_exists(PoolName) of
true -> true ->
Workers = extract_eredis_cluster_workers(PoolName), Workers = extract_eredis_cluster_workers(PoolName),
case eredis_cluster_workers_exist_and_are_connected(Workers) of Health = eredis_cluster_workers_exist_and_are_connected(Workers),
true -> {ok, State}; status_result(Health, AutoReconn);
false -> {error, health_check_failed, State}
end;
false -> false ->
{error, health_check_failed, State} disconnect
end; end;
on_health_check(_InstId, #{poolname := PoolName} = State) ->
emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).
do_health_check(Conn) ->
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of case eredis:q(Conn, ["PING"]) of
{ok, _} -> true; {ok, _} -> true;
_ -> false _ -> false
end. end.
status_result(_Status = true, _AutoReconn) -> connected;
status_result(_Status = false, _AutoReconn = true) -> connecting;
status_result(_Status = false, _AutoReconn = false) -> disconnected.
reconn_interval(true) -> 15; reconn_interval(true) -> 15;
reconn_interval(false) -> false. reconn_interval(false) -> false.

View File

@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} = }} =
emqx_resource:get_instance(PoolName), emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance

View File

@ -77,11 +77,9 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
), ),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{ {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
state := State, status := InitialStatus}}
status := InitialStatus = emqx_resource:get_instance(PoolName),
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected % % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
@ -102,7 +100,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} = }} =
emqx_resource:get_instance(PoolName), emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance

View File

@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} = }} =
emqx_resource:get_instance(PoolName), emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance

View File

@ -109,7 +109,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
}} = }} =
emqx_resource:get_instance(PoolName), emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance

View File

@ -20,7 +20,8 @@
start_pool/3, start_pool/3,
stop_pool/1, stop_pool/1,
pool_name/1, pool_name/1,
health_check/3 get_status/2,
get_status/3
]). ]).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -60,7 +61,10 @@ stop_pool(Name) ->
error({stop_pool_failed, Name, Reason}) error({stop_pool_failed, Name, Reason})
end. end.
health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) -> get_status(PoolName, CheckFunc) ->
get_status(PoolName, CheckFunc, false).
get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) ->
Status = [ Status = [
begin begin
case ecpool_worker:client(Worker) of case ecpool_worker:client(Worker) of
@ -71,6 +75,12 @@ health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) ->
|| {_WorkerName, Worker} <- ecpool:workers(PoolName) || {_WorkerName, Worker} <- ecpool:workers(PoolName)
], ],
case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
true -> {ok, State}; true -> connected;
false -> {error, health_check_failed, State} false ->
case AutoReconn of
true ->
connecting;
false ->
disconnect
end
end. end.

View File

@ -20,12 +20,13 @@
-type resource_config() :: term(). -type resource_config() :: term().
-type resource_spec() :: map(). -type resource_spec() :: map().
-type resource_state() :: term(). -type resource_state() :: term().
-type resource_connection_status() :: connected | disconnected | connecting.
-type resource_data() :: #{ -type resource_data() :: #{
id := instance_id(), id := instance_id(),
mod := module(), mod := module(),
config := resource_config(), config := resource_config(),
state := resource_state(), state := resource_state(),
status := connected | disconnected | connecting, status := resource_connection_status(),
metrics := emqx_plugin_libs_metrics:metrics() metrics := emqx_plugin_libs_metrics:metrics()
}. }.
-type resource_group() :: binary(). -type resource_group() :: binary().

View File

@ -85,33 +85,22 @@
]). ]).
%% Direct calls to the callback module %% Direct calls to the callback module
-export([ call_start/3 %% start the instance
, call_health_check/3 %% verify if the resource is working normally
, call_stop/3 %% stop the instance
]).
%% start the instance -export([ list_instances/0 %% list all the instances, id only.
-export([ , list_instances_verbose/0 %% list all the instances
call_start/3, , get_instance/1 %% return the data of the instance
%% verify if the resource is working normally , list_instances_by_type/1 %% return all the instances of the same resource type
call_health_check/3, , generate_id/1
%% stop the instance , list_group_instances/1
call_stop/3 ]).
]).
%% list all the instances, id only. -optional_callbacks([ on_query/4
-export([ , on_get_status/2
list_instances/0, ]).
%% list all the instances
list_instances_verbose/0,
%% return the data of the instance
get_instance/1,
%% return all the instances of the same resource type
list_instances_by_type/1,
generate_id/1,
list_group_instances/1
]).
-optional_callbacks([
on_query/4,
on_health_check/2
]).
%% when calling emqx_resource:start/1 %% when calling emqx_resource:start/1
-callback on_start(instance_id(), resource_config()) -> -callback on_start(instance_id(), resource_config()) ->
@ -124,8 +113,9 @@
-callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term(). -callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term().
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_health_check(instance_id(), resource_state()) -> -callback on_get_status(instance_id(), resource_state()) ->
{ok, resource_state()} | {error, Reason :: term(), resource_state()}. resource_connection_status() |
{resource_connection_status(), resource_state()}.
-spec list_types() -> [module()]. -spec list_types() -> [module()].
list_types() -> list_types() ->
@ -314,11 +304,10 @@ call_start(InstId, Mod, Config) ->
?SAFE_CALL(Mod:on_start(InstId, Config)). ?SAFE_CALL(Mod:on_start(InstId, Config)).
-spec call_health_check(instance_id(), module(), resource_state()) -> -spec call_health_check(instance_id(), module(), resource_state()) ->
{ok, resource_state()} resource_connection_status() |
| {error, Reason :: term()} {resource_connection_status(), resource_state()}.
| {error, Reason :: term(), resource_state()}.
call_health_check(InstId, Mod, ResourceState) -> call_health_check(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_health_check(InstId, ResourceState)). ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)).
-spec call_stop(instance_id(), module(), resource_state()) -> term(). -spec call_stop(instance_id(), module(), resource_state()) -> term().
call_stop(InstId, Mod, ResourceState) -> call_stop(InstId, Mod, ResourceState) ->

View File

@ -214,13 +214,12 @@ do_create_dry_run(ResourceType, Config) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} -> {ok, ResourceState} ->
case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of case emqx_resource:call_health_check(InstId, ResourceType, ResourceState) of
{ok, _} -> connected ->
case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of case emqx_resource:call_stop(InstId, ResourceType, ResourceState) of
{error, _} = Error -> Error; {error, _} = Error -> Error;
_ -> ok _ -> ok
end; end;
{error, Reason, _} -> ConnectStatus -> {error, ConnectStatus}
{error, Reason}
end; end;
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
@ -262,7 +261,7 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
state => undefined state => undefined
}, },
%% The `emqx_resource:call_start/3` need the instance exist beforehand %% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, Group, InitData}), update_resource(InstId, Group, InitData),
spawn(fun() -> spawn(fun() ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end), end),
@ -273,10 +272,10 @@ start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} -> {ok, ResourceState} ->
Data2 = Data#{state => ResourceState, status => connected}, Data2 = Data#{state => ResourceState, status => connected},
ets:insert(emqx_resource_instance, {InstId, Group, Data2}), update_resource(InstId, Group, Data2),
create_default_checker(InstId, Opts); create_default_checker(InstId, Opts);
{error, Reason} -> {error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), update_resource(InstId, Group, Data#{status => disconnected}),
{error, Reason} {error, Reason}
end. end.
@ -295,7 +294,7 @@ do_stop(_Group, #{state := undefined}) ->
do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) -> do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
_ = emqx_resource_health_check:delete_checker(InstId), _ = emqx_resource_health_check:delete_checker(InstId),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}), update_resource(InstId, Group, Data#{status => disconnected}),
ok. ok.
do_health_check(InstId) when is_binary(InstId) -> do_health_check(InstId) when is_binary(InstId) ->
@ -304,44 +303,41 @@ do_health_check(InstId) when is_binary(InstId) ->
do_health_check(_Group, #{state := undefined}) -> do_health_check(_Group, #{state := undefined}) ->
{error, resource_not_initialized}; {error, resource_not_initialized};
do_health_check(Group, do_health_check(Group,
#{id := InstId, mod := Mod, state := ResourceState0, config := Config} = Data) -> #{id := InstId, mod := Mod, state := ResourceState, status := OldStatus} = Data) ->
FailedConnectStatus = case emqx_resource:call_health_check(InstId, Mod, ResourceState) of
case maps:get(auto_reconnect, Config, true) of {NewConnStatus, NewResourceState} ->
true -> connecting; NData = Data#{status => NewConnStatus, state => NewResourceState},
false -> disconnected update_resource(InstId, Group, NData),
end, maybe_log_health_check_result(InstId, NewConnStatus);
case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of NewConnStatus ->
{ok, ResourceState1} -> NData = Data#{status => NewConnStatus},
ets:insert( NewConnStatus /= OldStatus andalso update_resource(InstId, Group, NData),
emqx_resource_instance, maybe_log_health_check_result(InstId, NewConnStatus)
{InstId, Group, Data#{status => connected, state => ResourceState1}} end.
),
maybe_log_health_check_result(InstId, Result) ->
case Result of
connected ->
ok; ok;
{error, Reason} -> ConnectStatus ->
logger:error("health check for ~p failed: ~p", [InstId, Reason]), logger:error("health check for ~p failed: ~p", [InstId, ConnectStatus])
ets:insert(emqx_resource_instance,
{InstId, Group, Data#{status => FailedConnectStatus}}),
{error, Reason};
{error, Reason, ResourceState1} ->
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
ets:insert(emqx_resource_instance,
{InstId, Group, Data#{status => FailedConnectStatus, state => ResourceState1}}),
{error, Reason}
end. end.
do_set_resource_status_connecting(InstId) -> do_set_resource_status_connecting(InstId) ->
case emqx_resource_instance:lookup(InstId) of case emqx_resource_instance:lookup(InstId) of
{ok, Group, #{id := InstId} = Data} -> {ok, Group, #{id := InstId} = Data} ->
logger:error("health check for ~p failed: timeout", [InstId]), logger:error("health check for ~p failed: timeout", [InstId]),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}}); update_resource(InstId, Group, Data#{status => connecting});
Error -> Error -> {error, Error}
{error, Error}
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
update_resource(InstId, Group, Data) ->
ets:insert(emqx_resource_instance, {InstId, Group, Data}).
do_with_group_and_instance_data(InstId, Do, Args) -> do_with_group_and_instance_data(InstId, Do, Args) ->
case lookup(InstId) of case lookup(InstId) of
{ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]); {ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]);

View File

@ -192,10 +192,7 @@ t_healthy(_) ->
erlang:exit(Pid, shutdown), erlang:exit(Pid, shutdown),
?assertEqual( ?assertEqual(ok, emqx_resource:health_check(?ID)),
{error, dead},
emqx_resource:health_check(?ID)
),
?assertMatch( ?assertMatch(
[#{status := connecting}], [#{status := connecting}],

View File

@ -21,12 +21,11 @@
-behaviour(emqx_resource). -behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([ on_start/2
on_start/2, , on_stop/2
on_stop/2, , on_query/4
on_query/4, , on_get_status/2
on_health_check/2 ]).
]).
%% callbacks for emqx_resource config schema %% callbacks for emqx_resource config schema
-export([roots/0]). -export([roots/0]).
@ -85,13 +84,13 @@ on_query(_InstId, get_state_failed, AfterQuery, State) ->
emqx_resource:query_failed(AfterQuery), emqx_resource:query_failed(AfterQuery),
State. State.
on_health_check(_InstId, State = #{health_check_error := true}) -> on_get_status(_InstId, #{health_check_error := true}) ->
{error, dead, State}; disconnected;
on_health_check(_InstId, State = #{pid := Pid}) -> on_get_status(_InstId, #{pid := Pid}) ->
timer:sleep(300), timer:sleep(300),
case is_process_alive(Pid) of case is_process_alive(Pid) of
true -> {ok, State}; true -> connected;
false -> {error, dead, State} false -> connecting
end. end.
spawn_dummy_process(Name, Register) -> spawn_dummy_process(Name, Register) ->