Merge pull request #10895 from lafirest/fix/refactor_on_stop

feat: refactored some bridges to avoid leaking resources during crashes at creation
This commit is contained in:
JianBo He 2023-06-01 15:17:37 +08:00 committed by GitHub
commit bcc5e30f22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 89 additions and 35 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge_iotdb, [ {application, emqx_bridge_iotdb, [
{description, "EMQX Enterprise Apache IoTDB Bridge"}, {description, "EMQX Enterprise Apache IoTDB Bridge"},
{vsn, "0.1.1"}, {vsn, "0.1.2"},
{modules, [ {modules, [
emqx_bridge_iotdb, emqx_bridge_iotdb,
emqx_bridge_iotdb_impl emqx_bridge_iotdb_impl

View File

@ -23,35 +23,35 @@
scheme := http | https, scheme := http | https,
host := iolist(), host := iolist(),
port := inet:port_number(), port := inet:port_number(),
path := '_' path := _
}, },
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
pool_size := pos_integer(), pool_size := pos_integer(),
request := undefined | map(), request => undefined | map(),
is_aligned := boolean(), is_aligned => boolean(),
iotdb_version := binary(), iotdb_version => binary(),
device_id := binary() | undefined, device_id => binary() | undefined,
atom() => '_' atom() => _
}. }.
-type state() :: -type state() ::
#{ #{
base_path := '_', base_path := _,
base_url := #{ base_url := #{
scheme := http | https, scheme := http | https,
host := iolist(), host := iolist(),
port := inet:port_number(), port := inet:port_number(),
path := '_' path := _
}, },
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
pool_size := pos_integer(), pool_size := pos_integer(),
request := undefined | map(), request => undefined | map(),
is_aligned := boolean(), is_aligned => boolean(),
iotdb_version := binary(), iotdb_version => binary(),
device_id := binary() | undefined, device_id => binary() | undefined,
atom() => '_' atom() => _
}. }.
-type manager_id() :: binary(). -type manager_id() :: binary().

View File

@ -83,6 +83,7 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
{ingress, Ingress}, {ingress, Ingress},
{client_opts, ClientOpts} {client_opts, ClientOpts}
], ],
ok = emqx_resource:allocate_resource(ResourceId, ingress_pool_name, PoolName),
case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of
ok -> ok ->
{ok, #{ingress_pool_name => PoolName}}; {ok, #{ingress_pool_name => PoolName}};
@ -132,6 +133,7 @@ start_egress(ResourceId, Egress, ClientOpts) ->
{pool_size, PoolSize}, {pool_size, PoolSize},
{client_opts, ClientOpts} {client_opts, ClientOpts}
], ],
ok = emqx_resource:allocate_resource(ResourceId, egress_pool_name, PoolName),
case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of
ok -> ok ->
{ok, #{ {ok, #{
@ -142,13 +144,14 @@ start_egress(ResourceId, Egress, ClientOpts) ->
{error, Reason} {error, Reason}
end. end.
on_stop(ResourceId, State) -> on_stop(ResourceId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_mqtt_connector", msg => "stopping_mqtt_connector",
connector => ResourceId connector => ResourceId
}), }),
ok = stop_ingress(State), Allocated = emqx_resource:get_allocated_resources(ResourceId),
ok = stop_egress(State). ok = stop_ingress(Allocated),
ok = stop_egress(Allocated).
stop_ingress(#{ingress_pool_name := PoolName}) -> stop_ingress(#{ingress_pool_name := PoolName}) ->
emqx_resource_pool:stop(PoolName); emqx_resource_pool:stop(PoolName);

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_tdengine, [ {application, emqx_bridge_tdengine, [
{description, "EMQX Enterprise TDEngine Bridge"}, {description, "EMQX Enterprise TDEngine Bridge"},
{vsn, "0.1.2"}, {vsn, "0.1.3"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, tdengine]}, {applications, [kernel, stdlib, tdengine]},
{env, []}, {env, []},

View File

@ -108,6 +108,7 @@ on_start(
Prepares = parse_prepare_sql(Config), Prepares = parse_prepare_sql(Config),
State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)}, State = Prepares#{pool_name => InstanceId, query_opts => query_opts(Config)},
ok = emqx_resource:allocate_resource(InstanceId, pool_name, InstanceId),
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
ok -> ok ->
{ok, State}; {ok, State};
@ -115,12 +116,17 @@ on_start(
Error Error
end. end.
on_stop(InstanceId, #{pool_name := PoolName}) -> on_stop(InstanceId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_tdengine_connector", msg => "stopping_tdengine_connector",
connector => InstanceId connector => InstanceId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstanceId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstanceId, {query, SQL}, State) -> on_query(InstanceId, {query, SQL}, State) ->
do_query(InstanceId, SQL, State); do_query(InstanceId, SQL, State);

View File

@ -219,18 +219,24 @@ on_start(
base_path => BasePath, base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined)) request => preprocess_request(maps:get(request, Config, undefined))
}, },
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case ehttpc_sup:start_pool(InstId, PoolOpts) of case ehttpc_sup:start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State}; {ok, _} -> {ok, State};
{error, {already_started, _}} -> {ok, State}; {error, {already_started, _}} -> {ok, State};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_http_connector", msg => "stopping_http_connector",
connector => InstId connector => InstId
}), }),
ehttpc_sup:stop_pool(PoolName). case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
ehttpc_sup:stop_pool(PoolName);
_ ->
ok
end.
on_query(InstId, {send_message, Msg}, State) -> on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of

