fix(resource manager): clean up any running health checks when terminating
Fixes https://github.com/emqx/emqx/pull/12812#discussion_r1555564254
This commit is contained in:
parent
1e64e531f0
commit
79526d539a
|
@ -446,6 +446,7 @@ init({DataIn, Opts}) ->
|
||||||
terminate({shutdown, removed}, _State, _Data) ->
|
terminate({shutdown, removed}, _State, _Data) ->
|
||||||
ok;
|
ok;
|
||||||
terminate(_Reason, _State, Data) ->
|
terminate(_Reason, _State, Data) ->
|
||||||
|
ok = terminate_health_check_workers(Data),
|
||||||
_ = maybe_stop_resource(Data),
|
_ = maybe_stop_resource(Data),
|
||||||
_ = erase_cache(Data),
|
_ = erase_cache(Data),
|
||||||
ok.
|
ok.
|
||||||
|
@ -634,6 +635,7 @@ health_check_actions(Data) ->
|
||||||
handle_remove_event(From, ClearMetrics, Data) ->
|
handle_remove_event(From, ClearMetrics, Data) ->
|
||||||
%% stop the buffer workers first, brutal_kill, so it should be fast
|
%% stop the buffer workers first, brutal_kill, so it should be fast
|
||||||
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
|
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
|
||||||
|
ok = terminate_health_check_workers(Data),
|
||||||
%% now stop the resource, this can be slow
|
%% now stop the resource, this can be slow
|
||||||
_ = stop_resource(Data),
|
_ = stop_resource(Data),
|
||||||
case ClearMetrics of
|
case ClearMetrics of
|
||||||
|
@ -793,6 +795,35 @@ safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
|
||||||
safe_call_remove_channel(ResId, Mod, State, ChannelID) ->
|
safe_call_remove_channel(ResId, Mod, State, ChannelID) ->
|
||||||
emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID).
|
emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID).
|
||||||
|
|
||||||
|
%% For cases where we need to terminate and there are running health checks.
|
||||||
|
terminate_health_check_workers(Data) ->
|
||||||
|
#data{
|
||||||
|
hc_workers = #{resource := RHCWorkers, channel := CHCWorkers},
|
||||||
|
hc_pending_callers = #{resource := RPending, channel := CPending}
|
||||||
|
} = Data,
|
||||||
|
maps:foreach(
|
||||||
|
fun({Pid, _Ref}, _) ->
|
||||||
|
exit(Pid, kill)
|
||||||
|
end,
|
||||||
|
RHCWorkers
|
||||||
|
),
|
||||||
|
maps:foreach(
|
||||||
|
fun
|
||||||
|
({Pid, _Ref}, _) when is_pid(Pid) ->
|
||||||
|
exit(Pid, kill);
|
||||||
|
(_, _) ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
CHCWorkers
|
||||||
|
),
|
||||||
|
Pending = lists:flatten([RPending, maps:values(CPending)]),
|
||||||
|
lists:foreach(
|
||||||
|
fun(From) ->
|
||||||
|
gen_statem:reply(From, {error, resource_shutting_down})
|
||||||
|
end,
|
||||||
|
Pending
|
||||||
|
).
|
||||||
|
|
||||||
make_test_id() ->
|
make_test_id() ->
|
||||||
RandId = iolist_to_binary(emqx_utils:gen_id(16)),
|
RandId = iolist_to_binary(emqx_utils:gen_id(16)),
|
||||||
<<?TEST_ID_PREFIX, RandId/binary>>.
|
<<?TEST_ID_PREFIX, RandId/binary>>.
|
||||||
|
|
Loading…
Reference in New Issue