From 917474f69438f58973337c8c3ffce92f2691e20f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 20 May 2024 16:03:14 +0200 Subject: [PATCH 1/6] fix: action config update would sometimes not be reflected in connector Before this commit the following happened sometimes: 1. action status is connected 2. action config is updated to something that should change the status to disconnected 3. action status is still connected and the old config is being used by the connector even though the config has been correctly updated. The reason for this bug is that the post_config_hook runs before the global EMQX config is updated. The post config hook is adding the new config to the connector. Since the new config causes the action to get status disconnected, the adding of the action to the connector is retried when the health check runs but this time the config will be loaded from the global config which could cause the old config to be loaded (this happens when the global config has not had time to get updated). The above problem has been fixed in this commit by only reading action configs from the global config when the connector starts/restarts and instead store the latest configs for the actions in the connector. Fixes: https://emqx.atlassian.net/browse/EMQX-12376 --- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 54 ++++++ .../src/emqx_resource_manager.erl | 159 ++++++++++-------- 2 files changed, 147 insertions(+), 66 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index d6199f3a3..dc2e8f275 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -788,6 +788,60 @@ t_update_connector_not_found(_Config) -> ), ok. +%% Check that https://emqx.atlassian.net/browse/EMQX-12376 is fixed +t_update_concurrent_health_check(_Config) -> + Msg = <<"Channel status check failed">>, + ok = meck:expect( + emqx_bridge_v2_test_connector, + on_get_channel_status, + fun( + _ResId, + ChannelId, + #{channels := Channels} + ) -> + #{ + is_conf_for_connected := Connected + } = maps:get(ChannelId, Channels), + case Connected of + true -> + connected; + false -> + {error, Msg} + end + end + ), + BaseConf = (bridge_config())#{ + is_conf_for_connected => false + }, + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BaseConf)), + SetStatusConnected = + fun + (true) -> + Conf = BaseConf#{is_conf_for_connected => true}, + %% Update the config + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)), + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check(bridge_type(), my_test_bridge) + ); + (false) -> + Conf = BaseConf#{is_conf_for_connected => false}, + %% Update the config + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)), + ?assertMatch( + #{status := disconnected}, + emqx_bridge_v2:health_check(bridge_type(), my_test_bridge) + ) + end, + [ + begin + Connected = (N rem 2) =:= 0, + SetStatusConnected(Connected) + end + || N <- lists:seq(0, 20) + ], + ok. + t_remove_single_connector_being_referenced_with_active_channels(_Config) -> %% we test the connector post config update here because we also need bridges. Conf = bridge_config(), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 1c0c3f4ca..1efd88b24 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -542,9 +542,9 @@ handle_event(enter, _OldState, ?state_stopped = State, Data) -> {keep_state_and_data, []}; %% The following events can be handled in any other state handle_event( - {call, From}, {add_channel, ChannelId, _Config}, State, Data + {call, From}, {add_channel, ChannelId, Config}, State, Data ) -> - handle_not_connected_add_channel(From, ChannelId, State, Data); + handle_not_connected_add_channel(From, ChannelId, Config, State, Data); handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> @@ -678,8 +678,8 @@ add_channels(Data) -> Channels = Data#data.added_channels, NewChannels = lists:foldl( fun - ({ChannelID, #{enable := true}}, Acc) -> - maps:put(ChannelID, channel_status(), Acc); + ({ChannelID, #{enable := true} = Config}, Acc) -> + maps:put(ChannelID, channel_status_not_added(Config), Acc); ({_, #{enable := false}}, Acc) -> Acc end, @@ -702,7 +702,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> %% we have not yet performed the initial health_check NewAddedChannelsMap = maps:put( ChannelID, - channel_status_new_waiting_for_health_check(), + channel_status_new_waiting_for_health_check(ChannelConfig), AddedChannelsMap ), NewData = Data#data{ @@ -720,7 +720,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:put( ChannelID, - channel_status(Error), + channel_status(Error, ChannelConfig), AddedChannelsMap ), NewData = Data#data{ @@ -835,7 +835,7 @@ handle_add_channel(From, Data, ChannelId, Config) -> maps:get( ChannelId, Channels, - channel_status() + channel_status_not_added(Config) ) ) of @@ -843,7 +843,7 @@ handle_add_channel(From, Data, ChannelId, Config) -> %% The channel is not installed in the connector state %% We insert it into the channels map and let the health check %% take care of the rest - NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), + NewChannels = maps:put(ChannelId, channel_status_not_added(Config), Channels), NewData = Data#data{added_channels = NewChannels}, {keep_state, update_state(NewData, Data), [ {reply, From, ok} @@ -854,17 +854,21 @@ handle_add_channel(From, Data, ChannelId, Config) -> {keep_state_and_data, [{reply, From, ok}]} end. -handle_not_connected_add_channel(From, ChannelId, State, Data) -> +handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) -> %% When state is not connected the channel will be added to the channels %% map but nothing else will happen. - NewData = add_channel_status_if_not_exists(Data, ChannelId, State), + NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, %% Deactivate alarm _ = maybe_clear_alarm(ChannelId), - case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of + case + channel_status_is_channel_added( + maps:get(ChannelId, Channels, channel_status_not_added(undefined)) + ) + of false -> %% The channel is already not installed in the connector state. %% We still need to remove it from the added_channels map @@ -1033,7 +1037,9 @@ continue_resource_health_check_not_connected(NewStatus, Data0) -> end. handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> - {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]}; + {keep_state_and_data, [ + {reply, From, channel_status({error, resource_disconnected}, undefined)} + ]}; handle_manual_channel_health_check( From, #data{ @@ -1072,7 +1078,7 @@ handle_manual_channel_health_check( _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}. + {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found}, undefined)}]}. -spec channels_health_check(resource_status(), data()) -> data(). channels_health_check(?status_connected = _ConnectorStatus, Data0) -> @@ -1097,14 +1103,14 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) Channels = Data0#data.added_channels, ChannelsToChangeStatusFor = [ - ChannelId - || {ChannelId, Status} <- maps:to_list(Channels), + {ChannelId, Config} + || {ChannelId, #{config := Config} = Status} <- maps:to_list(Channels), channel_status_is_channel_added(Status) ], ChannelsWithNewStatuses = [ - {ChannelId, channel_status({?status_connecting, resource_is_connecting})} - || ChannelId <- ChannelsToChangeStatusFor + {ChannelId, channel_status({?status_connecting, resource_is_connecting}, Config)} + || {ChannelId, Config} <- ChannelsToChangeStatusFor ], %% Update the channels map NewChannels = lists:foldl( @@ -1149,9 +1155,10 @@ channels_health_check(ConnectorStatus, Data0) -> ConnectorStatus, ChannelId, Data1 - )} + )}, + Config )} - || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) + || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms _ = lists:foreach( @@ -1224,7 +1231,7 @@ continue_channel_health_check_connected(ChannelId, OldStatus, Data0) -> #{channel := CHCWorkers0} = HCWorkers0, CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0), Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, - %% Remove the added channels with a a status different from connected or connecting + %% Remove the added channels with a status different from connected or connecting NewStatus = maps:get(ChannelId, Data0#data.added_channels), ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), @@ -1253,9 +1260,11 @@ spawn_channel_health_check_worker(#data{} = Data, ChannelId) -> %% separated so it can be spec'ed and placate dialyzer tantrums... -spec worker_channel_health_check(data(), channel_id()) -> no_return(). worker_channel_health_check(Data, ChannelId) -> - #data{id = ResId, mod = Mod, state = State} = Data, + #data{id = ResId, mod = Mod, state = State, added_channels = Channels} = Data, + ChannelStatus = maps:get(ChannelId, Channels, #{}), + ChannelConfig = maps:get(config, ChannelStatus, undefined), RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), - exit({ok, channel_status(RawStatus)}). + exit({ok, channel_status(RawStatus, ChannelConfig)}). -spec handle_channel_health_check_worker_down( data(), {pid(), reference()}, {ok, channel_status_map()} @@ -1267,11 +1276,15 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> added_channels = AddedChannels0 } = Data0, {ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0), - case ExitResult of - {ok, NewStatus} -> - %% `emqx_resource:call_channel_health_check' catches all exceptions. - AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0) - end, + %% The channel might have got removed while the health check was going on + CurrentStatus = maps:get(ChannelId, AddedChannels0, channel_not_added), + {AddedChannels, NewStatus} = + handle_channel_health_check_worker_down_new_channels_and_status( + ChannelId, + ExitResult, + CurrentStatus, + AddedChannels0 + ), #{ongoing := Ongoing0} = CHCWorkers1, {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, @@ -1293,6 +1306,26 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> {keep_state, update_state(Data, Data0), Replies} end. +handle_channel_health_check_worker_down_new_channels_and_status( + ChannelId, + {ok, #{config := CheckedConfig} = NewStatus} = _ExitResult, + #{config := CurrentConfig} = _CurrentStatus, + AddedChannels +) when CheckedConfig =:= CurrentConfig -> + %% Checked config is the same as the current config so we can update the + %% status in AddedChannels + {maps:put(ChannelId, NewStatus, AddedChannels), NewStatus}; +handle_channel_health_check_worker_down_new_channels_and_status( + _ChannelId, + {ok, NewStatus} = _ExitResult, + _CurrentStatus, + AddedChannels +) -> + %% The checked config is different from the current config which means we + %% should not update AddedChannels because the channel has been removed or + %% updated while the health check was in progress + {AddedChannels, NewStatus}. + reply_pending_channel_health_check_callers( ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0} ) -> @@ -1469,7 +1502,7 @@ safe_call(ResId, Message, Timeout) -> %% Helper functions for chanel status data -channel_status() -> +channel_status_not_added(ChannelConfig) -> #{ %% The status of the channel. Can be one of the following: %% - disconnected: the channel is not added to the resource (error may contain the reason)) @@ -1479,62 +1512,61 @@ channel_status() -> %% - connected: the channel is added to the resource, the resource is %% connected and the on_channel_get_status callback has returned %% connected. The error field should be undefined. - status => ?status_disconnected, - error => not_added_yet - }. - -%% If the channel is added with add_channel/2, the config field will be set to -%% the config. This is useful when doing probing since the config is not stored -%% anywhere else in that case. -channel_status_new_with_config(Config) -> - #{ status => ?status_disconnected, error => not_added_yet, - config => Config + config => ChannelConfig }. -channel_status_new_waiting_for_health_check() -> +channel_status_new_waiting_for_health_check(ChannelConfig) -> #{ status => ?status_connecting, - error => no_health_check_yet + error => no_health_check_yet, + config => ChannelConfig }. -channel_status({?status_connecting, Error}) -> +channel_status({?status_connecting, Error}, ChannelConfig) -> #{ status => ?status_connecting, - error => Error + error => Error, + config => ChannelConfig }; -channel_status({?status_disconnected, Error}) -> +channel_status({?status_disconnected, Error}, ChannelConfig) -> #{ status => ?status_disconnected, - error => Error + error => Error, + config => ChannelConfig }; -channel_status(?status_disconnected) -> +channel_status(?status_disconnected, ChannelConfig) -> #{ status => ?status_disconnected, - error => <<"Disconnected for unknown reason">> + error => <<"Disconnected for unknown reason">>, + config => ChannelConfig }; -channel_status(?status_connecting) -> +channel_status(?status_connecting, ChannelConfig) -> #{ status => ?status_connecting, - error => <<"Not connected for unknown reason">> + error => <<"Not connected for unknown reason">>, + config => ChannelConfig }; -channel_status(?status_connected) -> +channel_status(?status_connected, ChannelConfig) -> #{ status => ?status_connected, - error => undefined + error => undefined, + config => ChannelConfig }; %% Probably not so useful but it is permitted to set an error even when the %% status is connected -channel_status({?status_connected, Error}) -> +channel_status({?status_connected, Error}, ChannelConfig) -> #{ status => ?status_connected, - error => Error + error => Error, + config => ChannelConfig }; -channel_status({error, Reason}) -> +channel_status({error, Reason}, ChannelConfig) -> #{ status => ?status_disconnected, - error => Reason + error => Reason, + config => ChannelConfig }. channel_status_is_channel_added(#{ @@ -1548,19 +1580,14 @@ channel_status_is_channel_added(#{ channel_status_is_channel_added(_Status) -> false. --spec add_channel_status_if_not_exists(data(), channel_id(), resource_state()) -> data(). -add_channel_status_if_not_exists(Data, ChannelId, State) -> +-spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data(). +add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) -> Channels = Data#data.added_channels, - case maps:is_key(ChannelId, Channels) of - true -> - Data; - false -> - ChannelStatus = channel_status({error, resource_not_operational}), - NewChannels = maps:put(ChannelId, ChannelStatus, Channels), - ResStatus = state_to_status(State), - maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), - Data#data{added_channels = NewChannels} - end. + ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), + NewChannels = maps:put(ChannelId, ChannelStatus, Channels), + ResStatus = state_to_status(State), + maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), + Data#data{added_channels = NewChannels}. state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_connected) -> ?status_connected; From 331f9a1b96db9a4534853fa8f60007f4faf19e24 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 20 May 2024 16:26:20 +0200 Subject: [PATCH 2/6] docs: add change log entry --- changes/ce/fix-13077.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-13077.en.md diff --git a/changes/ce/fix-13077.en.md b/changes/ce/fix-13077.en.md new file mode 100644 index 000000000..8082c6c40 --- /dev/null +++ b/changes/ce/fix-13077.en.md @@ -0,0 +1 @@ +Updates to action configurations would sometimes not take effect without disabling and enabling the action. This means that an action could sometimes run with the old (previous) configuration even though it would look like the action configuration has been updated successfully. From 39d758c4d6aacb6bb4a67c4f7f73211bdeeaf29a Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 21 May 2024 07:15:21 +0200 Subject: [PATCH 3/6] fix: do not return configs for channels from emqx_resource_manager --- .../src/emqx_resource_manager.erl | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 1efd88b24..8bf673b91 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1038,7 +1038,8 @@ continue_resource_health_check_not_connected(NewStatus, Data0) -> handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> {keep_state_and_data, [ - {reply, From, channel_status({error, resource_disconnected}, undefined)} + {reply, From, + maps:remove(config, channel_status({error, resource_disconnected}, undefined))} ]}; handle_manual_channel_health_check( From, @@ -1072,13 +1073,15 @@ handle_manual_channel_health_check( is_map_key(ChannelId, Channels) -> %% No ongoing health check: reply with current status. - {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]}; + {keep_state_and_data, [{reply, From, maps:remove(config, maps:get(ChannelId, Channels))}]}; handle_manual_channel_health_check( From, _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found}, undefined)}]}. + {keep_state_and_data, [ + {reply, From, maps:remove(config, channel_status({error, channel_not_found}, undefined))} + ]}. -spec channels_health_check(resource_status(), data()) -> data(). channels_health_check(?status_connected = _ConnectorStatus, Data0) -> @@ -1327,8 +1330,9 @@ handle_channel_health_check_worker_down_new_channels_and_status( {AddedChannels, NewStatus}. reply_pending_channel_health_check_callers( - ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0} + ChannelId, Status0, Data0 = #data{hc_pending_callers = Pending0} ) -> + Status = maps:remove(config, Status0), #{channel := CPending0} = Pending0, Pending = maps:get(ChannelId, CPending0, []), Actions = [{reply, From, Status} || From <- Pending], @@ -1459,6 +1463,14 @@ maybe_reply(Actions, From, Reply) -> -spec data_record_to_external_map(data()) -> resource_data(). data_record_to_external_map(Data) -> + AddedChannelsList = maps:to_list(Data#data.added_channels), + AddedChannelsListWithoutConfigs = + [ + {ChanID, maps:remove(config, Status)} + || {ChanID, Status} <- AddedChannelsList + ], + AddedChannelsWithoutConfigs = + maps:from_list(AddedChannelsListWithoutConfigs), #{ id => Data#data.id, error => external_error(Data#data.error), @@ -1468,7 +1480,7 @@ data_record_to_external_map(Data) -> config => Data#data.config, status => Data#data.status, state => Data#data.state, - added_channels => Data#data.added_channels + added_channels => AddedChannelsWithoutConfigs }. -spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}. From cff8b97e8a56200deaddeed6ef0ff81a7a5c2857 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 22 May 2024 10:39:19 +0200 Subject: [PATCH 4/6] fix: handle channel updated during health check This commit fixes an issue found by CI test case emqx_bridge_influxdb_SUITE:t_start_stop and others. While the channel health check process is running, the channel could be removed or updated which could cause a crash in the resource manager or non up-to-date alarms being triggered. --- .../src/emqx_resource_manager.erl | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 8bf673b91..70dac17ae 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1228,14 +1228,29 @@ trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0}) start_channel_health_check(Data1, ChannelId) end. --spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data(). -continue_channel_health_check_connected(ChannelId, OldStatus, Data0) -> +-spec continue_channel_health_check_connected( + channel_id(), channel_status_map(), channel_status_map(), data() +) -> data(). +continue_channel_health_check_connected(ChannelId, OldStatus, CurrentStatus, Data0) -> #data{hc_workers = HCWorkers0} = Data0, #{channel := CHCWorkers0} = HCWorkers0, CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0), Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, + case OldStatus =:= CurrentStatus of + true -> + continue_channel_health_check_connected_no_update_during_check( + ChannelId, OldStatus, Data1 + ); + false -> + %% Channel has been updated while the health check process was working so + %% we should not clear any alarm or remove the channel from the + %% connector + Data1 + end. + +continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data1) -> %% Remove the added channels with a status different from connected or connecting - NewStatus = maps:get(ChannelId, Data0#data.added_channels), + NewStatus = maps:get(ChannelId, Data1#data.added_channels), ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), %% Raise/clear alarms @@ -1299,13 +1314,23 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> CHCWorkers = CHCWorkers3#{pending := Rest}, HCWorkers = HCWorkers0#{channel := CHCWorkers}, Data3 = Data2#data{hc_workers = HCWorkers}, - Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3), + Data4 = continue_channel_health_check_connected( + ChannelId, + PreviousChanStatus, + CurrentStatus, + Data3 + ), Data = start_channel_health_check(Data4, NextChannelId), {keep_state, update_state(Data, Data0), Replies}; #{pending := []} -> HCWorkers = HCWorkers0#{channel := CHCWorkers3}, Data3 = Data2#data{hc_workers = HCWorkers}, - Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3), + Data = continue_channel_health_check_connected( + ChannelId, + PreviousChanStatus, + CurrentStatus, + Data3 + ), {keep_state, update_state(Data, Data0), Replies} end. @@ -1326,7 +1351,9 @@ handle_channel_health_check_worker_down_new_channels_and_status( ) -> %% The checked config is different from the current config which means we %% should not update AddedChannels because the channel has been removed or - %% updated while the health check was in progress + %% updated while the health check was in progress. We can still reply with + %% NewStatus because the health check must have been issued before the + %% configuration changed or the channel got removed. {AddedChannels, NewStatus}. reply_pending_channel_health_check_callers( From 4b540e3bd005a0008729f93e0561ee6d572e6e56 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 22 May 2024 17:43:07 +0200 Subject: [PATCH 5/6] fix: do not leak action configurations in alarm messages --- apps/emqx_resource/src/emqx_resource_manager.erl | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 70dac17ae..7b9e13b16 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1431,9 +1431,13 @@ maybe_alarm(_Status, _ResId, Error, Error) -> maybe_alarm(_Status, ResId, Error, _PrevError) -> HrError = case Error of - {error, undefined} -> <<"Unknown reason">>; - {error, Reason} -> emqx_utils:readable_error_msg(Reason); - _ -> emqx_utils:readable_error_msg(Error) + {error, undefined} -> + <<"Unknown reason">>; + {error, Reason} -> + emqx_utils:readable_error_msg(Reason); + _ -> + Error1 = redact_config_from_error_status(Error), + emqx_utils:readable_error_msg(Error1) end, emqx_alarm:safe_activate( ResId, @@ -1442,6 +1446,11 @@ maybe_alarm(_Status, ResId, Error, _PrevError) -> ), ?tp(resource_activate_alarm, #{resource_id => ResId}). +redact_config_from_error_status(#{config := _} = ErrorStatus) -> + maps:remove(config, ErrorStatus); +redact_config_from_error_status(Error) -> + Error. + -spec maybe_resume_resource_workers(resource_id(), resource_status()) -> ok. maybe_resume_resource_workers(ResId, ?status_connected) -> lists:foreach( From 88c96e26de41199f4fb66f69be2960a04511098f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 22 May 2024 18:03:43 +0200 Subject: [PATCH 6/6] refactor: simplify the code with maps:map/2 Thanks @thalesmg for the suggestion --- apps/emqx_resource/src/emqx_resource_manager.erl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 7b9e13b16..d650a2afb 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1499,14 +1499,11 @@ maybe_reply(Actions, From, Reply) -> -spec data_record_to_external_map(data()) -> resource_data(). data_record_to_external_map(Data) -> - AddedChannelsList = maps:to_list(Data#data.added_channels), - AddedChannelsListWithoutConfigs = - [ - {ChanID, maps:remove(config, Status)} - || {ChanID, Status} <- AddedChannelsList - ], AddedChannelsWithoutConfigs = - maps:from_list(AddedChannelsListWithoutConfigs), + maps:map( + fun(_ChanID, Status) -> maps:remove(config, Status) end, + Data#data.added_channels + ), #{ id => Data#data.id, error => external_error(Data#data.error),