fix(emqx_authn): fix test suite

This commit is contained in:
EMQ-YangM 2022-03-02 10:49:50 +08:00
parent ca7a43986a
commit 583624fb8d
28 changed files with 195 additions and 92 deletions

View File

@ -134,7 +134,7 @@ create(#{method := Method,
emqx_connector_http, emqx_connector_http,
Config#{base_url => maps:remove(query, URIMap), Config#{base_url => maps:remove(query, URIMap),
pool_type => random}, pool_type => random},
#{wait_connected => 1000}) of #{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, already_created} ->
{ok, State}; {ok, State};
{ok, _} -> {ok, _} ->

View File

@ -116,7 +116,7 @@ create(#{selector := Selector} = Config) ->
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_mongo, emqx_connector_mongo,
Config, Config,
#{wait_connected => 1000}) of #{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, already_created} ->
{ok, NState}; {ok, NState};
{ok, _} -> {ok, _} ->

View File

@ -81,7 +81,11 @@ create(#{password_hash_algorithm := Algorithm,
placeholders => PlaceHolders, placeholders => PlaceHolders,
query_timeout => QueryTimeout, query_timeout => QueryTimeout,
resource_id => ResourceId}, resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mysql, Config, #{wait_connected => 1000}) of case emqx_resource:create_local(ResourceId,
?RESOURCE_GROUP,
emqx_connector_mysql,
Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, already_created} ->
{ok, State}; {ok, State};
{ok, _} -> {ok, _} ->

View File

@ -81,7 +81,7 @@ create(#{query := Query0,
resource_id => ResourceId}, resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql,
Config#{named_queries => #{ResourceId => Query}}, Config#{named_queries => #{ResourceId => Query}},
#{wait_connected => 1000}) of #{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, already_created} ->
{ok, State}; {ok, State};
{ok, _} -> {ok, _} ->

View File

@ -93,7 +93,7 @@ create(#{cmd := Cmd,
resource_id => ResourceId}, resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP,
emqx_connector_redis, Config, emqx_connector_redis, Config,
#{wait_connected => 1000}) of #{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, already_created} ->
{ok, NState}; {ok, NState};
{ok, _} -> {ok, _} ->

View File

