fix(resource): account for ongoing channel health checks, update data and reply immediately when receiving an update
This commit is contained in:
parent
79526d539a
commit
475077c798
|
@ -81,15 +81,15 @@
|
||||||
hc_workers = #{
|
hc_workers = #{
|
||||||
resource => #{},
|
resource => #{},
|
||||||
channel => #{
|
channel => #{
|
||||||
pending => [],
|
ongoing => #{},
|
||||||
previous_status => #{}
|
pending => []
|
||||||
}
|
}
|
||||||
} :: #{
|
} :: #{
|
||||||
resource := #{{pid(), reference()} => true},
|
resource := #{{pid(), reference()} => true},
|
||||||
channel := #{
|
channel := #{
|
||||||
{pid(), reference()} => channel_id(),
|
{pid(), reference()} => channel_id(),
|
||||||
pending := [channel_id()],
|
ongoing := #{channel_id() => channel_status_map()},
|
||||||
previous_status := #{channel_id() => channel_status_map()}
|
pending := [channel_id()]
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
%% Callers waiting on health check
|
%% Callers waiting on health check
|
||||||
|
@ -1039,12 +1039,12 @@ handle_manual_channel_health_check(
|
||||||
#data{
|
#data{
|
||||||
added_channels = Channels,
|
added_channels = Channels,
|
||||||
hc_pending_callers = #{channel := CPending0} = Pending0,
|
hc_pending_callers = #{channel := CPending0} = Pending0,
|
||||||
hc_workers = #{channel := #{previous_status := PreviousStatus}}
|
hc_workers = #{channel := #{ongoing := Ongoing}}
|
||||||
} = Data0,
|
} = Data0,
|
||||||
ChannelId
|
ChannelId
|
||||||
) when
|
) when
|
||||||
is_map_key(ChannelId, Channels),
|
is_map_key(ChannelId, Channels),
|
||||||
is_map_key(ChannelId, PreviousStatus)
|
is_map_key(ChannelId, Ongoing)
|
||||||
->
|
->
|
||||||
%% Ongoing health check.
|
%% Ongoing health check.
|
||||||
CPending = maps:update_with(
|
CPending = maps:update_with(
|
||||||
|
@ -1189,53 +1189,54 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
|
||||||
%% `?status_connected'.
|
%% `?status_connected'.
|
||||||
-spec trigger_health_check_for_added_channels(data()) -> data().
|
-spec trigger_health_check_for_added_channels(data()) -> data().
|
||||||
trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
|
trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
|
||||||
#{channel := CHCWorkers0} = HCWorkers0,
|
#{
|
||||||
PreviousStatus = maps:from_list([
|
channel := CHCWorkers0 =
|
||||||
{ChannelId, OldStatus}
|
#{
|
||||||
|| {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels),
|
pending := CPending0,
|
||||||
channel_status_is_channel_added(OldStatus)
|
ongoing := Ongoing0
|
||||||
]),
|
}
|
||||||
ChannelsToCheck = maps:keys(PreviousStatus),
|
} = HCWorkers0,
|
||||||
|
NewOngoing = maps:filter(
|
||||||
|
fun(ChannelId, OldStatus) ->
|
||||||
|
not is_map_key(ChannelId, Ongoing0) and
|
||||||
|
channel_status_is_channel_added(OldStatus)
|
||||||
|
end,
|
||||||
|
Data0#data.added_channels
|
||||||
|
),
|
||||||
|
ChannelsToCheck = maps:keys(NewOngoing),
|
||||||
case ChannelsToCheck of
|
case ChannelsToCheck of
|
||||||
[] ->
|
[] ->
|
||||||
%% Nothing to do.
|
%% Nothing to do.
|
||||||
Data0;
|
Data0;
|
||||||
[ChannelId | Rest] ->
|
[ChannelId | Rest] ->
|
||||||
%% Shooting one check at a time. We could increase concurrency in the future.
|
%% Shooting one check at a time. We could increase concurrency in the future.
|
||||||
CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus},
|
CHCWorkers = CHCWorkers0#{
|
||||||
|
pending := CPending0 ++ Rest,
|
||||||
|
ongoing := maps:merge(Ongoing0, NewOngoing)
|
||||||
|
},
|
||||||
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
||||||
start_channel_health_check(Data1, ChannelId)
|
start_channel_health_check(Data1, ChannelId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec continue_channel_health_check_connected(data()) -> data().
|
-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data().
|
||||||
continue_channel_health_check_connected(Data0) ->
|
continue_channel_health_check_connected(ChannelId, OldStatus, Data0) ->
|
||||||
#data{hc_workers = HCWorkers0} = Data0,
|
#data{hc_workers = HCWorkers0} = Data0,
|
||||||
#{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0,
|
#{channel := CHCWorkers0} = HCWorkers0,
|
||||||
CHCWorkers = CHCWorkers0#{previous_status := #{}},
|
CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
|
||||||
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
|
||||||
%% Remove the added channels with a status different from connected or connecting
|
%% Remove the added channels with a status different from connected or connecting
|
||||||
CheckedChannels = [
|
NewStatus = maps:get(ChannelId, Data0#data.added_channels),
|
||||||
{ChannelId, NewStatus}
|
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
|
||||||
|| {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels),
|
|
||||||
is_map_key(ChannelId, PreviousStatus)
|
|
||||||
],
|
|
||||||
ChannelsToRemove = [
|
|
||||||
ChannelId
|
|
||||||
|| {ChannelId, NewStatus} <- CheckedChannels,
|
|
||||||
not channel_status_is_channel_added(NewStatus)
|
|
||||||
],
|
|
||||||
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
|
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
|
||||||
%% Raise/clear alarms
|
%% Raise/clear alarms
|
||||||
lists:foreach(
|
case NewStatus of
|
||||||
fun
|
#{status := ?status_connected} ->
|
||||||
({ID, #{status := ?status_connected}}) ->
|
_ = maybe_clear_alarm(ChannelId),
|
||||||
_ = maybe_clear_alarm(ID);
|
ok;
|
||||||
({ID, NewStatus}) ->
|
_ ->
|
||||||
OldStatus = maps:get(ID, PreviousStatus),
|
_ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus),
|
||||||
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
|
ok
|
||||||
end,
|
end,
|
||||||
CheckedChannels
|
|
||||||
),
|
|
||||||
Data.
|
Data.
|
||||||
|
|
||||||
-spec start_channel_health_check(data(), channel_id()) -> data().
|
-spec start_channel_health_check(data(), channel_id()) -> data().
|
||||||
|
@ -1271,19 +1272,24 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
|
||||||
%% `emqx_resource:call_channel_health_check' catches all exceptions.
|
%% `emqx_resource:call_channel_health_check' catches all exceptions.
|
||||||
AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
|
AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
|
||||||
end,
|
end,
|
||||||
|
#{ongoing := Ongoing0} = CHCWorkers1,
|
||||||
|
{PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
|
||||||
|
CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
|
||||||
|
CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2),
|
||||||
Data1 = Data0#data{added_channels = AddedChannels},
|
Data1 = Data0#data{added_channels = AddedChannels},
|
||||||
{Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
|
{Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
|
||||||
case CHCWorkers1 of
|
case CHCWorkers1 of
|
||||||
#{pending := [NextChannelId | Rest]} ->
|
#{pending := [NextChannelId | Rest]} ->
|
||||||
CHCWorkers = CHCWorkers1#{pending := Rest},
|
CHCWorkers = CHCWorkers3#{pending := Rest},
|
||||||
HCWorkers = HCWorkers0#{channel := CHCWorkers},
|
HCWorkers = HCWorkers0#{channel := CHCWorkers},
|
||||||
Data3 = Data2#data{hc_workers = HCWorkers},
|
Data3 = Data2#data{hc_workers = HCWorkers},
|
||||||
Data = start_channel_health_check(Data3, NextChannelId),
|
Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
|
||||||
|
Data = start_channel_health_check(Data4, NextChannelId),
|
||||||
{keep_state, update_state(Data, Data0), Replies};
|
{keep_state, update_state(Data, Data0), Replies};
|
||||||
#{pending := []} ->
|
#{pending := []} ->
|
||||||
HCWorkers = HCWorkers0#{channel := CHCWorkers1},
|
HCWorkers = HCWorkers0#{channel := CHCWorkers3},
|
||||||
Data3 = Data2#data{hc_workers = HCWorkers},
|
Data3 = Data2#data{hc_workers = HCWorkers},
|
||||||
Data = continue_channel_health_check_connected(Data3),
|
Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
|
||||||
{keep_state, update_state(Data, Data0), Replies}
|
{keep_state, update_state(Data, Data0), Replies}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -1339,7 +1345,7 @@ remove_runtime_data(#data{} = Data0) ->
|
||||||
Data0#data{
|
Data0#data{
|
||||||
hc_workers = #{
|
hc_workers = #{
|
||||||
resource => #{},
|
resource => #{},
|
||||||
channel => #{pending => [], previous_status => #{}}
|
channel => #{pending => [], ongoing => #{}}
|
||||||
},
|
},
|
||||||
hc_pending_callers = #{
|
hc_pending_callers = #{
|
||||||
resource => [],
|
resource => [],
|
||||||
|
|
Loading…
Reference in New Issue