From 477ed11de8d8b8a3df8e56af512a209333ff0195 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 25 Oct 2023 09:56:15 +0200 Subject: [PATCH] fix: periodical status checks and alarms for channels --- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 118 ++++++- .../test/emqx_bridge_v2_test_connector.erl | 15 +- apps/emqx_connector/src/emqx_connector.erl | 3 +- apps/emqx_resource/include/emqx_resource.hrl | 1 + apps/emqx_resource/src/emqx_resource.erl | 5 +- .../src/emqx_resource_manager.erl | 307 ++++++++++++------ 6 files changed, 349 insertions(+), 100 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index a0786d91a..95385c585 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -30,6 +30,9 @@ con_type() -> con_name() -> my_connector. +connector_resource_id() -> + emqx_connector_resource:resource_id(con_type(), con_name()). + bridge_type() -> test_bridge_type. @@ -48,7 +51,13 @@ con_schema() -> ]. con_config() -> - #{}. + #{ + <<"enable">> => true, + <<"resource_opts">> => #{ + %% Set this to a low value to make the test run faster + <<"health_check_interval">> => 100 + } + }. bridge_schema() -> [ @@ -66,9 +75,17 @@ bridge_schema() -> bridge_config() -> #{ - <<"connector">> => atom_to_binary(con_name()) + <<"connector">> => atom_to_binary(con_name()), + <<"enable">> => true, + <<"send_to">> => registered_process_name(), + <<"resource_opts">> => #{ + <<"resume_interval">> => 100 + } }. +registered_process_name() -> + my_registered_process. + all() -> emqx_common_test_helpers:all(?MODULE). @@ -179,3 +196,100 @@ t_is_valid_bridge_v1(_) -> {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2), false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), ok. + +t_manual_health_check(_) -> + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), + %% Run a health check for the bridge + connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_manual_health_check_exception(_) -> + Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> throw(my_error) end}, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), + %% Run a health check for the bridge + {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_manual_health_check_exception_error(_) -> + Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> error(my_error) end}, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), + %% Run a health check for the bridge + {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_manual_health_check_error(_) -> + Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end}, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), + %% Run a health check for the bridge + {error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_send_message(_) -> + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), + %% Register name for this process + register(registered_process_name(), self()), + _ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{}), + receive + <<"my_msg">> -> + ok + after 1000 -> + ct:fail("Failed to receive message") + end, + unregister(registered_process_name()), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_send_message_unhealthy_channel(_) -> + OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]), + ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}), + OnGetStatusFun = fun() -> ets:lookup_element(OnGetStatusResponseETS, status_value, 2) end, + Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => OnGetStatusFun}, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), + %% Register name for this process + register(registered_process_name(), self()), + _ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 1}), + receive + Any -> + ct:pal("Received message: ~p", [Any]), + ct:fail("Should not get message here") + after 1 -> + ok + end, + %% Sending should work again after the channel is healthy + ets:insert(OnGetStatusResponseETS, {status_value, connected}), + _ = emqx_bridge_v2:send_message( + bridge_type(), + my_test_bridge, + <<"my_msg">>, + #{resume_interval => 100} + ), + receive + <<"my_msg">> -> + ok + after 10000 -> + ct:fail("Failed to receive message") + end, + unregister(registered_process_name()), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok. + +t_unhealthy_channel_alarm(_) -> + Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end}, + 0 = get_bridge_v2_alarm_cnt(), + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), + 1 = get_bridge_v2_alarm_cnt(), + {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + 0 = get_bridge_v2_alarm_cnt(), + ok. + +get_bridge_v2_alarm_cnt() -> + Alarms = emqx_alarm:get_alarms(activated), + FilterFun = fun + (#{name := S}) when is_binary(S) -> string:find(S, "bridge_v2") =/= nomatch; + (_) -> false + end, + length(lists:filter(FilterFun, Alarms)). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index cda923bde..2774643a9 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -73,10 +73,19 @@ on_remove_channel( on_query( _InstId, - {_MessageTag, _Message}, - _ConnectorState + {ChannelId, Message}, + ConnectorState ) -> - throw(not_implemented). + Channels = maps:get(channels, ConnectorState, #{}), + %% Lookup the channel + ChannelState = maps:get(ChannelId, Channels, not_found), + case ChannelState of + not_found -> throw(<<"Channel not active">>); + _ -> ok + end, + SendTo = maps:get(send_to, ChannelState), + SendTo ! Message, + ok. on_get_channels(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId). diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 554274697..996c13bbf 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -150,7 +150,8 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> {error, {active_channels, Channels}} end; post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) -> - ok = emqx_connector_resource:create(Type, Name, NewConf), + ResOpts = emqx_resource:fetch_creation_opts(NewConf), + ok = emqx_connector_resource:create(Type, Name, NewConf, ResOpts), ?tp(connector_post_config_update_done, #{}), ok; post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 30f936f8c..beaea0c99 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,6 +22,7 @@ -type resource_spec() :: map(). -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. +-type channel_status() :: connected | connecting. -type callback_mode() :: always_sync | async_if_possible. -type query_mode() :: simple_sync diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 292c76d67..f7f878d82 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -527,9 +527,8 @@ call_health_check(ResId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)). -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> - resource_status() - | {resource_status()} - | {resource_status(), term()} + channel_status() + | {channel_status(), term()} | {error, term()}. call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f81bc093d..d0fc168ba 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -308,18 +308,13 @@ health_check(ResId) -> -spec channel_health_check(resource_id(), channel_id()) -> {ok, resource_status()} | {error, term()}. channel_health_check(ResId, ChannelId) -> + %% Do normal health check first to trigger health checks for channels + %% and update the cached health status for the channels + _ = health_check(ResId), safe_call(ResId, {channel_health_check, ChannelId}, ?T_OPERATION). add_channel(ResId, ChannelId, Config) -> - %% Use cache to avoid doing inter process communication on every call - Data = read_cache(ResId), - AddedChannels = Data#data.added_channels, - case maps:get(ChannelId, AddedChannels, false) of - true -> - ok; - false -> - safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION) - end. + safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION). remove_channel(ResId, ChannelId) -> safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION). @@ -448,7 +443,7 @@ handle_event(state_timeout, auto_retry, disconnected, Data) -> handle_event(enter, _OldState, stopped = State, Data) -> ok = log_state_consistency(State, Data), {keep_state_and_data, []}; -% Ignore all other events +%% The following events can be handled in any state handle_event( {call, From}, {add_channel, ChannelId, Config}, _State, Data ) -> @@ -462,6 +457,7 @@ handle_event( ) -> Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod), {keep_state_and_data, {reply, From, {ok, Channels}}}; +% Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( error, @@ -557,8 +553,19 @@ start_resource(Data, From) -> end. add_channels(Data) -> + %% Add channels to the Channels map but not to the resource state + %% Channels will be added to the resouce state after the initial health_check + %% if that succeeds. ChannelIDConfigTuples = emqx_resource:call_get_channels(Data#data.id, Data#data.mod), - add_channels_in_list(ChannelIDConfigTuples, Data). + Channels = Data#data.added_channels, + NewChannels = lists:foldl( + fun({ChannelID, _Conf}, Acc) -> + maps:put(ChannelID, {error, connecting}, Acc) + end, + Channels, + ChannelIDConfigTuples + ), + Data#data{added_channels = NewChannels}. add_channels_in_list([], Data) -> Data; @@ -570,20 +577,29 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> of {ok, NewState} -> AddedChannelsMap = Data#data.added_channels, - NewAddedChannelsMap = maps:put(ChannelID, true, AddedChannelsMap), + %% Set the channel status to connecting to indicate that + %% we have not yet performed the initial health_check + NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap), NewData = Data#data{ state = NewState, added_channels = NewAddedChannelsMap }, add_channels_in_list(Rest, NewData); - {error, Reason} -> + {error, Reason} = Error -> ?SLOG(warning, #{ msg => add_channel_failed, id => Data#data.id, channel_id => ChannelID, reason => Reason }), - add_channels_in_list(Rest, Data) + AddedChannelsMap = Data#data.added_channels, + NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap), + NewData = Data#data{ + added_channels = NewAddedChannelsMap + }, + %% Raise an alarm since the channel could not be added + _ = maybe_alarm(disconnected, ChannelID, Error, no_prev_error), + add_channels_in_list(Rest, NewData) end. maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped -> @@ -611,22 +627,29 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> remove_channels(Data) -> Channels = maps:keys(Data#data.added_channels), - remove_channels_in_list(Channels, Data). + remove_channels_in_list(Channels, Data, false). -remove_channels_in_list([], Data) -> +remove_channels_in_list([], Data, _KeepInChannelMap) -> Data; -remove_channels_in_list([ChannelID | Rest], Data) -> +remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) -> + AddedChannelsMap = Data#data.added_channels, + NewAddedChannelsMap = + case KeepInChannelMap of + true -> + AddedChannelsMap; + false -> + maybe_clear_alarm(ChannelID), + maps:remove(ChannelID, AddedChannelsMap) + end, case emqx_resource:call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, - NewAddedChannelsMap = maps:remove(ChannelID, AddedChannelsMap), NewData = Data#data{ state = NewState, added_channels = NewAddedChannelsMap }, - remove_channels_in_list(Rest, NewData); + remove_channels_in_list(Rest, NewData, KeepInChannelMap); {error, Reason} -> ?SLOG(warning, #{ msg => remove_channel_failed, @@ -634,7 +657,10 @@ remove_channels_in_list([ChannelID | Rest], Data) -> channel_id => ChannelID, reason => Reason }), - remove_channels_in_list(Rest, Data) + NewData = Data#data{ + added_channels = NewAddedChannelsMap + }, + remove_channels_in_list(Rest, NewData, KeepInChannelMap) end. make_test_id() -> @@ -643,24 +669,21 @@ make_test_id() -> handle_add_channel(From, Data, ChannelId, ChannelConfig) -> Channels = Data#data.added_channels, - case maps:get(ChannelId, Channels, false) of - true -> - %% The channel is already installed in the connector state - %% We don't need to install it again - {keep_state_and_data, [{reply, From, ok}]}; - false -> + case maps:get(ChannelId, Channels, {error, not_added}) of + {error, _Reason} -> %% The channel is not installed in the connector state %% We need to install it - handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) + handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); + _ -> + %% The channel is already installed in the connector state + %% We don't need to install it again + {keep_state_and_data, [{reply, From, ok}]} end. handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) -> - case add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) of - {ok, NewData} -> - {keep_state, NewData, [{reply, From, ok}]}; - {error, _Reason} = Error -> - {keep_state_and_data, [{reply, From, Error}]} - end. + NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig), + %% Trigger a health check to raise alarm if channel is not healthy + {keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}. add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> case @@ -670,30 +693,38 @@ add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> of {ok, NewState} -> AddedChannelsMap = Data#data.added_channels, - NewAddedChannelsMap = maps:put(ChannelId, true, AddedChannelsMap), + %% Setting channel status to connecting to indicate that an health check + %% has not been performed yet + NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap), UpdatedData = Data#data{ state = NewState, added_channels = NewAddedChannelsMap }, - {ok, update_state(UpdatedData, Data)}; - {error, Reason} = Error -> - %% Log the error as a warning - ?SLOG(warning, #{ - msg => add_channel_failed, - id => Data#data.id, - channel_id => ChannelId, - reason => Reason - }), - Error + update_state(UpdatedData, Data); + {error, _Reason} = Error -> + ChannelsMap = Data#data.added_channels, + NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap), + UpdatedData = Data#data{ + added_channels = NewChannelsMap + }, + update_state(UpdatedData, Data) end. handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, - case maps:get(ChannelId, Channels, false) of - false -> - %% The channel is already not installed in the connector state - {keep_state_and_data, [{reply, From, ok}]}; - true -> + case maps:get(ChannelId, Channels, {error, not_added}) of + {error, _} -> + %% The channel is already not installed in the connector state. + %% We still need to remove it from the added_channels map + AddedChannels = Data#data.added_channels, + NewAddedChannels = maps:remove(ChannelId, AddedChannels), + NewData = Data#data{ + added_channels = NewAddedChannels + }, + %% Deactivate alarm + _ = maybe_clear_alarm(ChannelId), + {keep_state, NewData, [{reply, From, ok}]}; + _ -> %% The channel is installed in the connector state handle_remove_channel_exists(From, ChannelId, Data) end. @@ -728,7 +759,7 @@ handle_manually_health_check(From, Data) -> Data, fun(Status, UpdatedData) -> Actions = [{reply, From, {ok, Status}}], - {next_state, Status, UpdatedData, Actions} + {next_state, Status, channels_health_check(Status, UpdatedData), Actions} end ). @@ -736,47 +767,18 @@ handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) {keep_state_and_data, [{reply, From, {ok, disconnected}}]}; handle_manually_channel_health_check( From, - #data{added_channels = Channels} = Data, + #data{added_channels = Channels} = _Data, ChannelId ) when is_map_key(ChannelId, Channels) -> - {keep_state_and_data, [{reply, From, get_channel_status_channel_added(Data, ChannelId)}]}; + {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]}; handle_manually_channel_health_check( From, - Data, - ChannelId + _Data, + _ChannelId ) -> - %% add channel - ResId = Data#data.id, - Mod = Data#data.mod, - case emqx_resource:call_get_channel_config(ResId, ChannelId, Mod) of - ChannelConfig when is_map(ChannelConfig) -> - case add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) of - {ok, UpdatedData} -> - {keep_state, UpdatedData, [ - {reply, From, get_channel_status_channel_added(UpdatedData, ChannelId)} - ]}; - {error, Reason} = Error -> - %% Log the error as a warning - ?SLOG(warning, #{ - msg => add_channel_failed_when_doing_status_check, - id => ResId, - channel_id => ChannelId, - reason => Reason - }), - {keep_state_and_data, [{reply, From, Error}]} - end; - {error, Reason} = Error -> - %% Log the error as a warning - ?SLOG(warning, #{ - msg => get_channel_config_failed_when_doing_status_check, - id => ResId, - channel_id => ChannelId, - reason => Reason - }), - {keep_state_and_data, [{reply, From, Error}]} - end. + {keep_state_and_data, [{reply, From, {error, channel_not_found}}]}. get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State). @@ -786,11 +788,12 @@ handle_connecting_health_check(Data) -> Data, fun (connected, UpdatedData) -> - {next_state, connected, UpdatedData}; + {next_state, connected, channels_health_check(connected, UpdatedData)}; (connecting, UpdatedData) -> - {keep_state, UpdatedData, health_check_actions(UpdatedData)}; + {keep_state, channels_health_check(connecting, UpdatedData), + health_check_actions(UpdatedData)}; (disconnected, UpdatedData) -> - {next_state, disconnected, UpdatedData} + {next_state, disconnected, channels_health_check(disconnected, UpdatedData)} end ). @@ -799,14 +802,15 @@ handle_connected_health_check(Data) -> Data, fun (connected, UpdatedData) -> - {keep_state, UpdatedData, health_check_actions(UpdatedData)}; + {keep_state, channels_health_check(connected, UpdatedData), + health_check_actions(UpdatedData)}; (Status, UpdatedData) -> ?SLOG(warning, #{ msg => "health_check_failed", id => Data#data.id, status => Status }), - {next_state, Status, UpdatedData} + {next_state, Status, channels_health_check(Status, UpdatedData)} end ). @@ -823,6 +827,126 @@ with_health_check(#data{error = PrevError} = Data, Func) -> }, Func(Status, update_state(UpdatedData, Data)). +channels_health_check(connected = _ResourceStatus, Data0) -> + Channels = maps:to_list(Data0#data.added_channels), + %% All channels with an error status are considered not added + ChannelsNotAdded = [ + ChannelId + || {ChannelId, Status} <- Channels, + not is_channel_added(Status) + ], + %% Attempt to add channels that are not added + ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded), + Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0), + %% Now that we have done the adding, we can get the status of all channels + Data2 = channel_status_for_all_channels(Data1), + update_state(Data2, Data0); +channels_health_check(ResourceStatus, Data0) -> + %% Whenever the resource is not connected: + %% 1. Remove all added channels + %% 2. Change the status to an error status + %% 3. Raise alarms + Channels = Data0#data.added_channels, + ChannelsToRemove = [ + ChannelId + || {ChannelId, Status} <- maps:to_list(Channels), + is_channel_added(Status) + ], + Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true), + ChannelsWithNewAndOldStatuses = + [ + {ChannelId, OldStatus, + {error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}} + || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) + ], + %% Raise alarms + _ = lists:foreach( + fun({ChannelId, OldStatus, NewStatus}) -> + _ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus) + end, + ChannelsWithNewAndOldStatuses + ), + %% Update the channels map + NewChannels = lists:foldl( + fun({ChannelId, _, NewStatus}, Acc) -> + maps:put(ChannelId, NewStatus, Acc) + end, + Channels, + ChannelsWithNewAndOldStatuses + ), + Data2 = Data1#data{added_channels = NewChannels}, + update_state(Data2, Data0). + +resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) -> + ResourceId = Data1#data.id, + iolist_to_binary( + io_lib:format( + "Resource ~s for channel ~s is not connected. " + "Resource status: ~p", + [ + ResourceId, + ChannelId, + ResourceStatus + ] + ) + ). + +channel_status_for_all_channels(Data) -> + Channels = maps:to_list(Data#data.added_channels), + AddedChannelsWithOldAndNewStatus = [ + {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)} + || {ChannelId, OldStatus} <- Channels, + is_channel_added(OldStatus) + ], + %% Remove the added channels with a new error statuses + ChannelsToRemove = [ + ChannelId + || {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus + ], + Data1 = remove_channels_in_list(ChannelsToRemove, Data, true), + %% Raise/clear alarms + lists:foreach( + fun + ({ID, _OldStatus, connected}) -> + _ = maybe_clear_alarm(ID); + ({ID, OldStatus, NewStatus}) -> + _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) + end, + AddedChannelsWithOldAndNewStatus + ), + %% Update the ChannelsMap + ChannelsMap = Data1#data.added_channels, + NewChannelsMap = + lists:foldl( + fun({ChannelId, _, NewStatus}, Acc) -> + maps:put(ChannelId, NewStatus, Acc) + end, + ChannelsMap, + AddedChannelsWithOldAndNewStatus + ), + Data1#data{added_channels = NewChannelsMap}. + +is_channel_added({error, _}) -> + false; +is_channel_added(_) -> + true. + +get_config_for_channels(Data0, ChannelsWithoutConfig) -> + ResId = Data0#data.id, + Mod = Data0#data.mod, + Channels = emqx_resource:call_get_channels(ResId, Mod), + ChannelIdToConfig = maps:from_list(Channels), + ChannelsWithConfig = [ + {Id, maps:get(Id, ChannelIdToConfig, no_config)} + || Id <- ChannelsWithoutConfig + ], + %% Filter out channels without config + [ + ChConf + || {_Id, Conf} = ChConf <- ChannelsWithConfig, + Conf =/= no_config + ]. + update_state(Data) -> update_state(Data, undefined). @@ -846,7 +970,8 @@ maybe_alarm(_Status, ResId, Error, _PrevError) -> HrError = case Error of {error, undefined} -> <<"Unknown reason">>; - {error, Reason} -> emqx_utils:readable_error_msg(Reason) + {error, Reason} -> emqx_utils:readable_error_msg(Reason); + Error -> emqx_utils:readable_error_msg(Error) end, emqx_alarm:safe_activate( ResId,