From 95f3b94ac32e7167f30558d9a36be6c3f3d60cdb Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 31 Oct 2023 19:31:16 +0100 Subject: [PATCH 1/5] fix(bridge_v2): channels should not be removed when status is connecting This fixes so that channels are not removed from the resource state when their status is connecting. This is needed for Kafka since Kafka's message buffer is stored in the resource state. Fixes: https://emqx.atlassian.net/browse/EMQX-11270 --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 33 +- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 117 +++++++- .../test/emqx_bridge_v2_test_connector.erl | 12 +- apps/emqx_resource/src/emqx_resource.erl | 4 +- .../src/emqx_resource_buffer_worker.erl | 20 +- .../src/emqx_resource_manager.erl | 282 +++++++++++++----- 6 files changed, 362 insertions(+), 106 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 3747a4671..ea47c30e8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -173,20 +173,24 @@ lookup(Type, Name) -> Channels = maps:get(added_channels, InstanceData, #{}), BridgeV2Id = id(Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), - DisplayBridgeV2Status = + {DisplayBridgeV2Status, ErrorMsg} = case ChannelStatus of - {error, undefined} -> <<"Unknown reason">>; - {error, Reason} -> emqx_utils:readable_error_msg(Reason); - connected -> <<"connected">>; - connecting -> <<"connecting">>; - Error -> emqx_utils:readable_error_msg(Error) + #{status := connected} -> + {connected, <<"">>}; + #{status := Status, error := undefined} -> + {Status, <<"Unknown reason">>}; + #{status := Status, error := Error} -> + {Status, emqx_utils:readable_error_msg(Error)}; + undefined -> + {disconnected, <<"Pending installation">>} end, {ok, #{ type => Type, name => Name, raw_config => RawConf, resource_data => InstanceData, - status => DisplayBridgeV2Status + status => 
DisplayBridgeV2Status, + error => ErrorMsg }} end. @@ -526,10 +530,10 @@ create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf) -> ConnectorId, ChannelTestId ), case HealthCheckResult of - {error, Reason} -> - {error, Reason}; - _ -> - ok + #{status := connected} -> + ok; + #{status := Status, error := Error} -> + {error, {Status, Error}} end end end, @@ -1032,6 +1036,7 @@ lookup_and_transform_to_bridge_v1_helper( BridgeV1Tmp = maps:put(raw_config, BridgeV1Config2, BridgeV2), BridgeV1 = maps:remove(status, BridgeV1Tmp), BridgeV2Status = maps:get(status, BridgeV2, undefined), + BridgeV2Error = maps:get(error, BridgeV2, undefined), ResourceData1 = maps:get(resource_data, BridgeV1, #{}), %% Replace id in resouce data BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>, @@ -1040,12 +1045,12 @@ lookup_and_transform_to_bridge_v1_helper( case ConnectorStatus of connected -> case BridgeV2Status of - <<"connected">> -> + connected -> %% No need to modify the status {ok, BridgeV1#{resource_data => ResourceData2}}; NotConnected -> - ResourceData3 = maps:put(status, connecting, ResourceData2), - ResourceData4 = maps:put(error, NotConnected, ResourceData3), + ResourceData3 = maps:put(status, NotConnected, ResourceData2), + ResourceData4 = maps:put(error, BridgeV2Error, ResourceData3), BridgeV1Final = maps:put(resource_data, ResourceData4, BridgeV1), {ok, BridgeV1Final} end; diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 6e15887c8..915691073 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -133,6 +133,7 @@ setup_mocks() -> ok. 
init_per_suite(Config) -> + snabbkaffe:fix_ct_logging(), Apps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -238,12 +239,12 @@ t_create_dry_run_fail_add_channel(_) -> {error, Msg} end), Conf1 = (bridge_config())#{on_add_channel_fun => OnAddChannel1}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), OnAddChannel2 = wrap_fun(fun() -> throw(Msg) end), Conf2 = (bridge_config())#{on_add_channel_fun => OnAddChannel2}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf2), ok. t_create_dry_run_fail_get_channel_status(_) -> @@ -252,7 +253,7 @@ t_create_dry_run_fail_get_channel_status(_) -> {error, Msg} end), Conf1 = (bridge_config())#{on_get_channel_status_fun => Fun1}, - {error, Msg} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), + {error, _} = emqx_bridge_v2:create_dry_run(bridge_type(), Conf1), Fun2 = wrap_fun(fun() -> throw(Msg) end), @@ -280,7 +281,9 @@ t_is_valid_bridge_v1(_) -> t_manual_health_check(_) -> {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, bridge_config()), %% Run a health check for the bridge - connected = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := undefined, status := connected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -290,7 +293,9 @@ t_manual_health_check_exception(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. 
@@ -300,7 +305,9 @@ t_manual_health_check_exception_error(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, _} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := _, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -310,7 +317,9 @@ t_manual_health_check_error(_) -> }, {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf), %% Run a health check for the bridge - {error, my_error} = emqx_bridge_v2:health_check(bridge_type(), my_test_bridge), + #{error := my_error, status := disconnected} = emqx_bridge_v2:health_check( + bridge_type(), my_test_bridge + ), ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), ok. @@ -484,6 +493,83 @@ t_send_message_unhealthy_connector(_) -> ets:delete(ResponseETS), ok. +t_connector_connected_to_connecting_to_connected_no_channel_restart(_) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_start_value, conf}), + ets:insert(ResponseETS, {on_get_status_value, connected}), + OnStartFun = wrap_fun(fun(Conf) -> + case ets:lookup_element(ResponseETS, on_start_value, 2) of + conf -> + {ok, Conf}; + V -> + V + end + end), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + OnAddChannelCntr = counters:new(1, []), + OnAddChannelFun = wrap_fun(fun(_InstId, ConnectorState, _ChannelId, _ChannelConfig) -> + counters:add(OnAddChannelCntr, 1, 1), + {ok, ConnectorState} + end), + ConConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_start_fun">> => OnStartFun, + <<"on_get_status_fun">> => OnGetStatusFun, + <<"on_add_channel_fun">> => OnAddChannelFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConName = ?FUNCTION_NAME, + {ok, _} = emqx_connector:create(con_type(), ConName, ConConfig), + BridgeConf = (bridge_config())#{ 
+ <<"connector">> => atom_to_binary(ConName) + }, + {ok, _} = emqx_bridge_v2:create(bridge_type(), my_test_bridge, BridgeConf), + %% Wait until on_add_channel_fun is called at least once + wait_until(fun() -> + counters:get(OnAddChannelCntr, 1) =:= 1 + end), + 1 = counters:get(OnAddChannelCntr, 1), + %% We change the status of the connector + ets:insert(ResponseETS, {on_get_status_value, connecting}), + %% Wait until the status is changed + wait_until(fun() -> + {ok, BridgeData} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData) =:= connecting + end), + {ok, BridgeData1} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + ct:pal("Bridge V2 status changed to: ~p", [maps:get(status, BridgeData1)]), + %% We change the status again back to connected + ets:insert(ResponseETS, {on_get_status_value, connected}), + %% Wait until the status is connected again + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= connected + end), + %% On add channel should not have been called again + 1 = counters:get(OnAddChannelCntr, 1), + %% We change the status to an error + ets:insert(ResponseETS, {on_get_status_value, {error, my_error}}), + %% Wait until the status is changed + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= disconnected + end), + %% Now we go back to connected + ets:insert(ResponseETS, {on_get_status_value, connected}), + wait_until(fun() -> + {ok, BridgeData2} = emqx_bridge_v2:lookup(bridge_type(), my_test_bridge), + maps:get(status, BridgeData2) =:= connected + end), + %% Now the channel should have been removed and added again + wait_until(fun() -> + counters:get(OnAddChannelCntr, 1) =:= 2 + end), + ok = emqx_bridge_v2:remove(bridge_type(), my_test_bridge), + ok = emqx_connector:remove(con_type(), ConName), + ets:delete(ResponseETS), + ok. 
+ t_unhealthy_channel_alarm(_) -> Conf = (bridge_config())#{ <<"on_get_channel_status_fun">> => @@ -720,3 +806,20 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) -> end ), ok. + +%% Helper Functions + +wait_until(Fun) -> + wait_until(Fun, 5000). + +wait_until(Fun, Timeout) when Timeout >= 0 -> + case Fun() of + true -> + ok; + false -> + IdleTime = 100, + timer:sleep(IdleTime), + wait_until(Fun, Timeout - IdleTime) + end; +wait_until(_, _) -> + ct:fail("Wait until event did not happen"). diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index a84d6b4b2..0138832a0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -54,6 +54,14 @@ on_add_channel( ) -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(); +on_add_channel( + InstId, + #{on_add_channel_fun := FunRef} = ConnectorState, + ChannelId, + ChannelConfig +) -> + Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), + Fun(InstId, ConnectorState, ChannelId, ChannelConfig); on_add_channel( _InstId, State, @@ -118,8 +126,8 @@ on_get_channel_status( ChannelId, State ) -> - Channels = maps:get(channels, State), - ChannelState = maps:get(ChannelId, Channels), + Channels = maps:get(channels, State, #{}), + ChannelState = maps:get(ChannelId, Channels, #{}), case ChannelState of #{on_get_channel_status_fun := FunRef} -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index a67677478..155a07593 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -200,6 +200,7 @@ -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> channel_status() + | {channel_status(), Reason :: term()} | {error, term()}. -callback query_mode(Config :: term()) -> query_mode(). 
@@ -457,7 +458,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - {ok, resource_status()} | {error, term()}. + #{status := channel_status(), error := term(), any() := any()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). @@ -534,6 +535,7 @@ call_health_check(ResId, Mod, ResourceState) -> -spec call_channel_health_check(resource_id(), channel_id(), module(), resource_state()) -> channel_status() + | {channel_status(), Reason :: term()} | {error, term()}. call_channel_health_check(ResId, ChannelId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_channel_status(ResId, ChannelId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 13d9ad2de..060a97f83 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1112,11 +1112,21 @@ is_channel_id(Id) -> %% There is no need to query the conncector if the channel is not %% installed as the query will fail anyway. pre_query_channel_check({Id, _} = _Request, Channels) when - is_map_key(Id, Channels), - (map_get(Id, Channels) =:= connected orelse map_get(Id, Channels) =:= connecting) + is_map_key(Id, Channels) -> - ok; + ChannelStatus = maps:get(Id, Channels), + case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of + true -> + ok; + false -> + maybe_throw_channel_not_installed(Id) + end; pre_query_channel_check({Id, _} = _Request, _Channels) -> + maybe_throw_channel_not_installed(Id); +pre_query_channel_check(_Request, _Channels) -> + ok. + +maybe_throw_channel_not_installed(Id) -> %% Fail with a recoverable error if the channel is not installed %% so that the operation can be retried. It is emqx_resource_manager's %% responsibility to ensure that the channel installation is retried. 
@@ -1128,9 +1138,7 @@ pre_query_channel_check({Id, _} = _Request, _Channels) -> ); false -> ok - end; -pre_query_channel_check(_Request, _Channels) -> - ok. + end. do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when ResQM =:= simple_sync_internal_buffer; ResQM =:= simple_async_internal_buffer diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 4b1c9e4a4..f4c11fd04 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -44,7 +44,8 @@ list_group/1, lookup_cached/1, get_metrics/1, - reset_metrics/1 + reset_metrics/1, + channel_status_is_channel_added/1 ]). -export([ @@ -306,7 +307,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - {ok, resource_status()} | {error, term()}. + #{status := channel_status(), error := term(), any() := any()}. channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels @@ -420,6 +421,10 @@ handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> handle_connecting_health_check(Data); +handle_event( + {call, From}, {remove_channel, ChannelId}, connecting = _State, Data +) -> + handle_remove_channel(From, ChannelId, Data); %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks @@ -459,7 +464,7 @@ handle_event( handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> - handle_not_connected_remove_channel(From, ChannelId, Data); + handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data); handle_event( {call, From}, get_channels, _State, Data ) -> @@ -570,7 +575,7 @@ 
add_channels(Data) -> Channels = Data#data.added_channels, NewChannels = lists:foldl( fun({ChannelID, _Conf}, Acc) -> - maps:put(ChannelID, {error, connecting}, Acc) + maps:put(ChannelID, channel_status_new(), Acc) end, Channels, ChannelIDConfigTuples @@ -589,7 +594,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> AddedChannelsMap = Data#data.added_channels, %% Set the channel status to connecting to indicate that %% we have not yet performed the initial health_check - NewAddedChannelsMap = maps:put(ChannelID, connecting, AddedChannelsMap), + NewAddedChannelsMap = maps:put( + ChannelID, + channel_status_new_waiting_for_health_check(), + AddedChannelsMap + ), NewData = Data#data{ state = NewState, added_channels = NewAddedChannelsMap @@ -603,7 +612,11 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> reason => Reason }), AddedChannelsMap = Data#data.added_channels, - NewAddedChannelsMap = maps:put(ChannelID, Error, AddedChannelsMap), + NewAddedChannelsMap = maps:put( + ChannelID, + channel_status_new(Error), + AddedChannelsMap + ), NewData = Data#data{ added_channels = NewAddedChannelsMap }, @@ -680,65 +693,45 @@ make_test_id() -> RandId = iolist_to_binary(emqx_utils:gen_id(16)), <>. 
-handle_add_channel(From, Data, ChannelId, ChannelConfig) -> +handle_add_channel(From, Data, ChannelId, Config) -> Channels = Data#data.added_channels, - case maps:get(ChannelId, Channels, {error, not_added}) of - {error, _Reason} -> + case + channel_status_is_channel_added( + maps:get( + ChannelId, + Channels, + channel_status_new() + ) + ) + of + false -> %% The channel is not installed in the connector state - %% We need to install it - handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); - _ -> + %% We need insert it into the channels map and let the health check + %% take care of the rest + NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), + NewData = Data#data{added_channels = NewChannels}, + {keep_state, update_state(NewData, Data), [ + {reply, From, ok}, {state_timeout, 0, health_check} + ]}; + %%handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); + true -> %% The channel is already installed in the connector state %% We don't need to install it again {keep_state_and_data, [{reply, From, ok}]} end. -handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig) -> - NewData = add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig), - %% Trigger a health check to raise alarm if channel is not healthy - {keep_state, NewData, [{reply, From, ok}, {state_timeout, 0, health_check}]}. 
- -add_channel_need_insert_update_data(Data, ChannelId, ChannelConfig) -> - case - emqx_resource:call_add_channel( - Data#data.id, Data#data.mod, Data#data.state, ChannelId, ChannelConfig - ) - of - {ok, NewState} -> - AddedChannelsMap = Data#data.added_channels, - %% Setting channel status to connecting to indicate that an health check - %% has not been performed yet - NewAddedChannelsMap = maps:put(ChannelId, connecting, AddedChannelsMap), - UpdatedData = Data#data{ - state = NewState, - added_channels = NewAddedChannelsMap - }, - update_state(UpdatedData, Data); - {error, _Reason} = Error -> - ChannelsMap = Data#data.added_channels, - NewChannelsMap = maps:put(ChannelId, Error, ChannelsMap), - UpdatedData = Data#data{ - added_channels = NewChannelsMap - }, - update_state(UpdatedData, Data) - end. - handle_not_connected_add_channel(From, ChannelId, State, Data) -> %% When state is not connected the channel will be added to the channels %% map but nothing else will happen. - Channels = Data#data.added_channels, - NewChannels = maps:put(ChannelId, {error, resource_not_operational}, Channels), - NewData1 = Data#data{added_channels = NewChannels}, - %% Do channel health check to trigger alarm - NewData2 = channels_health_check(State, NewData1), - {keep_state, update_state(NewData2, Data), [{reply, From, ok}]}. + NewData = add_channel_status_if_not_exists(Data, ChannelId, State), + {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, %% Deactivate alarm _ = maybe_clear_alarm(ChannelId), - case maps:get(ChannelId, Channels, {error, not_added}) of - {error, _} -> + case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status_new())) of + false -> %% The channel is already not installed in the connector state. 
%% We still need to remove it from the added_channels map AddedChannels = Data#data.added_channels, @@ -747,7 +740,7 @@ handle_remove_channel(From, ChannelId, Data) -> added_channels = NewAddedChannels }, {keep_state, NewData, [{reply, From, ok}]}; - _ -> + true -> %% The channel is installed in the connector state handle_remove_channel_exists(From, ChannelId, Data) end. @@ -777,9 +770,9 @@ handle_remove_channel_exists(From, ChannelId, Data) -> {keep_state_and_data, [{reply, From, Error}]} end. -handle_not_connected_remove_channel(From, ChannelId, Data) -> - %% When state is not connected the channel will be removed from the channels - %% map but nothing else will happen. +handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data) -> + %% When state is not connected or connecting the channel will be removed + %% from the channels map but nothing else will happen. Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, @@ -796,7 +789,7 @@ handle_manually_health_check(From, Data) -> ). handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) -> - {keep_state_and_data, [{reply, From, {ok, disconnected}}]}; + {keep_state_and_data, [{reply, From, channel_status_new({error, resource_disconnected})}]}; handle_manually_channel_health_check( From, #data{added_channels = Channels} = _Data, @@ -810,10 +803,11 @@ handle_manually_channel_health_check( _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, {error, channel_not_found}}]}. + {keep_state_and_data, [{reply, From, channel_status_new({error, channel_not_found})}]}. get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> - emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State). + RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), + channel_status_new(RawStatus). 
handle_connecting_health_check(Data) -> with_health_check( @@ -833,9 +827,9 @@ handle_connected_health_check(Data) -> with_health_check( Data, fun - (connected, UpdatedData) -> - {keep_state, channels_health_check(connected, UpdatedData), - health_check_actions(UpdatedData)}; + (connected, UpdatedData0) -> + UpdatedData1 = channels_health_check(connected, UpdatedData0), + {keep_state, UpdatedData1, health_check_actions(UpdatedData1)}; (Status, UpdatedData) -> ?SLOG(warning, #{ msg => "health_check_failed", @@ -861,20 +855,59 @@ with_health_check(#data{error = PrevError} = Data, Func) -> channels_health_check(connected = _ResourceStatus, Data0) -> Channels = maps:to_list(Data0#data.added_channels), - %% All channels with an error status are considered not added + %% All channels with a status different from connected or connecting are + %% not added ChannelsNotAdded = [ ChannelId || {ChannelId, Status} <- Channels, - not is_channel_added(Status) + not channel_status_is_channel_added(Status) ], %% Attempt to add channels that are not added - ChannelsNotAddedWithConfigs = get_config_for_channels(Data0, ChannelsNotAdded), + ChannelsNotAddedWithConfigs = + get_config_for_channels(Data0, ChannelsNotAdded), Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0), %% Now that we have done the adding, we can get the status of all channels Data2 = channel_status_for_all_channels(Data1), update_state(Data2, Data0); +channels_health_check(connecting, Data0) -> + %% Whenever the resource is connecting: + %% 1. Change the status of all added channels to connecting + %% 2. 
Raise alarms (TODO: if it is a probe we should not raise alarms) + Channels = Data0#data.added_channels, + ChannelsToChangeStatusFor = [ + ChannelId + || {ChannelId, Status} <- maps:to_list(Channels), + channel_status_is_channel_added(Status) + ], + ChannelsWithNewStatuses = + [ + {ChannelId, channel_status_new({connecting, resource_is_connecting})} + || ChannelId <- ChannelsToChangeStatusFor + ], + %% Update the channels map + NewChannels = lists:foldl( + fun({ChannelId, NewStatus}, Acc) -> + maps:update(ChannelId, NewStatus, Acc) + end, + Channels, + ChannelsWithNewStatuses + ), + ChannelsWithNewAndPrevErrorStatuses = + [ + {ChannelId, NewStatus, maps:get(ChannelId, Channels)} + || {ChannelId, NewStatus} <- maps:to_list(NewChannels) + ], + %% Raise alarms for all channels + lists:foreach( + fun({ChannelId, Status, PrevStatus}) -> + maybe_alarm(connecting, ChannelId, Status, PrevStatus) + end, + ChannelsWithNewAndPrevErrorStatuses + ), + Data1 = Data0#data{added_channels = NewChannels}, + update_state(Data1, Data0); channels_health_check(ResourceStatus, Data0) -> - %% Whenever the resource is not connected: + %% Whenever the resource is not connected and not connecting: %% 1. Remove all added channels %% 2. Change the status to an error status %% 3. 
Raise alarms @@ -882,13 +915,20 @@ channels_health_check(ResourceStatus, Data0) -> ChannelsToRemove = [ ChannelId || {ChannelId, Status} <- maps:to_list(Channels), - is_channel_added(Status) + channel_status_is_channel_added(Status) ], Data1 = remove_channels_in_list(ChannelsToRemove, Data0, true), ChannelsWithNewAndOldStatuses = [ {ChannelId, OldStatus, - {error, resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1)}} + channel_status_new( + {error, + resource_not_connected_channel_error_msg( + ResourceStatus, + ChannelId, + Data1 + )} + )} || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms @@ -928,18 +968,19 @@ channel_status_for_all_channels(Data) -> AddedChannelsWithOldAndNewStatus = [ {ChannelId, OldStatus, get_channel_status_channel_added(Data, ChannelId)} || {ChannelId, OldStatus} <- Channels, - is_channel_added(OldStatus) + channel_status_is_channel_added(OldStatus) ], - %% Remove the added channels with a new error statuses + %% Remove the added channels with a status different from connected or connecting ChannelsToRemove = [ ChannelId - || {ChannelId, _, {error, _}} <- AddedChannelsWithOldAndNewStatus + || {ChannelId, _, NewStatus} <- AddedChannelsWithOldAndNewStatus, + not channel_status_is_channel_added(NewStatus) ], Data1 = remove_channels_in_list(ChannelsToRemove, Data, true), %% Raise/clear alarms lists:foreach( fun - ({ID, _OldStatus, connected}) -> + ({ID, _OldStatus, #{status := connected}}) -> _ = maybe_clear_alarm(ID); ({ID, OldStatus, NewStatus}) -> _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) @@ -958,18 +999,14 @@ channel_status_for_all_channels(Data) -> ), Data1#data{added_channels = NewChannelsMap}. -is_channel_added({error, _}) -> - false; -is_channel_added(_) -> - true. 
get_config_for_channels(Data0, ChannelsWithoutConfig) -> ResId = Data0#data.id, Mod = Data0#data.mod, Channels = emqx_resource:call_get_channels(ResId, Mod), ChannelIdToConfig = maps:from_list(Channels), + ChannelStatusMap = Data0#data.added_channels, ChannelsWithConfig = [ - {Id, maps:get(Id, ChannelIdToConfig, no_config)} + {Id, get_config_from_map_or_channel_status(Id, ChannelIdToConfig, ChannelStatusMap)} || Id <- ChannelsWithoutConfig ], %% Filter out channels without config @@ -979,6 +1016,16 @@ get_config_for_channels(Data0, ChannelsWithoutConfig) -> Conf =/= no_config ]. +get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatusMap) -> + ChannelStatus = maps:get(ChannelId, ChannelStatusMap, #{}), + case maps:get(config, ChannelStatus, undefined) of + undefined -> + %% Channel config + maps:get(ChannelId, ChannelIdToConfig, no_config); + Config -> + Config + end. + update_state(Data) -> update_state(Data, undefined). @@ -1098,3 +1145,86 @@ safe_call(ResId, Message, Timeout) -> exit:{timeout, _} -> {error, timeout} end. + +%% Helper functions for channel status data + +channel_status_new() -> + #{ + %% The status of the channel. Can be one of the following: + %% - disconnected: the channel is not added to the resource (error may contain the reason) + %% - connecting: the channel has been added to the resource state but + %% either the resource status is connecting or the + %% on_get_channel_status callback has returned connecting + %% - connected: the channel is added to the resource, the resource is + %% connected and the on_get_channel_status callback has returned + %% connected. The error field should be undefined. + status => disconnected, + error => not_added_yet + }. + +%% If the channel is added with add_channel/3, the config field will be set to +%% the config. This is useful when doing probing since the config is not stored +%% anywhere else in that case. 
+channel_status_new_with_config(Config) -> + #{ + status => disconnected, + error => not_added_yet, + config => Config + }. + +channel_status_new_waiting_for_health_check() -> + #{ + status => connecting, + error => no_health_check_yet + }. + +channel_status_new({connecting, Error}) -> + #{ + status => connecting, + error => Error + }; +channel_status_new(connecting) -> + #{ + status => connecting, + error => <<"Not connected for unknown reason">> + }; +channel_status_new(connected) -> + #{ + status => connected, + error => undefined + }; +%% Probably not so useful but it is permitted to set an error even when the +%% status is connected +channel_status_new({connected, Error}) -> + #{ + status => connected, + error => Error + }; +channel_status_new({error, Reason}) -> + #{ + status => disconnected, + error => Reason + }. + +channel_status_is_channel_added(#{ + status := connected +}) -> + true; +channel_status_is_channel_added(#{ + status := connecting +}) -> + true; +channel_status_is_channel_added(_Status) -> + false. + +add_channel_status_if_not_exists(Data, ChannelId, State) -> + Channels = Data#data.added_channels, + case maps:is_key(ChannelId, Channels) of + true -> + Data; + false -> + ChannelStatus = channel_status_new({error, resource_not_operational}), + NewChannels = maps:put(ChannelId, ChannelStatus, Channels), + maybe_alarm(State, ChannelId, ChannelStatus, no_prev), + Data#data{added_channels = NewChannels} + end. 
From edb1d37e671f9d2f13282b7e4b84160ce50f35af Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 1 Nov 2023 06:30:45 +0100 Subject: [PATCH 2/5] chore(bridge_v2): make fixes thanks to PR comments from @thalesmg --- apps/emqx_resource/src/emqx_resource.erl | 2 +- apps/emqx_resource/src/emqx_resource_manager.erl | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 155a07593..cb3eb44fe 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -458,7 +458,7 @@ health_check(ResId) -> emqx_resource_manager:health_check(ResId). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() := any()}. + #{status := channel_status(), error := term(), any() => any()}. channel_health_check(ResId, ChannelId) -> emqx_resource_manager:channel_health_check(ResId, ChannelId). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index f4c11fd04..7848bd0a5 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -307,7 +307,7 @@ health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). -spec channel_health_check(resource_id(), channel_id()) -> - #{status := channel_status(), error := term(), any() := any()}. + #{status := channel_status(), error := term(), any() => any()}. 
channel_health_check(ResId, ChannelId) -> %% Do normal health check first to trigger health checks for channels %% and update the cached health status for the channels @@ -464,7 +464,7 @@ handle_event( handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> - handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data); + handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data); handle_event( {call, From}, get_channels, _State, Data ) -> @@ -706,14 +706,13 @@ handle_add_channel(From, Data, ChannelId, Config) -> of false -> %% The channel is not installed in the connector state - %% We need insert it into the channels map and let the health check + %% We insert it into the channels map and let the health check %% take care of the rest NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), NewData = Data#data{added_channels = NewChannels}, {keep_state, update_state(NewData, Data), [ {reply, From, ok}, {state_timeout, 0, health_check} ]}; - %%handle_add_channel_need_insert(From, Data, ChannelId, Data, ChannelConfig); true -> %% The channel is already installed in the connector state %% We don't need to install it again @@ -770,9 +769,10 @@ handle_remove_channel_exists(From, ChannelId, Data) -> {keep_state_and_data, [{reply, From, Error}]} end. -handle_not_connected_or_connecting_remove_channel(From, ChannelId, Data) -> - %% When state is not connected or connecting the channel will be removed - %% from the channels map but nothing else will happen. +handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) -> + %% When state is not connected and not connecting the channel will be removed + %% from the channels map but nothing else will happen since the channel + %% is not added/installed in the resource state. 
Channels = Data#data.added_channels, NewChannels = maps:remove(ChannelId, Channels), NewData = Data#data{added_channels = NewChannels}, From 96d6c6db49876d984ed7ec6d5a7fd299354bc48e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 1 Nov 2023 07:20:14 +0100 Subject: [PATCH 3/5] test(bridge_v2): emqx_bridge_v2_kafka_producer_SUITE fix after API change --- .../test/emqx_bridge_v2_kafka_producer_SUITE.erl | 2 +- apps/emqx_resource/src/emqx_resource_manager.erl | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index aabb4d46e..37c2e2325 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -119,7 +119,7 @@ t_health_check(_) -> ConnectorConfig = connector_config(), {ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig), {ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config), - connected = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + #{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2), %% Check behaviour when bridge does not exist {error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 7848bd0a5..2eb8e666d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -315,7 +315,10 @@ channel_health_check(ResId, ChannelId) -> safe_call(ResId, {channel_health_check, ChannelId}, ?T_OPERATION). add_channel(ResId, ChannelId, Config) -> - safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION). 
+ Result = safe_call(ResId, {add_channel, ChannelId, Config}, ?T_OPERATION), + %% Wait for health_check to finish + _ = health_check(ResId), + Result. remove_channel(ResId, ChannelId) -> safe_call(ResId, {remove_channel, ChannelId}, ?T_OPERATION). From b06d05eaac04c39e0036e4e42fb5924a20ca6344 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 1 Nov 2023 07:39:18 +0100 Subject: [PATCH 4/5] test(bridge_v2): fix test case after new API --- .../test/emqx_bridge_kafka_impl_producer_SUITE.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index ef9e6cdf6..9a28ed26a 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -570,7 +570,9 @@ t_nonexistent_topic(_Config) -> erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf ), % TODO: make sure the user facing APIs for Bridge V1 also get this error - {error, _} = emqx_bridge_v2:health_check(?BRIDGE_TYPE_V2, list_to_atom(Name)), + #{status := disconnected, error := #{error := unknown_kafka_topic}} = emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_V2, list_to_atom(Name) + ), ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)), delete_all_bridges(), ok. 
From 357b664c8da626564623ff0bcddf8b16343b3370 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 1 Nov 2023 15:27:06 +0100 Subject: [PATCH 5/5] fix(bridge_v2): more fixes thanks to PR comments from @thalesmg --- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 1 - .../src/emqx_resource_manager.erl | 34 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 915691073..837425888 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -133,7 +133,6 @@ setup_mocks() -> ok. init_per_suite(Config) -> - snabbkaffe:fix_ct_logging(), Apps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(Config)} diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 2eb8e666d..04d40d581 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -578,7 +578,7 @@ add_channels(Data) -> Channels = Data#data.added_channels, NewChannels = lists:foldl( fun({ChannelID, _Conf}, Acc) -> - maps:put(ChannelID, channel_status_new(), Acc) + maps:put(ChannelID, channel_status(), Acc) end, Channels, ChannelIDConfigTuples @@ -617,7 +617,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:put( ChannelID, - channel_status_new(Error), + channel_status(Error), AddedChannelsMap ), NewData = Data#data{ @@ -703,7 +703,7 @@ handle_add_channel(From, Data, ChannelId, Config) -> maps:get( ChannelId, Channels, - channel_status_new() + channel_status() ) ) of @@ -714,7 +714,7 @@ handle_add_channel(From, Data, ChannelId, Config) -> NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), NewData = Data#data{added_channels = NewChannels}, {keep_state, 
update_state(NewData, Data), [ - {reply, From, ok}, {state_timeout, 0, health_check} + {reply, From, ok} ]}; true -> %% The channel is already installed in the connector state @@ -732,7 +732,7 @@ handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, %% Deactivate alarm _ = maybe_clear_alarm(ChannelId), - case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status_new())) of + case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of false -> %% The channel is already not installed in the connector state. %% We still need to remove it from the added_channels map @@ -792,7 +792,7 @@ handle_manually_health_check(From, Data) -> ). handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) -> - {keep_state_and_data, [{reply, From, channel_status_new({error, resource_disconnected})}]}; + {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]}; handle_manually_channel_health_check( From, #data{added_channels = Channels} = _Data, @@ -806,11 +806,11 @@ handle_manually_channel_health_check( _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, channel_status_new({error, channel_not_found})}]}. + {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}. get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, ChannelId) -> RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), - channel_status_new(RawStatus). + channel_status(RawStatus). 
handle_connecting_health_check(Data) -> with_health_check( @@ -884,7 +884,7 @@ channels_health_check(connecting, Data0) -> ], ChannelsWithNewStatuses = [ - {ChannelId, channel_status_new({connecting, resource_is_connecting})} + {ChannelId, channel_status({connecting, resource_is_connecting})} || ChannelId <- ChannelsToChangeStatusFor ], %% Update the channels map @@ -924,7 +924,7 @@ channels_health_check(ResourceStatus, Data0) -> ChannelsWithNewAndOldStatuses = [ {ChannelId, OldStatus, - channel_status_new( + channel_status( {error, resource_not_connected_channel_error_msg( ResourceStatus, @@ -1151,7 +1151,7 @@ safe_call(ResId, Message, Timeout) -> %% Helper functions for chanel status data -channel_status_new() -> +channel_status() -> #{ %% The status of the channel. Can be one of the following: %% - disconnected: the channel is not added to the resource (error may contain the reason)) @@ -1181,29 +1181,29 @@ channel_status_new_waiting_for_health_check() -> error => no_health_check_yet }. 
-channel_status_new({connecting, Error}) -> +channel_status({connecting, Error}) -> #{ status => connecting, error => Error }; -channel_status_new(connecting) -> +channel_status(connecting) -> #{ status => connecting, error => <<"Not connected for unknown reason">> }; -channel_status_new(connected) -> +channel_status(connected) -> #{ status => connected, error => undefined }; %% Probably not so useful but it is permitted to set an error even when the %% status is connected -channel_status_new({connected, Error}) -> +channel_status({connected, Error}) -> #{ status => connected, error => Error }; -channel_status_new({error, Reason}) -> +channel_status({error, Reason}) -> #{ status => disconnected, error => Reason @@ -1226,7 +1226,7 @@ add_channel_status_if_not_exists(Data, ChannelId, State) -> true -> Data; false -> - ChannelStatus = channel_status_new({error, resource_not_operational}), + ChannelStatus = channel_status({error, resource_not_operational}), NewChannels = maps:put(ChannelId, ChannelStatus, Channels), maybe_alarm(State, ChannelId, ChannelStatus, no_prev), Data#data{added_channels = NewChannels}