diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 7bf591c72..d98e9f940 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -181,34 +181,22 @@ create( request_timeout => RequestTimeout, resource_id => ResourceId }, - case - emqx_resource:create_local( - ResourceId, - ?RESOURCE_GROUP, - emqx_connector_http, - Config#{ - base_url => BaseUrl, - pool_type => random - }, - #{} - ) - of - {ok, already_created} -> - {ok, State}; - {ok, _} -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end. + {ok, _Data} = emqx_resource:create_local( + ResourceId, + ?RESOURCE_GROUP, + emqx_connector_http, + Config#{ + base_url => BaseUrl, + pool_type => random + }, + #{} + ), + {ok, State}. update(Config, State) -> - case create(Config) of - {ok, NewState} -> - ok = destroy(State), - {ok, NewState}; - {error, Reason} -> - {error, Reason} - end. + {ok, NewState} = create(Config), + ok = destroy(State), + {ok, NewState}. authenticate(#{auth_method := _}, _) -> ignore; diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl index 5de07550e..0d839fc7f 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl @@ -302,7 +302,7 @@ create2( } = Config ) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), - {ok, _} = emqx_resource:create_local( + {ok, _Data} = emqx_resource:create_local( ResourceId, ?RESOURCE_GROUP, emqx_authn_jwks_connector, diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index f515a12e9..e12fcee0d 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -146,31 +146,19 @@ create(#{filter := Filter} = Config) -> filter_template => FilterTemplate, resource_id => ResourceId }, - case - emqx_resource:create_local( - ResourceId, - ?RESOURCE_GROUP, - emqx_connector_mongo, - Config, - #{} - ) - of - {ok, already_created} -> - {ok, NState}; - {ok, _} -> - {ok, NState}; - {error, Reason} -> - {error, Reason} - end. + {ok, _Data} = emqx_resource:create_local( + ResourceId, + ?RESOURCE_GROUP, + emqx_connector_mongo, + Config, + #{} + ), + {ok, NState}. update(Config, State) -> - case create(Config) of - {ok, NewState} -> - ok = destroy(State), - {ok, NewState}; - {error, Reason} -> - {error, Reason} - end. + {ok, NewState} = create(Config), + ok = destroy(State), + {ok, NewState}. authenticate(#{auth_method := _}, _) -> ignore; diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 11575149c..f7cea29d1 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -99,29 +99,19 @@ create( query_timeout => QueryTimeout, resource_id => ResourceId }, - case - emqx_resource:create_local( - ResourceId, - ?RESOURCE_GROUP, - emqx_connector_mysql, - Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}}, - #{} - ) - of - {ok, _} -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end. + {ok, _Data} = emqx_resource:create_local( + ResourceId, + ?RESOURCE_GROUP, + emqx_connector_mysql, + Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}}, + #{} + ), + {ok, State}. update(Config, State) -> - case create(Config) of - {ok, NewState} -> - ok = destroy(State), - {ok, NewState}; - {error, Reason} -> - {error, Reason} - end. + {ok, NewState} = create(Config), + ok = destroy(State), + {ok, NewState}. authenticate(#{auth_method := _}, _) -> ignore; diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 47eb57634..ad248c3c8 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -96,31 +96,19 @@ create( password_hash_algorithm => Algorithm, resource_id => ResourceId }, - case - emqx_resource:create_local( - ResourceId, - ?RESOURCE_GROUP, - emqx_connector_pgsql, - Config#{prepare_statement => #{ResourceId => Query}}, - #{} - ) - of - {ok, already_created} -> - {ok, State}; - {ok, _} -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end. + {ok, _Data} = emqx_resource:create_local( + ResourceId, + ?RESOURCE_GROUP, + emqx_connector_pgsql, + Config#{prepare_statement => #{ResourceId => Query}}, + #{} + ), + {ok, State}. update(Config, State) -> - case create(Config) of - {ok, NewState} -> - ok = destroy(State), - {ok, NewState}; - {error, Reason} -> - {error, Reason} - end. + {ok, NewState} = create(Config), + ok = destroy(State), + {ok, NewState}. authenticate(#{auth_method := _}, _) -> ignore; diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 37870448c..44754f8eb 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -115,22 +115,14 @@ create( cmd => NCmd, resource_id => ResourceId }, - case - emqx_resource:create_local( - ResourceId, - ?RESOURCE_GROUP, - emqx_connector_redis, - Config, - #{} - ) - of - {ok, already_created} -> - {ok, NState}; - {ok, _} -> - {ok, NState}; - {error, Reason} -> - {error, Reason} - end + {ok, _Data} = emqx_resource:create_local( + ResourceId, + ?RESOURCE_GROUP, + emqx_connector_redis, + Config, + #{} + ), + {ok, NState} catch error:{unsupported_cmd, _Cmd} -> {error, {unsupported_cmd, Cmd}}; diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index 319928670..3c17312ad 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -52,10 +52,8 @@ description() -> init(Config) -> NConfig = parse_config(Config), - case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of - {error, Reason} -> error({load_config_error, Reason}); - {ok, Id} -> NConfig#{annotations => #{id => Id}} - end. + {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_http, NConfig), + NConfig#{annotations => #{id => Id}}. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). diff --git a/apps/emqx_authz/src/emqx_authz_mongodb.erl b/apps/emqx_authz/src/emqx_authz_mongodb.erl index 620ca5b42..7c79897e6 100644 --- a/apps/emqx_authz/src/emqx_authz_mongodb.erl +++ b/apps/emqx_authz/src/emqx_authz_mongodb.erl @@ -46,18 +46,14 @@ description() -> "AuthZ with MongoDB". init(#{filter := Filter} = Source) -> - case emqx_authz_utils:create_resource(emqx_connector_mongo, Source) of - {error, Reason} -> - error({load_config_error, Reason}); - {ok, Id} -> - Source#{ - annotations => #{id => Id}, - filter_template => emqx_authz_utils:parse_deep( - Filter, - ?PLACEHOLDERS - ) - } - end. + {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_mongo, Source), + Source#{ + annotations => #{id => Id}, + filter_template => emqx_authz_utils:parse_deep( + Filter, + ?PLACEHOLDERS + ) + }. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). diff --git a/apps/emqx_authz/src/emqx_authz_mysql.erl b/apps/emqx_authz/src/emqx_authz_mysql.erl index 1bdf511a5..54a85885c 100644 --- a/apps/emqx_authz/src/emqx_authz_mysql.erl +++ b/apps/emqx_authz/src/emqx_authz_mysql.erl @@ -52,12 +52,8 @@ description() -> init(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS), Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, - case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of - {error, Reason} -> - error({load_config_error, Reason}); - {ok, Id} -> - Source#{annotations => #{id => Id, tmpl_oken => TmplToken}} - end. + {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_mysql, Source), + Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index 1d7542655..2543620b7 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -54,26 +54,20 @@ init(#{query := SQL0} = Source) -> ?PLACEHOLDERS ), ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql), - case - emqx_resource:create_local( - ResourceID, - ?RESOURCE_GROUP, - emqx_connector_pgsql, - Source#{prepare_statement => #{ResourceID => SQL}}, - #{} - ) - of - {ok, _} -> - Source#{ - annotations => - #{ - id => ResourceID, - placeholders => PlaceHolders - } - }; - {error, Reason} -> - error({load_config_error, Reason}) - end. + {ok, _Data} = emqx_resource:create_local( + ResourceID, + ?RESOURCE_GROUP, + emqx_connector_pgsql, + Source#{prepare_statement => #{ResourceID => SQL}}, + #{} + ), + Source#{ + annotations => + #{ + id => ResourceID, + placeholders => PlaceHolders + } + }. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). diff --git a/apps/emqx_authz/src/emqx_authz_redis.erl b/apps/emqx_authz/src/emqx_authz_redis.erl index 54ec16475..27d9d31bc 100644 --- a/apps/emqx_authz/src/emqx_authz_redis.erl +++ b/apps/emqx_authz/src/emqx_authz_redis.erl @@ -50,15 +50,11 @@ description() -> init(#{cmd := CmdStr} = Source) -> Cmd = tokens(CmdStr), CmdTemplate = emqx_authz_utils:parse_deep(Cmd, ?PLACEHOLDERS), - case emqx_authz_utils:create_resource(emqx_connector_redis, Source) of - {error, Reason} -> - error({load_config_error, Reason}); - {ok, Id} -> - Source#{ - annotations => #{id => Id}, - cmd_template => CmdTemplate - } - end. + {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_redis, Source), + Source#{ + annotations => #{id => Id}, + cmd_template => CmdTemplate + }. destroy(#{annotations := #{id := Id}}) -> ok = emqx_resource:remove_local(Id). diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index 4babb373b..1d95c7936 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -38,19 +38,14 @@ create_resource(Module, Config) -> ResourceID = make_resource_id(Module), - case - emqx_resource:create_local( - ResourceID, - ?RESOURCE_GROUP, - Module, - Config, - #{} - ) - of - {ok, already_created} -> {ok, ResourceID}; - {ok, _} -> {ok, ResourceID}; - {error, Reason} -> {error, Reason} - end. + {ok, _Data} = emqx_resource:create_local( + ResourceID, + ?RESOURCE_GROUP, + Module, + Config, + #{} + ), + {ok, ResourceID}. cleanup_resources() -> lists:foreach( diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d26f8834e..fcd01a44b 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -273,19 +273,14 @@ create(Type, Name, Conf) -> name => Name, config => Conf }), - case - emqx_resource:create_local( - resource_id(Type, Name), - <<"emqx_bridge">>, - emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), - #{} - ) - of - {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); - {ok, _} -> maybe_disable_bridge(Type, Name, Conf); - {error, Reason} -> {error, Reason} - end. + {ok, _Data} = emqx_resource:create_local( + resource_id(Type, Name), + <<"emqx_bridge">>, + emqx_bridge:resource_type(Type), + parse_confs(Type, Name, Conf), + #{} + ), + maybe_disable_bridge(Type, Name, Conf). update(BridgeId, {OldConf, Conf}) -> {BridgeType, BridgeName} = parse_bridge_id(BridgeId), diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index d49c907b7..325acbfe9 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -134,7 +134,7 @@ setup_fake_telemetry_data() -> Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end, NEvents = 3, BackInTime = 0, - Timeout = 1_000, + Timeout = 11_000, {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime), ok = emqx_bridge:load(), {ok, _} = snabbkaffe_collector:receive_events(Sub), diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index edbc02b30..344750c4a 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -120,6 +120,8 @@ lookup_raw(Type, Name) -> end end. +-spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) -> + ok | {error, Reason :: term()}. create_dry_run(Type, Conf) -> emqx_bridge:create_dry_run(Type, Conf). diff --git a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl index e1b99942c..294424638 100644 --- a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl @@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual(ok, emqx_resource:health_check(PoolName)), + ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index f419a08c4..0631c7ccd 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -102,7 +102,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual(ok, emqx_resource:health_check(PoolName)), + ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index c3620c63c..ed7bf7f4f 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual(ok, emqx_resource:health_check(PoolName)), + ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index fad5c030b..9f0312eb3 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -109,7 +109,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), - ?assertEqual(ok, emqx_resource:health_check(PoolName)), + ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), % Can call stop/1 again on an already stopped instance diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index ba03494fa..647c1b949 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -186,9 +186,9 @@ create_local(InstId, Group, ResourceType, Config) -> resource_config(), create_opts() ) -> - {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. + {ok, resource_data()}. create_local(InstId, Group, ResourceType, Config, Opts) -> - call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}). + emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts). -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -198,8 +198,7 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - RandId = iolist_to_binary(emqx_misc:gen_id(16)), - call_instance(RandId, {create_dry_run, ResourceType, Config}). + emqx_resource_manager:create_dry_run(ResourceType, Config). -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. @@ -209,7 +208,7 @@ recreate(InstId, ResourceType, Config, Opts) -> -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. recreate_local(InstId, ResourceType, Config, Opts) -> - call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}). + emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts). -spec remove(instance_id()) -> ok | {error, Reason :: term()}. remove(InstId) -> @@ -217,11 +216,11 @@ remove(InstId) -> -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. remove_local(InstId) -> - call_instance(InstId, {remove, InstId}). + emqx_resource_manager:remove(InstId). -spec reset_metrics_local(instance_id()) -> ok. reset_metrics_local(InstId) -> - call_instance(InstId, {reset_metrics, InstId}). + emqx_resource_manager:reset_metrics(InstId). -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. reset_metrics(InstId) -> @@ -236,17 +235,7 @@ query(InstId, Request) -> %% it is the duty of the Module to apply the `after_query()` functions. -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> - case get_instance(InstId) of - {ok, _Group, #{status := connecting}} -> - query_error(connecting, << - "cannot serve query when the resource " - "instance is still connecting" - >>); - {ok, _Group, #{status := disconnected}} -> - query_error(disconnected, << - "cannot serve query when the resource " - "instance is disconnected" - >>); + case emqx_resource_manager:ets_lookup(InstId) of {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe @@ -268,23 +257,23 @@ restart(InstId) -> -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. restart(InstId, Opts) -> - call_instance(InstId, {restart, InstId, Opts}). + emqx_resource_manager:restart(InstId, Opts). -spec stop(instance_id()) -> ok | {error, Reason :: term()}. stop(InstId) -> - call_instance(InstId, {stop, InstId}). + emqx_resource_manager:stop(InstId). -spec health_check(instance_id()) -> ok | {error, Reason :: term()}. health_check(InstId) -> - call_instance(InstId, {health_check, InstId}). + emqx_resource_manager:health_check(InstId). set_resource_status_connecting(InstId) -> - call_instance(InstId, {set_resource_status_connecting, InstId}). + emqx_resource_manager:set_resource_status_connecting(InstId). -spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> - emqx_resource_instance:lookup(InstId). + emqx_resource_manager:lookup(InstId). -spec list_instances() -> [instance_id()]. list_instances() -> @@ -292,7 +281,7 @@ list_instances() -> -spec list_instances_verbose() -> [resource_data()]. list_instances_verbose() -> - emqx_resource_instance:list_all(). + emqx_resource_manager:list_all(). -spec list_instances_by_type(module()) -> [instance_id()]. list_instances_by_type(ResourceType) -> @@ -307,7 +296,7 @@ generate_id(Name) when is_binary(Name) -> <>. -spec list_group_instances(resource_group()) -> [instance_id()]. -list_group_instances(Group) -> emqx_resource_instance:list_group(Group). +list_group_instances(Group) -> emqx_resource_manager:list_group(Group). -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -422,9 +411,6 @@ inc_metrics_funcs(InstId) -> OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}], {OnSucc, OnFailed}. -call_instance(InstId, Query) -> - emqx_resource_instance:hash_call(InstId, Query). - safe_apply(Func, Args) -> ?SAFE_CALL(erlang:apply(Func, Args)). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl new file mode 100644 index 000000000..c06035a81 --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -0,0 +1,429 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_manager). +-behaviour(gen_statem). + +-include("emqx_resource.hrl"). +-include("emqx_resource_utils.hrl"). + +% API +-export([ + ensure_resource/5, + create_dry_run/2, + ets_lookup/1, + get_metrics/1, + health_check/1, + list_all/0, + list_group/1, + lookup/1, + recreate/4, + remove/1, + reset_metrics/1, + restart/2, + set_resource_status_connecting/1, + stop/1 +]). + +% Server +-export([start_link/5]). + +% Behaviour +-export([init/1, callback_mode/0, handle_event/4, terminate/3]). + +% State record +-record(data, {id, group, mod, config, opts, status, state, error}). + +-define(SHORT_HEALTHCHECK_INTERVAL, 1000). +-define(HEALTHCHECK_INTERVAL, 15000). +-define(ETS_TABLE, emqx_resource_manager). +-define(WAIT_FOR_RESOURCE_DELAY, 100). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +%% @doc Called from emqx_resource when starting a resource instance. +%% +%% Triggers the emqx_resource_manager_sup supervisor to actually create +%% and link the process itself if not already started. +-spec ensure_resource( + instance_id(), + resource_group(), + resource_type(), + resource_config(), + create_opts() +) -> {ok, resource_data()}. +ensure_resource(InstId, Group, ResourceType, Config, Opts) -> + case lookup(InstId) of + {ok, _Group, Data} -> + {ok, Data}; + {error, not_found} -> + do_start(InstId, Group, ResourceType, Config, Opts), + {ok, _Group, Data} = lookup(InstId), + {ok, Data} + end. + +%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. +%% +%% Triggers the `emqx_resource_manager_sup` supervisor to actually create +%% and link the process itself if not already started, and then immedately stops. +-spec create_dry_run(resource_type(), resource_config()) -> + ok | {error, Reason :: term()}. +create_dry_run(ResourceType, Config) -> + InstId = make_test_id(), + ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), + case wait_for_resource_ready(InstId, 5000) of + ok -> + _ = stop(InstId); + timeout -> + _ = stop(InstId), + {error, timeout} + end. + +%% @doc Called from emqx_resource when recreating a resource which may or may not exist +-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> + {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. +recreate(InstId, ResourceType, NewConfig, Opts) -> + case lookup(InstId) of + {ok, Group, #{mod := ResourceType, status := _} = _Data} -> + _ = remove(InstId, false), + ensure_resource(InstId, Group, ResourceType, NewConfig, Opts); + {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> + {error, updating_to_incorrect_resource_type}; + {error, not_found} -> + {error, not_found} + end. + +%% @doc Stops a running resource_manager and clears the metrics for the resource +-spec remove(instance_id()) -> ok | {error, Reason :: term()}. +remove(InstId) when is_binary(InstId) -> + remove(InstId, true). + +%% @doc Stops a running resource_manager and optionally clears the metrics for the resource +-spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}. +remove(InstId, ClearMetrics) when is_binary(InstId) -> + safe_call(InstId, {remove, ClearMetrics}). + +%% @doc Stops and then starts an instance that was already running +-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. +restart(InstId, Opts) when is_binary(InstId) -> + case lookup(InstId) of + {ok, Group, #{mod := ResourceType, config := Config} = _Data} -> + _ = remove(InstId), + do_start(InstId, Group, ResourceType, Config, Opts); + Error -> + Error + end. + +%% @doc Stop the resource manager process +-spec stop(instance_id()) -> ok | {error, Reason :: term()}. +stop(InstId) -> + case safe_call(InstId, stop) of + ok -> + ok; + {error, not_found} -> + ok; + {error, _Reason} = Error -> + Error + end. + +%% @doc Test helper +-spec set_resource_status_connecting(instance_id()) -> ok. +set_resource_status_connecting(InstId) -> + safe_call(InstId, set_resource_status_connecting). + +%% @doc Lookup the group and data of a resource +-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. +lookup(InstId) -> + safe_call(InstId, lookup). + +%% @doc Lookup the group and data of a resource +-spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. +ets_lookup(InstId) -> + case ets:lookup(?ETS_TABLE, InstId) of + [{_Id, Group, Data}] -> + {ok, Group, data_record_to_external_map_with_metrics(Data)}; + [] -> + {error, not_found} + end. + +%% @doc Get the metrics for the specified resource +get_metrics(InstId) -> + emqx_metrics_worker:get_metrics(resource_metrics, InstId). + +%% @doc Reset the metrics for the specified resource +-spec reset_metrics(instance_id()) -> ok. +reset_metrics(InstId) -> + emqx_metrics_worker:reset_metrics(resource_metrics, InstId). + +%% @doc Returns the data for all resorces +-spec list_all() -> [resource_data()] | []. +list_all() -> + try + [ + data_record_to_external_map_with_metrics(Data) + || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE) + ] + catch + error:badarg -> [] + end. + +%% @doc Returns a list of ids for all the resources in a group +-spec list_group(resource_group()) -> [instance_id()]. +list_group(Group) -> + List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), + lists:flatten(List). + +-spec health_check(instance_id()) -> ok | {error, Reason :: term()}. +health_check(InstId) -> + safe_call(InstId, health_check). + +%% Server start/stop callbacks + +%% @doc Function called from the supervisor to actually start the server +start_link(InstId, Group, ResourceType, Config, Opts) -> + Data = #data{ + id = InstId, + group = Group, + mod = ResourceType, + config = Config, + opts = Opts, + status = undefined, + state = undefined, + error = undefined + }, + gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). + +init(Data) -> + {ok, connecting, Data}. + +terminate(_Reason, _State, Data) -> + ets:delete(?ETS_TABLE, Data#data.id), + ok. + +%% Behavior callback + +callback_mode() -> [handle_event_function, state_enter]. + +%% Common event Function + +% Called during testing to force a specific state +handle_event({call, From}, set_resource_status_connecting, _State, Data) -> + {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; +% Called when the resource is to be stopped +handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) -> + {next_state, stopped, Data, [{reply, From, ok}]}; +handle_event({call, From}, stop, _State, Data) -> + Result = do_stop(Data), + UpdatedData = Data#data{status = disconnected}, + {next_state, stopped, UpdatedData, [{reply, From, Result}]}; +% Nothing happens once the stopped state is entered. It is a 'holding' state waiting for external actions. +handle_event(enter, _OldState, stopped, Data) -> + {next_state, stopped, Data}; +% Called when a resource is to be stopped and removed. +handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> + handle_remove_event(From, ClearMetrics, Data); +% Called when the state of the resource is being looked up. +handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> + Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, + {keep_state_and_data, [{reply, From, Reply}]}; +% An external health check call. Disconnected usually means an error has happened. +handle_event({call, From}, health_check, disconnected, Data) -> + Actions = [{reply, From, {error, Data#data.error}}], + {keep_state_and_data, Actions}; +% Resource has been explicitly stopped, so return that as the error reason. +handle_event({call, From}, health_check, stopped, _Data) -> + Actions = [{reply, From, {error, stopped}}], + {keep_state_and_data, Actions}; +handle_event({call, From}, health_check, _State, Data) -> + handle_health_check_event(From, Data); +% Connecting state enter +handle_event(enter, connecting, connecting, Data) -> + handle_connecting_state_enter_event(Data); +handle_event(enter, _OldState, connecting, Data) -> + Actions = [{state_timeout, 0, health_check}], + {next_state, connecting, Data, Actions}; +% Connecting state health_check timeouts. +% First clause supports desired behavior on initial connection. +handle_event(state_timeout, health_check, connecting, #data{status = disconnected} = Data) -> + {next_state, disconnected, Data}; +handle_event(state_timeout, health_check, connecting, Data) -> + connecting_health_check(Data); +%% The connected state is entered after a successful start of the callback mod +%% and successful health_checks +handle_event(enter, _OldState, connected, Data) -> + Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + {next_state, connected, Data, Actions}; +handle_event(state_timeout, health_check, connected, Data) -> + perform_connected_health_check(Data); +% Disconnected state entered when a healtcheck has failed. +handle_event(enter, _OldState, disconnected, #data{id = InstId} = Data) -> + UpdatedData = Data#data{status = disconnected}, + ets:delete(?ETS_TABLE, InstId), + {next_state, disconnected, UpdatedData}. + +%%------------------------------------------------------------------------------ +%% internal functions +%%------------------------------------------------------------------------------ + +handle_connecting_state_enter_event(Data) -> + Result = emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config), + case Result of + {ok, ResourceState} -> + UpdatedData = Data#data{state = ResourceState, status = connecting}, + %% Perform an initial health_check immediately before transitioning into a connected state + Actions = [{state_timeout, 0, health_check}], + {next_state, connecting, UpdatedData, Actions}; + {error, Reason} -> + %% Keep track of the error reason why the connection did not work + %% so that the Reason can be returned when the verification call is made. + UpdatedData = Data#data{status = disconnected, error = Reason}, + Actions = [{state_timeout, 0, health_check}], + {next_state, connecting, UpdatedData, Actions} + end. + +handle_remove_event(From, ClearMetrics, Data) -> + do_stop(Data), + ets:delete(?ETS_TABLE, Data#data.id), + case ClearMetrics of + true -> ok = emqx_metrics_worker:clear_metrics(resource_metrics, Data#data.id); + false -> ok + end, + {stop_and_reply, normal, [{reply, From, ok}]}. + +do_start(InstId, Group, ResourceType, Config, Opts) -> + % The state machine will make the actual call to the callback/resource module after init + ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), + ok = emqx_metrics_worker:create_metrics( + resource_metrics, + InstId, + [matched, success, failed, exception], + [matched] + ), + wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), + ok. + +do_stop(#data{state = undefined} = _Data) -> + ok; +do_stop(Data) -> + Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), + ets:delete(?ETS_TABLE, Data#data.id), + Result. + +proc_name(Id) -> + Module = atom_to_binary(?MODULE), + Connector = <<"_">>, + binary_to_atom(<>). + +handle_health_check_event(From, Data) -> + case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of + connected -> + UpdatedData = Data#data{status = connected, error = undefined}, + update_resource(Data#data.id, Data#data.group, UpdatedData), + Actions = [{reply, From, ok}], + {next_state, connected, UpdatedData, Actions}; + {connected, NewResourceState} -> + UpdatedData = Data#data{ + state = NewResourceState, status = connected, error = undefined + }, + update_resource(Data#data.id, Data#data.group, UpdatedData), + Actions = [{reply, From, ok}], + {next_state, connected, UpdatedData, Actions}; + ConnectStatus -> + logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), + UpdatedData = Data#data{status = connecting, error = ConnectStatus}, + ets:delete(?ETS_TABLE, Data#data.id), + Actions = [{reply, From, {error, ConnectStatus}}], + {next_state, connecting, UpdatedData, Actions} + end. + +connecting_health_check(Data) -> + case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of + connected -> + UpdatedData = Data#data{status = connected, error = undefined}, + update_resource(Data#data.id, Data#data.group, UpdatedData), + {next_state, connected, UpdatedData}; + {connected, NewResourceState} -> + UpdatedData = Data#data{ + state = NewResourceState, status = connected, error = undefined + }, + update_resource(Data#data.id, Data#data.group, UpdatedData), + {next_state, connected, UpdatedData}; + ConnectStatus -> + logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), + UpdatedData = Data#data{error = ConnectStatus}, + Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}], + {keep_state, UpdatedData, Actions} + end. + +perform_connected_health_check(Data) -> + case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of + connected -> + Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + {keep_state_and_data, Actions}; + {connected, NewResourceState} -> + UpdatedData = Data#data{ + state = NewResourceState, status = connected, error = undefined + }, + update_resource(Data#data.id, Data#data.group, UpdatedData), + Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + {keep_state, NewResourceState, Actions}; + ConnectStatus -> + logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), + UpdatedData = Data#data{error = ConnectStatus}, + ets:delete(?ETS_TABLE, Data#data.id), + {next_state, connecting, UpdatedData} + end. + +data_record_to_external_map_with_metrics(Data) -> + #{ + id => Data#data.id, + mod => Data#data.mod, + config => Data#data.config, + status => Data#data.status, + state => Data#data.state, + metrics => get_metrics(Data#data.id) + }. + +make_test_id() -> + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + <>. + +-spec wait_for_resource_ready(instance_id(), integer()) -> ok | timeout. +wait_for_resource_ready(InstId, WaitTime) -> + do_wait_for_resource_ready(InstId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). + +do_wait_for_resource_ready(_InstId, 0) -> + timeout; +do_wait_for_resource_ready(InstId, Retry) -> + case ets_lookup(InstId) of + {ok, _Group, #{status := connected}} -> + ok; + _ -> + timer:sleep(?WAIT_FOR_RESOURCE_DELAY), + do_wait_for_resource_ready(InstId, Retry - 1) + end. + +safe_call(InstId, Message) -> + try + gen_statem:call(proc_name(InstId), Message) + catch + exit:_ -> + {error, not_found} + end. + +update_resource(InstId, Group, Data) -> + ets:insert(?ETS_TABLE, {InstId, Group, Data}). diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl new file mode 100644 index 000000000..b26d968df --- /dev/null +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_resource_manager_sup). + +-behaviour(supervisor). + +-export([ensure_child/5]). + +-export([start_link/0]). + +-export([init/1]). + +ensure_child(InstId, Group, ResourceType, Config, Opts) -> + _ = supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]), + ok. + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + TabOpts = [named_table, set, public, {read_concurrency, true}], + _ = ets:new(emqx_resource_manager, TabOpts), + + ChildSpecs = [ + #{ + id => emqx_resource_manager, + start => {emqx_resource_manager, start_link, []}, + restart => transient, + shutdown => brutal_kill, + type => worker, + modules => [emqx_resource_manager] + } + ], + + SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, + {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl index 3f35c2399..9eb23be17 100644 --- a/apps/emqx_resource/src/emqx_resource_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_sup.erl @@ -35,45 +35,13 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, Metrics = emqx_metrics_worker:child_spec(resource_metrics), - Pool = ?RESOURCE_INST_MOD, - Mod = ?RESOURCE_INST_MOD, - ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]), - ResourceInsts = [ - begin - ensure_pool_worker(Pool, {Pool, Idx}, Idx), - #{ - id => {Mod, Idx}, - start => {Mod, start_link, [Pool, Idx]}, - restart => transient, - shutdown => 5000, - type => worker, - modules => [Mod] - } - end - || Idx <- lists:seq(1, ?POOL_SIZE) - ], - HealthCheck = + ResourceManager = #{ - id => emqx_resource_health_check_sup, - start => {emqx_resource_health_check_sup, start_link, []}, - restart => transient, + id => emqx_resource_manager_sup, + start => {emqx_resource_manager_sup, start_link, []}, + restart => permanent, shutdown => infinity, type => supervisor, - modules => [emqx_resource_health_check_sup] + modules => [emqx_resource_manager_sup] }, - {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}. - -%% internal functions -ensure_pool(Pool, Type, Opts) -> - try - gproc_pool:new(Pool, Type, Opts) - catch - error:exists -> ok - end. - -ensure_pool_worker(Pool, Name, Slot) -> - try - gproc_pool:add_worker(Pool, Name, Slot) - catch - error:exists -> ok - end. + {ok, {SupFlags, [Metrics, ResourceManager]}}. diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 0aa4fd40b..04e489f78 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -20,7 +20,6 @@ -export([ introduced_in/0, - create/5, create_dry_run/2, recreate/4, @@ -29,47 +28,48 @@ ]). -include_lib("emqx/include/bpapi.hrl"). +-include("emqx_resource.hrl"). introduced_in() -> "5.0.0". -spec create( - emqx_resource:instance_id(), - emqx_resource:resource_group(), - emqx_resource:resource_type(), - emqx_resource:resource_config(), - emqx_resource:create_opts() + instance_id(), + resource_group(), + resource_type(), + resource_config(), + create_opts() ) -> - emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). + emqx_cluster_rpc:multicall_return(resource_data()). create(InstId, Group, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, create_local, [ InstId, Group, ResourceType, Config, Opts ]). -spec create_dry_run( - emqx_resource:resource_type(), - emqx_resource:resource_config() + resource_type(), + resource_config() ) -> - emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). + emqx_cluster_rpc:multicall_return(resource_data()). create_dry_run(ResourceType, Config) -> emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]). -spec recreate( - emqx_resource:instance_id(), - emqx_resource:resource_type(), - emqx_resource:resource_config(), - emqx_resource:create_opts() + instance_id(), + resource_type(), + resource_config(), + create_opts() ) -> - emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). + emqx_cluster_rpc:multicall_return(resource_data()). recreate(InstId, ResourceType, Config, Opts) -> emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]). --spec remove(emqx_resource:instance_id()) -> +-spec remove(instance_id()) -> emqx_cluster_rpc:multicall_return(ok). remove(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). --spec reset_metrics(emqx_resource:instance_id()) -> +-spec reset_metrics(instance_id()) -> emqx_cluster_rpc:multicall_return(ok). reset_metrics(InstId) -> emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 865917128..00868ebde 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -73,7 +73,7 @@ t_create_remove(_) -> #{name => test_resource} ), - emqx_resource:recreate( + {ok, _} = emqx_resource:recreate( ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -178,7 +178,6 @@ t_healthy(_) -> ), timer:sleep(400), - emqx_resource_health_check:create_checker(?ID, 15000, 10000), #{pid := Pid} = emqx_resource:query(?ID, get_state), timer:sleep(300), emqx_resource:set_resource_status_connecting(?ID), @@ -192,10 +191,10 @@ t_healthy(_) -> erlang:exit(Pid, shutdown), - ?assertEqual(ok, emqx_resource:health_check(?ID)), + ?assertEqual({error, connecting}, emqx_resource:health_check(?ID)), ?assertMatch( - [#{status := connecting}], + [], emqx_resource:list_instances_verbose() ), @@ -232,7 +231,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := disconnected}}}, + {error, {emqx_resource, #{reason := not_found}}}, emqx_resource:query(?ID, get_state) ), @@ -275,7 +274,7 @@ t_stop_start_local(_) -> ?assertNot(is_process_alive(Pid0)), ?assertMatch( - {error, {emqx_resource, #{reason := disconnected}}}, + {error, {emqx_resource, #{reason := not_found}}}, emqx_resource:query(?ID, get_state) ), @@ -323,23 +322,23 @@ t_create_dry_run_local(_) -> ?assertEqual(undefined, whereis(test_resource)). t_create_dry_run_local_failed(_) -> - {Res, _} = emqx_resource:create_dry_run_local( + {Res1, _} = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{cteate_error => true} ), - ?assertEqual(error, Res), + ?assertEqual(error, Res1), - {Res, _} = emqx_resource:create_dry_run_local( + {Res2, _} = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, health_check_error => true} ), - ?assertEqual(error, Res), + ?assertEqual(error, Res2), - {Res, _} = emqx_resource:create_dry_run_local( + {Res3, _} = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), - ?assertEqual(error, Res). + ?assertEqual(error, Res3). t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])), diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 51ffe950b..1b198a0d4 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -385,22 +385,14 @@ create_resource(Context, #{type := built_in_database} = Cfg) -> Context; create_resource(Context, #{type := DB} = Config) -> ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]), - case - emqx_resource:create( - ResourceID, - <<"emqx_retainer">>, - list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), - Config, - #{} - ) - of - {ok, already_created} -> - Context#{resource_id => ResourceID}; - {ok, _} -> - Context#{resource_id => ResourceID}; - {error, Reason} -> - error({load_config_error, Reason}) - end. + _ = emqx_resource:create( + ResourceID, + <<"emqx_retainer">>, + list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), + Config, + #{} + ), + Context#{resource_id => ResourceID}. -spec close_resource(context()) -> ok | {error, term()}. close_resource(#{resource_id := ResourceId}) ->