chore: attempt to reduce race condition supervisor shutdown errors

Fixes https://emqx.atlassian.net/browse/EMQX-12442

e.g.:
```
2024-05-23T08:52:39.811845+00:00 [error] Supervisor: {local,emqx_resource_manager_sup}. Context: shutdown_error. Reason: noproc. Offender: id=<<99, 101, 110, 115, 111, 114, 101, 100>>,pid=<0.7752.1030>.
```

It could be just a race condition, as it seems to be the case for resource manager: i) a call is made to the process to stop it; ii) the call times out; iii) the after clause ends up calling supervisor:terminate_child; iv) while the supervisor is finding the child to terminate, the process actually finishes terminating, and the supervisor receives a noproc reason back.
This commit is contained in:
Thales Macedo Garitezi 2024-07-19 10:53:30 -03:00
parent ffa69df6f8
commit eb2d3a3b7e
1 changed files with 41 additions and 14 deletions

View File

@ -59,7 +59,7 @@
]).
% Server
-export([start_link/5]).
-export([start_link/5, where/1]).
% Behaviour
-export([init/1, callback_mode/0, handle_event/4, terminate/3]).
@ -147,6 +147,9 @@
%% API
%%------------------------------------------------------------------------------
where(ResId) ->
gproc:where(?NAME(ResId)).
%% @doc Called from emqx_resource when starting a resource instance.
%%
%% Triggers the emqx_resource_manager_sup supervisor to actually create
@ -268,17 +271,7 @@ remove(ResId) when is_binary(ResId) ->
-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(ResId, ClearMetrics) when is_binary(ResId) ->
try
case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
{error, timeout} ->
?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
action => remove,
resource_id => ResId
}),
force_kill(ResId),
ok;
Res ->
Res
end
do_remove(ResId, ClearMetrics)
after
%% Ensure the supervisor has it removed, otherwise the immediate re-add will see a stale process
%% If the 'remove' call above had succeeded, this is mostly a no-op but still needed to avoid race condition.
@ -286,6 +279,31 @@ remove(ResId, ClearMetrics) when is_binary(ResId) ->
emqx_resource_manager_sup:delete_child(ResId)
end.
do_remove(ResId, ClearMetrics) ->
case gproc:whereis_name(?NAME(ResId)) of
undefined ->
ok;
Pid when is_pid(Pid) ->
MRef = monitor(process, Pid),
case safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION) of
{error, timeout} ->
?tp(error, "forcefully_stopping_resource_due_to_timeout", #{
action => remove,
resource_id => ResId
}),
force_kill(ResId, MRef),
ok;
ok ->
receive
{'DOWN', MRef, process, Pid, _} ->
ok
end,
ok;
Res ->
Res
end
end.
%% @doc Stops and then starts an instance that was already running
-spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
restart(ResId, Opts) when is_binary(ResId) ->
@ -323,7 +341,7 @@ stop(ResId, Timeout) ->
action => stop,
resource_id => ResId
}),
force_kill(ResId),
force_kill(ResId, _MRef = undefined),
ok;
{error, _Reason} = Error ->
Error
@ -460,12 +478,21 @@ get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
get_error(_ResId, #{error := Error}) ->
Error.
force_kill(ResId) ->
force_kill(ResId, MRef0) ->
case gproc:whereis_name(?NAME(ResId)) of
undefined ->
ok;
Pid when is_pid(Pid) ->
MRef =
case MRef0 of
undefined -> monitor(process, Pid);
_ -> MRef0
end,
exit(Pid, kill),
receive
{'DOWN', MRef, process, Pid, _} ->
ok
end,
try_clean_allocated_resources(ResId),
ok
end.