fix: action config update would sometimes not be reflected in connector

Before this commit the following happened sometimes:

1. action status is connected
2. action config is updated to something that should change the status to
   disconnected
3. action status is still connected and the old config is being used by
   the connector even though the config has been correctly updated.

The reason for this bug is that the post_config_hook runs before the
global EMQX config is updated. The post config hook is adding the new
config to the connector. Since the new config causes the action to get
status disconnected, the adding of the action to the connector is
retried when the health check runs but this time the config will be
loaded from the global config which could cause the old config to be
loaded (this happens when the global config has not had time to get
updated).

The above problem has been fixed in this commit by only reading action
configs from the global config when the connector starts/restarts and
instead store the latest configs for the actions in the connector.

Fixes:
https://emqx.atlassian.net/browse/EMQX-12376
This commit is contained in:
Kjell Winblad 2024-05-20 16:03:14 +02:00
parent a3cd3e31b1
commit 917474f694
2 changed files with 147 additions and 66 deletions

View File

@ -788,6 +788,60 @@ t_update_connector_not_found(_Config) ->
), ),
ok. ok.
%% Check that https://emqx.atlassian.net/browse/EMQX-12376 is fixed
t_update_concurrent_health_check(_Config) ->
Msg = <<"Channel status check failed">>,
ok = meck:expect(
emqx_bridge_v2_test_connector,
on_get_channel_status,
fun(
_ResId,
ChannelId,
#{channels := Channels}
) ->
#{
is_conf_for_connected := Connected
} = maps:get(ChannelId, Channels),
case Connected of
true ->
connected;
false ->
{error, Msg}
end
end
),
BaseConf = (bridge_config())#{
is_conf_for_connected => false
},
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BaseConf)),
SetStatusConnected =
fun
(true) ->
Conf = BaseConf#{is_conf_for_connected => true},
%% Update the config
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
?assertMatch(
#{status := connected},
emqx_bridge_v2:health_check(bridge_type(), my_test_bridge)
);
(false) ->
Conf = BaseConf#{is_conf_for_connected => false},
%% Update the config
?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
?assertMatch(
#{status := disconnected},
emqx_bridge_v2:health_check(bridge_type(), my_test_bridge)
)
end,
[
begin
Connected = (N rem 2) =:= 0,
SetStatusConnected(Connected)
end
|| N <- lists:seq(0, 20)
],
ok.
t_remove_single_connector_being_referenced_with_active_channels(_Config) -> t_remove_single_connector_being_referenced_with_active_channels(_Config) ->
%% we test the connector post config update here because we also need bridges. %% we test the connector post config update here because we also need bridges.
Conf = bridge_config(), Conf = bridge_config(),

View File

