From 79526d539a7ed31f5b66f3bc9caca2a6ec3a42b5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 8 Apr 2024 09:57:35 -0300 Subject: [PATCH 1/3] fix(resource manager): clean up any running health checks when terminating Fixes https://github.com/emqx/emqx/pull/12812#discussion_r1555564254 --- .../src/emqx_resource_manager.erl | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 6d9ad50e4..d2f527883 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -446,6 +446,7 @@ init({DataIn, Opts}) -> terminate({shutdown, removed}, _State, _Data) -> ok; terminate(_Reason, _State, Data) -> + ok = terminate_health_check_workers(Data), _ = maybe_stop_resource(Data), _ = erase_cache(Data), ok. @@ -634,6 +635,7 @@ health_check_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> %% stop the buffer workers first, brutal_kill, so it should be fast ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), + ok = terminate_health_check_workers(Data), %% now stop the resource, this can be slow _ = stop_resource(Data), case ClearMetrics of @@ -793,6 +795,35 @@ safe_call_remove_channel(_ResId, _Mod, undefined = State, _ChannelID) -> safe_call_remove_channel(ResId, Mod, State, ChannelID) -> emqx_resource:call_remove_channel(ResId, Mod, State, ChannelID). +%% For cases where we need to terminate and there are running health checks. +terminate_health_check_workers(Data) -> + #data{ + hc_workers = #{resource := RHCWorkers, channel := CHCWorkers}, + hc_pending_callers = #{resource := RPending, channel := CPending} + } = Data, + maps:foreach( + fun({Pid, _Ref}, _) -> + exit(Pid, kill) + end, + RHCWorkers + ), + maps:foreach( + fun + ({Pid, _Ref}, _) when is_pid(Pid) -> + exit(Pid, kill); + (_, _) -> + ok + end, + CHCWorkers + ), + Pending = lists:flatten([RPending, maps:values(CPending)]), + lists:foreach( + fun(From) -> + gen_statem:reply(From, {error, resource_shutting_down}) + end, + Pending + ). + make_test_id() -> RandId = iolist_to_binary(emqx_utils:gen_id(16)), <>. From 475077c7981edcbb2e8986515f9bf62ba1572496 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 8 Apr 2024 15:11:10 -0300 Subject: [PATCH 2/3] fix(resource): account for ongoing channel health checks, update data and reply immediately when receiving an update --- .../src/emqx_resource_manager.erl | 92 ++++++++++--------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d2f527883..ff91d99fe 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -81,15 +81,15 @@ hc_workers = #{ resource => #{}, channel => #{ - pending => [], - previous_status => #{} + ongoing => #{}, + pending => [] } } :: #{ resource := #{{pid(), reference()} => true}, channel := #{ {pid(), reference()} => channel_id(), - pending := [channel_id()], - previous_status := #{channel_id() => channel_status_map()} + ongoing := #{channel_id() => channel_status_map()}, + pending := [channel_id()] } }, %% Callers waiting on health check @@ -1039,12 +1039,12 @@ handle_manual_channel_health_check( #data{ added_channels = Channels, hc_pending_callers = #{channel := CPending0} = Pending0, - hc_workers = #{channel := #{previous_status := PreviousStatus}} + hc_workers = #{channel := #{ongoing := Ongoing}} } = Data0, ChannelId ) when is_map_key(ChannelId, Channels), - is_map_key(ChannelId, PreviousStatus) + is_map_key(ChannelId, Ongoing) -> %% Ongoing health check. CPending = maps:update_with( @@ -1189,53 +1189,54 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) -> %% `?status_connected'. -spec trigger_health_check_for_added_channels(data()) -> data(). trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) -> - #{channel := CHCWorkers0} = HCWorkers0, - PreviousStatus = maps:from_list([ - {ChannelId, OldStatus} - || {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels), - channel_status_is_channel_added(OldStatus) - ]), - ChannelsToCheck = maps:keys(PreviousStatus), + #{ + channel := CHCWorkers0 = + #{ + pending := CPending0, + ongoing := Ongoing0 + } + } = HCWorkers0, + NewOngoing = maps:filter( + fun(ChannelId, OldStatus) -> + not is_map_key(ChannelId, Ongoing0) and + channel_status_is_channel_added(OldStatus) + end, + Data0#data.added_channels + ), + ChannelsToCheck = maps:keys(NewOngoing), case ChannelsToCheck of [] -> %% Nothing to do. Data0; [ChannelId | Rest] -> %% Shooting one check at a time. We could increase concurrency in the future. - CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus}, + CHCWorkers = CHCWorkers0#{ + pending := CPending0 ++ Rest, + ongoing := maps:merge(Ongoing0, NewOngoing) + }, Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, start_channel_health_check(Data1, ChannelId) end. --spec continue_channel_health_check_connected(data()) -> data(). -continue_channel_health_check_connected(Data0) -> +-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data(). +continue_channel_health_check_connected(ChannelId, OldStatus, Data0) -> #data{hc_workers = HCWorkers0} = Data0, - #{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0, - CHCWorkers = CHCWorkers0#{previous_status := #{}}, + #{channel := CHCWorkers0} = HCWorkers0, + CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0), Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, %% Remove the added channels with a a status different from connected or connecting - CheckedChannels = [ - {ChannelId, NewStatus} - || {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels), - is_map_key(ChannelId, PreviousStatus) - ], - ChannelsToRemove = [ - ChannelId - || {ChannelId, NewStatus} <- CheckedChannels, - not channel_status_is_channel_added(NewStatus) - ], + NewStatus = maps:get(ChannelId, Data0#data.added_channels), + ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), %% Raise/clear alarms - lists:foreach( - fun - ({ID, #{status := ?status_connected}}) -> - _ = maybe_clear_alarm(ID); - ({ID, NewStatus}) -> - OldStatus = maps:get(ID, PreviousStatus), - _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) - end, - CheckedChannels - ), + case NewStatus of + #{status := ?status_connected} -> + _ = maybe_clear_alarm(ChannelId), + ok; + _ -> + _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), + ok + end, Data. -spec start_channel_health_check(data(), channel_id()) -> data(). @@ -1271,19 +1272,24 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> %% `emqx_resource:call_channel_health_check' catches all exceptions. AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0) end, + #{ongoing := Ongoing0} = CHCWorkers1, + {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), + CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, + CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2), Data1 = Data0#data{added_channels = AddedChannels}, {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1), case CHCWorkers1 of #{pending := [NextChannelId | Rest]} -> - CHCWorkers = CHCWorkers1#{pending := Rest}, + CHCWorkers = CHCWorkers3#{pending := Rest}, HCWorkers = HCWorkers0#{channel := CHCWorkers}, Data3 = Data2#data{hc_workers = HCWorkers}, - Data = start_channel_health_check(Data3, NextChannelId), + Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3), + Data = start_channel_health_check(Data4, NextChannelId), {keep_state, update_state(Data, Data0), Replies}; #{pending := []} -> - HCWorkers = HCWorkers0#{channel := CHCWorkers1}, + HCWorkers = HCWorkers0#{channel := CHCWorkers3}, Data3 = Data2#data{hc_workers = HCWorkers}, - Data = continue_channel_health_check_connected(Data3), + Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3), {keep_state, update_state(Data, Data0), Replies} end. @@ -1339,7 +1345,7 @@ remove_runtime_data(#data{} = Data0) -> Data0#data{ hc_workers = #{ resource => #{}, - channel => #{pending => [], previous_status => #{}} + channel => #{pending => [], ongoing => #{}} }, hc_pending_callers = #{ resource => [], From 5cf92dcb73108b5ce1a967eec410b432692763ea Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 29 Apr 2024 13:26:49 -0300 Subject: [PATCH 3/3] refactor: use `spawn_link` instead of `spawn_monitor` This should cover the case when the resource manager is brutally killed. --- .../src/emqx_resource_manager.erl | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index ff91d99fe..1c0c3f4ca 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -556,22 +556,22 @@ handle_event( {keep_state_and_data, {reply, From, {ok, Channels}}}; handle_event( info, - {'DOWN', Ref, process, Pid, Res}, + {'EXIT', Pid, Res}, State0, Data0 = #data{hc_workers = #{resource := RHCWorkers}} ) when - is_map_key({Pid, Ref}, RHCWorkers) + is_map_key(Pid, RHCWorkers) -> - handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res); + handle_resource_health_check_worker_down(State0, Data0, Pid, Res); handle_event( info, - {'DOWN', Ref, process, Pid, Res}, + {'EXIT', Pid, Res}, _State, Data0 = #data{hc_workers = #{channel := CHCWorkers}} ) when - is_map_key({Pid, Ref}, CHCWorkers) + is_map_key(Pid, CHCWorkers) -> - handle_channel_health_check_worker_down(Data0, {Pid, Ref}, Res); + handle_channel_health_check_worker_down(Data0, Pid, Res); % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -802,14 +802,14 @@ terminate_health_check_workers(Data) -> hc_pending_callers = #{resource := RPending, channel := CPending} } = Data, maps:foreach( - fun({Pid, _Ref}, _) -> + fun(Pid, _) -> exit(Pid, kill) end, RHCWorkers ), maps:foreach( fun - ({Pid, _Ref}, _) when is_pid(Pid) -> + (Pid, _) when is_pid(Pid) -> exit(Pid, kill); (_, _) -> ok @@ -947,14 +947,14 @@ start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when keep_state_and_data; start_resource_health_check(#data{} = Data0) -> #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0, - WorkerRef = {_Pid, _Ref} = spawn_resource_health_check_worker(Data0), - HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}}, + WorkerPid = spawn_resource_health_check_worker(Data0), + HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerPid => true}}, Data = Data0#data{hc_workers = HCWorkers}, {keep_state, Data}. --spec spawn_resource_health_check_worker(data()) -> {pid(), reference()}. +-spec spawn_resource_health_check_worker(data()) -> pid(). spawn_resource_health_check_worker(#data{} = Data) -> - spawn_monitor(?MODULE, worker_resource_health_check, [Data]). + spawn_link(?MODULE, worker_resource_health_check, [Data]). %% separated so it can be spec'ed and placate dialyzer tantrums... -spec worker_resource_health_check(data()) -> no_return(). @@ -1242,13 +1242,13 @@ continue_channel_health_check_connected(ChannelId, OldStatus, Data0) -> -spec start_channel_health_check(data(), channel_id()) -> data(). start_channel_health_check(#data{} = Data0, ChannelId) -> #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0, - WorkerRef = {_Pid, _Ref} = spawn_channel_health_check_worker(Data0, ChannelId), - HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerRef => ChannelId}}, + WorkerPid = spawn_channel_health_check_worker(Data0, ChannelId), + HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerPid => ChannelId}}, Data0#data{hc_workers = HCWorkers}. --spec spawn_channel_health_check_worker(data(), channel_id()) -> {pid(), reference()}. +-spec spawn_channel_health_check_worker(data(), channel_id()) -> pid(). spawn_channel_health_check_worker(#data{} = Data, ChannelId) -> - spawn_monitor(?MODULE, worker_channel_health_check, [Data, ChannelId]). + spawn_link(?MODULE, worker_channel_health_check, [Data, ChannelId]). %% separated so it can be spec'ed and placate dialyzer tantrums... -spec worker_channel_health_check(data(), channel_id()) -> no_return().