fix: periodical status checks and alarms for channels

This commit is contained in:
Kjell Winblad 2023-10-25 09:56:15 +02:00 committed by Zaiming (Stone) Shi
parent 3cab31261e
commit 477ed11de8
6 changed files with 349 additions and 100 deletions

View File

@ -30,6 +30,9 @@ con_type() ->
con_name() -> con_name() ->
my_connector. my_connector.
connector_resource_id() ->
emqx_connector_resource:resource_id(con_type(), con_name()).
bridge_type() -> bridge_type() ->
test_bridge_type. test_bridge_type.
@ -48,7 +51,13 @@ con_schema() ->
]. ].
con_config() -> con_config() ->
#{}. #{
<<"enable">> => true,
<<"resource_opts">> => #{
%% Set this to a low value to make the test run faster
<<"health_check_interval">> => 100
}
}.
bridge_schema() -> bridge_schema() ->
[ [
@ -66,9 +75,17 @@ bridge_schema() ->
bridge_config() -> bridge_config() ->
#{ #{
<<"connector">> => atom_to_binary(con_name()) <<"connector">> => atom_to_binary(con_name()),
<<"enable">> => true,
<<"send_to">> => registered_process_name(),
<<"resource_opts">> => #{
<<"resume_interval">> => 100
}
}. }.
registered_process_name() ->
my_registered_process.
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -179,3 +196,100 @@ t_is_valid_bridge_v1(_) ->
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2), {ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge), false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
ok. ok.
t_manual_health_check(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
%% Run a health check for the bridge
connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_manual_health_check_exception(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> throw(my_error) end},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_manual_health_check_exception_error(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> error(my_error) end},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_manual_health_check_error(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_send_message(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
%% Register name for this process
register(registered_process_name(), self()),
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{}),
receive
<<"my_msg">> ->
ok
after 1000 ->
ct:fail("Failed to receive message")
end,
unregister(registered_process_name()),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_send_message_unhealthy_channel(_) ->
OnGetStatusResponseETS = ets:new(on_get_status_response_ets, [public]),
ets:insert(OnGetStatusResponseETS, {status_value, {error, my_error}}),
OnGetStatusFun = fun() -> ets:lookup_element(OnGetStatusResponseETS, status_value, 2) end,
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => OnGetStatusFun},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Register name for this process
register(registered_process_name(), self()),
_ = emqx_bridge_v2:send_message(bridge_type(), my_test_bridge, <<"my_msg">>, #{timeout => 1}),
receive
Any ->
ct:pal("Received message: ~p", [Any]),
ct:fail("Should not get message here")
after 1 ->
ok
end,
%% Sending should work again after the channel is healthy
ets:insert(OnGetStatusResponseETS, {status_value, connected}),
_ = emqx_bridge_v2:send_message(
bridge_type(),
my_test_bridge,
<<"my_msg">>,
#{resume_interval => 100}
),
receive
<<"my_msg">> ->
ok
after 10000 ->
ct:fail("Failed to receive message")
end,
unregister(registered_process_name()),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
t_unhealthy_channel_alarm(_) ->
Conf = (bridge_config())#{<<"on_get_channel_status_fun">> => fun() -> {error, my_error} end},
0 = get_bridge_v2_alarm_cnt(),
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
1 = get_bridge_v2_alarm_cnt(),
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
0 = get_bridge_v2_alarm_cnt(),
ok.
get_bridge_v2_alarm_cnt() ->
Alarms = emqx_alarm:get_alarms(activated),
FilterFun = fun
(#{name := S}) when is_binary(S) -> string:find(S, "bridge_v2") =/= nomatch;
(_) -> false
end,
length(lists:filter(FilterFun, Alarms)).

View File

@ -73,10 +73,19 @@ on_remove_channel(
on_query( on_query(
_InstId, _InstId,
{_MessageTag, _Message}, {ChannelId, Message},
_ConnectorState ConnectorState
) -> ) ->
throw(not_implemented). Channels = maps:get(channels, ConnectorState, #{}),
%% Lookup the channel
ChannelState = maps:get(ChannelId, Channels, not_found),
case ChannelState of
not_found -> throw(<<"Channel not active">>);
_ -> ok
end,
SendTo = maps:get(send_to, ChannelState),
SendTo ! Message,
ok.
on_get_channels(ResId) -> on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId). emqx_bridge_v2:get_channels_for_connector(ResId).

View File

@ -150,7 +150,8 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
{error, {active_channels, Channels}} {error, {active_channels, Channels}}
end; end;
post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) -> post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) ->
ok = emqx_connector_resource:create(Type, Name, NewConf), ResOpts = emqx_resource:fetch_creation_opts(NewConf),
ok = emqx_connector_resource:create(Type, Name, NewConf, ResOpts),
?tp(connector_post_config_update_done, #{}), ?tp(connector_post_config_update_done, #{}),
ok; ok;
post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) -> post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->

View File

@ -22,6 +22,7 @@
-type resource_spec() :: map(). -type resource_spec() :: map().
-type resource_state() :: term(). -type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped. -type resource_status() :: connected | disconnected | connecting | stopped.
-type channel_status() :: connected | connecting.
-type callback_mode() :: always_sync | async_if_possible. -type callback_mode() :: always_sync | async_if_possible.
-type query_mode() :: -type query_mode() ::
simple_sync simple_sync

View File

@ -527,9 +527,8 @@ call_health_check(ResId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)). ?SAFE_CALL(Mod:on_get_status(ResId, ResourceState)).
-spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) ->
resource_status() channel_status()
| {resource_status()} | {channel_status(), term()}
| {resource_status(), term()}
| {error, term()}. | {error, term()}.
call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> call_channel_health_check(ResId, ChannelId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)).

