Merge pull request #10924 from JimMoen/refactor_influxdb_on_stop

feat: refactor influxdb connector to to avoid resources leaking
This commit is contained in:
JimMoen 2023-06-05 18:02:15 +08:00 committed by GitHub
commit 0f808345e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 5 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [ {application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"}, {description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, influxdb]}, {applications, [kernel, stdlib, influxdb]},
{env, []}, {env, []},

View File

@ -39,6 +39,9 @@
-type ts_precision() :: ns | us | ms | s. -type ts_precision() :: ns | us | ms | s.
%% Allocatable resources
-define(influx_client, influx_client).
-define(INFLUXDB_DEFAULT_PORT, 8086). -define(INFLUXDB_DEFAULT_PORT, 8086).
%% influxdb servers don't need parse %% influxdb servers don't need parse
@ -53,10 +56,20 @@
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_start(InstId, Config) -> on_start(InstId, Config) ->
%% InstID as pool would be handled by influxdb client
%% so there is no need to allocate pool_name here
%% ehttpc for influxdb-v1/v2,
%% ecpool for influxdb-udp
%% See: influxdb:start_client/1
start_client(InstId, Config). start_client(InstId, Config).
on_stop(_InstId, #{client := Client}) -> on_stop(InstId, _State) ->
influxdb:stop_client(Client). case emqx_resource:get_allocated_resources(InstId) of
#{?influx_client := Client} ->
influxdb:stop_client(Client);
_ ->
ok
end.
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
case data_to_points(Data, SyntaxLines) of case data_to_points(Data, SyntaxLines) of
@ -220,8 +233,12 @@ start_client(InstId, Config) ->
config => emqx_utils:redact(Config), config => emqx_utils:redact(Config),
client_config => emqx_utils:redact(ClientConfig) client_config => emqx_utils:redact(ClientConfig)
}), }),
try try do_start_client(InstId, ClientConfig, Config) of
do_start_client(InstId, ClientConfig, Config) Res = {ok, #{client := Client}} ->
ok = emqx_resource:allocate_resource(InstId, ?influx_client, Client),
Res;
{error, Reason} ->
{error, Reason}
catch catch
E:R:S -> E:R:S ->
?tp(influxdb_connector_start_exception, #{error => {E, R}}), ?tp(influxdb_connector_start_exception, #{error => {E, R}}),

View File

@ -0,0 +1 @@
Refactored influxdb bridge connector to avoid resource leaks during crashes at creation.