@ -542,9 +542,9 @@ handle_event(enter, _OldState, ?state_stopped = State, Data) ->
{keep_state_and_data, []}; {keep_state_and_data, []};
%% The following events can be handled in any other state %% The following events can be handled in any other state
handle_event( handle_event(
{call, From}, {add_channel, ChannelId, _Config}, State, Data {call, From}, {add_channel, ChannelId, Config}, State, Data
) -> ) ->
handle_not_connected_add_channel(From, ChannelId, State, Data); handle_not_connected_add_channel(From, ChannelId, Config, State, Data);
handle_event( handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data {call, From}, {remove_channel, ChannelId}, _State, Data
) -> ) ->
@ -678,8 +678,8 @@ add_channels(Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
NewChannels = lists:foldl( NewChannels = lists:foldl(
fun fun
({ChannelID, #{enable := true}}, Acc) -> ({ChannelID, #{enable := true} = Config}, Acc) ->
maps:put(ChannelID, channel_status(), Acc); maps:put(ChannelID, channel_status_not_added(Config), Acc);
({_, #{enable := false}}, Acc) -> ({_, #{enable := false}}, Acc) ->
Acc Acc
end, end,
@ -702,7 +702,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
%% we have not yet performed the initial health_check %% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put( NewAddedChannelsMap = maps:put(
ChannelID, ChannelID,
channel_status_new_waiting_for_health_check(), channel_status_new_waiting_for_health_check(ChannelConfig),
AddedChannelsMap AddedChannelsMap
), ),
NewData = Data#data{ NewData = Data#data{
@ -720,7 +720,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
AddedChannelsMap = Data#data.added_channels, AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put( NewAddedChannelsMap = maps:put(
ChannelID, ChannelID,
channel_status(Error), channel_status(Error, ChannelConfig),
AddedChannelsMap AddedChannelsMap
), ),
NewData = Data#data{ NewData = Data#data{
@ -835,7 +835,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
maps:get( maps:get(
ChannelId, ChannelId,
Channels, Channels,
channel_status() channel_status_not_added(Config)
) )
) )
of of
@ -843,7 +843,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
%% The channel is not installed in the connector state %% The channel is not installed in the connector state
%% We insert it into the channels map and let the health check %% We insert it into the channels map and let the health check
%% take care of the rest %% take care of the rest
NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels), NewChannels = maps:put(ChannelId, channel_status_not_added(Config), Channels),
NewData = Data#data{added_channels = NewChannels}, NewData = Data#data{added_channels = NewChannels},
{keep_state, update_state(NewData, Data), [ {keep_state, update_state(NewData, Data), [
{reply, From, ok} {reply, From, ok}
@ -854,17 +854,21 @@ handle_add_channel(From, Data, ChannelId, Config) ->
{keep_state_and_data, [{reply, From, ok}]} {keep_state_and_data, [{reply, From, ok}]}
end. end.
handle_not_connected_add_channel(From, ChannelId, State, Data) -> handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
%% When state is not connected the channel will be added to the channels %% When state is not connected the channel will be added to the channels
%% map but nothing else will happen. %% map but nothing else will happen.
NewData = add_channel_status_if_not_exists(Data, ChannelId, State), NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}. {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_remove_channel(From, ChannelId, Data) -> handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
%% Deactivate alarm %% Deactivate alarm
_ = maybe_clear_alarm(ChannelId), _ = maybe_clear_alarm(ChannelId),
case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of case
channel_status_is_channel_added(
maps:get(ChannelId, Channels, channel_status_not_added(undefined))
)
of
false -> false ->
%% The channel is already not installed in the connector state. %% The channel is already not installed in the connector state.
%% We still need to remove it from the added_channels map %% We still need to remove it from the added_channels map
@ -1033,7 +1037,9 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
end. end.
handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]}; {keep_state_and_data, [
{reply, From, channel_status({error, resource_disconnected}, undefined)}
]};
handle_manual_channel_health_check( handle_manual_channel_health_check(
From, From,
#data{ #data{
@ -1072,7 +1078,7 @@ handle_manual_channel_health_check(
_Data, _Data,
_ChannelId _ChannelId
) -> ) ->
{keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}. {keep_state_and_data, [{reply, From, channel_status({error, channel_not_found}, undefined)}]}.
-spec channels_health_check(resource_status(), data()) -> data(). -spec channels_health_check(resource_status(), data()) -> data().
channels_health_check(?status_connected = _ConnectorStatus, Data0) -> channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
@ -1097,14 +1103,14 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms)
Channels = Data0#data.added_channels, Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [ ChannelsToChangeStatusFor = [
ChannelId {ChannelId, Config}
|| {ChannelId, Status} <- maps:to_list(Channels), || {ChannelId, #{config := Config} = Status} <- maps:to_list(Channels),
channel_status_is_channel_added(Status) channel_status_is_channel_added(Status)
], ],
ChannelsWithNewStatuses = ChannelsWithNewStatuses =
[ [
{ChannelId, channel_status({?status_connecting, resource_is_connecting})} {ChannelId, channel_status({?status_connecting, resource_is_connecting}, Config)}
|| ChannelId <- ChannelsToChangeStatusFor || {ChannelId, Config} <- ChannelsToChangeStatusFor
], ],
%% Update the channels map %% Update the channels map
NewChannels = lists:foldl( NewChannels = lists:foldl(
@ -1149,9 +1155,10 @@ channels_health_check(ConnectorStatus, Data0) ->
ConnectorStatus, ConnectorStatus,
ChannelId, ChannelId,
Data1 Data1
)} )},
Config
)} )}
|| {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels) || {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
], ],
%% Raise alarms %% Raise alarms
_ = lists:foreach( _ = lists:foreach(
@ -1224,7 +1231,7 @@ continue_channel_health_check_connected(ChannelId, OldStatus, Data0) ->
#{channel := CHCWorkers0} = HCWorkers0, #{channel := CHCWorkers0} = HCWorkers0,
CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0), CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}}, Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
%% Remove the added channels with a a status different from connected or connecting %% Remove the added channels with a status different from connected or connecting
NewStatus = maps:get(ChannelId, Data0#data.added_channels), NewStatus = maps:get(ChannelId, Data0#data.added_channels),
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)], ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
Data = remove_channels_in_list(ChannelsToRemove, Data1, true), Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
@ -1253,9 +1260,11 @@ spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
%% separated so it can be spec'ed and placate dialyzer tantrums... %% separated so it can be spec'ed and placate dialyzer tantrums...
-spec worker_channel_health_check(data(), channel_id()) -> no_return(). -spec worker_channel_health_check(data(), channel_id()) -> no_return().
worker_channel_health_check(Data, ChannelId) -> worker_channel_health_check(Data, ChannelId) ->
#data{id = ResId, mod = Mod, state = State} = Data, #data{id = ResId, mod = Mod, state = State, added_channels = Channels} = Data,
ChannelStatus = maps:get(ChannelId, Channels, #{}),
ChannelConfig = maps:get(config, ChannelStatus, undefined),
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
exit({ok, channel_status(RawStatus)}). exit({ok, channel_status(RawStatus, ChannelConfig)}).
-spec handle_channel_health_check_worker_down( -spec handle_channel_health_check_worker_down(
data(), {pid(), reference()}, {ok, channel_status_map()} data(), {pid(), reference()}, {ok, channel_status_map()}
@ -1267,11 +1276,15 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
added_channels = AddedChannels0 added_channels = AddedChannels0
} = Data0, } = Data0,
{ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0), {ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
case ExitResult of %% The channel might have got removed while the health check was going on
{ok, NewStatus} -> CurrentStatus = maps:get(ChannelId, AddedChannels0, channel_not_added),
%% `emqx_resource:call_channel_health_check' catches all exceptions. {AddedChannels, NewStatus} =
AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0) handle_channel_health_check_worker_down_new_channels_and_status(
end, ChannelId,
ExitResult,
CurrentStatus,
AddedChannels0
),
#{ongoing := Ongoing0} = CHCWorkers1, #{ongoing := Ongoing0} = CHCWorkers1,
{PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0), {PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1}, CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
@ -1293,6 +1306,26 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
{keep_state, update_state(Data, Data0), Replies} {keep_state, update_state(Data, Data0), Replies}
end. end.
handle_channel_health_check_worker_down_new_channels_and_status(
ChannelId,
{ok, #{config := CheckedConfig} = NewStatus} = _ExitResult,
#{config := CurrentConfig} = _CurrentStatus,
AddedChannels
) when CheckedConfig =:= CurrentConfig ->
%% Checked config is the same as the current config so we can update the
%% status in AddedChannels
{maps:put(ChannelId, NewStatus, AddedChannels), NewStatus};
handle_channel_health_check_worker_down_new_channels_and_status(
_ChannelId,
{ok, NewStatus} = _ExitResult,
_CurrentStatus,
AddedChannels
) ->
%% The checked config is different from the current config which means we
%% should not update AddedChannels because the channel has been removed or
%% updated while the health check was in progress
{AddedChannels, NewStatus}.
reply_pending_channel_health_check_callers( reply_pending_channel_health_check_callers(
ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0} ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0}
) -> ) ->
@ -1469,7 +1502,7 @@ safe_call(ResId, Message, Timeout) ->
%% Helper functions for chanel status data %% Helper functions for chanel status data
channel_status() -> channel_status_not_added(ChannelConfig) ->
#{ #{
%% The status of the channel. Can be one of the following: %% The status of the channel. Can be one of the following:
%% - disconnected: the channel is not added to the resource (error may contain the reason)) %% - disconnected: the channel is not added to the resource (error may contain the reason))
@ -1479,62 +1512,61 @@ channel_status() ->
%% - connected: the channel is added to the resource, the resource is %% - connected: the channel is added to the resource, the resource is
%% connected and the on_channel_get_status callback has returned %% connected and the on_channel_get_status callback has returned
%% connected. The error field should be undefined. %% connected. The error field should be undefined.
status => ?status_disconnected,
error => not_added_yet
}.
%% If the channel is added with add_channel/2, the config field will be set to
%% the config. This is useful when doing probing since the config is not stored
%% anywhere else in that case.
channel_status_new_with_config(Config) ->
#{
status => ?status_disconnected, status => ?status_disconnected,
error => not_added_yet, error => not_added_yet,
config => Config config => ChannelConfig
}. }.
channel_status_new_waiting_for_health_check() -> channel_status_new_waiting_for_health_check(ChannelConfig) ->
#{ #{
status => ?status_connecting, status => ?status_connecting,
error => no_health_check_yet error => no_health_check_yet,
config => ChannelConfig
}. }.
channel_status({?status_connecting, Error}) -> channel_status({?status_connecting, Error}, ChannelConfig) ->
#{ #{
status => ?status_connecting, status => ?status_connecting,
error => Error error => Error,
config => ChannelConfig
}; };
channel_status({?status_disconnected, Error}) -> channel_status({?status_disconnected, Error}, ChannelConfig) ->
#{ #{
status => ?status_disconnected, status => ?status_disconnected,
error => Error error => Error,
config => ChannelConfig
}; };
channel_status(?status_disconnected) -> channel_status(?status_disconnected, ChannelConfig) ->
#{ #{
status => ?status_disconnected, status => ?status_disconnected,
error => <<"Disconnected for unknown reason">> error => <<"Disconnected for unknown reason">>,
config => ChannelConfig
}; };
channel_status(?status_connecting) -> channel_status(?status_connecting, ChannelConfig) ->
#{ #{
status => ?status_connecting, status => ?status_connecting,
error => <<"Not connected for unknown reason">> error => <<"Not connected for unknown reason">>,
config => ChannelConfig
}; };
channel_status(?status_connected) -> channel_status(?status_connected, ChannelConfig) ->
#{ #{
status => ?status_connected, status => ?status_connected,
error => undefined error => undefined,
config => ChannelConfig
}; };
%% Probably not so useful but it is permitted to set an error even when the %% Probably not so useful but it is permitted to set an error even when the
%% status is connected %% status is connected
channel_status({?status_connected, Error}) -> channel_status({?status_connected, Error}, ChannelConfig) ->
#{ #{
status => ?status_connected, status => ?status_connected,
error => Error error => Error,
config => ChannelConfig
}; };
channel_status({error, Reason}) -> channel_status({error, Reason}, ChannelConfig) ->
#{ #{
status => ?status_disconnected, status => ?status_disconnected,
error => Reason error => Reason,
config => ChannelConfig
}. }.
channel_status_is_channel_added(#{ channel_status_is_channel_added(#{
@ -1548,19 +1580,14 @@ channel_status_is_channel_added(#{
channel_status_is_channel_added(_Status) -> channel_status_is_channel_added(_Status) ->
false. false.
-spec add_channel_status_if_not_exists(data(), channel_id(), resource_state()) -> data(). -spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data().
add_channel_status_if_not_exists(Data, ChannelId, State) -> add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
case maps:is_key(ChannelId, Channels) of ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
true -> NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
Data; ResStatus = state_to_status(State),
false -> maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev),
ChannelStatus = channel_status({error, resource_not_operational}), Data#data{added_channels = NewChannels}.
NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}
end.
state_to_status(?state_stopped) -> ?rm_status_stopped; state_to_status(?state_stopped) -> ?rm_status_stopped;
state_to_status(?state_connected) -> ?status_connected; state_to_status(?state_connected) -> ?status_connected;