From 0a04b1ad6e73dc0ae29e1b6c15ad3ff23edd0b2e Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 11 Jul 2024 17:09:10 +0800 Subject: [PATCH] feat: add group/type to resource slog --- apps/emqx_auth/include/emqx_authn.hrl | 2 +- apps/emqx_auth/include/emqx_authz.hrl | 2 +- .../src/emqx_authn/emqx_authn_utils.erl | 11 +- .../src/emqx_authz/emqx_authz_utils.erl | 14 ++- apps/emqx_auth_http/src/emqx_authn_http.erl | 5 +- apps/emqx_auth_http/src/emqx_authz_http.erl | 6 +- apps/emqx_auth_jwt/src/emqx_authn_jwt.erl | 1 + .../emqx_auth_ldap/src/emqx_auth_ldap.app.src | 2 +- apps/emqx_auth_ldap/src/emqx_authn_ldap.erl | 4 +- apps/emqx_auth_ldap/src/emqx_authz_ldap.erl | 4 +- .../test/emqx_authn_ldap_SUITE.erl | 1 + .../test/emqx_authn_ldap_bind_SUITE.erl | 1 + .../test/emqx_authz_ldap_SUITE.erl | 1 + .../src/emqx_auth_mongodb.app.src | 2 +- .../src/emqx_authn_mongodb.erl | 3 +- .../src/emqx_authz_mongodb.erl | 4 +- apps/emqx_auth_mysql/src/emqx_authn_mysql.erl | 4 +- apps/emqx_auth_mysql/src/emqx_authz_mysql.erl | 4 +- .../test/emqx_authn_mysql_SUITE.erl | 1 + .../test/emqx_authz_mysql_SUITE.erl | 1 + .../src/emqx_authn_postgresql.erl | 3 +- .../src/emqx_authz_postgresql.erl | 2 + .../test/emqx_authn_postgresql_SUITE.erl | 5 +- .../test/emqx_authz_postgresql_SUITE.erl | 1 + apps/emqx_auth_redis/src/emqx_authn_redis.erl | 3 +- apps/emqx_auth_redis/src/emqx_authz_redis.erl | 4 +- .../test/emqx_authn_redis_SUITE.erl | 1 + .../test/emqx_authz_redis_SUITE.erl | 1 + apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 +- .../emqx_bridge_cassandra_connector_SUITE.erl | 1 + ...emqx_bridge_clickhouse_connector_SUITE.erl | 2 + ...emqx_bridge_greptimedb_connector_SUITE.erl | 1 + .../emqx_bridge_influxdb_connector_SUITE.erl | 2 + .../src/emqx_bridge_pulsar_connector.erl | 2 +- .../emqx_bridge_rabbitmq_connector_SUITE.erl | 1 + .../emqx_connector/include/emqx_connector.hrl | 2 +- .../src/emqx_connector_resource.erl | 7 +- .../src/emqx_dashboard_sso_manager.erl | 5 +- .../test/emqx_dashboard_sso_ldap_SUITE.erl | 2 +- apps/emqx_ldap/test/emqx_ldap_SUITE.erl | 2 + apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl | 1 + apps/emqx_mysql/test/emqx_mysql_SUITE.erl | 1 + .../test/emqx_postgresql_SUITE.erl | 1 + apps/emqx_redis/test/emqx_redis_SUITE.erl | 1 + apps/emqx_resource/include/emqx_resource.hrl | 1 + apps/emqx_resource/src/emqx_resource.erl | 70 ++++++----- .../src/emqx_resource_manager.erl | 118 ++++++++++++------ .../src/emqx_resource_manager_sup.erl | 15 ++- .../test/emqx_resource_SUITE.erl | 20 ++- 49 files changed, 232 insertions(+), 122 deletions(-) diff --git a/apps/emqx_auth/include/emqx_authn.hrl b/apps/emqx_auth/include/emqx_authn.hrl index 1f2c6b8b9..8ffab1203 100644 --- a/apps/emqx_auth/include/emqx_authn.hrl +++ b/apps/emqx_auth/include/emqx_authn.hrl @@ -30,6 +30,6 @@ -type authenticator_id() :: binary(). --define(AUTHN_RESOURCE_GROUP, <<"emqx_authn">>). +-define(AUTHN_RESOURCE_GROUP, <<"authn">>). -endif. diff --git a/apps/emqx_auth/include/emqx_authz.hrl b/apps/emqx_auth/include/emqx_authz.hrl index 6dc80cb3f..73ff44c25 100644 --- a/apps/emqx_auth/include/emqx_authz.hrl +++ b/apps/emqx_auth/include/emqx_authz.hrl @@ -158,7 +158,7 @@ count => 1 }). --define(AUTHZ_RESOURCE_GROUP, <<"emqx_authz">>). +-define(AUTHZ_RESOURCE_GROUP, <<"authz">>). -define(AUTHZ_FEATURES, [rich_actions]). diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl index a08ac260c..e81145f2c 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl @@ -21,8 +21,8 @@ -include_lib("snabbkaffe/include/trace.hrl"). -export([ - create_resource/3, - update_resource/3, + create_resource/4, + update_resource/4, check_password_from_selected_map/3, parse_deep/1, parse_str/1, @@ -66,8 +66,9 @@ %% APIs %%-------------------------------------------------------------------- -create_resource(ResourceId, Module, Config) -> +create_resource(Type, ResourceId, Module, Config) -> Result = emqx_resource:create_local( + Type, ResourceId, ?AUTHN_RESOURCE_GROUP, Module, @@ -76,9 +77,9 @@ create_resource(ResourceId, Module, Config) -> ), start_resource_if_enabled(Result, ResourceId, Config). -update_resource(Module, Config, ResourceId) -> +update_resource(Type, Module, Config, ResourceId) -> Result = emqx_resource:recreate_local( - ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS + Type, ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS ), start_resource_if_enabled(Result, ResourceId, Config). diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl index e4343b6fa..533f982c1 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl @@ -25,9 +25,9 @@ -export([ cleanup_resources/0, make_resource_id/1, - create_resource/2, create_resource/3, - update_resource/2, + create_resource/4, + update_resource/3, remove_resource/1, update_config/2, parse_deep/2, @@ -57,12 +57,13 @@ %% APIs %%-------------------------------------------------------------------- -create_resource(Module, Config) -> +create_resource(Type, Module, Config) -> ResourceId = make_resource_id(Module), - create_resource(ResourceId, Module, Config). + create_resource(Type, ResourceId, Module, Config). -create_resource(ResourceId, Module, Config) -> +create_resource(Type, ResourceId, Module, Config) -> Result = emqx_resource:create_local( + Type, ResourceId, ?AUTHZ_RESOURCE_GROUP, Module, @@ -71,10 +72,11 @@ create_resource(ResourceId, Module, Config) -> ), start_resource_if_enabled(Result, ResourceId, Config). -update_resource(Module, #{annotations := #{id := ResourceId}} = Config) -> +update_resource(Type, Module, #{annotations := #{id := ResourceId}} = Config) -> Result = case emqx_resource:recreate_local( + Type, ResourceId, Module, Config, diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index d9c5c5ed5..ed151428f 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -40,6 +40,7 @@ create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), % {Config, State} = parse_config(Config0), {ok, _Data} = emqx_authn_utils:create_resource( + http, ResourceId, emqx_bridge_http_connector, Config @@ -50,7 +51,9 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> with_validated_config(Config0, fun(Config, NState) -> % {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of + case + emqx_authn_utils:update_resource(http, emqx_bridge_http_connector, Config, ResourceId) + of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index 6b0152b7d..2a1a2638b 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -66,12 +66,14 @@ description() -> create(Config) -> NConfig = parse_config(Config), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig), + {ok, _Data} = emqx_authz_utils:create_resource( + http, ResourceId, emqx_bridge_http_connector, NConfig + ), NConfig#{annotations => #{id => ResourceId}}. update(Config) -> NConfig = parse_config(Config), - case emqx_authz_utils:update_resource(emqx_bridge_http_connector, NConfig) of + case emqx_authz_utils:update_resource(http, emqx_bridge_http_connector, NConfig) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> NConfig#{annotations => #{id => Id}} end. diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index 18c954ab5..e01121a01 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -183,6 +183,7 @@ do_create( ) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {ok, _Data} = emqx_resource:create_local( + jwt, ResourceId, ?AUTHN_RESOURCE_GROUP, emqx_authn_jwks_connector, diff --git a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src index d84d6ff81..a58117356 100644 --- a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src +++ b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_ldap, [ {description, "EMQX LDAP Authentication and Authorization"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_auth_ldap_app, []}}, {applications, [ diff --git a/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl b/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl index a18236d15..8a9b5650a 100644 --- a/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl +++ b/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl @@ -40,12 +40,12 @@ create(_AuthenticatorID, Config) -> do_create(Module, Config) -> ResourceId = emqx_authn_utils:make_resource_id(Module), State = parse_config(Config), - {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_ldap, Config), + {ok, _Data} = emqx_authn_utils:create_resource(ldap, ResourceId, emqx_ldap, Config), {ok, State#{resource_id => ResourceId}}. update(Config, #{resource_id := ResourceId} = _State) -> NState = parse_config(Config), - case emqx_authn_utils:update_resource(emqx_ldap, Config, ResourceId) of + case emqx_authn_utils:update_resource(ldap, emqx_ldap, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl b/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl index 24bd8c008..e70b840a3 100644 --- a/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl +++ b/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl @@ -56,12 +56,12 @@ description() -> create(Source) -> ResourceId = emqx_authz_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_ldap, Source), + {ok, _Data} = emqx_authz_utils:create_resource(ldap, ResourceId, emqx_ldap, Source), Annotations = new_annotations(#{id => ResourceId}, Source), Source#{annotations => Annotations}. update(Source) -> - case emqx_authz_utils:update_resource(emqx_ldap, Source) of + case emqx_authz_utils:update_resource(ldap, emqx_ldap, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl index ac941f268..f6ee582be 100644 --- a/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl @@ -47,6 +47,7 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( + ldap, ?LDAP_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_ldap, diff --git a/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl index d8e9c0d8e..c39225e74 100644 --- a/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl @@ -47,6 +47,7 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( + ldap, ?LDAP_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_ldap, diff --git a/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl index 09875a3fa..3f8c0ba63 100644 --- a/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl @@ -178,6 +178,7 @@ stop_apps(Apps) -> create_ldap_resource() -> {ok, _} = emqx_resource:create_local( + ldap, ?LDAP_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_ldap, diff --git a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src index df3cc1268..5ffc59787 100644 --- a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src +++ b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mongodb, [ {description, "EMQX MongoDB Authentication and Authorization"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {mod, {emqx_auth_mongodb_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl b/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl index 75a474c0c..ffe78159d 100644 --- a/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl +++ b/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl @@ -37,6 +37,7 @@ create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {Config, State} = parse_config(Config0), {ok, _Data} = emqx_authn_utils:create_resource( + mongodb, ResourceId, emqx_mongodb, Config @@ -45,7 +46,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(emqx_mongodb, Config, ResourceId) of + case emqx_authn_utils:update_resource(mongodb, emqx_mongodb, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl b/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl index 0bab6ef90..0d65afe0d 100644 --- a/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl +++ b/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl @@ -49,13 +49,13 @@ description() -> create(#{filter := Filter} = Source) -> ResourceId = emqx_authz_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mongodb, Source), + {ok, _Data} = emqx_authz_utils:create_resource(mongodb, ResourceId, emqx_mongodb, Source), FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS), Source#{annotations => #{id => ResourceId}, filter_template => FilterTemp}. update(#{filter := Filter} = Source) -> FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS), - case emqx_authz_utils:update_resource(emqx_mongodb, Source) of + case emqx_authz_utils:update_resource(mongodb, emqx_mongodb, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl b/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl index f68c74a14..7d03ca856 100644 --- a/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl +++ b/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl @@ -39,12 +39,12 @@ create(_AuthenticatorID, Config) -> create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {Config, State} = parse_config(Config0), - {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_mysql, Config), + {ok, _Data} = emqx_authn_utils:create_resource(mysql, ResourceId, emqx_mysql, Config), {ok, State#{resource_id => ResourceId}}. update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(emqx_mysql, Config, ResourceId) of + case emqx_authn_utils:update_resource(mysql, emqx_mysql, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl b/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl index 0e2b77005..557eed14f 100644 --- a/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl +++ b/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl @@ -53,13 +53,13 @@ create(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS), ResourceId = emqx_authz_utils:make_resource_id(?MODULE), Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mysql, Source), + {ok, _Data} = emqx_authz_utils:create_resource(mysql, ResourceId, emqx_mysql, Source), Source#{annotations => #{id => ResourceId, tmpl_token => TmplToken}}. update(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS), Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, - case emqx_authz_utils:update_resource(emqx_mysql, Source) of + case emqx_authz_utils:update_resource(mysql, emqx_mysql, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl index 8ab812fc0..78b8aa8b3 100644 --- a/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl @@ -58,6 +58,7 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( + mysql, ?MYSQL_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_mysql, diff --git a/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl index ce30e203e..5fe020f1b 100644 --- a/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl @@ -446,6 +446,7 @@ stop_apps(Apps) -> create_mysql_resource() -> {ok, _} = emqx_resource:create_local( + mysql, ?MYSQL_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_mysql, diff --git a/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl b/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl index 980c2ddd8..95e8ac7ce 100644 --- a/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl +++ b/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl @@ -45,6 +45,7 @@ create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {Config, State} = parse_config(Config0, ResourceId), {ok, _Data} = emqx_authn_utils:create_resource( + postgresql, ResourceId, emqx_postgresql, Config @@ -53,7 +54,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0, ResourceId), - case emqx_authn_utils:update_resource(emqx_postgresql, Config, ResourceId) of + case emqx_authn_utils:update_resource(postgresql, emqx_postgresql, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl b/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl index d1a0b32ea..e0fc9e032 100644 --- a/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl +++ b/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl @@ -53,6 +53,7 @@ create(#{query := SQL0} = Source) -> {SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS), ResourceID = emqx_authz_utils:make_resource_id(emqx_postgresql), {ok, _Data} = emqx_authz_utils:create_resource( + postgresql, ResourceID, emqx_postgresql, Source#{prepare_statement => #{ResourceID => SQL}} @@ -63,6 +64,7 @@ update(#{query := SQL0, annotations := #{id := ResourceID}} = Source) -> {SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS), case emqx_authz_utils:update_resource( + postgresql, emqx_postgresql, Source#{prepare_statement => #{ResourceID => SQL}} ) diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index 50bff634d..e899edbd9 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -79,6 +79,7 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( + postgresql, ?PGSQL_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_postgresql, @@ -198,9 +199,9 @@ test_user_auth(#{ t_authenticate_disabled_prepared_statements(_Config) -> ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), - {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig), + {ok, _} = emqx_resource:recreate_local(postgresql, ?PGSQL_RESOURCE, emqx_postgresql, ResConfig), on_exit(fun() -> - emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config()) + emqx_resource:recreate_local(postgresql, ?PGSQL_RESOURCE, emqx_postgresql, pgsql_config()) end), ok = lists:foreach( fun(Sample0) -> diff --git a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl index 78b1e17a8..9346bf863 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl @@ -437,6 +437,7 @@ pgsql_config() -> create_pgsql_resource() -> emqx_resource:create_local( + postgresql, ?PGSQL_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_postgresql, diff --git a/apps/emqx_auth_redis/src/emqx_authn_redis.erl b/apps/emqx_auth_redis/src/emqx_authn_redis.erl index 779c58e39..4eb5a36a3 100644 --- a/apps/emqx_auth_redis/src/emqx_authn_redis.erl +++ b/apps/emqx_auth_redis/src/emqx_authn_redis.erl @@ -42,6 +42,7 @@ create(Config0) -> Res; {Config, State} -> {ok, _Data} = emqx_authn_utils:create_resource( + redis, ResourceId, emqx_redis, Config @@ -51,7 +52,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(emqx_redis, Config, ResourceId) of + case emqx_authn_utils:update_resource(redis, emqx_redis, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_redis/src/emqx_authz_redis.erl b/apps/emqx_auth_redis/src/emqx_authz_redis.erl index 8ce975033..b83d1abad 100644 --- a/apps/emqx_auth_redis/src/emqx_authz_redis.erl +++ b/apps/emqx_auth_redis/src/emqx_authz_redis.erl @@ -50,12 +50,12 @@ description() -> create(#{cmd := CmdStr} = Source) -> CmdTemplate = parse_cmd(CmdStr), ResourceId = emqx_authz_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_redis, Source), + {ok, _Data} = emqx_authz_utils:create_resource(redis, ResourceId, emqx_redis, Source), Source#{annotations => #{id => ResourceId}, cmd_template => CmdTemplate}. update(#{cmd := CmdStr} = Source) -> CmdTemplate = parse_cmd(CmdStr), - case emqx_authz_utils:update_resource(emqx_redis, Source) of + case emqx_authz_utils:update_resource(redis, emqx_redis, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl b/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl index e8c8760de..1e9d825d2 100644 --- a/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl @@ -63,6 +63,7 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( + redis, ?REDIS_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_redis, diff --git a/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl b/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl index 5818eea07..d0c695c73 100644 --- a/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl @@ -384,6 +384,7 @@ stop_apps(Apps) -> create_redis_resource() -> {ok, _} = emqx_resource:create_local( + redis, ?REDIS_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_redis, diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d2408ca73..7e9f5300a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -195,8 +195,9 @@ create(Type, Name, Conf0, Opts) -> TypeBin = bin(Type), Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, {ok, _Data} = emqx_resource:create_local( + Type, resource_id(Type, Name), - <<"emqx_bridge">>, + <<"bridge">>, bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) @@ -264,6 +265,7 @@ recreate(Type, Name, Conf0, Opts) -> TypeBin = bin(Type), Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, emqx_resource:recreate_local( + Type, resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), @@ -300,7 +302,7 @@ create_dry_run_bridge_v1(Type, Conf0) -> {error, Reason}; {ok, ConfNew} -> ParseConf = parse_confs(TypeBin, TmpName, ConfNew), - emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf) + emqx_resource:create_dry_run_local(Type, bridge_to_resource_type(Type), ParseConf) end catch %% validation errors diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index b784d36c0..02c102832 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -212,6 +212,7 @@ check_config(Config) -> create_local_resource(ResourceId, CheckedConfig) -> {ok, Bridge} = emqx_resource:create_local( + cassandra, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?CASSANDRA_RESOURCE_MOD, diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl index 1c83961a5..0b0acd78a 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl @@ -109,6 +109,7 @@ t_start_passfile(Config) -> ?assertMatch( {ok, #{status := connected}}, emqx_resource:create_local( + clickhouse, ResourceID, ?CONNECTOR_RESOURCE_GROUP, ?CLICKHOUSE_RESOURCE_MOD, @@ -138,6 +139,7 @@ perform_lifecycle_check(ResourceID, InitialConfig) -> status := InitialStatus }} = emqx_resource:create_local( + clickhouse, ResourceID, ?CONNECTOR_RESOURCE_GROUP, ?CLICKHOUSE_RESOURCE_MOD, diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl index be36cb167..68a32e9c2 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl @@ -83,6 +83,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( + greptimedb, PoolName, ?CONNECTOR_RESOURCE_GROUP, ?GREPTIMEDB_RESOURCE_MOD, diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl index 0ca693171..a7f78f253 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl @@ -86,6 +86,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( + influxdb, PoolName, ?CONNECTOR_RESOURCE_GROUP, ?INFLUXDB_RESOURCE_MOD, @@ -197,6 +198,7 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) -> config := #{ssl := #{enable := SslEnabled}}, status := Status }} = emqx_resource:create_local( + influxdb, PoolName, ?CONNECTOR_RESOURCE_GROUP, ?INFLUXDB_RESOURCE_MOD, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 4157deec2..835536bda 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -289,7 +289,7 @@ replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). producer_name(InstanceId, ChannelId) -> - case is_dry_run(InstanceId) of + case emqx_resource:is_dry_run(InstanceId) of %% do not create more atom true -> pulsar_producer_probe_worker; diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index 77482ae0f..fe288a185 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -135,6 +135,7 @@ check_config(Config) -> create_local_resource(ResourceID, CheckedConfig) -> {ok, Bridge} = emqx_resource:create_local( + rabbitmq, ResourceID, ?CONNECTOR_RESOURCE_GROUP, emqx_bridge_rabbitmq_connector, diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl index 4b29dd5ce..0004cd72c 100644 --- a/apps/emqx_connector/include/emqx_connector.hrl +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -37,4 +37,4 @@ "The " ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if `[:Port]` is not specified." ). --define(CONNECTOR_RESOURCE_GROUP, <<"emqx_connector">>). +-define(CONNECTOR_RESOURCE_GROUP, <<"connector">>). diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index be8d3a32d..2a5b3bcfc 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -18,6 +18,7 @@ -include("../../emqx_bridge/include/emqx_bridge_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include("emqx_connector.hrl"). -export([ connector_to_resource_type/1, @@ -125,8 +126,9 @@ create(Type, Name, Conf0, Opts) -> ResourceId = resource_id(Type, Name), Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, {ok, _Data} = emqx_resource:create_local( + Type, ResourceId, - <<"emqx_connector">>, + ?CONNECTOR_RESOURCE_GROUP, ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) @@ -198,6 +200,7 @@ recreate(Type, Name, Conf) -> recreate(Type, Name, Conf, Opts) -> TypeBin = bin(Type), emqx_resource:recreate_local( + Type, resource_id(Type, Name), ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), @@ -234,7 +237,7 @@ create_dry_run(Type, Conf0, Callback) -> {ok, ConfNew} -> ParseConf = parse_confs(bin(Type), TmpName, ConfNew), emqx_resource:create_dry_run_local( - TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback + Type, TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback ) end catch diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl index 6834da9e9..6ac02efc6 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl @@ -45,7 +45,7 @@ -define(MOD_TAB, emqx_dashboard_sso). -define(MOD_KEY_PATH, [dashboard, sso]). -define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]). --define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). +-define(RESOURCE_GROUP, <<"dashboard_sso">>). -define(NO_ERROR, <<>>). -define(DEFAULT_RESOURCE_OPTS, #{ start_after_created => false @@ -132,6 +132,7 @@ make_resource_id(Backend) -> create_resource(ResourceId, Module, Config) -> Result = emqx_resource:create_local( + dashboard_sso, ResourceId, ?RESOURCE_GROUP, Module, @@ -142,7 +143,7 @@ create_resource(ResourceId, Module, Config) -> update_resource(ResourceId, Module, Config) -> Result = emqx_resource:recreate_local( - ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS + dashboard_sso, ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS ), start_resource_if_enabled(ResourceId, Result, Config). diff --git a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl index 51524f0fd..40c5de9e5 100644 --- a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl +++ b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl @@ -24,7 +24,7 @@ -define(MOD_TAB, emqx_dashboard_sso). -define(MOD_KEY_PATH, [dashboard, sso, ldap]). --define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). +-define(RESOURCE_GROUP, <<"dashboard_sso">>). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1, request_api/3]). diff --git a/apps/emqx_ldap/test/emqx_ldap_SUITE.erl b/apps/emqx_ldap/test/emqx_ldap_SUITE.erl index a15ff2775..413cbc3a5 100644 --- a/apps/emqx_ldap/test/emqx_ldap_SUITE.erl +++ b/apps/emqx_ldap/test/emqx_ldap_SUITE.erl @@ -96,6 +96,7 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( + ldap, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?LDAP_RESOURCE_MOD, @@ -171,6 +172,7 @@ t_get_status(Config) -> ?LDAP_RESOURCE_MOD, ldap_config(Config) ), {ok, _} = emqx_resource:create_local( + ldap, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?LDAP_RESOURCE_MOD, diff --git a/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl b/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl index 8af05e0d3..850683d99 100644 --- a/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl +++ b/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl @@ -143,6 +143,7 @@ check_config(Config) -> create_local_resource(ResourceId, CheckedConfig) -> {ok, Bridge} = emqx_resource:create_local( + mongodb, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MONGO_RESOURCE_MOD, diff --git a/apps/emqx_mysql/test/emqx_mysql_SUITE.erl b/apps/emqx_mysql/test/emqx_mysql_SUITE.erl index 03e6c6797..be69140fc 100644 --- a/apps/emqx_mysql/test/emqx_mysql_SUITE.erl +++ b/apps/emqx_mysql/test/emqx_mysql_SUITE.erl @@ -67,6 +67,7 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( + mysql, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MYSQL_RESOURCE_MOD, diff --git a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl index d771d80d8..06210be86 100644 --- a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl +++ b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl @@ -75,6 +75,7 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> status := InitialStatus }} = emqx_resource:create_local( + postgresql, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?PGSQL_RESOURCE_MOD, diff --git a/apps/emqx_redis/test/emqx_redis_SUITE.erl b/apps/emqx_redis/test/emqx_redis_SUITE.erl index a9064f184..06ac82143 100644 --- a/apps/emqx_redis/test/emqx_redis_SUITE.erl +++ b/apps/emqx_redis/test/emqx_redis_SUITE.erl @@ -115,6 +115,7 @@ perform_lifecycle_check(ResourceId, InitialConfig, RedisCommand) -> state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( + redis, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?REDIS_RESOURCE_MOD, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 587786cb2..aa86fe239 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -23,6 +23,7 @@ %% remind us of that. -define(rm_status_stopped, stopped). +-type type() :: atom() | binary(). -type resource_type() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b6f01fde5..71ccaa696 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -28,9 +28,9 @@ -export([ check_config/2, - check_and_create_local/4, check_and_create_local/5, - check_and_recreate_local/4 + check_and_create_local/6, + check_and_recreate_local/5 ]). %% Sync resource instances and files @@ -39,13 +39,13 @@ -export([ %% store the config and start the instance - create_local/4, create_local/5, - create_dry_run_local/2, + create_local/6, create_dry_run_local/3, create_dry_run_local/4, - recreate_local/3, + create_dry_run_local/5, recreate_local/4, + recreate_local/5, %% remove the config and stop the instance remove_local/1, reset_metrics/1, @@ -282,12 +282,13 @@ is_resource_mod(Module) -> %% APIs for resource instances %% ================================================================================= --spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) -> +-spec create_local(type(), resource_id(), resource_group(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(ResId, Group, ResourceType, Config) -> - create_local(ResId, Group, ResourceType, Config, #{}). +create_local(Type, ResId, Group, ResourceType, Config) -> + create_local(Type, ResId, Group, ResourceType, Config, #{}). -spec create_local( + type(), resource_id(), resource_group(), resource_type(), @@ -295,33 +296,39 @@ create_local(ResId, Group, ResourceType, Config) -> creation_opts() ) -> {ok, resource_data()}. -create_local(ResId, Group, ResourceType, Config, Opts) -> - emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts). +create_local(Type, ResId, Group, ResourceType, Config, Opts) -> + emqx_resource_manager:ensure_resource(Type, ResId, Group, ResourceType, Config, Opts). --spec create_dry_run_local(resource_type(), resource_config()) -> +-spec create_dry_run_local(type(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. -create_dry_run_local(ResourceType, Config) -> - emqx_resource_manager:create_dry_run(ResourceType, Config). +create_dry_run_local(Type, ResourceType, Config) -> + emqx_resource_manager:create_dry_run(Type, ResourceType, Config). -create_dry_run_local(ResId, ResourceType, Config) -> - emqx_resource_manager:create_dry_run(ResId, ResourceType, Config). +create_dry_run_local(Type, ResId, ResourceType, Config) -> + emqx_resource_manager:create_dry_run(Type, ResId, ResourceType, Config). --spec create_dry_run_local(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run_local( + type(), + resource_id(), + resource_type(), + resource_config(), + OnReadyCallback +) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). -create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) -> - emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback). +create_dry_run_local(Type, ResId, ResourceType, Config, OnReadyCallback) -> + emqx_resource_manager:create_dry_run(Type, ResId, ResourceType, Config, OnReadyCallback). --spec recreate_local(resource_id(), resource_type(), resource_config()) -> +-spec recreate_local(type(), resource_id(), resource_type(), resource_config()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(ResId, ResourceType, Config) -> - recreate_local(ResId, ResourceType, Config, #{}). +recreate_local(Type, ResId, ResourceType, Config) -> + recreate_local(Type, ResId, ResourceType, Config, #{}). --spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate_local(type(), resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(ResId, ResourceType, Config, Opts) -> - emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). +recreate_local(Type, ResId, ResourceType, Config, Opts) -> + emqx_resource_manager:recreate(Type, ResId, ResourceType, Config, Opts). -spec remove_local(resource_id()) -> ok. remove_local(ResId) -> @@ -607,41 +614,44 @@ check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). -spec check_and_create_local( + type(), resource_id(), resource_group(), resource_type(), raw_resource_config() ) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(ResId, Group, ResourceType, RawConfig) -> - check_and_create_local(ResId, Group, ResourceType, RawConfig, #{}). +check_and_create_local(Type, ResId, Group, ResourceType, RawConfig) -> + check_and_create_local(Type, ResId, Group, ResourceType, RawConfig, #{}). -spec check_and_create_local( + type(), resource_id(), resource_group(), resource_type(), raw_resource_config(), creation_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> +check_and_create_local(Type, ResId, Group, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(ResConf) -> create_local(ResId, Group, ResourceType, ResConf, Opts) end + fun(ResConf) -> create_local(Type, ResId, Group, ResourceType, ResConf, Opts) end ). -spec check_and_recreate_local( + type(), resource_id(), resource_type(), raw_resource_config(), creation_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) -> +check_and_recreate_local(Type, ResId, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(ResConf) -> recreate_local(ResId, ResourceType, ResConf, Opts) end + fun(ResConf) -> recreate_local(Type, ResId, ResourceType, ResConf, Opts) end ). check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 95b1271f4..1c0b74edd 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -25,12 +25,12 @@ % API -export([ - ensure_resource/5, - recreate/4, + ensure_resource/6, + recreate/5, remove/1, - create_dry_run/2, create_dry_run/3, create_dry_run/4, + create_dry_run/5, restart/2, start/2, stop/1, @@ -59,7 +59,7 @@ ]). % Server --export([start_link/5]). +-export([start_link/6]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). @@ -75,6 +75,7 @@ -record(data, { id, group, + type, mod, callback_mode, query_mode, @@ -161,43 +162,44 @@ %% Triggers the emqx_resource_manager_sup supervisor to actually create %% and link the process itself if not already started. -spec ensure_resource( + type(), resource_id(), resource_group(), resource_type(), resource_config(), creation_opts() ) -> {ok, resource_data()}. -ensure_resource(ResId, Group, ResourceType, Config, Opts) -> +ensure_resource(Type, ResId, Group, ResourceType, Config, Opts) -> case lookup(ResId) of {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - create_and_return_data(ResId, Group, ResourceType, Config, Opts) + create_and_return_data(Type, ResId, Group, ResourceType, Config, Opts) end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate(type(), resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. -recreate(ResId, ResourceType, NewConfig, Opts) -> +recreate(Type, ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(ResId, false), - create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts); + create_and_return_data(Type, ResId, Group, ResourceType, NewConfig, Opts); {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> {error, not_found} end. -create_and_return_data(ResId, Group, ResourceType, Config, Opts) -> - _ = create(ResId, Group, ResourceType, Config, Opts), +create_and_return_data(Type, ResId, Group, ResourceType, Config, Opts) -> + _ = create(Type, ResId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(ResId), {ok, Data}. %% @doc Create a resource_manager and wait until it is running -create(ResId, Group, ResourceType, Config, Opts) -> +create(Type, ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child(Type, ResId, Group, ResourceType, Config, Opts), % Create metrics for the resource ok = emqx_resource:create_metrics(ResId), QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), @@ -219,30 +221,32 @@ create(ResId, Group, ResourceType, Config, Opts) -> %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create -%% and link the process itself if not already started, and then immedately stops. --spec create_dry_run(resource_type(), resource_config()) -> +%% and link the process itself if not already started, and then immediately stops. +-spec create_dry_run(type(), resource_type(), resource_config()) -> ok | {error, Reason :: term()}. -create_dry_run(ResourceType, Config) -> +create_dry_run(Type, ResourceType, Config) -> ResId = make_test_id(), - create_dry_run(ResId, ResourceType, Config). + create_dry_run(Type, ResId, ResourceType, Config). -create_dry_run(ResId, ResourceType, Config) -> - create_dry_run(ResId, ResourceType, Config, fun do_nothing_on_ready/1). +create_dry_run(Type, ResId, ResourceType, Config) -> + create_dry_run(Type, ResId, ResourceType, Config, fun do_nothing_on_ready/1). do_nothing_on_ready(_ResId) -> ok. --spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run(type(), resource_id(), resource_type(), resource_config(), OnReadyCallback) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). -create_dry_run(ResId, ResourceType, Config, OnReadyCallback) -> +create_dry_run(Type, ResId, ResourceType, Config, OnReadyCallback) -> Opts = case is_map(Config) of true -> maps:get(resource_opts, Config, #{}); false -> #{} end, - ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child( + Type, ResId, <<"dry_run">>, ResourceType, Config, Opts + ), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000), case wait_for_ready(ResId, Timeout) of @@ -491,7 +495,7 @@ try_clean_allocated_resources(ResId) -> %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server -start_link(ResId, Group, ResourceType, Config, Opts) -> +start_link(Type, ResId, Group, ResourceType, Config, Opts) -> QueryMode = emqx_resource:query_mode( ResourceType, Config, @@ -499,6 +503,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) -> ), Data = #data{ id = ResId, + type = Type, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), @@ -683,6 +688,9 @@ handle_event(EventType, EventData, State, Data) -> error, #{ msg => "ignore_all_other_events", + resource_id => Data#data.id, + group => Data#data.group, + type => Data#data.type, event_type => EventType, event_data => EventData, state => State, @@ -752,8 +760,8 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - ResId = Data#data.id, - case emqx_resource:call_start(ResId, Data#data.mod, Data#data.config) of + #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data, + case emqx_resource:call_start(ResId, Mod, Config) of {ok, ResourceState} -> UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -764,7 +772,9 @@ start_resource(Data, From) -> IsDryRun = emqx_resource:is_dry_run(ResId), ?SLOG(log_level(IsDryRun), #{ msg => "start_resource_failed", - id => ResId, + resource_id => ResId, + group => Group, + type => Type, reason => Reason }), _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error), @@ -798,14 +808,20 @@ add_channels(Data) -> add_channels_in_list([], Data) -> Data; add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> - Id = Data#data.id, + #data{ + id = ResId, + mod = Mod, + state = State, + added_channels = AddedChannelsMap, + group = Group, + type = Type + } = Data, case emqx_resource:call_add_channel( - Id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig + ResId, Mod, State, ChannelID, ChannelConfig ) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, %% Set the channel status to connecting to indicate that %% we have not yet performed the initial health_check NewAddedChannelsMap = maps:put( @@ -819,10 +835,12 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> }, add_channels_in_list(Rest, NewData); {error, Reason} = Error -> - IsDryRun = emqx_resource:is_dry_run(Id), + IsDryRun = emqx_resource:is_dry_run(ResId), ?SLOG(log_level(IsDryRun), #{ msg => "add_channel_failed", - id => Id, + resource_id => ResId, + type => Type, + group => Group, channel_id => ChannelID, reason => Reason }), @@ -872,9 +890,15 @@ remove_channels(Data) -> remove_channels_in_list([], Data, _KeepInChannelMap) -> Data; remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> - AddedChannelsMap = Data#data.added_channels, - Id = Data#data.id, - IsDryRun = emqx_resource:is_dry_run(Id), + #data{ + id = ResId, + added_channels = AddedChannelsMap, + mod = Mod, + state = State, + group = Group, + type = Type + } = Data, + IsDryRun = emqx_resource:is_dry_run(ResId), NewAddedChannelsMap = case KeepInChannelMap of true -> @@ -883,7 +907,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> _ = maybe_clear_alarm(IsDryRun, ChannelID), maps:remove(ChannelID, AddedChannelsMap) end, - case safe_call_remove_channel(Id, Data#data.mod, Data#data.state, ChannelID) of + case safe_call_remove_channel(ResId, Mod, State, ChannelID) of {ok, NewState} -> NewData = Data#data{ state = NewState, @@ -893,7 +917,9 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> {error, Reason} -> ?SLOG(log_level(IsDryRun), #{ msg => "remove_channel_failed", - id => Id, + resource_id => ResId, + group => Group, + type => Type, channel_id => ChannelID, reason => Reason }), @@ -997,7 +1023,12 @@ handle_remove_channel(From, ChannelId, Data) -> end. handle_remove_channel_exists(From, ChannelId, Data) -> - #data{id = Id, added_channels = AddedChannelsMap} = Data, + #data{ + id = Id, + group = Group, + type = Type, + added_channels = AddedChannelsMap + } = Data, case emqx_resource:call_remove_channel( Id, Data#data.mod, Data#data.state, ChannelId @@ -1014,7 +1045,9 @@ handle_remove_channel_exists(From, ChannelId, Data) -> IsDryRun = emqx_resource:is_dry_run(Id), ?SLOG(log_level(IsDryRun), #{ msg => "remove_channel_failed", - id => Id, + resource_id => Id, + group => Group, + type => Type, channel_id => ChannelId, reason => Reason }), @@ -1123,10 +1156,13 @@ continue_resource_health_check_connected(NewStatus, Data0) -> Actions = Replies ++ resource_health_check_actions(Data), {keep_state, Data, Actions}; _ -> - IsDryRun = emqx_resource:is_dry_run(Data0#data.id), + #data{id = ResId, group = Group, type = Type} = Data0, + IsDryRun = emqx_resource:is_dry_run(ResId), ?SLOG(log_level(IsDryRun), #{ msg => "health_check_failed", - id => Data0#data.id, + resource_id => ResId, + group => Group, + type => Type, status => NewStatus }), %% Note: works because, coincidentally, channel/resource status is a @@ -1633,6 +1669,8 @@ parse_health_check_result({error, Error}, Data) -> #{ msg => "health_check_exception", resource_id => Data#data.id, + type => Data#data.type, + group => Data#data.group, reason => Error } ), diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 7af6eca81..c14b08f94 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -19,14 +19,16 @@ -include("emqx_resource.hrl"). --export([ensure_child/5, delete_child/1]). +-export([ensure_child/6, delete_child/1]). -export([start_link/0]). -export([init/1]). -ensure_child(ResId, Group, ResourceType, Config, Opts) -> - case supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)) of +ensure_child(Type, ResId, Group, ResourceType, Config, Opts) -> + case + supervisor:start_child(?MODULE, child_spec(Type, ResId, Group, ResourceType, Config, Opts)) + of {error, Reason} -> %% This should not happen in production but it can be a huge time sink in %% development environments if the error is just silently ignored. @@ -55,13 +57,14 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. -child_spec(ResId, Group, ResourceType, Config, Opts) -> +child_spec(Type, ResId, Group, ResourceType, Config, Opts) -> #{ id => ResId, - start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, + start => + {emqx_resource_manager, start_link, [Type, ResId, Group, ResourceType, Config, Opts]}, restart => transient, %% never force kill a resource manager. - %% becasue otherwise it may lead to release leak, + %% because otherwise it may lead to release leak, %% resource_manager's terminate callback calls resource on_stop shutdown => infinity, type => worker, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index af9abe95b..981c88edd 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -23,6 +23,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TEST_RESOURCE, emqx_connector_demo). +-define(TYPE, test). -define(ID, <<"id">>). -define(ID1, <<"id1">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). @@ -90,6 +91,7 @@ t_create_remove(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( + ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -110,6 +112,7 @@ t_create_remove(_) -> ?assertMatch( {ok, _}, emqx_resource:recreate_local( + ?TYPE, ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -135,6 +138,7 @@ t_create_remove_local(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( + ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -153,6 +157,7 @@ t_create_remove_local(_) -> ), emqx_resource:recreate_local( + ?TYPE, ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -166,6 +171,7 @@ t_create_remove_local(_) -> emqx_resource:set_resource_status_connecting(?ID), emqx_resource:recreate_local( + ?TYPE, ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -937,6 +943,7 @@ t_stop_start(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( + ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -947,6 +954,7 @@ t_stop_start(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_create_local( + ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -964,6 +972,7 @@ t_stop_start(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_recreate_local( + ?TYPE, ?ID, ?TEST_RESOURCE, #{<<"name">> => <<"test_resource">>}, @@ -1013,6 +1022,7 @@ t_stop_start_local(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( + ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -1023,6 +1033,7 @@ t_stop_start_local(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_create_local( + ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -1033,6 +1044,7 @@ t_stop_start_local(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_recreate_local( + ?TYPE, ?ID, ?TEST_RESOURCE, #{<<"name">> => <<"test_resource">>}, @@ -1108,6 +1120,7 @@ create_dry_run_local_succ() -> ?assertEqual( ok, emqx_resource:create_dry_run_local( + test, ?TEST_RESOURCE, #{name => test_resource, register => true} ) @@ -1118,6 +1131,7 @@ t_create_dry_run_local_failed(_) -> ct:timetrap({seconds, 120}), ct:pal("creating with creation error"), Res1 = emqx_resource:create_dry_run_local( + test, ?TEST_RESOURCE, #{create_error => true} ), @@ -1125,6 +1139,7 @@ t_create_dry_run_local_failed(_) -> ct:pal("creating with health check error"), Res2 = emqx_resource:create_dry_run_local( + test, ?TEST_RESOURCE, #{name => test_resource, health_check_error => true} ), @@ -1132,6 +1147,7 @@ t_create_dry_run_local_failed(_) -> ct:pal("creating with stop error"), Res3 = emqx_resource:create_dry_run_local( + test, ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), @@ -3490,10 +3506,10 @@ gauge_metric_set_fns() -> ]. create(Id, Group, Type, Config) -> - emqx_resource:create_local(Id, Group, Type, Config). + emqx_resource:create_local(test, Id, Group, Type, Config). create(Id, Group, Type, Config, Opts) -> - emqx_resource:create_local(Id, Group, Type, Config, Opts). + emqx_resource:create_local(test, Id, Group, Type, Config, Opts). log_consistency_prop() -> {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}.