Merge pull request #7156 from EMQ-YangM/upm2

fix(emqx_resource): remove async_create option
This commit is contained in:
Xinyu Liu 2022-03-09 17:57:15 +08:00 committed by GitHub
commit e0557551aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 253 additions and 145 deletions

View File

@ -133,7 +133,8 @@ create(#{method := Method,
?RESOURCE_GROUP,
emqx_connector_http,
Config#{base_url => maps:remove(query, URIMap),
pool_type => random}) of
pool_type => random},
#{waiting_connect_complete => 5000}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -112,7 +112,11 @@ create(#{selector := Selector} = Config) ->
NState = State#{
selector_template => SelectorTemplate,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mongo, Config) of
case emqx_resource:create_local(ResourceId,
?RESOURCE_GROUP,
emqx_connector_mongo,
Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->

View File

@ -81,7 +81,11 @@ create(#{password_hash_algorithm := Algorithm,
placeholders => PlaceHolders,
query_timeout => QueryTimeout,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mysql, Config) of
case emqx_resource:create_local(ResourceId,
?RESOURCE_GROUP,
emqx_connector_mysql,
Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -79,7 +79,9 @@ create(#{query := Query0,
State = #{placeholders => PlaceHolders,
password_hash_algorithm => Algorithm,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql,
Config#{named_queries => #{ResourceId => Query}},
#{waiting_connect_complete => 5000}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -91,7 +91,9 @@ create(#{cmd := Cmd,
NState = State#{
cmd => NCmd,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_redis, Config) of
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP,
emqx_connector_redis, Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->

View File

@ -91,9 +91,9 @@ t_create_invalid_server_name(_Config) ->
create_mongo_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>}),
fun({error, _}, Trace) ->
?assertEqual(
[failed],
fun(_, Trace) ->
?assertNotEqual(
[ok],
?projection(
status,
?of_kind(emqx_connector_mongo_health_check, Trace)))
@ -109,9 +109,9 @@ t_create_invalid_version(_Config) ->
#{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.1">>]}),
fun({error, _}, Trace) ->
?assertEqual(
[failed],
fun(_, Trace) ->
?assertNotEqual(
[ok],
?projection(
status,
?of_kind(emqx_connector_mongo_health_check, Trace)))
@ -128,9 +128,9 @@ t_invalid_ciphers(_Config) ->
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.2">>],
<<"ciphers">> => [<<"DHE-RSA-AES256-GCM-SHA384">>]}),
fun({error, _}, Trace) ->
?assertEqual(
[failed],
fun(_, Trace) ->
?assertNotEqual(
[ok],
?projection(
status,
?of_kind(emqx_connector_mongo_health_check, Trace)))
@ -142,7 +142,9 @@ t_invalid_ciphers(_Config) ->
create_mongo_auth_with_ssl_opts(SpecificSSLOpts) ->
AuthConfig = raw_mongo_auth_config(SpecificSSLOpts),
emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}).
Res = emqx:update_config(?PATH, {create_authenticator, ?GLOBAL, AuthConfig}),
timer:sleep(500),
Res.
raw_mongo_auth_config(SpecificSSLOpts) ->
SSLOpts = maps:merge(

View File

@ -28,6 +28,7 @@
-define(MYSQL_RESOURCE, <<"emqx_authn_mysql_SUITE">>).
-define(PATH, [authentication]).
-define(ResourceID, <<"password-based:mysql">>).
all() ->
[{group, require_seeds}, t_create, t_create_invalid].
@ -61,7 +62,8 @@ init_per_suite(Config) ->
?MYSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_mysql,
mysql_config()),
mysql_config(),
#{waiting_connect_complete => 5000}),
Config;
false ->
{skip, no_mysql}
@ -86,7 +88,8 @@ t_create(_Config) ->
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}),
{ok, [#{provider := emqx_authn_mysql}]} = emqx_authentication:list_authenticators(?GLOBAL).
{ok, [#{provider := emqx_authn_mysql}]} = emqx_authentication:list_authenticators(?GLOBAL),
emqx_authn_test_lib:delete_config(?ResourceID).
t_create_invalid(_Config) ->
AuthConfig = raw_mysql_auth_config(),
@ -101,11 +104,11 @@ t_create_invalid(_Config) ->
lists:foreach(
fun(Config) ->
{error, _} = emqx:update_config(
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, Config}),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
emqx_authn_test_lib:delete_config(?ResourceID),
{ok, _} = emqx_authentication:list_authenticators(?GLOBAL)
end,
InvalidConfigs).

View File

@ -27,6 +27,7 @@
-define(MYSQL_HOST, "mysql-tls").
-define(PATH, [authentication]).
-define(ResourceID, <<"password-based:mysql">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -80,22 +81,22 @@ t_create_invalid(_Config) ->
%% invalid server_name
?assertMatch(
{error, _},
{ok, _},
create_mysql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible versions
?assertMatch(
{error, _},
{ok, _},
create_mysql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.1">>]})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible ciphers
?assertMatch(
{error, _},
{ok, _},
create_mysql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>,

View File

@ -27,6 +27,7 @@
-define(PGSQL_HOST, "pgsql").
-define(PGSQL_RESOURCE, <<"emqx_authn_pgsql_SUITE">>).
-define(ResourceID, <<"password-based:postgresql">>).
-define(PATH, [authentication]).
@ -62,7 +63,8 @@ init_per_suite(Config) ->
?PGSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_pgsql,
pgsql_config()),
pgsql_config(),
#{waiting_connect_complete => 5000}),
Config;
false ->
{skip, no_pgsql}
@ -87,7 +89,8 @@ t_create(_Config) ->
?PATH,
{create_authenticator, ?GLOBAL, AuthConfig}),
{ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL).
{ok, [#{provider := emqx_authn_pgsql}]} = emqx_authentication:list_authenticators(?GLOBAL),
emqx_authn_test_lib:delete_config(?ResourceID).
t_create_invalid(_Config) ->
AuthConfig = raw_pgsql_auth_config(),
@ -102,10 +105,10 @@ t_create_invalid(_Config) ->
lists:foreach(
fun(Config) ->
{error, _} = emqx:update_config(
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, Config}),
emqx_authn_test_lib:delete_config(?ResourceID),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end,
InvalidConfigs).

View File

@ -27,6 +27,7 @@
-define(PGSQL_HOST, "pgsql-tls").
-define(PATH, [authentication]).
-define(ResourceID, <<"password-based:postgresql">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -80,22 +81,22 @@ t_create_invalid(_Config) ->
%% invalid server_name
?assertMatch(
{error, _},
{ok, _},
create_pgsql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible versions
?assertMatch(
{error, _},
{ok, _},
create_pgsql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>,
<<"versions">> => [<<"tlsv1.1">>]})),
emqx_authn_test_lib:delete_config(?ResourceID),
%% incompatible ciphers
?assertMatch(
{error, _},
{ok, _},
create_pgsql_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server">>,
<<"verify">> => <<"verify_peer">>,

View File

@ -28,6 +28,7 @@
-define(REDIS_RESOURCE, <<"emqx_authn_redis_SUITE">>).
-define(PATH, [authentication]).
-define(ResourceID, <<"password-based:redis">>).
all() ->
[{group, require_seeds}, t_create, t_create_invalid].
@ -61,7 +62,8 @@ init_per_suite(Config) ->
?REDIS_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_redis,
redis_config()),
redis_config(),
#{waiting_connect_complete => 5000}),
Config;
false ->
{skip, no_redis}
@ -91,13 +93,8 @@ t_create(_Config) ->
t_create_invalid(_Config) ->
AuthConfig = raw_redis_auth_config(),
InvalidConfigs =
[
maps:without([server], AuthConfig),
AuthConfig#{server => <<"unknownhost:3333">>},
AuthConfig#{password => <<"wrongpass">>},
AuthConfig#{database => <<"5678">>},
AuthConfig#{
cmd => <<"MGET password_hash:${username} salt:${username}">>},
AuthConfig#{
@ -105,7 +102,6 @@ t_create_invalid(_Config) ->
AuthConfig#{
cmd => <<"HMGET mqtt_user:${username} salt is_superuser">>}
],
lists:foreach(
fun(Config) ->
{error, _} = emqx:update_config(
@ -114,7 +110,25 @@ t_create_invalid(_Config) ->
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end,
InvalidConfigs).
InvalidConfigs),
InvalidConfigs1 =
[
maps:without([server], AuthConfig),
AuthConfig#{server => <<"unknownhost:3333">>},
AuthConfig#{password => <<"wrongpass">>},
AuthConfig#{database => <<"5678">>}
],
lists:foreach(
fun(Config) ->
{ok, _} = emqx:update_config(
?PATH,
{create_authenticator, ?GLOBAL, Config}),
emqx_authn_test_lib:delete_config(?ResourceID),
{ok, []} = emqx_authentication:list_authenticators(?GLOBAL)
end,
InvalidConfigs1).
t_authenticate(_Config) ->
ok = lists:foreach(
@ -270,7 +284,8 @@ user_seeds() ->
},
#{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">>
},
@ -303,7 +318,8 @@ user_seeds() ->
result => {ok,#{is_superuser => false}}
},
#{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">>
},
@ -321,7 +337,8 @@ user_seeds() ->
},
#{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">>
},
@ -339,7 +356,8 @@ user_seeds() ->
},
#{data => #{
password_hash => <<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
password_hash =>
<<"$2b$12$wtY3h20mUjjmeaClpqZVveDWGlHzCGsvuThMlneGHA7wVeFYyns2u">>,
salt => <<"$2b$12$wtY3h20mUjjmeaClpqZVve">>,
is_superuser => <<"0">>
},

