Merge pull request #11857 from kjellwinblad/kjell/shared_con/EMQX-11270

fix(bridge_v2): channels should not be removed when status is connecting
This commit is contained in:
Kjell Winblad 2023-11-01 16:46:47 +01:00 committed by GitHub
commit ec2d339355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 369 additions and 109 deletions

View File

@ -173,20 +173,24 @@ lookup(Type, Name) ->
Channels = maps:get(added_channels, InstanceData, #{}),
BridgeV2Id = id(Type, Name, BridgeConnector),
ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
DisplayBridgeV2Status =
{DisplayBridgeV2Status, ErrorMsg} =
case ChannelStatus of
{error, undefined} -> <<"Unknown reason">>;
{error, Reason} -> emqx_utils:readable_error_msg(Reason);
connected -> <<"connected">>;
connecting -> <<"connecting">>;
Error -> emqx_utils:readable_error_msg(Error)
#{status := connected} ->
{connected, <<"">>};
#{status := Status, error := undefined} ->
{Status, <<"Unknown reason">>};
#{status := Status, error := Error} ->
{Status, emqx_utils:readable_error_msg(Error)};
undefined ->
{disconnected, <<"Pending installation">>}
end,
{ok, #{
type => Type,
name => Name,
raw_config => RawConf,
resource_data => InstanceData,
status => DisplayBridgeV2Status
status => DisplayBridgeV2Status,
error => ErrorMsg
}}
end.
@ -526,10 +530,10 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
ConnectorId, ChannelTestId
),
case HealthCheckResult of
{error, Reason} ->
{error, Reason};
_ ->
ok
#{status := connected} ->
ok;
#{status := Status, error := Error} ->
{error, {Status, Error}}
end
end
end,
@ -1032,6 +1036,7 @@ lookup_and_transform_to_bridge_v1_helper(
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2),
BridgeV1 = maps:remove(status, BridgeV1Tmp),
BridgeV2Status = maps:get(status, BridgeV2, undefined),
BridgeV2Error = maps:get(error, BridgeV2, undefined),
ResourceData1 = maps:get(resource_data, BridgeV1, #{}),
%% Replace id in resouce data
BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>,
@ -1040,12 +1045,12 @@ lookup_and_transform_to_bridge_v1_helper(
case ConnectorStatus of
connected ->
case BridgeV2Status of
<<"connected">> ->
connected ->
%% No need to modify the status
{ok, BridgeV1#{resource_data => ResourceData2}};
NotConnected ->
ResourceData3 = maps:put(status, connecting, ResourceData2),
ResourceData4 = maps:put(error, NotConnected, ResourceData3),
ResourceData3 = maps:put(status, NotConnected, ResourceData2),
ResourceData4 = maps:put(error, BridgeV2Error, ResourceData3),
BridgeV1Final = maps:put(resource_data, ResourceData4, BridgeV1),
{ok, BridgeV1Final}
end;

View File

@ -238,12 +238,12 @@ t_create_dry_run_fail_add_channel(_) ->
{error, Msg}
end),
Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
OnAddChannel2 = wrap_fun(fun() ->
throw(Msg)
end),
Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
ok.
t_create_dry_run_fail_get_channel_status(_) ->
@ -252,7 +252,7 @@ t_create_dry_run_fail_get_channel_status(_) ->
{error, Msg}
end),
Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
{error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
Fun2 = wrap_fun(fun() ->
throw(Msg)
end),
@ -280,7 +280,9 @@ t_is_valid_bridge_v1(_) ->
t_manual_health_check(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
%% Run a health check for the bridge
connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
#{error := undefined, status := connected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
@ -290,7 +292,9 @@ t_manual_health_check_exception(_) ->
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
#{error := my_error, status := disconnected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
@ -300,7 +304,9 @@ t_manual_health_check_exception_error(_) ->
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
#{error := _, status := disconnected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
@ -310,7 +316,9 @@ t_manual_health_check_error(_) ->
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge
{error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge),
#{error := my_error, status := disconnected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok.
@ -484,6 +492,83 @@ t_send_message_unhealthy_connector(_) ->
ets:delete(ResponseETS),
ok.
t_connector_connected_to_connecting_to_connected_no_channel_restart(_) ->
ResponseETS = ets:new(response_ets, [public]),
ets:insert(ResponseETS, {on_start_value, conf}),
ets:insert(ResponseETS, {on_get_status_value, connected}),
OnStartFun = wrap_fun(fun(Conf) ->
case ets:lookup_element(ResponseETS, on_start_value, 2) of
conf ->
{ok, Conf};
V ->
V
end
end),
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(ResponseETS, on_get_status_value, 2)
end),
OnAddChannelCntr = counters:new(1, []),
OnAddChannelFun = wrap_fun(fun(_InstId, ConnectorState, _ChannelId, _ChannelConfig) ->
counters:add(OnAddChannelCntr, 1, 1),
{ok, ConnectorState}
end),
ConConfig = emqx_utils_maps:deep_merge(con_config(), #{
<<"on_start_fun">> => OnStartFun,
<<"on_get_status_fun">> => OnGetStatusFun,
<<"on_add_channel_fun">> => OnAddChannelFun,
<<"resource_opts">> => #{<<"start_timeout">> => 100}
}),
ConName = ?FUNCTION_NAME,
{ok, _} = emqx_connector:create(con_type(), ConName, ConConfig),
BridgeConf = (bridge_config())#{
<<"connector">> => atom_to_binary(ConName)
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf),
%% Wait until on_add_channel_fun is called at least once
wait_until(fun() ->
counters:get(OnAddChannelCntr, 1) =:= 1
end),
1 = counters:get(OnAddChannelCntr, 1),
%% We change the status of the connector
ets:insert(ResponseETS, {on_get_status_value, connecting}),
%% Wait until the status is changed
wait_until(fun() ->
{ok, BridgeData} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData) =:= connecting
end),
{ok, BridgeData1} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
ct:pal("Bridge V2 status changed to: ~p", [maps:get(status, BridgeData1)]),
%% We change the status again back to connected
ets:insert(ResponseETS, {on_get_status_value, connected}),
%% Wait until the status is connected again
wait_until(fun() ->
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData2) =:= connected
end),
%% On add channel should not have been called again
1 = counters:get(OnAddChannelCntr, 1),
%% We change the status to an error
ets:insert(ResponseETS, {on_get_status_value, {error, my_error}}),
%% Wait until the status is changed
wait_until(fun() ->
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData2) =:= disconnected
end),
%% Now we go back to connected
ets:insert(ResponseETS, {on_get_status_value, connected}),
wait_until(fun() ->
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData2) =:= connected
end),
%% Now the channel should have been removed and added again
wait_until(fun() ->
counters:get(OnAddChannelCntr, 1) =:= 2
end),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok = emqx_connector:remove(con_type(), ConName),
ets:delete(ResponseETS),
ok.
t_unhealthy_channel_alarm(_) ->
Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> =>
@ -720,3 +805,20 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
end
),
ok.
%% Helper Functions
wait_until(Fun) ->
wait_until(Fun, 5000).
wait_until(Fun, Timeout) when Timeout >= 0 ->
case Fun() of
true ->
ok;
false ->
IdleTime = 100,
timer:sleep(IdleTime),
wait_until(Fun, Timeout - IdleTime)
end;
wait_until(_, _) ->
ct:fail("Wait until event did not happen").

