diff --git a/apps/emqx_auth/include/emqx_authn.hrl b/apps/emqx_auth/include/emqx_authn.hrl index 1f2c6b8b9..8ffab1203 100644 --- a/apps/emqx_auth/include/emqx_authn.hrl +++ b/apps/emqx_auth/include/emqx_authn.hrl @@ -30,6 +30,6 @@ -type authenticator_id() :: binary(). --define(AUTHN_RESOURCE_GROUP, <<"emqx_authn">>). +-define(AUTHN_RESOURCE_GROUP, <<"authn">>). -endif. diff --git a/apps/emqx_auth/include/emqx_authz.hrl b/apps/emqx_auth/include/emqx_authz.hrl index 6dc80cb3f..73ff44c25 100644 --- a/apps/emqx_auth/include/emqx_authz.hrl +++ b/apps/emqx_auth/include/emqx_authz.hrl @@ -158,7 +158,7 @@ count => 1 }). --define(AUTHZ_RESOURCE_GROUP, <<"emqx_authz">>). +-define(AUTHZ_RESOURCE_GROUP, <<"authz">>). -define(AUTHZ_FEATURES, [rich_actions]). diff --git a/apps/emqx_auth_http/src/emqx_authz_http.erl b/apps/emqx_auth_http/src/emqx_authz_http.erl index 6b0152b7d..a858ec7d7 100644 --- a/apps/emqx_auth_http/src/emqx_authz_http.erl +++ b/apps/emqx_auth_http/src/emqx_authz_http.erl @@ -66,7 +66,11 @@ description() -> create(Config) -> NConfig = parse_config(Config), ResourceId = emqx_authn_utils:make_resource_id(?MODULE), - {ok, _Data} = emqx_authz_utils:create_resource(ResourceId, emqx_bridge_http_connector, NConfig), + {ok, _Data} = emqx_authz_utils:create_resource( + ResourceId, + emqx_bridge_http_connector, + NConfig + ), NConfig#{annotations => #{id => ResourceId}}. update(Config) -> diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl index ffa8175b7..22aed8e57 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwks_connector.erl @@ -22,6 +22,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -32,6 +33,8 @@ -define(DEFAULT_POOL_SIZE, 8). +resource_type() -> jwks. + callback_mode() -> always_sync. on_start(InstId, Opts) -> diff --git a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl index 18c954ab5..1d0d58474 100644 --- a/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl +++ b/apps/emqx_auth_jwt/src/emqx_authn_jwt.erl @@ -186,7 +186,8 @@ do_create( ResourceId, ?AUTHN_RESOURCE_GROUP, emqx_authn_jwks_connector, - connector_opts(Config) + connector_opts(Config), + #{} ), {ok, #{ jwk_resource => ResourceId, diff --git a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src index d84d6ff81..a58117356 100644 --- a/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src +++ b/apps/emqx_auth_ldap/src/emqx_auth_ldap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_ldap, [ {description, "EMQX LDAP Authentication and Authorization"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_auth_ldap_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src index df3cc1268..5ffc59787 100644 --- a/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src +++ b/apps/emqx_auth_mongodb/src/emqx_auth_mongodb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mongodb, [ {description, "EMQX MongoDB Authentication and Authorization"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {mod, {emqx_auth_mongodb_app, []}}, {applications, [ diff --git a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl index 50bff634d..1dfd30899 100644 --- a/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl +++ b/apps/emqx_auth_postgresql/test/emqx_authn_postgresql_SUITE.erl @@ -198,9 +198,9 @@ test_user_auth(#{ t_authenticate_disabled_prepared_statements(_Config) -> ResConfig = maps:merge(pgsql_config(), #{disable_prepared_statements => true}), - {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig), + {ok, _} = emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, ResConfig, #{}), on_exit(fun() -> - emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config()) + emqx_resource:recreate_local(?PGSQL_RESOURCE, emqx_postgresql, pgsql_config(), #{}) end), ok = lists:foreach( fun(Sample0) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index d2408ca73..40db2aee9 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -196,7 +196,7 @@ create(Type, Name, Conf0, Opts) -> Conf = Conf0#{bridge_type => TypeBin, bridge_name => Name}, {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), - <<"emqx_bridge">>, + <<"bridge">>, bridge_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) @@ -282,7 +282,7 @@ create_dry_run(Type0, Conf0) -> create_dry_run_bridge_v1(Type, Conf0) -> TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), TmpPath = emqx_utils:safe_filename(TmpName), - %% Already typechecked, no need to catch errors + %% Already type checked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), Conf1 = maps:without([<<"name">>], Conf0), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index dc2e8f275..d81d710ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -1110,6 +1110,7 @@ t_query_uses_action_query_mode(_Config) -> %% ... now we use a quite different query mode for the action meck:expect(con_mod(), query_mode, 1, simple_async_internal_buffer), + meck:expect(con_mod(), resource_type, 0, dummy), meck:expect(con_mod(), callback_mode, 0, async_if_possible), {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 039402738..e5d13f452 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -293,6 +293,7 @@ init_mocks() -> meck:new(emqx_connector_resource, [passthrough, no_link]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( ?CONNECTOR_IMPL, diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl index 6b4001b6b..1bb7fd37f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_dummy_connector.erl @@ -15,15 +15,17 @@ %% this module is only intended to be mocked -module(emqx_bridge_v2_dummy_connector). +-behavior(emqx_resource). -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, on_add_channel/4, on_get_channel_status/3 ]). - +resource_type() -> dummy. callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). on_stop(_, _) -> error(unexpected). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index c528d097c..7daedf19a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -19,6 +19,7 @@ -export([ query_mode/1, + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -34,6 +35,8 @@ query_mode(_Config) -> sync. +resource_type() -> test_connector. + callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src index 946ca591a..3e7422112 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_cassandra, [ {description, "EMQX Enterprise Cassandra Bridge"}, - {vsn, "0.3.1"}, + {vsn, "0.3.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index df278b791..9f830eb69 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -19,6 +19,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -94,6 +95,7 @@ desc("connector") -> %%-------------------------------------------------------------------- %% callbacks for emqx_resource +resource_type() -> cassandra. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index f38036b83..794d067bd 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.4.1"}, + {vsn, "0.4.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index f6888cad5..c5b82122a 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -29,6 +29,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -128,6 +129,7 @@ values(_) -> %% =================================================================== %% Callbacks defined in emqx_resource %% =================================================================== +resource_type() -> clickhouse. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src index ac71e04e7..b8fee4dee 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_dynamo, [ {description, "EMQX Enterprise Dynamo Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 82f5fb18d..181de34a8 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -17,6 +17,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -68,6 +69,7 @@ fields(config) -> %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> dynamo. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src index 262ac84bd..8f3dc3a7e 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.app.src +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_es, [ {description, "EMQX Enterprise Elastic Search Bridge"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {modules, [ emqx_bridge_es, emqx_bridge_es_connector diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 20de92e6e..feccd42f1 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -14,6 +14,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -207,6 +208,8 @@ base_url(#{server := Server}) -> "http://" ++ Server. %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> elastic_search. + callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src index d98355a90..eff7847f2 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_gcp_pubsub, [ {description, "EMQX Enterprise GCP Pub/Sub Bridge"}, - {vsn, "0.3.1"}, + {vsn, "0.3.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 5c51cd2d9..344fc05c6 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -8,6 +8,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -84,6 +85,8 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +-spec resource_type() -> resource_type(). +resource_type() -> gcp_pubsub_consumer. -spec callback_mode() -> callback_mode(). callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 48e50c416..aec78f74c 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -41,6 +41,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -62,6 +63,7 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +resource_type() -> gcp_pubsub. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index e4cc0aa31..a2ffd6219 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -67,6 +68,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> greptimedb. + callback_mode() -> async_if_possible. on_add_channel( diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src index af232accc..7ae86bba0 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_hstreamdb, [ {description, "EMQX Enterprise HStreamDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index cf53291b2..2d061e455 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -44,6 +45,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> hstreamdb. + callback_mode() -> always_sync. on_start(InstId, Config) -> diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 91a0878c3..616b42e75 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -183,6 +184,7 @@ sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== +resource_type() -> webhook. callback_mode() -> async_if_possible. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index a8314541a..eae5028c6 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.2.3"}, + {vsn, "0.2.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 852f78485..bf93309f8 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -16,6 +16,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -70,6 +71,8 @@ %% ------------------------------------------------------------------------------------------------- %% resource callback +resource_type() -> influxdb. + callback_mode() -> async_if_possible. on_add_channel( diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 691778cfd..88ac09da9 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_connector diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 78866ef79..ec880e785 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -15,6 +15,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -206,6 +207,8 @@ proplists_without(Keys, List) -> %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> iotdb. + callback_mode() -> async_if_possible. -spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c4f66dfff..44e007415 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -7,6 +7,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -125,6 +126,7 @@ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> kafka_consumer. callback_mode() -> async_if_possible. @@ -628,16 +630,6 @@ consumer_group_id(BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. --spec is_dry_run(connector_resource_id()) -> boolean(). -is_dry_run(ConnectorResId) -> - TestIdStart = string:find(ConnectorResId, ?TEST_ID_PREFIX), - case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, ConnectorResId) - end. - -spec check_client_connectivity(pid()) -> ?status_connected | ?status_disconnected @@ -673,7 +665,7 @@ maybe_clean_error(Reason) -> -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom(). make_client_id(ConnectorResId, BridgeType, BridgeName) -> - case is_dry_run(ConnectorResId) of + case emqx_resource:is_dry_run(ConnectorResId) of false -> ClientID0 = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), binary_to_atom(ClientID0); diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index b819925ac..6d88a329e 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -10,6 +10,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, query_mode/1, callback_mode/0, on_start/2, @@ -35,6 +36,8 @@ -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). +resource_type() -> kafka_producer. + query_mode(#{parameters := #{query_mode := sync}}) -> simple_sync_internal_buffer; query_mode(_) -> @@ -137,14 +140,7 @@ create_producers_for_bridge_v2( KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions), #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), - TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), - IsDryRun = - case TestIdStart of - nomatch -> - false; - _ -> - string:equal(TestIdStart, InstId) - end, + IsDryRun = emqx_resource:is_dry_run(BridgeV2Id), ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions), WolffProducerConfig = producers_config( BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src index f411b95fb..a85121905 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kinesis, [ {description, "EMQX Enterprise Amazon Kinesis Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 95d193d92..3143cf904 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -30,6 +30,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -50,6 +51,7 @@ %%------------------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------------------- +resource_type() -> kinesis_producer. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src index df9935dbb..5bb7e396d 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mongodb, [ {description, "EMQX Enterprise MongoDB Bridge"}, - {vsn, "0.3.2"}, + {vsn, "0.3.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 6b6db358a..dac9bef57 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -11,6 +11,7 @@ %% `emqx_resource' API -export([ on_remove_channel/3, + resource_type/0, callback_mode/0, on_add_channel/4, on_get_channel_status/3, @@ -25,6 +26,7 @@ %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> emqx_mongodb:resource_type(). callback_mode() -> emqx_mongodb:callback_mode(). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 26b8967f0..d43ec5591 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_mqtt, [ {description, "EMQX MQTT Broker Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index d507d11b8..118542356 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -30,6 +30,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -76,6 +77,8 @@ on_message_received(Msg, HookPoints, ResId) -> ok. %% =================================================================== +resource_type() -> mqtt. + callback_mode() -> async_if_possible. on_start(ResourceId, #{server := Server} = Conf) -> @@ -207,7 +210,7 @@ start_mqtt_clients(ResourceId, Conf) -> start_mqtt_clients(ResourceId, Conf, ClientOpts). start_mqtt_clients(ResourceId, StartConf, ClientOpts) -> - PoolName = <>, + PoolName = ResourceId, #{ pool_size := PoolSize } = StartConf, @@ -227,7 +230,7 @@ start_mqtt_clients(ResourceId, StartConf, ClientOpts) -> on_stop(ResourceId, State) -> ?SLOG(info, #{ msg => "stopping_mqtt_connector", - connector => ResourceId + resource_id => ResourceId }), %% on_stop can be called with State = undefined StateMap = @@ -271,7 +274,7 @@ on_query( on_query(ResourceId, {_ChannelId, Msg}, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", - connector => ResourceId, + resource_id => ResourceId, message => Msg, reason => "Egress is not configured" }). @@ -298,7 +301,7 @@ on_query_async( on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", - connector => ResourceId, + resource_id => ResourceId, message => Msg, reason => "Egress is not configured" }). @@ -463,8 +466,10 @@ connect(Options) -> {ok, Pid} -> connect(Pid, Name); {error, Reason} = Error -> - ?SLOG(error, #{ + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG(?LOG_LEVEL(IsDryRun), #{ msg => "client_start_failed", + resource_id => Name, config => emqx_utils:redact(ClientOpts), reason => Reason }), @@ -508,10 +513,11 @@ connect(Pid, Name) -> {ok, _Props} -> {ok, Pid}; {error, Reason} = Error -> - ?SLOG(warning, #{ + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG(?LOG_LEVEL(IsDryRun), #{ msg => "ingress_client_connect_failed", reason => Reason, - name => Name + resource_id => Name }), _ = catch emqtt:stop(Pid), Error diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src index 63bc61e62..fb670e072 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mysql, [ {description, "EMQX Enterprise MySQL Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.1.8"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl index da9377814..6905c86eb 100644 --- a/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl +++ b/apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl @@ -10,6 +10,7 @@ %% `emqx_resource' API -export([ on_remove_channel/3, + resource_type/0, callback_mode/0, on_add_channel/4, on_batch_query/3, @@ -24,6 +25,7 @@ %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> emqx_mysql:resource_type(). callback_mode() -> emqx_mysql:callback_mode(). diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src index 65cc97e4c..a27791853 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_opents, [ {description, "EMQX Enterprise OpenTSDB Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 19e117a0d..a970bb374 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -114,6 +115,8 @@ connector_example_values() -> -define(HTTP_CONNECT_TIMEOUT, 1000). +resource_type() -> opents. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src index a8eeba483..dcb86a3ca 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_pulsar, [ {description, "EMQX Pulsar Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 9d269493d..64dde77fb 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -10,6 +10,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -55,6 +56,7 @@ %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- +resource_type() -> pulsar. callback_mode() -> async_if_possible. @@ -255,7 +257,7 @@ format_servers(Servers0) -> -spec make_client_id(resource_id()) -> pulsar_client_id(). make_client_id(InstanceId) -> - case is_dry_run(InstanceId) of + case emqx_resource:is_dry_run(InstanceId) of true -> pulsar_producer_probe; false -> @@ -269,14 +271,6 @@ make_client_id(InstanceId) -> binary_to_atom(ClientIdBin) end. --spec is_dry_run(resource_id()) -> boolean(). -is_dry_run(InstanceId) -> - TestIdStart = string:find(InstanceId, ?TEST_ID_PREFIX), - case TestIdStart of - nomatch -> false; - _ -> string:equal(TestIdStart, InstanceId) - end. - conn_opts(#{authentication := none}) -> #{}; conn_opts(#{authentication := #{username := Username, password := Password}}) -> @@ -297,7 +291,7 @@ replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). producer_name(InstanceId, ChannelId) -> - case is_dry_run(InstanceId) of + case emqx_resource:is_dry_run(InstanceId) of %% do not create more atom true -> pulsar_producer_probe_worker; diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src index 27a4fedc4..c178b1f5e 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rabbitmq, [ {description, "EMQX Enterprise RabbitMQ Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {mod, {emqx_bridge_rabbitmq_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index dacb47a57..7e3e18e5f 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -31,6 +31,7 @@ on_remove_channel/3, on_get_channels/1, on_stop/2, + resource_type/0, callback_mode/0, on_get_status/2, on_get_channel_status/3, @@ -60,6 +61,7 @@ fields(config) -> %% =================================================================== %% emqx_resource callback +resource_type() -> rabbitmq. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src index 2cd037ed5..61cd837bb 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_redis, [ {description, "EMQX Enterprise Redis Bridge"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index f117c4e7a..162c38368 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -12,6 +12,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_add_channel/4, on_remove_channel/3, @@ -29,7 +30,9 @@ %% resource callbacks %% ------------------------------------------------------------------------------------------------- -callback_mode() -> always_sync. +resource_type() -> emqx_redis:resource_type(). + +callback_mode() -> emqx_redis:callback_mode(). on_add_channel( _InstanceId, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index fc59aeeca..9657ac115 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, {env, [ diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 4fe3ea4c4..b03602bd2 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -16,6 +16,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -90,6 +91,8 @@ servers() -> %% `emqx_resource' API %%======================================================================================== +resource_type() -> rocketmq. + callback_mode() -> always_sync. on_start( diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index fdc6d255b..a7aa7eae7 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -13,6 +13,7 @@ -behaviour(emqx_resource). -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -92,6 +93,8 @@ -define(AGGREG_SUP, emqx_bridge_s3_sup). %% +-spec resource_type() -> resource_type(). +resource_type() -> s3. -spec callback_mode() -> callback_mode(). callback_mode() -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src index 3bc62734c..009a8d16b 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_sqlserver, [ {description, "EMQX Enterprise SQL Server Bridge"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, odbc]}, {env, [ diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 603ef18d0..e14e395c0 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -30,6 +30,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -173,6 +174,7 @@ server() -> %%==================================================================== %% Callbacks defined in emqx_resource %%==================================================================== +resource_type() -> sqlserver. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src index 5ae95ca67..cd1d51b01 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_syskeeper, [ {description, "EMQX Enterprise Data bridge for Syskeeper"}, - {vsn, "0.1.3"}, + {vsn, "0.1.4"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 898915f56..c277faa4f 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -18,6 +18,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, query_mode/1, on_start/2, @@ -147,6 +148,7 @@ server() -> %% ------------------------------------------------------------------------------------------------- %% `emqx_resource' API +resource_type() -> syskeeper. callback_mode() -> always_sync. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl index e26ae43c1..d0aa44cd3 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -12,6 +12,7 @@ %% `emqx_resource' API -export([ + resource_type/0, query_mode/1, on_start/2, on_stop/2, @@ -40,6 +41,8 @@ %% ------------------------------------------------------------------------------------------------- %% emqx_resource +resource_type() -> + syskeeper_proxy_server. query_mode(_) -> no_queries. diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index d358ba8fa..5fe325b38 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.2.1"}, + {vsn, "0.2.2"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 324694edc..46980e768 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -19,6 +19,7 @@ %% `emqx_resource' API -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -140,6 +141,7 @@ connector_example_values() -> %%======================================================================================== %% `emqx_resource' API %%======================================================================================== +resource_type() -> tdengine. callback_mode() -> always_sync. diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl index 4b29dd5ce..0004cd72c 100644 --- a/apps/emqx_connector/include/emqx_connector.hrl +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -37,4 +37,4 @@ "The " ++ TYPE ++ " default port " ++ DEFAULT_PORT ++ " is used if `[:Port]` is not specified." ). --define(CONNECTOR_RESOURCE_GROUP, <<"emqx_connector">>). +-define(CONNECTOR_RESOURCE_GROUP, <<"connector">>). diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index be8d3a32d..50b2132e2 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -18,6 +18,7 @@ -include("../../emqx_bridge/include/emqx_bridge_resource.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include("emqx_connector.hrl"). -export([ connector_to_resource_type/1, @@ -126,7 +127,7 @@ create(Type, Name, Conf0, Opts) -> Conf = Conf0#{connector_type => TypeBin, connector_name => Name}, {ok, _Data} = emqx_resource:create_local( ResourceId, - <<"emqx_connector">>, + ?CONNECTOR_RESOURCE_GROUP, ?MODULE:connector_to_resource_type(Type), parse_confs(TypeBin, Name, Conf), parse_opts(Conf, Opts) @@ -208,7 +209,7 @@ create_dry_run(Type, Conf) -> create_dry_run(Type, Conf, fun(_) -> ok end). create_dry_run(Type, Conf0, Callback) -> - %% Already typechecked, no need to catch errors + %% Already type checked, no need to catch errors TypeBin = bin(Type), TypeAtom = safe_atom(Type), %% We use a fixed name here to avoid creating an atom diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index fbdece6ff..8e5a6d288 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -50,6 +50,7 @@ t_connector_lifecycle({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -171,6 +172,7 @@ t_remove_fail({'init', Config}) -> meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_get_channels, 1, [{<<"my_channel">>, #{enable => true}}]), meck:expect(?CONNECTOR, on_add_channel, 4, {ok, connector_state}), @@ -234,6 +236,7 @@ t_create_with_bad_name_direct_path({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -265,6 +268,7 @@ t_create_with_bad_name_root_path({init, Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_stop, 2, ok), @@ -299,6 +303,7 @@ t_no_buffer_workers({'init', Config}) -> meck:new(emqx_connector_resource, [passthrough]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR), meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, resource_type, 0, dummy), meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), meck:expect(?CONNECTOR, on_get_channels, 1, []), diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index f3e91ef12..01f4fd188 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -224,6 +224,7 @@ init_mocks(_TestCase) -> meck:new(emqx_connector_resource, [passthrough, no_link]), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, ?CONNECTOR_IMPL), meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, resource_type, 0, dummy), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( ?CONNECTOR_IMPL, diff --git a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl index c5d9e4f83..d506c9633 100644 --- a/apps/emqx_connector/test/emqx_connector_dummy_impl.erl +++ b/apps/emqx_connector/test/emqx_connector_dummy_impl.erl @@ -15,8 +15,10 @@ %% this module is only intended to be mocked -module(emqx_connector_dummy_impl). +-behavior(emqx_resource). -export([ + resource_type/0, query_mode/1, callback_mode/0, on_start/2, @@ -25,6 +27,7 @@ on_get_channel_status/3 ]). +resource_type() -> dummy. query_mode(_) -> error(unexpected). callback_mode() -> error(unexpected). on_start(_, _) -> error(unexpected). diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl index 6834da9e9..60fec5171 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_manager.erl @@ -45,7 +45,7 @@ -define(MOD_TAB, emqx_dashboard_sso). -define(MOD_KEY_PATH, [dashboard, sso]). -define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]). --define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). +-define(RESOURCE_GROUP, <<"dashboard_sso">>). -define(NO_ERROR, <<>>). -define(DEFAULT_RESOURCE_OPTS, #{ start_after_created => false diff --git a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl index 51524f0fd..40c5de9e5 100644 --- a/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl +++ b/apps/emqx_dashboard_sso/test/emqx_dashboard_sso_ldap_SUITE.erl @@ -24,7 +24,7 @@ -define(MOD_TAB, emqx_dashboard_sso). -define(MOD_KEY_PATH, [dashboard, sso, ldap]). --define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). +-define(RESOURCE_GROUP, <<"dashboard_sso">>). -import(emqx_mgmt_api_test_util, [request/2, request/3, uri/1, request_api/3]). diff --git a/apps/emqx_ldap/src/emqx_ldap.app.src b/apps/emqx_ldap/src/emqx_ldap.app.src index b0d8ec59c..1b1b667cc 100644 --- a/apps/emqx_ldap/src/emqx_ldap.app.src +++ b/apps/emqx_ldap/src/emqx_ldap.app.src @@ -1,6 +1,6 @@ {application, emqx_ldap, [ {description, "EMQX LDAP Connector"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_ldap/src/emqx_ldap.erl b/apps/emqx_ldap/src/emqx_ldap.erl index d04be5d68..67b250420 100644 --- a/apps/emqx_ldap/src/emqx_ldap.erl +++ b/apps/emqx_ldap/src/emqx_ldap.erl @@ -27,6 +27,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -129,6 +130,8 @@ ensure_username(Field) -> emqx_connector_schema_lib:username(Field). %% =================================================================== +resource_type() -> ldap. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_mongodb/src/emqx_mongodb.app.src b/apps/emqx_mongodb/src/emqx_mongodb.app.src index 92d7026cc..51230fd47 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.app.src +++ b/apps/emqx_mongodb/src/emqx_mongodb.app.src @@ -1,6 +1,6 @@ {application, emqx_mongodb, [ {description, "EMQX MongoDB Connector"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mongodb/src/emqx_mongodb.erl b/apps/emqx_mongodb/src/emqx_mongodb.erl index e262f5ccd..8d6fad89f 100644 --- a/apps/emqx_mongodb/src/emqx_mongodb.erl +++ b/apps/emqx_mongodb/src/emqx_mongodb.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -172,6 +173,7 @@ desc(_) -> undefined. %% =================================================================== +resource_type() -> mongodb. callback_mode() -> always_sync. diff --git a/apps/emqx_mysql/src/emqx_mysql.app.src b/apps/emqx_mysql/src/emqx_mysql.app.src index 9637cc473..c7fcb0975 100644 --- a/apps/emqx_mysql/src/emqx_mysql.app.src +++ b/apps/emqx_mysql/src/emqx_mysql.app.src @@ -1,6 +1,6 @@ {application, emqx_mysql, [ {description, "EMQX MySQL Database Connector"}, - {vsn, "0.1.9"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index 6311d66f2..197e33d75 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -91,6 +92,8 @@ server() -> emqx_schema:servers_sc(Meta, ?MYSQL_HOST_OPTIONS). %% =================================================================== +resource_type() -> mysql. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_oracle/src/emqx_oracle.app.src b/apps/emqx_oracle/src/emqx_oracle.app.src index 3f238ae9c..80ff8da09 100644 --- a/apps/emqx_oracle/src/emqx_oracle.app.src +++ b/apps/emqx_oracle/src/emqx_oracle.app.src @@ -1,6 +1,6 @@ {application, emqx_oracle, [ {description, "EMQX Enterprise Oracle Database Connector"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_oracle/src/emqx_oracle.erl b/apps/emqx_oracle/src/emqx_oracle.erl index 5b25e049a..6e2a40f95 100644 --- a/apps/emqx_oracle/src/emqx_oracle.erl +++ b/apps/emqx_oracle/src/emqx_oracle.erl @@ -21,6 +21,7 @@ %% callbacks for behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -67,6 +68,8 @@ batch_params_tokens := params_tokens() }. +resource_type() -> oracle. + % As ecpool is not monitoring the worker's PID when doing a handover_async, the % request can be lost if worker crashes. Thus, it's better to force requests to % be sync for now. diff --git a/apps/emqx_postgresql/src/emqx_postgresql.app.src b/apps/emqx_postgresql/src/emqx_postgresql.app.src index 7aaf42e71..e1bd67325 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.app.src +++ b/apps/emqx_postgresql/src/emqx_postgresql.app.src @@ -1,6 +1,6 @@ {application, emqx_postgresql, [ {description, "EMQX PostgreSQL Database Connector"}, - {vsn, "0.2.2"}, + {vsn, "0.2.3"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_postgresql/src/emqx_postgresql.erl b/apps/emqx_postgresql/src/emqx_postgresql.erl index 7e64a3e83..a061fc15e 100644 --- a/apps/emqx_postgresql/src/emqx_postgresql.erl +++ b/apps/emqx_postgresql/src/emqx_postgresql.erl @@ -29,6 +29,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -120,6 +121,8 @@ adjust_fields(Fields) -> ). %% =================================================================== +resource_type() -> pgsql. + callback_mode() -> always_sync. -spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}. diff --git a/apps/emqx_redis/src/emqx_redis.erl b/apps/emqx_redis/src/emqx_redis.erl index 059e9aa23..9507913ed 100644 --- a/apps/emqx_redis/src/emqx_redis.erl +++ b/apps/emqx_redis/src/emqx_redis.erl @@ -28,6 +28,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -119,6 +120,8 @@ redis_type(Type) -> desc => ?DESC(Type) }}. +resource_type() -> redis. + callback_mode() -> always_sync. on_start(InstId, Config0) -> diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 587786cb2..8c2bb39a1 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -23,7 +23,8 @@ %% remind us of that. -define(rm_status_stopped, stopped). --type resource_type() :: module(). +-type resource_type() :: atom(). +-type resource_module() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). -type raw_resource_config() :: binary() | raw_term_resource_config(). @@ -158,5 +159,12 @@ %% See `hocon_tconf` -define(TEST_ID_PREFIX, "t_probe_"). -define(RES_METRICS, resource_metrics). +-define(LOG_LEVEL(_L_), + case _L_ of + true -> info; + false -> warning + end +). +-define(TAG, "RESOURCE"). -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 15c0a0293..721df9690 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -39,12 +39,10 @@ -export([ %% store the config and start the instance - create_local/4, create_local/5, create_dry_run_local/2, create_dry_run_local/3, create_dry_run_local/4, - recreate_local/3, recreate_local/4, %% remove the config and stop the instance remove_local/1, @@ -98,6 +96,7 @@ -export([ %% get the callback mode of a specific module get_callback_mode/1, + get_resource_type/1, %% start the instance call_start/3, %% verify if the resource is working normally @@ -140,6 +139,8 @@ validate_name/1 ]). +-export([is_dry_run/1]). + -export_type([ query_mode/0, resource_id/0, @@ -279,16 +280,10 @@ is_resource_mod(Module) -> %% ================================================================================= %% APIs for resource instances %% ================================================================================= - --spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) -> - {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(ResId, Group, ResourceType, Config) -> - create_local(ResId, Group, ResourceType, Config, #{}). - -spec create_local( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> @@ -296,7 +291,7 @@ create_local(ResId, Group, ResourceType, Config) -> create_local(ResId, Group, ResourceType, Config, Opts) -> emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts). --spec create_dry_run_local(resource_type(), resource_config()) -> +-spec create_dry_run_local(resource_module(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> emqx_resource_manager:create_dry_run(ResourceType, Config). @@ -304,19 +299,21 @@ create_dry_run_local(ResourceType, Config) -> create_dry_run_local(ResId, ResourceType, Config) -> emqx_resource_manager:create_dry_run(ResId, ResourceType, Config). --spec create_dry_run_local(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run_local( + resource_id(), + resource_module(), + resource_config(), + OnReadyCallback +) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) -> emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback). --spec recreate_local(resource_id(), resource_type(), resource_config()) -> - {ok, resource_data()} | {error, Reason :: term()}. -recreate_local(ResId, ResourceType, Config) -> - recreate_local(ResId, ResourceType, Config, #{}). - --spec recreate_local(resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate_local( + resource_id(), resource_module(), resource_config(), creation_opts() +) -> {ok, resource_data()} | {error, Reason :: term()}. recreate_local(ResId, ResourceType, Config, Opts) -> emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts). @@ -330,11 +327,15 @@ remove_local(ResId) -> ok; Error -> %% Only log, the ResId worker is always removed in manager's remove action. - ?SLOG(warning, #{ - msg => "remove_local_resource_failed", - error => Error, - resource_id => ResId - }), + ?SLOG( + warning, + #{ + msg => "remove_resource_failed", + error => Error, + resource_id => ResId + }, + #{tag => ?TAG} + ), ok end. @@ -487,6 +488,10 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). +-spec get_resource_type(module()) -> resource_type(). +get_resource_type(Mod) -> + Mod:resource_type(). + -spec call_start(resource_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(ResId, Mod, Config) -> @@ -599,7 +604,7 @@ query_mode(Mod, Config, Opts) -> maps:get(query_mode, Opts, sync) end. --spec check_config(resource_type(), raw_resource_config()) -> +-spec check_config(resource_module(), raw_resource_config()) -> {ok, resource_config()} | {error, term()}. check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). @@ -607,7 +612,7 @@ check_config(ResourceType, Conf) -> -spec check_and_create_local( resource_id(), resource_group(), - resource_type(), + resource_module(), raw_resource_config() ) -> {ok, resource_data()} | {error, term()}. @@ -617,7 +622,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig) -> -spec check_and_create_local( resource_id(), resource_group(), - resource_type(), + resource_module(), raw_resource_config(), creation_opts() ) -> {ok, resource_data()} | {error, term()}. @@ -630,7 +635,7 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) -> -spec check_and_recreate_local( resource_id(), - resource_type(), + resource_module(), raw_resource_config(), creation_opts() ) -> @@ -769,6 +774,13 @@ validate_name(Name) -> _ = validate_name(Name, #{atom_name => false}), ok. +-spec is_dry_run(resource_id()) -> boolean(). +is_dry_run(ResId) -> + case string:find(ResId, ?TEST_ID_PREFIX) of + nomatch -> false; + TestIdStart -> string:equal(TestIdStart, ResId) + end. + validate_name(<<>>, _Opts) -> invalid_data("Name cannot be empty string"); validate_name(Name, _Opts) when size(Name) >= 255 -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 50a25620c..fe674630c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -75,6 +75,7 @@ -record(data, { id, group, + type, mod, callback_mode, query_mode, @@ -163,7 +164,7 @@ -spec ensure_resource( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> {ok, resource_data()}. @@ -176,7 +177,9 @@ ensure_resource(ResId, Group, ResourceType, Config, Opts) -> end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist --spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> +-spec recreate( + resource_id(), resource_module(), resource_config(), creation_opts() +) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of @@ -219,8 +222,8 @@ create(ResId, Group, ResourceType, Config, Opts) -> %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create -%% and link the process itself if not already started, and then immedately stops. --spec create_dry_run(resource_type(), resource_config()) -> +%% and link the process itself if not already started, and then immediately stops. +-spec create_dry_run(resource_module(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> ResId = make_test_id(), @@ -232,7 +235,9 @@ create_dry_run(ResId, ResourceType, Config) -> do_nothing_on_ready(_ResId) -> ok. --spec create_dry_run(resource_id(), resource_type(), resource_config(), OnReadyCallback) -> +-spec create_dry_run( + resource_id(), resource_module(), resource_config(), OnReadyCallback +) -> ok | {error, Reason :: term()} when OnReadyCallback :: fun((resource_id()) -> ok | {error, Reason :: term()}). @@ -242,7 +247,9 @@ create_dry_run(ResId, ResourceType, Config, OnReadyCallback) -> true -> maps:get(resource_opts, Config, #{}); false -> #{} end, - ok = emqx_resource_manager_sup:ensure_child(ResId, <<"dry_run">>, ResourceType, Config, Opts), + ok = emqx_resource_manager_sup:ensure_child( + ResId, <<"dry_run">>, ResourceType, Config, Opts + ), HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), Timeout = emqx_utils:clamp(HealthCheckInterval, 5_000, 60_000), case wait_for_ready(ResId, Timeout) of @@ -499,6 +506,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) -> ), Data = #data{ id = ResId, + type = emqx_resource:get_resource_type(ResourceType), group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), @@ -683,11 +691,13 @@ handle_event(EventType, EventData, State, Data) -> error, #{ msg => "ignore_all_other_events", + resource_id => Data#data.id, event_type => EventType, event_data => EventData, state => State, data => emqx_utils:redact(Data) - } + }, + #{tag => tag(Data#data.group, Data#data.type)} ), keep_state_and_data. @@ -752,7 +762,8 @@ handle_remove_event(From, ClearMetrics, Data) -> start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of + #data{id = ResId, mod = Mod, config = Config, group = Group, type = Type} = Data, + case emqx_resource:call_start(ResId, Mod, Config) of {ok, ResourceState} -> UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state @@ -760,12 +771,17 @@ start_resource(Data, From) -> Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions}; {error, Reason} = Err -> - ?SLOG(warning, #{ - msg => "start_resource_failed", - id => Data#data.id, - reason => Reason - }), - _ = maybe_alarm(?status_disconnected, Data#data.id, Err, Data#data.error), + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "start_resource_failed", + resource_id => ResId, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), + _ = maybe_alarm(?status_disconnected, IsDryRun, ResId, Err, Data#data.error), %% Add channels and raise alarms NewData1 = channels_health_check(?status_disconnected, add_channels(Data)), %% Keep track of the error reason why the connection did not work @@ -796,13 +812,20 @@ add_channels(Data) -> add_channels_in_list([], Data) -> Data; add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> + #data{ + id = ResId, + mod = Mod, + state = State, + added_channels = AddedChannelsMap, + group = Group, + type = Type + } = Data, case emqx_resource:call_add_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelID, ChannelConfig + ResId, Mod, State, ChannelID, ChannelConfig ) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, %% Set the channel status to connecting to indicate that %% we have not yet performed the initial health_check NewAddedChannelsMap = maps:put( @@ -816,12 +839,17 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> }, add_channels_in_list(Rest, NewData); {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => add_channel_failed, - id => Data#data.id, - channel_id => ChannelID, - reason => Reason - }), + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "add_channel_failed", + resource_id => ResId, + channel_id => ChannelID, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:put( ChannelID, @@ -832,7 +860,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> added_channels = NewAddedChannelsMap }, %% Raise an alarm since the channel could not be added - _ = maybe_alarm(?status_disconnected, ChannelID, Error, no_prev_error), + _ = maybe_alarm(?status_disconnected, IsDryRun, ChannelID, Error, no_prev_error), add_channels_in_list(Rest, NewData) end. @@ -856,7 +884,8 @@ stop_resource(#data{id = ResId} = Data) -> false -> ok end, - _ = maybe_clear_alarm(ResId), + IsDryRun = emqx_resource:is_dry_run(ResId), + _ = maybe_clear_alarm(IsDryRun, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), NewData#data{status = ?rm_status_stopped}. @@ -867,16 +896,24 @@ remove_channels(Data) -> remove_channels_in_list([], Data, _KeepInChannelMap) -> Data; remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> - AddedChannelsMap = Data#data.added_channels, + #data{ + id = ResId, + added_channels = AddedChannelsMap, + mod = Mod, + state = State, + group = Group, + type = Type + } = Data, + IsDryRun = emqx_resource:is_dry_run(ResId), NewAddedChannelsMap = case KeepInChannelMap of true -> AddedChannelsMap; false -> - _ = maybe_clear_alarm(ChannelID), + _ = maybe_clear_alarm(IsDryRun, ChannelID), maps:remove(ChannelID, AddedChannelsMap) end, - case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of + case safe_call_remove_channel(ResId, Mod, State, ChannelID) of {ok, NewState} -> NewData = Data#data{ state = NewState, @@ -884,12 +921,18 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> }, remove_channels_in_list(Rest, NewData, KeepInChannelMap); {error, Reason} -> - ?SLOG(warning, #{ - msg => remove_channel_failed, - id => Data#data.id, - channel_id => ChannelID, - reason => Reason - }), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "remove_channel_failed", + resource_id => ResId, + group => Group, + type => Type, + channel_id => ChannelID, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), NewData = Data#data{ added_channels = NewAddedChannelsMap }, @@ -968,8 +1011,8 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) -> handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, - %% Deactivate alarm - _ = maybe_clear_alarm(ChannelId), + IsDryRun = emqx_resource:is_dry_run(Data#data.id), + _ = maybe_clear_alarm(IsDryRun, ChannelId), case channel_status_is_channel_added( maps:get(ChannelId, Channels, channel_status_not_added(undefined)) @@ -990,13 +1033,18 @@ handle_remove_channel(From, ChannelId, Data) -> end. handle_remove_channel_exists(From, ChannelId, Data) -> + #data{ + id = Id, + group = Group, + type = Type, + added_channels = AddedChannelsMap + } = Data, case emqx_resource:call_remove_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelId + Id, Data#data.mod, Data#data.state, ChannelId ) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:remove(ChannelId, AddedChannelsMap), UpdatedData = Data#data{ state = NewState, @@ -1004,13 +1052,17 @@ handle_remove_channel_exists(From, ChannelId, Data) -> }, {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]}; {error, Reason} = Error -> - %% Log the error as a warning - ?SLOG(warning, #{ - msg => remove_channel_failed, - id => Data#data.id, - channel_id => ChannelId, - reason => Reason - }), + IsDryRun = emqx_resource:is_dry_run(Id), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "remove_channel_failed", + resource_id => Id, + channel_id => ChannelId, + reason => Reason + }, + #{tag => tag(Group, Type)} + ), {keep_state_and_data, [{reply, From, Error}]} end. @@ -1021,7 +1073,8 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, - _ = maybe_clear_alarm(ChannelId), + IsDryRun = emqx_resource:is_dry_run(Data#data.id), + _ = maybe_clear_alarm(IsDryRun, ChannelId), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when @@ -1090,7 +1143,8 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) -> error = PrevError } = Data0, {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0), - _ = maybe_alarm(NewStatus, ResId, Err, PrevError), + IsDryRun = emqx_resource:is_dry_run(ResId), + _ = maybe_alarm(NewStatus, IsDryRun, ResId, Err, PrevError), ok = maybe_resume_resource_workers(ResId, NewStatus), Data1 = Data0#data{ state = NewState, status = NewStatus, error = Err @@ -1114,11 +1168,17 @@ continue_resource_health_check_connected(NewStatus, Data0) -> Actions = Replies ++ resource_health_check_actions(Data), {keep_state, Data, Actions}; _ -> - ?SLOG(warning, #{ - msg => "health_check_failed", - id => Data0#data.id, - status => NewStatus - }), + #data{id = ResId, group = Group, type = Type} = Data0, + IsDryRun = emqx_resource:is_dry_run(ResId), + ?SLOG( + log_level(IsDryRun), + #{ + msg => "health_check_failed", + resource_id => ResId, + status => NewStatus + }, + #{tag => tag(Group, Type)} + ), %% Note: works because, coincidentally, channel/resource status is a %% subset of resource manager state... But there should be a conversion %% between the two here, as resource manager also has `stopped', which is @@ -1214,7 +1274,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) -> channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> %% Whenever the resource is connecting: %% 1. Change the status of all added channels to connecting - %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) + %% 2. Raise alarms Channels = Data0#data.added_channels, ChannelsToChangeStatusFor = [ {ChannelId, Config} @@ -1240,9 +1300,10 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> || {ChannelId, NewStatus} <- maps:to_list(NewChannels) ], %% Raise alarms for all channels + IsDryRun = emqx_resource:is_dry_run(Data0#data.id), lists:foreach( fun({ChannelId, Status, PrevStatus}) -> - maybe_alarm(?status_connecting, ChannelId, Status, PrevStatus) + maybe_alarm(?status_connecting, IsDryRun, ChannelId, Status, PrevStatus) end, ChannelsWithNewAndPrevErrorStatuses ), @@ -1275,9 +1336,10 @@ channels_health_check(ConnectorStatus, Data0) -> || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms + IsDryRun = emqx_resource:is_dry_run(Data1#data.id), _ = lists:foreach( fun({ChannelId, OldStatus, NewStatus}) -> - _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus) + _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus) end, ChannelsWithNewAndOldStatuses ), @@ -1386,13 +1448,14 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta NewStatus = maps:get(ChannelId, Data1#data.added_channels), ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), + IsDryRun = emqx_resource:is_dry_run(Data1#data.id), %% Raise/clear alarms case NewStatus of #{status := ?status_connected} -> - _ = maybe_clear_alarm(ChannelId), + _ = maybe_clear_alarm(IsDryRun, ChannelId), ok; _ -> - _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), + _ = maybe_alarm(NewStatus, IsDryRun, ChannelId, NewStatus, OldStatus), ok end, Data. @@ -1556,15 +1619,21 @@ remove_runtime_data(#data{} = Data0) -> health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). --spec maybe_alarm(resource_status(), resource_id(), _Error :: term(), _PrevError :: term()) -> ok. -maybe_alarm(?status_connected, _ResId, _Error, _PrevError) -> +-spec maybe_alarm( + resource_status(), + boolean(), + resource_id(), + _Error :: term(), + _PrevError :: term() +) -> ok. +maybe_alarm(?status_connected, _IsDryRun, _ResId, _Error, _PrevError) -> ok; -maybe_alarm(_Status, <>, _Error, _PrevError) -> +maybe_alarm(_Status, true, _ResId, _Error, _PrevError) -> ok; %% Assume that alarm is already active -maybe_alarm(_Status, _ResId, Error, Error) -> +maybe_alarm(_Status, _IsDryRun, _ResId, Error, Error) -> ok; -maybe_alarm(_Status, ResId, Error, _PrevError) -> +maybe_alarm(_Status, false, ResId, Error, _PrevError) -> HrError = case Error of {error, undefined} -> @@ -1596,10 +1665,10 @@ maybe_resume_resource_workers(ResId, ?status_connected) -> maybe_resume_resource_workers(_, _) -> ok. --spec maybe_clear_alarm(resource_id()) -> ok | {error, not_found}. -maybe_clear_alarm(<>) -> +-spec maybe_clear_alarm(boolean(), resource_id()) -> ok | {error, not_found}. +maybe_clear_alarm(true, _ResId) -> ok; -maybe_clear_alarm(ResId) -> +maybe_clear_alarm(false, ResId) -> emqx_alarm:safe_deactivate(ResId). parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> @@ -1615,7 +1684,8 @@ parse_health_check_result({error, Error}, Data) -> msg => "health_check_exception", resource_id => Data#data.id, reason => Error - } + }, + #{tag => tag(Data#data.group, Data#data.type)} ), {?status_disconnected, Data#data.state, {error, Error}}. @@ -1767,10 +1837,18 @@ add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) -> ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), NewChannels = maps:put(ChannelId, ChannelStatus, Channels), ResStatus = state_to_status(State), - maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), + IsDryRun = emqx_resource:is_dry_run(ChannelId), + maybe_alarm(ResStatus, IsDryRun, ChannelId, ChannelStatus, no_prev), Data#data{added_channels = NewChannels}. state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_connected) -> ?status_connected; state_to_status(?state_connecting) -> ?status_connecting; state_to_status(?state_disconnected) -> ?status_disconnected. + +log_level(true) -> info; +log_level(false) -> warning. + +tag(Group, Type) -> + Str = emqx_utils_conv:str(Group) ++ "/" ++ emqx_utils_conv:str(Type), + string:uppercase(Str). diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index 7af6eca81..8542eec1c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -58,10 +58,11 @@ init([]) -> child_spec(ResId, Group, ResourceType, Config, Opts) -> #{ id => ResId, - start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, + start => + {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]}, restart => transient, %% never force kill a resource manager. - %% becasue otherwise it may lead to release leak, + %% because otherwise it may lead to release leak, %% resource_manager's terminate callback calls resource on_stop shutdown => infinity, type => worker, diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 97a09b074..98a74dfad 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -17,6 +17,7 @@ -module(emqx_resource_metrics). -include_lib("emqx/include/logger.hrl"). +-include("emqx_resource.hrl"). -export([ events/0, @@ -74,7 +75,6 @@ success_get/1 ]). --define(RES_METRICS, resource_metrics). -define(TELEMETRY_PREFIX, emqx, resource). -spec events() -> [telemetry:event_name()]. @@ -127,15 +127,19 @@ handle_telemetry_event( %% We catch errors to avoid detaching the telemetry handler function. %% When restarting a resource while it's under load, there might be transient %% failures while the metrics are not yet created. - ?SLOG(warning, #{ - msg => "handle_resource_metrics_failed", - hint => "transient failures may occur when restarting a resource", - kind => Kind, - reason => Reason, - stacktrace => Stacktrace, - resource_id => ID, - event => Event - }), + ?SLOG( + warning, + #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }, + #{tag => ?TAG} + ), ok end; handle_telemetry_event( @@ -151,15 +155,19 @@ handle_telemetry_event( %% We catch errors to avoid detaching the telemetry handler function. %% When restarting a resource while it's under load, there might be transient %% failures while the metrics are not yet created. - ?SLOG(warning, #{ - msg => "handle_resource_metrics_failed", - hint => "transient failures may occur when restarting a resource", - kind => Kind, - reason => Reason, - stacktrace => Stacktrace, - resource_id => ID, - event => Event - }), + ?SLOG( + warning, + #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }, + #{tag => ?TAG} + ), ok end; handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> diff --git a/apps/emqx_resource/src/emqx_resource_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl index 47e7ed7ff..ba286e35c 100644 --- a/apps/emqx_resource/src/emqx_resource_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -26,6 +26,7 @@ ]). -include_lib("emqx/include/logger.hrl"). +-include("emqx_resource.hrl"). -ifndef(TEST). -define(HEALTH_CHECK_TIMEOUT, 15000). @@ -37,33 +38,43 @@ start(Name, Mod, Options) -> case ecpool:start_sup_pool(Name, Mod, Options) of {ok, _} -> - ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}), + ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}, #{tag => ?TAG}), ok; {error, {already_started, _Pid}} -> stop(Name), start(Name, Mod, Options); {error, Reason} -> NReason = parse_reason(Reason), - ?SLOG(error, #{ - msg => "start_ecpool_error", - pool_name => Name, - reason => NReason - }), + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG( + ?LOG_LEVEL(IsDryRun), + #{ + msg => "start_ecpool_error", + resource_id => Name, + reason => NReason + }, + #{tag => ?TAG} + ), {error, {start_pool_failed, Name, NReason}} end. stop(Name) -> case ecpool:stop_sup_pool(Name) of ok -> - ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}); + ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}, #{tag => ?TAG}); {error, not_found} -> ok; {error, Reason} -> - ?SLOG(error, #{ - msg => "stop_ecpool_failed", - pool_name => Name, - reason => Reason - }), + IsDryRun = emqx_resource:is_dry_run(Name), + ?SLOG( + ?LOG_LEVEL(IsDryRun), + #{ + msg => "stop_ecpool_failed", + resource_id => Name, + reason => Reason + }, + #{tag => ?TAG} + ), error({stop_pool_failed, Name, Reason}) end. diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index 859b9fa52..47b724271 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -40,7 +40,7 @@ deprecated_since() -> -spec create( resource_id(), resource_group(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> @@ -51,7 +51,7 @@ create(ResId, Group, ResourceType, Config, Opts) -> ]). -spec create_dry_run( - resource_type(), + resource_module(), resource_config() ) -> ok | {error, Reason :: term()}. @@ -60,7 +60,7 @@ create_dry_run(ResourceType, Config) -> -spec recreate( resource_id(), - resource_type(), + resource_module(), resource_config(), creation_opts() ) -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 0fc11cc66..e068defb1 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -24,6 +24,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -62,6 +63,8 @@ register(required) -> true; register(default) -> false; register(_) -> undefined. +resource_type() -> demo. + callback_mode() -> persistent_term:get(?CM_KEY). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index af9abe95b..b8fc66c10 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3490,7 +3490,7 @@ gauge_metric_set_fns() -> ]. create(Id, Group, Type, Config) -> - emqx_resource:create_local(Id, Group, Type, Config). + emqx_resource:create_local(Id, Group, Type, Config, #{}). create(Id, Group, Type, Config, Opts) -> emqx_resource:create_local(Id, Group, Type, Config, Opts). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl index 6a0d6b3ec..cf5d3afbe 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + resource_type/0, callback_mode/0, on_start/2, on_stop/2, @@ -40,6 +41,8 @@ ]). %% =================================================================== +resource_type() -> test_connector. + callback_mode() -> always_sync. on_start(