@ -91,9 +91,9 @@ t_create_invalid_server_name(_Config) ->
create_mongo_auth_with_ssl_opts( create_mongo_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>, #{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>}), <<"verify">> => <<"verify_peer">>}),
fun({ok, _}, Trace) -> fun(_, Trace) ->
?assertEqual( ?assertNotEqual(
[failed], [ok],
?projection( ?projection(
status, status,
?of_kind(emqx_connector_mongo_health_check, Trace))) ?of_kind(emqx_connector_mongo_health_check, Trace)))
@ -109,9 +109,9 @@ t_create_invalid_version(_Config) ->
#{<<"server_name_indication">> => <<"authn-server">>, #{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>, <<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.1">>]}), <<"versions">> => [<<"tlsv1.1">>]}),
fun({ok, _}, Trace) -> fun(_, Trace) ->
?assertEqual( ?assertNotEqual(
[failed], [ok],
?projection( ?projection(
status, status,
?of_kind(emqx_connector_mongo_health_check, Trace))) ?of_kind(emqx_connector_mongo_health_check, Trace)))
@ -128,9 +128,9 @@ t_invalid_ciphers(_Config) ->
<<"verify">> => <<"verify_peer">>, <<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.2">>], <<"versions">> => [<<"tlsv1.2">>],
<<"ciphers">> => [<<"DHE-RSA-AES256-GCM-SHA384">>]}), <<"ciphers">> => [<<"DHE-RSA-AES256-GCM-SHA384">>]}),
fun({ok, _}, Trace) -> fun(_, Trace) ->
?assertEqual( ?assertNotEqual(
[failed], [ok],
?projection( ?projection(
status, status,
?of_kind(emqx_connector_mongo_health_check, Trace))) ?of_kind(emqx_connector_mongo_health_check, Trace)))
@ -142,7 +142,9 @@ t_invalid_ciphers(_Config) ->
create_mongo_auth_with_ssl_opts(SpecificSSLOpts) -> create_mongo_auth_with_ssl_opts(SpecificSSLOpts) ->
AuthConfig = raw_mongo_auth_config(SpecificSSLOpts), AuthConfig = raw_mongo_auth_config(SpecificSSLOpts),
emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}). Res = emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}),
timer:sleep(500),
Res.
raw_mongo_auth_config(SpecificSSLOpts) -> raw_mongo_auth_config(SpecificSSLOpts) ->
SSLOpts = maps:merge( SSLOpts = maps:merge(

View File

@ -28,6 +28,7 @@
-define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>). -define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>).
-define(PATH, [authentication]). -define(PATH, [authentication]).
-define(ResourceID, <<"password-based:mysql">>).
all() -> all() ->
[{group, require_seeds}, t_create, t_create_invalid]. [{group, require_seeds}, t_create, t_create_invalid].
@ -61,7 +62,8 @@ init_per_suite(Config) ->
?MYSQL_RESOURCE, ?MYSQL_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_mysql, emqx_connector_mysql,
mysql_config()), mysql_config(),
#{waiting_connect_complete => 5000}),
Config; Config;
false -> false ->
{skip, no_mysql} {skip, no_mysql}
@ -86,7 +88,8 @@ t_create(_Config) ->
?PATH, ?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}), {create_authenticator, ?GLOBAL, AuthConfig}),
{ok, [#{provider := emqx_authn_mysql}]} = emqx_authentication:list_authenticators(?GLOBAL). {ok, [#{provider := emqx_authn_mysql}]} = emqx_authentication:list_authenticators(?GLOBAL),
emqx_authn_test_lib:delete_config(?ResourceID).
t_create_invalid(_Config) -> t_create_invalid(_Config) ->
AuthConfig = raw_mysql_auth_config(), AuthConfig = raw_mysql_auth_config(),
@ -104,8 +107,8 @@ t_create_invalid(_Config) ->
{ok, _} = emqx:update_config( {ok, _} = emqx:update_config(
?PATH, ?PATH,
{create_authenticator, ?GLOBAL, Config}), {create_authenticator, ?GLOBAL, Config}),
emqx_authn_test_lib:delete_config(?ResourceID),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL) {ok, _} = emqx_authentication:list_authenticators(?GLOBAL)
end, end,
InvalidConfigs). InvalidConfigs).

View File

@ -27,6 +27,7 @@
-define(MYSQL_HOST, "mysql-tls"). -define(MYSQL_HOST, "mysql-tls").
-define(PATH, [authentication]). -define(PATH, [authentication]).
-define(ResourceID, <<"password-based:mysql">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -84,7 +85,7 @@ t_create_invalid(_Config) ->
create_mysql_auth_with_ssl_opts( create_mysql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>, #{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>})), <<"verify">> => <<"verify_peer">>})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible versions %% incompatible versions
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -92,7 +93,7 @@ t_create_invalid(_Config) ->
#{<<"server_name_indication">> => <<"authn-server">>, #{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>, <<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.1">>]})), <<"versions">> => [<<"tlsv1.1">>]})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible ciphers %% incompatible ciphers
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},

View File

@ -27,6 +27,7 @@
-define(PGSQL_HOST, "pgsql"). -define(PGSQL_HOST, "pgsql").
-define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>). -define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>).
-define(ResourceID, <<"password-based:postgresql">>).
-define(PATH, [authentication]). -define(PATH, [authentication]).
@ -63,7 +64,7 @@ init_per_suite(Config) ->
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_pgsql, emqx_connector_pgsql,
pgsql_config(), pgsql_config(),
#{wait_connected => 1000}), #{waiting_connect_complete => 5000}),
Config; Config;
false -> false ->
{skip, no_pgsql} {skip, no_pgsql}
@ -88,7 +89,8 @@ t_create(_Config) ->
?PATH, ?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}), {create_authenticator, ?GLOBAL, AuthConfig}),
{ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL). {ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL),
emqx_authn_test_lib:delete_config(?ResourceID).
t_create_invalid(_Config) -> t_create_invalid(_Config) ->
AuthConfig = raw_pgsql_auth_config(), AuthConfig = raw_pgsql_auth_config(),
@ -103,10 +105,10 @@ t_create_invalid(_Config) ->
lists:foreach( lists:foreach(
fun(Config) -> fun(Config) ->
{error, _} = emqx:update_config( {ok, _} = emqx:update_config(
?PATH, ?PATH,
{create_authenticator, ?GLOBAL, Config}), {create_authenticator, ?GLOBAL, Config}),
emqx_authn_test_lib:delete_config(?ResourceID),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL) {ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end, end,
InvalidConfigs). InvalidConfigs).

