diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index cebf60cb1..a3e4f1eb3 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_impl diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index f19fc0839..5d214850d 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -23,35 +23,35 @@ scheme := http | https, host := iolist(), port := inet:port_number(), - path := '_' + path := _ }, connect_timeout := pos_integer(), pool_type := random | hash, pool_size := pos_integer(), - request := undefined | map(), - is_aligned := boolean(), - iotdb_version := binary(), - device_id := binary() | undefined, - atom() => '_' + request => undefined | map(), + is_aligned => boolean(), + iotdb_version => binary(), + device_id => binary() | undefined, + atom() => _ }. -type state() :: #{ - base_path := '_', + base_path := _, base_url := #{ scheme := http | https, host := iolist(), port := inet:port_number(), - path := '_' + path := _ }, connect_timeout := pos_integer(), pool_type := random | hash, pool_size := pos_integer(), - request := undefined | map(), - is_aligned := boolean(), - iotdb_version := binary(), - device_id := binary() | undefined, - atom() => '_' + request => undefined | map(), + is_aligned => boolean(), + iotdb_version => binary(), + device_id => binary() | undefined, + atom() => _ }. -type manager_id() :: binary(). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 60a512011..86787b33b 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -83,6 +83,7 @@ start_ingress(ResourceId, Ingress, ClientOpts) -> {ingress, Ingress}, {client_opts, ClientOpts} ], + ok = emqx_resource:allocate_resource(ResourceId, ingress_pool_name, PoolName), case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of ok -> {ok, #{ingress_pool_name => PoolName}}; @@ -132,6 +133,7 @@ start_egress(ResourceId, Egress, ClientOpts) -> {pool_size, PoolSize}, {client_opts, ClientOpts} ], + ok = emqx_resource:allocate_resource(ResourceId, egress_pool_name, PoolName), case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of ok -> {ok, #{ @@ -142,13 +144,14 @@ start_egress(ResourceId, Egress, ClientOpts) -> {error, Reason} end. -on_stop(ResourceId, State) -> +on_stop(ResourceId, _State) -> ?SLOG(info, #{ msg => "stopping_mqtt_connector", connector => ResourceId }), - ok = stop_ingress(State), - ok = stop_egress(State). + Allocated = emqx_resource:get_allocated_resources(ResourceId), + ok = stop_ingress(Allocated), + ok = stop_egress(Allocated). stop_ingress(#{ingress_pool_name := PoolName}) -> emqx_resource_pool:stop(PoolName); diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src index 321a2b724..97d8ff2e5 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_tdengine, [ {description, "EMQX Enterprise TDEngine Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, tdengine]}, {env, []}, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 5644d0446..25fa3a84d 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -108,6 +108,7 @@ on_start( Prepares = parse_prepare_sql(Config), State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)}, + ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId), case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> {ok, State}; @@ -115,12 +116,17 @@ on_start( Error end. -on_stop(InstanceId, #{pool_name := PoolName}) -> +on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_tdengine_connector", connector => InstanceId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstanceId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstanceId, {query, SQL}, State) -> do_query(InstanceId, SQL, State); diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7c95febe8..ef4224592 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -219,18 +219,24 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case ehttpc_sup:start_pool(InstId, PoolOpts) of {ok, _} -> {ok, State}; {error, {already_started, _}} -> {ok, State}; {error, Reason} -> {error, Reason} end. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_http_connector", connector => InstId }), - ehttpc_sup:stop_pool(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + ehttpc_sup:stop_pool(PoolName); + _ -> + ok + end. on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index c3e1db7d3..84048901f 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -97,17 +97,23 @@ on_start( {pool_size, PoolSize}, {auto_reconnect, ?AUTO_RECONNECT_INTERVAL} ], + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of ok -> {ok, #{pool_name => InstId}}; {error, Reason} -> {error, Reason} end. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_ldap_connector", connector => InstId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) -> Request = {Base, Filter, Attributes}, diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index dde8652f0..3657bb74c 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -183,6 +183,7 @@ on_start( {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} ], Collection = maps:get(collection, Config, <<"mqtt">>), + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Opts) of ok -> {ok, #{ @@ -194,12 +195,17 @@ on_start( {error, Reason} end. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_mongodb_connector", connector => InstId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query( InstId, diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index b8c1250fe..1263524fc 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -124,6 +124,7 @@ on_start( ] ), State = parse_prepare_sql(Config), + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId})}; @@ -140,12 +141,17 @@ maybe_add_password_opt(undefined, Options) -> maybe_add_password_opt(Password, Options) -> [{password, Password} | Options]. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_mysql_connector", connector => InstId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 3b2375d04..ba80d0c1d 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -121,6 +121,7 @@ on_start( {pool_size, PoolSize} ], State = parse_prepare_sql(Config), + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of ok -> {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; @@ -132,12 +133,17 @@ on_start( {error, Reason} end. -on_stop(InstId, #{pool_name := PoolName}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping postgresql connector", connector => InstId }), - emqx_resource_pool:stop(PoolName). + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok + end. on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 32ac77226..e7b474326 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -160,6 +160,8 @@ on_start( [{ssl, false}] end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], State = #{pool_name => InstId, type => Type}, + ok = emqx_resource:allocate_resource(InstId, type, Type), + ok = emqx_resource:allocate_resource(InstId, pool_name, InstId), case Type of cluster -> case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of @@ -177,14 +179,18 @@ on_start( end end. -on_stop(InstId, #{pool_name := PoolName, type := Type}) -> +on_stop(InstId, _State) -> ?SLOG(info, #{ msg => "stopping_redis_connector", connector => InstId }), - case Type of - cluster -> eredis_cluster:stop_pool(PoolName); - _ -> emqx_resource_pool:stop(PoolName) + case emqx_resource:get_allocated_resources(InstId) of + #{pool_name := PoolName, type := cluster} -> + eredis_cluster:stop_pool(PoolName); + #{pool_name := PoolName, type := _} -> + emqx_resource_pool:stop(PoolName); + _ -> + ok end. on_query(InstId, {cmd, _} = Query, State) -> diff --git a/apps/emqx_connector/test/emqx_connector_http_tests.erl b/apps/emqx_connector/test/emqx_connector_http_tests.erl index 8d0fa6d2c..2dc2119f7 100644 --- a/apps/emqx_connector/test/emqx_connector_http_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_http_tests.erl @@ -25,7 +25,8 @@ wrap_auth_headers_test_() -> meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), meck:expect(ehttpc_pool, pick_worker, 1, self()), - [ehttpc_sup, ehttpc, ehttpc_pool] + meck:expect(emqx_resource, allocate_resource, 3, ok), + [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource] end, fun meck:unload/1, fun(_) -> Config = #{ diff --git a/changes/ce/feat-10895.en.md b/changes/ce/feat-10895.en.md new file mode 100644 index 000000000..f990b2e46 --- /dev/null +++ b/changes/ce/feat-10895.en.md @@ -0,0 +1,8 @@ +Refactored some bridges to avoid leaking resources during crashes at creation, including: +- TDEngine +- WebHook +- LDAP +- MongoDB +- MySQL +- PostgreSQL +- Redis