View File

@ -54,6 +54,14 @@ on_add_channel(
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun();
on_add_channel(
InstId,
#{on_add_channel_fun := FunRef} = ConnectorState,
ChannelId,
ChannelConfig
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun(InstId, ConnectorState, ChannelId, ChannelConfig);
on_add_channel(
_InstId,
State,
@ -118,8 +126,8 @@ on_get_channel_status(
ChannelId,
State
) ->
Channels = maps:get(channels, State),
ChannelState = maps:get(ChannelId, Channels),
Channels = maps:get(channels, State, #{}),
ChannelState = maps:get(ChannelId, Channels, #{}),
case ChannelState of
#{on_get_channel_status_fun := FunRef} ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),

View File

@ -570,7 +570,9 @@ t_nonexistent_topic(_Config) ->
erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf
),
% TODO: make sure the user facing APIs for Bridge V1 also get this error
{error, _} = emqx_bridge_v2:health_check(?BRIDGE_TYPE_V2, list_to_atom(Name)),
#{status := disconnected, error := #{error := unknown_kafka_topic}} = emqx_bridge_v2:health_check(
?BRIDGE_TYPE_V2, list_to_atom(Name)
),
ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
delete_all_bridges(),
ok.

View File

@ -119,7 +119,7 @@ t_health_check(_) ->
ConnectorConfig = connector_config(),
{ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config),
connected = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),
#{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),
ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2),
%% Check behaviour when bridge does not exist
{error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),

View File

@ -200,6 +200,7 @@
-callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
channel_status()
| {channel_status(), Reason :: term()}
| {error, term()}.
-callback query_mode(Config :: term()) -> query_mode().
@ -457,7 +458,7 @@ health_check(ResId) ->
emqx_resource_manager:health_check(ResId).
-spec channel_health_check(resource_id(), channel_id()) ->
{ok, resource_status()} | {error, term()}.
#{status := channel_status(), error := term(), any() => any()}.
channel_health_check(ResId, ChannelId) ->
emqx_resource_manager:channel_health_check(ResId, ChannelId).
@ -534,6 +535,7 @@ call_health_check(ResId, Mod, ResourceState) ->
-spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) ->
channel_status()
| {channel_status(), Reason :: term()}
| {error, term()}.
call_channel_health_check(ResId, ChannelId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)).