View File

@ -27,6 +27,7 @@
-define(PGSQL_HOST, "pgsql-tls"). -define(PGSQL_HOST, "pgsql-tls").
-define(PATH, [authentication]). -define(PATH, [authentication]).
-define(ResourceID, <<"password-based:postgresql">>).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -84,7 +85,7 @@ t_create_invalid(_Config) ->
create_pgsql_auth_with_ssl_opts( create_pgsql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>, #{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>})), <<"verify">> => <<"verify_peer">>})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible versions %% incompatible versions
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},
@ -92,7 +93,7 @@ t_create_invalid(_Config) ->
#{<<"server_name_indication">> => <<"authn-server">>, #{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>, <<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.1">>]})), <<"versions">> => [<<"tlsv1.1">>]})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible ciphers %% incompatible ciphers
?assertMatch( ?assertMatch(
{ok, _}, {ok, _},

View File

@ -28,6 +28,7 @@
-define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>). -define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>).
-define(PATH, [authentication]). -define(PATH, [authentication]).
-define(ResourceID, <<"password-based:redis">>).
all() -> all() ->
[{group, require_seeds}, t_create, t_create_invalid]. [{group, require_seeds}, t_create, t_create_invalid].
@ -61,7 +62,8 @@ init_per_suite(Config) ->
?REDIS_RESOURCE, ?REDIS_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_redis, emqx_connector_redis,
redis_config()), redis_config(),
#{waiting_connect_complete => 5000}),
Config; Config;
false -> false ->
{skip, no_redis} {skip, no_redis}
@ -91,13 +93,8 @@ t_create(_Config) ->
t_create_invalid(_Config) -> t_create_invalid(_Config) ->
AuthConfig = raw_redis_auth_config(), AuthConfig = raw_redis_auth_config(),
InvalidConfigs = InvalidConfigs =
[ [
maps:without([server], AuthConfig),
AuthConfig#{server => <<"unknownhost:3333">>},
AuthConfig#{password => <<"wrongpass">>},
AuthConfig#{database => <<"5678">>},
AuthConfig#{ AuthConfig#{
cmd => <<"MGET password_hash:${username} salt:${username}">>}, cmd => <<"MGET password_hash:${username} salt:${username}">>},
AuthConfig#{ AuthConfig#{
@ -105,16 +102,33 @@ t_create_invalid(_Config) ->
AuthConfig#{ AuthConfig#{
cmd => <<"HMGET mqtt_user:${username} salt is_superuser">>} cmd => <<"HMGET mqtt_user:${username} salt is_superuser">>}
], ],
lists:foreach(
fun(Config) ->
{error, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, Config}),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end,
InvalidConfigs),
InvalidConfigs1 =
[
maps:without([server], AuthConfig),
AuthConfig#{server => <<"unknownhost:3333">>},
AuthConfig#{password => <<"wrongpass">>},
AuthConfig#{database => <<"5678">>}
],
lists:foreach( lists:foreach(
fun(Config) -> fun(Config) ->
{ok, _} = emqx:update_config( {ok, _} = emqx:update_config(
?PATH, ?PATH,
{create_authenticator, ?GLOBAL, Config}), {create_authenticator, ?GLOBAL, Config}),
emqx_authn_test_lib:delete_config(?ResourceID),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL) {ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end, end,
InvalidConfigs). InvalidConfigs1).
t_authenticate(_Config) -> t_authenticate(_Config) ->
ok = lists:foreach( ok = lists:foreach(
@ -270,7 +284,8 @@ user_seeds() ->
}, },
#{data => #{ #{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">> is_superuser => <<"0">>
}, },
@ -303,7 +318,8 @@ user_seeds() ->
result => {ok,#{is_superuser => false}} result => {ok,#{is_superuser => false}}
}, },
#{data => #{ #{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">> is_superuser => <<"0">>
}, },
@ -321,7 +337,8 @@ user_seeds() ->
}, },
#{data => #{ #{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">> is_superuser => <<"0">>
}, },
@ -339,7 +356,8 @@ user_seeds() ->
}, },
#{data => #{ #{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>, password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>, salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">> is_superuser => <<"0">>
}, },

