fix(bridge_v2): channels should not be removed when status is connecting

This commit ensures that channels are not removed from the resource state
while their status is connecting. This is needed for Kafka, since Kafka's
message buffer is stored in the resource state.

Fixes:
https://emqx.atlassian.net/browse/EMQX-11270
This commit is contained in:
Kjell Winblad 2023-10-31 19:31:16 +01:00
parent ae760a4ca1
commit 95f3b94ac3
6 changed files with 362 additions and 106 deletions

View File

@ -173,20 +173,24 @@ lookup(Type, Name) ->
Channels = maps:get(added_channels, InstanceData, #{}), Channels = maps:get(added_channels, InstanceData, #{}),
BridgeV2Id = id(Type, Name, BridgeConnector), BridgeV2Id = id(Type, Name, BridgeConnector),
ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
DisplayBridgeV2Status = {DisplayBridgeV2Status, ErrorMsg} =
case ChannelStatus of case ChannelStatus of
{error, undefined} -> <<"Unknown reason">>; #{status := connected} ->
{error, Reason} -> emqx_utils:readable_error_msg(Reason); {connected, <<"">>};
connected -> <<"connected">>; #{status := Status, error := undefined} ->
connecting -> <<"connecting">>; {Status, <<"Unknown reason">>};
Error -> emqx_utils:readable_error_msg(Error) #{status := Status, error := Error} ->
{Status, emqx_utils:readable_error_msg(Error)};
undefined ->
{disconnected, <<"Pending installation">>}
end, end,
{ok, #{ {ok, #{
type => Type, type => Type,
name => Name, name => Name,
raw_config => RawConf, raw_config => RawConf,
resource_data => InstanceData, resource_data => InstanceData,
status => DisplayBridgeV2Status status => DisplayBridgeV2Status,
error => ErrorMsg
}} }}
end. end.
@ -526,10 +530,10 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
ConnectorId, ChannelTestId ConnectorId, ChannelTestId
), ),
case HealthCheckResult of case HealthCheckResult of
{error, Reason} -> #{status := connected} ->
{error, Reason}; ok;
_ -> #{status := Status, error := Error} ->
ok {error, {Status, Error}}
end end
end end
end, end,
@ -1032,6 +1036,7 @@ lookup_and_transform_to_bridge_v1_helper(
BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2), BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2),
BridgeV1 = maps:remove(status, BridgeV1Tmp), BridgeV1 = maps:remove(status, BridgeV1Tmp),
BridgeV2Status = maps:get(status, BridgeV2, undefined), BridgeV2Status = maps:get(status, BridgeV2, undefined),
BridgeV2Error = maps:get(error, BridgeV2, undefined),
ResourceData1 = maps:get(resource_data, BridgeV1, #{}), ResourceData1 = maps:get(resource_data, BridgeV1, #{}),
%% Replace id in resouce data %% Replace id in resouce data
BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>,
@ -1040,12 +1045,12 @@ lookup_and_transform_to_bridge_v1_helper(
case ConnectorStatus of case ConnectorStatus of
connected -> connected ->
case BridgeV2Status of case BridgeV2Status of
<<"connected">> -> connected ->
%% No need to modify the status %% No need to modify the status
{ok, BridgeV1#{resource_data => ResourceData2}}; {ok, BridgeV1#{resource_data => ResourceData2}};
NotConnected -> NotConnected ->
ResourceData3 = maps:put(status, connecting, ResourceData2), ResourceData3 = maps:put(status, NotConnected, ResourceData2),
ResourceData4 = maps:put(error, NotConnected, ResourceData3), ResourceData4 = maps:put(error, BridgeV2Error, ResourceData3),
BridgeV1Final = maps:put(resource_data, ResourceData4, BridgeV1), BridgeV1Final = maps:put(resource_data, ResourceData4, BridgeV1),
{ok, BridgeV1Final} {ok, BridgeV1Final}
end; end;

View File

@ -133,6 +133,7 @@ setup_mocks() ->
ok. ok.
init_per_suite(Config) -> init_per_suite(Config) ->
snabbkaffe:fix_ct_logging(),
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
app_specs(), app_specs(),
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
@ -238,12 +239,12 @@ t_create_dry_run_fail_add_channel(_) ->
{error, Msg} {error, Msg}
end), end),
Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1}, Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
OnAddChannel2 = wrap_fun(fun() -> OnAddChannel2 = wrap_fun(fun() ->
throw(Msg) throw(Msg)
end), end),
Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2}, Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2),
ok. ok.
t_create_dry_run_fail_get_channel_status(_) -> t_create_dry_run_fail_get_channel_status(_) ->
@ -252,7 +253,7 @@ t_create_dry_run_fail_get_channel_status(_) ->
{error, Msg} {error, Msg}
end), end),
Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1}, Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1},
{error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1),
Fun2 = wrap_fun(fun() -> Fun2 = wrap_fun(fun() ->
throw(Msg) throw(Msg)
end), end),
@ -280,7 +281,9 @@ t_is_valid_bridge_v1(_) ->
t_manual_health_check(_) -> t_manual_health_check(_) ->
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()),
%% Run a health check for the bridge %% Run a health check for the bridge
connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), #{error := undefined, status := connected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok. ok.
@ -290,7 +293,9 @@ t_manual_health_check_exception(_) ->
}, },
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge %% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok. ok.
@ -300,7 +305,9 @@ t_manual_health_check_exception_error(_) ->
}, },
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge %% Run a health check for the bridge
{error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), #{error := _, status := disconnected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok. ok.
@ -310,7 +317,9 @@ t_manual_health_check_error(_) ->
}, },
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf),
%% Run a health check for the bridge %% Run a health check for the bridge
{error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check(
bridge_type(), my_test_bridge
),
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok. ok.
@ -484,6 +493,83 @@ t_send_message_unhealthy_connector(_) ->
ets:delete(ResponseETS), ets:delete(ResponseETS),
ok. ok.
%% Checks how often the connector's on_add_channel callback is invoked while
%% the connector status is steered through
%% connected -> connecting -> connected -> {error, my_error} -> connected.
%% The invocation counter must still be 1 after the connecting round-trip
%% and must reach 2 after the error round-trip.
t_connector_connected_to_connecting_to_connected_no_channel_restart(_) ->
%% Public ETS table holding the values that the mocked on_start and
%% on_get_status callbacks return; the test overwrites on_get_status_value
%% to change the status reported by the connector.
ResponseETS = ets:new(response_ets, [public]),
ets:insert(ResponseETS, {on_start_value, conf}),
ets:insert(ResponseETS, {on_get_status_value, connected}),
%% Returns {ok, Conf} while on_start_value is `conf'; any other stored
%% value is returned as-is.
OnStartFun = wrap_fun(fun(Conf) ->
case ets:lookup_element(ResponseETS, on_start_value, 2) of
conf ->
{ok, Conf};
V ->
V
end
end),
OnGetStatusFun = wrap_fun(fun() ->
ets:lookup_element(ResponseETS, on_get_status_value, 2)
end),
%% Single-slot counter that is incremented on every on_add_channel call
OnAddChannelCntr = counters:new(1, []),
OnAddChannelFun = wrap_fun(fun(_InstId, ConnectorState, _ChannelId, _ChannelConfig) ->
counters:add(OnAddChannelCntr, 1, 1),
{ok, ConnectorState}
end),
ConConfig = emqx_utils_maps:deep_merge(con_config(), #{
<<"on_start_fun">> => OnStartFun,
<<"on_get_status_fun">> => OnGetStatusFun,
<<"on_add_channel_fun">> => OnAddChannelFun,
<<"resource_opts">> => #{<<"start_timeout">> => 100}
}),
%% The connector is named after this test case
ConName = ?FUNCTION_NAME,
{ok, _} = emqx_connector:create(con_type(), ConName, ConConfig),
BridgeConf = (bridge_config())#{
<<"connector">> => atom_to_binary(ConName)
},
{ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf),
%% Wait until on_add_channel_fun has been called exactly once
wait_until(fun() ->
counters:get(OnAddChannelCntr, 1) =:= 1
end),
1 = counters:get(OnAddChannelCntr, 1),
%% We change the status of the connector
ets:insert(ResponseETS, {on_get_status_value, connecting}),
%% Wait until the status is changed
wait_until(fun() ->
{ok, BridgeData} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData) =:= connecting
end),
%% Log the bridge status observed after the change
{ok, BridgeData1} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
ct:pal("Bridge V2 status changed to: ~p", [maps:get(status, BridgeData1)]),
%% We change the status again back to connected
ets:insert(ResponseETS, {on_get_status_value, connected}),
%% Wait until the status is connected again
wait_until(fun() ->
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData2) =:= connected
end),
%% On add channel should not have been called again
1 = counters:get(OnAddChannelCntr, 1),
%% We change the status to an error
ets:insert(ResponseETS, {on_get_status_value, {error, my_error}}),
%% Wait until the bridge status is reported as disconnected
wait_until(fun() ->
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData2) =:= disconnected
end),
%% Now we go back to connected
ets:insert(ResponseETS, {on_get_status_value, connected}),
wait_until(fun() ->
{ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge),
maps:get(status, BridgeData2) =:= connected
end),
%% Now the channel should have been removed and added again, i.e.
%% on_add_channel has been called a second time
wait_until(fun() ->
counters:get(OnAddChannelCntr, 1) =:= 2
end),
%% Clean up the bridge, the connector and the ETS table
ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
ok = emqx_connector:remove(con_type(), ConName),
ets:delete(ResponseETS),
ok.
t_unhealthy_channel_alarm(_) -> t_unhealthy_channel_alarm(_) ->
Conf = (bridge_config())#{ Conf = (bridge_config())#{
<<"on_get_channel_status_fun">> => <<"on_get_channel_status_fun">> =>
@ -720,3 +806,20 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
end end
), ),
ok. ok.
%% Helper Functions
%% Polls Fun until it returns true, using the default budget of 5000 ms.
wait_until(Fun) ->
    wait_until(Fun, 5000).

%% Polls Fun until it returns true and then returns ok. After every
%% unsuccessful poll the process sleeps 100 ms and the remaining budget is
%% reduced by 100. Once the budget has dropped below zero the test case is
%% failed through ct:fail/1.
wait_until(Fun, Timeout) when Timeout >= 0 ->
    PollInterval = 100,
    case Fun() of
        false ->
            timer:sleep(PollInterval),
            wait_until(Fun, Timeout - PollInterval);
        true ->
            ok
    end;
wait_until(_Fun, _Timeout) ->
    ct:fail("Wait until event did not happen").

View File

@ -54,6 +54,14 @@ on_add_channel(
) -> ) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun(); Fun();
on_add_channel(
InstId,
#{on_add_channel_fun := FunRef} = ConnectorState,
ChannelId,
ChannelConfig
) ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
Fun(InstId, ConnectorState, ChannelId, ChannelConfig);
on_add_channel( on_add_channel(
_InstId, _InstId,
State, State,
@ -118,8 +126,8 @@ on_get_channel_status(
ChannelId, ChannelId,
State State
) -> ) ->
Channels = maps:get(channels, State), Channels = maps:get(channels, State, #{}),
ChannelState = maps:get(ChannelId, Channels), ChannelState = maps:get(ChannelId, Channels, #{}),
case ChannelState of case ChannelState of
#{on_get_channel_status_fun := FunRef} -> #{on_get_channel_status_fun := FunRef} ->
Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),

View File

@ -200,6 +200,7 @@
-callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
channel_status() channel_status()
| {channel_status(), Reason :: term()}
| {error, term()}. | {error, term()}.
-callback query_mode(Config :: term()) -> query_mode(). -callback query_mode(Config :: term()) -> query_mode().
@ -457,7 +458,7 @@ health_check(ResId) ->
emqx_resource_manager:health_check(ResId). emqx_resource_manager:health_check(ResId).
-spec channel_health_check(resource_id(), channel_id()) -> -spec channel_health_check(resource_id(), channel_id()) ->
{ok, resource_status()} | {error, term()}. #{status := channel_status(), error := term(), any() := any()}.
channel_health_check(ResId, ChannelId) -> channel_health_check(ResId, ChannelId) ->
emqx_resource_manager:channel_health_check(ResId, ChannelId). emqx_resource_manager:channel_health_check(ResId, ChannelId).
@ -534,6 +535,7 @@ call_health_check(ResId, Mod, ResourceState) ->
-spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) ->
channel_status() channel_status()
| {channel_status(), Reason :: term()}
| {error, term()}. | {error, term()}.
call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> call_channel_health_check(ResId, ChannelId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)).

View File

@ -1112,11 +1112,21 @@ is_channel_id(Id) ->
%% There is no need to query the conncector if the channel is not %% There is no need to query the conncector if the channel is not
%% installed as the query will fail anyway. %% installed as the query will fail anyway.
pre_query_channel_check({Id, _} = _Request, Channels) when pre_query_channel_check({Id, _} = _Request, Channels) when
is_map_key(Id, Channels), is_map_key(Id, Channels)
(map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting)
-> ->
ChannelStatus = maps:get(Id, Channels),
case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of
true ->
ok; ok;
false ->
maybe_throw_channel_not_installed(Id)
end;
pre_query_channel_check({Id, _} = _Request, _Channels) -> pre_query_channel_check({Id, _} = _Request, _Channels) ->
maybe_throw_channel_not_installed(Id);
pre_query_channel_check(_Request, _Channels) ->
ok.
maybe_throw_channel_not_installed(Id) ->
%% Fail with a recoverable error if the channel is not installed %% Fail with a recoverable error if the channel is not installed
%% so that the operation can be retried. It is emqx_resource_manager's %% so that the operation can be retried. It is emqx_resource_manager's
%% responsibility to ensure that the channel installation is retried. %% responsibility to ensure that the channel installation is retried.
@ -1128,9 +1138,7 @@ pre_query_channel_check({Id, _} = _Request, _Channels) ->
); );
false -> false ->
ok ok
end; end.
pre_query_channel_check(_Request, _Channels) ->
ok.
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer

View File

@ -44,7 +44,8 @@
list_group/1, list_group/1,
lookup_cached/1, lookup_cached/1,
get_metrics/1, get_metrics/1,
reset_metrics/1 reset_metrics/1,
channel_status_is_channel_added/1
]). ]).
-export([ -export([
@ -306,7 +307,7 @@ health_check(ResId) ->
safe_call(ResId, health_check, ?T_OPERATION). safe_call(ResId, health_check, ?T_OPERATION).
-spec channel_health_check(resource_id(), channel_id()) -> -spec channel_health_check(resource_id(), channel_id()) ->
{ok, resource_status()} | {error, term()}. #{status := channel_status(), error := term(), any() := any()}.
channel_health_check(ResId, ChannelId) -> channel_health_check(ResId, ChannelId) ->
%% Do normal health check first to trigger health checks for channels %% Do normal health check first to trigger health checks for channels
%% and update the cached health status for the channels %% and update the cached health status for the channels
@ -420,6 +421,10 @@ handle_event(internal, start_resource, connecting, Data) ->
start_resource(Data, undefined); start_resource(Data, undefined);
handle_event(state_timeout, health_check, connecting, Data) -> handle_event(state_timeout, health_check, connecting, Data) ->
handle_connecting_health_check(Data); handle_connecting_health_check(Data);
handle_event(
{call, From}, {remove_channel, ChannelId}, connecting = _State, Data
) ->
handle_remove_channel(From, ChannelId, Data);
%% State: CONNECTED %% State: CONNECTED
%% The connected state is entered after a successful on_start/2 of the callback mod %% The connected state is entered after a successful on_start/2 of the callback mod
%% and successful health_checks %% and successful health_checks
@ -459,7 +464,7 @@ handle_event(
handle_event( handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data {call, From}, {remove_channel, ChannelId}, _State, Data
) -> ) ->
handle_not_connected_remove_channel(From, ChannelId, Data); handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data);
handle_event( handle_event(
{call, From}, get_channels, _State, Data {call, From}, get_channels, _State, Data
) -> ) ->
@ -570,7 +575,7 @@ add_channels(Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
NewChannels = lists:foldl( NewChannels = lists:foldl(
fun({ChannelID, _Conf}, Acc) -> fun({ChannelID, _Conf}, Acc) ->
maps:put(ChannelID, {error, connecting}, Acc) maps:put(ChannelID, channel_status_new(), Acc)
end, end,
Channels, Channels,
ChannelIDConfigTuples ChannelIDConfigTuples
@ -589,7 +594,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
%% Set the channel status to connecting to indicate that %% Set the channel status to connecting to indicate that
%% we have not yet performed the initial health_check %% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap), NewAddedChannelsMap = maps:put(
ChannelID,
channel_status_new_waiting_for_health_check(),
AddedChannelsMap
),
NewData = Data#data{ NewData = Data#data{
state = NewState, state = NewState,
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
@ -603,7 +612,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
reason => Reason reason => Reason
}), }),
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap), NewAddedChannelsMap = maps:put(
ChannelID,
channel_status_new(Error),
AddedChannelsMap
),
NewData = Data#data{ NewData = Data#data{
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
@ -680,65 +693,45 @@ make_test_id() ->
RandId = iolist_to_binary(emqx_utils:gen_id(16)), RandId = iolist_to_binary(emqx_utils:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>. <<?TEST_ID_PREFIX, RandId/binary>>.
handle_add_channel(From, Data, ChannelId, ChannelConfig) -> handle_add_channel(From, Data, ChannelId, Config) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
case maps:get(ChannelId, Channels, {error, not_added}) of case
{error, _Reason} -> channel_status_is_channel_added(
maps:get(
ChannelId,
Channels,
channel_status_new()
)
)
of
false ->
%% The channel is not installed in the connector state %% The channel is not installed in the connector state
%% We need to install it %% We need to insert it into the channels map and let the health check
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); %% take care of the rest
_ -> NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels),
NewData = Data#data{added_channels = NewChannels},
{keep_state, update_state(NewData, Data), [
{reply, From, ok}, {state_timeout, 0, health_check}
]};
%%handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig);
true ->
%% The channel is already installed in the connector state %% The channel is already installed in the connector state
%% We don't need to install it again %% We don't need to install it again
{keep_state_and_data, [{reply, From, ok}]} {keep_state_and_data, [{reply, From, ok}]}
end. end.
handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) ->
NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig),
%% Trigger a health check to raise alarm if channel is not healthy
{keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}.
add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) ->
case
emqx_resource:call_add_channel(
Data#data.id, Data#data.mod, Data#data.state, ChannelId, ChannelConfig
)
of
{ok, NewState} ->
AddedChannelsMap = Data#data.added_channels,
%% Setting channel status to connecting to indicate that an health check
%% has not been performed yet
NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap),
UpdatedData = Data#data{
state = NewState,
added_channels = NewAddedChannelsMap
},
update_state(UpdatedData, Data);
{error, _Reason} = Error ->
ChannelsMap = Data#data.added_channels,
NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap),
UpdatedData = Data#data{
added_channels = NewChannelsMap
},
update_state(UpdatedData, Data)
end.
handle_not_connected_add_channel(From, ChannelId, State, Data) -> handle_not_connected_add_channel(From, ChannelId, State, Data) ->
%% When state is not connected the channel will be added to the channels %% When state is not connected the channel will be added to the channels
%% map but nothing else will happen. %% map but nothing else will happen.
Channels = Data#data.added_channels, NewData = add_channel_status_if_not_exists(Data, ChannelId, State),
NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
NewData1 = Data#data{added_channels = NewChannels},
%% Do channel health check to trigger alarm
NewData2 = channels_health_check(State, NewData1),
{keep_state, update_state(NewData2, Data), [{reply, From, ok}]}.
handle_remove_channel(From, ChannelId, Data) -> handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
%% Deactivate alarm %% Deactivate alarm
_ = maybe_clear_alarm(ChannelId), _ = maybe_clear_alarm(ChannelId),
case maps:get(ChannelId, Channels, {error, not_added}) of case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status_new())) of
{error, _} -> false ->
%% The channel is already not installed in the connector state. %% The channel is already not installed in the connector state.
%% We still need to remove it from the added_channels map %% We still need to remove it from the added_channels map
AddedChannels = Data#data.added_channels, AddedChannels = Data#data.added_channels,
@ -747,7 +740,7 @@ handle_remove_channel(From, ChannelId, Data) ->
added_channels = NewAddedChannels added_channels = NewAddedChannels
}, },
{keep_state, NewData, [{reply, From, ok}]}; {keep_state, NewData, [{reply, From, ok}]};
_ -> true ->
%% The channel is installed in the connector state %% The channel is installed in the connector state
handle_remove_channel_exists(From, ChannelId, Data) handle_remove_channel_exists(From, ChannelId, Data)
end. end.
@ -777,9 +770,9 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
{keep_state_and_data, [{reply, From, Error}]} {keep_state_and_data, [{reply, From, Error}]}
end. end.
handle_not_connected_remove_channel(From, ChannelId, Data) -> handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data) ->
%% When state is not connected the channel will be removed from the channels %% When state is not connected or connecting the channel will be removed
%% map but nothing else will happen. %% from the channels map but nothing else will happen.
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
NewChannels = maps:remove(ChannelId, Channels), NewChannels = maps:remove(ChannelId, Channels),
NewData = Data#data{added_channels = NewChannels}, NewData = Data#data{added_channels = NewChannels},
@ -796,7 +789,7 @@ handle_manually_health_check(From, Data) ->
). ).
handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) -> handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
{keep_state_and_data, [{reply, From, {ok, disconnected}}]}; {keep_state_and_data, [{reply, From, channel_status_new({error, resource_disconnected})}]};
handle_manually_channel_health_check( handle_manually_channel_health_check(
From, From,
#data{added_channels = Channels} = _Data, #data{added_channels = Channels} = _Data,
@ -810,10 +803,11 @@ handle_manually_channel_health_check(
_Data, _Data,
_ChannelId _ChannelId
) -> ) ->
{keep_state_and_data, [{reply, From, {error, channel_not_found}}]}. {keep_state_and_data, [{reply, From, channel_status_new({error, channel_not_found})}]}.
get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) ->
emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State). RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
channel_status_new(RawStatus).
handle_connecting_health_check(Data) -> handle_connecting_health_check(Data) ->
with_health_check( with_health_check(
@ -833,9 +827,9 @@ handle_connected_health_check(Data) ->
with_health_check( with_health_check(
Data, Data,
fun fun
(connected, UpdatedData) -> (connected, UpdatedData0) ->
{keep_state, channels_health_check(connected, UpdatedData), UpdatedData1 = channels_health_check(connected, UpdatedData0),
health_check_actions(UpdatedData)}; {keep_state, UpdatedData1, health_check_actions(UpdatedData1)};
(Status, UpdatedData) -> (Status, UpdatedData) ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "health_check_failed", msg => "health_check_failed",
@ -861,20 +855,59 @@ with_health_check(#data{error = PrevError} = Data, Func) ->
channels_health_check(connected = _ResourceStatus, Data0) -> channels_health_check(connected = _ResourceStatus, Data0) ->
Channels = maps:to_list(Data0#data.added_channels), Channels = maps:to_list(Data0#data.added_channels),
%% All channels with an error status are considered not added %% All channels with a status different from connected or connecting are
%% not added
ChannelsNotAdded = [ ChannelsNotAdded = [
ChannelId ChannelId
|| {ChannelId, Status} <- Channels, || {ChannelId, Status} <- Channels,
not is_channel_added(Status) not channel_status_is_channel_added(Status)
], ],
%% Attempt to add channels that are not added %% Attempt to add channels that are not added
ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded), ChannelsNotAddedWithConfigs =
get_config_for_channels(Data0, ChannelsNotAdded),
Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0), Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
%% Now that we have done the adding, we can get the status of all channels %% Now that we have done the adding, we can get the status of all channels
Data2 = channel_status_for_all_channels(Data1), Data2 = channel_status_for_all_channels(Data1),
update_state(Data2, Data0); update_state(Data2, Data0);
channels_health_check(connecting, Data0) ->
%% Whenever the resource is connecting:
%% 1. Change the status of all added channels to connecting
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms)
Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [
ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels),
channel_status_is_channel_added(Status)
],
ChannelsWithNewStatuses =
[
{ChannelId, channel_status_new({connecting, resource_is_connecting})}
|| ChannelId <- ChannelsToChangeStatusFor
],
%% Update the channels map
NewChannels = lists:foldl(
fun({ChannelId, NewStatus}, Acc) ->
maps:update(ChannelId, NewStatus, Acc)
end,
Channels,
ChannelsWithNewStatuses
),
ChannelsWithNewAndPrevErrorStatuses =
[
{ChannelId, NewStatus, maps:get(ChannelId, Channels)}
|| {ChannelId, NewStatus} <- maps:to_list(NewChannels)
],
%% Raise alarms for all channels
lists:foreach(
fun({ChannelId, Status, PrevStatus}) ->
maybe_alarm(connecting, ChannelId, Status, PrevStatus)
end,
ChannelsWithNewAndPrevErrorStatuses
),
Data1 = Data0#data{added_channels = NewChannels},
update_state(Data1, Data0);
channels_health_check(ResourceStatus, Data0) -> channels_health_check(ResourceStatus, Data0) ->
%% Whenever the resource is not connected: %% Whenever the resource is not connected and not connecting:
%% 1. Remove all added channels %% 1. Remove all added channels
%% 2. Change the status to an error status %% 2. Change the status to an error status
%% 3. Raise alarms %% 3. Raise alarms
@ -882,13 +915,20 @@ channels_health_check(ResourceStatus, Data0) ->
ChannelsToRemove = [ ChannelsToRemove = [
ChannelId ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels), || {ChannelId, Status} <- maps:to_list(Channels),
is_channel_added(Status) channel_status_is_channel_added(Status)
], ],
Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true), Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true),
ChannelsWithNewAndOldStatuses = ChannelsWithNewAndOldStatuses =
[ [
{ChannelId, OldStatus, {ChannelId, OldStatus,
{error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}} channel_status_new(
{error,
resource_not_connected_channel_error_msg(
ResourceStatus,
ChannelId,
Data1
)}
)}
|| {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels)
], ],
%% Raise alarms %% Raise alarms
@ -928,18 +968,19 @@ channel_status_for_all_channels(Data) ->
AddedChannelsWithOldAndNewStatus = [ AddedChannelsWithOldAndNewStatus = [
{ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)} {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)}
|| {ChannelId, OldStatus} <- Channels, || {ChannelId, OldStatus} <- Channels,
is_channel_added(OldStatus) channel_status_is_channel_added(OldStatus)
], ],
%% Remove the added channels with a new error statuses %% Remove the added channels with a status different from connected or connecting
ChannelsToRemove = [ ChannelsToRemove = [
ChannelId ChannelId
|| {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus || {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus,
not channel_status_is_channel_added(NewStatus)
], ],
Data1 = remove_channels_in_list(ChannelsToRemove, Data, true), Data1 = remove_channels_in_list(ChannelsToRemove, Data, true),
%% Raise/clear alarms %% Raise/clear alarms
lists:foreach( lists:foreach(
fun fun
({ID, _OldStatus, connected}) -> ({ID, _OldStatus, #{status := connected}}) ->
_ = maybe_clear_alarm(ID); _ = maybe_clear_alarm(ID);
({ID, OldStatus, NewStatus}) -> ({ID, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
@ -958,18 +999,14 @@ channel_status_for_all_channels(Data) ->
), ),
Data1#data{added_channels = NewChannelsMap}. Data1#data{added_channels = NewChannelsMap}.
is_channel_added({error, _}) ->
false;
is_channel_added(_) ->
true.
get_config_for_channels(Data0, ChannelsWithoutConfig) -> get_config_for_channels(Data0, ChannelsWithoutConfig) ->
ResId = Data0#data.id, ResId = Data0#data.id,
Mod = Data0#data.mod, Mod = Data0#data.mod,
Channels = emqx_resource:call_get_channels(ResId, Mod), Channels = emqx_resource:call_get_channels(ResId, Mod),
ChannelIdToConfig = maps:from_list(Channels), ChannelIdToConfig = maps:from_list(Channels),
ChannelStatusMap = Data0#data.added_channels,
ChannelsWithConfig = [ ChannelsWithConfig = [
{Id, maps:get(Id, ChannelIdToConfig, no_config)} {Id, get_config_from_map_or_channel_status(Id, ChannelIdToConfig, ChannelStatusMap)}
|| Id <- ChannelsWithoutConfig || Id <- ChannelsWithoutConfig
], ],
%% Filter out channels without config %% Filter out channels without config
@ -979,6 +1016,16 @@ get_config_for_channels(Data0, ChannelsWithoutConfig) ->
Conf =/= no_config Conf =/= no_config
]. ].
%% Picks the config to use for ChannelId.
%%
%% A config stored under the `config' key of the channel's entry in
%% ChannelStatusMap takes precedence (unless it is `undefined'). Otherwise
%% the config is looked up in ChannelIdToConfig, and `no_config' is returned
%% when the channel is not present there either.
get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatusMap) ->
    StatusOfChannel = maps:get(ChannelId, ChannelStatusMap, #{}),
    case maps:find(config, StatusOfChannel) of
        {ok, StoredConfig} when StoredConfig =/= undefined ->
            StoredConfig;
        _NoStoredConfig ->
            %% Fall back to the channel config known to the callback module
            maps:get(ChannelId, ChannelIdToConfig, no_config)
    end.
update_state(Data) -> update_state(Data) ->
update_state(Data, undefined). update_state(Data, undefined).
@ -1098,3 +1145,86 @@ safe_call(ResId, Message, Timeout) ->
exit:{timeout, _} -> exit:{timeout, _} ->
{error, timeout} {error, timeout}
end. end.
%% Helper functions for channel status data

%% Returns the status map of a channel that has not been added yet.
%%
%% The `status' field of a channel status map can be one of the following:
%% - disconnected: the channel is not added to the resource (the `error'
%%                 field may contain the reason)
%% - connecting: the channel has been added to the resource state but
%%               either the resource status is connecting or the
%%               on_get_channel_status callback has returned connecting
%% - connected: the channel is added to the resource, the resource is
%%              connected and the on_get_channel_status callback has
%%              returned connected. The `error' field should be undefined.
channel_status_new() ->
    #{status => disconnected, error => not_added_yet}.
%% Returns the status map of a not-yet-added channel that also carries its
%% config under the `config' key. The config is kept here for channels added
%% with add_channel/2; this is useful when probing, since in that case the
%% config is not stored anywhere else.
channel_status_new_with_config(Config) ->
    NotAddedYet = #{status => disconnected, error => not_added_yet},
    NotAddedYet#{config => Config}.
%% Returns the status map used for a channel whose first health check has
%% not been performed yet: status `connecting' with the placeholder error
%% `no_health_check_yet'.
channel_status_new_waiting_for_health_check() ->
    #{error => no_health_check_yet, status => connecting}.
%% Normalizes a raw channel status into a channel status map with the keys
%% `status' and `error'.
%%
%% Accepted inputs:
%%   - connected | connecting | disconnected
%%   - {connected | connecting | disconnected, Reason}
%%   - {error, Reason}, which is mapped to status `disconnected'
%%
%% Any other term makes the call fail with function_clause.
channel_status_new({connecting, Error}) ->
    #{
        status => connecting,
        error => Error
    };
channel_status_new(connecting) ->
    #{
        status => connecting,
        error => <<"Not connected for unknown reason">>
    };
channel_status_new(connected) ->
    #{
        status => connected,
        error => undefined
    };
%% Probably not so useful but it is permitted to set an error even when the
%% status is connected
channel_status_new({connected, Error}) ->
    #{
        status => connected,
        error => Error
    };
%% `disconnected' is a documented value of the `status' field, so a raw
%% status that already uses it must be accepted instead of crashing the
%% caller with function_clause.
%% NOTE(review): assumes callbacks may report `disconnected' directly --
%% confirm against the channel_status() type definition.
channel_status_new({disconnected, Reason}) ->
    #{
        status => disconnected,
        error => Reason
    };
channel_status_new(disconnected) ->
    #{
        status => disconnected,
        error => <<"Disconnected for unknown reason">>
    };
channel_status_new({error, Reason}) ->
    #{
        status => disconnected,
        error => Reason
    }.
%% Returns true when the given channel status map has status `connected' or
%% `connecting', and false for every other term.
channel_status_is_channel_added(#{status := Status}) when
    Status =:= connected; Status =:= connecting
->
    true;
channel_status_is_channel_added(_Status) ->
    false.
%% Makes sure ChannelId has an entry in the added_channels map of Data.
%%
%% When the channel is already present, Data is returned unchanged. Otherwise
%% the channel is stored with a disconnected status whose error is
%% `resource_not_operational', maybe_alarm/4 is called for it (with `no_prev'
%% as the previous status) and the updated Data is returned.
add_channel_status_if_not_exists(Data, ChannelId, State) ->
    KnownChannels = Data#data.added_channels,
    case maps:is_key(ChannelId, KnownChannels) of
        false ->
            NotOperational = channel_status_new({error, resource_not_operational}),
            UpdatedChannels = KnownChannels#{ChannelId => NotOperational},
            maybe_alarm(State, ChannelId, NotOperational, no_prev),
            Data#data{added_channels = UpdatedChannels};
        true ->
            Data
    end.