fix(resource_manager_sup): use `one_for_one` instead of `simple_one_for_one`

Using `simple_one_for_one` has a potential race condition issue where
we read the PID of the resource manager before trying to remove a
resource, and then that PID changes because it was either dead at
first, or it crashed and changed, and later we use this stale PID to
try to remove it from the supervisor.  Under such circumstances, the
restarting child might linger in the supervisor, leaking resources.

By using the resource ID itself as a child ID (and using `one_for_one`
restart strategy), we ensure the child is truly removed.
This commit is contained in:
Thales Macedo Garitezi 2023-05-25 15:01:39 -03:00
parent e43517188f
commit 32e6213ce3
4 changed files with 84 additions and 22 deletions

View File

@ -2058,14 +2058,41 @@ t_resource_manager_crash_after_subscriber_started(Config) ->
?tp(resource_manager_killed, #{}), ?tp(resource_manager_killed, #{}),
ok ok
end), end),
%% even if the resource manager is dead, we can still %% even if the resource manager is dead, we can still
%% clear the allocated resources. %% clear the allocated resources.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
%% We avoid asserting only the `config_update_crashed'
%% error here because there's a race condition (just a
%% problem for the test assertion below) in which the
%% `emqx_resource_manager:create/5' call returns a failure
%% (not checked) and then `lookup' in that module is
%% delayed enough so that the manager supervisor has time
%% to restart the manager process and for the latter to
%% startup successfully. Occurs frequently in CI...
{Res, {ok, _}} =
?wait_async_action( ?wait_async_action(
create_bridge(Config), create_bridge(Config),
#{?snk_kind := kafka_consumer_subcriber_and_client_stopped}, #{?snk_kind := kafka_consumer_subcriber_and_client_stopped},
10_000 10_000
), ),
case Res of
{error, {config_update_crashed, {killed, _}}} ->
ok;
{ok, _} ->
%% the new manager may have had time to startup
%% before the resource status cache is read...
ok;
_ ->
ct:fail("unexpected result: ~p", [Res])
end,
?assertMatch({ok, _}, delete_bridge(Config)),
?retry(
_Sleep = 50,
_Attempts = 50,
?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup))
),
ok ok
end, end,
[] []
@ -2089,14 +2116,40 @@ t_resource_manager_crash_before_subscriber_started(Config) ->
?tp(resource_manager_killed, #{}), ?tp(resource_manager_killed, #{}),
ok ok
end), end),
%% even if the resource manager is dead, we can still %% even if the resource manager is dead, we can still
%% clear the allocated resources. %% clear the allocated resources.
{{error, {config_update_crashed, {killed, _}}}, {ok, _}} =
%% We avoid asserting only the `config_update_crashed'
%% error here because there's a race condition (just a
%% problem for the test assertion below) in which the
%% `emqx_resource_manager:create/5' call returns a failure
%% (not checked) and then `lookup' in that module is
%% delayed enough so that the manager supervisor has time
%% to restart the manager process and for the latter to
%% startup successfully. Occurs frequently in CI...
{Res, {ok, _}} =
?wait_async_action( ?wait_async_action(
create_bridge(Config), create_bridge(Config),
#{?snk_kind := kafka_consumer_just_client_stopped}, #{?snk_kind := kafka_consumer_just_client_stopped},
10_000 10_000
), ),
case Res of
{error, {config_update_crashed, {killed, _}}} ->
ok;
{ok, _} ->
%% the new manager may have had time to startup
%% before the resource status cache is read...
ok;
_ ->
ct:fail("unexpected result: ~p", [Res])
end,
?assertMatch({ok, _}, delete_bridge(Config)),
?retry(
_Sleep = 50,
_Attempts = 50,
?assertEqual([], supervisor:which_children(emqx_bridge_kafka_consumer_sup))
),
ok ok
end, end,
[] []

View File

@ -285,6 +285,11 @@ create_bridge(Config, Overrides) ->
PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides), PulsarConfig = emqx_utils_maps:deep_merge(PulsarConfig0, Overrides),
emqx_bridge:create(Type, Name, PulsarConfig). emqx_bridge:create(Type, Name, PulsarConfig).
delete_bridge(Config) ->
Type = ?BRIDGE_TYPE_BIN,
Name = ?config(pulsar_name, Config),
emqx_bridge:remove(Type, Name).
create_bridge_api(Config) -> create_bridge_api(Config) ->
create_bridge_api(Config, _Overrides = #{}). create_bridge_api(Config, _Overrides = #{}).
@ -1008,6 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) ->
Producers =/= undefined, Producers =/= undefined,
10_000 10_000
), ),
?assertMatch({ok, _}, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
ok ok
end, end,
[] []
@ -1039,6 +1046,8 @@ t_resource_manager_crash_before_producers_started(Config) ->
#{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined},
10_000 10_000
), ),
?assertMatch({ok, _}, delete_bridge(Config)),
?assertEqual([], get_pulsar_producers()),
ok ok
end, end,
[] []

View File

@ -192,14 +192,13 @@ remove(ResId) when is_binary(ResId) ->
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource %% @doc Stops a running resource_manager and optionally clears the metrics for the resource
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(ResId, ClearMetrics) when is_binary(ResId) -> remove(ResId, ClearMetrics) when is_binary(ResId) ->
ResourceManagerPid = gproc:whereis_name(?NAME(ResId)),
try try
safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION)
after after
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process %% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
%% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition. %% If the 'remove' call babove had succeeded, this is mostly a no-op but still needed to avoid race condition.
%% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long. %% Otherwise this is a 'infinity' shutdown, so it may take arbitrary long.
emqx_resource_manager_sup:delete_child(ResourceManagerPid) emqx_resource_manager_sup:delete_child(ResId)
end. end.
%% @doc Stops and then starts an instance that was already running %% @doc Stops and then starts an instance that was already running

View File

@ -26,12 +26,12 @@
-export([init/1]). -export([init/1]).
ensure_child(ResId, Group, ResourceType, Config, Opts) -> ensure_child(ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [ResId, Group, ResourceType, Config, Opts]), _ = supervisor:start_child(?MODULE, child_spec(ResId, Group, ResourceType, Config, Opts)),
ok. ok.
delete_child(Pid) -> delete_child(ResId) ->
_ = supervisor:terminate_child(?MODULE, Pid), _ = supervisor:terminate_child(?MODULE, ResId),
_ = supervisor:delete_child(?MODULE, Pid), _ = supervisor:delete_child(?MODULE, ResId),
ok. ok.
start_link() -> start_link() ->
@ -44,18 +44,19 @@ init([]) ->
public, public,
{read_concurrency, true} {read_concurrency, true}
]), ]),
ChildSpecs = [ ChildSpecs = [],
#{ SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
id => emqx_resource_manager,
start => {emqx_resource_manager, start_link, []},
restart => transient,
%% never force kill a resource manager.
%% becasue otherwise it may lead to release leak,
%% resource_manager's terminate callback calls resource on_stop
shutdown => infinity,
type => worker,
modules => [emqx_resource_manager]
}
],
SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10},
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
child_spec(ResId, Group, ResourceType, Config, Opts) ->
#{
id => ResId,
start => {emqx_resource_manager, start_link, [ResId, Group, ResourceType, Config, Opts]},
restart => transient,
%% never force kill a resource manager.
%% becasue otherwise it may lead to release leak,
%% resource_manager's terminate callback calls resource on_stop
shutdown => infinity,
type => worker,
modules => [emqx_resource_manager]
}.