View File

@ -77,7 +77,7 @@ t_create(_Config) ->
t_create_invalid(_Config) ->
%% invalid server_name
?assertMatch(
{error, _},
{ok, _},
create_redis_auth_with_ssl_opts(
#{<<"server_name_indication">> => <<"authn-server-unknown-host">>,
<<"verify">> => <<"verify_peer">>,

View File

@ -55,7 +55,8 @@ init(#{query := SQL0} = Source) ->
ResourceID,
?RESOURCE_GROUP,
emqx_connector_pgsql,
Source#{named_queries => #{ResourceID => SQL}}) of
Source#{named_queries => #{ResourceID => SQL}},
#{waiting_connect_complete => 5000}) of
{ok, _} ->
Source#{annotations =>
#{id => ResourceID,

View File

@ -35,7 +35,10 @@
create_resource(Module, Config) ->
ResourceID = make_resource_id(Module),
case emqx_resource:create_local(ResourceID, ?RESOURCE_GROUP, Module, Config) of
case emqx_resource:create_local(ResourceID,
?RESOURCE_GROUP,
Module, Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} -> {ok, ResourceID};
{ok, _} -> {ok, ResourceID};
{error, Reason} -> {error, Reason}

View File

@ -46,6 +46,7 @@ end_per_suite(_Config) ->
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
ok = stop_apps([emqx_resource]),
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
meck:unload(emqx_resource),
ok.
@ -222,3 +223,6 @@ t_move_source(_) ->
], emqx_authz:lookup()),
ok.
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -44,6 +44,7 @@ end_per_suite(_Config) ->
#{<<"no_match">> => <<"allow">>,
<<"cache">> => #{<<"enable">> => <<"true">>},
<<"sources">> => []}),
ok = stop_apps([emqx_resource, emqx_connector]),
emqx_common_test_helpers:stop_apps([emqx_dashboard, emqx_authz, emqx_conf]),
ok.
@ -131,3 +132,6 @@ auth_header_() ->
Password = <<"public">>,
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -36,6 +36,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok = emqx_authz_test_lib:restore_authorizers(),
ok = stop_apps([emqx_resource, emqx_connector]),
ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
init_per_testcase(_TestCase, Config) ->
@ -128,3 +129,6 @@ setup_config(SpecialParams) ->
emqx_authz_test_lib:setup_config(
raw_file_authz_config(),
SpecialParams).
stop_apps(Apps) ->
lists:foreach(fun application:stop/1, Apps).

