diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 98ba587e8..e53d57365 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -132,6 +132,7 @@ on_start( [] end, State = parse_prepare_cql(Config), + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; @@ -143,12 +144,17 @@ on_start( {error, Reason} end. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_cassandra_connector", connector => InstId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. -type request() :: % emqx_bridge.erl diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index aefd9112f..6df7fb1ac 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -158,6 +158,7 @@ on_start( templates => Templates, connect_timeout => ConnectTimeout }, + ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID), case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; @@ -274,12 +275,17 @@ connect(Options) -> -spec on_stop(resource_id(), resource_state()) -> term(). -on_stop(InstanceID, #{pool_name := PoolName}) -> +on_stop(InstanceID, _State) -> ?SLOG(info, #{ msg => "stopping clickouse connector", connector => InstanceID }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceID) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. %% ------------------------------------------------------------------- %% on_get_status emqx_resouce callback and related functions diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index 86e816c5d..7ce2c71f2 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -104,6 +104,7 @@ on_start( table => Table, templates => Templates }, + ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -111,12 +112,17 @@ on_start( Error end. -on_stop(InstanceId, #{pool_name := PoolName}) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_dynamo_connector", connector => InstanceId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, State). diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index e49f552e9..d7151b889 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -78,6 +78,7 @@ on_start( State = #{pool_name => InstanceId, server => Server}, case opentsdb_connectivity(Server) of ok -> + ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -89,12 +90,17 @@ on_start( Error end. -on_stop(InstanceId, #{pool_name := PoolName} = _State) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_opents_connector", connector => InstanceId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstanceId, Request, State) -> on_batch_query(InstanceId, [Request], State). diff --git a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl index cbdcbc845..2d26fd175 100644 --- a/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl +++ b/apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl @@ -242,6 +242,7 @@ on_start( %% Already initialized ok end, + ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID), case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of ok -> {ok, State}; @@ -267,6 +268,14 @@ on_stop( msg => "stopping RabbitMQ connector", connector => ResourceID }), + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + stop_clients_and_pool(PoolName); + _ -> + ok + end. + +stop_clients_and_pool(PoolName) -> Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], Clients = [ begin diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 52bd910db..d40cbd69b 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -215,6 +215,7 @@ on_start( sql_templates => parse_sql_template(Config), resource_opts => ResourceOpts }, + ok = emqx_resource:allocate_resource(InstanceId, pool_name, PoolName), case emqx_resource_pool:start(PoolName, ?MODULE, Options) of ok -> {ok, State}; @@ -231,7 +232,12 @@ on_stop(ResourceId, _State) -> msg => "stopping_sqlserver_connector", connector => ResourceId }), - emqx_resource_pool:stop(ResourceId). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. -spec on_query( resource_id(),