diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 60d08c624..f6bdf55d4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -156,7 +156,7 @@ create_local(InstId, Group, ResourceType, Config) -> create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create_local(InstId, Group, ResourceType, Config, Opts) -> - emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts). + call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}). -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -166,7 +166,8 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - emqx_resource_manager:create_dry_run(ResourceType, Config). + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + call_instance(RandId, {create_dry_run, ResourceType, Config}). -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -176,7 +177,7 @@ recreate(InstId, ResourceType, Config, Opts) -> -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate_local(InstId, ResourceType, Config, Opts) -> - emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts). + call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> @@ -184,11 +185,11 @@ remove(InstId) -> -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> - emqx_resource_manager:remove(InstId). + call_instance(InstId, {remove, InstId}). -spec reset_metrics_local(instance_id()) -> ok. reset_metrics_local(InstId) -> - emqx_resource_manager:reset_metrics(InstId). + call_instance(InstId, {reset_metrics, InstId}). -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. reset_metrics(InstId) -> @@ -203,7 +204,13 @@ query(InstId, Request) -> %% it is the duty of the Module to apply the `after_query()` functions. -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> - case emqx_resource_manager:ets_lookup(InstId) of + case get_instance(InstId) of + {ok, _Group, #{status := connecting}} -> + query_error(connecting, <<"cannot serve query when the resource " + "instance is still connecting">>); + {ok, _Group, #{status := disconnected}} -> + query_error(disconnected, <<"cannot serve query when the resource " + "instance is disconnected">>); {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe @@ -223,23 +230,23 @@ restart(InstId) -> -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. restart(InstId, Opts) -> - emqx_resource_manager:restart(InstId, Opts). + call_instance(InstId, {restart, InstId, Opts}). -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> - emqx_resource_manager:stop(InstId). + call_instance(InstId, {stop, InstId}). -spec health_check(instance_id()) -> ok | {error, Reason :: term()}. health_check(InstId) -> - emqx_resource_manager:health_check(InstId). + call_instance(InstId, {health_check, InstId}). set_resource_status_connecting(InstId) -> - emqx_resource_manager:set_resource_status_connecting(InstId). + call_instance(InstId, {set_resource_status_connecting, InstId}). -spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> - emqx_resource_manager:lookup(InstId). + emqx_resource_instance:lookup(InstId). -spec list_instances() -> [instance_id()]. list_instances() -> @@ -247,7 +254,7 @@ list_instances() -> -spec list_instances_verbose() -> [resource_data()]. list_instances_verbose() -> - emqx_resource_manager:list_all(). + emqx_resource_instance:list_all(). -spec list_instances_by_type(module()) -> [instance_id()]. list_instances_by_type(ResourceType) -> @@ -261,7 +268,7 @@ generate_id(Name) when is_binary(Name) -> <>. -spec list_group_instances(resource_group()) -> [instance_id()]. -list_group_instances(Group) -> emqx_resource_manager:list_group(Group). +list_group_instances(Group) -> emqx_resource_instance:list_group(Group). -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -352,6 +359,9 @@ inc_metrics_funcs(InstId) -> ], {OnSucc, OnFailed}. +call_instance(InstId, Query) -> + emqx_resource_instance:hash_call(InstId, Query). + safe_apply(Func, Args) -> ?SAFE_CALL(erlang:apply(Func, Args)). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl deleted file mode 100644 index 35b9a32df..000000000 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ /dev/null @@ -1,416 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_resource_manager). --behaviour(gen_statem). - --include("emqx_resource.hrl"). --include("emqx_resource_utils.hrl"). - -% API --export([ - ensure_resource/5, - create_dry_run/2, - ets_lookup/1, - get_metrics/1, - health_check/1, - list_all/0, - list_group/1, - lookup/1, - recreate/4, - remove/1, - reset_metrics/1, - restart/2, - set_resource_status_connecting/1, - stop/1 -]). - -% Server --export([start_link/5]). - -% Behaviour --export([init/1, callback_mode/0, handle_event/4, terminate/3]). - -% State record --record(data, {id, group, mod, config, opts, status, state, error}). - --define(SHORT_HEALTHCHECK_INTERVAL, 1000). --define(HEALTHCHECK_INTERVAL, 15000). --define(ETS_TABLE, emqx_resource_manager). --define(TIME_DIVISOR, 100). --define(WAIT_FOR_RESOURCE_DELAY, 100). - -%%------------------------------------------------------------------------------ -%% API -%%------------------------------------------------------------------------------ - -%% @doc Called from emqx_resource when starting a resource instance. -%% -%% Triggers the emqx_resource_manager_sup supervisor to actually create -%% and link the process itself if not already started. --spec ensure_resource( - instance_id(), - resource_group(), - resource_type(), - resource_config(), - create_opts() -) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -ensure_resource(InstId, Group, ResourceType, Config, Opts) -> - case lookup(InstId) of - {ok, _Group, Data} -> - {ok, Data}; - {error, not_found} -> - case do_start(InstId, Group, ResourceType, Config, Opts) of - ok -> - {ok, _Group, Data} = lookup(InstId), - {ok, Data}; - Error -> - Error - end - end. - -%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. -%% -%% Triggers the `emqx_resource_manager_sup` supervisor to actually create -%% and link the process itself if not already started, and then immedately stops. --spec create_dry_run(resource_type(), resource_config()) -> - ok | {error, Reason :: term()}. -create_dry_run(ResourceType, Config) -> - InstId = make_test_id(), - case do_start(InstId, <<"dry_run">>, ResourceType, Config, #{}) of - ok -> - stop(InstId); - Error -> - stop(InstId), - Error - end. - -%% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> - {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, NewConfig, Opts) -> - case lookup(InstId) of - {ok, Group, #{mod := ResourceType, status := connected} = _Data} -> - %% If this resource is in use (status='connected'), we should make sure - %% the new config is OK before removing the old one. - case create_dry_run(ResourceType, NewConfig) of - ok -> - remove(InstId, false), - ensure_resource(InstId, Group, ResourceType, NewConfig, Opts); - Error -> - Error - end; - {ok, Group, #{mod := ResourceType, status := _} = _Data} -> - remove(InstId, false), - ensure_resource(InstId, Group, ResourceType, NewConfig, Opts); - {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> - {error, updating_to_incorrect_resource_type}; - {error, not_found} -> - {error, not_found} - end. - -%% @doc Stops a running resource_manager and clears the metrics for the resource --spec remove(instance_id()) -> ok | {error, Reason :: term()}. -remove(InstId) when is_binary(InstId) -> - remove(InstId, true). - -%% @doc Stops a running resource_manager and optionally clears the metrics for the resource --spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}. -remove(InstId, ClearMetrics) when is_binary(InstId) -> - safe_call(InstId, {remove, ClearMetrics}). - -%% @doc Stops and then starts an instance that was already running --spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -restart(InstId, Opts) when is_binary(InstId) -> - case lookup(InstId) of - {ok, Group, #{mod := ResourceType, config := Config} = _Data} -> - remove(InstId), - do_start(InstId, Group, ResourceType, Config, Opts); - Error -> - Error - end. - -%% @doc Stop the resource manager process --spec stop(instance_id()) -> ok | {error, Reason :: term()}. -stop(InstId) -> - safe_call(InstId, stop). - -%% @doc Test helper --spec set_resource_status_connecting(instance_id()) -> ok. -set_resource_status_connecting(InstId) -> - safe_call(InstId, set_resource_status_connecting). - -%% @doc Lookup the group and data of a resource --spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -lookup(InstId) -> - safe_call(InstId, lookup). - -%% @doc Lookup the group and data of a resource --spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -ets_lookup(InstId) -> - case ets:lookup(?ETS_TABLE, InstId) of - [{_id, Group, Data}] -> - {ok, Group, data_record_to_external_map(Data)}; - [] -> - {error, not_found} - end. - -%% @doc Reset the metrics for the specified resource --spec reset_metrics(instance_id()) -> ok. -reset_metrics(InstId) -> - emqx_plugin_libs_metrics:reset_metrics(resource_metrics, InstId). - -%% @doc Returns the data for all resorces --spec list_all() -> [resource_data()] | []. -list_all() -> - try - [ - data_record_to_external_map(Data) - || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE) - ] - catch - error:badarg -> [] - end. - -%% @doc Returns a list of ids for all the resources in a group --spec list_group(resource_group()) -> [instance_id()]. -list_group(Group) -> - List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), - lists:flatten(List). - --spec health_check(instance_id()) -> ok | {error, Reason :: term()}. -health_check(InstId) -> - safe_call(InstId, health_check). - -%% Server start/stop callbacks - -%% @doc Function called from the supervisor to actually start the server -start_link(InstId, Group, ResourceType, Config, Opts) -> - Data = #data{ - id = InstId, - group = Group, - mod = ResourceType, - config = Config, - opts = Opts, - status = undefined, - state = undefined, - error = undefined - }, - gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). - -init(Data) -> - {ok, connecting, Data}. - -terminate(_Reason, _State, Data) -> - ets:delete(?ETS_TABLE, Data#data.id), - ok. - -%% Behavior callback - -callback_mode() -> [handle_event_function, state_enter]. - -%% Common event Function - -%% Called when the resource needs to be stopped -handle_event({call, From}, set_resource_status_connecting, _State, Data) -> - {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; -handle_event({call, From}, stop, _State, #data{status = disconnected} = _Data) -> - {keep_state_and_data, [{reply, From, ok}]}; -handle_event({call, From}, stop, _State, Data) -> - Result = do_stop(Data), - {next_state, disconnected, Data, [{reply, From, Result}]}; -handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> - handle_remove_event(From, ClearMetrics, Data); -handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> - Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, - {keep_state_and_data, [{reply, From, Reply}]}; -handle_event({call, From}, health_check, disconnected, Data) -> - Actions = [{reply, From, {error, Data#data.error}}], - {keep_state_and_data, Actions}; -handle_event({call, From}, health_check, _State, Data) -> - handle_health_check_event(From, Data); -handle_event(enter, connecting, connecting, Data) -> - handle_connecting_state_enter_event(Data); -handle_event(enter, _OldState, connecting, Data) -> - Actions = [{state_timeout, 0, healthcheck}], - {next_state, connecting, Data, Actions}; -handle_event(state_timeout, healthcheck, connecting, #data{status = disconnected} = Data) -> - {next_state, disconnected, Data}; -handle_event(state_timeout, healthcheck, connecting, Data) -> - connecting_healthcheck(Data); -%% The connected state is entered after a successful start of the callback mod -%% and successful healthchecks -handle_event(enter, _OldState, connected, Data) -> - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, healthcheck}], - {next_state, connected, Data, Actions}; -handle_event(state_timeout, healthcheck, connected, Data) -> - perform_connected_healthcheck(Data); -handle_event(enter, _OldState, disconnected, #data{id = InstId} = Data) -> - UpdatedData = Data#data{status = disconnected}, - ets:delete(?ETS_TABLE, InstId), - {next_state, disconnected, UpdatedData}. - -%%------------------------------------------------------------------------------ -%% internal functions -%%------------------------------------------------------------------------------ - -handle_connecting_state_enter_event(Data) -> - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of - {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connecting}, - %% Perform an initial healthcheck immediately before transitioning into a connected state - Actions = [{state_timeout, 0, healthcheck}], - {next_state, connecting, UpdatedData, Actions}; - {error, Reason} -> - %% Keep track of the error reason why the connection did not work - %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{status = disconnected, error = Reason}, - Actions = [{state_timeout, 0, healthcheck}], - {next_state, connecting, UpdatedData, Actions} - end. - -handle_health_check_event(From, Data) -> - case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of - {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connected, error = undefined}, - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), - Actions = [{reply, From, ok}], - {next_state, connected, UpdatedData, Actions}; - {error, Reason} -> - logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]), - UpdatedData = Data#data{error = Reason}, - ets:delete(?ETS_TABLE, Data#data.id), - Actions = [{reply, From, {error, Reason}}], - {next_state, connecting, UpdatedData, Actions}; - {error, Reason, ResourceState} -> - logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]), - UpdatedData = Data#data{state = ResourceState, error = Reason}, - ets:delete(?ETS_TABLE, Data#data.id), - Actions = [{reply, From, {error, Reason}}], - {next_state, connecting, UpdatedData, Actions} - end. - -handle_remove_event(From, ClearMetrics, Data) -> - do_stop(Data), - ets:delete(?ETS_TABLE, Data#data.id), - case ClearMetrics of - true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, Data#data.id); - false -> ok - end, - {stop_and_reply, normal, [{reply, From, ok}]}. - -do_start(InstId, Group, ResourceType, Config, Opts) -> - % The state machine will make the actual call to the callback/resource module after init - emqx_resource_manager_sup:start_child(InstId, Group, ResourceType, Config, Opts), - case wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)) of - ok -> - ok = emqx_plugin_libs_metrics:create_metrics( - resource_metrics, - InstId, - [matched, success, failed, exception], - [matched] - ), - ok; - timeout -> - {error, timeout} - end. - -do_stop(Data) -> - Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), - ets:delete(?ETS_TABLE, Data#data.id), - Result. - -proc_name(Id) -> - binary_to_atom(Id). - -connecting_healthcheck(Data) -> - case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of - {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connected, error = undefined}, - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), - {next_state, connected, UpdatedData}; - {error, Reason} -> - logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]), - UpdatedData = Data#data{error = Reason}, - Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, healthcheck}], - {keep_state, UpdatedData, Actions}; - {error, Reason, ResourceState} -> - logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]), - UpdatedData = Data#data{state = ResourceState, error = Reason}, - Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, healthcheck}], - {keep_state, UpdatedData, Actions} - end. - -perform_connected_healthcheck(Data) -> - case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of - {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState}, - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, healthcheck}], - {keep_state, UpdatedData, Actions}; - {error, Reason} -> - logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]), - UpdatedData = Data#data{error = Reason}, - ets:delete(?ETS_TABLE, Data#data.id), - {next_state, connecting, UpdatedData}; - {error, Reason, ResourceState} -> - logger:error("health check for ~p failed: ~p", [Data#data.id, Reason]), - UpdatedData = Data#data{state = ResourceState, error = Reason}, - ets:delete(?ETS_TABLE, Data#data.id), - {next_state, connecting, UpdatedData} - end. - -data_record_to_external_map(Data) -> - #{ - id => Data#data.id, - mod => Data#data.mod, - config => Data#data.config, - status => Data#data.status, - state => Data#data.state - }. - -data_record_to_external_map_with_metrics(Data) -> - DataMap = data_record_to_external_map(Data), - DataMap#{metrics => get_metrics(Data#data.id)}. - -make_test_id() -> - RandId = iolist_to_binary(emqx_misc:gen_id(16)), - <>. - -wait_for_resource_ready(InstId, WaitTime) -> - do_wait_for_resource_ready(InstId, WaitTime div ?TIME_DIVISOR). - -do_wait_for_resource_ready(_InstId, 0) -> - timeout; -do_wait_for_resource_ready(InstId, Retry) -> - case lookup(InstId) of - {ok, _Group, #{status := connected}} -> - ok; - _ -> - timer:sleep(?WAIT_FOR_RESOURCE_DELAY), - do_wait_for_resource_ready(InstId, Retry - 1) - end. - -%% @doc Get the metrics for the specified resource -get_metrics(InstId) -> - emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). - -safe_call(InstId, Message) -> - try - gen_statem:call(proc_name(InstId), Message) - catch - exit:{noproc, _} -> - {error, not_found} - end. diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl deleted file mode 100644 index 662671b75..000000000 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ /dev/null @@ -1,44 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_resource_manager_sup). - --behaviour(supervisor). - --export([start_child/5]). - --export([start_link/0]). - --export([init/1]). - -start_child(InstId, Group, ResourceType, Config, Opts) -> - supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - TabOpts = [named_table, set, public, {read_concurrency, true}], - _ = ets:new(emqx_resource_manager, TabOpts), - - ChildSpecs = [#{id => emqx_resource_manager, - start => {emqx_resource_manager, start_link, []}, - restart => transient, - shutdown => brutal_kill, - type => worker, - modules => [emqx_resource_manager]}], - - SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, - {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 3df01f1c5..a439c76bd 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -34,9 +34,33 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, Metrics = emqx_plugin_libs_metrics:child_spec(resource_metrics), - ResourceManager = - #{id => emqx_resource_manager_sup, - start => {emqx_resource_manager_sup, start_link, []}, - restart => permanent, - shutdown => infinity, type => supervisor, modules => [emqx_resource_manager_sup]}, - {ok, {SupFlags, [Metrics, ResourceManager]}}. + Pool = ?RESOURCE_INST_MOD, + Mod = ?RESOURCE_INST_MOD, + ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]), + ResourceInsts = [ + begin + ensure_pool_worker(Pool, {Pool, Idx}, Idx), + #{id => {Mod, Idx}, + start => {Mod, start_link, [Pool, Idx]}, + restart => transient, + shutdown => 5000, type => worker, modules => [Mod]} + end || Idx <- lists:seq(1, ?POOL_SIZE)], + HealthCheck = + #{id => emqx_resource_health_check_sup, + start => {emqx_resource_health_check_sup, start_link, []}, + restart => transient, + shutdown => infinity, type => supervisor, modules => [emqx_resource_health_check_sup]}, + {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. + +%% internal functions +ensure_pool(Pool, Type, Opts) -> + try gproc_pool:new(Pool, Type, Opts) + catch + error:exists -> ok + end. + +ensure_pool_worker(Pool, Name, Slot) -> + try gproc_pool:add_worker(Pool, Name, Slot) + catch + error:exists -> ok + end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index d6d4a056f..8a952e036 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -71,7 +71,7 @@ t_create_remove(_) -> ?TEST_RESOURCE, #{name => test_resource}), - {ok, _} = emqx_resource:recreate( + emqx_resource:recreate( ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -166,6 +166,7 @@ t_healthy(_) -> #{name => <<"test_resource">>}), timer:sleep(400), + emqx_resource_health_check:create_checker(?ID, 15000, 10000), #{pid := Pid} = emqx_resource:query(?ID, get_state), timer:sleep(300), emqx_resource:set_resource_status_connecting(?ID), @@ -183,7 +184,7 @@ t_healthy(_) -> emqx_resource:health_check(?ID)), ?assertMatch( - [], + [#{status := connecting}], emqx_resource:list_instances_verbose()), ok = emqx_resource:remove_local(?ID). @@ -215,7 +216,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertMatch({error, {emqx_resource, #{reason := not_found}}}, + ?assertMatch({error, {emqx_resource, #{reason := disconnected}}}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), @@ -253,7 +254,7 @@ t_stop_start_local(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertMatch({error, {emqx_resource, #{reason := not_found}}}, + ?assertMatch({error, {emqx_resource, #{reason := disconnected}}}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), @@ -294,17 +295,17 @@ t_create_dry_run_local(_) -> ?assertEqual(undefined, whereis(test_resource)). t_create_dry_run_local_failed(_) -> - {Res1, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, + {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, #{cteate_error => true}), - ?assertEqual(error, Res1), + ?assertEqual(error, Res), - {Res2, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, + {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, #{name => test_resource, health_check_error => true}), - ?assertEqual(error, Res2), + ?assertEqual(error, Res), - {Res3, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, + {Res, _} = emqx_resource:create_dry_run_local(?TEST_RESOURCE, #{name => test_resource, stop_error => true}), - ?assertEqual(error, Res3). + ?assertEqual(error, Res). t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),