diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl
index 6d9ad50e4..d2f527883 100644
--- a/apps/emqx_resource/src/emqx_resource_manager.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager.erl
@@ -446,6 +446,7 @@ init({DataIn, Opts}) ->
 terminate({shutdown, removed}, _State, _Data) ->
     ok;
 terminate(_Reason, _State, Data) ->
+    ok = terminate_health_check_workers(Data),
     _ = maybe_stop_resource(Data),
     _ = erase_cache(Data),
     ok.
@@ -634,6 +635,7 @@ health_check_actions(Data) ->
 handle_remove_event(From, ClearMetrics, Data) ->
     %% stop the buffer workers first, brutal_kill, so it should be fast
     ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
+    ok = terminate_health_check_workers(Data),
     %% now stop the resource, this can be slow
     _ = stop_resource(Data),
     case ClearMetrics of
@@ -793,6 +795,28 @@ safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) ->
 safe_call_remove_channel(ResId, Mod, State, ChannelID) ->
     emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID).
 
+%% Used when the manager has to stop while health checks are still running:
+%% kills the health check workers recorded in `hc_workers', then answers every
+%% caller recorded in `hc_pending_callers' with `{error, resource_shutting_down}'.
+terminate_health_check_workers(Data) ->
+    #data{
+        hc_workers = #{resource := ResourceWorkers, channel := ChannelWorkers},
+        hc_pending_callers = #{resource := ResourceCallers, channel := ChannelCallers}
+    } = Data,
+    %% Every key of the resource worker map is matched as `{Pid, Ref}'.
+    KillWorker = fun({Pid, _Ref}, _) -> exit(Pid, kill) end,
+    maps:foreach(KillWorker, ResourceWorkers),
+    %% Channel worker entries whose key is not a `{Pid, Ref}' pair are skipped.
+    KillIfWorker = fun
+        ({Pid, _Ref}, _) when is_pid(Pid) -> exit(Pid, kill);
+        (_, _) -> ok
+    end,
+    maps:foreach(KillIfWorker, ChannelWorkers),
+    %% Flatten the resource callers together with the values of the channel caller map.
+    Waiting = lists:flatten([ResourceCallers, maps:values(ChannelCallers)]),
+    Reply = {error, resource_shutting_down},
+    lists:foreach(fun(From) -> gen_statem:reply(From, Reply) end, Waiting).
+
 make_test_id() ->
     RandId = iolist_to_binary(emqx_utils:gen_id(16)),
     <>.