View File

@ -1112,11 +1112,21 @@ is_channel_id(Id) ->
%% There is no need to query the conncector if the channel is not
%% installed as the query will fail anyway.
pre_query_channel_check({Id, _} = _Request, Channels) when
is_map_key(Id, Channels),
(map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting)
is_map_key(Id, Channels)
->
ok;
ChannelStatus = maps:get(Id, Channels),
case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of
true ->
ok;
false ->
maybe_throw_channel_not_installed(Id)
end;
pre_query_channel_check({Id, _} = _Request, _Channels) ->
maybe_throw_channel_not_installed(Id);
pre_query_channel_check(_Request, _Channels) ->
ok.
maybe_throw_channel_not_installed(Id) ->
%% Fail with a recoverable error if the channel is not installed
%% so that the operation can be retried. It is emqx_resource_manager's
%% responsibility to ensure that the channel installation is retried.
@ -1128,9 +1138,7 @@ pre_query_channel_check({Id, _} = _Request, _Channels) ->
);
false ->
ok
end;
pre_query_channel_check(_Request, _Channels) ->
ok.
end.
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer

View File

@ -44,7 +44,8 @@
list_group/1,
lookup_cached/1,
get_metrics/1,
reset_metrics/1
reset_metrics/1,
channel_status_is_channel_added/1
]).
-export([
@ -306,7 +307,7 @@ health_check(ResId) ->
safe_call(ResId, health_check, ?T_OPERATION).
-spec channel_health_check(resource_id(), channel_id()) ->
{ok, resource_status()} | {error, term()}.
#{status := channel_status(), error := term(), any() => any()}.
channel_health_check(ResId, ChannelId) ->
%% Do normal health check first to trigger health checks for channels
%% and update the cached health status for the channels
@ -314,7 +315,10 @@ channel_health_check(ResId, ChannelId) ->
safe_call(ResId, {channel_health_check, ChannelId}, ?T_OPERATION).
add_channel(ResId, ChannelId, Config) ->
safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION).
Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION),
%% Wait for health_check to finish
_ = health_check(ResId),
Result.
remove_channel(ResId, ChannelId) ->
safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION).
@ -420,6 +424,10 @@ handle_event(internal, start_resource, connecting, Data) ->
start_resource(Data, undefined);
handle_event(state_timeout, health_check, connecting, Data) ->
handle_connecting_health_check(Data);
handle_event(
{call, From}, {remove_channel, ChannelId}, connecting = _State, Data
) ->
handle_remove_channel(From, ChannelId, Data);
%% State: CONNECTED
%% The connected state is entered after a successful on_start/2 of the callback mod
%% and successful health_checks
@ -459,7 +467,7 @@ handle_event(
handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data
) ->
handle_not_connected_remove_channel(From, ChannelId, Data);
handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data);
handle_event(
{call, From}, get_channels, _State, Data
) ->
@ -570,7 +578,7 @@ add_channels(Data) ->
Channels = Data#data.added_channels,
NewChannels = lists:foldl(
fun({ChannelID, _Conf}, Acc) ->
maps:put(ChannelID, {error, connecting}, Acc)
maps:put(ChannelID, channel_status(), Acc)
end,
Channels,
ChannelIDConfigTuples
@ -589,7 +597,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
AddedChannelsMap = Data#data.added_channels,
%% Set the channel status to connecting to indicate that
%% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap),
NewAddedChannelsMap = maps:put(
ChannelID,
channel_status_new_waiting_for_health_check(),
AddedChannelsMap
),
NewData = Data#data{
state = NewState,
added_channels = NewAddedChannelsMap
@ -603,7 +615,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
reason => Reason
}),
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap),
NewAddedChannelsMap = maps:put(
ChannelID,
channel_status(Error),
AddedChannelsMap
),
NewData = Data#data{
added_channels = NewAddedChannelsMap
},
@ -680,65 +696,44 @@ make_test_id() ->
RandId = iolist_to_binary(emqx_utils:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>.
handle_add_channel(From, Data, ChannelId, ChannelConfig) ->
handle_add_channel(From, Data, ChannelId, Config) ->
Channels = Data#data.added_channels,
case maps:get(ChannelId, Channels, {error, not_added}) of
{error, _Reason} ->
case
channel_status_is_channel_added(
maps:get(
ChannelId,
Channels,
channel_status()
)
)
of
false ->
%% The channel is not installed in the connector state
%% We need to install it
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig);
_ ->
%% We insert it into the channels map and let the health check
%% take care of the rest
NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels),
NewData = Data#data{added_channels = NewChannels},
{keep_state, update_state(NewData, Data), [
{reply, From, ok}
]};
true ->
%% The channel is already installed in the connector state
%% We don't need to install it again
{keep_state_and_data, [{reply, From, ok}]}
end.
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) ->
NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig),
%% Trigger a health check to raise alarm if channel is not healthy
{keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}.
add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
case
emqx_resource:call_add_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelId, ChannelConfig
)
of
{ok, NewState} ->
AddedChannelsMap = Data#data.added_channels,
%% Setting channel status to connecting to indicate that an health check
%% has not been performed yet
NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap),
UpdatedData = Data#data{
state = NewState,
added_channels = NewAddedChannelsMap
},
update_state(UpdatedData, Data);
{error, _Reason} = Error ->
ChannelsMap = Data#data.added_channels,
NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap),
UpdatedData = Data#data{
added_channels = NewChannelsMap
},
update_state(UpdatedData, Data)
end.
handle_not_connected_add_channel(From, ChannelId, State, Data) ->
%% When state is not connected the channel will be added to the channels
%% map but nothing else will happen.
Channels = Data#data.added_channels,
NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels),
NewData1 = Data#data{added_channels = NewChannels},
%% Do channel health check to trigger alarm
NewData2 = channels_health_check(State, NewData1),
{keep_state, update_state(NewData2, Data), [{reply, From, ok}]}.
NewData = add_channel_status_if_not_exists(Data, ChannelId, State),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels,
%% Deactivate alarm
_ = maybe_clear_alarm(ChannelId),
case maps:get(ChannelId, Channels, {error, not_added}) of
{error, _} ->
case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of
false ->
%% The channel is already not installed in the connector state.
%% We still need to remove it from the added_channels map
AddedChannels = Data#data.added_channels,
@ -747,7 +742,7 @@ handle_remove_channel(From, ChannelId, Data) ->
added_channels = NewAddedChannels
},
{keep_state, NewData, [{reply, From, ok}]};
_ ->
true ->
%% The channel is installed in the connector state
handle_remove_channel_exists(From, ChannelId, Data)
end.
@ -777,9 +772,10 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
{keep_state_and_data, [{reply, From, Error}]}
end.
handle_not_connected_remove_channel(From, ChannelId, Data) ->
%% When state is not connected the channel will be removed from the channels
%% map but nothing else will happen.
handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
%% When state is not connected and not connecting the channel will be removed
%% from the channels map but nothing else will happen since the channel
%% is not addded/installed in the resource state.
Channels = Data#data.added_channels,
NewChannels = maps:remove(ChannelId, Channels),
NewData = Data#data{added_channels = NewChannels},
@ -796,7 +792,7 @@ handle_manually_health_check(From, Data) ->
).
handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
{keep_state_and_data, [{reply, From, {ok, disconnected}}]};
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
handle_manually_channel_health_check(
From,
#data{added_channels = Channels} = _Data,
@ -810,10 +806,11 @@ handle_manually_channel_health_check(
_Data,
_ChannelId
) ->
{keep_state_and_data, [{reply, From, {error, channel_not_found}}]}.
{keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.
get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State).
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
channel_status(RawStatus).
handle_connecting_health_check(Data) ->
with_health_check(
@ -833,9 +830,9 @@ handle_connected_health_check(Data) ->
with_health_check(
Data,
fun
(connected, UpdatedData) ->
{keep_state, channels_health_check(connected, UpdatedData),
health_check_actions(UpdatedData)};
(connected, UpdatedData0) ->
UpdatedData1 = channels_health_check(connected, UpdatedData0),
{keep_state, UpdatedData1, health_check_actions(UpdatedData1)};
(Status, UpdatedData) ->
?SLOG(warning, #{
msg => "health_check_failed",
@ -861,20 +858,59 @@ with_health_check(#data{error = PrevError} = Data, Func) ->
channels_health_check(connected = _ResourceStatus, Data0) ->
Channels = maps:to_list(Data0#data.added_channels),
%% All channels with an error status are considered not added
%% All channels with a stutus different from connected or connecting are
%% not added
ChannelsNotAdded = [
ChannelId
|| {ChannelId, Status} <- Channels,
not is_channel_added(Status)
not channel_status_is_channel_added(Status)
],
%% Attempt to add channels that are not added
ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded),
ChannelsNotAddedWithConfigs =
get_config_for_channels(Data0, ChannelsNotAdded),
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
%% Now that we have done the adding, we can get the status of all channels
Data2 = channel_status_for_all_channels(Data1),
update_state(Data2, Data0);
channels_health_check(connecting, Data0) ->
%% Whenever the resource is connecting:
%% 1. Change the status of all added channels to connecting
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms)
Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [
ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels),
channel_status_is_channel_added(Status)
],
ChannelsWithNewStatuses =
[
{ChannelId, channel_status({connecting, resource_is_connecting})}
|| ChannelId <- ChannelsToChangeStatusFor
],
%% Update the channels map
NewChannels = lists:foldl(
fun({ChannelId, NewStatus}, Acc) ->
maps:update(ChannelId, NewStatus, Acc)
end,
Channels,
ChannelsWithNewStatuses
),
ChannelsWithNewAndPrevErrorStatuses =
[
{ChannelId, NewStatus, maps:get(ChannelId, Channels)}
|| {ChannelId, NewStatus} <- maps:to_list(NewChannels)
],
%% Raise alarms for all channels
lists:foreach(
fun({ChannelId, Status, PrevStatus}) ->
maybe_alarm(connecting, ChannelId, Status, PrevStatus)
end,
ChannelsWithNewAndPrevErrorStatuses
),
Data1 = Data0#data{added_channels = NewChannels},
update_state(Data1, Data0);
channels_health_check(ResourceStatus, Data0) ->
%% Whenever the resource is not connected:
%% Whenever the resource is not connected and not connecting:
%% 1. Remove all added channels
%% 2. Change the status to an error status
%% 3. Raise alarms
@ -882,13 +918,20 @@ channels_health_check(ResourceStatus, Data0) ->
ChannelsToRemove = [
ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels),
is_channel_added(Status)
channel_status_is_channel_added(Status)
],
Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true),
ChannelsWithNewAndOldStatuses =
[
{ChannelId, OldStatus,
{error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}}
channel_status(
{error,
resource_not_connected_channel_error_msg(
ResourceStatus,
ChannelId,
Data1
)}
)}
|| {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels)
],
%% Raise alarms
@ -928,18 +971,19 @@ channel_status_for_all_channels(Data) ->
AddedChannelsWithOldAndNewStatus = [
{ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)}
|| {ChannelId, OldStatus} <- Channels,
is_channel_added(OldStatus)
channel_status_is_channel_added(OldStatus)
],
%% Remove the added channels with a new error statuses
%% Remove the added channels with a a status different from connected or connecting
ChannelsToRemove = [
ChannelId
|| {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus
|| {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus,
not channel_status_is_channel_added(NewStatus)
],
Data1 = remove_channels_in_list(ChannelsToRemove, Data, true),
%% Raise/clear alarms
lists:foreach(
fun
({ID, _OldStatus, connected}) ->
({ID, _OldStatus, #{status := connected}}) ->
_ = maybe_clear_alarm(ID);
({ID, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
@ -958,18 +1002,14 @@ channel_status_for_all_channels(Data) ->
),
Data1#data{added_channels = NewChannelsMap}.
is_channel_added({error, _}) ->
false;
is_channel_added(_) ->
true.
get_config_for_channels(Data0, ChannelsWithoutConfig) ->
ResId = Data0#data.id,
Mod = Data0#data.mod,
Channels = emqx_resource:call_get_channels(ResId, Mod),
ChannelIdToConfig = maps:from_list(Channels),
ChannelStatusMap = Data0#data.added_channels,
ChannelsWithConfig = [
{Id, maps:get(Id, ChannelIdToConfig, no_config)}
{Id, get_config_from_map_or_channel_status(Id, ChannelIdToConfig, ChannelStatusMap)}
|| Id <- ChannelsWithoutConfig
],
%% Filter out channels without config
@ -979,6 +1019,16 @@ get_config_for_channels(Data0, ChannelsWithoutConfig) ->
Conf =/= no_config
].
get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatusMap) ->
ChannelStatus = maps:get(ChannelId, ChannelStatusMap, #{}),
case maps:get(config, ChannelStatus, undefined) of
undefined ->
%% Channel config
maps:get(ChannelId, ChannelIdToConfig, no_config);
Config ->
Config
end.
update_state(Data) ->
update_state(Data, undefined).
@ -1098,3 +1148,86 @@ safe_call(ResId, Message, Timeout) ->
exit:{timeout, _} ->
{error, timeout}
end.
%% Helper functions for chanel status data
channel_status() ->
#{
%% The status of the channel. Can be one of the following:
%% - disconnected: the channel is not added to the resource (error may contain the reason))
%% - connecting: the channel has been added to the resource state but
%% either the resource status is connecting or the
%% on_channel_get_status callback has returned connecting
%% - connected: the channel is added to the resource, the resource is
%% connected and the on_channel_get_status callback has returned
%% connected. The error field should be undefined.
status => disconnected,
error => not_added_yet
}.
%% If the channel is added with add_channel/2, the config field will be set to
%% the config. This is useful when doing probing since the config is not stored
%% anywhere else in that case.
channel_status_new_with_config(Config) ->
#{
status => disconnected,
error => not_added_yet,
config => Config
}.
channel_status_new_waiting_for_health_check() ->
#{
status => connecting,
error => no_health_check_yet
}.
channel_status({connecting, Error}) ->
#{
status => connecting,
error => Error
};
channel_status(connecting) ->
#{
status => connecting,
error => <<"Not connected for unknown reason">>
};
channel_status(connected) ->
#{
status => connected,
error => undefined
};
%% Probably not so useful but it is permitted to set an error even when the
%% status is connected
channel_status({connected, Error}) ->
#{
status => connected,
error => Error
};
channel_status({error, Reason}) ->
#{
status => disconnected,
error => Reason
}.
channel_status_is_channel_added(#{
status := connected
}) ->
true;
channel_status_is_channel_added(#{
status := connecting
}) ->
true;
channel_status_is_channel_added(_Status) ->
false.
add_channel_status_if_not_exists(Data, ChannelId, State) ->
Channels = Data#data.added_channels,
case maps:is_key(ChannelId, Channels) of
true ->
Data;
false ->
ChannelStatus = channel_status({error, resource_not_operational}),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
maybe_alarm(State, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}
end.