diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 863a1d936..9308bbe95 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -134,7 +134,7 @@ create(#{method := Method, emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}, - #{wait_connected => 1000}) of + #{waiting_connect_complete => 5000}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index fc444d6f2..158306a87 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -116,7 +116,7 @@ create(#{selector := Selector} = Config) -> ?RESOURCE_GROUP, emqx_connector_mongo, Config, - #{wait_connected => 1000}) of + #{waiting_connect_complete => 5000}) of {ok, already_created} -> {ok, NState}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 62ff83b1e..b347ff30c 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -81,7 +81,11 @@ create(#{password_hash_algorithm := Algorithm, placeholders => PlaceHolders, query_timeout => QueryTimeout, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mysql, Config, #{wait_connected => 1000}) of + case emqx_resource:create_local(ResourceId, + ?RESOURCE_GROUP, + emqx_connector_mysql, + Config, + #{waiting_connect_complete => 5000}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 550a6cfe1..856271db3 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -81,7 +81,7 @@ create(#{query := Query0, resource_id => ResourceId}, case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}, - #{wait_connected => 1000}) of + #{waiting_connect_complete => 5000}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 389def83d..e604acfe3 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -93,7 +93,7 @@ create(#{cmd := Cmd, resource_id => ResourceId}, case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_redis, Config, - #{wait_connected => 1000}) of + #{waiting_connect_complete => 5000}) of {ok, already_created} -> {ok, NState}; {ok, _} -> diff --git a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl index c471c1f8b..8ad0caa6d 100644 --- a/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mongo_tls_SUITE.erl @@ -91,9 +91,9 @@ t_create_invalid_server_name(_Config) -> create_mongo_auth_with_ssl_opts( #{<<"server_name_indication">> => <<"authn-server-unknown-host">>, <<"verify">> => <<"verify_peer">>}), - fun({ok, _}, Trace) -> - ?assertEqual( - [failed], + fun(_, Trace) -> + ?assertNotEqual( + [ok], ?projection( status, ?of_kind(emqx_connector_mongo_health_check, Trace))) @@ -109,9 +109,9 @@ t_create_invalid_version(_Config) -> #{<<"server_name_indication">> => <<"authn-server">>, <<"verify">> => <<"verify_peer">>, <<"versions">> => [<<"tlsv1.1">>]}), - fun({ok, _}, Trace) -> - ?assertEqual( - [failed], + fun(_, Trace) -> + ?assertNotEqual( + [ok], ?projection( status, ?of_kind(emqx_connector_mongo_health_check, Trace))) @@ -128,9 +128,9 @@ t_invalid_ciphers(_Config) -> <<"verify">> => <<"verify_peer">>, <<"versions">> => [<<"tlsv1.2">>], <<"ciphers">> => [<<"DHE-RSA-AES256-GCM-SHA384">>]}), - fun({ok, _}, Trace) -> - ?assertEqual( - [failed], + fun(_, Trace) -> + ?assertNotEqual( + [ok], ?projection( status, ?of_kind(emqx_connector_mongo_health_check, Trace))) @@ -142,7 +142,9 @@ t_invalid_ciphers(_Config) -> create_mongo_auth_with_ssl_opts(SpecificSSLOpts) -> AuthConfig = raw_mongo_auth_config(SpecificSSLOpts), - emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}). + Res = emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}), + timer:sleep(500), + Res. raw_mongo_auth_config(SpecificSSLOpts) -> SSLOpts = maps:merge( diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index b2a69b4c8..4199564c2 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -28,6 +28,7 @@ -define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>). -define(PATH, [authentication]). +-define(ResourceID, <<"password-based:mysql">>). all() -> [{group, require_seeds}, t_create, t_create_invalid]. @@ -61,7 +62,8 @@ init_per_suite(Config) -> ?MYSQL_RESOURCE, ?RESOURCE_GROUP, emqx_connector_mysql, - mysql_config()), + mysql_config(), + #{waiting_connect_complete => 5000}), Config; false -> {skip, no_mysql} @@ -86,7 +88,8 @@ t_create(_Config) -> ?PATH, {create_authenticator, ?GLOBAL, AuthConfig}), - {ok, [#{provider := emqx_authn_mysql}]} = emqx_authentication:list_authenticators(?GLOBAL). + {ok, [#{provider := emqx_authn_mysql}]} = emqx_authentication:list_authenticators(?GLOBAL), + emqx_authn_test_lib:delete_config(?ResourceID). t_create_invalid(_Config) -> AuthConfig = raw_mysql_auth_config(), @@ -104,8 +107,8 @@ t_create_invalid(_Config) -> {ok, _} = emqx:update_config( ?PATH, {create_authenticator, ?GLOBAL, Config}), - - {ok, []} = emqx_authentication:list_authenticators(?GLOBAL) + emqx_authn_test_lib:delete_config(?ResourceID), + {ok, _} = emqx_authentication:list_authenticators(?GLOBAL) end, InvalidConfigs). diff --git a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl index ce9e04577..51bf9d235 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_tls_SUITE.erl @@ -27,6 +27,7 @@ -define(MYSQL_HOST, "mysql-tls"). -define(PATH, [authentication]). +-define(ResourceID, <<"password-based:mysql">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -84,7 +85,7 @@ t_create_invalid(_Config) -> create_mysql_auth_with_ssl_opts( #{<<"server_name_indication">> => <<"authn-server-unknown-host">>, <<"verify">> => <<"verify_peer">>})), - + emqx_authn_test_lib:delete_config(?ResourceID), %% incompatible versions ?assertMatch( {ok, _}, @@ -92,7 +93,7 @@ t_create_invalid(_Config) -> #{<<"server_name_indication">> => <<"authn-server">>, <<"verify">> => <<"verify_peer">>, <<"versions">> => [<<"tlsv1.1">>]})), - + emqx_authn_test_lib:delete_config(?ResourceID), %% incompatible ciphers ?assertMatch( {ok, _}, diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index fecd3d6a4..a4d6f8c07 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -27,6 +27,7 @@ -define(PGSQL_HOST, "pgsql"). -define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>). +-define(ResourceID, <<"password-based:postgresql">>). -define(PATH, [authentication]). @@ -63,7 +64,7 @@ init_per_suite(Config) -> ?RESOURCE_GROUP, emqx_connector_pgsql, pgsql_config(), - #{wait_connected => 1000}), + #{waiting_connect_complete => 5000}), Config; false -> {skip, no_pgsql} @@ -88,7 +89,8 @@ t_create(_Config) -> ?PATH, {create_authenticator, ?GLOBAL, AuthConfig}), - {ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL). + {ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL), + emqx_authn_test_lib:delete_config(?ResourceID). t_create_invalid(_Config) -> AuthConfig = raw_pgsql_auth_config(), @@ -103,10 +105,10 @@ t_create_invalid(_Config) -> lists:foreach( fun(Config) -> - {error, _} = emqx:update_config( + {ok, _} = emqx:update_config( ?PATH, {create_authenticator, ?GLOBAL, Config}), - + emqx_authn_test_lib:delete_config(?ResourceID), {ok, []} = emqx_authentication:list_authenticators(?GLOBAL) end, InvalidConfigs). diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl index 0c21badc9..591dd1eea 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_tls_SUITE.erl @@ -27,6 +27,7 @@ -define(PGSQL_HOST, "pgsql-tls"). -define(PATH, [authentication]). +-define(ResourceID, <<"password-based:postgresql">>). all() -> emqx_common_test_helpers:all(?MODULE). @@ -84,7 +85,7 @@ t_create_invalid(_Config) -> create_pgsql_auth_with_ssl_opts( #{<<"server_name_indication">> => <<"authn-server-unknown-host">>, <<"verify">> => <<"verify_peer">>})), - + emqx_authn_test_lib:delete_config(?ResourceID), %% incompatible versions ?assertMatch( {ok, _}, @@ -92,7 +93,7 @@ t_create_invalid(_Config) -> #{<<"server_name_indication">> => <<"authn-server">>, <<"verify">> => <<"verify_peer">>, <<"versions">> => [<<"tlsv1.1">>]})), - + emqx_authn_test_lib:delete_config(?ResourceID), %% incompatible ciphers ?assertMatch( {ok, _}, diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index 7aad87b6a..c02830ab7 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -28,6 +28,7 @@ -define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>). -define(PATH, [authentication]). +-define(ResourceID, <<"password-based:redis">>). all() -> [{group, require_seeds}, t_create, t_create_invalid]. @@ -61,7 +62,8 @@ init_per_suite(Config) -> ?REDIS_RESOURCE, ?RESOURCE_GROUP, emqx_connector_redis, - redis_config()), + redis_config(), + #{waiting_connect_complete => 5000}), Config; false -> {skip, no_redis} @@ -91,13 +93,8 @@ t_create(_Config) -> t_create_invalid(_Config) -> AuthConfig = raw_redis_auth_config(), - InvalidConfigs = [ - maps:without([server], AuthConfig), - AuthConfig#{server => <<"unknownhost:3333">>}, - AuthConfig#{password => <<"wrongpass">>}, - AuthConfig#{database => <<"5678">>}, AuthConfig#{ cmd => <<"MGET password_hash:${username} salt:${username}">>}, AuthConfig#{ @@ -105,16 +102,33 @@ t_create_invalid(_Config) -> AuthConfig#{ cmd => <<"HMGET mqtt_user:${username} salt is_superuser">>} ], + lists:foreach( + fun(Config) -> + {error, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, Config}), + + {ok, []} = emqx_authentication:list_authenticators(?GLOBAL) + end, + InvalidConfigs), + + InvalidConfigs1 = + [ + maps:without([server], AuthConfig), + AuthConfig#{server => <<"unknownhost:3333">>}, + AuthConfig#{password => <<"wrongpass">>}, + AuthConfig#{database => <<"5678">>} + ], lists:foreach( fun(Config) -> {ok, _} = emqx:update_config( ?PATH, {create_authenticator, ?GLOBAL, Config}), - + emqx_authn_test_lib:delete_config(?ResourceID), {ok, []} = emqx_authentication:list_authenticators(?GLOBAL) end, - InvalidConfigs). + InvalidConfigs1). t_authenticate(_Config) -> ok = lists:foreach( @@ -270,7 +284,8 @@ user_seeds() -> }, #{data => #{ - password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, + password_hash => + <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, is_superuser => <<"0">> }, @@ -303,7 +318,8 @@ user_seeds() -> result => {ok,#{is_superuser => false}} }, #{data => #{ - password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, + password_hash => + <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, is_superuser => <<"0">> }, @@ -321,7 +337,8 @@ user_seeds() -> }, #{data => #{ - password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, + password_hash => + <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, is_superuser => <<"0">> }, @@ -339,7 +356,8 @@ user_seeds() -> }, #{data => #{ - password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, + password_hash => + <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, is_superuser => <<"0">> }, diff --git a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl index d6249e328..670088e08 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_tls_SUITE.erl @@ -86,7 +86,7 @@ t_create_invalid(_Config) -> %% incompatible versions ?assertMatch( - {ok, _}, + {error, _}, create_redis_auth_with_ssl_opts( #{<<"server_name_indication">> => <<"authn-server">>, <<"verify">> => <<"verify_peer">>, @@ -94,7 +94,7 @@ t_create_invalid(_Config) -> %% incompatible ciphers ?assertMatch( - {ok, _}, + {error, _}, create_redis_auth_with_ssl_opts( #{<<"server_name_indication">> => <<"authn-server">>, <<"verify">> => <<"verify_peer">>, diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index 936cfc7e6..a127a9c2b 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -55,7 +55,8 @@ init(#{query := SQL0} = Source) -> ResourceID, ?RESOURCE_GROUP, emqx_connector_pgsql, - Source#{named_queries => #{ResourceID => SQL}}) of + Source#{named_queries => #{ResourceID => SQL}}, + #{waiting_connect_complete => 5000}) of {ok, _} -> Source#{annotations => #{id => ResourceID, diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index cef16b99b..73e387d81 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -38,7 +38,7 @@ create_resource(Module, Config) -> case emqx_resource:create_local(ResourceID, ?RESOURCE_GROUP, Module, Config, - #{wait_connected => 1000}) of + #{waiting_connect_complete => 5000}) of {ok, already_created} -> {ok, ResourceID}; {ok, _} -> {ok, ResourceID}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 638da1416..e62be0aa2 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -46,6 +46,7 @@ end_per_suite(_Config) -> #{<<"no_match">> => <<"allow">>, <<"cache">> => #{<<"enable">> => <<"true">>}, <<"sources">> => []}), + ok = stop_apps([emqx_resource]), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), meck:unload(emqx_resource), ok. @@ -222,3 +223,6 @@ t_move_source(_) -> ], emqx_authz:lookup()), ok. + +stop_apps(Apps) -> + lists:foreach(fun application:stop/1, Apps). diff --git a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl index 6a48fee4d..e4b48c740 100644 --- a/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_settings_SUITE.erl @@ -44,6 +44,7 @@ end_per_suite(_Config) -> #{<<"no_match">> => <<"allow">>, <<"cache">> => #{<<"enable">> => <<"true">>}, <<"sources">> => []}), + ok = stop_apps([emqx_resource, emqx_connector]), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), ok. @@ -131,3 +132,6 @@ auth_header_() -> Password = <<"public">>, {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. + +stop_apps(Apps) -> + lists:foreach(fun application:stop/1, Apps). diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index 9e435267d..1bdff9455 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -44,7 +44,8 @@ init_per_suite(Config) -> ?MYSQL_RESOURCE, ?RESOURCE_GROUP, emqx_connector_mysql, - mysql_config()), + mysql_config(), + #{waiting_connect_complete => 5000}), Config; false -> {skip, no_mysql} @@ -179,9 +180,9 @@ t_create_invalid(_Config) -> BadConfig = maps:merge( raw_mysql_authz_config(), #{<<"server">> => <<"255.255.255.255:33333">>}), - {error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]), - [] = emqx_authz:lookup(). + [_] = emqx_authz:lookup(). t_nonbinary_values(_Config) -> ClientInfo = #{clientid => clientid, diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index 8e4d25434..5f8c914fe 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -44,7 +44,8 @@ init_per_suite(Config) -> ?PGSQL_RESOURCE, ?RESOURCE_GROUP, emqx_connector_pgsql, - pgsql_config()), + pgsql_config(), + #{waiting_connect_complete => 5000}), Config; false -> {skip, no_pgsql} @@ -180,9 +181,9 @@ t_create_invalid(_Config) -> BadConfig = maps:merge( raw_pgsql_authz_config(), #{<<"server">> => <<"255.255.255.255:33333">>}), - {error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]), + {ok, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]), - [] = emqx_authz:lookup(). + [_] = emqx_authz:lookup(). t_nonbinary_values(_Config) -> ClientInfo = #{clientid => clientid, diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 32b62ac35..519973ebe 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -45,7 +45,8 @@ init_per_suite(Config) -> ?REDIS_RESOURCE, ?RESOURCE_GROUP, emqx_connector_redis, - redis_config()), + redis_config(), + #{waiting_connect_complete => 5000}), Config; false -> {skip, no_redis} @@ -151,8 +152,8 @@ t_create_invalid(_Config) -> lists:foreach( fun(Config) -> - {error, _} = emqx_authz:update(?CMD_REPLACE, [Config]), - [] = emqx_authz:lookup() + {ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]), + [_] = emqx_authz:lookup() end, InvalidConfigs). diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 2d40c997b..67e9286bf 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -224,8 +224,11 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), - case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type), - parse_confs(Type, Name, Conf), #{wait_connected => 1000}) of + case emqx_resource:create_local(resource_id(Type, Name), + <<"emqx_bridge">>, + emqx_bridge:resource_type(Type), + parse_confs(Type, Name, Conf), + #{waiting_connect_complete => 5000}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); {error, Reason} -> {error, Reason} @@ -270,7 +273,9 @@ recreate(Type, Name) -> recreate(Type, Name, Conf) -> emqx_resource:recreate_local(resource_id(Type, Name), - emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{wait_connected => 1000}). + emqx_bridge:resource_type(Type), + parse_confs(Type, Name, Conf), + #{waiting_connect_complete => 5000}). create_dry_run(Type, Conf) -> Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index 3eec37af1..47d8f31c8 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -63,26 +63,33 @@ t_lifecycle(_Config) -> ). perform_lifecycle_check(PoolName, InitialConfig) -> - {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig), - {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( + {ok, #{config := CheckedConfig}} = + emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig), + {ok, #{state := #{poolname := ReturnedPoolName} = State, + status := InitialStatus}} = emqx_resource:create_local( PoolName, ?CONNECTOR_RESOURCE_GROUP, ?MYSQL_RESOURCE_MOD, CheckedConfig, - #{wait_connected => 1000} + #{waiting_connect_complete => 5000} ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := InitialStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), % % Perform query as further check that the resource is working as expected ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())), - ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())), + ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, + test_query_with_params_and_timeout())), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := StoppedStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. @@ -90,12 +97,16 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % Can call stop/1 again on an already stopped instance ?assertEqual(ok, emqx_resource:stop(PoolName)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName, #{wait_connected => 1000})), - {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:restart(PoolName)), + % async restart, need to wait resource + timer:sleep(500), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = + emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())), - ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())), + ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, + test_query_with_params_and_timeout())), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index b8b3980ac..bc7b2eb4d 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -63,17 +63,22 @@ t_lifecycle(_Config) -> ). perform_lifecycle_check(PoolName, InitialConfig) -> - {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), - {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( + {ok, #{config := CheckedConfig}} = + emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), + {ok, #{state := #{poolname := ReturnedPoolName} = State, + status := InitialStatus}} + = emqx_resource:create_local( PoolName, ?CONNECTOR_RESOURCE_GROUP, ?PGSQL_RESOURCE_MOD, CheckedConfig, - #{wait_connected => 1000} + #{waiting_connect_complete => 5000} ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := InitialStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), % % Perform query as further check that the resource is working as expected ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), @@ -81,7 +86,9 @@ perform_lifecycle_check(PoolName, InitialConfig) -> ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := StoppedStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. @@ -89,8 +96,11 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % Can call stop/1 again on an already stopped instance ?assertEqual(ok, emqx_resource:stop(PoolName)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName, #{wait_connected => 1000})), - {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:restart(PoolName)), + % async restart, need to wait resource + timer:sleep(500), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())), diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index 2b8495dac..f1fcee67c 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -78,24 +78,30 @@ t_sentinel_lifecycle(_Config) -> ). perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> - {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig), - {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( + {ok, #{config := CheckedConfig}} = + emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig), + {ok, #{state := #{poolname := ReturnedPoolName} = State, + status := InitialStatus}} = emqx_resource:create_local( PoolName, ?CONNECTOR_RESOURCE_GROUP, ?REDIS_RESOURCE_MOD, CheckedConfig, - #{wait_connected => 1000} + #{waiting_connect_complete => 5000} ), ?assertEqual(InitialStatus, connected), % Instance should match the state and status of the just started resource - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := InitialStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), % Perform query as further check that the resource is working as expected ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. - {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, + status := StoppedStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(StoppedStatus, disconnected), ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. @@ -103,8 +109,11 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> % Can call stop/1 again on an already stopped instance ?assertEqual(ok, emqx_resource:stop(PoolName)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName, #{wait_connected => 1000})), - {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:restart(PoolName)), + % async restart, need to wait resource + timer:sleep(500), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} + = emqx_resource:get_instance(PoolName), ?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), % Stop and remove the resource in one go. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index cd10f9fa4..fdddcdc87 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -32,7 +32,7 @@ -type create_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), - wait_connected => integer() + waiting_connect_complete => integer() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | undefined. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 5957222cb..d59835123 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -147,7 +147,11 @@ create(InstId, Group, ResourceType, Config, Opts) -> create_local(InstId, Group, ResourceType, Config) -> create_local(InstId, Group, ResourceType, Config, #{}). --spec create_local(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> +-spec create_local(instance_id(), + resource_group(), + resource_type(), + resource_config(), + create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. create_local(InstId, Group, ResourceType, Config, Opts) -> call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}). @@ -228,7 +232,8 @@ health_check(InstId) -> set_resource_status_connecting(InstId) -> call_instance(InstId, {set_resource_status_connecting, InstId}). --spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. +-spec get_instance(instance_id()) -> + {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> emqx_resource_instance:lookup(InstId). @@ -273,35 +278,54 @@ call_stop(InstId, Mod, ResourceState) -> check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). --spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config()) -> +-spec check_and_create(instance_id(), + resource_group(), + resource_type(), + raw_resource_config()) -> {ok, resource_data() | 'already_created'} | {error, term()}. check_and_create(InstId, Group, ResourceType, RawConfig) -> check_and_create(InstId, Group, ResourceType, RawConfig, #{}). --spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) -> +-spec check_and_create(instance_id(), + resource_group(), + resource_type(), + raw_resource_config(), + create_opts()) -> {ok, resource_data() | 'already_created'} | {error, term()}. check_and_create(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end). --spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config()) -> +-spec check_and_create_local(instance_id(), + resource_group(), + resource_type(), + raw_resource_config()) -> {ok, resource_data()} | {error, term()}. check_and_create_local(InstId, Group, ResourceType, RawConfig) -> check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}). --spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config(), +-spec check_and_create_local(instance_id(), + resource_group(), + resource_type(), + raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end). --spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> +-spec check_and_recreate(instance_id(), + resource_type(), + raw_resource_config(), + create_opts()) -> {ok, resource_data()} | {error, term()}. check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end). --spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> +-spec check_and_recreate_local(instance_id(), + resource_type(), + raw_resource_config(), + create_opts()) -> {ok, resource_data()} | {error, term()}. check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 6d4abc9cc..0faaebabb 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -54,7 +54,7 @@ delete_checker(Name) -> case supervisor:terminate_child(?SUP, ?ID(Name)) of ok -> supervisor:delete_child(?SUP, ?ID(Name)); Error -> Error - end. + end. start_health_check(Name, Sleep, Timeout) -> Pid = self(), diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index ea88b24bd..b828f1b40 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -90,9 +90,9 @@ list_all() -> end. -spec list_group(resource_group()) -> [instance_id()]. -list_group(Group) -> +list_group(Group) -> List = ets:match(emqx_resource_instance, {'$1', Group, '_'}), - lists:map(fun([A|_]) -> A end, List). + lists:map(fun([A | _]) -> A end, List). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -197,7 +197,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) -> ok -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, [matched, success, failed, exception], [matched]), - WaitTime = maps:get(wait_connected, Opts, 0), + WaitTime = maps:get(waiting_connect_complete , Opts, 0), {ok, wait_for_resource_ready(InstId, WaitTime div 100)}; Error -> Error diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 49cb563e3..0a1b769a8 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -360,7 +360,8 @@ create_resource(Context, #{type := DB} = Config) -> ResourceID, <<"emqx_retainer">>, list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), - Config) of + Config, + #{waiting_connect_complete => 5000}) of {ok, already_created} -> Context#{resource_id => ResourceID}; {ok, _} ->