View File

@ -86,7 +86,7 @@ t_create_invalid(_Config) ->
%% incompatible versions %% incompatible versions
?assertMatch( ?assertMatch(
{ok, _}, {error, _},
create_redis_auth_with_ssl_opts( create_redis_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server">>, #{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>, <<"verify">> => <<"verify_peer">>,
@ -94,7 +94,7 @@ t_create_invalid(_Config) ->
%% incompatible ciphers %% incompatible ciphers
?assertMatch( ?assertMatch(
{ok, _}, {error, _},
create_redis_auth_with_ssl_opts( create_redis_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server">>, #{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>, <<"verify">> => <<"verify_peer">>,

View File

@ -55,7 +55,8 @@ init(#{query := SQL0} = Source) ->
ResourceID, ResourceID,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_pgsql, emqx_connector_pgsql,
Source#{named_queries => #{ResourceID => SQL}}) of Source#{named_queries => #{ResourceID => SQL}},
#{waiting_connect_complete => 5000}) of
{ok, _} -> {ok, _} ->
Source#{annotations => Source#{annotations =>
#{id => ResourceID, #{id => ResourceID,

View File

@ -38,7 +38,7 @@ create_resource(Module, Config) ->
case emqx_resource:create_local(ResourceID, case emqx_resource:create_local(ResourceID,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
Module, Config, Module, Config,
#{wait_connected => 1000}) of #{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, ResourceID}; {ok, already_created} -> {ok, ResourceID};
{ok, _} -> {ok, ResourceID}; {ok, _} -> {ok, ResourceID};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}

View File

@ -46,6 +46,7 @@ end_per_suite(_Config) ->
#{<<"no_match">> => <<"allow">>, #{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>}, <<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}), <<"sources">> => []}),
ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]), emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource), meck:unload(emqx_resource),
ok. ok.
@ -222,3 +223,6 @@ t_move_source(_) ->
], emqx_authz:lookup()), ], emqx_authz:lookup()),
ok. ok.
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -44,6 +44,7 @@ end_per_suite(_Config) ->
#{<<"no_match">> => <<"allow">>, #{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>}, <<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}), <<"sources">> => []}),
ok = stop_apps([emqx_resource, emqx_connector]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]), emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok. ok.
@ -131,3 +132,6 @@ auth_header_() ->
Password = <<"public">>, Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}. {"Authorization", "Bearer " ++ binary_to_list(Token)}.
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -44,7 +44,8 @@ init_per_suite(Config) ->
?MYSQL_RESOURCE, ?MYSQL_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_mysql, emqx_connector_mysql,
mysql_config()), mysql_config(),
#{waiting_connect_complete => 5000}),
Config; Config;
false -> false ->
{skip, no_mysql} {skip, no_mysql}
@ -179,9 +180,9 @@ t_create_invalid(_Config) ->
BadConfig = maps:merge( BadConfig = maps:merge(
raw_mysql_authz_config(), raw_mysql_authz_config(),
#{<<"server">> => <<"255.255.255.255:33333">>}), #{<<"server">> => <<"255.255.255.255:33333">>}),
{error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]), {ok, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
[] = emqx_authz:lookup(). [_] = emqx_authz:lookup().
t_nonbinary_values(_Config) -> t_nonbinary_values(_Config) ->
ClientInfo = #{clientid => clientid, ClientInfo = #{clientid => clientid,

View File

@ -44,7 +44,8 @@ init_per_suite(Config) ->
?PGSQL_RESOURCE, ?PGSQL_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_pgsql, emqx_connector_pgsql,
pgsql_config()), pgsql_config(),
#{waiting_connect_complete => 5000}),
Config; Config;
false -> false ->
{skip, no_pgsql} {skip, no_pgsql}
@ -180,9 +181,9 @@ t_create_invalid(_Config) ->
BadConfig = maps:merge( BadConfig = maps:merge(
raw_pgsql_authz_config(), raw_pgsql_authz_config(),
#{<<"server">> => <<"255.255.255.255:33333">>}), #{<<"server">> => <<"255.255.255.255:33333">>}),
{error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]), {ok, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
[] = emqx_authz:lookup(). [_] = emqx_authz:lookup().
t_nonbinary_values(_Config) -> t_nonbinary_values(_Config) ->
ClientInfo = #{clientid => clientid, ClientInfo = #{clientid => clientid,

View File

@ -45,7 +45,8 @@ init_per_suite(Config) ->
?REDIS_RESOURCE, ?REDIS_RESOURCE,
?RESOURCE_GROUP, ?RESOURCE_GROUP,
emqx_connector_redis, emqx_connector_redis,
redis_config()), redis_config(),
#{waiting_connect_complete => 5000}),
Config; Config;
false -> false ->
{skip, no_redis} {skip, no_redis}
@ -151,8 +152,8 @@ t_create_invalid(_Config) ->
lists:foreach( lists:foreach(
fun(Config) -> fun(Config) ->
{error, _} = emqx_authz:update(?CMD_REPLACE, [Config]), {ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
[] = emqx_authz:lookup() [_] = emqx_authz:lookup()
end, end,
InvalidConfigs). InvalidConfigs).

View File

@ -224,8 +224,11 @@ create(BridgeId, Conf) ->
create(Type, Name, Conf) -> create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name, ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}), config => Conf}),
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type), case emqx_resource:create_local(resource_id(Type, Name),
parse_confs(Type, Name, Conf), #{wait_connected => 1000}) of <<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{waiting_connect_complete => 5000}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
@ -270,7 +273,9 @@ recreate(Type, Name) ->
recreate(Type, Name, Conf) -> recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name), emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{wait_connected => 1000}). emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{waiting_connect_complete => 5000}).
create_dry_run(Type, Conf) -> create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}}, Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},

