From 3739230435b8415767a472ee49c5dd0306a94290 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 1 Jun 2023 15:20:44 +0800 Subject: [PATCH 1/3] feat: refactor connectors on_stop function to avoid resources leaking Supplement to https://github.com/emqx/emqx/pull/10895 --- .../src/emqx_bridge_cassandra_connector.erl | 10 ++++++++-- .../src/emqx_bridge_clickhouse_connector.erl | 10 ++++++++-- .../src/emqx_bridge_dynamo_connector.erl | 10 ++++++++-- .../src/emqx_bridge_opents_connector.erl | 10 ++++++++-- .../src/emqx_bridge_rabbitmq_connector.erl | 9 +++++++++ .../src/emqx_bridge_sqlserver_connector.erl | 8 +++++++- 6 files changed, 48 insertions(+), 9 deletions(-) diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 98ba587e8..e53d57365 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -132,6 +132,7 @@ on_start( [] end, State = parse_prepare_cql(Config), + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; @@ -143,12 +144,17 @@ on_start( {error, Reason} end. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_cassandra_connector", connector => InstId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. -type request() :: % emqx_bridge.erl diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index aefd9112f..6df7fb1ac 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -158,6 +158,7 @@ on_start( templates => Templates, connect_timeout => ConnectTimeout }, + ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID), case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; @@ -274,12 +275,17 @@ connect(Options) -> -spec on_stop(resource_id(), resource_state()) -> term(). -on_stop(InstanceID, #{pool_name := PoolName}) -> +on_stop(InstanceID, _State) -> ?SLOG(info, #{ msg => "stopping clickouse connector", connector => InstanceID }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceID) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. %% ------------------------------------------------------------------- %% on_get_status emqx_resouce callback and related functions diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 86e816c5d..7ce2c71f2 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -104,6 +104,7 @@ on_start( table => Table, templates => Templates }, + ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -111,12 +112,17 @@ on_start( Error end. -on_stop(InstanceId, #{pool_name := PoolName}) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_dynamo_connector", connector => InstanceId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, State). diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index e49f552e9..d7151b889 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -78,6 +78,7 @@ on_start( State = #{pool_name => InstanceId, server => Server}, case opentsdb_connectivity(Server) of ok -> + ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -89,12 +90,17 @@ on_start( Error end. -on_stop(InstanceId, #{pool_name := PoolName} = _State) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_opents_connector", connector => InstanceId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstanceId, Request, State) -> on_batch_query(InstanceId, [Request], State). diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index cbdcbc845..2d26fd175 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -242,6 +242,7 @@ on_start( %% Already initialized ok end, + ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID), case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; @@ -267,6 +268,14 @@ on_stop( msg => "stopping RabbitMQ connector", connector => ResourceID }), + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + stop_clients_and_pool(PoolName); + _ -> + ok + end. + +stop_clients_and_pool(PoolName) -> Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], Clients = [ begin diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..d40cbd69b 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -215,6 +215,7 @@ on_start( sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts }, + ok = emqx_resource:allocate_resource(InstanceId, pool_name, PoolName), case emqx_resource_pool:start(PoolName, ?MODULE, Options) of ok -> {ok, State}; @@ -231,7 +232,12 @@ on_stop(ResourceId, _State) -> msg => "stopping_sqlserver_connector", connector => ResourceId }), - emqx_resource_pool:stop(ResourceId). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. -spec on_query( resource_id(), From e717ddafd72b4381583c5abe8076f39fe1974e23 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 1 Jun 2023 15:30:30 +0800 Subject: [PATCH 2/3] chore: update changes --- .../src/emqx_bridge_rabbitmq_connector.erl | 4 ++-- .../src/emqx_bridge_sqlserver_connector.erl | 4 ++-- changes/ce/feat-10895.en.md | 9 +-------- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 2d26fd175..77607f86f 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -262,13 +262,13 @@ on_start( -spec on_stop(resource_id(), resource_state()) -> term(). on_stop( ResourceID, - #{poolname := PoolName} = _State + _State ) -> ?SLOG(info, #{ msg => "stopping RabbitMQ connector", connector => ResourceID }), - case emqx_resource:get_allocated_resources(InstanceId) of + case emqx_resource:get_allocated_resources(ResourceID) of #{pool_name := PoolName} -> stop_clients_and_pool(PoolName); _ -> diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index d40cbd69b..f79c56281 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -215,7 +215,7 @@ on_start( sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts }, - ok = emqx_resource:allocate_resource(InstanceId, pool_name, PoolName), + ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName), case emqx_resource_pool:start(PoolName, ?MODULE, Options) of ok -> {ok, State}; @@ -232,7 +232,7 @@ on_stop(ResourceId, _State) -> msg => "stopping_sqlserver_connector", connector => ResourceId }), - case emqx_resource:get_allocated_resources(InstanceId) of + case emqx_resource:get_allocated_resources(ResourceId) of #{pool_name := PoolName} -> emqx_resource_pool:stop(PoolName); _ -> diff --git a/changes/ce/feat-10895.en.md b/changes/ce/feat-10895.en.md index f990b2e46..b8a5d4e0f 100644 --- a/changes/ce/feat-10895.en.md +++ b/changes/ce/feat-10895.en.md @@ -1,8 +1 @@ -Refactored some bridges to avoid leaking resources during crashes at creation, including: -- TDEngine -- WebHook -- LDAP -- MongoDB -- MySQL -- PostgreSQL -- Redis +Refactored most of the bridges to avoid resource leaks during crashes during creation. From dbc0cdce673a1c607e3d3a5cd6b71b91476e7d3c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 5 Jun 2023 10:53:55 +0800 Subject: [PATCH 3/3] chore: dont allocate resource for simple connectiors --- .../src/emqx_bridge_cassandra_connector.erl | 8 +------- .../src/emqx_bridge_clickhouse_connector.erl | 8 +------- .../src/emqx_bridge_dynamo_connector.erl | 8 +------- .../src/emqx_bridge_opents_connector.erl | 8 +------- .../src/emqx_bridge_rabbitmq_connector.erl | 8 +------- .../src/emqx_bridge_sqlserver_connector.erl | 8 +------- .../src/emqx_bridge_tdengine_connector.erl | 8 +------- apps/emqx_connector/src/emqx_connector_http.erl | 8 +------- apps/emqx_connector/src/emqx_connector_ldap.erl | 8 +------- apps/emqx_connector/src/emqx_connector_mongo.erl | 8 +------- apps/emqx_connector/src/emqx_connector_mysql.erl | 8 +------- apps/emqx_connector/src/emqx_connector_pgsql.erl | 8 +------- changes/{ce => ee}/feat-10895.en.md | 0 13 files changed, 12 insertions(+), 84 deletions(-) rename changes/{ce => ee}/feat-10895.en.md (100%) diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index e53d57365..285825714 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -132,7 +132,6 @@ on_start( [] end, State = parse_prepare_cql(Config), - ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; @@ -149,12 +148,7 @@ on_stop(InstId, _State) -> msg => "stopping_cassandra_connector", connector => InstId }), - case emqx_resource:get_allocated_resources(InstId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstId). -type request() :: % emqx_bridge.erl diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 6df7fb1ac..9f4e5c6b4 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -158,7 +158,6 @@ on_start( templates => Templates, connect_timeout => ConnectTimeout }, - ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID), case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; @@ -280,12 +279,7 @@ on_stop(InstanceID, _State) -> msg => "stopping clickouse connector", connector => InstanceID }), - case emqx_resource:get_allocated_resources(InstanceID) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstanceID). %% ------------------------------------------------------------------- %% on_get_status emqx_resouce callback and related functions diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 7ce2c71f2..0cc3f993b 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -104,7 +104,6 @@ on_start( table => Table, templates => Templates }, - ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -117,12 +116,7 @@ on_stop(InstanceId, _State) -> msg => "stopping_dynamo_connector", connector => InstanceId }), - case emqx_resource:get_allocated_resources(InstanceId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstanceId). on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, State). diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index d7151b889..71184e872 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -78,7 +78,6 @@ on_start( State = #{pool_name => InstanceId, server => Server}, case opentsdb_connectivity(Server) of ok -> - ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -95,12 +94,7 @@ on_stop(InstanceId, _State) -> msg => "stopping_opents_connector", connector => InstanceId }), - case emqx_resource:get_allocated_resources(InstanceId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstanceId). on_query(InstanceId, Request, State) -> on_batch_query(InstanceId, [Request], State). diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index 77607f86f..ae70cddde 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -242,7 +242,6 @@ on_start( %% Already initialized ok end, - ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID), case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; @@ -268,12 +267,7 @@ on_stop( msg => "stopping RabbitMQ connector", connector => ResourceID }), - case emqx_resource:get_allocated_resources(ResourceID) of - #{pool_name := PoolName} -> - stop_clients_and_pool(PoolName); - _ -> - ok - end. + stop_clients_and_pool(ResourceID). stop_clients_and_pool(PoolName) -> Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index f79c56281..52bd910db 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -215,7 +215,6 @@ on_start( sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts }, - ok = emqx_resource:allocate_resource(ResourceId, pool_name, PoolName), case emqx_resource_pool:start(PoolName, ?MODULE, Options) of ok -> {ok, State}; @@ -232,12 +231,7 @@ on_stop(ResourceId, _State) -> msg => "stopping_sqlserver_connector", connector => ResourceId }), - case emqx_resource:get_allocated_resources(ResourceId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(ResourceId). -spec on_query( resource_id(), diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index aed5e5e0a..8fd41443c 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -108,7 +108,6 @@ on_start( Prepares = parse_prepare_sql(Config), State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)}, - ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -121,12 +120,7 @@ on_stop(InstanceId, _State) -> msg => "stopping_tdengine_connector", connector => InstanceId }), - case emqx_resource:get_allocated_resources(InstanceId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstanceId). on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index ef4224592..47e722419 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -219,7 +219,6 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case ehttpc_sup:start_pool(InstId, PoolOpts) of {ok, _} -> {ok, State}; {error, {already_started, _}} -> {ok, State}; @@ -231,12 +230,7 @@ on_stop(InstId, _State) -> msg => "stopping_http_connector", connector => InstId }), - case emqx_resource:get_allocated_resources(InstId) of - #{pool_name := PoolName} -> - ehttpc_sup:stop_pool(PoolName); - _ -> - ok - end. + ehttpc_sup:stop_pool(InstId). on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 1d969e6f1..446bd6e0e 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -97,7 +97,6 @@ on_start( {pool_size, PoolSize}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ], - ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of ok -> {ok, #{pool_name => InstId}}; {error, Reason} -> {error, Reason} @@ -108,12 +107,7 @@ on_stop(InstId, _State) -> msg => "stopping_ldap_connector", connector => InstId }), - case emqx_resource:get_allocated_resources(InstId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstId). on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) -> Request = {Base, Filter, Attributes}, diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 5b63daef3..f827e5abb 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -183,7 +183,6 @@ on_start( {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} ], Collection = maps:get(collection, Config, <<"mqtt">>), - ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Opts) of ok -> {ok, #{ @@ -200,12 +199,7 @@ on_stop(InstId, _State) -> msg => "stopping_mongodb_connector", connector => InstId }), - case emqx_resource:get_allocated_resources(InstId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstId). on_query( InstId, diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index 1263524fc..25db669ce 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -124,7 +124,6 @@ on_start( ] ), State = parse_prepare_sql(Config), - ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId})}; @@ -146,12 +145,7 @@ on_stop(InstId, _State) -> msg => "stopping_mysql_connector", connector => InstId }), - case emqx_resource:get_allocated_resources(InstId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstId). on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index a2c8df3d3..cbe95cb33 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -121,7 +121,6 @@ on_start( {pool_size, PoolSize} ], State = parse_prepare_sql(Config), - ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; @@ -138,12 +137,7 @@ on_stop(InstId, _State) -> msg => "stopping postgresql connector", connector => InstId }), - case emqx_resource:get_allocated_resources(InstId) of - #{pool_name := PoolName} -> - emqx_resource_pool:stop(PoolName); - _ -> - ok - end. + emqx_resource_pool:stop(InstId). on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); diff --git a/changes/ce/feat-10895.en.md b/changes/ee/feat-10895.en.md similarity index 100% rename from changes/ce/feat-10895.en.md rename to changes/ee/feat-10895.en.md