View File

@ -308,18 +308,13 @@ health_check(ResId) ->
-spec channel_health_check(resource_id(), channel_id()) -> -spec channel_health_check(resource_id(), channel_id()) ->
{ok, resource_status()} | {error, term()}. {ok, resource_status()} | {error, term()}.
channel_health_check(ResId, ChannelId) -> channel_health_check(ResId, ChannelId) ->
%% Do normal health check first to trigger health checks for channels
%% and update the cached health status for the channels
_ = health_check(ResId),
safe_call(ResId, {channel_health_check, ChannelId}, ?T_OPERATION). safe_call(ResId, {channel_health_check, ChannelId}, ?T_OPERATION).
add_channel(ResId, ChannelId, Config) -> add_channel(ResId, ChannelId, Config) ->
%% Use cache to avoid doing inter process communication on every call safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION).
Data = read_cache(ResId),
AddedChannels = Data#data.added_channels,
case maps:get(ChannelId, AddedChannels, false) of
true ->
ok;
false ->
safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION)
end.
remove_channel(ResId, ChannelId) -> remove_channel(ResId, ChannelId) ->
safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION). safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION).
@ -448,7 +443,7 @@ handle_event(state_timeout, auto_retry, disconnected, Data) ->
handle_event(enter, _OldState, stopped = State, Data) -> handle_event(enter, _OldState, stopped = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_state_consistency(State, Data),
{keep_state_and_data, []}; {keep_state_and_data, []};
% Ignore all other events %% The following events can be handled in any state
handle_event( handle_event(
{call, From}, {add_channel, ChannelId, Config}, _State, Data {call, From}, {add_channel, ChannelId, Config}, _State, Data
) -> ) ->
@ -462,6 +457,7 @@ handle_event(
) -> ) ->
Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod), Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod),
{keep_state_and_data, {reply, From, {ok, Channels}}}; {keep_state_and_data, {reply, From, {ok, Channels}}};
% Ignore all other events
handle_event(EventType, EventData, State, Data) -> handle_event(EventType, EventData, State, Data) ->
?SLOG( ?SLOG(
error, error,
@ -557,8 +553,19 @@ start_resource(Data, From) ->
end. end.
add_channels(Data) -> add_channels(Data) ->
%% Add channels to the Channels map but not to the resource state
%% Channels will be added to the resouce state after the initial health_check
%% if that succeeds.
ChannelIDConfigTuples = emqx_resource:call_get_channels(Data#data.id, Data#data.mod), ChannelIDConfigTuples = emqx_resource:call_get_channels(Data#data.id, Data#data.mod),
add_channels_in_list(ChannelIDConfigTuples, Data). Channels = Data#data.added_channels,
NewChannels = lists:foldl(
fun({ChannelID, _Conf}, Acc) ->
maps:put(ChannelID, {error, connecting}, Acc)
end,
Channels,
ChannelIDConfigTuples
),
Data#data{added_channels = NewChannels}.
add_channels_in_list([], Data) -> add_channels_in_list([], Data) ->
Data; Data;
@ -570,20 +577,29 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
of of
{ok, NewState} -> {ok, NewState} ->
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(ChannelID, true, AddedChannelsMap), %% Set the channel status to connecting to indicate that
%% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap),
NewData = Data#data{ NewData = Data#data{
state = NewState, state = NewState,
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
add_channels_in_list(Rest, NewData); add_channels_in_list(Rest, NewData);
{error, Reason} -> {error, Reason} = Error ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => add_channel_failed, msg => add_channel_failed,
id => Data#data.id, id => Data#data.id,
channel_id => ChannelID, channel_id => ChannelID,
reason => Reason reason => Reason
}), }),
add_channels_in_list(Rest, Data) AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap),
NewData = Data#data{
added_channels = NewAddedChannelsMap
},
%% Raise an alarm since the channel could not be added
_ = maybe_alarm(disconnected, ChannelID, Error, no_prev_error),
add_channels_in_list(Rest, NewData)
end. end.
maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped -> maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped ->
@ -611,22 +627,29 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
remove_channels(Data) -> remove_channels(Data) ->
Channels = maps:keys(Data#data.added_channels), Channels = maps:keys(Data#data.added_channels),
remove_channels_in_list(Channels, Data). remove_channels_in_list(Channels, Data, false).
remove_channels_in_list([], Data) -> remove_channels_in_list([], Data, _KeepInChannelMap) ->
Data; Data;
remove_channels_in_list([ChannelID | Rest], Data) -> remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap =
case KeepInChannelMap of
true ->
AddedChannelsMap;
false ->
maybe_clear_alarm(ChannelID),
maps:remove(ChannelID, AddedChannelsMap)
end,
case case
emqx_resource:call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) emqx_resource:call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID)
of of
{ok, NewState} -> {ok, NewState} ->
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:remove(ChannelID, AddedChannelsMap),
NewData = Data#data{ NewData = Data#data{
state = NewState, state = NewState,
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
remove_channels_in_list(Rest, NewData); remove_channels_in_list(Rest, NewData, KeepInChannelMap);
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => remove_channel_failed, msg => remove_channel_failed,
@ -634,7 +657,10 @@ remove_channels_in_list([ChannelID | Rest], Data) ->
channel_id => ChannelID, channel_id => ChannelID,
reason => Reason reason => Reason
}), }),
remove_channels_in_list(Rest, Data) NewData = Data#data{
added_channels = NewAddedChannelsMap
},
remove_channels_in_list(Rest, NewData, KeepInChannelMap)
end. end.
make_test_id() -> make_test_id() ->
@ -643,24 +669,21 @@ make_test_id() ->
handle_add_channel(From, Data, ChannelId, ChannelConfig) -> handle_add_channel(From, Data, ChannelId, ChannelConfig) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
case maps:get(ChannelId, Channels, false) of case maps:get(ChannelId, Channels, {error, not_added}) of
true -> {error, _Reason} ->
%% The channel is already installed in the connector state
%% We don't need to install it again
{keep_state_and_data, [{reply, From, ok}]};
false ->
%% The channel is not installed in the connector state %% The channel is not installed in the connector state
%% We need to install it %% We need to install it
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig);
_ ->
%% The channel is already installed in the connector state
%% We don't need to install it again
{keep_state_and_data, [{reply, From, ok}]}
end. end.
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) -> handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) ->
case add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) of NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig),
{ok, NewData} -> %% Trigger a health check to raise alarm if channel is not healthy
{keep_state, NewData, [{reply, From, ok}]}; {keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}.
{error, _Reason} = Error ->
{keep_state_and_data, [{reply, From, Error}]}
end.
add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
case case
@ -670,30 +693,38 @@ add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
of of
{ok, NewState} -> {ok, NewState} ->
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(ChannelId, true, AddedChannelsMap), %% Setting channel status to connecting to indicate that an health check
%% has not been performed yet
NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap),
UpdatedData = Data#data{ UpdatedData = Data#data{
state = NewState, state = NewState,
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
{ok, update_state(UpdatedData, Data)}; update_state(UpdatedData, Data);
{error, Reason} = Error -> {error, _Reason} = Error ->
%% Log the error as a warning ChannelsMap = Data#data.added_channels,
?SLOG(warning, #{ NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap),
msg => add_channel_failed, UpdatedData = Data#data{
id => Data#data.id, added_channels = NewChannelsMap
channel_id => ChannelId, },
reason => Reason update_state(UpdatedData, Data)
}),
Error
end. end.
handle_remove_channel(From, ChannelId, Data) -> handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
case maps:get(ChannelId, Channels, false) of case maps:get(ChannelId, Channels, {error, not_added}) of
false -> {error, _} ->
%% The channel is already not installed in the connector state %% The channel is already not installed in the connector state.
{keep_state_and_data, [{reply, From, ok}]}; %% We still need to remove it from the added_channels map
true -> AddedChannels = Data#data.added_channels,
NewAddedChannels = maps:remove(ChannelId, AddedChannels),
NewData = Data#data{
added_channels = NewAddedChannels
},
%% Deactivate alarm
_ = maybe_clear_alarm(ChannelId),
{keep_state, NewData, [{reply, From, ok}]};
_ ->
%% The channel is installed in the connector state %% The channel is installed in the connector state
handle_remove_channel_exists(From, ChannelId, Data) handle_remove_channel_exists(From, ChannelId, Data)
end. end.
@ -728,7 +759,7 @@ handle_manually_health_check(From, Data) ->
Data, Data,
fun(Status, UpdatedData) -> fun(Status, UpdatedData) ->
Actions = [{reply, From, {ok, Status}}], Actions = [{reply, From, {ok, Status}}],
{next_state, Status, UpdatedData, Actions} {next_state, Status, channels_health_check(Status, UpdatedData), Actions}
end end
). ).
@ -736,47 +767,18 @@ handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId)
{keep_state_and_data, [{reply, From, {ok, disconnected}}]}; {keep_state_and_data, [{reply, From, {ok, disconnected}}]};
handle_manually_channel_health_check( handle_manually_channel_health_check(
From, From,
#data{added_channels = Channels} = Data, #data{added_channels = Channels} = _Data,
ChannelId ChannelId
) when ) when
is_map_key(ChannelId, Channels) is_map_key(ChannelId, Channels)
-> ->
{keep_state_and_data, [{reply, From, get_channel_status_channel_added(Data, ChannelId)}]}; {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
handle_manually_channel_health_check( handle_manually_channel_health_check(
From, From,
Data, _Data,
ChannelId _ChannelId
) -> ) ->
%% add channel {keep_state_and_data, [{reply, From, {error, channel_not_found}}]}.
ResId = Data#data.id,
Mod = Data#data.mod,
case emqx_resource:call_get_channel_config(ResId, ChannelId, Mod) of
ChannelConfig when is_map(ChannelConfig) ->
case add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) of
{ok, UpdatedData} ->
{keep_state, UpdatedData, [
{reply, From, get_channel_status_channel_added(UpdatedData, ChannelId)}
]};
{error, Reason} = Error ->
%% Log the error as a warning
?SLOG(warning, #{
msg => add_channel_failed_when_doing_status_check,
id => ResId,
channel_id => ChannelId,
reason => Reason
}),
{keep_state_and_data, [{reply, From, Error}]}
end;
{error, Reason} = Error ->
%% Log the error as a warning
?SLOG(warning, #{
msg => get_channel_config_failed_when_doing_status_check,
id => ResId,
channel_id => ChannelId,
reason => Reason
}),
{keep_state_and_data, [{reply, From, Error}]}
end.
get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State). emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State).
@ -786,11 +788,12 @@ handle_connecting_health_check(Data) ->
Data, Data,
fun fun
(connected, UpdatedData) -> (connected, UpdatedData) ->
{next_state, connected, UpdatedData}; {next_state, connected, channels_health_check(connected, UpdatedData)};
(connecting, UpdatedData) -> (connecting, UpdatedData) ->
{keep_state, UpdatedData, health_check_actions(UpdatedData)}; {keep_state, channels_health_check(connecting, UpdatedData),
health_check_actions(UpdatedData)};
(disconnected, UpdatedData) -> (disconnected, UpdatedData) ->
{next_state, disconnected, UpdatedData} {next_state, disconnected, channels_health_check(disconnected, UpdatedData)}
end end
). ).
@ -799,14 +802,15 @@ handle_connected_health_check(Data) ->
Data, Data,
fun fun
(connected, UpdatedData) -> (connected, UpdatedData) ->
{keep_state, UpdatedData, health_check_actions(UpdatedData)}; {keep_state, channels_health_check(connected, UpdatedData),
health_check_actions(UpdatedData)};
(Status, UpdatedData) -> (Status, UpdatedData) ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "health_check_failed", msg => "health_check_failed",
id => Data#data.id, id => Data#data.id,
status => Status status => Status
}), }),
{next_state, Status, UpdatedData} {next_state, Status, channels_health_check(Status, UpdatedData)}
end end
). ).
@ -823,6 +827,126 @@ with_health_check(#data{error = PrevError} = Data, Func) ->
}, },
Func(Status, update_state(UpdatedData, Data)). Func(Status, update_state(UpdatedData, Data)).
channels_health_check(connected = _ResourceStatus, Data0) ->
Channels = maps:to_list(Data0#data.added_channels),
%% All channels with an error status are considered not added
ChannelsNotAdded = [
ChannelId
|| {ChannelId, Status} <- Channels,
not is_channel_added(Status)
],
%% Attempt to add channels that are not added
ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded),
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
%% Now that we have done the adding, we can get the status of all channels
Data2 = channel_status_for_all_channels(Data1),
update_state(Data2, Data0);
channels_health_check(ResourceStatus, Data0) ->
%% Whenever the resource is not connected:
%% 1. Remove all added channels
%% 2. Change the status to an error status
%% 3. Raise alarms
Channels = Data0#data.added_channels,
ChannelsToRemove = [
ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels),
is_channel_added(Status)
],
Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true),
ChannelsWithNewAndOldStatuses =
[
{ChannelId, OldStatus,
{error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}}
|| {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels)
],
%% Raise alarms
_ = lists:foreach(
fun({ChannelId, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ChannelId, NewStatus, OldStatus)
end,
ChannelsWithNewAndOldStatuses
),
%% Update the channels map
NewChannels = lists:foldl(
fun({ChannelId, _, NewStatus}, Acc) ->
maps:put(ChannelId, NewStatus, Acc)
end,
Channels,
ChannelsWithNewAndOldStatuses
),
Data2 = Data1#data{added_channels = NewChannels},
update_state(Data2, Data0).
resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
ResourceId = Data1#data.id,
iolist_to_binary(
io_lib:format(
"Resource ~s for channel ~s is not connected. "
"Resource status: ~p",
[
ResourceId,
ChannelId,
ResourceStatus
]
)
).
channel_status_for_all_channels(Data) ->
Channels = maps:to_list(Data#data.added_channels),
AddedChannelsWithOldAndNewStatus = [
{ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)}
|| {ChannelId, OldStatus} <- Channels,
is_channel_added(OldStatus)
],
%% Remove the added channels with a new error statuses
ChannelsToRemove = [
ChannelId
|| {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus
],
Data1 = remove_channels_in_list(ChannelsToRemove, Data, true),
%% Raise/clear alarms
lists:foreach(
fun
({ID, _OldStatus, connected}) ->
_ = maybe_clear_alarm(ID);
({ID, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
end,
AddedChannelsWithOldAndNewStatus
),
%% Update the ChannelsMap
ChannelsMap = Data1#data.added_channels,
NewChannelsMap =
lists:foldl(
fun({ChannelId, _, NewStatus}, Acc) ->
maps:put(ChannelId, NewStatus, Acc)
end,
ChannelsMap,
AddedChannelsWithOldAndNewStatus
),
Data1#data{added_channels = NewChannelsMap}.
is_channel_added({error, _}) ->
false;
is_channel_added(_) ->
true.
get_config_for_channels(Data0, ChannelsWithoutConfig) ->
ResId = Data0#data.id,
Mod = Data0#data.mod,
Channels = emqx_resource:call_get_channels(ResId, Mod),
ChannelIdToConfig = maps:from_list(Channels),
ChannelsWithConfig = [
{Id, maps:get(Id, ChannelIdToConfig, no_config)}
|| Id <- ChannelsWithoutConfig
],
%% Filter out channels without config
[
ChConf
|| {_Id, Conf} = ChConf <- ChannelsWithConfig,
Conf =/= no_config
].
update_state(Data) -> update_state(Data) ->
update_state(Data, undefined). update_state(Data, undefined).
@ -846,7 +970,8 @@ maybe_alarm(_Status, ResId, Error, _PrevError) ->
HrError = HrError =
case Error of case Error of
{error, undefined} -> <<"Unknown reason">>; {error, undefined} -> <<"Unknown reason">>;
{error, Reason} -> emqx_utils:readable_error_msg(Reason) {error, Reason} -> emqx_utils:readable_error_msg(Reason);
Error -> emqx_utils:readable_error_msg(Error)
end, end,
emqx_alarm:safe_activate( emqx_alarm:safe_activate(
ResId, ResId,