View File

@ -63,26 +63,33 @@ t_lifecycle(_Config) ->
). ).
perform_lifecycle_check(PoolName, InitialConfig) -> perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig), {ok, #{config := CheckedConfig}} =
{ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State,
status := InitialStatus}} = emqx_resource:create_local(
PoolName, PoolName,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?MYSQL_RESOURCE_MOD, ?MYSQL_RESOURCE_MOD,
CheckedConfig, CheckedConfig,
#{wait_connected => 1000} #{waiting_connect_complete => 5000}
), ),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected % % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName,
test_query_with_params_and_timeout())),
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail % Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists. % as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := StoppedStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
@ -90,12 +97,16 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly % Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName, #{wait_connected => 1000})), ?assertEqual(ok, emqx_resource:restart(PoolName)),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), % async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())), ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName,
test_query_with_params_and_timeout())),
% Stop and remove the resource in one go. % Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

View File

@ -63,17 +63,22 @@ t_lifecycle(_Config) ->
). ).
perform_lifecycle_check(PoolName, InitialConfig) -> perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), {ok, #{config := CheckedConfig}} =
{ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State,
status := InitialStatus}}
= emqx_resource:create_local(
PoolName, PoolName,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?PGSQL_RESOURCE_MOD, ?PGSQL_RESOURCE_MOD,
CheckedConfig, CheckedConfig,
#{wait_connected => 1000} #{waiting_connect_complete => 5000}
), ),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected % % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
@ -81,7 +86,9 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail % Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists. % as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := StoppedStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
@ -89,8 +96,11 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly % Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName, #{wait_connected => 1000})), ?assertEqual(ok, emqx_resource:restart(PoolName)),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), % async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())), ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())),

View File

@ -78,24 +78,30 @@ t_sentinel_lifecycle(_Config) ->
). ).
perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig), {ok, #{config := CheckedConfig}} =
{ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State,
status := InitialStatus}} = emqx_resource:create_local(
PoolName, PoolName,
?CONNECTOR_RESOURCE_GROUP, ?CONNECTOR_RESOURCE_GROUP,
?REDIS_RESOURCE_MOD, ?REDIS_RESOURCE_MOD,
CheckedConfig, CheckedConfig,
#{wait_connected => 1000} #{waiting_connect_complete => 5000}
), ),
?assertEqual(InitialStatus, connected), ?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource % Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
% Perform query as further check that the resource is working as expected % Perform query as further check that the resource is working as expected
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail % Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists. % as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := StoppedStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected), ?assertEqual(StoppedStatus, disconnected),
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
@ -103,8 +109,11 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
% Can call stop/1 again on an already stopped instance % Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly % Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName, #{wait_connected => 1000})), ?assertEqual(ok, emqx_resource:restart(PoolName)),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), % async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)), ?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
% Stop and remove the resource in one go. % Stop and remove the resource in one go.