View File

@ -97,17 +97,23 @@ on_start(
{pool_size, PoolSize}, {pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL} {auto_reconnect, ?AUTO_RECONNECT_INTERVAL}
], ],
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Opts ++ SslOpts) of
ok -> {ok, #{pool_name => InstId}}; ok -> {ok, #{pool_name => InstId}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_ldap_connector", msg => "stopping_ldap_connector",
connector => InstId connector => InstId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) -> on_query(InstId, {search, Base, Filter, Attributes}, #{pool_name := PoolName} = State) ->
Request = {Base, Filter, Attributes}, Request = {Base, Filter, Attributes},

View File

@ -183,6 +183,7 @@ on_start(
{worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)} {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
], ],
Collection = maps:get(collection, Config, <<"mqtt">>), Collection = maps:get(collection, Config, <<"mqtt">>),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Opts) of case emqx_resource_pool:start(InstId, ?MODULE, Opts) of
ok -> ok ->
{ok, #{ {ok, #{
@ -194,12 +195,17 @@ on_start(
{error, Reason} {error, Reason}
end. end.
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_mongodb_connector", msg => "stopping_mongodb_connector",
connector => InstId connector => InstId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query( on_query(
InstId, InstId,

View File

@ -124,6 +124,7 @@ on_start(
] ]
), ),
State = parse_prepare_sql(Config), State = parse_prepare_sql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId})}; {ok, init_prepare(State#{pool_name => InstId})};
@ -140,12 +141,17 @@ maybe_add_password_opt(undefined, Options) ->
maybe_add_password_opt(Password, Options) -> maybe_add_password_opt(Password, Options) ->
[{password, Password} | Options]. [{password, Password} | Options].
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_mysql_connector", msg => "stopping_mysql_connector",
connector => InstId connector => InstId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstId, {TypeOrKey, SQLOrKey}, State) -> on_query(InstId, {TypeOrKey, SQLOrKey}, State) ->
on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State); on_query(InstId, {TypeOrKey, SQLOrKey, [], default_timeout}, State);

View File

@ -121,6 +121,7 @@ on_start(
{pool_size, PoolSize} {pool_size, PoolSize}
], ],
State = parse_prepare_sql(Config), State = parse_prepare_sql(Config),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
ok -> ok ->
{ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})}; {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
@ -132,12 +133,17 @@ on_start(
{error, Reason} {error, Reason}
end. end.
on_stop(InstId, #{pool_name := PoolName}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping postgresql connector", msg => "stopping postgresql connector",
connector => InstId connector => InstId
}), }),
emqx_resource_pool:stop(PoolName). case emqx_resource:get_allocated_resources(InstId) of
#{pool_name := PoolName} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end.
on_query(InstId, {TypeOrKey, NameOrSQL}, State) -> on_query(InstId, {TypeOrKey, NameOrSQL}, State) ->
on_query(InstId, {TypeOrKey, NameOrSQL, []}, State); on_query(InstId, {TypeOrKey, NameOrSQL, []}, State);

View File

@ -160,6 +160,8 @@ on_start(
[{ssl, false}] [{ssl, false}]
end ++ [{sentinel, maps:get(sentinel, Config, undefined)}], end ++ [{sentinel, maps:get(sentinel, Config, undefined)}],
State = #{pool_name => InstId, type => Type}, State = #{pool_name => InstId, type => Type},
ok = emqx_resource:allocate_resource(InstId, type, Type),
ok = emqx_resource:allocate_resource(InstId, pool_name, InstId),
case Type of case Type of
cluster -> cluster ->
case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of case eredis_cluster:start_pool(InstId, Opts ++ [{options, Options}]) of
@ -177,14 +179,18 @@ on_start(
end end
end. end.
on_stop(InstId, #{pool_name := PoolName, type := Type}) -> on_stop(InstId, _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_redis_connector", msg => "stopping_redis_connector",
connector => InstId connector => InstId
}), }),
case Type of case emqx_resource:get_allocated_resources(InstId) of
cluster -> eredis_cluster:stop_pool(PoolName); #{pool_name := PoolName, type := cluster} ->
_ -> emqx_resource_pool:stop(PoolName) eredis_cluster:stop_pool(PoolName);
#{pool_name := PoolName, type := _} ->
emqx_resource_pool:stop(PoolName);
_ ->
ok
end. end.
on_query(InstId, {cmd, _} = Query, State) -> on_query(InstId, {cmd, _} = Query, State) ->

View File

@ -25,7 +25,8 @@ wrap_auth_headers_test_() ->
meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}),
meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end),
meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(ehttpc_pool, pick_worker, 1, self()),
[ehttpc_sup, ehttpc, ehttpc_pool] meck:expect(emqx_resource, allocate_resource, 3, ok),
[ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource]
end, end,
fun meck:unload/1, fun(_) -> fun meck:unload/1, fun(_) ->
Config = #{ Config = #{

View File

@ -0,0 +1,8 @@
Refactored some bridges to avoid leaking resources during crashes at creation, including:
- TDEngine
- WebHook
- LDAP
- MongoDB
- MySQL
- PostgreSQL
- Redis