diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c3b746d8e..50a25620c 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -143,6 +143,15 @@ perform_health_check => boolean() }. +%% calls/casts/generic timeouts +-record(add_channel, {channel_id :: channel_id(), config :: map()}). +-record(start_channel_health_check, {channel_id :: channel_id()}). + +-type generic_timeout(Id, Content) :: {{timeout, Id}, timeout(), Content}. +-type start_channel_health_check_action() :: generic_timeout( + #start_channel_health_check{}, #start_channel_health_check{} +). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -405,7 +414,7 @@ add_channel(ResId, ChannelId, Config) -> ) -> ok | {error, term()}. add_channel(ResId, ChannelId, Config, Opts) -> - Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION), + Result = safe_call(ResId, #add_channel{channel_id = ChannelId, config = Config}, ?T_OPERATION), maybe true ?= maps:get(perform_health_check, Opts, true), %% Wait for health_check to finish @@ -570,7 +579,9 @@ handle_event({call, From}, health_check, _State, Data) -> handle_manual_resource_health_check(From, Data); handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) -> handle_manual_channel_health_check(From, Data, ChannelId); -% State: CONNECTING +%%-------------------------- +%% State: CONNECTING +%%-------------------------- handle_event(enter, _OldState, ?state_connecting = State, Data) -> ok = log_status_consistency(State, Data), {keep_state_and_data, [{state_timeout, 0, health_check}]}; @@ -582,25 +593,39 @@ handle_event( {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data ) -> handle_remove_channel(From, ChannelId, Data); +%%-------------------------- %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks +%%-------------------------- handle_event(enter, _OldState, ?state_connected = State, Data) -> ok = log_status_consistency(State, Data), _ = emqx_alarm:safe_deactivate(Data#data.id), ?tp(resource_connected_enter, #{}), - {keep_state_and_data, health_check_actions(Data)}; + {keep_state_and_data, resource_health_check_actions(Data)}; handle_event(state_timeout, health_check, ?state_connected, Data) -> start_resource_health_check(Data); handle_event( - {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data + {call, From}, + #add_channel{channel_id = ChannelId, config = Config}, + ?state_connected = _State, + Data ) -> handle_add_channel(From, Data, ChannelId, Config); handle_event( {call, From}, {remove_channel, ChannelId}, ?state_connected = _State, Data ) -> handle_remove_channel(From, ChannelId, Data); +handle_event( + {timeout, #start_channel_health_check{channel_id = ChannelId}}, + _, + ?state_connected = _State, + Data +) -> + handle_start_channel_health_check(Data, ChannelId); +%%-------------------------- %% State: DISCONNECTED +%%-------------------------- handle_event(enter, _OldState, ?state_disconnected = State, Data) -> ok = log_status_consistency(State, Data), ?tp(resource_disconnected_enter, #{}), @@ -608,14 +633,18 @@ handle_event(enter, _OldState, ?state_disconnected = State, Data) -> handle_event(state_timeout, auto_retry, ?state_disconnected, Data) -> ?tp(resource_auto_reconnect, #{}), start_resource(Data, undefined); +%%-------------------------- %% State: STOPPED %% The stopped state is entered after the resource has been explicitly stopped +%%-------------------------- handle_event(enter, _OldState, ?state_stopped = State, Data) -> ok = log_status_consistency(State, Data), {keep_state_and_data, []}; +%%-------------------------- %% The following events can be handled in any other state +%%-------------------------- handle_event( - {call, From}, {add_channel, ChannelId, Config}, State, Data + {call, From}, #add_channel{channel_id = ChannelId, config = Config}, State, Data ) -> handle_not_connected_add_channel(From, ChannelId, Config, State, Data); handle_event( @@ -645,6 +674,9 @@ handle_event( is_map_key(Pid, CHCWorkers) -> handle_channel_health_check_worker_down(Data0, Pid, Res); +handle_event({timeout, #start_channel_health_check{channel_id = _}}, _, _State, _Data) -> + %% Stale health check action; currently, we only probe channel health when connected. + keep_state_and_data; % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -702,7 +734,7 @@ retry_actions(Data) -> [{state_timeout, RetryInterval, auto_retry}] end. -health_check_actions(Data) -> +resource_health_check_actions(Data) -> [{state_timeout, health_check_interval(Data#data.opts), health_check}]. handle_remove_event(From, ClearMetrics, Data) -> @@ -1079,7 +1111,7 @@ continue_resource_health_check_connected(NewStatus, Data0) -> {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0), Data2 = channels_health_check(?status_connected, Data1), Data = update_state(Data2, Data0), - Actions = Replies ++ health_check_actions(Data), + Actions = Replies ++ resource_health_check_actions(Data), {keep_state, Data, Actions}; _ -> ?SLOG(warning, #{ @@ -1091,23 +1123,28 @@ continue_resource_health_check_connected(NewStatus, Data0) -> %% subset of resource manager state... But there should be a conversion %% between the two here, as resource manager also has `stopped', which is %% not a valid status at the time of writing. - {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0), - {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies} + {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0), + Data = channels_health_check(NewStatus, Data1), + Actions = Replies, + {next_state, NewStatus, Data, Actions} end. %% Continuation to be used when the current resource state is not `?state_connected'. continue_resource_health_check_not_connected(NewStatus, Data0) -> - {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0), + {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0), case NewStatus of ?status_connected -> - {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies}; + Data = channels_health_check(?status_connected, Data1), + Actions = Replies, + {next_state, ?state_connected, Data, Actions}; ?status_connecting -> - Actions = Replies ++ health_check_actions(Data), - {next_state, ?status_connecting, channels_health_check(?status_connecting, Data), - Actions}; + Data = channels_health_check(?status_connecting, Data1), + Actions = Replies ++ resource_health_check_actions(Data), + {next_state, ?status_connecting, Data, Actions}; ?status_disconnected -> - {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data), - Replies} + Data = channels_health_check(?status_disconnected, Data1), + Actions = Replies, + {next_state, ?state_disconnected, Data, Actions} end. handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> @@ -1269,38 +1306,60 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) -> ) ). +-spec generic_timeout_action(Id, timeout(), Content) -> generic_timeout(Id, Content). +generic_timeout_action(Id, Timeout, Content) -> + {{timeout, Id}, Timeout, Content}. + +-spec start_channel_health_check_action(channel_id(), map(), map(), data() | timeout()) -> + [start_channel_health_check_action()]. +start_channel_health_check_action(ChannelId, NewChanStatus, PreviousChanStatus, Data = #data{}) -> + Timeout = get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data), + Event = #start_channel_health_check{channel_id = ChannelId}, + [generic_timeout_action(Event, Timeout, Event)]. + +get_channel_health_check_interval(ChannelId, NewChanStatus, PreviousChanStatus, Data) -> + emqx_utils:foldl_while( + fun + (#{config := #{resource_opts := #{health_check_interval := HCInterval}}}, _Acc) -> + {halt, HCInterval}; + (_, Acc) -> + {cont, Acc} + end, + ?HEALTHCHECK_INTERVAL, + [ + NewChanStatus, + PreviousChanStatus, + maps:get(ChannelId, Data#data.added_channels, #{}) + ] + ). + %% Currently, we only call resource channel health checks when the underlying resource is %% `?status_connected'. -spec trigger_health_check_for_added_channels(data()) -> data(). trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) -> #{ - channel := CHCWorkers0 = + channel := #{ - pending := CPending0, + %% TODO: rm pending + %% pending := CPending0, ongoing := Ongoing0 } } = HCWorkers0, NewOngoing = maps:filter( fun(ChannelId, OldStatus) -> - not is_map_key(ChannelId, Ongoing0) and + (not is_map_key(ChannelId, Ongoing0)) andalso channel_status_is_channel_added(OldStatus) end, Data0#data.added_channels ), ChannelsToCheck = maps:keys(NewOngoing), - case ChannelsToCheck of - [] -> - %% Nothing to do. - Data0; - [ChannelId | Rest] -> - %% Shooting one check at a time. We could increase concurrency in the future. - CHCWorkers = CHCWorkers0#{ - pending := CPending0 ++ Rest, - ongoing := maps:merge(Ongoing0, NewOngoing) - }, - Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, - start_channel_health_check(Data1, ChannelId) - end. + lists:foldl( + fun(ChannelId, Acc) -> + start_channel_health_check(Acc, ChannelId) + end, + Data0, + ChannelsToCheck + ). -spec continue_channel_health_check_connected( channel_id(), channel_status_map(), channel_status_map(), data() @@ -1338,12 +1397,29 @@ continue_channel_health_check_connected_no_update_during_check(ChannelId, OldSta end, Data. +-spec handle_start_channel_health_check(data(), channel_id()) -> + gen_statem:event_handler_result(state(), data()). +handle_start_channel_health_check(Data0, ChannelId) -> + Data = start_channel_health_check(Data0, ChannelId), + {keep_state, Data}. + -spec start_channel_health_check(data(), channel_id()) -> data(). -start_channel_health_check(#data{} = Data0, ChannelId) -> +start_channel_health_check( + #data{added_channels = AddedChannels, hc_workers = #{channel := #{ongoing := CHCOngoing0}}} = + Data0, + ChannelId +) when + is_map_key(ChannelId, AddedChannels) andalso (not is_map_key(ChannelId, CHCOngoing0)) +-> #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0, WorkerPid = spawn_channel_health_check_worker(Data0, ChannelId), - HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerPid => ChannelId}}, - Data0#data{hc_workers = HCWorkers}. + ChannelStatus = maps:get(ChannelId, AddedChannels), + CHCOngoing = CHCOngoing0#{ChannelId => ChannelStatus}, + CHCWorkers = CHCWorkers0#{WorkerPid => ChannelId, ongoing := CHCOngoing}, + HCWorkers = HCWorkers0#{channel := CHCWorkers}, + Data0#data{hc_workers = HCWorkers}; +start_channel_health_check(Data, _ChannelId) -> + Data. -spec spawn_channel_health_check_worker(data(), channel_id()) -> pid(). spawn_channel_health_check_worker(#data{} = Data, ChannelId) -> @@ -1380,33 +1456,19 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> #{ongoing := Ongoing0} = CHCWorkers1, {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, - CHCWorkers3 = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers2), Data1 = Data0#data{added_channels = AddedChannels}, {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1), - case CHCWorkers1 of - #{pending := [NextChannelId | Rest]} -> - CHCWorkers = CHCWorkers3#{pending := Rest}, - HCWorkers = HCWorkers0#{channel := CHCWorkers}, - Data3 = Data2#data{hc_workers = HCWorkers}, - Data4 = continue_channel_health_check_connected( - ChannelId, - PreviousChanStatus, - CurrentStatus, - Data3 - ), - Data = start_channel_health_check(Data4, NextChannelId), - {keep_state, update_state(Data, Data0), Replies}; - #{pending := []} -> - HCWorkers = HCWorkers0#{channel := CHCWorkers3}, - Data3 = Data2#data{hc_workers = HCWorkers}, - Data = continue_channel_health_check_connected( - ChannelId, - PreviousChanStatus, - CurrentStatus, - Data3 - ), - {keep_state, update_state(Data, Data0), Replies} - end. + HCWorkers = HCWorkers0#{channel := CHCWorkers2}, + Data3 = Data2#data{hc_workers = HCWorkers}, + Data = continue_channel_health_check_connected( + ChannelId, + PreviousChanStatus, + CurrentStatus, + Data3 + ), + CHCActions = start_channel_health_check_action(ChannelId, NewStatus, PreviousChanStatus, Data), + Actions = Replies ++ CHCActions, + {keep_state, update_state(Data, Data0), Actions}. handle_channel_health_check_worker_down_new_channels_and_status( ChannelId, diff --git a/changes/ce/fix-13442.en.md b/changes/ce/fix-13442.en.md new file mode 100644 index 000000000..05aaee8a0 --- /dev/null +++ b/changes/ce/fix-13442.en.md @@ -0,0 +1 @@ +Fixed an issue where the health check interval values of actions/sources were not being taken into account.