View File

@ -44,7 +44,8 @@ init_per_suite(Config) ->
?MYSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_mysql,
mysql_config()),
mysql_config(),
#{waiting_connect_complete => 5000}),
Config;
false ->
{skip, no_mysql}
@ -179,9 +180,9 @@ t_create_invalid(_Config) ->
BadConfig = maps:merge(
raw_mysql_authz_config(),
#{<<"server">> => <<"255.255.255.255:33333">>}),
{error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
{ok, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
[] = emqx_authz:lookup().
[_] = emqx_authz:lookup().
t_nonbinary_values(_Config) ->
ClientInfo = #{clientid => clientid,

View File

@ -44,7 +44,8 @@ init_per_suite(Config) ->
?PGSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_pgsql,
pgsql_config()),
pgsql_config(),
#{waiting_connect_complete => 5000}),
Config;
false ->
{skip, no_pgsql}
@ -180,9 +181,9 @@ t_create_invalid(_Config) ->
BadConfig = maps:merge(
raw_pgsql_authz_config(),
#{<<"server">> => <<"255.255.255.255:33333">>}),
{error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
{ok, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
[] = emqx_authz:lookup().
[_] = emqx_authz:lookup().
t_nonbinary_values(_Config) ->
ClientInfo = #{clientid => clientid,

View File

@ -45,7 +45,8 @@ init_per_suite(Config) ->
?REDIS_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_redis,
redis_config()),
redis_config(),
#{waiting_connect_complete => 5000}),
Config;
false ->
{skip, no_redis}
@ -151,8 +152,8 @@ t_create_invalid(_Config) ->
lists:foreach(
fun(Config) ->
{error, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
[] = emqx_authz:lookup()
{ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
[_] = emqx_authz:lookup()
end,
InvalidConfigs).

View File

@ -224,9 +224,11 @@ create(BridgeId, Conf) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf), #{async_create => true}) of
case emqx_resource:create_local(resource_id(Type, Name),
<<"emqx_bridge">>,
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{waiting_connect_complete => 5000}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);
{error, Reason} -> {error, Reason}
@ -271,8 +273,9 @@ recreate(Type, Name) ->
recreate(Type, Name, Conf) ->
emqx_resource:recreate_local(resource_id(Type, Name),
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf),
#{async_create => true}).
emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf),
#{waiting_connect_complete => 5000}).
create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},

View File

@ -164,7 +164,6 @@ t_http_crud_apis(_) ->
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
%% send an message to emqx and the message should be forwarded to the HTTP server
wait_for_resource_ready(BridgeID, 5),
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert(
@ -214,7 +213,6 @@ t_http_crud_apis(_) ->
}, jsx:decode(Bridge3Str)),
%% send an message to emqx again, check the path has been changed
wait_for_resource_ready(BridgeID, 5),
emqx:publish(emqx_message:make(<<"emqx_http/1">>, Body)),
?assert(
receive
@ -319,14 +317,3 @@ auth_header_() ->
operation_path(Oper, BridgeID) ->
uri(["bridges", BridgeID, "operation", Oper]).
wait_for_resource_ready(InstId, 0) ->
ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]),
ct:fail(wait_resource_timeout);
wait_for_resource_ready(InstId, Retry) ->
case emqx_bridge:lookup(InstId) of
{ok, #{resource_data := #{status := connected}}} -> ok;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry-1)
end.

View File

@ -63,25 +63,33 @@ t_lifecycle(_Config) ->
).
perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local(
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State,
status := InitialStatus}} = emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?MYSQL_RESOURCE_MOD,
CheckedConfig
CheckedConfig,
#{waiting_connect_complete => 5000}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName,
test_query_with_params_and_timeout())),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := StoppedStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
@ -90,11 +98,15 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params_and_timeout())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName,
test_query_with_params_and_timeout())),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),

