From 95f3b94ac32e7167f30558d9a36be6c3f3d60cdb Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 31 Oct 2023 19:31:16 +0100 Subject: [PATCH] fix(bridge_v2): channels should not be removed when status is connecting This fixes so that channels are not removed from the resource state when their status is connecting. This is needed for Kafka since Kafka's message buffer is stored in the resource state. Fixes: https://emqx.atlassian.net/browse/EMQX-11270 --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 33 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 117 +++++++- .../test/emqx_bridge_v2_test_connector.erl | 12 +- apps/emqx_resource/src/emqx_resource.erl | 4 +- .../src/emqx_resource_buffer_worker.erl | 20 +- .../src/emqx_resource_manager.erl | 282 +++++++++++++----- 6 files changed, 362 insertions(+), 106 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 3747a4671..ea47c30e8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -173,20 +173,24 @@ lookup(Type, Name) -> Channels = maps:get(added_channels, InstanceData, #{}), BridgeV2Id = id(Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), - DisplayBridgeV2Status = + {DisplayBridgeV2Status, ErrorMsg} = case ChannelStatus of - {error, undefined} -> <<"Unknown reason">>; - {error, Reason} -> emqx_utils:readable_error_msg(Reason); - connected -> <<"connected">>; - connecting -> <<"connecting">>; - Error -> emqx_utils:readable_error_msg(Error) + #{status := connected} -> + {connected, <<"">>}; + #{status := Status, error := undefined} -> + {Status, <<"Unknown reason">>}; + #{status := Status, error := Error} -> + {Status, emqx_utils:readable_error_msg(Error)}; + undefined -> + {disconnected, <<"Pending installation">>} end, {ok, #{ type => Type, name => Name, raw_config => RawConf, resource_data => InstanceData, - status => DisplayBridgeV2Status + status => 
DisplayBridgeV2Status, + error => ErrorMsg }} end. @@ -526,10 +530,10 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> ConnectorId, ChannelTestId ), case HealthCheckResult of - {error, Reason} -> - {error, Reason}; - _ -> - ok + #{status := connected} -> + ok; + #{status := Status, error := Error} -> + {error, {Status, Error}} end end end, @@ -1032,6 +1036,7 @@ lookup_and_transform_to_bridge_v1_helper( BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2), BridgeV1 = maps:remove(status, BridgeV1Tmp), BridgeV2Status = maps:get(status, BridgeV2, undefined), + BridgeV2Error = maps:get(error, BridgeV2, undefined), ResourceData1 = maps:get(resource_data, BridgeV1, #{}), %% Replace id in resouce data BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, @@ -1040,12 +1045,12 @@ lookup_and_transform_to_bridge_v1_helper( case ConnectorStatus of connected -> case BridgeV2Status of - <<"connected">> -> + connected -> %% No need to modify the status {ok, BridgeV1#{resource_data => ResourceData2}}; NotConnected -> - ResourceData3 = maps:put(status, connecting, ResourceData2), - ResourceData4 = maps:put(error, NotConnected, ResourceData3), + ResourceData3 = maps:put(status, NotConnected, ResourceData2), + ResourceData4 = maps:put(error, BridgeV2Error, ResourceData3), BridgeV1Final = maps:put(resource_data, ResourceData4, BridgeV1), {ok, BridgeV1Final} end; diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 6e15887c8..915691073 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -133,6 +133,7 @@ setup_mocks() -> ok. 
init_per_suite(Config) -> + snabbkaffe:fix_ct_logging(), Apps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -238,12 +239,12 @@ t_create_dry_run_fail_add_channel(_) -> {error, Msg} end), Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), OnAddChannel2 = wrap_fun(fun() -> throw(Msg) end), Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), ok. t_create_dry_run_fail_get_channel_status(_) -> @@ -252,7 +253,7 @@ t_create_dry_run_fail_get_channel_status(_) -> {error, Msg} end), Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), Fun2 = wrap_fun(fun() -> throw(Msg) end), @@ -280,7 +281,9 @@ t_is_valid_bridge_v1(_) -> t_manual_health_check(_) -> {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), %% Run a health check for the bridge - connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := undefined, status := connected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -290,7 +293,9 @@ t_manual_health_check_exception(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. 
@@ -300,7 +305,9 @@ t_manual_health_check_exception_error(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := _, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -310,7 +317,9 @@ t_manual_health_check_error(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -484,6 +493,83 @@ t_send_message_unhealthy_connector(_) -> ets:delete(ResponseETS), ok. +t_connector_connected_to_connecting_to_connected_no_channel_restart(_) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_start_value, conf}), + ets:insert(ResponseETS, {on_get_status_value, connected}), + OnStartFun = wrap_fun(fun(Conf) -> + case ets:lookup_element(ResponseETS, on_start_value, 2) of + conf -> + {ok, Conf}; + V -> + V + end + end), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + OnAddChannelCntr = counters:new(1, []), + OnAddChannelFun = wrap_fun(fun(_InstId, ConnectorState, _ChannelId, _ChannelConfig) -> + counters:add(OnAddChannelCntr, 1, 1), + {ok, ConnectorState} + end), + ConConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_start_fun">> => OnStartFun, + <<"on_get_status_fun">> => OnGetStatusFun, + <<"on_add_channel_fun">> => OnAddChannelFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConName = ?FUNCTION_NAME, + {ok, _} = emqx_connector:create(con_type(), ConName, ConConfig), + BridgeConf = (bridge_config())#{ 
+ <<"connector">> => atom_to_binary(ConName) + }, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf), + %% Wait until on_add_channel_fun is called at least once + wait_until(fun() -> + counters:get(OnAddChannelCntr, 1) =:= 1 + end), + 1 = counters:get(OnAddChannelCntr, 1), + %% We change the status of the connector + ets:insert(ResponseETS, {on_get_status_value, connecting}), + %% Wait until the status is changed + wait_until(fun() -> + {ok, BridgeData} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData) =:= connecting + end), + {ok, BridgeData1} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + ct:pal("Bridge V2 status changed to: ~p", [maps:get(status, BridgeData1)]), + %% We change the status again back to connected + ets:insert(ResponseETS, {on_get_status_value, connected}), + %% Wait until the status is connected again + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= connected + end), + %% On add channel should not have been called again + 1 = counters:get(OnAddChannelCntr, 1), + %% We change the status to an error + ets:insert(ResponseETS, {on_get_status_value, {error, my_error}}), + %% Wait until the status is changed + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= disconnected + end), + %% Now we go back to connected + ets:insert(ResponseETS, {on_get_status_value, connected}), + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= connected + end), + %% Now the channel should have been removed and added again + wait_until(fun() -> + counters:get(OnAddChannelCntr, 1) =:= 2 + end), + ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok = emqx_connector:remove(con_type(), ConName), + ets:delete(ResponseETS), + ok. 
+ t_unhealthy_channel_alarm(_) -> Conf = (bridge_config())#{ <<"on_get_channel_status_fun">> => @@ -720,3 +806,20 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> end ), ok. + +%% Helper Functions + +wait_until(Fun) -> + wait_until(Fun, 5000). + +wait_until(Fun, Timeout) when Timeout >= 0 -> + case Fun() of + true -> + ok; + false -> + IdleTime = 100, + timer:sleep(IdleTime), + wait_until(Fun, Timeout - IdleTime) + end; +wait_until(_, _) -> + ct:fail("Wait until event did not happen"). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index a84d6b4b2..0138832a0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -54,6 +54,14 @@ on_add_channel( ) -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(); +on_add_channel( + InstId, + #{on_add_channel_fun := FunRef} = ConnectorState, + ChannelId, + ChannelConfig +) -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), + Fun(InstId, ConnectorState, ChannelId, ChannelConfig); on_add_channel( _InstId, State, @@ -118,8 +126,8 @@ on_get_channel_status( ChannelId, State ) -> - Channels = maps:get(channels, State), - ChannelState = maps:get(ChannelId, Channels), + Channels = maps:get(channels, State, #{}), + ChannelState = maps:get(ChannelId, Channels, #{}), case ChannelState of #{on_get_channel_status_fun := FunRef} -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a67677478..155a07593 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -200,6 +200,7 @@ -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> channel_status() + | {channel_status(), Reason :: term()} | {error, term()}. -callback query_mode(Config :: term()) -> query_mode(). 
@@ -457,7 +458,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - {ok, resource_status()} | {error, term()}. + #{status := channel_status(), error := term(), any() := any()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). @@ -534,6 +535,7 @@ call_health_check(ResId, Mod, ResourceState) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> channel_status() + | {channel_status(), Reason :: term()} | {error, term()}. call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 13d9ad2de..060a97f83 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1112,11 +1112,21 @@ is_channel_id(Id) -> %% There is no need to query the conncector if the channel is not %% installed as the query will fail anyway. pre_query_channel_check({Id, _} = _Request, Channels) when - is_map_key(Id, Channels), - (map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting) + is_map_key(Id, Channels) -> - ok; + ChannelStatus = maps:get(Id, Channels), + case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of + true -> + ok; + false -> + maybe_throw_channel_not_installed(Id) + end; pre_query_channel_check({Id, _} = _Request, _Channels) -> + maybe_throw_channel_not_installed(Id); +pre_query_channel_check(_Request, _Channels) -> + ok. + +maybe_throw_channel_not_installed(Id) -> %% Fail with a recoverable error if the channel is not installed %% so that the operation can be retried. It is emqx_resource_manager's %% responsibility to ensure that the channel installation is retried. 
@@ -1128,9 +1138,7 @@ pre_query_channel_check({Id, _} = _Request, _Channels) -> ); false -> ok - end; -pre_query_channel_check(_Request, _Channels) -> - ok. + end. do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 4b1c9e4a4..f4c11fd04 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -44,7 +44,8 @@ list_group/1, lookup_cached/1, get_metrics/1, - reset_metrics/1 + reset_metrics/1, + channel_status_is_channel_added/1 ]). -export([ @@ -306,7 +307,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - {ok, resource_status()} | {error, term()}. + #{status := channel_status(), error := term(), any() := any()}. channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels @@ -420,6 +421,10 @@ handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> handle_connecting_health_check(Data); +handle_event( + {call, From}, {remove_channel, ChannelId}, connecting = _State, Data +) -> + handle_remove_channel(From, ChannelId, Data); %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks @@ -459,7 +464,7 @@ handle_event( handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> - handle_not_connected_remove_channel(From, ChannelId, Data); + handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data); handle_event( {call, From}, get_channels, _State, Data ) -> @@ -570,7 +575,7 @@ 
add_channels(Data) -> Channels = Data#data.added_channels, NewChannels = lists:foldl( fun({ChannelID, _Conf}, Acc) -> - maps:put(ChannelID, {error, connecting}, Acc) + maps:put(ChannelID, channel_status_new(), Acc) end, Channels, ChannelIDConfigTuples @@ -589,7 +594,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> AddedChannelsMap = Data#data.added_channels, %% Set the channel status to connecting to indicate that %% we have not yet performed the initial health_check - NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap), + NewAddedChannelsMap = maps:put( + ChannelID, + channel_status_new_waiting_for_health_check(), + AddedChannelsMap + ), NewData = Data#data{ state = NewState, added_channels = NewAddedChannelsMap @@ -603,7 +612,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> reason => Reason }), AddedChannelsMap = Data#data.added_channels, - NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap), + NewAddedChannelsMap = maps:put( + ChannelID, + channel_status_new(Error), + AddedChannelsMap + ), NewData = Data#data{ added_channels = NewAddedChannelsMap }, @@ -680,65 +693,45 @@ make_test_id() -> RandId = iolist_to_binary(emqx_utils:gen_id(16)), <>. 
-handle_add_channel(From, Data, ChannelId, ChannelConfig) -> +handle_add_channel(From, Data, ChannelId, Config) -> Channels = Data#data.added_channels, - case maps:get(ChannelId, Channels, {error, not_added}) of - {error, _Reason} -> + case + channel_status_is_channel_added( + maps:get( + ChannelId, + Channels, + channel_status_new() + ) + ) + of + false -> %% The channel is not installed in the connector state - %% We need to install it - handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); - _ -> + %% We need to insert it into the channels map and let the health check + %% take care of the rest + NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), + NewData = Data#data{added_channels = NewChannels}, + {keep_state, update_state(NewData, Data), [ + {reply, From, ok}, {state_timeout, 0, health_check} + ]}; + %%handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); + true -> %% The channel is already installed in the connector state %% We don't need to install it again {keep_state_and_data, [{reply, From, ok}]} end. -handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) -> - NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig), - %% Trigger a health check to raise alarm if channel is not healthy - {keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}.
- -add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> - case - emqx_resource:call_add_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelId, ChannelConfig - ) - of - {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, - %% Setting channel status to connecting to indicate that an health check - %% has not been performed yet - NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap), - UpdatedData = Data#data{ - state = NewState, - added_channels = NewAddedChannelsMap - }, - update_state(UpdatedData, Data); - {error, _Reason} = Error -> - ChannelsMap = Data#data.added_channels, - NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap), - UpdatedData = Data#data{ - added_channels = NewChannelsMap - }, - update_state(UpdatedData, Data) - end. - handle_not_connected_add_channel(From, ChannelId, State, Data) -> %% When state is not connected the channel will be added to the channels %% map but nothing else will happen. - Channels = Data#data.added_channels, - NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels), - NewData1 = Data#data{added_channels = NewChannels}, - %% Do channel health check to trigger alarm - NewData2 = channels_health_check(State, NewData1), - {keep_state, update_state(NewData2, Data), [{reply, From, ok}]}. + NewData = add_channel_status_if_not_exists(Data, ChannelId, State), + {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, %% Deactivate alarm _ = maybe_clear_alarm(ChannelId), - case maps:get(ChannelId, Channels, {error, not_added}) of - {error, _} -> + case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status_new())) of + false -> %% The channel is already not installed in the connector state. 
%% We still need to remove it from the added_channels map AddedChannels = Data#data.added_channels, @@ -747,7 +740,7 @@ handle_remove_channel(From, ChannelId, Data) -> added_channels = NewAddedChannels }, {keep_state, NewData, [{reply, From, ok}]}; - _ -> + true -> %% The channel is installed in the connector state handle_remove_channel_exists(From, ChannelId, Data) end. @@ -777,9 +770,9 @@ handle_remove_channel_exists(From, ChannelId, Data) -> {keep_state_and_data, [{reply, From, Error}]} end. -handle_not_connected_remove_channel(From, ChannelId, Data) -> - %% When state is not connected the channel will be removed from the channels - %% map but nothing else will happen. +handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data) -> + %% When state is not connected or connecting the channel will be removed + %% from the channels map but nothing else will happen. Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, @@ -796,7 +789,7 @@ handle_manually_health_check(From, Data) -> ). handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) -> - {keep_state_and_data, [{reply, From, {ok, disconnected}}]}; + {keep_state_and_data, [{reply, From, channel_status_new({error, resource_disconnected})}]}; handle_manually_channel_health_check( From, #data{added_channels = Channels} = _Data, @@ -810,10 +803,11 @@ handle_manually_channel_health_check( _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, {error, channel_not_found}}]}. + {keep_state_and_data, [{reply, From, channel_status_new({error, channel_not_found})}]}. get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> - emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State). + RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), + channel_status_new(RawStatus). 
handle_connecting_health_check(Data) -> with_health_check( @@ -833,9 +827,9 @@ handle_connected_health_check(Data) -> with_health_check( Data, fun - (connected, UpdatedData) -> - {keep_state, channels_health_check(connected, UpdatedData), - health_check_actions(UpdatedData)}; + (connected, UpdatedData0) -> + UpdatedData1 = channels_health_check(connected, UpdatedData0), + {keep_state, UpdatedData1, health_check_actions(UpdatedData1)}; (Status, UpdatedData) -> ?SLOG(warning, #{ msg => "health_check_failed", @@ -861,20 +855,59 @@ with_health_check(#data{error = PrevError} = Data, Func) -> channels_health_check(connected = _ResourceStatus, Data0) -> Channels = maps:to_list(Data0#data.added_channels), - %% All channels with an error status are considered not added + %% All channels with a status different from connected or connecting are + %% not added ChannelsNotAdded = [ ChannelId || {ChannelId, Status} <- Channels, - not is_channel_added(Status) + not channel_status_is_channel_added(Status) ], %% Attempt to add channels that are not added - ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded), + ChannelsNotAddedWithConfigs = + get_config_for_channels(Data0, ChannelsNotAdded), Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0), %% Now that we have done the adding, we can get the status of all channels Data2 = channel_status_for_all_channels(Data1), update_state(Data2, Data0); +channels_health_check(connecting, Data0) -> + %% Whenever the resource is connecting: + %% 1. Change the status of all added channels to connecting + %% 2.
Raise alarms (TODO: if it is a probe we should not raise alarms) + Channels = Data0#data.added_channels, + ChannelsToChangeStatusFor = [ + ChannelId + || {ChannelId, Status} <- maps:to_list(Channels), + channel_status_is_channel_added(Status) + ], + ChannelsWithNewStatuses = + [ + {ChannelId, channel_status_new({connecting, resource_is_connecting})} + || ChannelId <- ChannelsToChangeStatusFor + ], + %% Update the channels map + NewChannels = lists:foldl( + fun({ChannelId, NewStatus}, Acc) -> + maps:update(ChannelId, NewStatus, Acc) + end, + Channels, + ChannelsWithNewStatuses + ), + ChannelsWithNewAndPrevErrorStatuses = + [ + {ChannelId, NewStatus, maps:get(ChannelId, Channels)} + || {ChannelId, NewStatus} <- maps:to_list(NewChannels) + ], + %% Raise alarms for all channels + lists:foreach( + fun({ChannelId, Status, PrevStatus}) -> + maybe_alarm(connecting, ChannelId, Status, PrevStatus) + end, + ChannelsWithNewAndPrevErrorStatuses + ), + Data1 = Data0#data{added_channels = NewChannels}, + update_state(Data1, Data0); channels_health_check(ResourceStatus, Data0) -> - %% Whenever the resource is not connected: + %% Whenever the resource is not connected and not connecting: %% 1. Remove all added channels %% 2. Change the status to an error status %% 3. 
Raise alarms @@ -882,13 +915,20 @@ channels_health_check(ResourceStatus, Data0) -> ChannelsToRemove = [ ChannelId || {ChannelId, Status} <- maps:to_list(Channels), - is_channel_added(Status) + channel_status_is_channel_added(Status) ], Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true), ChannelsWithNewAndOldStatuses = [ {ChannelId, OldStatus, - {error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}} + channel_status_new( + {error, + resource_not_connected_channel_error_msg( + ResourceStatus, + ChannelId, + Data1 + )} + )} || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms @@ -928,18 +968,19 @@ channel_status_for_all_channels(Data) -> AddedChannelsWithOldAndNewStatus = [ {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)} || {ChannelId, OldStatus} <- Channels, - is_channel_added(OldStatus) + channel_status_is_channel_added(OldStatus) ], - %% Remove the added channels with a new error statuses + %% Remove the added channels with a status different from connected or connecting ChannelsToRemove = [ ChannelId - || {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus + || {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus, + not channel_status_is_channel_added(NewStatus) ], Data1 = remove_channels_in_list(ChannelsToRemove, Data, true), %% Raise/clear alarms lists:foreach( fun - ({ID, _OldStatus, connected}) -> + ({ID, _OldStatus, #{status := connected}}) -> _ = maybe_clear_alarm(ID); ({ID, OldStatus, NewStatus}) -> _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) @@ -958,18 +999,14 @@ channel_status_for_all_channels(Data) -> ), Data1#data{added_channels = NewChannelsMap}. -is_channel_added({error, _}) -> - false; -is_channel_added(_) -> - true.
- get_config_for_channels(Data0, ChannelsWithoutConfig) -> ResId = Data0#data.id, Mod = Data0#data.mod, Channels = emqx_resource:call_get_channels(ResId, Mod), ChannelIdToConfig = maps:from_list(Channels), + ChannelStatusMap = Data0#data.added_channels, ChannelsWithConfig = [ - {Id, maps:get(Id, ChannelIdToConfig, no_config)} + {Id, get_config_from_map_or_channel_status(Id, ChannelIdToConfig, ChannelStatusMap)} || Id <- ChannelsWithoutConfig ], %% Filter out channels without config @@ -979,6 +1016,16 @@ get_config_for_channels(Data0, ChannelsWithoutConfig) -> Conf =/= no_config ]. +get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatusMap) -> + ChannelStatus = maps:get(ChannelId, ChannelStatusMap, #{}), + case maps:get(config, ChannelStatus, undefined) of + undefined -> + %% Channel config + maps:get(ChannelId, ChannelIdToConfig, no_config); + Config -> + Config + end. + update_state(Data) -> update_state(Data, undefined). @@ -1098,3 +1145,86 @@ safe_call(ResId, Message, Timeout) -> exit:{timeout, _} -> {error, timeout} end. + +%% Helper functions for channel status data + +channel_status_new() -> + #{ + %% The status of the channel. Can be one of the following: + %% - disconnected: the channel is not added to the resource (error may contain the reason) + %% - connecting: the channel has been added to the resource state but + %% either the resource status is connecting or the + %% on_get_channel_status callback has returned connecting + %% - connected: the channel is added to the resource, the resource is + %% connected and the on_get_channel_status callback has returned + %% connected. The error field should be undefined. + status => disconnected, + error => not_added_yet + }. + +%% If the channel is added with add_channel/2, the config field will be set to +%% the config. This is useful when doing probing since the config is not stored +%% anywhere else in that case.
+channel_status_new_with_config(Config) -> + #{ + status => disconnected, + error => not_added_yet, + config => Config + }. + +channel_status_new_waiting_for_health_check() -> + #{ + status => connecting, + error => no_health_check_yet + }. + +channel_status_new({connecting, Error}) -> + #{ + status => connecting, + error => Error + }; +channel_status_new(connecting) -> + #{ + status => connecting, + error => <<"Not connected for unknown reason">> + }; +channel_status_new(connected) -> + #{ + status => connected, + error => undefined + }; +%% Probably not so useful but it is permitted to set an error even when the +%% status is connected +channel_status_new({connected, Error}) -> + #{ + status => connected, + error => Error + }; +channel_status_new({error, Reason}) -> + #{ + status => disconnected, + error => Reason + }. + +channel_status_is_channel_added(#{ + status := connected +}) -> + true; +channel_status_is_channel_added(#{ + status := connecting +}) -> + true; +channel_status_is_channel_added(_Status) -> + false. + +add_channel_status_if_not_exists(Data, ChannelId, State) -> + Channels = Data#data.added_channels, + case maps:is_key(ChannelId, Channels) of + true -> + Data; + false -> + ChannelStatus = channel_status_new({error, resource_not_operational}), + NewChannels = maps:put(ChannelId, ChannelStatus, Channels), + maybe_alarm(State, ChannelId, ChannelStatus, no_prev), + Data#data{added_channels = NewChannels} + end.