feat: refactor connectors on_stop function to avoid resources leaking

Supplement to https://github.com/emqx/emqx/pull/10895
This commit is contained in:
JianBo He 2023-06-01 15:20:44 +08:00
parent e07c86b6a8
commit 3739230435
6 changed files with 48 additions and 9 deletions

View File

@ -132,6 +132,7 @@ on_start(
[] []
end, end,
State = parse_prepare_cql(Config), State = parse_prepare_cql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
@ -143,12 +144,17 @@ on_start(
{error, Reason} {error, Reason}
end. end.
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_cassandra_connector", msg => "stopping_cassandra_connector",
connector => InstId connector => InstId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
-type request() :: -type request() ::
% emqx_bridge.erl % emqx_bridge.erl

View File

@ -158,6 +158,7 @@ on_start(
templates => Templates, templates => Templates,
connect_timeout => ConnectTimeout connect_timeout => ConnectTimeout
}, },
ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID),
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -274,12 +275,17 @@ connect(Options) ->
-spec on_stop(resource_id(), resource_state()) -> term(). -spec on_stop(resource_id(), resource_state()) -> term().
on_stop(InstanceID, #{pool_name := PoolName}) -> on_stop(InstanceID, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping clickouse connector", msg => "stopping clickouse connector",
connector => InstanceID connector => InstanceID
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstanceID) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% on_get_status emqx_resouce callback and related functions %% on_get_status emqx_resouce callback and related functions

View File

@ -104,6 +104,7 @@ on_start(
table => Table, table => Table,
templates => Templates templates => Templates
}, },
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -111,12 +112,17 @@ on_start(
Error Error
end. end.
on_stop(InstanceId, #{pool_name := PoolName}) -> on_stop(InstanceId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_dynamo_connector", msg => "stopping_dynamo_connector",
connector => InstanceId connector => InstanceId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstanceId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstanceId, Query, State) -> on_query(InstanceId, Query, State) ->
do_query(InstanceId, Query, State). do_query(InstanceId, Query, State).

View File

@ -78,6 +78,7 @@ on_start(
State = #{pool_name => InstanceId, server => Server}, State = #{pool_name => InstanceId, server => Server},
case opentsdb_connectivity(Server) of case opentsdb_connectivity(Server) of
ok -> ok ->
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -89,12 +90,17 @@ on_start(
Error Error
end. end.
on_stop(InstanceId, #{pool_name := PoolName} = _State) -> on_stop(InstanceId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_opents_connector", msg => "stopping_opents_connector",
connector => InstanceId connector => InstanceId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstanceId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstanceId, Request, State) -> on_query(InstanceId, Request, State) ->
on_batch_query(InstanceId, [Request], State). on_batch_query(InstanceId, [Request], State).

View File

@ -242,6 +242,7 @@ on_start(
%% Already initialized %% Already initialized
ok ok
end, end,
ok = emqx_resource:allocate_resource(InstanceID, pool_name, InstanceID),
case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -267,6 +268,14 @@ on_stop(
msg => "stopping RabbitMQ connector", msg => "stopping RabbitMQ connector",
connector => ResourceID connector => ResourceID
}), }),
case emqx_resource:get_allocated_resources(InstanceId) of
#{pool_name := PoolName} ->
stop_clients_and_pool(PoolName);
_ ->
ok
end.
stop_clients_and_pool(PoolName) ->
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
Clients = [ Clients = [
begin begin

View File

@ -215,6 +215,7 @@ on_start(
sql_templates => parse_sql_template(Config), sql_templates => parse_sql_template(Config),
resource_opts => ResourceOpts resource_opts => ResourceOpts
}, },
ok = emqx_resource:allocate_resource(InstanceId, pool_name, PoolName),
case emqx_resource_pool:start(PoolName, ?MODULE, Options) of case emqx_resource_pool:start(PoolName, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -231,7 +232,12 @@ on_stop(ResourceId, _State) ->
msg => "stopping_sqlserver_connector", msg => "stopping_sqlserver_connector",
connector => ResourceId connector => ResourceId
}), }),
emqx_resource_pool:stop(ResourceId). case emqx_resource:get_allocated_resources(InstanceId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
-spec on_query( -spec on_query(
resource_id(), resource_id(),