View File

@ -63,16 +63,22 @@ t_lifecycle(_Config) ->
).
perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local(
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State,
status := InitialStatus}}
= emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?PGSQL_RESOURCE_MOD,
CheckedConfig
CheckedConfig,
#{waiting_connect_complete => 5000}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
@ -80,7 +86,9 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := StoppedStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
@ -89,7 +97,10 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())),

View File

@ -78,23 +78,30 @@ t_sentinel_lifecycle(_Config) ->
).
perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
{ok, #{config := CheckedConfig}} = emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local(
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig),
{ok, #{state := #{poolname := ReturnedPoolName} = State,
status := InitialStatus}} = emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?REDIS_RESOURCE_MOD,
CheckedConfig
CheckedConfig,
#{waiting_connect_complete => 5000}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
% Perform query as further check that the resource is working as expected
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State,
status := StoppedStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
@ -103,7 +110,10 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}}
= emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
% Stop and remove the resource in one go.

View File

@ -30,10 +30,9 @@
}.
-type resource_group() :: binary().
-type create_opts() :: #{
%% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
%% the 'status' of the resource will be 'stopped' in this case.
%% Defaults to 'false'
async_create => boolean()
health_check_interval => integer(),
health_check_timeout => integer(),
waiting_connect_complete => integer()
}.
-type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
undefined.

