From e7d07ea17c2e0d47b109af22bf0f85dbed05a307 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Tue, 23 Jul 2024 16:15:57 +0800 Subject: [PATCH] feat: add resource_type to emqx_resource behaviour --- .../src/emqx_authn/emqx_authn_utils.erl | 11 ++- .../src/emqx_authz/emqx_authz_utils.erl | 14 ++- apps/emqx_auth_http/src/emqx_authn_http.erl | 5 +- apps/emqx_auth_http/src/emqx_authz_http.erl | 6 +- .../src/emqx_authn_jwks_connector.erl | 3 + apps/emqx_auth_jwt/src/emqx_authn_jwt.erl | 1 - apps/emqx_auth_ldap/src/emqx_authn_ldap.erl | 4 +- apps/emqx_auth_ldap/src/emqx_authz_ldap.erl | 4 +- .../test/emqx_authn_ldap_SUITE.erl | 1 - .../test/emqx_authn_ldap_bind_SUITE.erl | 1 - .../test/emqx_authz_ldap_SUITE.erl | 1 - .../src/emqx_authn_mongodb.erl | 3 +- .../src/emqx_authz_mongodb.erl | 4 +- apps/emqx_auth_mysql/src/emqx_authn_mysql.erl | 4 +- apps/emqx_auth_mysql/src/emqx_authz_mysql.erl | 4 +- .../test/emqx_authn_mysql_SUITE.erl | 1 - .../test/emqx_authz_mysql_SUITE.erl | 1 - .../src/emqx_authn_postgresql.erl | 3 +- .../src/emqx_authz_postgresql.erl | 2 - .../test/emqx_authn_postgresql_SUITE.erl | 9 +- .../test/emqx_authz_postgresql_SUITE.erl | 1 - apps/emqx_auth_redis/src/emqx_authn_redis.erl | 3 +- apps/emqx_auth_redis/src/emqx_authz_redis.erl | 4 +- .../test/emqx_authn_redis_SUITE.erl | 1 - .../test/emqx_authz_redis_SUITE.erl | 1 - apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 1 + .../test/emqx_bridge_v2_api_SUITE.erl | 1 + .../test/emqx_bridge_v2_dummy_connector.erl | 4 +- .../test/emqx_bridge_v2_test_connector.erl | 3 + .../src/emqx_bridge_cassandra.app.src | 2 +- .../src/emqx_bridge_cassandra_connector.erl | 2 + .../emqx_bridge_cassandra_connector_SUITE.erl | 1 - .../src/emqx_bridge_clickhouse.app.src | 2 +- .../src/emqx_bridge_clickhouse_connector.erl | 2 + ...emqx_bridge_clickhouse_connector_SUITE.erl | 2 - .../src/emqx_bridge_dynamo.app.src | 2 +- .../src/emqx_bridge_dynamo_connector.erl | 2 + .../emqx_bridge_es/src/emqx_bridge_es.app.src | 2 +- .../src/emqx_bridge_es_connector.erl | 3 + .../src/emqx_bridge_gcp_pubsub.app.src | 2 +- .../emqx_bridge_gcp_pubsub_impl_consumer.erl | 3 + .../emqx_bridge_gcp_pubsub_impl_producer.erl | 2 + .../src/emqx_bridge_greptimedb_connector.erl | 3 + ...emqx_bridge_greptimedb_connector_SUITE.erl | 1 - .../src/emqx_bridge_hstreamdb.app.src | 2 +- .../src/emqx_bridge_hstreamdb_connector.erl | 3 + .../src/emqx_bridge_http_connector.erl | 2 + .../src/emqx_bridge_influxdb.app.src | 2 +- .../src/emqx_bridge_influxdb_connector.erl | 3 + .../emqx_bridge_influxdb_connector_SUITE.erl | 2 - .../src/emqx_bridge_iotdb.app.src | 2 +- .../src/emqx_bridge_iotdb_connector.erl | 3 + .../src/emqx_bridge_kafka_impl_consumer.erl | 2 + .../src/emqx_bridge_kafka_impl_producer.erl | 3 + .../src/emqx_bridge_kinesis.app.src | 2 +- .../src/emqx_bridge_kinesis_impl_producer.erl | 2 + .../src/emqx_bridge_mongodb.app.src | 2 +- .../src/emqx_bridge_mongodb_connector.erl | 2 + .../src/emqx_bridge_mqtt_connector.erl | 3 + .../src/emqx_bridge_mysql.app.src | 2 +- .../src/emqx_bridge_mysql_connector.erl | 2 + .../src/emqx_bridge_opents.app.src | 2 +- .../src/emqx_bridge_opents_connector.erl | 3 + .../src/emqx_bridge_pulsar_connector.erl | 2 + .../src/emqx_bridge_rabbitmq.app.src | 2 +- .../src/emqx_bridge_rabbitmq_connector.erl | 2 + .../emqx_bridge_rabbitmq_connector_SUITE.erl | 1 - .../src/emqx_bridge_redis.app.src | 2 +- .../src/emqx_bridge_redis_connector.erl | 5 +- .../src/emqx_bridge_rocketmq.app.src | 2 +- .../src/emqx_bridge_rocketmq_connector.erl | 3 + .../src/emqx_bridge_s3_connector.erl | 3 + .../src/emqx_bridge_sqlserver.app.src | 2 +- .../src/emqx_bridge_sqlserver_connector.erl | 2 + .../src/emqx_bridge_syskeeper.app.src | 2 +- .../src/emqx_bridge_syskeeper_connector.erl | 2 + .../emqx_bridge_syskeeper_proxy_server.erl | 3 + .../src/emqx_bridge_tdengine.app.src | 2 +- .../src/emqx_bridge_tdengine_connector.erl | 2 + .../src/emqx_connector_resource.erl | 6 +- .../test/emqx_connector_SUITE.erl | 5 ++ .../test/emqx_connector_api_SUITE.erl | 1 + .../test/emqx_connector_dummy_impl.erl | 3 + .../src/emqx_dashboard_sso_manager.erl | 3 +- apps/emqx_ldap/src/emqx_ldap.app.src | 2 +- apps/emqx_ldap/src/emqx_ldap.erl | 3 + apps/emqx_ldap/test/emqx_ldap_SUITE.erl | 2 - apps/emqx_mongodb/src/emqx_mongodb.app.src | 2 +- apps/emqx_mongodb/src/emqx_mongodb.erl | 2 + apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl | 1 - apps/emqx_mysql/src/emqx_mysql.app.src | 2 +- apps/emqx_mysql/src/emqx_mysql.erl | 3 + apps/emqx_mysql/test/emqx_mysql_SUITE.erl | 1 - apps/emqx_oracle/src/emqx_oracle.app.src | 2 +- apps/emqx_oracle/src/emqx_oracle.erl | 3 + .../src/emqx_postgresql.app.src | 2 +- apps/emqx_postgresql/src/emqx_postgresql.erl | 3 + .../test/emqx_postgresql_SUITE.erl | 1 - apps/emqx_redis/src/emqx_redis.erl | 3 + apps/emqx_redis/test/emqx_redis_SUITE.erl | 1 - apps/emqx_resource/include/emqx_resource.hrl | 4 +- apps/emqx_resource/src/emqx_resource.erl | 88 ++++++++----------- .../src/emqx_resource_manager.erl | 53 +++++------ .../src/emqx_resource_manager_sup.erl | 12 ++- .../src/proto/emqx_resource_proto_v1.erl | 6 +- .../test/emqx_connector_demo.erl | 3 + .../test/emqx_resource_SUITE.erl | 20 +---- .../test/emqx_rule_engine_test_connector.erl | 3 + 109 files changed, 248 insertions(+), 211 deletions(-) diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl index e81145f2c..a08ac260c 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_utils.erl @@ -21,8 +21,8 @@ -include_lib("snabbkaffe/include/trace.hrl"). -export([ - create_resource/4, - update_resource/4, + create_resource/3, + update_resource/3, check_password_from_selected_map/3, parse_deep/1, parse_str/1, @@ -66,9 +66,8 @@ %% APIs %%-------------------------------------------------------------------- -create_resource(Type, ResourceId, Module, Config) -> +create_resource(ResourceId, Module, Config) -> Result = emqx_resource:create_local( - Type, ResourceId, ?AUTHN_RESOURCE_GROUP, Module, @@ -77,9 +76,9 @@ create_resource(Type, ResourceId, Module, Config) -> ), start_resource_if_enabled(Result, ResourceId, Config). -update_resource(Type, Module, Config, ResourceId) -> +update_resource(Module, Config, ResourceId) -> Result = emqx_resource:recreate_local( - Type, ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS + ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS ), start_resource_if_enabled(Result, ResourceId, Config). diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl index 533f982c1..e4343b6fa 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz_utils.erl @@ -25,9 +25,9 @@ -export([ cleanup_resources/0, make_resource_id/1, + create_resource/2, create_resource/3, - create_resource/4, - update_resource/3, + update_resource/2, remove_resource/1, update_config/2, parse_deep/2, @@ -57,13 +57,12 @@ %% APIs %%-------------------------------------------------------------------- -create_resource(Type, Module, Config) -> +create_resource(Module, Config) -> ResourceId = make_resource_id(Module), - create_resource(Type, ResourceId, Module, Config). + create_resource(ResourceId, Module, Config). -create_resource(Type, ResourceId, Module, Config) -> +create_resource(ResourceId, Module, Config) -> Result = emqx_resource:create_local( - Type, ResourceId, ?AUTHZ_RESOURCE_GROUP, Module, @@ -72,11 +71,10 @@ create_resource(Type, ResourceId, Module, Config) -> ), start_resource_if_enabled(Result, ResourceId, Config). -update_resource(Type, Module, #{annotations := #{id := ResourceId}} = Config) -> +update_resource(Module, #{annotations := #{id := ResourceId}} = Config) -> Result = case emqx_resource:recreate_local( - Type, ResourceId, Module, Config, diff --git a/apps/emqx_auth_http/src/emqx_authn_http.erl b/apps/emqx_auth_http/src/emqx_authn_http.erl index ed151428f..d9c5c5ed5 100644 --- a/apps/emqx_auth_http/src/emqx_authn_http.erl +++ b/apps/emqx_auth_http/src/emqx_authn_http.erl @@ -40,7 +40,6 @@ create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), % {Config, State} = parse_config(Config0), {ok, _Data} = emqx_authn_utils:create_resource( - http, ResourceId, emqx_bridge_http_connector, Config @@ -51,9 +50,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> with_validated_config(Config0, fun(Config, NState) -> % {Config, NState} = parse_config(Config0), - case - emqx_authn_utils:update_resource(http, emqx_bridge_http_connector, Config, ResourceId) - of + case emqx_authn_utils:update_resource(emqx_bridge_http_connector, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index 2a1a2638b..a858ec7d7 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -67,13 +67,15 @@ create(Config) -> NConfig = parse_config(Config), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {ok, _Data} = emqx_authz_utils:create_resource( - http, ResourceId, emqx_bridge_http_connector, NConfig + ResourceId, + emqx_bridge_http_connector, + NConfig ), NConfig#{annotations => #{id => ResourceId}}. update(Config) -> NConfig = parse_config(Config), - case emqx_authz_utils:update_resource(http, emqx_bridge_http_connector, NConfig) of + case emqx_authz_utils:update_resource(emqx_bridge_http_connector, NConfig) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> NConfig#{annotations => #{id => Id}} end. diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl index ffa8175b7..22aed8e57 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl @@ -22,6 +22,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -32,6 +33,8 @@ -define(DEFAULT_POOL_SIZE, 8). +resource_type() -> jwks. + callback_mode() -> always_sync. on_start(InstId, Opts) -> diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index 2f8ebec36..1d0d58474 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -183,7 +183,6 @@ do_create( ) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {ok, _Data} = emqx_resource:create_local( - jwt, ResourceId, ?AUTHN_RESOURCE_GROUP, emqx_authn_jwks_connector, diff --git a/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl b/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl index 8a9b5650a..a18236d15 100644 --- a/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl +++ b/apps/emqx_auth_ldap/src/emqx_authn_ldap.erl @@ -40,12 +40,12 @@ create(_AuthenticatorID, Config) -> do_create(Module, Config) -> ResourceId = emqx_authn_utils:make_resource_id(Module), State = parse_config(Config), - {ok, _Data} = emqx_authn_utils:create_resource(ldap, ResourceId, emqx_ldap, Config), + {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_ldap, Config), {ok, State#{resource_id => ResourceId}}. update(Config, #{resource_id := ResourceId} = _State) -> NState = parse_config(Config), - case emqx_authn_utils:update_resource(ldap, emqx_ldap, Config, ResourceId) of + case emqx_authn_utils:update_resource(emqx_ldap, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl b/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl index e70b840a3..24bd8c008 100644 --- a/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl +++ b/apps/emqx_auth_ldap/src/emqx_authz_ldap.erl @@ -56,12 +56,12 @@ description() -> create(Source) -> ResourceId = emqx_authz_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ldap, ResourceId, emqx_ldap, Source), + {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_ldap, Source), Annotations = new_annotations(#{id => ResourceId}, Source), Source#{annotations => Annotations}. update(Source) -> - case emqx_authz_utils:update_resource(ldap, emqx_ldap, Source) of + case emqx_authz_utils:update_resource(emqx_ldap, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl index f6ee582be..ac941f268 100644 --- a/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl @@ -47,7 +47,6 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( - ldap, ?LDAP_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_ldap, diff --git a/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl index c39225e74..d8e9c0d8e 100644 --- a/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authn_ldap_bind_SUITE.erl @@ -47,7 +47,6 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( - ldap, ?LDAP_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_ldap, diff --git a/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl index 3f8c0ba63..09875a3fa 100644 --- a/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl @@ -178,7 +178,6 @@ stop_apps(Apps) -> create_ldap_resource() -> {ok, _} = emqx_resource:create_local( - ldap, ?LDAP_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_ldap, diff --git a/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl b/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl index ffe78159d..75a474c0c 100644 --- a/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl +++ b/apps/emqx_auth_mongodb/src/emqx_authn_mongodb.erl @@ -37,7 +37,6 @@ create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {Config, State} = parse_config(Config0), {ok, _Data} = emqx_authn_utils:create_resource( - mongodb, ResourceId, emqx_mongodb, Config @@ -46,7 +45,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(mongodb, emqx_mongodb, Config, ResourceId) of + case emqx_authn_utils:update_resource(emqx_mongodb, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl b/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl index 0d65afe0d..0bab6ef90 100644 --- a/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl +++ b/apps/emqx_auth_mongodb/src/emqx_authz_mongodb.erl @@ -49,13 +49,13 @@ description() -> create(#{filter := Filter} = Source) -> ResourceId = emqx_authz_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(mongodb, ResourceId, emqx_mongodb, Source), + {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mongodb, Source), FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS), Source#{annotations => #{id => ResourceId}, filter_template => FilterTemp}. update(#{filter := Filter} = Source) -> FilterTemp = emqx_authz_utils:parse_deep(Filter, ?ALLOWED_VARS), - case emqx_authz_utils:update_resource(mongodb, emqx_mongodb, Source) of + case emqx_authz_utils:update_resource(emqx_mongodb, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl b/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl index 7d03ca856..f68c74a14 100644 --- a/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl +++ b/apps/emqx_auth_mysql/src/emqx_authn_mysql.erl @@ -39,12 +39,12 @@ create(_AuthenticatorID, Config) -> create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {Config, State} = parse_config(Config0), - {ok, _Data} = emqx_authn_utils:create_resource(mysql, ResourceId, emqx_mysql, Config), + {ok, _Data} = emqx_authn_utils:create_resource(ResourceId, emqx_mysql, Config), {ok, State#{resource_id => ResourceId}}. update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(mysql, emqx_mysql, Config, ResourceId) of + case emqx_authn_utils:update_resource(emqx_mysql, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl b/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl index 557eed14f..0e2b77005 100644 --- a/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl +++ b/apps/emqx_auth_mysql/src/emqx_authz_mysql.erl @@ -53,13 +53,13 @@ create(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS), ResourceId = emqx_authz_utils:make_resource_id(?MODULE), Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, - {ok, _Data} = emqx_authz_utils:create_resource(mysql, ResourceId, emqx_mysql, Source), + {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_mysql, Source), Source#{annotations => #{id => ResourceId, tmpl_token => TmplToken}}. update(#{query := SQL} = Source0) -> {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?ALLOWED_VARS), Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}}, - case emqx_authz_utils:update_resource(mysql, emqx_mysql, Source) of + case emqx_authz_utils:update_resource(emqx_mysql, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl index 78b8aa8b3..8ab812fc0 100644 --- a/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_auth_mysql/test/emqx_authn_mysql_SUITE.erl @@ -58,7 +58,6 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( - mysql, ?MYSQL_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_mysql, diff --git a/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl index 5fe020f1b..ce30e203e 100644 --- a/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_auth_mysql/test/emqx_authz_mysql_SUITE.erl @@ -446,7 +446,6 @@ stop_apps(Apps) -> create_mysql_resource() -> {ok, _} = emqx_resource:create_local( - mysql, ?MYSQL_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_mysql, diff --git a/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl b/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl index 95e8ac7ce..980c2ddd8 100644 --- a/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl +++ b/apps/emqx_auth_postgresql/src/emqx_authn_postgresql.erl @@ -45,7 +45,6 @@ create(Config0) -> ResourceId = emqx_authn_utils:make_resource_id(?MODULE), {Config, State} = parse_config(Config0, ResourceId), {ok, _Data} = emqx_authn_utils:create_resource( - postgresql, ResourceId, emqx_postgresql, Config @@ -54,7 +53,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0, ResourceId), - case emqx_authn_utils:update_resource(postgresql, emqx_postgresql, Config, ResourceId) of + case emqx_authn_utils:update_resource(emqx_postgresql, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl b/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl index e0fc9e032..d1a0b32ea 100644 --- a/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl +++ b/apps/emqx_auth_postgresql/src/emqx_authz_postgresql.erl @@ -53,7 +53,6 @@ create(#{query := SQL0} = Source) -> {SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS), ResourceID = emqx_authz_utils:make_resource_id(emqx_postgresql), {ok, _Data} = emqx_authz_utils:create_resource( - postgresql, ResourceID, emqx_postgresql, Source#{prepare_statement => #{ResourceID => SQL}} @@ -64,7 +63,6 @@ update(#{query := SQL0, annotations := #{id := ResourceID}} = Source) -> {SQL, PlaceHolders} = emqx_authz_utils:parse_sql(SQL0, '$n', ?ALLOWED_VARS), case emqx_authz_utils:update_resource( - postgresql, emqx_postgresql, Source#{prepare_statement => #{ResourceID => SQL}} ) diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index dde5a184a..1dfd30899 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -79,7 +79,6 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( - postgresql, ?PGSQL_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_postgresql, @@ -199,13 +198,9 @@ test_user_auth(#{ t_authenticate_disabled_prepared_statements(_Config) -> ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), - {ok, _} = emqx_resource:recreate_local( - postgresql, ?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{} - ), + {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{}), on_exit(fun() -> - emqx_resource:recreate_local( - postgresql, ?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{} - ) + emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{}) end), ok = lists:foreach( fun(Sample0) -> diff --git a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl index 9346bf863..78b1e17a8 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authz_postgresql_SUITE.erl @@ -437,7 +437,6 @@ pgsql_config() -> create_pgsql_resource() -> emqx_resource:create_local( - postgresql, ?PGSQL_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_postgresql, diff --git a/apps/emqx_auth_redis/src/emqx_authn_redis.erl b/apps/emqx_auth_redis/src/emqx_authn_redis.erl index 4eb5a36a3..779c58e39 100644 --- a/apps/emqx_auth_redis/src/emqx_authn_redis.erl +++ b/apps/emqx_auth_redis/src/emqx_authn_redis.erl @@ -42,7 +42,6 @@ create(Config0) -> Res; {Config, State} -> {ok, _Data} = emqx_authn_utils:create_resource( - redis, ResourceId, emqx_redis, Config @@ -52,7 +51,7 @@ create(Config0) -> update(Config0, #{resource_id := ResourceId} = _State) -> {Config, NState} = parse_config(Config0), - case emqx_authn_utils:update_resource(redis, emqx_redis, Config, ResourceId) of + case emqx_authn_utils:update_resource(emqx_redis, Config, ResourceId) of {error, Reason} -> error({load_config_error, Reason}); {ok, _} -> diff --git a/apps/emqx_auth_redis/src/emqx_authz_redis.erl b/apps/emqx_auth_redis/src/emqx_authz_redis.erl index b83d1abad..8ce975033 100644 --- a/apps/emqx_auth_redis/src/emqx_authz_redis.erl +++ b/apps/emqx_auth_redis/src/emqx_authz_redis.erl @@ -50,12 +50,12 @@ description() -> create(#{cmd := CmdStr} = Source) -> CmdTemplate = parse_cmd(CmdStr), ResourceId = emqx_authz_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(redis, ResourceId, emqx_redis, Source), + {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_redis, Source), Source#{annotations => #{id => ResourceId}, cmd_template => CmdTemplate}. update(#{cmd := CmdStr} = Source) -> CmdTemplate = parse_cmd(CmdStr), - case emqx_authz_utils:update_resource(redis, emqx_redis, Source) of + case emqx_authz_utils:update_resource(emqx_redis, Source) of {error, Reason} -> error({load_config_error, Reason}); {ok, Id} -> diff --git a/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl b/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl index 1e9d825d2..e8c8760de 100644 --- a/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_auth_redis/test/emqx_authn_redis_SUITE.erl @@ -63,7 +63,6 @@ init_per_suite(Config) -> work_dir => ?config(priv_dir, Config) }), {ok, _} = emqx_resource:create_local( - redis, ?REDIS_RESOURCE, ?AUTHN_RESOURCE_GROUP, emqx_redis, diff --git a/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl b/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl index d0c695c73..5818eea07 100644 --- a/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_auth_redis/test/emqx_authz_redis_SUITE.erl @@ -384,7 +384,6 @@ stop_apps(Apps) -> create_redis_resource() -> {ok, _} = emqx_resource:create_local( - redis, ?REDIS_RESOURCE, ?AUTHZ_RESOURCE_GROUP, emqx_redis, diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 7e9f5300a..40db2aee9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -195,7 +195,6 @@ create(Type, Name, Conf0, Opts) -> TypeBin = bin(Type), Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, {ok, _Data} = emqx_resource:create_local( - Type, resource_id(Type, Name), <<"bridge">>, bridge_to_resource_type(Type), @@ -265,7 +264,6 @@ recreate(Type, Name, Conf0, Opts) -> TypeBin = bin(Type), Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, emqx_resource:recreate_local( - Type, resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), @@ -284,7 +282,7 @@ create_dry_run(Type0, Conf0) -> create_dry_run_bridge_v1(Type, Conf0) -> TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpPath = emqx_utils:safe_filename(TmpName), - %% Already typechecked, no need to catch errors + %% Already type checked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), Conf1 = maps:without([<<"name">>], Conf0), @@ -302,7 +300,7 @@ create_dry_run_bridge_v1(Type, Conf0) -> {error, Reason}; {ok, ConfNew} -> ParseConf = parse_confs(TypeBin, TmpName, ConfNew), - emqx_resource:create_dry_run_local(Type, bridge_to_resource_type(Type), ParseConf) + emqx_resource:create_dry_run_local(bridge_to_resource_type(Type), ParseConf) end catch %% validation errors diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index dc2e8f275..d81d710ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -1110,6 +1110,7 @@ t_query_uses_action_query_mode(_Config) -> %% ... now we use a quite different query mode for the action meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), resource_type, 0, dummy), meck:expect(con_mod(), callback_mode, 0, async_if_possible), {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 039402738..e5d13f452 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -293,6 +293,7 @@ init_mocks() -> meck:new(emqx_connector_resource, [passthrough, no_link]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( ?CONNECTOR_IMPL, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl index 6b4001b6b..1bb7fd37f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl @@ -15,15 +15,17 @@ %% this module is only intended to be mocked -module(emqx_bridge_v2_dummy_connector). +-behavior(emqx_resource). -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, on_add_channel/4, on_get_channel_status/3 ]). - +resource_type() -> dummy. callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). on_stop(_, _) -> error(unexpected). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index c528d097c..7daedf19a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -19,6 +19,7 @@ -export([ query_mode/1, + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -34,6 +35,8 @@ query_mode(_Config) -> sync. +resource_type() -> test_connector. + callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 946ca591a..3e7422112 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.3.1"}, + {vsn, "0.3.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index df278b791..9f830eb69 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -19,6 +19,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -94,6 +95,7 @@ desc("connector") -> %%-------------------------------------------------------------------- %% callbacks for emqx_resource +resource_type() -> cassandra. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl index 02c102832..b784d36c0 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl @@ -212,7 +212,6 @@ check_config(Config) -> create_local_resource(ResourceId, CheckedConfig) -> {ok, Bridge} = emqx_resource:create_local( - cassandra, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?CASSANDRA_RESOURCE_MOD, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index f38036b83..794d067bd 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.4.1"}, + {vsn, "0.4.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index f6888cad5..c5b82122a 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -29,6 +29,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -128,6 +129,7 @@ values(_) -> %% =================================================================== %% Callbacks defined in emqx_resource %% =================================================================== +resource_type() -> clickhouse. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl index 0b0acd78a..1c83961a5 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl @@ -109,7 +109,6 @@ t_start_passfile(Config) -> ?assertMatch( {ok, #{status := connected}}, emqx_resource:create_local( - clickhouse, ResourceID, ?CONNECTOR_RESOURCE_GROUP, ?CLICKHOUSE_RESOURCE_MOD, @@ -139,7 +138,6 @@ perform_lifecycle_check(ResourceID, InitialConfig) -> status := InitialStatus }} = emqx_resource:create_local( - clickhouse, ResourceID, ?CONNECTOR_RESOURCE_GROUP, ?CLICKHOUSE_RESOURCE_MOD, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index ac71e04e7..b8fee4dee 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 82f5fb18d..181de34a8 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -17,6 +17,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -68,6 +69,7 @@ fields(config) -> %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> dynamo. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src index 262ac84bd..8f3dc3a7e 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_es, [ {description, "EMQX Enterprise Elastic Search Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {modules, [ emqx_bridge_es, emqx_bridge_es_connector diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 20de92e6e..feccd42f1 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -14,6 +14,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -207,6 +208,8 @@ base_url(#{server := Server}) -> "http://" ++ Server. %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> elastic_search. + callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index d98355a90..eff7847f2 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.3.1"}, + {vsn, "0.3.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 5c51cd2d9..344fc05c6 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -8,6 +8,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -84,6 +85,8 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +-spec resource_type() -> resource_type(). +resource_type() -> gcp_pubsub_consumer. -spec callback_mode() -> callback_mode(). callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 48e50c416..aec78f74c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -41,6 +41,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -62,6 +63,7 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +resource_type() -> gcp_pubsub. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index e4cc0aa31..a2ffd6219 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -67,6 +68,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> greptimedb. + callback_mode() -> async_if_possible. on_add_channel( diff --git a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl index 68a32e9c2..be36cb167 100644 --- a/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl +++ b/apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_connector_SUITE.erl @@ -83,7 +83,6 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( - greptimedb, PoolName, ?CONNECTOR_RESOURCE_GROUP, ?GREPTIMEDB_RESOURCE_MOD, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index af232accc..7ae86bba0 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index cf53291b2..154e43b3d 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -44,6 +45,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> hstreamsdb. + callback_mode() -> always_sync. on_start(InstId, Config) -> diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 91a0878c3..616b42e75 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -183,6 +184,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== +resource_type() -> webhook. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index a8314541a..eae5028c6 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 852f78485..bf93309f8 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -70,6 +71,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> influxdb. + callback_mode() -> async_if_possible. on_add_channel( diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl index a7f78f253..0ca693171 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl @@ -86,7 +86,6 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( - influxdb, PoolName, ?CONNECTOR_RESOURCE_GROUP, ?INFLUXDB_RESOURCE_MOD, @@ -198,7 +197,6 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) -> config := #{ssl := #{enable := SslEnabled}}, status := Status }} = emqx_resource:create_local( - influxdb, PoolName, ?CONNECTOR_RESOURCE_GROUP, ?INFLUXDB_RESOURCE_MOD, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 691778cfd..88ac09da9 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_connector diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 78866ef79..ec880e785 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -15,6 +15,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -206,6 +207,8 @@ proplists_without(Keys, List) -> %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> iotdb. + callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 35ffbc90b..44e007415 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -7,6 +7,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -125,6 +126,7 @@ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> kafka_consumer. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 8b6326545..6d88a329e 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -10,6 +10,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, query_mode/1, callback_mode/0, on_start/2, @@ -35,6 +36,8 @@ -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). +resource_type() -> kafka_producer. + query_mode(#{parameters := #{query_mode := sync}}) -> simple_sync_internal_buffer; query_mode(_) -> diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index f411b95fb..a85121905 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 95d193d92..3143cf904 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -30,6 +30,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -50,6 +51,7 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +resource_type() -> kinesis_producer. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index df9935dbb..5bb7e396d 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mongodb, [ {description, "EMQX Enterprise MongoDB Bridge"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 6b6db358a..dac9bef57 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -11,6 +11,7 @@ %% `emqx_resource' API -export([ on_remove_channel/3, + resource_type/0, callback_mode/0, on_add_channel/4, on_get_channel_status/3, @@ -25,6 +26,7 @@ %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> emqx_mongodb:resource_type(). callback_mode() -> emqx_mongodb:callback_mode(). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 9c2506bab..118542356 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -30,6 +30,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -76,6 +77,8 @@ on_message_received(Msg, HookPoints, ResId) -> ok. %% =================================================================== +resource_type() -> mqtt. + callback_mode() -> async_if_possible. on_start(ResourceId, #{server := Server} = Conf) -> diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index 63bc61e62..fb670e072 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mysql, [ {description, "EMQX Enterprise MySQL Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl index da9377814..6905c86eb 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl @@ -10,6 +10,7 @@ %% `emqx_resource' API -export([ on_remove_channel/3, + resource_type/0, callback_mode/0, on_add_channel/4, on_batch_query/3, @@ -24,6 +25,7 @@ %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> emqx_mysql:resource_type(). callback_mode() -> emqx_mysql:callback_mode(). diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index 65cc97e4c..a27791853 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 19e117a0d..a970bb374 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -114,6 +115,8 @@ connector_example_values() -> -define(HTTP_CONNECT_TIMEOUT, 1000). +resource_type() -> opents. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 835536bda..64dde77fb 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -10,6 +10,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -55,6 +56,7 @@ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> pulsar. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index 27a4fedc4..c178b1f5e 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_bridge_rabbitmq_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index dacb47a57..7e3e18e5f 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -31,6 +31,7 @@ on_remove_channel/3, on_get_channels/1, on_stop/2, + resource_type/0, callback_mode/0, on_get_status/2, on_get_channel_status/3, @@ -60,6 +61,7 @@ fields(config) -> %% =================================================================== %% emqx_resource callback +resource_type() -> rabbitmq. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl index fe288a185..77482ae0f 100644 --- a/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl +++ b/apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_connector_SUITE.erl @@ -135,7 +135,6 @@ check_config(Config) -> create_local_resource(ResourceID, CheckedConfig) -> {ok, Bridge} = emqx_resource:create_local( - rabbitmq, ResourceID, ?CONNECTOR_RESOURCE_GROUP, emqx_bridge_rabbitmq_connector, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 2cd037ed5..61cd837bb 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index f117c4e7a..162c38368 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -12,6 +12,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_add_channel/4, on_remove_channel/3, @@ -29,7 +30,9 @@ %% resource callbacks %% ------------------------------------------------------------------------------------------------- -callback_mode() -> always_sync. +resource_type() -> emqx_redis:resource_type(). + +callback_mode() -> emqx_redis:callback_mode(). on_add_channel( _InstanceId, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index fc59aeeca..9657ac115 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [ diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 4fe3ea4c4..b03602bd2 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -16,6 +16,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -90,6 +91,8 @@ servers() -> %% `emqx_resource' API %%======================================================================================== +resource_type() -> rocketmq. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index fdc6d255b..a7aa7eae7 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -13,6 +13,7 @@ -behaviour(emqx_resource). -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -92,6 +93,8 @@ -define(AGGREG_SUP, emqx_bridge_s3_sup). %% +-spec resource_type() -> resource_type(). +resource_type() -> s3. -spec callback_mode() -> callback_mode(). callback_mode() -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 3bc62734c..009a8d16b 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, [ diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 603ef18d0..e14e395c0 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -30,6 +30,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -173,6 +174,7 @@ server() -> %%==================================================================== %% Callbacks defined in emqx_resource %%==================================================================== +resource_type() -> sqlserver. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src index 5ae95ca67..cd1d51b01 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_syskeeper, [ {description, "EMQX Enterprise Data bridge for Syskeeper"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 898915f56..c277faa4f 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -147,6 +148,7 @@ server() -> %% ------------------------------------------------------------------------------------------------- %% `emqx_resource' API +resource_type() -> syskeeper. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl index e26ae43c1..d0aa44cd3 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -12,6 +12,7 @@ %% `emqx_resource' API -export([ + resource_type/0, query_mode/1, on_start/2, on_stop/2, @@ -40,6 +41,8 @@ %% ------------------------------------------------------------------------------------------------- %% emqx_resource +resource_type() -> + syskeeper_proxy_server. query_mode(_) -> no_queries. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index d358ba8fa..5fe325b38 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 324694edc..46980e768 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -19,6 +19,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -140,6 +141,7 @@ connector_example_values() -> %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> tdengine. callback_mode() -> always_sync. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 2a5b3bcfc..50b2132e2 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -126,7 +126,6 @@ create(Type, Name, Conf0, Opts) -> ResourceId = resource_id(Type, Name), Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, {ok, _Data} = emqx_resource:create_local( - Type, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MODULE:connector_to_resource_type(Type), @@ -200,7 +199,6 @@ recreate(Type, Name, Conf) -> recreate(Type, Name, Conf, Opts) -> TypeBin = bin(Type), emqx_resource:recreate_local( - Type, resource_id(Type, Name), ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), @@ -211,7 +209,7 @@ create_dry_run(Type, Conf) -> create_dry_run(Type, Conf, fun(_) -> ok end). create_dry_run(Type, Conf0, Callback) -> - %% Already typechecked, no need to catch errors + %% Already type checked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), %% We use a fixed name here to avoid creating an atom @@ -237,7 +235,7 @@ create_dry_run(Type, Conf0, Callback) -> {ok, ConfNew} -> ParseConf = parse_confs(bin(Type), TmpName, ConfNew), emqx_resource:create_dry_run_local( - Type, TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback + TmpName, ?MODULE:connector_to_resource_type(Type), ParseConf, Callback ) end catch diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index fbdece6ff..8e5a6d288 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -50,6 +50,7 @@ t_connector_lifecycle({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -171,6 +172,7 @@ t_remove_fail({'init', Config}) -> meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]), meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), @@ -234,6 +236,7 @@ t_create_with_bad_name_direct_path({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -265,6 +268,7 @@ t_create_with_bad_name_root_path({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -299,6 +303,7 @@ t_no_buffer_workers({'init', Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_get_channels, 1, []), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index f3e91ef12..01f4fd188 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -224,6 +224,7 @@ init_mocks(_TestCase) -> meck:new(emqx_connector_resource, [passthrough, no_link]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( ?CONNECTOR_IMPL, diff --git a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl index c5d9e4f83..d506c9633 100644 --- a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl +++ b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl @@ -15,8 +15,10 @@ %% this module is only intended to be mocked -module(emqx_connector_dummy_impl). +-behavior(emqx_resource). -export([ + resource_type/0, query_mode/1, callback_mode/0, on_start/2, @@ -25,6 +27,7 @@ on_get_channel_status/3 ]). +resource_type() -> dummy. query_mode(_) -> error(unexpected). callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl index 6ac02efc6..60fec5171 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl @@ -132,7 +132,6 @@ make_resource_id(Backend) -> create_resource(ResourceId, Module, Config) -> Result = emqx_resource:create_local( - dashboard_sso, ResourceId, ?RESOURCE_GROUP, Module, @@ -143,7 +142,7 @@ create_resource(ResourceId, Module, Config) -> update_resource(ResourceId, Module, Config) -> Result = emqx_resource:recreate_local( - dashboard_sso, ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS + ResourceId, Module, Config, ?DEFAULT_RESOURCE_OPTS ), start_resource_if_enabled(ResourceId, Result, Config). diff --git a/apps/emqx_ldap/src/emqx_ldap.app.src b/apps/emqx_ldap/src/emqx_ldap.app.src index b0d8ec59c..1b1b667cc 100644 --- a/apps/emqx_ldap/src/emqx_ldap.app.src +++ b/apps/emqx_ldap/src/emqx_ldap.app.src @@ -1,6 +1,6 @@ {application, emqx_ldap, [ {description, "EMQX LDAP Connector"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_ldap/src/emqx_ldap.erl b/apps/emqx_ldap/src/emqx_ldap.erl index d04be5d68..67b250420 100644 --- a/apps/emqx_ldap/src/emqx_ldap.erl +++ b/apps/emqx_ldap/src/emqx_ldap.erl @@ -27,6 +27,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -129,6 +130,8 @@ ensure_username(Field) -> emqx_connector_schema_lib:username(Field). %% =================================================================== +resource_type() -> ldap. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_ldap/test/emqx_ldap_SUITE.erl b/apps/emqx_ldap/test/emqx_ldap_SUITE.erl index 413cbc3a5..a15ff2775 100644 --- a/apps/emqx_ldap/test/emqx_ldap_SUITE.erl +++ b/apps/emqx_ldap/test/emqx_ldap_SUITE.erl @@ -96,7 +96,6 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - ldap, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?LDAP_RESOURCE_MOD, @@ -172,7 +171,6 @@ t_get_status(Config) -> ?LDAP_RESOURCE_MOD, ldap_config(Config) ), {ok, _} = emqx_resource:create_local( - ldap, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?LDAP_RESOURCE_MOD, diff --git a/apps/emqx_mongodb/src/emqx_mongodb.app.src b/apps/emqx_mongodb/src/emqx_mongodb.app.src index 92d7026cc..51230fd47 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.app.src +++ b/apps/emqx_mongodb/src/emqx_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_mongodb, [ {description, "EMQX MongoDB Connector"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mongodb/src/emqx_mongodb.erl b/apps/emqx_mongodb/src/emqx_mongodb.erl index e262f5ccd..8d6fad89f 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.erl +++ b/apps/emqx_mongodb/src/emqx_mongodb.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -172,6 +173,7 @@ desc(_) -> undefined. %% =================================================================== +resource_type() -> mongodb. callback_mode() -> always_sync. diff --git a/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl b/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl index 850683d99..8af05e0d3 100644 --- a/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl +++ b/apps/emqx_mongodb/test/emqx_mongodb_SUITE.erl @@ -143,7 +143,6 @@ check_config(Config) -> create_local_resource(ResourceId, CheckedConfig) -> {ok, Bridge} = emqx_resource:create_local( - mongodb, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MONGO_RESOURCE_MOD, diff --git a/apps/emqx_mysql/src/emqx_mysql.app.src b/apps/emqx_mysql/src/emqx_mysql.app.src index 9637cc473..c7fcb0975 100644 --- a/apps/emqx_mysql/src/emqx_mysql.app.src +++ b/apps/emqx_mysql/src/emqx_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_mysql, [ {description, "EMQX MySQL Database Connector"}, - {vsn, "0.1.9"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index 6311d66f2..197e33d75 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -91,6 +92,8 @@ server() -> emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). %% =================================================================== +resource_type() -> mysql. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_mysql/test/emqx_mysql_SUITE.erl b/apps/emqx_mysql/test/emqx_mysql_SUITE.erl index be69140fc..03e6c6797 100644 --- a/apps/emqx_mysql/test/emqx_mysql_SUITE.erl +++ b/apps/emqx_mysql/test/emqx_mysql_SUITE.erl @@ -67,7 +67,6 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - mysql, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MYSQL_RESOURCE_MOD, diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 3f238ae9c..80ff8da09 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 5b25e049a..6e2a40f95 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -21,6 +21,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -67,6 +68,8 @@ batch_params_tokens := params_tokens() }. +resource_type() -> oracle. + % As ecpool is not monitoring the worker's PID when doing a handover_async, the % request can be lost if worker crashes. Thus, it's better to force requests to % be sync for now. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 7aaf42e71..e1bd67325 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 7e64a3e83..4df8e0af1 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -29,6 +29,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -120,6 +121,8 @@ adjust_fields(Fields) -> ). %% =================================================================== +resource_type() -> postgresql. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl index 06210be86..d771d80d8 100644 --- a/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl +++ b/apps/emqx_postgresql/test/emqx_postgresql_SUITE.erl @@ -75,7 +75,6 @@ perform_lifecycle_check(ResourceId, InitialConfig) -> status := InitialStatus }} = emqx_resource:create_local( - postgresql, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?PGSQL_RESOURCE_MOD, diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 059e9aa23..9507913ed 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -28,6 +28,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -119,6 +120,8 @@ redis_type(Type) -> desc => ?DESC(Type) }}. +resource_type() -> redis. + callback_mode() -> always_sync. on_start(InstId, Config0) -> diff --git a/apps/emqx_redis/test/emqx_redis_SUITE.erl b/apps/emqx_redis/test/emqx_redis_SUITE.erl index 06ac82143..a9064f184 100644 --- a/apps/emqx_redis/test/emqx_redis_SUITE.erl +++ b/apps/emqx_redis/test/emqx_redis_SUITE.erl @@ -115,7 +115,6 @@ perform_lifecycle_check(ResourceId, InitialConfig, RedisCommand) -> state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - redis, ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?REDIS_RESOURCE_MOD, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 0c0cc8f6b..8c2bb39a1 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -23,8 +23,8 @@ %% remind us of that. -define(rm_status_stopped, stopped). --type type() :: atom() | binary(). --type resource_type() :: module(). +-type resource_type() :: atom(). +-type resource_module() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). -type raw_resource_config() :: binary() | raw_term_resource_config(). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index e0a2e9343..721df9690 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -28,9 +28,9 @@ -export([ check_config/2, + check_and_create_local/4, check_and_create_local/5, - check_and_create_local/6, - check_and_recreate_local/5 + check_and_recreate_local/4 ]). %% Sync resource instances and files @@ -39,11 +39,11 @@ -export([ %% store the config and start the instance - create_local/6, + create_local/5, + create_dry_run_local/2, create_dry_run_local/3, create_dry_run_local/4, - create_dry_run_local/5, - recreate_local/5, + recreate_local/4, %% remove the config and stop the instance remove_local/1, reset_metrics/1, @@ -96,6 +96,7 @@ -export([ %% get the callback mode of a specific module get_callback_mode/1, + get_resource_type/1, %% start the instance call_start/3, %% verify if the resource is working normally @@ -140,9 +141,6 @@ -export([is_dry_run/1]). -%% For emqx_resource_proto_v1 rpc only --export([create_local/5, recreate_local/4, create_dry_run_local/2]). - -export_type([ query_mode/0, resource_id/0, @@ -283,42 +281,42 @@ is_resource_mod(Module) -> %% APIs for resource instances %% ================================================================================= -spec create_local( - type(), resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> {ok, resource_data()}. -create_local(Type, ResId, Group, ResourceType, Config, Opts) -> - emqx_resource_manager:ensure_resource(Type, ResId, Group, ResourceType, Config, Opts). +create_local(ResId, Group, ResourceType, Config, Opts) -> + emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts). --spec create_dry_run_local(type(), resource_type(), resource_config()) -> +-spec create_dry_run_local(resource_module(), resource_config()) -> ok | {error, Reason :: term()}. -create_dry_run_local(Type, ResourceType, Config) -> - emqx_resource_manager:create_dry_run(Type, ResourceType, Config). +create_dry_run_local(ResourceType, Config) -> + emqx_resource_manager:create_dry_run(ResourceType, Config). -create_dry_run_local(Type, ResId, ResourceType, Config) -> - emqx_resource_manager:create_dry_run(Type, ResId, ResourceType, Config). +create_dry_run_local(ResId, ResourceType, Config) -> + emqx_resource_manager:create_dry_run(ResId, ResourceType, Config). -spec create_dry_run_local( - type(), resource_id(), - resource_type(), + resource_module(), resource_config(), OnReadyCallback ) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). -create_dry_run_local(Type, ResId, ResourceType, Config, OnReadyCallback) -> - emqx_resource_manager:create_dry_run(Type, ResId, ResourceType, Config, OnReadyCallback). +create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) -> + emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback). --spec recreate_local(type(), resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate_local( + resource_id(), resource_module(), resource_config(), creation_opts() +) -> {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(Type, ResId, ResourceType, Config, Opts) -> - emqx_resource_manager:recreate(Type, ResId, ResourceType, Config, Opts). +recreate_local(ResId, ResourceType, Config, Opts) -> + emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). -spec remove_local(resource_id()) -> ok. remove_local(ResId) -> @@ -490,6 +488,10 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). +-spec get_resource_type(module()) -> resource_type(). +get_resource_type(Mod) -> + Mod:resource_type(). + -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> @@ -602,50 +604,47 @@ query_mode(Mod, Config, Opts) -> maps:get(query_mode, Opts, sync) end. --spec check_config(resource_type(), raw_resource_config()) -> +-spec check_config(resource_module(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). -spec check_and_create_local( - type(), resource_id(), resource_group(), - resource_type(), + resource_module(), raw_resource_config() ) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(Type, ResId, Group, ResourceType, RawConfig) -> - check_and_create_local(Type, ResId, Group, ResourceType, RawConfig, #{}). +check_and_create_local(ResId, Group, ResourceType, RawConfig) -> + check_and_create_local(ResId, Group, ResourceType, RawConfig, #{}). -spec check_and_create_local( - type(), resource_id(), resource_group(), - resource_type(), + resource_module(), raw_resource_config(), creation_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(Type, ResId, Group, ResourceType, RawConfig, Opts) -> +check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(ResConf) -> create_local(Type, ResId, Group, ResourceType, ResConf, Opts) end + fun(ResConf) -> create_local(ResId, Group, ResourceType, ResConf, Opts) end ). -spec check_and_recreate_local( - type(), resource_id(), - resource_type(), + resource_module(), raw_resource_config(), creation_opts() ) -> {ok, resource_data()} | {error, term()}. -check_and_recreate_local(Type, ResId, ResourceType, RawConfig, Opts) -> +check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) -> check_and_do( ResourceType, RawConfig, - fun(ResConf) -> recreate_local(Type, ResId, ResourceType, ResConf, Opts) end + fun(ResConf) -> recreate_local(ResId, ResourceType, ResConf, Opts) end ). check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> @@ -809,18 +808,3 @@ validate_name(Name, Opts) -> -spec invalid_data(binary()) -> no_return(). invalid_data(Reason) -> throw(#{kind => validation_error, reason => Reason}). - -%% Those functions is only used in the emqx_resource_proto_v1 -%% for versions that are less than version 5.6.0. -%% begin --spec create_local( - resource_id(), resource_group(), resource_type(), resource_config(), creation_opts() -) -> - {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(ResId, Group, ResourceType, Config, Opts) -> - create_local(deprecated, ResId, Group, ResourceType, Config, Opts). -create_dry_run_local(ResourceType, Config) -> - create_dry_run_local(deprecated, ResourceType, Config). -recreate_local(ResId, ResourceType, Config, Opts) -> - recreate_local(deprecated, ResId, ResourceType, Config, Opts). -%% end diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a742c4486..fe674630c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -25,12 +25,12 @@ % API -export([ - ensure_resource/6, - recreate/5, + ensure_resource/5, + recreate/4, remove/1, + create_dry_run/2, create_dry_run/3, create_dry_run/4, - create_dry_run/5, restart/2, start/2, stop/1, @@ -59,7 +59,7 @@ ]). % Server --export([start_link/6]). +-export([start_link/5]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). @@ -162,44 +162,45 @@ %% Triggers the emqx_resource_manager_sup supervisor to actually create %% and link the process itself if not already started. -spec ensure_resource( - type(), resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> {ok, resource_data()}. -ensure_resource(Type, ResId, Group, ResourceType, Config, Opts) -> +ensure_resource(ResId, Group, ResourceType, Config, Opts) -> case lookup(ResId) of {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> - create_and_return_data(Type, ResId, Group, ResourceType, Config, Opts) + create_and_return_data(ResId, Group, ResourceType, Config, Opts) end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(type(), resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate( + resource_id(), resource_module(), resource_config(), creation_opts() +) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. -recreate(Type, ResId, ResourceType, NewConfig, Opts) -> +recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(ResId, false), - create_and_return_data(Type, ResId, Group, ResourceType, NewConfig, Opts); + create_and_return_data(ResId, Group, ResourceType, NewConfig, Opts); {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> {error, not_found} end. -create_and_return_data(Type, ResId, Group, ResourceType, Config, Opts) -> - _ = create(Type, ResId, Group, ResourceType, Config, Opts), +create_and_return_data(ResId, Group, ResourceType, Config, Opts) -> + _ = create(ResId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(ResId), {ok, Data}. %% @doc Create a resource_manager and wait until it is running -create(Type, ResId, Group, ResourceType, Config, Opts) -> +create(ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init - ok = emqx_resource_manager_sup:ensure_child(Type, ResId, Group, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child(ResId, Group, ResourceType, Config, Opts), % Create metrics for the resource ok = emqx_resource:create_metrics(ResId), QueryMode = emqx_resource:query_mode(ResourceType, Config, Opts), @@ -222,30 +223,32 @@ create(Type, ResId, Group, ResourceType, Config, Opts) -> %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create %% and link the process itself if not already started, and then immediately stops. --spec create_dry_run(type(), resource_type(), resource_config()) -> +-spec create_dry_run(resource_module(), resource_config()) -> ok | {error, Reason :: term()}. -create_dry_run(Type, ResourceType, Config) -> +create_dry_run(ResourceType, Config) -> ResId = make_test_id(), - create_dry_run(Type, ResId, ResourceType, Config). + create_dry_run(ResId, ResourceType, Config). -create_dry_run(Type, ResId, ResourceType, Config) -> - create_dry_run(Type, ResId, ResourceType, Config, fun do_nothing_on_ready/1). +create_dry_run(ResId, ResourceType, Config) -> + create_dry_run(ResId, ResourceType, Config, fun do_nothing_on_ready/1). do_nothing_on_ready(_ResId) -> ok. --spec create_dry_run(type(), resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run( + resource_id(), resource_module(), resource_config(), OnReadyCallback +) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). -create_dry_run(Type, ResId, ResourceType, Config, OnReadyCallback) -> +create_dry_run(ResId, ResourceType, Config, OnReadyCallback) -> Opts = case is_map(Config) of true -> maps:get(resource_opts, Config, #{}); false -> #{} end, ok = emqx_resource_manager_sup:ensure_child( - Type, ResId, <<"dry_run">>, ResourceType, Config, Opts + ResId, <<"dry_run">>, ResourceType, Config, Opts ), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000), @@ -495,7 +498,7 @@ try_clean_allocated_resources(ResId) -> %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server -start_link(Type, ResId, Group, ResourceType, Config, Opts) -> +start_link(ResId, Group, ResourceType, Config, Opts) -> QueryMode = emqx_resource:query_mode( ResourceType, Config, @@ -503,7 +506,7 @@ start_link(Type, ResId, Group, ResourceType, Config, Opts) -> ), Data = #data{ id = ResId, - type = Type, + type = emqx_resource:get_resource_type(ResourceType), group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index c14b08f94..8542eec1c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -19,16 +19,14 @@ -include("emqx_resource.hrl"). --export([ensure_child/6, delete_child/1]). +-export([ensure_child/5, delete_child/1]). -export([start_link/0]). -export([init/1]). -ensure_child(Type, ResId, Group, ResourceType, Config, Opts) -> - case - supervisor:start_child(?MODULE, child_spec(Type, ResId, Group, ResourceType, Config, Opts)) - of +ensure_child(ResId, Group, ResourceType, Config, Opts) -> + case supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)) of {error, Reason} -> %% This should not happen in production but it can be a huge time sink in %% development environments if the error is just silently ignored. @@ -57,11 +55,11 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. -child_spec(Type, ResId, Group, ResourceType, Config, Opts) -> +child_spec(ResId, Group, ResourceType, Config, Opts) -> #{ id => ResId, start => - {emqx_resource_manager, start_link, [Type, ResId, Group, ResourceType, Config, Opts]}, + {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, restart => transient, %% never force kill a resource manager. %% because otherwise it may lead to release leak, diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 859b9fa52..47b724271 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -40,7 +40,7 @@ deprecated_since() -> -spec create( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> @@ -51,7 +51,7 @@ create(ResId, Group, ResourceType, Config, Opts) -> ]). -spec create_dry_run( - resource_type(), + resource_module(), resource_config() ) -> ok | {error, Reason :: term()}. @@ -60,7 +60,7 @@ create_dry_run(ResourceType, Config) -> -spec recreate( resource_id(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 0fc11cc66..e068defb1 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -24,6 +24,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -62,6 +63,8 @@ register(required) -> true; register(default) -> false; register(_) -> undefined. +resource_type() -> demo. + callback_mode() -> persistent_term:get(?CM_KEY). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 8e16ec26a..b8fc66c10 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -23,7 +23,6 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(TEST_RESOURCE, emqx_connector_demo). --define(TYPE, test). -define(ID, <<"id">>). -define(ID1, <<"id1">>). -define(DEFAULT_RESOURCE_GROUP, <<"default">>). @@ -91,7 +90,6 @@ t_create_remove(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( - ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -112,7 +110,6 @@ t_create_remove(_) -> ?assertMatch( {ok, _}, emqx_resource:recreate_local( - ?TYPE, ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -138,7 +135,6 @@ t_create_remove_local(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( - ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -157,7 +153,6 @@ t_create_remove_local(_) -> ), emqx_resource:recreate_local( - ?TYPE, ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -171,7 +166,6 @@ t_create_remove_local(_) -> emqx_resource:set_resource_status_connecting(?ID), emqx_resource:recreate_local( - ?TYPE, ?ID, ?TEST_RESOURCE, #{name => test_resource}, @@ -943,7 +937,6 @@ t_stop_start(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( - ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -954,7 +947,6 @@ t_stop_start(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_create_local( - ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -972,7 +964,6 @@ t_stop_start(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_recreate_local( - ?TYPE, ?ID, ?TEST_RESOURCE, #{<<"name">> => <<"test_resource">>}, @@ -1022,7 +1013,6 @@ t_stop_start_local(_) -> ?assertMatch( {error, _}, emqx_resource:check_and_create_local( - ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -1033,7 +1023,6 @@ t_stop_start_local(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_create_local( - ?TYPE, ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, @@ -1044,7 +1033,6 @@ t_stop_start_local(_) -> ?assertMatch( {ok, _}, emqx_resource:check_and_recreate_local( - ?TYPE, ?ID, ?TEST_RESOURCE, #{<<"name">> => <<"test_resource">>}, @@ -1120,7 +1108,6 @@ create_dry_run_local_succ() -> ?assertEqual( ok, emqx_resource:create_dry_run_local( - test, ?TEST_RESOURCE, #{name => test_resource, register => true} ) @@ -1131,7 +1118,6 @@ t_create_dry_run_local_failed(_) -> ct:timetrap({seconds, 120}), ct:pal("creating with creation error"), Res1 = emqx_resource:create_dry_run_local( - test, ?TEST_RESOURCE, #{create_error => true} ), @@ -1139,7 +1125,6 @@ t_create_dry_run_local_failed(_) -> ct:pal("creating with health check error"), Res2 = emqx_resource:create_dry_run_local( - test, ?TEST_RESOURCE, #{name => test_resource, health_check_error => true} ), @@ -1147,7 +1132,6 @@ t_create_dry_run_local_failed(_) -> ct:pal("creating with stop error"), Res3 = emqx_resource:create_dry_run_local( - test, ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), @@ -3506,10 +3490,10 @@ gauge_metric_set_fns() -> ]. create(Id, Group, Type, Config) -> - emqx_resource:create_local(test, Id, Group, Type, Config, #{}). + emqx_resource:create_local(Id, Group, Type, Config, #{}). create(Id, Group, Type, Config, Opts) -> - emqx_resource:create_local(test, Id, Group, Type, Config, Opts). + emqx_resource:create_local(Id, Group, Type, Config, Opts). log_consistency_prop() -> {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl index 6a0d6b3ec..cf5d3afbe 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -40,6 +41,8 @@ ]). %% =================================================================== +resource_type() -> test_connector. + callback_mode() -> always_sync. on_start(