diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 1971ad697..08b3270ea 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -858,6 +858,12 @@ do_start_stop_bridges(Type, Config) ->
                 <<"status_reason">> := <<"connack_timeout">>
             } ->
                 ok;
+            #{
+                <<"node_status">> := [_, _ | _],
+                <<"status">> := <<"disconnected">>,
+                <<"status_reason">> := <<"connack_timeout">>
+            } ->
+                ok;
             #{
                 <<"node_status">> := [_],
                 <<"status">> := <<"connecting">>
diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl
index 06123a935..fcdf56202 100644
--- a/apps/emqx_resource/src/emqx_resource_manager.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager.erl
@@ -61,7 +61,7 @@
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).

 %% Internal exports.
--export([worker_resource_health_check/1]).
+-export([worker_resource_health_check/1, worker_channel_health_check/2]).

 % State record
 -record(data, {
@@ -78,12 +78,24 @@
     pid,
     added_channels = #{},
     %% Reference to process performing resource health check.
-    hc_workers = #{resource => #{}, channel => #{}} :: #{
-        resource | channel := #{{pid(), reference()} => true}
+    hc_workers = #{
+        resource => #{},
+        channel => #{
+            pending => [],
+            previous_status => #{}
+        }
+    } :: #{
+        resource := #{{pid(), reference()} => true},
+        channel := #{
+            {pid(), reference()} => channel_id(),
+            pending := [channel_id()],
+            previous_status := #{channel_id() => channel_status_map()}
+        }
     },
     %% Callers waiting on health check
-    hc_pending_callers = #{resource => [], channel => []} :: #{
-        resource | channel := [gen_server:from()]
+    hc_pending_callers = #{resource => [], channel => #{}} :: #{
+        resource := [gen_server:from()],
+        channel := #{channel_id() => [gen_server:from()]}
     },
     extra
 }).
@@ -107,6 +119,12 @@
 -define(state_disconnected, disconnected).
 -define(state_stopped, stopped).

+-type state() ::
+    ?state_stopped
+    | ?state_disconnected
+    | ?state_connecting
+    | ?state_connected.
+
 -define(IS_STATUS(ST),
     ST =:= ?status_connecting; ST =:= ?status_connected; ST =:= ?status_disconnected
 ).
@@ -339,6 +357,7 @@ add_channel(ResId, ChannelId, Config) ->
     Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
     %% Wait for health_check to finish
     _ = health_check(ResId),
+    _ = channel_health_check(ResId, ChannelId),
     Result.

 remove_channel(ResId, ChannelId) ->
@@ -538,11 +557,20 @@ handle_event(
     info,
     {'DOWN', Ref, process, Pid, Res},
     State0,
-    Data0 = #data{hc_workers = #{resource := HCWorkers}}
+    Data0 = #data{hc_workers = #{resource := RHCWorkers}}
 ) when
-    is_map_key({Pid, Ref}, HCWorkers)
+    is_map_key({Pid, Ref}, RHCWorkers)
 ->
     handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
+handle_event(
+    info,
+    {'DOWN', Ref, process, Pid, Res},
+    _State,
+    Data0 = #data{hc_workers = #{channel := CHCWorkers}}
+) when
+    is_map_key({Pid, Ref}, CHCWorkers)
+->
+    handle_channel_health_check_worker_down(Data0, {Pid, Ref}, Res);
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -558,7 +586,7 @@ handle_event(EventType, EventData, State, Data) ->
     keep_state_and_data.

 log_status_consistency(Status, #data{status = Status} = Data) ->
-    log_cache_consistency(read_cache(Data#data.id), Data);
+    log_cache_consistency(read_cache(Data#data.id), remove_runtime_data(Data));
 log_status_consistency(Status, Data) ->
     ?tp(warning, "inconsistent_status", #{
         status => Status,
@@ -869,7 +897,7 @@ handle_manual_resource_health_check(From, Data0) ->
     Data = Data0#data{hc_pending_callers = Pending},
     start_resource_health_check(Data).

-reply_pending_health_check_callers(Status, resource, Data0 = #data{hc_pending_callers = Pending0}) ->
+reply_pending_resource_health_check_callers(Status, Data0 = #data{hc_pending_callers = Pending0}) ->
     #{resource := RPending} = Pending0,
     Actions = [{reply, From, {ok, Status}} || From <- RPending],
     Data = Data0#data{hc_pending_callers = Pending0#{resource := []}},
@@ -888,13 +916,13 @@ start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when
     keep_state_and_data;
 start_resource_health_check(#data{} = Data0) ->
     #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
-    WorkerRef = {_Pid, _Ref} = spawn_health_check_worker(Data0),
+    WorkerRef = {_Pid, _Ref} = spawn_resource_health_check_worker(Data0),
     HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
     Data = Data0#data{hc_workers = HCWorkers},
     {keep_state, Data}.

--spec spawn_health_check_worker(data()) -> {pid(), reference()}.
-spawn_health_check_worker(#data{} = Data) ->
+-spec spawn_resource_health_check_worker(data()) -> {pid(), reference()}.
+spawn_resource_health_check_worker(#data{} = Data) ->
     spawn_monitor(?MODULE, worker_resource_health_check, [Data]).

 %% separated so it can be spec'ed and placate dialyzer tantrums...
@@ -939,7 +967,7 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
 continue_resource_health_check_connected(NewStatus, Data0) ->
     case NewStatus of
         ?status_connected ->
-            {Replies, Data1} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             Data2 = channels_health_check(?status_connected, Data1),
             Data = update_state(Data2, Data0),
             Actions = Replies ++ health_check_actions(Data),
@@ -954,13 +982,13 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
             %% subset of resource manager state... But there should be a conversion
             %% between the two here, as resource manager also has `stopped', which is
             %% not a valid status at the time of writing.
-            {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
     end.

 %% Continuation to be used when the current resource state is not `?state_connected'.
 continue_resource_health_check_not_connected(NewStatus, Data0) ->
-    {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+    {Replies, Data} = reply_pending_resource_health_check_callers(NewStatus, Data0),
     case NewStatus of
         ?status_connected ->
             {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
@@ -975,6 +1003,30 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->

 handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
     {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
+handle_manual_channel_health_check(
+    From,
+    #data{
+        added_channels = Channels,
+        hc_pending_callers = #{channel := CPending0} = Pending0,
+        hc_workers = #{channel := #{previous_status := PreviousStatus}}
+    } = Data0,
+    ChannelId
+) when
+    is_map_key(ChannelId, Channels),
+    is_map_key(ChannelId, PreviousStatus)
+->
+    %% Ongoing health check.
+    CPending = maps:update_with(
+        ChannelId,
+        fun(OtherCallers) ->
+            [From | OtherCallers]
+        end,
+        [From],
+        CPending0
+    ),
+    Pending = Pending0#{channel := CPending},
+    Data = Data0#data{hc_pending_callers = Pending},
+    {keep_state, Data};
 handle_manual_channel_health_check(
     From,
     #data{added_channels = Channels} = _Data,
@@ -982,6 +1034,7 @@ handle_manual_channel_health_check(
 ) when
     is_map_key(ChannelId, Channels)
 ->
+    %% No ongoing health check: reply with current status.
     {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
 handle_manual_channel_health_check(
     From,
@@ -990,10 +1043,6 @@ handle_manual_channel_health_check(
 ) ->
     {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.

-get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
-    RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
-    channel_status(RawStatus).
-
 -spec channels_health_check(resource_status(), data()) -> data().
 channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Channels = maps:to_list(Data0#data.added_channels),
@@ -1009,8 +1058,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
         get_config_for_channels(Data0, ChannelsNotAdded),
     Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
     %% Now that we have done the adding, we can get the status of all channels
-    Data2 = channel_status_for_all_channels(Data1),
-    update_state(Data2, Data0);
+    trigger_health_check_for_added_channels(Data1);
 channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
     %% Whenever the resource is connecting:
     %% 1. Change the status of all added channels to connecting
@@ -1105,41 +1153,117 @@ resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
         )
     ).

-channel_status_for_all_channels(Data) ->
-    Channels = maps:to_list(Data#data.added_channels),
-    AddedChannelsWithOldAndNewStatus = [
-        {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)}
-        || {ChannelId, OldStatus} <- Channels,
+%% Currently, we only call resource channel health checks when the underlying resource is
+%% `?status_connected'.
+-spec trigger_health_check_for_added_channels(data()) -> data().
+trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) ->
+    #{channel := CHCWorkers0} = HCWorkers0,
+    PreviousStatus = maps:from_list([
+        {ChannelId, OldStatus}
+        || {ChannelId, OldStatus} <- maps:to_list(Data0#data.added_channels),
            channel_status_is_channel_added(OldStatus)
-    ],
+    ]),
+    ChannelsToCheck = maps:keys(PreviousStatus),
+    case ChannelsToCheck of
+        [] ->
+            %% Nothing to do.
+            Data0;
+        [ChannelId | Rest] ->
+            %% Shooting one check at a time. We could increase concurrency in the future.
+            CHCWorkers = CHCWorkers0#{pending := Rest, previous_status := PreviousStatus},
+            Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
+            start_channel_health_check(Data1, ChannelId)
+    end.
+
+-spec continue_channel_health_check_connected(data()) -> data().
+continue_channel_health_check_connected(Data0) ->
+    #data{hc_workers = HCWorkers0} = Data0,
+    #{channel := #{previous_status := PreviousStatus} = CHCWorkers0} = HCWorkers0,
+    CHCWorkers = CHCWorkers0#{previous_status := #{}},
+    Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
     %% Remove the added channels with a a status different from connected or connecting
+    CheckedChannels = [
+        {ChannelId, NewStatus}
+        || {ChannelId, NewStatus} <- maps:to_list(Data0#data.added_channels),
+           is_map_key(ChannelId, PreviousStatus)
+    ],
     ChannelsToRemove = [
         ChannelId
-        || {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus,
+        || {ChannelId, NewStatus} <- CheckedChannels,
           not channel_status_is_channel_added(NewStatus)
     ],
-    Data1 = remove_channels_in_list(ChannelsToRemove, Data, true),
+    Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
     %% Raise/clear alarms
     lists:foreach(
         fun
-            ({ID, _OldStatus, #{status := ?status_connected}}) ->
+            ({ID, #{status := ?status_connected}}) ->
                 _ = maybe_clear_alarm(ID);
-            ({ID, OldStatus, NewStatus}) ->
+            ({ID, NewStatus}) ->
+                OldStatus = maps:get(ID, PreviousStatus),
                 _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
         end,
-        AddedChannelsWithOldAndNewStatus
+        CheckedChannels
     ),
-    %% Update the ChannelsMap
-    ChannelsMap = Data1#data.added_channels,
-    NewChannelsMap =
-        lists:foldl(
-            fun({ChannelId, _, NewStatus}, Acc) ->
-                maps:put(ChannelId, NewStatus, Acc)
-            end,
-            ChannelsMap,
-            AddedChannelsWithOldAndNewStatus
-        ),
-    Data1#data{added_channels = NewChannelsMap}.
+    Data.
+
+-spec start_channel_health_check(data(), channel_id()) -> data().
+start_channel_health_check(#data{} = Data0, ChannelId) ->
+    #data{hc_workers = HCWorkers0 = #{channel := CHCWorkers0}} = Data0,
+    WorkerRef = {_Pid, _Ref} = spawn_channel_health_check_worker(Data0, ChannelId),
+    HCWorkers = HCWorkers0#{channel := CHCWorkers0#{WorkerRef => ChannelId}},
+    Data0#data{hc_workers = HCWorkers}.
+
+-spec spawn_channel_health_check_worker(data(), channel_id()) -> {pid(), reference()}.
+spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
+    spawn_monitor(?MODULE, worker_channel_health_check, [Data, ChannelId]).
+
+%% separated so it can be spec'ed and placate dialyzer tantrums...
+-spec worker_channel_health_check(data(), channel_id()) -> no_return().
+worker_channel_health_check(Data, ChannelId) ->
+    #data{id = ResId, mod = Mod, state = State} = Data,
+    RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
+    exit({ok, channel_status(RawStatus)}).
+
+-spec handle_channel_health_check_worker_down(
+    data(), {pid(), reference()}, {ok, channel_status_map()}
+) ->
+    gen_statem:event_handler_result(state(), data()).
+handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
+    #data{
+        hc_workers = HCWorkers0 = #{channel := CHCWorkers0},
+        added_channels = AddedChannels0
+    } = Data0,
+    {ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
+    case ExitResult of
+        {ok, NewStatus} ->
+            %% `emqx_resource:call_channel_health_check' catches all exceptions.
+            AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
+    end,
+    Data1 = Data0#data{added_channels = AddedChannels},
+    {Replies, Data2} = reply_pending_channel_health_check_callers(ChannelId, NewStatus, Data1),
+    case CHCWorkers1 of
+        #{pending := [NextChannelId | Rest]} ->
+            CHCWorkers = CHCWorkers1#{pending := Rest},
+            HCWorkers = HCWorkers0#{channel := CHCWorkers},
+            Data3 = Data2#data{hc_workers = HCWorkers},
+            Data = start_channel_health_check(Data3, NextChannelId),
+            {keep_state, update_state(Data, Data0), Replies};
+        #{pending := []} ->
+            HCWorkers = HCWorkers0#{channel := CHCWorkers1},
+            Data3 = Data2#data{hc_workers = HCWorkers},
+            Data = continue_channel_health_check_connected(Data3),
+            {keep_state, update_state(Data, Data0), Replies}
+    end.
+
+reply_pending_channel_health_check_callers(
+    ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0}
+) ->
+    #{channel := CPending0} = Pending0,
+    Pending = maps:get(ChannelId, CPending0, []),
+    Actions = [{reply, From, Status} || From <- Pending],
+    CPending = maps:remove(ChannelId, CPending0),
+    Data = Data0#data{hc_pending_callers = Pending0#{channel := CPending}},
+    {Actions, Data}.

 get_config_for_channels(Data0, ChannelsWithoutConfig) ->
     ResId = Data0#data.id,
@@ -1181,8 +1305,14 @@ update_state(Data, _DataWas) ->

 remove_runtime_data(#data{} = Data0) ->
     Data0#data{
-        hc_workers = #{resource => #{}, channel => #{}},
-        hc_pending_callers = #{resource => [], channel => []}
+        hc_workers = #{
+            resource => #{},
+            channel => #{pending => [], previous_status => #{}}
+        },
+        hc_pending_callers = #{
+            resource => [],
+            channel => #{}
+        }
     }.

 health_check_interval(Opts) ->
diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl
index d1ac5c2e6..754727e8c 100644
--- a/apps/emqx_resource/test/emqx_connector_demo.erl
+++ b/apps/emqx_resource/test/emqx_connector_demo.erl
@@ -31,7 +31,12 @@
     on_query_async/4,
     on_batch_query/3,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
 ]).

 -export([counter_loop/0, set_callback_mode/1]).
@@ -40,6 +45,7 @@
 -export([roots/0]).

 -define(CM_KEY, {?MODULE, callback_mode}).
+-define(PT_CHAN_KEY(CONN_RES_ID), {?MODULE, chans, CONN_RES_ID}).

 roots() ->
     [
@@ -71,12 +77,14 @@ on_start(InstId, #{name := Name} = Opts) ->
     {ok, Opts#{
         id => InstId,
         stop_error => StopError,
+        channels => #{},
         pid => spawn_counter_process(Name, Register)
     }}.

 on_stop(_InstId, #{stop_error := true}) ->
     {error, stop_error};
-on_stop(_InstId, #{pid := Pid}) ->
+on_stop(InstId, #{pid := Pid}) ->
+    persistent_term:erase(?PT_CHAN_KEY(InstId)),
     stop_counter_process(Pid).

 on_query(_InstId, get_state, State) ->
@@ -295,6 +303,31 @@ on_get_status(_InstId, #{pid := Pid}) ->
         false -> ?status_disconnected
     end.

+on_add_channel(ConnResId, ConnSt0, ChanId, ChanCfg) ->
+    ConnSt = emqx_utils_maps:deep_put([channels, ChanId], ConnSt0, ChanCfg),
+    do_add_channel(ConnResId, ChanId, ChanCfg),
+    {ok, ConnSt}.
+
+on_remove_channel(ConnResId, ConnSt0, ChanId) ->
+    ConnSt = emqx_utils_maps:deep_remove([channels, ChanId], ConnSt0),
+    do_remove_channel(ConnResId, ChanId),
+    {ok, ConnSt}.
+
+on_get_channels(ConnResId) ->
+    persistent_term:get(?PT_CHAN_KEY(ConnResId), []).
+
+on_get_channel_status(_ConnResId, ChanId, #{channels := Chans}) ->
+    case Chans of
+        #{ChanId := #{health_check_delay := Delay}} ->
+            ?tp(connector_demo_channel_health_check_delay, #{}),
+            timer:sleep(Delay),
+            ?status_connected;
+        #{ChanId := _ChanCfg} ->
+            ?status_connected;
+        #{} ->
+            ?status_disconnected
+    end.
+
 spawn_counter_process(Name, Register) ->
     Pid = spawn_link(?MODULE, counter_loop, []),
     true = maybe_register(Name, Pid, Register),
@@ -455,3 +488,11 @@ make_random_reply(N) ->
         3 ->
             {error, {unrecoverable_error, N}}
     end.
+
+do_add_channel(ConnResId, ChanId, ChanCfg) ->
+    Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
+    persistent_term:put(?PT_CHAN_KEY(ConnResId), [{ChanId, ChanCfg} | Chans]).
+
+do_remove_channel(ConnResId, ChanId) ->
+    Chans = persistent_term:get(?PT_CHAN_KEY(ConnResId), []),
+    persistent_term:put(?PT_CHAN_KEY(ConnResId), proplists:delete(ChanId, Chans)).
diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl
index a6cdaedb2..99e85424d 100644
--- a/apps/emqx_resource/test/emqx_resource_SUITE.erl
+++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl
@@ -3141,6 +3141,55 @@ t_non_blocking_resource_health_check(_Config) ->
     ),
     ok.

+t_non_blocking_channel_health_check(_Config) ->
+    ?check_trace(
+        begin
+            {ok, _} =
+                create(
+                    ?ID,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{name => test_resource, health_check_error => {delay, 500}},
+                    #{health_check_interval => 100}
+                ),
+            ChanId = <<"chan">>,
+            ok =
+                emqx_resource_manager:add_channel(
+                    ?ID,
+                    ChanId,
+                    #{health_check_delay => 500}
+                ),
+
+            %% concurrently attempt to health check the channel; should do it only once
+            %% for all callers
+            NumCallers = 20,
+            Expected = lists:duplicate(
+                NumCallers,
+                #{error => undefined, status => connected}
+            ),
+            ?assertEqual(
+                Expected,
+                emqx_utils:pmap(
+                    fun(_) -> emqx_resource_manager:channel_health_check(?ID, ChanId) end,
+                    lists:seq(1, NumCallers)
+                )
+            ),
+
+            NumCallers
+        end,
+        [
+            log_consistency_prop(),
+            fun(NumCallers, Trace) ->
+                %% shouldn't have one health check per caller
+                SubTrace = ?of_kind(connector_demo_channel_health_check_delay, Trace),
+                ?assertMatch([_ | _], SubTrace),
+                ?assert(length(SubTrace) < (NumCallers div 2), #{trace => Trace}),
+                ok
+            end
+        ]
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
diff --git a/changes/ce/fix-12830.en.md b/changes/ce/fix-12830.en.md
new file mode 100644
index 000000000..5800a9bd3
--- /dev/null
+++ b/changes/ce/fix-12830.en.md
@@ -0,0 +1 @@
+Made channel (action/source) health checks non-blocking operations. This means that operations such as updating or removing an action/source data integration won't be blocked by a long-running health check.
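
Reviewer note: below is a minimal sketch, modeled on the t_non_blocking_channel_health_check case above, of the behavior the changelog entry describes. The function name, resource id, channel id, and caller count are illustrative placeholders, not values defined by this patch.

    %% Illustrative sketch only: many callers ask for the same channel's health concurrently.
    %% With the changes above, the resource manager spawns a single monitored worker per
    %% channel check and replies to all pending callers when that worker exits, instead of
    %% running one blocking health-check callback per caller inside the gen_statem process.
    demo_concurrent_channel_health_check(ResId, ChanId) ->
        emqx_utils:pmap(
            fun(_) -> emqx_resource_manager:channel_health_check(ResId, ChanId) end,
            lists:seq(1, 20)
        ).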