View File

@ -60,7 +60,7 @@
-export([ restart/1 %% restart the instance.
, restart/2
, health_check/1 %% verify if the resource is working normally
, set_resource_status_disconnected/1 %% set resource status to disconnected
, set_resource_status_connecting/1 %% set resource status to disconnected
, stop/1 %% stop the instance
, query/2 %% query the instance
, query/3 %% query the instance with after_query()
@ -147,7 +147,11 @@ create(InstId, Group, ResourceType, Config, Opts) ->
create_local(InstId, Group, ResourceType, Config) ->
create_local(InstId, Group, ResourceType, Config, #{}).
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
-spec create_local(instance_id(),
resource_group(),
resource_type(),
resource_config(),
create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create_local(InstId, Group, ResourceType, Config, Opts) ->
call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}).
@ -225,10 +229,11 @@ stop(InstId) ->
health_check(InstId) ->
call_instance(InstId, {health_check, InstId}).
set_resource_status_disconnected(InstId) ->
call_instance(InstId, {set_resource_status_disconnected, InstId}).
set_resource_status_connecting(InstId) ->
call_instance(InstId, {set_resource_status_connecting, InstId}).
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
-spec get_instance(instance_id()) ->
{ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(InstId) ->
emqx_resource_instance:lookup(InstId).
@ -273,35 +278,54 @@ call_stop(InstId, Mod, ResourceState) ->
check_config(ResourceType, Conf) ->
emqx_hocon:check(ResourceType, Conf).
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config()) ->
-spec check_and_create(instance_id(),
resource_group(),
resource_type(),
raw_resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, Group, ResourceType, RawConfig) ->
check_and_create(InstId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) ->
-spec check_and_create(instance_id(),
resource_group(),
resource_type(),
raw_resource_config(),
create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, Group, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end).
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config()) ->
-spec check_and_create_local(instance_id(),
resource_group(),
resource_type(),
raw_resource_config()) ->
{ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, Group, ResourceType, RawConfig) ->
check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config(),
-spec check_and_create_local(instance_id(),
resource_group(),
resource_type(),
raw_resource_config(),
create_opts()) -> {ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end).
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
-spec check_and_recreate(instance_id(),
resource_type(),
raw_resource_config(),
create_opts()) ->
{ok, resource_data()} | {error, term()}.
check_and_recreate(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end).
-spec check_and_recreate_local(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
-spec check_and_recreate_local(instance_id(),
resource_type(),
raw_resource_config(),
create_opts()) ->
{ok, resource_data()} | {error, term()}.
check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig,

View File

@ -54,7 +54,7 @@ delete_checker(Name) ->
case supervisor:terminate_child(?SUP, ?ID(Name)) of
ok -> supervisor:delete_child(?SUP, ?ID(Name));
Error -> Error
end.
end.
start_health_check(Name, Sleep, Timeout) ->
Pid = self(),
@ -83,7 +83,7 @@ health_check_timeout_checker(Pid, Name, SleepTime, Timeout) ->
after Timeout ->
emqx_alarm:activate(Name, #{name => Name},
<<Name/binary, " health check timeout">>),
emqx_resource:set_resource_status_disconnected(Name),
emqx_resource:set_resource_status_connecting(Name),
receive
health_check_finish -> timer:sleep(SleepTime)
end

View File

@ -90,9 +90,9 @@ list_all() ->
end.
-spec list_group(resource_group()) -> [instance_id()].
list_group(Group) ->
list_group(Group) ->
List = ets:match(emqx_resource_instance, {'$1', Group, '_'}),
lists:map(fun([A|_]) -> A end, List).
lists:map(fun([A | _]) -> A end, List).
%%------------------------------------------------------------------------------
%% gen_server callbacks
@ -126,8 +126,8 @@ handle_call({stop, InstId}, _From, State) ->
handle_call({health_check, InstId}, _From, State) ->
{reply, do_health_check(InstId), State};
handle_call({set_resource_status_disconnected, InstId}, _From, State) ->
{reply, do_set_resource_status_disconnected(InstId), State};
handle_call({set_resource_status_connecting, InstId}, _From, State) ->
{reply, do_set_resource_status_connecting(InstId), State};
handle_call(Req, _From, State) ->
logger:error("Received unexpected call: ~p", [Req]),
@ -178,6 +178,16 @@ do_recreate(InstId, ResourceType, NewConfig, Opts) ->
{error, not_found}
end.
wait_for_resource_ready(InstId, 0) ->
force_lookup(InstId);
wait_for_resource_ready(InstId, Retry) ->
case force_lookup(InstId) of
#{status := connected} = Data -> Data;
_ ->
timer:sleep(100),
wait_for_resource_ready(InstId, Retry-1)
end.
do_create(InstId, Group, ResourceType, Config, Opts) ->
case lookup(InstId) of
{ok,_, _} ->
@ -187,7 +197,8 @@ do_create(InstId, Group, ResourceType, Config, Opts) ->
ok ->
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
[matched, success, failed, exception], [matched]),
{ok, force_lookup(InstId)};
WaitTime = maps:get(waiting_connect_complete , Opts, 0),
{ok, wait_for_resource_ready(InstId, WaitTime div 100)};
Error ->
Error
end
@ -238,28 +249,17 @@ do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
status => connecting, state => undefined},
%% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
case maps:get(async_create, Opts, false) of
false ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData);
true ->
spawn(fun() ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end),
ok
end.
spawn(fun() ->
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end),
ok.
start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} ->
Data2 = Data#{state => ResourceState},
Data2 = Data#{state => ResourceState, status => connected},
ets:insert(emqx_resource_instance, {InstId, Group, Data2}),
case maps:get(async_create, Opts, false) of
false -> case do_health_check(Group, Data2) of
ok -> create_default_checker(InstId, Opts);
{error, Reason} -> {error, Reason}
end;
true -> create_default_checker(InstId, Opts)
end;
create_default_checker(InstId, Opts);
{error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}}),
{error, Reason}
@ -295,15 +295,15 @@ do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Da
{error, Reason, ResourceState1} ->
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
ets:insert(emqx_resource_instance,
{InstId, Group, Data#{status => disconnected, state => ResourceState1}}),
{InstId, Group, Data#{status => connecting, state => ResourceState1}}),
{error, Reason}
end.
do_set_resource_status_disconnected(InstId) ->
do_set_resource_status_connecting(InstId) ->
case emqx_resource_instance:lookup(InstId) of
{ok, Group, #{id := InstId} = Data} ->
logger:error("health check for ~p failed: timeout", [InstId]),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => disconnected}});
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => connecting}});
Error -> {error, Error}
end.

