fix: handle channel updated during health check
This commit fixes an issue found by the CI test case emqx_bridge_influxdb_SUITE:t_start_stop and others. While the channel health check process is running, the channel could be removed or updated, which could cause a crash in the resource manager or cause out-of-date alarms to be triggered.
This commit is contained in:
parent
39d758c4d6
commit
cff8b97e8a
|
@ -1228,14 +1228,29 @@ trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0})
|
||||||
start_channel_health_check(Data1, ChannelId)
|
start_channel_health_check(Data1, ChannelId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data().
|
-spec continue_channel_health_check_connected(
|
||||||
continue_channel_health_check_connected(ChannelId, OldStatus, Data0) ->
|
channel_id(), channel_status_map(), channel_status_map(), data()
|
||||||
|
) -> data().
|
||||||
|
continue_channel_health_check_connected(ChannelId, OldStatus, CurrentStatus, Data0) ->
|
||||||
#data{hc_workers = HCWorkers0} = Data0,
|
#data{hc_workers = HCWorkers0} = Data0,
|
||||||
#{channel := CHCWorkers0} = HCWorkers0,
|
#{channel := CHCWorkers0} = HCWorkers0,
|
||||||
CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
|
CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
|
||||||
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
||||||
|
case OldStatus =:= CurrentStatus of
|
||||||
|
true ->
|
||||||
|
continue_channel_health_check_connected_no_update_during_check(
|
||||||
|
ChannelId, OldStatus, Data1
|
||||||
|
);
|
||||||
|
false ->
|
||||||
|
%% Channel has been updated while the health check process was working so
|
||||||
|
%% we should not clear any alarm or remove the channel from the
|
||||||
|
%% connector
|
||||||
|
Data1
|
||||||
|
end.
|
||||||
|
|
||||||
|
continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data1) ->
|
||||||
%% Remove the added channels with a status different from connected or connecting
|
%% Remove the added channels with a status different from connected or connecting
|
||||||
NewStatus = maps:get(ChannelId, Data0#data.added_channels),
|
NewStatus = maps:get(ChannelId, Data1#data.added_channels),
|
||||||
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
|
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
|
||||||
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
|
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
|
||||||
%% Raise/clear alarms
|
%% Raise/clear alarms
|
||||||
|
@ -1299,13 +1314,23 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
|
||||||
CHCWorkers = CHCWorkers3#{pending := Rest},
|
CHCWorkers = CHCWorkers3#{pending := Rest},
|
||||||
HCWorkers = HCWorkers0#{channel := CHCWorkers},
|
HCWorkers = HCWorkers0#{channel := CHCWorkers},
|
||||||
Data3 = Data2#data{hc_workers = HCWorkers},
|
Data3 = Data2#data{hc_workers = HCWorkers},
|
||||||
Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
|
Data4 = continue_channel_health_check_connected(
|
||||||
|
ChannelId,
|
||||||
|
PreviousChanStatus,
|
||||||
|
CurrentStatus,
|
||||||
|
Data3
|
||||||
|
),
|
||||||
Data = start_channel_health_check(Data4, NextChannelId),
|
Data = start_channel_health_check(Data4, NextChannelId),
|
||||||
{keep_state, update_state(Data, Data0), Replies};
|
{keep_state, update_state(Data, Data0), Replies};
|
||||||
#{pending := []} ->
|
#{pending := []} ->
|
||||||
HCWorkers = HCWorkers0#{channel := CHCWorkers3},
|
HCWorkers = HCWorkers0#{channel := CHCWorkers3},
|
||||||
Data3 = Data2#data{hc_workers = HCWorkers},
|
Data3 = Data2#data{hc_workers = HCWorkers},
|
||||||
Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
|
Data = continue_channel_health_check_connected(
|
||||||
|
ChannelId,
|
||||||
|
PreviousChanStatus,
|
||||||
|
CurrentStatus,
|
||||||
|
Data3
|
||||||
|
),
|
||||||
{keep_state, update_state(Data, Data0), Replies}
|
{keep_state, update_state(Data, Data0), Replies}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -1326,7 +1351,9 @@ handle_channel_health_check_worker_down_new_channels_and_status(
|
||||||
) ->
|
) ->
|
||||||
%% The checked config is different from the current config which means we
|
%% The checked config is different from the current config which means we
|
||||||
%% should not update AddedChannels because the channel has been removed or
|
%% should not update AddedChannels because the channel has been removed or
|
||||||
%% updated while the health check was in progress
|
%% updated while the health check was in progress. We can still reply with
|
||||||
|
%% NewStatus because the health check must have been issued before the
|
||||||
|
%% configuration changed or the channel got removed.
|
||||||
{AddedChannels, NewStatus}.
|
{AddedChannels, NewStatus}.
|
||||||
|
|
||||||
reply_pending_channel_health_check_callers(
|
reply_pending_channel_health_check_callers(
|
||||||
|
|
Loading…
Reference in New Issue