diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index d6199f3a3..dc2e8f275 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -788,6 +788,60 @@ t_update_connector_not_found(_Config) -> ), ok. +%% Check that https://emqx.atlassian.net/browse/EMQX-12376 is fixed +t_update_concurrent_health_check(_Config) -> + Msg = <<"Channel status check failed">>, + ok = meck:expect( + emqx_bridge_v2_test_connector, + on_get_channel_status, + fun( + _ResId, + ChannelId, + #{channels := Channels} + ) -> + #{ + is_conf_for_connected := Connected + } = maps:get(ChannelId, Channels), + case Connected of + true -> + connected; + false -> + {error, Msg} + end + end + ), + BaseConf = (bridge_config())#{ + is_conf_for_connected => false + }, + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BaseConf)), + SetStatusConnected = + fun + (true) -> + Conf = BaseConf#{is_conf_for_connected => true}, + %% Update the config + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)), + ?assertMatch( + #{status := connected}, + emqx_bridge_v2:health_check(bridge_type(), my_test_bridge) + ); + (false) -> + Conf = BaseConf#{is_conf_for_connected => false}, + %% Update the config + ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)), + ?assertMatch( + #{status := disconnected}, + emqx_bridge_v2:health_check(bridge_type(), my_test_bridge) + ) + end, + [ + begin + Connected = (N rem 2) =:= 0, + SetStatusConnected(Connected) + end + || N <- lists:seq(0, 20) + ], + ok. + t_remove_single_connector_being_referenced_with_active_channels(_Config) -> %% we test the connector post config update here because we also need bridges. Conf = bridge_config(), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 1c0c3f4ca..1efd88b24 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -542,9 +542,9 @@ handle_event(enter, _OldState, ?state_stopped = State, Data) -> {keep_state_and_data, []}; %% The following events can be handled in any other state handle_event( - {call, From}, {add_channel, ChannelId, _Config}, State, Data + {call, From}, {add_channel, ChannelId, Config}, State, Data ) -> - handle_not_connected_add_channel(From, ChannelId, State, Data); + handle_not_connected_add_channel(From, ChannelId, Config, State, Data); handle_event( {call, From}, {remove_channel, ChannelId}, _State, Data ) -> @@ -678,8 +678,8 @@ add_channels(Data) -> Channels = Data#data.added_channels, NewChannels = lists:foldl( fun - ({ChannelID, #{enable := true}}, Acc) -> - maps:put(ChannelID, channel_status(), Acc); + ({ChannelID, #{enable := true} = Config}, Acc) -> + maps:put(ChannelID, channel_status_not_added(Config), Acc); ({_, #{enable := false}}, Acc) -> Acc end, @@ -702,7 +702,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> %% we have not yet performed the initial health_check NewAddedChannelsMap = maps:put( ChannelID, - channel_status_new_waiting_for_health_check(), + channel_status_new_waiting_for_health_check(ChannelConfig), AddedChannelsMap ), NewData = Data#data{ @@ -720,7 +720,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) -> AddedChannelsMap = Data#data.added_channels, NewAddedChannelsMap = maps:put( ChannelID, - channel_status(Error), + channel_status(Error, ChannelConfig), AddedChannelsMap ), NewData = Data#data{ @@ -835,7 +835,7 @@ handle_add_channel(From, Data, ChannelId, Config) -> maps:get( ChannelId, Channels, - channel_status() + channel_status_not_added(Config) ) ) of @@ -843,7 +843,7 @@ handle_add_channel(From, Data, ChannelId, Config) -> %% The channel is not installed in the connector state %% We insert it into the channels map and let the health check %% take care of the rest - NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), + NewChannels = maps:put(ChannelId, channel_status_not_added(Config), Channels), NewData = Data#data{added_channels = NewChannels}, {keep_state, update_state(NewData, Data), [ {reply, From, ok} @@ -854,17 +854,21 @@ handle_add_channel(From, Data, ChannelId, Config) -> {keep_state_and_data, [{reply, From, ok}]} end. -handle_not_connected_add_channel(From, ChannelId, State, Data) -> +handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) -> %% When state is not connected the channel will be added to the channels %% map but nothing else will happen. - NewData = add_channel_status_if_not_exists(Data, ChannelId, State), + NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. handle_remove_channel(From, ChannelId, Data) -> Channels = Data#data.added_channels, %% Deactivate alarm _ = maybe_clear_alarm(ChannelId), - case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of + case + channel_status_is_channel_added( + maps:get(ChannelId, Channels, channel_status_not_added(undefined)) + ) + of false -> %% The channel is already not installed in the connector state. %% We still need to remove it from the added_channels map @@ -1033,7 +1037,9 @@ continue_resource_health_check_not_connected(NewStatus, Data0) -> end. handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> - {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]}; + {keep_state_and_data, [ + {reply, From, channel_status({error, resource_disconnected}, undefined)} + ]}; handle_manual_channel_health_check( From, #data{ @@ -1072,7 +1078,7 @@ handle_manual_channel_health_check( _Data, _ChannelId ) -> - {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}. + {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found}, undefined)}]}. -spec channels_health_check(resource_status(), data()) -> data(). channels_health_check(?status_connected = _ConnectorStatus, Data0) -> @@ -1097,14 +1103,14 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) -> %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) Channels = Data0#data.added_channels, ChannelsToChangeStatusFor = [ - ChannelId - || {ChannelId, Status} <- maps:to_list(Channels), + {ChannelId, Config} + || {ChannelId, #{config := Config} = Status} <- maps:to_list(Channels), channel_status_is_channel_added(Status) ], ChannelsWithNewStatuses = [ - {ChannelId, channel_status({?status_connecting, resource_is_connecting})} - || ChannelId <- ChannelsToChangeStatusFor + {ChannelId, channel_status({?status_connecting, resource_is_connecting}, Config)} + || {ChannelId, Config} <- ChannelsToChangeStatusFor ], %% Update the channels map NewChannels = lists:foldl( @@ -1149,9 +1155,10 @@ channels_health_check(ConnectorStatus, Data0) -> ConnectorStatus, ChannelId, Data1 - )} + )}, + Config )} - || {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) + || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels) ], %% Raise alarms _ = lists:foreach( @@ -1224,7 +1231,7 @@ continue_channel_health_check_connected(ChannelId, OldStatus, Data0) -> #{channel := CHCWorkers0} = HCWorkers0, CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0), Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, - %% Remove the added channels with a a status different from connected or connecting + %% Remove the added channels with a status different from connected or connecting NewStatus = maps:get(ChannelId, Data0#data.added_channels), ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], Data = remove_channels_in_list(ChannelsToRemove, Data1, true), @@ -1253,9 +1260,11 @@ spawn_channel_health_check_worker(#data{} = Data, ChannelId) -> %% separated so it can be spec'ed and placate dialyzer tantrums... -spec worker_channel_health_check(data(), channel_id()) -> no_return(). worker_channel_health_check(Data, ChannelId) -> - #data{id = ResId, mod = Mod, state = State} = Data, + #data{id = ResId, mod = Mod, state = State, added_channels = Channels} = Data, + ChannelStatus = maps:get(ChannelId, Channels, #{}), + ChannelConfig = maps:get(config, ChannelStatus, undefined), RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), - exit({ok, channel_status(RawStatus)}). + exit({ok, channel_status(RawStatus, ChannelConfig)}). -spec handle_channel_health_check_worker_down( data(), {pid(), reference()}, {ok, channel_status_map()} @@ -1267,11 +1276,15 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> added_channels = AddedChannels0 } = Data0, {ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0), - case ExitResult of - {ok, NewStatus} -> - %% `emqx_resource:call_channel_health_check' catches all exceptions. - AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0) - end, + %% The channel might have got removed while the health check was going on + CurrentStatus = maps:get(ChannelId, AddedChannels0, channel_not_added), + {AddedChannels, NewStatus} = + handle_channel_health_check_worker_down_new_channels_and_status( + ChannelId, + ExitResult, + CurrentStatus, + AddedChannels0 + ), #{ongoing := Ongoing0} = CHCWorkers1, {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, @@ -1293,6 +1306,26 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) -> {keep_state, update_state(Data, Data0), Replies} end. +handle_channel_health_check_worker_down_new_channels_and_status( + ChannelId, + {ok, #{config := CheckedConfig} = NewStatus} = _ExitResult, + #{config := CurrentConfig} = _CurrentStatus, + AddedChannels +) when CheckedConfig =:= CurrentConfig -> + %% Checked config is the same as the current config so we can update the + %% status in AddedChannels + {maps:put(ChannelId, NewStatus, AddedChannels), NewStatus}; +handle_channel_health_check_worker_down_new_channels_and_status( + _ChannelId, + {ok, NewStatus} = _ExitResult, + _CurrentStatus, + AddedChannels +) -> + %% The checked config is different from the current config which means we + %% should not update AddedChannels because the channel has been removed or + %% updated while the health check was in progress + {AddedChannels, NewStatus}. + reply_pending_channel_health_check_callers( ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0} ) -> @@ -1469,7 +1502,7 @@ safe_call(ResId, Message, Timeout) -> %% Helper functions for chanel status data -channel_status() -> +channel_status_not_added(ChannelConfig) -> #{ %% The status of the channel. Can be one of the following: %% - disconnected: the channel is not added to the resource (error may contain the reason)) @@ -1479,62 +1512,61 @@ channel_status() -> %% - connected: the channel is added to the resource, the resource is %% connected and the on_channel_get_status callback has returned %% connected. The error field should be undefined. - status => ?status_disconnected, - error => not_added_yet - }. - -%% If the channel is added with add_channel/2, the config field will be set to -%% the config. This is useful when doing probing since the config is not stored -%% anywhere else in that case. -channel_status_new_with_config(Config) -> - #{ status => ?status_disconnected, error => not_added_yet, - config => Config + config => ChannelConfig }. -channel_status_new_waiting_for_health_check() -> +channel_status_new_waiting_for_health_check(ChannelConfig) -> #{ status => ?status_connecting, - error => no_health_check_yet + error => no_health_check_yet, + config => ChannelConfig }. -channel_status({?status_connecting, Error}) -> +channel_status({?status_connecting, Error}, ChannelConfig) -> #{ status => ?status_connecting, - error => Error + error => Error, + config => ChannelConfig }; -channel_status({?status_disconnected, Error}) -> +channel_status({?status_disconnected, Error}, ChannelConfig) -> #{ status => ?status_disconnected, - error => Error + error => Error, + config => ChannelConfig }; -channel_status(?status_disconnected) -> +channel_status(?status_disconnected, ChannelConfig) -> #{ status => ?status_disconnected, - error => <<"Disconnected for unknown reason">> + error => <<"Disconnected for unknown reason">>, + config => ChannelConfig }; -channel_status(?status_connecting) -> +channel_status(?status_connecting, ChannelConfig) -> #{ status => ?status_connecting, - error => <<"Not connected for unknown reason">> + error => <<"Not connected for unknown reason">>, + config => ChannelConfig }; -channel_status(?status_connected) -> +channel_status(?status_connected, ChannelConfig) -> #{ status => ?status_connected, - error => undefined + error => undefined, + config => ChannelConfig }; %% Probably not so useful but it is permitted to set an error even when the %% status is connected -channel_status({?status_connected, Error}) -> +channel_status({?status_connected, Error}, ChannelConfig) -> #{ status => ?status_connected, - error => Error + error => Error, + config => ChannelConfig }; -channel_status({error, Reason}) -> +channel_status({error, Reason}, ChannelConfig) -> #{ status => ?status_disconnected, - error => Reason + error => Reason, + config => ChannelConfig }. channel_status_is_channel_added(#{ @@ -1548,19 +1580,14 @@ channel_status_is_channel_added(#{ channel_status_is_channel_added(_Status) -> false. --spec add_channel_status_if_not_exists(data(), channel_id(), resource_state()) -> data(). -add_channel_status_if_not_exists(Data, ChannelId, State) -> +-spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data(). +add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) -> Channels = Data#data.added_channels, - case maps:is_key(ChannelId, Channels) of - true -> - Data; - false -> - ChannelStatus = channel_status({error, resource_not_operational}), - NewChannels = maps:put(ChannelId, ChannelStatus, Channels), - ResStatus = state_to_status(State), - maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), - Data#data{added_channels = NewChannels} - end. + ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig), + NewChannels = maps:put(ChannelId, ChannelStatus, Channels), + ResStatus = state_to_status(State), + maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev), + Data#data{added_channels = NewChannels}. state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_connected) -> ?status_connected;