View File

@ -107,7 +107,7 @@ t_create_remove_local(_) ->
?assert(is_process_alive(Pid)),
emqx_resource:set_resource_status_disconnected(?ID),
emqx_resource:set_resource_status_connecting(?ID),
emqx_resource:recreate_local(
?ID,
@ -153,7 +153,7 @@ t_healthy_timeout(_) ->
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"test_resource">>},
#{async_create => true, health_check_timeout => 200}),
#{health_check_timeout => 200}),
timer:sleep(500),
ok = emqx_resource:remove_local(?ID).
@ -163,14 +163,13 @@ t_healthy(_) ->
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"test_resource">>},
#{async_create => true}),
#{name => <<"test_resource">>}),
timer:sleep(400),
emqx_resource_health_check:create_checker(?ID, 15000, 10000),
#{pid := Pid} = emqx_resource:query(?ID, get_state),
timer:sleep(300),
emqx_resource:set_resource_status_disconnected(?ID),
emqx_resource:set_resource_status_connecting(?ID),
ok = emqx_resource:health_check(?ID),
@ -185,7 +184,7 @@ t_healthy(_) ->
emqx_resource:health_check(?ID)),
?assertMatch(
[#{status := disconnected}],
[#{status := connecting}],
emqx_resource:list_instances_verbose()),
ok = emqx_resource:remove_local(?ID).
@ -222,6 +221,8 @@ t_stop_start(_) ->
ok = emqx_resource:restart(?ID),
timer:sleep(300),
#{pid := Pid1} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)).

View File

@ -360,7 +360,8 @@ create_resource(Context, #{type := DB} = Config) ->
ResourceID,
<<"emqx_retainer">>,
list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
Config) of
Config,
#{waiting_connect_complete => 5000}) of
{ok, already_created} ->
Context#{resource_id => ResourceID};
{ok, _} ->