diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index 1fdd7cef7..fe0349b4a 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -35,18 +35,17 @@ callback_mode() -> always_sync. on_start(InstId, Opts) -> - PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolOpts = [ {pool_size, maps:get(pool_size, Opts, ?DEFAULT_POOL_SIZE)}, {connector_opts, Opts} ], - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, PoolOpts) of - ok -> {ok, #{pool_name => PoolName}}; + case emqx_resource_pool:start(InstId, ?MODULE, PoolOpts) of + ok -> {ok, #{pool_name => InstId}}; {error, Reason} -> {error, Reason} end. on_stop(_InstId, #{pool_name := PoolName}) -> - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). on_query(InstId, get_jwks, #{pool_name := PoolName}) -> Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover), @@ -72,18 +71,17 @@ on_query(_InstId, {update, Opts}, #{pool_name := PoolName}) -> ok. on_get_status(_InstId, #{pool_name := PoolName}) -> - Func = - fun(Conn) -> - case emqx_authn_jwks_client:get_jwks(Conn) of - {ok, _} -> true; - _ -> false - end - end, - case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of + case emqx_resource_pool:health_check_workers(PoolName, fun health_check/1) of true -> connected; false -> disconnected end. +health_check(Conn) -> + case emqx_authn_jwks_client:get_jwks(Conn) of + {ok, _} -> true; + _ -> false + end. + connect(Opts) -> ConnectorOpts = proplists:get_value(connector_opts, Opts), emqx_authn_jwks_client:start_link(ConnectorOpts). diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index ef2e11eb7..411df7899 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -231,9 +231,8 @@ on_start( {transport_opts, NTransportOpts}, {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)} ], - PoolName = emqx_plugin_libs_pool:pool_name(InstId), State = #{ - pool_name => PoolName, + pool_name => InstId, pool_type => PoolType, host => Host, port => Port, @@ -241,7 +240,7 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(PoolName, PoolOpts) of + case ehttpc_sup:start_pool(InstId, PoolOpts) of {ok, _} -> {ok, State}; {error, {already_started, _}} -> {ok, State}; {error, Reason} -> {error, Reason} diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index ac2af301e..e2121de22 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -87,20 +87,19 @@ on_start( {pool_size, PoolSize}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ], - PoolName = emqx_plugin_libs_pool:pool_name(InstId), - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts) of - ok -> {ok, #{poolname => PoolName}}; + case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of + ok -> {ok, #{pool_name => InstId}}; {error, Reason} -> {error, Reason} end. -on_stop(InstId, #{poolname := PoolName}) -> +on_stop(InstId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping_ldap_connector", connector => InstId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). -on_query(InstId, {search, Base, Filter, Attributes}, #{poolname := PoolName} = State) -> +on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) -> Request = {Base, Filter, Attributes}, ?TRACE( "QUERY", diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index a5873bcf6..a65a32842 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -182,12 +182,11 @@ on_start( {options, init_topology_options(maps:to_list(Topology), [])}, {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} ], - PoolName = emqx_plugin_libs_pool:pool_name(InstId), Collection = maps:get(collection, Config, <<"mqtt">>), - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of + case emqx_resource_pool:start(InstId, ?MODULE, Opts) of ok -> {ok, #{ - poolname => PoolName, + pool_name => InstId, type => Type, collection => Collection }}; @@ -195,17 +194,17 @@ on_start( {error, Reason} end. -on_stop(InstId, #{poolname := PoolName}) -> +on_stop(InstId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping_mongodb_connector", connector => InstId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). on_query( InstId, {send_message, Document}, - #{poolname := PoolName, collection := Collection} = State + #{pool_name := PoolName, collection := Collection} = State ) -> Request = {insert, Collection, Document}, ?TRACE( @@ -234,7 +233,7 @@ on_query( on_query( InstId, {Action, Collection, Filter, Projector}, - #{poolname := PoolName} = State + #{pool_name := PoolName} = State ) -> Request = {Action, Collection, Filter, Projector}, ?TRACE( @@ -263,8 +262,7 @@ on_query( {ok, Result} end. --dialyzer({nowarn_function, [on_get_status/2]}). -on_get_status(InstId, #{poolname := PoolName} = _State) -> +on_get_status(InstId, #{pool_name := PoolName}) -> case health_check(PoolName) of true -> ?tp(debug, emqx_connector_mongo_health_check, #{ @@ -281,8 +279,10 @@ on_get_status(InstId, #{poolname := PoolName} = _State) -> end. health_check(PoolName) -> - emqx_plugin_libs_pool:health_check_ecpool_workers( - PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) + emqx_resource_pool:health_check_workers( + PoolName, + fun ?MODULE:check_worker_health/1, + ?HEALTH_CHECK_TIMEOUT + timer:seconds(1) ). %% =================================================================== diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 6600a5f77..45d459e70 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -51,7 +51,7 @@ -type sqls() :: #{atom() => binary()}. -type state() :: #{ - poolname := atom(), + pool_name := binary(), prepare_statement := prepares(), params_tokens := params_tokens(), batch_inserts := sqls(), @@ -123,13 +123,10 @@ on_start( {pool_size, PoolSize} ] ), - - PoolName = emqx_plugin_libs_pool:pool_name(InstId), - Prepares = parse_prepare_sql(Config), - State = maps:merge(#{poolname => PoolName}, Prepares), - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of + State = parse_prepare_sql(Config), + case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State)}; + {ok, init_prepare(State#{pool_name => InstId})}; {error, Reason} -> ?tp( mysql_connector_start_failed, @@ -143,12 +140,12 @@ maybe_add_password_opt(undefined, Options) -> maybe_add_password_opt(Password, Options) -> [{password, Password} | Options]. -on_stop(InstId, #{poolname := PoolName}) -> +on_stop(InstId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping_mysql_connector", connector => InstId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); @@ -157,7 +154,7 @@ on_query(InstId, {TypeOrKey, SQLOrKey, Params}, State) -> on_query( InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, - #{poolname := PoolName, prepare_statement := Prepares} = State + #{pool_name := PoolName, prepare_statement := Prepares} = State ) -> MySqlFunction = mysql_function(TypeOrKey), {SQLOrKey2, Data} = proc_sql_params(TypeOrKey, SQLOrKey, Params, State), @@ -216,8 +213,8 @@ mysql_function(prepared_query) -> mysql_function(_) -> mysql_function(prepared_query). -on_get_status(_InstId, #{poolname := Pool} = State) -> - case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of +on_get_status(_InstId, #{pool_name := PoolName} = State) -> + case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of true -> case do_check_prepares(State) of ok -> @@ -238,7 +235,7 @@ do_get_status(Conn) -> do_check_prepares(#{prepare_statement := Prepares}) when is_map(Prepares) -> ok; -do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, Prepares}}) -> +do_check_prepares(State = #{pool_name := PoolName, prepare_statement := {error, Prepares}}) -> %% retry to prepare case prepare_sql(Prepares, PoolName) of ok -> @@ -253,7 +250,7 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P connect(Options) -> mysql:start_link(Options). -init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> +init_prepare(State = #{prepare_statement := Prepares, pool_name := PoolName}) -> case maps:size(Prepares) of 0 -> State; @@ -409,7 +406,7 @@ on_sql_query( SQLOrKey, Params, Timeout, - #{poolname := PoolName} = State + #{pool_name := PoolName} = State ) -> LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 8796e00a5..ddbf9491d 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -56,7 +56,7 @@ -type state() :: #{ - poolname := atom(), + pool_name := binary(), prepare_sql := prepares(), params_tokens := params_tokens(), prepare_statement := epgsql:statement() @@ -120,13 +120,10 @@ on_start( {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize} ], - PoolName = emqx_plugin_libs_pool:pool_name(InstId), - Prepares = parse_prepare_sql(Config), - InitState = #{poolname => PoolName, prepare_statement => #{}}, - State = maps:merge(InitState, Prepares), - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of + State = parse_prepare_sql(Config), + case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State)}; + {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {error, Reason} -> ?tp( pgsql_connector_start_failed, @@ -135,19 +132,19 @@ on_start( {error, Reason} end. -on_stop(InstId, #{poolname := PoolName}) -> +on_stop(InstId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping postgresql connector", connector => InstId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). -on_query(InstId, {TypeOrKey, NameOrSQL}, #{poolname := _PoolName} = State) -> +on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); on_query( InstId, {TypeOrKey, NameOrSQL, Params}, - #{poolname := PoolName} = State + #{pool_name := PoolName} = State ) -> ?SLOG(debug, #{ msg => "postgresql connector received sql query", @@ -174,7 +171,7 @@ pgsql_query_type(_) -> on_batch_query( InstId, BatchReq, - #{poolname := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State + #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State ) -> case BatchReq of [{Key, _} = Request | _] -> @@ -258,8 +255,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> {error, {unrecoverable_error, invalid_request}} end. -on_get_status(_InstId, #{poolname := Pool} = State) -> - case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of +on_get_status(_InstId, #{pool_name := PoolName} = State) -> + case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of true -> case do_check_prepares(State) of ok -> @@ -280,7 +277,7 @@ do_get_status(Conn) -> do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) -> ok; -do_check_prepares(State = #{poolname := PoolName, prepare_sql := {error, Prepares}}) -> +do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) -> %% retry to prepare case prepare_sql(Prepares, PoolName) of {ok, Sts} -> @@ -358,7 +355,7 @@ parse_prepare_sql([], Prepares, Tokens) -> params_tokens => Tokens }. -init_prepare(State = #{prepare_sql := Prepares, poolname := PoolName}) -> +init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) -> case maps:size(Prepares) of 0 -> State; @@ -389,17 +386,17 @@ prepare_sql(Prepares, PoolName) -> end. do_prepare_sql(Prepares, PoolName) -> - do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}). + do_prepare_sql(ecpool:workers(PoolName), Prepares, #{}). -do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> +do_prepare_sql([{_Name, Worker} | T], Prepares, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), case prepare_sql_to_conn(Conn, Prepares) of {ok, Sts} -> - do_prepare_sql(T, Prepares, PoolName, Sts); + do_prepare_sql(T, Prepares, Sts); Error -> Error end; -do_prepare_sql([], _Prepares, _PoolName, LastSts) -> +do_prepare_sql([], _Prepares, LastSts) -> {ok, LastSts}. prepare_sql_to_conn(Conn, Prepares) -> diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 4ef778e6b..e2155eb49 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -153,11 +153,10 @@ on_start( false -> [{ssl, false}] end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], - PoolName = InstId, - State = #{poolname => PoolName, type => Type}, + State = #{pool_name => InstId, type => Type}, case Type of cluster -> - case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of + case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of {ok, _} -> {ok, State}; {ok, _, _} -> @@ -166,22 +165,20 @@ on_start( {error, Reason} end; _ -> - case - emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ [{options, Options}]) - of + case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ [{options, Options}]) of ok -> {ok, State}; {error, Reason} -> {error, Reason} end end. -on_stop(InstId, #{poolname := PoolName, type := Type}) -> +on_stop(InstId, #{pool_name := PoolName, type := Type}) -> ?SLOG(info, #{ msg => "stopping_redis_connector", connector => InstId }), case Type of cluster -> eredis_cluster:stop_pool(PoolName); - _ -> emqx_plugin_libs_pool:stop_pool(PoolName) + _ -> emqx_resource_pool:stop(PoolName) end. on_query(InstId, {cmd, _} = Query, State) -> @@ -189,7 +186,7 @@ on_query(InstId, {cmd, _} = Query, State) -> on_query(InstId, {cmds, _} = Query, State) -> do_query(InstId, Query, State). -do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) -> +do_query(InstId, Query, #{pool_name := PoolName, type := Type} = State) -> ?TRACE( "QUERY", "redis_connector_received", @@ -227,7 +224,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) -> is_unrecoverable_error(_) -> false. -on_get_status(_InstId, #{type := cluster, poolname := PoolName}) -> +on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) -> case eredis_cluster:pool_exists(PoolName) of true -> Health = eredis_cluster:ping_all(PoolName), @@ -235,8 +232,8 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName}) -> false -> disconnected end; -on_get_status(_InstId, #{poolname := Pool}) -> - Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), +on_get_status(_InstId, #{pool_name := PoolName}) -> + Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), status_result(Health). do_get_status(Conn) -> diff --git a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl index 2be30466c..9067c85de 100644 --- a/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl @@ -64,15 +64,15 @@ t_lifecycle(_Config) -> mongo_config() ). -perform_lifecycle_check(PoolName, InitialConfig) -> +perform_lifecycle_check(ResourceId, InitialConfig) -> {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MONGO_RESOURCE_MOD, InitialConfig), {ok, #{ - state := #{poolname := ReturnedPoolName} = State, + state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - PoolName, + ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MONGO_RESOURCE_MOD, CheckedConfig, @@ -84,39 +84,39 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := State, status := InitialStatus }} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())), - ?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())), - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())), + ?assertMatch({ok, undefined}, emqx_resource:query(ResourceId, test_query_find_one())), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := StoppedStatus }} = - emqx_resource:get_instance(PoolName), + emqx_resource:get_instance(ResourceId), ?assertEqual(stopped, StoppedStatus), - ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Can call stop/1 again on an already stopped instance - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName)), + ?assertEqual(ok, emqx_resource:restart(ResourceId)), % async restart, need to wait resource timer:sleep(500), {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, []}, emqx_resource:query(PoolName, test_query_find())), - ?assertMatch({ok, undefined}, emqx_resource:query(PoolName, test_query_find_one())), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), + ?assertMatch({ok, []}, emqx_resource:query(ResourceId, test_query_find())), + ?assertMatch({ok, undefined}, emqx_resource:query(ResourceId, test_query_find_one())), % Stop and remove the resource in one go. - ?assertEqual(ok, emqx_resource:remove_local(PoolName)), - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual(ok, emqx_resource:remove_local(ResourceId)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Should not even be able to get the resource data out of ets now unlike just stopping. - ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + ?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)). % %%------------------------------------------------------------------------------ % %% Helpers diff --git a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl index dc5826766..a0455c92c 100644 --- a/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl @@ -64,14 +64,14 @@ t_lifecycle(_Config) -> mysql_config() ). -perform_lifecycle_check(PoolName, InitialConfig) -> +perform_lifecycle_check(ResourceId, InitialConfig) -> {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?MYSQL_RESOURCE_MOD, InitialConfig), {ok, #{ - state := #{poolname := ReturnedPoolName} = State, + state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - PoolName, + ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?MYSQL_RESOURCE_MOD, CheckedConfig, @@ -83,53 +83,53 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := State, status := InitialStatus }} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), - ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())), + ?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_no_params())), + ?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_with_params())), ?assertMatch( {ok, _, [[1]]}, emqx_resource:query( - PoolName, + ResourceId, test_query_with_params_and_timeout() ) ), - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := StoppedStatus }} = - emqx_resource:get_instance(PoolName), + emqx_resource:get_instance(ResourceId), ?assertEqual(stopped, StoppedStatus), - ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Can call stop/1 again on an already stopped instance - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName)), + ?assertEqual(ok, emqx_resource:restart(ResourceId)), % async restart, need to wait resource timer:sleep(500), {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())), - ?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), + ?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_no_params())), + ?assertMatch({ok, _, [[1]]}, emqx_resource:query(ResourceId, test_query_with_params())), ?assertMatch( {ok, _, [[1]]}, emqx_resource:query( - PoolName, + ResourceId, test_query_with_params_and_timeout() ) ), % Stop and remove the resource in one go. - ?assertEqual(ok, emqx_resource:remove_local(PoolName)), - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual(ok, emqx_resource:remove_local(ResourceId)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Should not even be able to get the resource data out of ets now unlike just stopping. - ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + ?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)). % %%------------------------------------------------------------------------------ % %% Helpers diff --git a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl index 2f77ca38d..a4ac4f932 100644 --- a/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl @@ -64,15 +64,15 @@ t_lifecycle(_Config) -> pgsql_config() ). -perform_lifecycle_check(PoolName, InitialConfig) -> +perform_lifecycle_check(ResourceId, InitialConfig) -> {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?PGSQL_RESOURCE_MOD, InitialConfig), {ok, #{ - state := #{poolname := ReturnedPoolName} = State, + state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - PoolName, + ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?PGSQL_RESOURCE_MOD, CheckedConfig, @@ -84,39 +84,39 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := State, status := InitialStatus }} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), - ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())), - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_no_params())), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_with_params())), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := StoppedStatus }} = - emqx_resource:get_instance(PoolName), + emqx_resource:get_instance(ResourceId), ?assertEqual(stopped, StoppedStatus), - ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Can call stop/1 again on an already stopped instance - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName)), + ?assertEqual(ok, emqx_resource:restart(ResourceId)), % async restart, need to wait resource timer:sleep(500), {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())), - ?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_no_params())), + ?assertMatch({ok, _, [{1}]}, emqx_resource:query(ResourceId, test_query_with_params())), % Stop and remove the resource in one go. - ?assertEqual(ok, emqx_resource:remove_local(PoolName)), - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual(ok, emqx_resource:remove_local(ResourceId)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Should not even be able to get the resource data out of ets now unlike just stopping. - ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + ?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)). % %%------------------------------------------------------------------------------ % %% Helpers diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index 3a134ad35..e6df4f711 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -102,14 +102,14 @@ t_sentinel_lifecycle(_Config) -> [<<"PING">>] ). -perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> +perform_lifecycle_check(ResourceId, InitialConfig, RedisCommand) -> {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig), {ok, #{ - state := #{poolname := ReturnedPoolName} = State, + state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - PoolName, + ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?REDIS_RESOURCE_MOD, CheckedConfig, @@ -121,49 +121,49 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> state := State, status := InitialStatus }} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), % Perform query as further check that the resource is working as expected - ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), + ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(ResourceId, {cmd, RedisCommand})), ?assertEqual( {ok, [{ok, <<"PONG">>}, {ok, <<"PONG">>}]}, - emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]}) + emqx_resource:query(ResourceId, {cmds, [RedisCommand, RedisCommand]}) ), ?assertMatch( {error, {unrecoverable_error, [{ok, <<"PONG">>}, {error, _}]}}, emqx_resource:query( - PoolName, + ResourceId, {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]}, #{timeout => 500} ) ), - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := StoppedStatus }} = - emqx_resource:get_instance(PoolName), + emqx_resource:get_instance(ResourceId), ?assertEqual(stopped, StoppedStatus), - ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Can call stop/1 again on an already stopped instance - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName)), + ?assertEqual(ok, emqx_resource:restart(ResourceId)), % async restart, need to wait resource timer:sleep(500), {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), + ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(ResourceId, {cmd, RedisCommand})), % Stop and remove the resource in one go. - ?assertEqual(ok, emqx_resource:remove_local(PoolName)), - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual(ok, emqx_resource:remove_local(ResourceId)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Should not even be able to get the resource data out of ets now unlike just stopping. - ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + ?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)). % %%------------------------------------------------------------------------------ % %% Helpers diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_resource/src/emqx_resource_pool.erl similarity index 82% rename from apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl rename to apps/emqx_resource/src/emqx_resource_pool.erl index a3048e5dd..913b29c86 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_resource/src/emqx_resource_pool.erl @@ -14,31 +14,27 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_plugin_libs_pool). +-module(emqx_resource_pool). -export([ - start_pool/3, - stop_pool/1, - pool_name/1, - health_check_ecpool_workers/2, - health_check_ecpool_workers/3 + start/3, + stop/1, + health_check_workers/2, + health_check_workers/3 ]). -include_lib("emqx/include/logger.hrl"). -define(HEALTH_CHECK_TIMEOUT, 15000). -pool_name(ID) when is_binary(ID) -> - list_to_atom(binary_to_list(ID)). - -start_pool(Name, Mod, Options) -> +start(Name, Mod, Options) -> case ecpool:start_sup_pool(Name, Mod, Options) of {ok, _} -> ?SLOG(info, #{msg => "start_ecpool_ok", pool_name => Name}), ok; {error, {already_started, _Pid}} -> - stop_pool(Name), - start_pool(Name, Mod, Options); + stop(Name), + start(Name, Mod, Options); {error, Reason} -> NReason = parse_reason(Reason), ?SLOG(error, #{ @@ -49,7 +45,7 @@ start_pool(Name, Mod, Options) -> {error, {start_pool_failed, Name, NReason}} end. -stop_pool(Name) -> +stop(Name) -> case ecpool:stop_sup_pool(Name) of ok -> ?SLOG(info, #{msg => "stop_ecpool_ok", pool_name => Name}); @@ -64,10 +60,10 @@ stop_pool(Name) -> error({stop_pool_failed, Name, Reason}) end. -health_check_ecpool_workers(PoolName, CheckFunc) -> - health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT). +health_check_workers(PoolName, CheckFunc) -> + health_check_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT). -health_check_ecpool_workers(PoolName, CheckFunc, Timeout) -> +health_check_workers(PoolName, CheckFunc, Timeout) -> Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], DoPerWorker = fun(Worker) -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index a785924d4..d2eb9ee73 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -917,7 +917,7 @@ t_invalid_private_key(Config) -> #{<<"private_key">> => InvalidPrivateKeyPEM} } ), - #{?snk_kind := gcp_pubsub_bridge_jwt_worker_failed_to_start}, + #{?snk_kind := "gcp_pubsub_bridge_jwt_worker_failed_to_start"}, 20_000 ), Res @@ -928,7 +928,7 @@ t_invalid_private_key(Config) -> [#{reason := Reason}] when Reason =:= noproc orelse Reason =:= {shutdown, {error, empty_key}}, - ?of_kind(gcp_pubsub_bridge_jwt_worker_failed_to_start, Trace) + ?of_kind("gcp_pubsub_bridge_jwt_worker_failed_to_start", Trace) ), ?assertMatch( [#{error := empty_key}], @@ -956,14 +956,14 @@ t_jwt_worker_start_timeout(Config) -> #{<<"private_key">> => InvalidPrivateKeyPEM} } ), - #{?snk_kind := gcp_pubsub_bridge_jwt_timeout}, + #{?snk_kind := "gcp_pubsub_bridge_jwt_timeout"}, 20_000 ), Res end, fun(Res, Trace) -> ?assertMatch({ok, _}, Res), - ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_timeout, Trace)), + ?assertMatch([_], ?of_kind("gcp_pubsub_bridge_jwt_timeout", Trace)), ok end ), @@ -1329,7 +1329,7 @@ t_failed_to_start_jwt_worker(Config) -> fun(Trace) -> ?assertMatch( [#{reason := {error, restarting}}], - ?of_kind(gcp_pubsub_bridge_jwt_worker_failed_to_start, Trace) + ?of_kind("gcp_pubsub_bridge_jwt_worker_failed_to_start", Trace) ), ok end diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index be07a2bb7..e497e0a47 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -265,7 +265,7 @@ unprepare(Config, Key) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID), + {ok, _, #{state := #{pool_name := PoolName}}} = emqx_resource:get_instance(ResourceID), [ begin {ok, Conn} = ecpool_worker:client(Worker), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl index 86b908038..397532f47 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl @@ -44,7 +44,7 @@ -type state() :: #{ - poolname := atom(), + pool_name := binary(), prepare_cql := prepares(), params_tokens := params_tokens(), %% returned by ecql:prepare/2 @@ -124,14 +124,10 @@ on_start( false -> [] end, - %% use InstaId of binary type as Pool name, which is supported in ecpool. - PoolName = InstId, - Prepares = parse_prepare_cql(Config), - InitState = #{poolname => PoolName, prepare_statement => #{}}, - State = maps:merge(InitState, Prepares), - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options ++ SslOpts) of + State = parse_prepare_cql(Config), + case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> - {ok, init_prepare(State)}; + {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {error, Reason} -> ?tp( cassandra_connector_start_failed, @@ -140,12 +136,12 @@ on_start( {error, Reason} end. -on_stop(InstId, #{poolname := PoolName}) -> +on_stop(InstId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping_cassandra_connector", connector => InstId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). -type request() :: % emqx_bridge.erl @@ -184,7 +180,7 @@ do_single_query( InstId, Request, Async, - #{poolname := PoolName} = State + #{pool_name := PoolName} = State ) -> {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request), ?tp( @@ -232,7 +228,7 @@ do_batch_query( InstId, Requests, Async, - #{poolname := PoolName} = State + #{pool_name := PoolName} = State ) -> CQLs = lists:map( @@ -305,8 +301,8 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) -> Result end. -on_get_status(_InstId, #{poolname := Pool} = State) -> - case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of +on_get_status(_InstId, #{pool_name := PoolName} = State) -> + case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of true -> case do_check_prepares(State) of ok -> @@ -327,7 +323,7 @@ do_get_status(Conn) -> do_check_prepares(#{prepare_cql := Prepares}) when is_map(Prepares) -> ok; -do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepares}}) -> +do_check_prepares(State = #{pool_name := PoolName, prepare_cql := {error, Prepares}}) -> %% retry to prepare case prepare_cql(Prepares, PoolName) of {ok, Sts} -> @@ -397,7 +393,7 @@ parse_prepare_cql([], Prepares, Tokens) -> params_tokens => Tokens }. -init_prepare(State = #{prepare_cql := Prepares, poolname := PoolName}) -> +init_prepare(State = #{prepare_cql := Prepares, pool_name := PoolName}) -> case maps:size(Prepares) of 0 -> State; @@ -429,17 +425,17 @@ prepare_cql(Prepares, PoolName) -> end. do_prepare_cql(Prepares, PoolName) -> - do_prepare_cql(ecpool:workers(PoolName), Prepares, PoolName, #{}). + do_prepare_cql(ecpool:workers(PoolName), Prepares, #{}). -do_prepare_cql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) -> +do_prepare_cql([{_Name, Worker} | T], Prepares, _LastSts) -> {ok, Conn} = ecpool_worker:client(Worker), case prepare_cql_to_conn(Conn, Prepares) of {ok, Sts} -> - do_prepare_cql(T, Prepares, PoolName, Sts); + do_prepare_cql(T, Prepares, Sts); Error -> Error end; -do_prepare_cql([], _Prepares, _PoolName, LastSts) -> +do_prepare_cql([], _Prepares, LastSts) -> {ok, LastSts}. prepare_cql_to_conn(Conn, Prepares) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl index 2872e8cf0..a7afcd6d5 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl @@ -62,7 +62,8 @@ -type state() :: #{ templates := templates(), - poolname := atom() + pool_name := binary(), + connect_timeout := pos_integer() }. -type clickhouse_config() :: map(). @@ -141,7 +142,6 @@ on_start( connector => InstanceID, config => emqx_utils:redact(Config) }), - PoolName = emqx_plugin_libs_pool:pool_name(InstanceID), Options = [ {url, URL}, {user, maps:get(username, Config, "default")}, @@ -149,46 +149,43 @@ on_start( {database, DB}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}, {pool_size, PoolSize}, - {pool, PoolName} + {pool, InstanceID} ], - InitState = #{ - poolname => PoolName, - connect_timeout => ConnectTimeout - }, try Templates = prepare_sql_templates(Config), - State = maps:merge(InitState, #{templates => Templates}), - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of + State = #{ + pool_name => InstanceID, + templates => Templates, + connect_timeout => ConnectTimeout + }, + case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; {error, Reason} -> - log_start_error(Config, Reason, none), + ?tp( + info, + "clickhouse_connector_start_failed", + #{ + error => Reason, + config => emqx_utils:redact(Config) + } + ), {error, Reason} end catch _:CatchReason:Stacktrace -> - log_start_error(Config, CatchReason, Stacktrace), + ?tp( + info, + "clickhouse_connector_start_failed", + #{ + error => CatchReason, + stacktrace => Stacktrace, + config => emqx_utils:redact(Config) + } + ), {error, CatchReason} end. -log_start_error(Config, Reason, Stacktrace) -> - StacktraceMap = - case Stacktrace of - none -> #{}; - _ -> #{stacktrace => Stacktrace} - end, - LogMessage = - #{ - msg => "clickhouse_connector_start_failed", - error_reason => Reason, - config => emqx_utils:redact(Config) - }, - ?SLOG(info, maps:merge(LogMessage, StacktraceMap)), - ?tp( - clickhouse_connector_start_failed, - #{error => Reason} - ). - %% Helper functions to prepare SQL tempaltes prepare_sql_templates(#{ @@ -240,7 +237,7 @@ split_clickhouse_insert_sql(SQL) -> end. % This is a callback for ecpool which is triggered by the call to -% emqx_plugin_libs_pool:start_pool in on_start/2 +% emqx_resource_pool:start in on_start/2 connect(Options) -> URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))), @@ -277,23 +274,20 @@ connect(Options) -> -spec on_stop(resource_id(), resource_state()) -> term(). -on_stop(ResourceID, #{poolname := PoolName}) -> +on_stop(InstanceID, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping clickouse connector", - connector => ResourceID + connector => InstanceID }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). %% ------------------------------------------------------------------- %% on_get_status emqx_resouce callback and related functions %% ------------------------------------------------------------------- on_get_status( - _InstId, - #{ - poolname := PoolName, - connect_timeout := Timeout - } = State + _InstanceID, + #{pool_name := PoolName, connect_timeout := Timeout} = State ) -> case do_get_status(PoolName, Timeout) of ok -> @@ -352,7 +346,7 @@ do_get_status(PoolName, Timeout) -> on_query( ResourceID, {RequestType, DataOrSQL}, - #{poolname := PoolName} = State + #{pool_name := PoolName} = State ) -> ?SLOG(debug, #{ msg => "clickhouse connector received sql query", @@ -391,16 +385,11 @@ query_type(send_message) -> on_batch_query( ResourceID, BatchReq, - State + #{pool_name := PoolName, templates := Templates} = _State ) -> %% Currently we only support batch requests with the send_message key {Keys, ObjectsToInsert} = lists:unzip(BatchReq), ensure_keys_are_of_type_send_message(Keys), - %% Pick out the SQL template - #{ - templates := Templates, - poolname := PoolName - } = State, %% Create batch insert SQL statement SQL = objects_to_sql(ObjectsToInsert, Templates), %% Do the actual query in the database diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index 85daefbb7..1d273cdd7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -114,23 +114,23 @@ on_start( Templates = parse_template(Config), State = #{ - poolname => InstanceId, + pool_name => InstanceId, database => Database, templates => Templates }, - case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; Error -> Error end. -on_stop(InstanceId, #{poolname := PoolName} = _State) -> +on_stop(InstanceId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping_dynamo_connector", connector => InstanceId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, handover, State). @@ -160,8 +160,8 @@ on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) on_batch_query_async(_InstanceId, Query, _Reply, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. -on_get_status(_InstanceId, #{poolname := Pool}) -> - Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), +on_get_status(_InstanceId, #{pool_name := PoolName}) -> + Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), status_result(Health). do_get_status(_Conn) -> @@ -183,7 +183,7 @@ do_query( InstanceId, Query, ApplyMode, - #{poolname := PoolName, templates := Templates, database := Database} = State + #{pool_name := PoolName, templates := Templates, database := Database} = State ) -> ?TRACE( "QUERY", diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 2295f63ab..7b068ec8f 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -26,7 +26,6 @@ ]). -export([reply_delegator/3]). --type bridge_id() :: binary(). -type jwt_worker() :: binary(). -type service_account_json() :: emqx_ee_bridge_gcp_pubsub:service_account_json(). -type config() :: #{ @@ -43,7 +42,7 @@ jwt_worker_id := jwt_worker(), max_retries := non_neg_integer(), payload_template := emqx_plugin_libs_rule:tmpl_token(), - pool_name := atom(), + pool_name := binary(), project_id := binary(), pubsub_topic := binary(), request_timeout := timer:time() @@ -102,14 +101,13 @@ on_start( jwt_worker_id := JWTWorkerId, project_id := ProjectId } = ensure_jwt_worker(InstanceId, Config), - PoolName = emqx_plugin_libs_pool:pool_name(InstanceId), State = #{ connect_timeout => ConnectTimeout, instance_id => InstanceId, jwt_worker_id => JWTWorkerId, max_retries => MaxRetries, payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), - pool_name => PoolName, + pool_name => InstanceId, project_id => ProjectId, pubsub_topic => PubSubTopic, request_timeout => RequestTimeout @@ -118,20 +116,20 @@ on_start( gcp_pubsub_on_start_before_starting_pool, #{ instance_id => InstanceId, - pool_name => PoolName, + pool_name => InstanceId, pool_opts => PoolOpts } ), - ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => PoolName}), - case ehttpc_sup:start_pool(PoolName, PoolOpts) of + ?tp(gcp_pubsub_starting_ehttpc_pool, #{pool_name => InstanceId}), + case ehttpc_sup:start_pool(InstanceId, PoolOpts) of {ok, _} -> {ok, State}; {error, {already_started, _}} -> - ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => PoolName}), + ?tp(gcp_pubsub_ehttpc_pool_already_started, #{pool_name => InstanceId}), {ok, State}; {error, Reason} -> ?tp(gcp_pubsub_ehttpc_pool_start_failure, #{ - pool_name => PoolName, + pool_name => InstanceId, reason => Reason }), {error, Reason} @@ -140,10 +138,7 @@ on_start( -spec on_stop(manager_id(), state()) -> ok | {error, term()}. on_stop( InstanceId, - _State = #{ - jwt_worker_id := JWTWorkerId, - pool_name := PoolName - } + _State = #{jwt_worker_id := JWTWorkerId, pool_name := PoolName} ) -> ?tp(gcp_pubsub_stop, #{instance_id => InstanceId, jwt_worker_id => JWTWorkerId}), ?SLOG(info, #{ @@ -155,7 +150,7 @@ on_stop( ehttpc_sup:stop_pool(PoolName). -spec on_query( - bridge_id(), + resource_id(), {send_message, map()}, state() ) -> @@ -163,32 +158,32 @@ on_stop( | {ok, status_code(), headers(), body()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(BridgeId, {send_message, Selected}, State) -> +on_query(ResourceId, {send_message, Selected}, State) -> Requests = [{send_message, Selected}], ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", - #{requests => Requests, connector => BridgeId, state => State} + #{requests => Requests, connector => ResourceId, state => State} ), - do_send_requests_sync(State, Requests, BridgeId). + do_send_requests_sync(State, Requests, ResourceId). -spec on_query_async( - bridge_id(), + resource_id(), {send_message, map()}, {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. -on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) -> +on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) -> Requests = [{send_message, Selected}], ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", - #{requests => Requests, connector => BridgeId, state => State} + #{requests => Requests, connector => ResourceId, state => State} ), - do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId). + do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). -spec on_batch_query( - bridge_id(), + resource_id(), [{send_message, map()}], state() ) -> @@ -196,34 +191,30 @@ on_query_async(BridgeId, {send_message, Selected}, ReplyFunAndArgs, State) -> | {ok, status_code(), headers(), body()} | {error, {recoverable_error, term()}} | {error, term()}. -on_batch_query(BridgeId, Requests, State) -> +on_batch_query(ResourceId, Requests, State) -> ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", - #{requests => Requests, connector => BridgeId, state => State} + #{requests => Requests, connector => ResourceId, state => State} ), - do_send_requests_sync(State, Requests, BridgeId). + do_send_requests_sync(State, Requests, ResourceId). -spec on_batch_query_async( - bridge_id(), + resource_id(), [{send_message, map()}], {ReplyFun :: function(), Args :: list()}, state() ) -> {ok, pid()}. -on_batch_query_async(BridgeId, Requests, ReplyFunAndArgs, State) -> +on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", - #{requests => Requests, connector => BridgeId, state => State} + #{requests => Requests, connector => ResourceId, state => State} ), - do_send_requests_async(State, Requests, ReplyFunAndArgs, BridgeId). + do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId). -spec on_get_status(manager_id(), state()) -> connected | disconnected. -on_get_status(InstanceId, State) -> - #{ - connect_timeout := Timeout, - pool_name := PoolName - } = State, +on_get_status(InstanceId, #{connect_timeout := Timeout, pool_name := PoolName} = State) -> case do_get_status(InstanceId, PoolName, Timeout) of true -> connected; @@ -245,8 +236,7 @@ on_get_status(InstanceId, State) -> project_id := binary() }. ensure_jwt_worker(InstanceId, #{ - service_account_json := ServiceAccountJSON, - pubsub_topic := PubSubTopic + service_account_json := ServiceAccountJSON }) -> #{ project_id := ProjectId, @@ -276,14 +266,8 @@ ensure_jwt_worker(InstanceId, #{ {ok, Worker0} -> Worker0; Error -> - ?tp( - gcp_pubsub_bridge_jwt_worker_failed_to_start, - #{instance_id => InstanceId, reason => Error} - ), - ?SLOG(error, #{ - msg => "failed_to_start_gcp_pubsub_jwt_worker", - instance_id => InstanceId, - pubsub_topic => PubSubTopic, + ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ + connector => InstanceId, reason => Error }), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), @@ -301,26 +285,14 @@ ensure_jwt_worker(InstanceId, #{ demonitor(MRef, [flush]), ok; {'DOWN', MRef, process, Worker, Reason} -> - ?tp( - gcp_pubsub_bridge_jwt_worker_failed_to_start, - #{ - resource_id => InstanceId, - reason => Reason - } - ), - ?SLOG(error, #{ - msg => "gcp_pubsub_bridge_jwt_worker_failed_to_start", + ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ connector => InstanceId, reason => Reason }), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), throw(failed_to_start_jwt_worker) after 10_000 -> - ?tp(gcp_pubsub_bridge_jwt_timeout, #{resource_id => InstanceId}), - ?SLOG(warning, #{ - msg => "gcp_pubsub_bridge_jwt_timeout", - connector => InstanceId - }), + ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => InstanceId}), demonitor(MRef, [flush]), _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), throw(timeout_creating_jwt) @@ -569,7 +541,7 @@ reply_delegator(_ResourceId, ReplyFunAndArgs, Result) -> emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) end. --spec do_get_status(manager_id(), atom(), timer:time()) -> boolean(). +-spec do_get_status(manager_id(), binary(), timer:time()) -> boolean(). do_get_status(InstanceId, PoolName, Timeout) -> Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)], DoPerWorker = diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl index f11441a3b..70bd76d14 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sqlserver.erl @@ -126,7 +126,7 @@ %% -type size() :: integer(). -type state() :: #{ - poolname := binary(), + pool_name := binary(), resource_opts := map(), sql_templates := map() }. @@ -208,17 +208,16 @@ on_start( {password, Password}, {driver, Driver}, {database, Database}, - {pool_size, PoolSize}, - {poolname, PoolName} + {pool_size, PoolSize} ], State = #{ %% also InstanceId - poolname => PoolName, + pool_name => PoolName, sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts }, - case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of + case emqx_resource_pool:start(PoolName, ?MODULE, Options) of ok -> {ok, State}; {error, Reason} -> @@ -229,12 +228,12 @@ on_start( {error, Reason} end. -on_stop(InstanceId, #{poolname := PoolName} = _State) -> +on_stop(InstanceId, #{pool_name := PoolName} = _State) -> ?SLOG(info, #{ msg => "stopping_sqlserver_connector", connector => InstanceId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). -spec on_query( manager_id(), @@ -265,7 +264,6 @@ on_query_async( InstanceId, {?ACTION_SEND_MESSAGE, _Msg} = Query, ReplyFunAndArgs, - %% #{poolname := PoolName, sql_templates := Templates} = State State ) -> ?TRACE( @@ -306,10 +304,12 @@ on_batch_query_async(InstanceId, Requests, ReplyFunAndArgs, State) -> ), do_query(InstanceId, Requests, ?ASYNC_QUERY_MODE(ReplyFunAndArgs), State). -on_get_status(_InstanceId, #{poolname := Pool, resource_opts := ResourceOpts} = _State) -> +on_get_status(_InstanceId, #{pool_name := PoolName, resource_opts := ResourceOpts} = _State) -> RequestTimeout = ?REQUEST_TIMEOUT(ResourceOpts), - Health = emqx_plugin_libs_pool:health_check_ecpool_workers( - Pool, {?MODULE, do_get_status, [RequestTimeout]}, RequestTimeout + Health = emqx_resource_pool:health_check_workers( + PoolName, + {?MODULE, do_get_status, [RequestTimeout]}, + RequestTimeout ), status_result(Health). @@ -382,7 +382,7 @@ do_query( InstanceId, Query, ApplyMode, - #{poolname := PoolName, sql_templates := Templates} = State + #{pool_name := PoolName, sql_templates := Templates} = State ) -> ?TRACE( "SINGLE_QUERY_SYNC", @@ -425,7 +425,7 @@ do_query( end. worker_do_insert( - Conn, SQL, #{resource_opts := ResourceOpts, poolname := InstanceId} = State + Conn, SQL, #{resource_opts := ResourceOpts, pool_name := InstanceId} = State ) -> LogMeta = #{connector => InstanceId, state => State}, try diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl index 9b6718882..f9ca21ad7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl @@ -107,20 +107,20 @@ on_start( ], Prepares = parse_prepare_sql(Config), - State = maps:merge(Prepares, #{poolname => InstanceId, query_opts => query_opts(Config)}), - case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)}, + case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; Error -> Error end. -on_stop(InstanceId, #{poolname := PoolName} = _State) -> +on_stop(InstanceId, #{pool_name := PoolName}) -> ?SLOG(info, #{ msg => "stopping_tdengine_connector", connector => InstanceId }), - emqx_plugin_libs_pool:stop_pool(PoolName). + emqx_resource_pool:stop(PoolName). on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); @@ -150,8 +150,8 @@ on_batch_query( {error, {unrecoverable_error, invalid_request}} end. -on_get_status(_InstanceId, #{poolname := Pool}) -> - Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), +on_get_status(_InstanceId, #{pool_name := PoolName}) -> + Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1), status_result(Health). do_get_status(Conn) -> @@ -171,7 +171,7 @@ do_batch_insert(InstanceId, BatchReqs, InsertPart, Tokens, State) -> SQL = emqx_plugin_libs_rule:proc_batch_sql(BatchReqs, InsertPart, Tokens), do_query(InstanceId, SQL, State). -do_query(InstanceId, Query, #{poolname := PoolName, query_opts := Opts} = State) -> +do_query(InstanceId, Query, #{pool_name := PoolName, query_opts := Opts} = State) -> ?TRACE( "QUERY", "tdengine_connector_received", diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl index 52ed03a62..f2647d756 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_cassa_SUITE.erl @@ -101,15 +101,15 @@ show(Label, What) -> erlang:display({Label, What}), What. -perform_lifecycle_check(PoolName, InitialConfig) -> +perform_lifecycle_check(ResourceId, InitialConfig) -> {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?CASSANDRA_RESOURCE_MOD, InitialConfig), {ok, #{ - state := #{poolname := ReturnedPoolName} = State, + state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - PoolName, + ResourceId, ?CONNECTOR_RESOURCE_GROUP, ?CASSANDRA_RESOURCE_MOD, CheckedConfig, @@ -121,45 +121,45 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := State, status := InitialStatus }} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), % % Perform query as further check that the resource is working as expected (fun() -> - erlang:display({pool_name, PoolName}), - QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()), + erlang:display({pool_name, ResourceId}), + QueryNoParamsResWrapper = emqx_resource:query(ResourceId, test_query_no_params()), ?assertMatch({ok, _}, QueryNoParamsResWrapper) end)(), - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := StoppedStatus }} = - emqx_resource:get_instance(PoolName), + emqx_resource:get_instance(ResourceId), ?assertEqual(stopped, StoppedStatus), - ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceId)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Can call stop/1 again on an already stopped instance - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceId)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName)), + ?assertEqual(ok, emqx_resource:restart(ResourceId)), % async restart, need to wait resource timer:sleep(500), {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceId), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceId)), (fun() -> QueryNoParamsResWrapper = - emqx_resource:query(PoolName, test_query_no_params()), + emqx_resource:query(ResourceId, test_query_no_params()), ?assertMatch({ok, _}, QueryNoParamsResWrapper) end)(), % Stop and remove the resource in one go. - ?assertEqual(ok, emqx_resource:remove_local(PoolName)), - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual(ok, emqx_resource:remove_local(ResourceId)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Should not even be able to get the resource data out of ets now unlike just stopping. - ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + ?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceId)). %%-------------------------------------------------------------------- %% utils diff --git a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_clickhouse_SUITE.erl b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_clickhouse_SUITE.erl index 73018e14f..e704a2c0c 100644 --- a/lib-ee/emqx_ee_connector/test/emqx_ee_connector_clickhouse_SUITE.erl +++ b/lib-ee/emqx_ee_connector/test/emqx_ee_connector_clickhouse_SUITE.erl @@ -95,15 +95,15 @@ show(Label, What) -> erlang:display({Label, What}), What. -perform_lifecycle_check(PoolName, InitialConfig) -> +perform_lifecycle_check(ResourceID, InitialConfig) -> {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig), {ok, #{ - state := #{poolname := ReturnedPoolName} = State, + state := #{pool_name := PoolName} = State, status := InitialStatus }} = emqx_resource:create_local( - PoolName, + ResourceID, ?CONNECTOR_RESOURCE_GROUP, ?CLICKHOUSE_RESOURCE_MOD, CheckedConfig, @@ -115,49 +115,49 @@ perform_lifecycle_check(PoolName, InitialConfig) -> state := State, status := InitialStatus }} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceID), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)), % % Perform query as further check that the resource is working as expected (fun() -> - erlang:display({pool_name, PoolName}), - QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()), + erlang:display({pool_name, ResourceID}), + QueryNoParamsResWrapper = emqx_resource:query(ResourceID, test_query_no_params()), ?assertMatch({ok, _}, QueryNoParamsResWrapper), {_, QueryNoParamsRes} = QueryNoParamsResWrapper, ?assertMatch(<<"1">>, string:trim(QueryNoParamsRes)) end)(), - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceID)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. {ok, ?CONNECTOR_RESOURCE_GROUP, #{ state := State, status := StoppedStatus }} = - emqx_resource:get_instance(PoolName), + emqx_resource:get_instance(ResourceID), ?assertEqual(stopped, StoppedStatus), - ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)), + ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(ResourceID)), % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Can call stop/1 again on an already stopped instance - ?assertEqual(ok, emqx_resource:stop(PoolName)), + ?assertEqual(ok, emqx_resource:stop(ResourceID)), % Make sure it can be restarted and the healthchecks and queries work properly - ?assertEqual(ok, emqx_resource:restart(PoolName)), + ?assertEqual(ok, emqx_resource:restart(ResourceID)), % async restart, need to wait resource timer:sleep(500), {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = - emqx_resource:get_instance(PoolName), - ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + emqx_resource:get_instance(ResourceID), + ?assertEqual({ok, connected}, emqx_resource:health_check(ResourceID)), (fun() -> QueryNoParamsResWrapper = - emqx_resource:query(PoolName, test_query_no_params()), + emqx_resource:query(ResourceID, test_query_no_params()), ?assertMatch({ok, _}, QueryNoParamsResWrapper), {_, QueryNoParamsRes} = QueryNoParamsResWrapper, ?assertMatch(<<"1">>, string:trim(QueryNoParamsRes)) end)(), % Stop and remove the resource in one go. - ?assertEqual(ok, emqx_resource:remove_local(PoolName)), - ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + ?assertEqual(ok, emqx_resource:remove_local(ResourceID)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(PoolName)), % Should not even be able to get the resource data out of ets now unlike just stopping. - ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + ?assertEqual({error, not_found}, emqx_resource:get_instance(ResourceID)). % %%------------------------------------------------------------------------------ % %% Helpers