diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d2f527883..ff91d99fe 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -81,15 +81,15 @@ hc_workers = #{ resource => #{}, channel => #{ - pending => [], - previous_status => #{} + ongoing => #{}, + pending => [] } } :: #{ resource := #{{pid(), reference()} => true}, channel := #{ {pid(), reference()} => channel_id(), - pending := [channel_id()], - previous_status := #{channel_id() => channel_status_map()} + ongoing := #{channel_id() => channel_status_map()}, + pending := [channel_id()] } }, %% Callers waiting on health check @@ -1039,12 +1039,12 @@ handle_manual_channel_health_check( #data{ added_channels = Channels, hc_pending_callers = #{channel := CPending0} = Pending0, - hc_workers = #{channel := #{previous_status := PreviousStatus}} + hc_workers = #{channel := #{ongoing := Ongoing}} } = Data0, ChannelId ) when is_map_key(ChannelId, Channels), - is_map_key(ChannelId, PreviousStatus) + is_map_key(ChannelId, Ongoing) -> %% Ongoing health check. CPending = maps:update_with( @@ -1189,53 +1189,54 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) -> %% `?status_connected'. -spec trigger_health_check_for_added_channels(data()) -> data(). trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) -> - #{channel := CHCWorkers0} = HCWorkers0, - PreviousStatus = maps:from_list([ - {ChannelId, OldStatus} - || {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels), - channel_status_is_channel_added(OldStatus) - ]), - ChannelsToCheck = maps:keys(PreviousStatus), + #{ + channel := CHCWorkers0 = + #{ + pending := CPending0, + ongoing := Ongoing0 + } + } = HCWorkers0, + NewOngoing = maps:filter( + fun(ChannelId, OldStatus) -> + not is_map_key(ChannelId, Ongoing0) and + channel_status_is_channel_added(OldStatus) + end, + Data0#data.added_channels + ), + ChannelsToCheck = maps:keys(NewOngoing), case ChannelsToCheck of [] -> %% Nothing to do. Data0; [ChannelId | Rest] -> %% Shooting one check at a time. We could increase concurrency in the future. - CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus}, + CHCWorkers = CHCWorkers0#{ + pending := CPending0 ++ Rest, + ongoing := maps:merge(Ongoing0, NewOngoing) + }, Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, start_channel_health_check(Data1, ChannelId) end. --spec continue_channel_health_check_connected(data()) -> data(). -continue_channel_health_check_connected(Data0) -> +-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data(). +continue_channel_health_check_connected(ChannelId, OldStatus, Data0) -> #data{hc_workers = HCWorkers0} = Data0, - #{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0, - CHCWorkers = CHCWorkers0#{previous_status := #{}}, + #{channel := CHCWorkers0} = HCWorkers0, + CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0), Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, %% Remove the added channels with a a status different from connected or connecting - CheckedChannels = [ - {ChannelId, NewStatus} - || {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels), - is_map_key(ChannelId, PreviousStatus) - ], - ChannelsToRemove = [ - ChannelId - || {ChannelId, NewStatus} <- CheckedChannels, - not channel_status_is_channel_added(NewStatus) - ], + NewStatus = maps:get(ChannelId, Data0#data.added_channels), + ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), %% Raise/clear alarms - lists:foreach( - fun - ({ID, #{status := ?status_connected}}) -> - _ = maybe_clear_alarm(ID); - ({ID, NewStatus}) -> - OldStatus = maps:get(ID, PreviousStatus), - _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) - end, - CheckedChannels - ), + case NewStatus of + #{status := ?status_connected} -> + _ = maybe_clear_alarm(ChannelId), + ok; + _ -> + _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus), + ok + end, Data. -spec start_channel_health_check(data(), channel_id()) -> data(). @@ -1271,19 +1272,24 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> %% `emqx_resource:call_channel_health_check' catches all exceptions. AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0) end, + #{ongoing := Ongoing0} = CHCWorkers1, + {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), + CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, + CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2), Data1 = Data0#data{added_channels = AddedChannels}, {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1), case CHCWorkers1 of #{pending := [NextChannelId | Rest]} -> - CHCWorkers = CHCWorkers1#{pending := Rest}, + CHCWorkers = CHCWorkers3#{pending := Rest}, HCWorkers = HCWorkers0#{channel := CHCWorkers}, Data3 = Data2#data{hc_workers = HCWorkers}, - Data = start_channel_health_check(Data3, NextChannelId), + Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3), + Data = start_channel_health_check(Data4, NextChannelId), {keep_state, update_state(Data, Data0), Replies}; #{pending := []} -> - HCWorkers = HCWorkers0#{channel := CHCWorkers1}, + HCWorkers = HCWorkers0#{channel := CHCWorkers3}, Data3 = Data2#data{hc_workers = HCWorkers}, - Data = continue_channel_health_check_connected(Data3), + Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3), {keep_state, update_state(Data, Data0), Replies} end. @@ -1339,7 +1345,7 @@ remove_runtime_data(#data{} = Data0) -> Data0#data{ hc_workers = #{ resource => #{}, - channel => #{pending => [], previous_status => #{}} + channel => #{pending => [], ongoing => #{}} }, hc_pending_callers = #{ resource => [],