View File

@ -32,7 +32,7 @@
-type create_opts() :: #{ -type create_opts() :: #{
health_check_interval => integer(), health_check_interval => integer(),
health_check_timeout => integer(), health_check_timeout => integer(),
wait_connected => integer() waiting_connect_complete => integer()
}. }.
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} | -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
undefined. undefined.

View File

@ -147,7 +147,11 @@ create(InstId, Group, ResourceType, Config, Opts) ->
create_local(InstId, Group, ResourceType, Config) -> create_local(InstId, Group, ResourceType, Config) ->
create_local(InstId, Group, ResourceType, Config, #{}). create_local(InstId, Group, ResourceType, Config, #{}).
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> -spec create_local(instance_id(),
resource_group(),
resource_type(),
resource_config(),
create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create_local(InstId, Group, ResourceType, Config, Opts) -> create_local(InstId, Group, ResourceType, Config, Opts) ->
call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}). call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}).
@ -228,7 +232,8 @@ health_check(InstId) ->
set_resource_status_connecting(InstId) -> set_resource_status_connecting(InstId) ->
call_instance(InstId, {set_resource_status_connecting, InstId}). call_instance(InstId, {set_resource_status_connecting, InstId}).
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. -spec get_instance(instance_id()) ->
{ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(InstId) -> get_instance(InstId) ->
emqx_resource_instance:lookup(InstId). emqx_resource_instance:lookup(InstId).
@ -273,35 +278,54 @@ call_stop(InstId, Mod, ResourceState) ->
check_config(ResourceType, Conf) -> check_config(ResourceType, Conf) ->
emqx_hocon:check(ResourceType, Conf). emqx_hocon:check(ResourceType, Conf).
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config()) -> -spec check_and_create(instance_id(),
resource_group(),
resource_type(),
raw_resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, term()}. {ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, Group, ResourceType, RawConfig) -> check_and_create(InstId, Group, ResourceType, RawConfig) ->
check_and_create(InstId, Group, ResourceType, RawConfig, #{}). check_and_create(InstId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) -> -spec check_and_create(instance_id(),
resource_group(),
resource_type(),
raw_resource_config(),
create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, term()}. {ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_create(InstId, Group, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end). fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end).
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config()) -> -spec check_and_create_local(instance_id(),
resource_group(),
resource_type(),
raw_resource_config()) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, Group, ResourceType, RawConfig) -> check_and_create_local(InstId, Group, ResourceType, RawConfig) ->
check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}). check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config(), -spec check_and_create_local(instance_id(),
resource_group(),
resource_type(),
raw_resource_config(),
create_opts()) -> {ok, resource_data()} | {error, term()}. create_opts()) -> {ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end). fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end).
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> -spec check_and_recreate(instance_id(),
resource_type(),
raw_resource_config(),
create_opts()) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_recreate(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end). fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end).
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> -spec check_and_recreate_local(instance_id(),
resource_type(),
raw_resource_config(),
create_opts()) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig, check_and_do(ResourceType, RawConfig,

View File

@ -197,7 +197,7 @@ do_create(InstId, Group, ResourceType, Config, Opts) ->
ok -> ok ->
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
[matched, success, failed, exception], [matched]), [matched, success, failed, exception], [matched]),
WaitTime = maps:get(wait_connected, Opts, 0), WaitTime = maps:get(waiting_connect_complete , Opts, 0),
{ok, wait_for_resource_ready(InstId, WaitTime div 100)}; {ok, wait_for_resource_ready(InstId, WaitTime div 100)};
Error -> Error ->
Error Error

View File

@ -360,7 +360,8 @@ create_resource(Context, #{type := DB} = Config) ->
ResourceID, ResourceID,
<<"emqx_retainer">>, <<"emqx_retainer">>,
list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
Config) of Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, already_created} ->
Context#{resource_id => ResourceID}; Context#{resource_id => ResourceID};
{ok, _} -> {ok, _} ->