From cc25f922732281e43801f801aa2ac734a8ed471e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 16 Jun 2022 23:34:52 +0800 Subject: [PATCH 1/2] feat: add start_after_created option to resource:create/4 --- apps/emqx_authn/src/emqx_authn_utils.erl | 18 ++++---- .../src/emqx_connector_mongo.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 8 ++++ apps/emqx_resource/src/emqx_resource.erl | 13 +++++- .../src/emqx_resource_manager.erl | 28 ++++++++----- .../test/emqx_resource_SUITE.erl | 42 ++++++++++++++++++- 6 files changed, 86 insertions(+), 25 deletions(-) diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index bcc6f4e0e..232cf5ac1 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -59,16 +59,14 @@ create_resource(ResourceId, Module, Config) -> ). update_resource(Module, Config, ResourceId) -> - %% recreate before maybe stop - %% resource will auto start during recreate - Result = emqx_resource:recreate_local(ResourceId, Module, Config), - case Config of - #{enable := true} -> - Result; - #{enable := false} -> - ok = emqx_resource:stop(ResourceId), - Result - end. + Opts = #{start_after_created => false}, + Result = emqx_resource:recreate_local(ResourceId, Module, Config, Opts), + _ = + case Config of + #{enable := true} -> emqx_resource:start(ResourceId); + #{enable := false} -> ok + end, + Result. check_password_from_selected_map(_Algorithm, _Selected, undefined) -> {error, bad_username_or_password}; diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 5c10d0a6f..4396db933 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -38,7 +38,7 @@ -export([mongo_query/5, check_worker_health/1]). --define(HEALTH_CHECK_TIMEOUT, 10000). +-define(HEALTH_CHECK_TIMEOUT, 30000). %% mongo servers don't need parse -define(MONGO_HOST_OPTIONS, #{ diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index d1d82a401..f9256ac7c 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,15 @@ -type create_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), + %% We can choose to block the return of emqx_resource:start until + %% the resource connected, wait max to `wait_for_resource_ready` ms. wait_for_resource_ready => integer(), + %% If `start_after_created` is set to true, the resource is started right + %% after it is created. But note that a `started` resource is not guaranteed + %% to be `connected`. + start_after_created => boolean(), + %% If the resource disconnected, we can set to retry starting the resource + %% periodically. auto_retry_interval => integer() }. -type after_query() :: diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 2a3f29122..4837161ae 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -70,8 +70,9 @@ %% Calls to the callback module with current resource state %% They also save the state after the call finished (except query/2,3). -%% restart the instance. -export([ + start/1, + start/2, restart/1, restart/2, %% verify if the resource is working normally @@ -261,11 +262,19 @@ query(InstId, Request, AfterQuery) -> erlang:raise(Err, Reason, ST) end; {ok, _Group, _Data} -> - query_error(not_found, <<"resource not connected">>); + query_error(not_connected, <<"resource not connected">>); {error, not_found} -> query_error(not_found, <<"resource not found">>) end. +-spec start(instance_id()) -> ok | {error, Reason :: term()}. +start(InstId) -> + start(InstId, #{}). + +-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +start(InstId, Opts) -> + emqx_resource_manager:start(InstId, Opts). + -spec restart(instance_id()) -> ok | {error, Reason :: term()}. restart(InstId) -> restart(InstId, #{}). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index bcc4422b2..54439f62e 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -96,7 +96,10 @@ create(InstId, Group, ResourceType, Config, Opts) -> [matched, success, failed, exception], [matched] ), - wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + case maps:get(start_after_created, Opts, true) of + true -> wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)); + false -> ok + end, ok. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. @@ -108,9 +111,9 @@ create(InstId, Group, ResourceType, Config, Opts) -> create_dry_run(ResourceType, Config) -> InstId = make_test_id(), ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), - case wait_for_resource_ready(InstId, 5000) of + case wait_for_resource_ready(InstId, 15000) of ok -> - _ = remove(InstId); + remove(InstId); timeout -> _ = remove(InstId), {error, timeout} @@ -123,7 +126,9 @@ recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(InstId, false), - ensure_resource(InstId, Group, ResourceType, NewConfig, Opts); + create(InstId, Group, ResourceType, NewConfig, Opts), + {ok, _Group, Data} = lookup(InstId), + {ok, Data}; {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> @@ -151,7 +156,7 @@ restart(InstId, Opts) when is_binary(InstId) -> Error end. -%% @doc Stop the resource +%% @doc Start the resource -spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. start(InstId, Opts) -> case safe_call(InstId, start, ?T_OPERATION) of @@ -162,7 +167,7 @@ start(InstId, Opts) -> Error end. -%% @doc Start the resource +%% @doc Stop the resource -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> case safe_call(InstId, stop, ?T_OPERATION) of @@ -240,13 +245,16 @@ start_link(InstId, Group, ResourceType, Config, Opts) -> state = undefined, error = undefined }, - gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). + gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, {Data, Opts}, []). -init(Data) -> +init({Data, Opts}) -> process_flag(trap_exit, true), %% init the cache so that lookup/1 will always return something ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), - {ok, connecting, Data, {next_event, internal, try_connect}}. + case maps:get(start_after_created, Opts, true) of + true -> {ok, connecting, Data, {next_event, internal, start_resource}}; + false -> {ok, stopped, Data} + end. terminate(_Reason, _State, Data) -> _ = maybe_clear_alarm(Data#data.id), @@ -296,7 +304,7 @@ handle_event(enter, _OldState, connecting, Data) -> ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), Actions = [{state_timeout, 0, health_check}], {keep_state_and_data, Actions}; -handle_event(internal, try_connect, connecting, Data) -> +handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> handle_connecting_health_check(Data); diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2519bc7d3..6da68a537 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -126,8 +126,46 @@ t_create_remove_local(_) -> ok = emqx_resource:remove_local(?ID), {error, _} = emqx_resource:remove_local(?ID), + ?assertMatch( + {error, {emqx_resource, #{reason := not_found}}}, + emqx_resource:query(?ID, get_state) + ), ?assertNot(is_process_alive(Pid)). +t_do_not_start_after_created(_) -> + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{start_after_created => false} + ), + %% the resource should remain `disconnected` after created + timer:sleep(200), + ?assertMatch( + {error, {emqx_resource, #{reason := not_connected}}}, + emqx_resource:query(?ID, get_state) + ), + ?assertMatch( + {ok, _, #{status := disconnected}}, + emqx_resource:get_instance(?ID) + ), + + %% start the resource manually.. + ok = emqx_resource:start(?ID), + #{pid := Pid} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid)), + + %% restart the resource + ok = emqx_resource:restart(?ID), + ?assertNot(is_process_alive(Pid)), + #{pid := Pid2} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid2)), + + ok = emqx_resource:remove_local(?ID), + + ?assertNot(is_process_alive(Pid2)). + t_query(_) -> {ok, _} = emqx_resource:create_local( ?ID, @@ -231,7 +269,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := not_found}}}, + {error, {emqx_resource, #{reason := not_connected}}}, emqx_resource:query(?ID, get_state) ), @@ -273,7 +311,7 @@ t_stop_start_local(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := not_found}}}, + {error, {emqx_resource, #{reason := not_connected}}}, emqx_resource:query(?ID, get_state) ), From d6ef2f7502a8d854f35a63208fe97770bdc2d4cb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 17 Jun 2022 05:29:18 +0800 Subject: [PATCH 2/2] refactor: graceful recreate resources --- apps/emqx/src/emqx_map_lib.erl | 17 +- apps/emqx_bridge/src/emqx_bridge_resource.erl | 16 +- apps/emqx_resource/include/emqx_resource.hrl | 5 +- apps/emqx_resource/src/emqx_resource.erl | 219 ++++++------- .../src/emqx_resource_manager.erl | 288 +++++++++++------- .../src/emqx_resource_manager_sup.erl | 6 +- .../src/proto/emqx_resource_proto_v1.erl | 24 +- .../test/emqx_resource_SUITE.erl | 11 +- 8 files changed, 333 insertions(+), 253 deletions(-) diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index f5da55283..c714d7dbc 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -31,7 +31,8 @@ deep_convert/3, diff_maps/2, merge_with/3, - best_effort_recursive_sum/3 + best_effort_recursive_sum/3, + if_only_to_toggle_enable/2 ]). -export_type([config_key/0, config_key_path/0]). @@ -316,3 +317,17 @@ deep_filter(M, F) when is_map(M) -> maps:to_list(M) ) ). + +if_only_to_toggle_enable(OldConf, Conf) -> + #{added := Added, removed := Removed, changed := Updated} = + emqx_map_lib:diff_maps(OldConf, Conf), + case {Added, Removed, Updated} of + {Added, Removed, #{enable := _} = Updated} when + map_size(Added) =:= 0, + map_size(Removed) =:= 0, + map_size(Updated) =:= 1 + -> + true; + {_, _, _} -> + false + end. diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 186f99557..678aa1f10 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -111,7 +111,7 @@ update(Type, Name, {OldConf, Conf}) -> %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated %% without restarting the bridge. %% - case if_only_to_toggle_enable(OldConf, Conf) of + case emqx_map_lib:if_only_to_toggle_enable(OldConf, Conf) of false -> ?SLOG(info, #{ msg => "update bridge", @@ -198,20 +198,6 @@ maybe_disable_bridge(Type, Name, Conf) -> true -> ok end. -if_only_to_toggle_enable(OldConf, Conf) -> - #{added := Added, removed := Removed, changed := Updated} = - emqx_map_lib:diff_maps(OldConf, Conf), - case {Added, Removed, Updated} of - {Added, Removed, #{enable := _} = Updated} when - map_size(Added) =:= 0, - map_size(Removed) =:= 0, - map_size(Updated) =:= 1 - -> - true; - {_, _, _} -> - false - end. - fill_dry_run_conf(Conf) -> Conf#{ <<"egress">> => diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index f9256ac7c..dd384af7c 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -14,7 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -type resource_type() :: module(). --type instance_id() :: binary(). +-type resource_id() :: binary(). +-type manager_id() :: binary(). -type raw_resource_config() :: binary() | raw_term_resource_config(). -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()]. -type resource_config() :: term(). @@ -22,7 +23,7 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting. -type resource_data() :: #{ - id := instance_id(), + id := resource_id(), mod := module(), config := resource_config(), state := resource_state(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 4837161ae..33f0d0a3d 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -117,17 +117,17 @@ ]). %% when calling emqx_resource:start/1 --callback on_start(instance_id(), resource_config()) -> +-callback on_start(resource_id(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. %% when calling emqx_resource:stop/1 --callback on_stop(instance_id(), resource_state()) -> term(). +-callback on_stop(resource_id(), resource_state()) -> term(). %% when calling emqx_resource:query/3 --callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term(). +-callback on_query(resource_id(), Request :: term(), after_query(), resource_state()) -> term(). %% when calling emqx_resource:health_check/2 --callback on_get_status(instance_id(), resource_state()) -> +-callback on_get_status(resource_id(), resource_state()) -> resource_status() | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()}. @@ -167,32 +167,32 @@ apply_query_after_calls(Funcs) -> %% ================================================================================= %% APIs for resource instances %% ================================================================================= --spec create(instance_id(), resource_group(), resource_type(), resource_config()) -> +-spec create(resource_id(), resource_group(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create(InstId, Group, ResourceType, Config) -> - create(InstId, Group, ResourceType, Config, #{}). +create(ResId, Group, ResourceType, Config) -> + create(ResId, Group, ResourceType, Config, #{}). --spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> +-spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create(InstId, Group, ResourceType, Config, Opts) -> - emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts). +create(ResId, Group, ResourceType, Config, Opts) -> + emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts). % -------------------------------------------- --spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) -> +-spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(InstId, Group, ResourceType, Config) -> - create_local(InstId, Group, ResourceType, Config, #{}). +create_local(ResId, Group, ResourceType, Config) -> + create_local(ResId, Group, ResourceType, Config, #{}). -spec create_local( - instance_id(), + resource_id(), resource_group(), resource_type(), resource_config(), create_opts() ) -> {ok, resource_data()}. -create_local(InstId, Group, ResourceType, Config, Opts) -> - emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts). +create_local(ResId, Group, ResourceType, Config, Opts) -> + emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts). -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -204,61 +204,61 @@ create_dry_run(ResourceType, Config) -> create_dry_run_local(ResourceType, Config) -> emqx_resource_manager:create_dry_run(ResourceType, Config). --spec recreate(instance_id(), resource_type(), resource_config()) -> +-spec recreate(resource_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config) -> - recreate(InstId, ResourceType, Config, #{}). +recreate(ResId, ResourceType, Config) -> + recreate(ResId, ResourceType, Config, #{}). --spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config, Opts) -> - emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts). +recreate(ResId, ResourceType, Config, Opts) -> + emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts). --spec recreate_local(instance_id(), resource_type(), resource_config()) -> +-spec recreate_local(resource_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(InstId, ResourceType, Config) -> - recreate_local(InstId, ResourceType, Config, #{}). +recreate_local(ResId, ResourceType, Config) -> + recreate_local(ResId, ResourceType, Config, #{}). --spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> +-spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(InstId, ResourceType, Config, Opts) -> - emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts). +recreate_local(ResId, ResourceType, Config, Opts) -> + emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). --spec remove(instance_id()) -> ok | {error, Reason :: term()}. -remove(InstId) -> - emqx_resource_proto_v1:remove(InstId). +-spec remove(resource_id()) -> ok | {error, Reason :: term()}. +remove(ResId) -> + emqx_resource_proto_v1:remove(ResId). --spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. -remove_local(InstId) -> - emqx_resource_manager:remove(InstId). +-spec remove_local(resource_id()) -> ok | {error, Reason :: term()}. +remove_local(ResId) -> + emqx_resource_manager:remove(ResId). --spec reset_metrics_local(instance_id()) -> ok. -reset_metrics_local(InstId) -> - emqx_resource_manager:reset_metrics(InstId). +-spec reset_metrics_local(resource_id()) -> ok. +reset_metrics_local(ResId) -> + emqx_resource_manager:reset_metrics(ResId). --spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. -reset_metrics(InstId) -> - emqx_resource_proto_v1:reset_metrics(InstId). +-spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}. +reset_metrics(ResId) -> + emqx_resource_proto_v1:reset_metrics(ResId). %% ================================================================================= --spec query(instance_id(), Request :: term()) -> Result :: term(). -query(InstId, Request) -> - query(InstId, Request, inc_metrics_funcs(InstId)). +-spec query(resource_id(), Request :: term()) -> Result :: term(). +query(ResId, Request) -> + query(ResId, Request, inc_metrics_funcs(ResId)). %% same to above, also defines what to do when the Module:on_query success or failed %% it is the duty of the Module to apply the `after_query()` functions. --spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). -query(InstId, Request, AfterQuery) -> - case emqx_resource_manager:ets_lookup(InstId) of +-spec query(resource_id(), Request :: term(), after_query()) -> Result :: term(). +query(ResId, Request, AfterQuery) -> + case emqx_resource_manager:ets_lookup(ResId) of {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe - ok = emqx_metrics_worker:inc(resource_metrics, InstId, matched), + ok = emqx_metrics_worker:inc(resource_metrics, ResId, matched), try - Mod:on_query(InstId, Request, AfterQuery, ResourceState) + Mod:on_query(ResId, Request, AfterQuery, ResourceState) catch Err:Reason:ST -> - emqx_metrics_worker:inc(resource_metrics, InstId, exception), + emqx_metrics_worker:inc(resource_metrics, ResId, exception), erlang:raise(Err, Reason, ST) end; {ok, _Group, _Data} -> @@ -267,39 +267,39 @@ query(InstId, Request, AfterQuery) -> query_error(not_found, <<"resource not found">>) end. --spec start(instance_id()) -> ok | {error, Reason :: term()}. -start(InstId) -> - start(InstId, #{}). +-spec start(resource_id()) -> ok | {error, Reason :: term()}. +start(ResId) -> + start(ResId, #{}). --spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -start(InstId, Opts) -> - emqx_resource_manager:start(InstId, Opts). +-spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +start(ResId, Opts) -> + emqx_resource_manager:start(ResId, Opts). --spec restart(instance_id()) -> ok | {error, Reason :: term()}. -restart(InstId) -> - restart(InstId, #{}). +-spec restart(resource_id()) -> ok | {error, Reason :: term()}. +restart(ResId) -> + restart(ResId, #{}). --spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -restart(InstId, Opts) -> - emqx_resource_manager:restart(InstId, Opts). +-spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(ResId, Opts) -> + emqx_resource_manager:restart(ResId, Opts). --spec stop(instance_id()) -> ok | {error, Reason :: term()}. -stop(InstId) -> - emqx_resource_manager:stop(InstId). +-spec stop(resource_id()) -> ok | {error, Reason :: term()}. +stop(ResId) -> + emqx_resource_manager:stop(ResId). --spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}. -health_check(InstId) -> - emqx_resource_manager:health_check(InstId). +-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}. +health_check(ResId) -> + emqx_resource_manager:health_check(ResId). -set_resource_status_connecting(InstId) -> - emqx_resource_manager:set_resource_status_connecting(InstId). +set_resource_status_connecting(ResId) -> + emqx_resource_manager:set_resource_status_connecting(ResId). --spec get_instance(instance_id()) -> +-spec get_instance(resource_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. -get_instance(InstId) -> - emqx_resource_manager:lookup(InstId). +get_instance(ResId) -> + emqx_resource_manager:lookup(ResId). --spec list_instances() -> [instance_id()]. +-spec list_instances() -> [resource_id()]. list_instances() -> [Id || #{id := Id} <- list_instances_verbose()]. @@ -307,36 +307,37 @@ list_instances() -> list_instances_verbose() -> emqx_resource_manager:list_all(). --spec list_instances_by_type(module()) -> [instance_id()]. +-spec list_instances_by_type(module()) -> [resource_id()]. list_instances_by_type(ResourceType) -> filter_instances(fun (_, RT) when RT =:= ResourceType -> true; (_, _) -> false end). --spec generate_id(term()) -> instance_id(). +-spec generate_id(term()) -> resource_id(). generate_id(Name) when is_binary(Name) -> - Id = integer_to_binary(erlang:unique_integer([positive])), + Id = integer_to_binary(erlang:unique_integer([monotonic, positive])), <>. --spec list_group_instances(resource_group()) -> [instance_id()]. +-spec list_group_instances(resource_group()) -> [resource_id()]. list_group_instances(Group) -> emqx_resource_manager:list_group(Group). --spec call_start(instance_id(), module(), resource_config()) -> +-spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. -call_start(InstId, Mod, Config) -> - ?SAFE_CALL(Mod:on_start(InstId, Config)). +call_start(MgrId, Mod, Config) -> + ?SAFE_CALL(Mod:on_start(MgrId, Config)). --spec call_health_check(instance_id(), module(), resource_state()) -> +-spec call_health_check(manager_id(), module(), resource_state()) -> resource_status() | {resource_status(), resource_state()} - | {resource_status(), resource_state(), term()}. -call_health_check(InstId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). + | {resource_status(), resource_state(), term()} + | {error, term()}. +call_health_check(MgrId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)). --spec call_stop(instance_id(), module(), resource_state()) -> term(). -call_stop(InstId, Mod, ResourceState) -> - ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). +-spec call_stop(manager_id(), module(), resource_state()) -> term(). +call_stop(MgrId, Mod, ResourceState) -> + ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)). -spec check_config(resource_type(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. @@ -344,85 +345,85 @@ check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). -spec check_and_create( - instance_id(), + resource_id(), resource_group(), resource_type(), raw_resource_config() ) -> {ok, resource_data() | 'already_created'} | {error, term()}. -check_and_create(InstId, Group, ResourceType, RawConfig) -> - check_and_create(InstId, Group, ResourceType, RawConfig, #{}). +check_and_create(ResId, Group, ResourceType, RawConfig) -> + check_and_create(ResId, Group, ResourceType, RawConfig, #{}). -spec check_and_create( - instance_id(), + resource_id(), resource_group(), resource_type(), raw_resource_config(), create_opts() ) -> {ok, resource_data() | 'already_created'} | {error, term()}. -check_and_create(InstId, Group, ResourceType, RawConfig, Opts) -> +check_and_create(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end + fun(ResConf) -> create(ResId, Group, ResourceType, ResConf, Opts) end ). -spec check_and_create_local( - instance_id(), + resource_id(), resource_group(), resource_type(), raw_resource_config() ) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(InstId, Group, ResourceType, RawConfig) -> - check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}). +check_and_create_local(ResId, Group, ResourceType, RawConfig) -> + check_and_create_local(ResId, Group, ResourceType, RawConfig, #{}). -spec check_and_create_local( - instance_id(), + resource_id(), resource_group(), resource_type(), raw_resource_config(), create_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) -> +check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end + fun(ResConf) -> create_local(ResId, Group, ResourceType, ResConf, Opts) end ). -spec check_and_recreate( - instance_id(), + resource_id(), resource_type(), raw_resource_config(), create_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> +check_and_recreate(ResId, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end + fun(ResConf) -> recreate(ResId, ResourceType, ResConf, Opts) end ). -spec check_and_recreate_local( - instance_id(), + resource_id(), resource_type(), raw_resource_config(), create_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> +check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end + fun(ResConf) -> recreate_local(ResId, ResourceType, ResConf, Opts) end ). check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> case check_config(ResourceType, RawConfig) of - {ok, InstConf} -> Do(InstConf); + {ok, ResConf} -> Do(ResConf); Error -> Error end. @@ -431,9 +432,9 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> filter_instances(Filter) -> [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. -inc_metrics_funcs(InstId) -> - OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, failed]}], - OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}], +inc_metrics_funcs(ResId) -> + OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, failed]}], + OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, success]}], {OnSucc, OnFailed}. safe_apply(Func, Args) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 54439f62e..ddee2fe8d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -23,7 +23,6 @@ % API -export([ ensure_resource/5, - create/5, recreate/4, remove/1, create_dry_run/2, @@ -44,13 +43,13 @@ ]). % Server --export([start_link/5]). +-export([start_link/6]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). % State record --record(data, {id, group, mod, config, opts, status, state, error}). +-record(data, {id, manager_id, group, mod, config, opts, status, state, error}). -define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(HEALTHCHECK_INTERVAL, 15000). @@ -70,34 +69,53 @@ %% Triggers the emqx_resource_manager_sup supervisor to actually create %% and link the process itself if not already started. -spec ensure_resource( - instance_id(), + resource_id(), resource_group(), resource_type(), resource_config(), create_opts() ) -> {ok, resource_data()}. -ensure_resource(InstId, Group, ResourceType, Config, Opts) -> - case lookup(InstId) of +ensure_resource(ResId, Group, ResourceType, Config, Opts) -> + case lookup(ResId) of {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - create(InstId, Group, ResourceType, Config, Opts), - {ok, _Group, Data} = lookup(InstId), - {ok, Data} + MgrId = set_new_owner(ResId), + create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) end. +%% @doc Called from emqx_resource when recreating a resource which may or may not exist +-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) -> + {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. +recreate(ResId, ResourceType, NewConfig, Opts) -> + case lookup(ResId) of + {ok, Group, #{mod := ResourceType, status := _} = _Data} -> + _ = remove(ResId, false), + MgrId = set_new_owner(ResId), + create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts); + {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> + {error, updating_to_incorrect_resource_type}; + {error, not_found} -> + {error, not_found} + end. + +create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> + create(MgrId, ResId, Group, ResourceType, Config, Opts), + {ok, _Group, Data} = lookup(ResId), + {ok, Data}. + %% @doc Create a resource_manager and wait until it is running -create(InstId, Group, ResourceType, Config, Opts) -> +create(MgrId, ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts), ok = emqx_metrics_worker:create_metrics( resource_metrics, - InstId, + ResId, [matched, success, failed, exception], [matched] ), case maps:get(start_after_created, Opts, true) of - true -> wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)); + true -> wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)); false -> ok end, ok. @@ -109,68 +127,55 @@ create(InstId, Group, ResourceType, Config, Opts) -> -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> - InstId = make_test_id(), - ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), - case wait_for_resource_ready(InstId, 15000) of + ResId = make_test_id(), + MgrId = set_new_owner(ResId), + ok = emqx_resource_manager_sup:ensure_child( + MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{} + ), + case wait_for_resource_ready(ResId, 15000) of ok -> - remove(InstId); + remove(ResId); timeout -> - _ = remove(InstId), + _ = remove(ResId), {error, timeout} end. -%% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> - {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. -recreate(InstId, ResourceType, NewConfig, Opts) -> - case lookup(InstId) of - {ok, Group, #{mod := ResourceType, status := _} = _Data} -> - _ = remove(InstId, false), - create(InstId, Group, ResourceType, NewConfig, Opts), - {ok, _Group, Data} = lookup(InstId), - {ok, Data}; - {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> - {error, updating_to_incorrect_resource_type}; - {error, not_found} -> - {error, not_found} - end. - %% @doc Stops a running resource_manager and clears the metrics for the resource --spec remove(instance_id()) -> ok | {error, Reason :: term()}. -remove(InstId) when is_binary(InstId) -> - remove(InstId, true). +-spec remove(resource_id()) -> ok | {error, Reason :: term()}. +remove(ResId) when is_binary(ResId) -> + remove(ResId, true). %% @doc Stops a running resource_manager and optionally clears the metrics for the resource --spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}. -remove(InstId, ClearMetrics) when is_binary(InstId) -> - safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION). +-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. +remove(ResId, ClearMetrics) when is_binary(ResId) -> + safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). %% @doc Stops and then starts an instance that was already running --spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -restart(InstId, Opts) when is_binary(InstId) -> - case safe_call(InstId, restart, ?T_OPERATION) of +-spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(ResId, Opts) when is_binary(ResId) -> + case safe_call(ResId, restart, ?T_OPERATION) of ok -> - wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)), ok; {error, _Reason} = Error -> Error end. %% @doc Start the resource --spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -start(InstId, Opts) -> - case safe_call(InstId, start, ?T_OPERATION) of +-spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}. +start(ResId, Opts) -> + case safe_call(ResId, start, ?T_OPERATION) of ok -> - wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)), ok; {error, _Reason} = Error -> Error end. %% @doc Stop the resource --spec stop(instance_id()) -> ok | {error, Reason :: term()}. -stop(InstId) -> - case safe_call(InstId, stop, ?T_OPERATION) of +-spec stop(resource_id()) -> ok | {error, Reason :: term()}. +stop(ResId) -> + case safe_call(ResId, stop, ?T_OPERATION) of ok -> ok; {error, _Reason} = Error -> @@ -178,36 +183,36 @@ stop(InstId) -> end. %% @doc Test helper --spec set_resource_status_connecting(instance_id()) -> ok. -set_resource_status_connecting(InstId) -> - safe_call(InstId, set_resource_status_connecting, infinity). +-spec set_resource_status_connecting(resource_id()) -> ok. +set_resource_status_connecting(ResId) -> + safe_call(ResId, set_resource_status_connecting, infinity). %% @doc Lookup the group and data of a resource --spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -lookup(InstId) -> - case safe_call(InstId, lookup, ?T_LOOKUP) of - {error, timeout} -> ets_lookup(InstId); +-spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. +lookup(ResId) -> + case safe_call(ResId, lookup, ?T_LOOKUP) of + {error, timeout} -> ets_lookup(ResId); Result -> Result end. %% @doc Lookup the group and data of a resource --spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -ets_lookup(InstId) -> - case ets:lookup(?ETS_TABLE, InstId) of - [{_Id, Group, Data}] -> +-spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. +ets_lookup(ResId) -> + case read_cache(ResId) of + {Group, Data} -> {ok, Group, data_record_to_external_map_with_metrics(Data)}; - [] -> + not_found -> {error, not_found} end. %% @doc Get the metrics for the specified resource -get_metrics(InstId) -> - emqx_metrics_worker:get_metrics(resource_metrics, InstId). +get_metrics(ResId) -> + emqx_metrics_worker:get_metrics(resource_metrics, ResId). %% @doc Reset the metrics for the specified resource --spec reset_metrics(instance_id()) -> ok. -reset_metrics(InstId) -> - emqx_metrics_worker:reset_metrics(resource_metrics, InstId). +-spec reset_metrics(resource_id()) -> ok. +reset_metrics(ResId) -> + emqx_metrics_worker:reset_metrics(resource_metrics, ResId). %% @doc Returns the data for all resorces -spec list_all() -> [resource_data()] | []. @@ -222,21 +227,22 @@ list_all() -> end. %% @doc Returns a list of ids for all the resources in a group --spec list_group(resource_group()) -> [instance_id()]. +-spec list_group(resource_group()) -> [resource_id()]. list_group(Group) -> List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), lists:flatten(List). --spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}. -health_check(InstId) -> - safe_call(InstId, health_check, ?T_OPERATION). +-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}. +health_check(ResId) -> + safe_call(ResId, health_check, ?T_OPERATION). %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server -start_link(InstId, Group, ResourceType, Config, Opts) -> +start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> Data = #data{ - id = InstId, + id = ResId, + manager_id = MgrId, group = Group, mod = ResourceType, config = Config, @@ -245,12 +251,14 @@ start_link(InstId, Group, ResourceType, Config, Opts) -> state = undefined, error = undefined }, - gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, {Data, Opts}, []). + Module = atom_to_binary(?MODULE), + ProcName = binary_to_atom(<>, utf8), + gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []). init({Data, Opts}) -> process_flag(trap_exit, true), %% init the cache so that lookup/1 will always return something - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + insert_cache(Data#data.id, Data#data.group, Data), case maps:get(start_after_created, Opts, true) of true -> {ok, connecting, Data, {next_event, internal, start_resource}}; false -> {ok, stopped, Data} @@ -258,7 +266,7 @@ init({Data, Opts}) -> terminate(_Reason, _State, Data) -> _ = maybe_clear_alarm(Data#data.id), - ets:delete(?ETS_TABLE, Data#data.id), + delete_cache(Data#data.id, Data#data.manager_id), ok. %% Behavior callback @@ -279,6 +287,12 @@ handle_event({call, From}, start, stopped, Data) -> start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; +% Called when the resource received a `quit` message +handle_event(info, quit, stopped, _Data) -> + {stop, {shutdown, quit}}; +handle_event(info, quit, _State, Data) -> + _ = stop_resource(Data), + {stop, {shutdown, quit}}; % Called when the resource is to be stopped handle_event({call, From}, stop, stopped, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; @@ -301,7 +315,7 @@ handle_event({call, From}, health_check, _State, Data) -> handle_manually_health_check(From, Data); % State: CONNECTING handle_event(enter, _OldState, connecting, Data) -> - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + insert_cache(Data#data.id, Data#data.group, Data), Actions = [{state_timeout, 0, health_check}], {keep_state_and_data, Actions}; handle_event(internal, start_resource, connecting, Data) -> @@ -312,7 +326,7 @@ handle_event(state_timeout, health_check, connecting, Data) -> %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks handle_event(enter, _OldState, connected, Data) -> - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + insert_cache(Data#data.id, Data#data.group, Data), _ = emqx_alarm:deactivate(Data#data.id), Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], {next_state, connected, Data, Actions}; @@ -320,7 +334,7 @@ handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); %% State: DISCONNECTED handle_event(enter, _OldState, disconnected, Data) -> - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + insert_cache(Data#data.id, Data#data.group, Data), handle_disconnected_state_enter(Data); handle_event(state_timeout, auto_retry, disconnected, Data) -> start_resource(Data, undefined); @@ -328,14 +342,14 @@ handle_event(state_timeout, auto_retry, disconnected, Data) -> %% The stopped state is entered after the resource has been explicitly stopped handle_event(enter, _OldState, stopped, Data) -> UpdatedData = Data#data{status = disconnected}, - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), + insert_cache(Data#data.id, Data#data.group, UpdatedData), {next_state, stopped, UpdatedData}; % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( error, #{ - msg => "ignore all other events", + msg => ignore_all_other_events, event_type => EventType, event_data => EventData, state => State, @@ -347,6 +361,47 @@ handle_event(EventType, EventData, State, Data) -> %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ +insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> + case get_owner(ResId) of + not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); + MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); + _ -> self() ! quit + end. + +read_cache(ResId) -> + case ets:lookup(?ETS_TABLE, ResId) of + [{_Id, Group, Data}] -> {Group, Data}; + [] -> not_found + end. + +delete_cache(ResId, MgrId) -> + case get_owner(ResId) of + MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId -> + do_delete_cache(ResId); + _ -> + ok + end. + +do_delete_cache(<> = ResId) -> + ets:delete(?ETS_TABLE, {owner, ResId}), + ets:delete(?ETS_TABLE, ResId); +do_delete_cache(ResId) -> + ets:delete(?ETS_TABLE, ResId). + +set_new_owner(ResId) -> + MgrId = make_manager_id(ResId), + ok = set_owner(ResId, MgrId), + MgrId. + +set_owner(ResId, MgrId) -> + ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}), + ok. + +get_owner(ResId) -> + case ets:lookup(?ETS_TABLE, {owner, ResId}) of + [{_, MgrId}] -> MgrId; + [] -> not_found + end. handle_disconnected_state_enter(Data) -> case maps:get(auto_retry_interval, Data#data.opts, undefined) of @@ -367,8 +422,8 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of + insert_cache(Data#data.id, Data#data.group, Data), + case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of {ok, ResourceState} -> UpdatedData = Data#data{state = ResourceState, status = connecting}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -390,14 +445,16 @@ stop_resource(Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. - _ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), + _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), _ = maybe_clear_alarm(Data#data.id), ok. -proc_name(Id) -> - Module = atom_to_binary(?MODULE), - Connector = <<"_">>, - binary_to_atom(<>). +make_manager_id(ResId) -> + emqx_resource:generate_id(ResId). + +make_test_id() -> + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + <>. handle_manually_health_check(From, Data) -> with_health_check(Data, fun(Status, UpdatedData) -> @@ -434,13 +491,13 @@ handle_connected_health_check(Data) -> with_health_check(Data, Func) -> ResId = Data#data.id, - HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state), - {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state), + HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), + {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, - ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}), + insert_cache(ResId, UpdatedData#data.group, UpdatedData), Func(Status, UpdatedData). maybe_alarm(connected, _ResId) -> @@ -459,12 +516,22 @@ maybe_clear_alarm(<>) -> maybe_clear_alarm(ResId) -> emqx_alarm:deactivate(ResId). -parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) -> - {Status, OldState, undefined}; -parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) -> +parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> + {Status, Data#data.state, undefined}; +parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) -> {Status, NewState, undefined}; -parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) -> - {Status, NewState, Error}. +parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) -> + {Status, NewState, Error}; +parse_health_check_result({error, Error}, Data) -> + ?SLOG( + error, + #{ + msg => health_check_exception, + resource_id => Data#data.id, + reason => Error + } + ), + {disconnected, Data#data.state, Error}. maybe_reply(Actions, undefined, _Reply) -> Actions; @@ -481,29 +548,30 @@ data_record_to_external_map_with_metrics(Data) -> metrics => get_metrics(Data#data.id) }. -make_test_id() -> - RandId = iolist_to_binary(emqx_misc:gen_id(16)), - <>. +-spec wait_for_resource_ready(resource_id(), integer()) -> ok | timeout. +wait_for_resource_ready(ResId, WaitTime) -> + do_wait_for_resource_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). --spec wait_for_resource_ready(instance_id(), integer()) -> ok | timeout. -wait_for_resource_ready(InstId, WaitTime) -> - do_wait_for_resource_ready(InstId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). - -do_wait_for_resource_ready(_InstId, 0) -> +do_wait_for_resource_ready(_ResId, 0) -> timeout; -do_wait_for_resource_ready(InstId, Retry) -> - case ets_lookup(InstId) of +do_wait_for_resource_ready(ResId, Retry) -> + case ets_lookup(ResId) of {ok, _Group, #{status := connected}} -> ok; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), - do_wait_for_resource_ready(InstId, Retry - 1) + do_wait_for_resource_ready(ResId, Retry - 1) end. -safe_call(InstId, Message, Timeout) -> +safe_call(ResId, Message, Timeout) -> try - gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout}) + Module = atom_to_binary(?MODULE), + MgrId = get_owner(ResId), + ProcName = binary_to_existing_atom(<>, utf8), + gen_statem:call(ProcName, Message, {clean_timeout, Timeout}) catch + error:badarg -> + {error, not_found}; exit:{R, _} when R == noproc; R == normal; R == shutdown -> {error, not_found}; exit:{timeout, _} -> diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index b26d968df..b29c28f70 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -17,14 +17,14 @@ -behaviour(supervisor). --export([ensure_child/5]). +-export([ensure_child/6]). -export([start_link/0]). -export([init/1]). -ensure_child(InstId, Group, ResourceType, Config, Opts) -> - _ = supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]), +ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) -> + _ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]), ok. start_link() -> diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index cbbc4e552..cdd2592d9 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -34,16 +34,16 @@ introduced_in() -> "5.0.0". -spec create( - instance_id(), + resource_id(), resource_group(), resource_type(), resource_config(), create_opts() ) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create(InstId, Group, ResourceType, Config, Opts) -> +create(ResId, Group, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, create_local, [ - InstId, Group, ResourceType, Config, Opts + ResId, Group, ResourceType, Config, Opts ]). -spec create_dry_run( @@ -55,19 +55,19 @@ create_dry_run(ResourceType, Config) -> emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]). -spec recreate( - instance_id(), + resource_id(), resource_type(), resource_config(), create_opts() ) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate(InstId, ResourceType, Config, Opts) -> - emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]). +recreate(ResId, ResourceType, Config, Opts) -> + emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [ResId, ResourceType, Config, Opts]). --spec remove(instance_id()) -> ok | {error, Reason :: term()}. -remove(InstId) -> - emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). +-spec remove(resource_id()) -> ok | {error, Reason :: term()}. +remove(ResId) -> + emqx_cluster_rpc:multicall(emqx_resource, remove_local, [ResId]). --spec reset_metrics(instance_id()) -> ok | {error, any()}. -reset_metrics(InstId) -> - emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]). +-spec reset_metrics(resource_id()) -> ok | {error, any()}. +reset_metrics(ResId) -> + emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [ResId]). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 6da68a537..51e6bac43 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -348,6 +348,16 @@ t_list_filter(_) -> ). t_create_dry_run_local(_) -> + ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}), + lists:foreach( + fun(_) -> + create_dry_run_local_succ() + end, + lists:seq(1, 10) + ), + [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}). + +create_dry_run_local_succ() -> ?assertEqual( ok, emqx_resource:create_dry_run_local( @@ -355,7 +365,6 @@ t_create_dry_run_local(_) -> #{name => test_resource, register => true} ) ), - timer:sleep(100), ?assertEqual(undefined, whereis(test_resource)). t_create_dry_run_local_failed(_) ->