Merge pull request #13077 from kjellwinblad/kjell/fix_action_update_race_issue/EMQX-12376

fix: action config update would sometimes not be reflected in connector
This commit is contained in:
Kjell Winblad 2024-05-23 09:11:48 +02:00 committed by GitHub
commit 26c988fe11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 205 additions and 78 deletions

View File

@ -788,6 +788,60 @@ t_update_connector_not_found(_Config) ->
),
ok.
%% Regression test for https://emqx.atlassian.net/browse/EMQX-12376:
%% a config update immediately followed by a health check must report the
%% status that corresponds to the updated config.
t_update_concurrent_health_check(_Config) ->
    Msg = <<"Channel status check failed">>,
    %% The mocked callback looks up the channel's entry in the `channels' map
    %% of the connector state it is handed and answers according to the
    %% `is_conf_for_connected' flag found there.
    ok = meck:expect(
        emqx_bridge_v2_test_connector,
        on_get_channel_status,
        fun(_ResId, ChannelId, #{channels := Channels}) ->
            #{is_conf_for_connected := IsConnected} = maps:get(ChannelId, Channels),
            case IsConnected of
                true -> connected;
                false -> {error, Msg}
            end
        end
    ),
    BaseConf = (bridge_config())#{is_conf_for_connected => false},
    ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, BaseConf)),
    %% Update the action config so that the flag equals `IsConnected', then
    %% check that the very next health check reflects the new flag.
    UpdateAndCheck =
        fun(IsConnected) ->
            ExpectedStatus =
                case IsConnected of
                    true -> connected;
                    false -> disconnected
                end,
            Conf = BaseConf#{is_conf_for_connected => IsConnected},
            %% Update the config
            ?assertMatch({ok, _}, emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)),
            ?assertMatch(
                #{status := ExpectedStatus},
                emqx_bridge_v2:health_check(bridge_type(), my_test_bridge)
            )
        end,
    %% Flip the flag back and forth (starting with `true' for N = 0) so every
    %% iteration really changes the config.
    lists:foreach(
        fun(N) -> UpdateAndCheck((N rem 2) =:= 0) end,
        lists:seq(0, 20)
    ),
    ok.
t_remove_single_connector_being_referenced_with_active_channels(_Config) ->
%% we test the connector post config update here because we also need bridges.
Conf = bridge_config(),

View File

@ -542,9 +542,9 @@ handle_event(enter, _OldState, ?state_stopped = State, Data) ->
{keep_state_and_data, []};
%% The following events can be handled in any other state
handle_event(
{call, From}, {add_channel, ChannelId, _Config}, State, Data
{call, From}, {add_channel, ChannelId, Config}, State, Data
) ->
handle_not_connected_add_channel(From, ChannelId, State, Data);
handle_not_connected_add_channel(From, ChannelId, Config, State, Data);
handle_event(
{call, From}, {remove_channel, ChannelId}, _State, Data
) ->
@ -678,8 +678,8 @@ add_channels(Data) ->
Channels = Data#data.added_channels,
NewChannels = lists:foldl(
fun
({ChannelID, #{enable := true}}, Acc) ->
maps:put(ChannelID, channel_status(), Acc);
({ChannelID, #{enable := true} = Config}, Acc) ->
maps:put(ChannelID, channel_status_not_added(Config), Acc);
({_, #{enable := false}}, Acc) ->
Acc
end,
@ -702,7 +702,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
%% we have not yet performed the initial health_check
NewAddedChannelsMap = maps:put(
ChannelID,
channel_status_new_waiting_for_health_check(),
channel_status_new_waiting_for_health_check(ChannelConfig),
AddedChannelsMap
),
NewData = Data#data{
@ -720,7 +720,7 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
AddedChannelsMap = Data#data.added_channels,
NewAddedChannelsMap = maps:put(
ChannelID,
channel_status(Error),
channel_status(Error, ChannelConfig),
AddedChannelsMap
),
NewData = Data#data{
@ -835,7 +835,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
maps:get(
ChannelId,
Channels,
channel_status()
channel_status_not_added(Config)
)
)
of
@ -843,7 +843,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
%% The channel is not installed in the connector state
%% We insert it into the channels map and let the health check
%% take care of the rest
NewChannels = maps:put(ChannelId, channel_status_new_with_config(Config), Channels),
NewChannels = maps:put(ChannelId, channel_status_not_added(Config), Channels),
NewData = Data#data{added_channels = NewChannels},
{keep_state, update_state(NewData, Data), [
{reply, From, ok}
@ -854,17 +854,21 @@ handle_add_channel(From, Data, ChannelId, Config) ->
{keep_state_and_data, [{reply, From, ok}]}
end.
handle_not_connected_add_channel(From, ChannelId, State, Data) ->
handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
%% When state is not connected the channel will be added to the channels
%% map but nothing else will happen.
NewData = add_channel_status_if_not_exists(Data, ChannelId, State),
NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State),
{keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
handle_remove_channel(From, ChannelId, Data) ->
Channels = Data#data.added_channels,
%% Deactivate alarm
_ = maybe_clear_alarm(ChannelId),
case channel_status_is_channel_added(maps:get(ChannelId, Channels, channel_status())) of
case
channel_status_is_channel_added(
maps:get(ChannelId, Channels, channel_status_not_added(undefined))
)
of
false ->
%% The channel is already not installed in the connector state.
%% We still need to remove it from the added_channels map
@ -1033,7 +1037,10 @@ continue_resource_health_check_not_connected(NewStatus, Data0) ->
end.
handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
{keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
{keep_state_and_data, [
{reply, From,
maps:remove(config, channel_status({error, resource_disconnected}, undefined))}
]};
handle_manual_channel_health_check(
From,
#data{
@ -1066,13 +1073,15 @@ handle_manual_channel_health_check(
is_map_key(ChannelId, Channels)
->
%% No ongoing health check: reply with current status.
{keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
{keep_state_and_data, [{reply, From, maps:remove(config, maps:get(ChannelId, Channels))}]};
handle_manual_channel_health_check(
From,
_Data,
_ChannelId
) ->
{keep_state_and_data, [{reply, From, channel_status({error, channel_not_found})}]}.
{keep_state_and_data, [
{reply, From, maps:remove(config, channel_status({error, channel_not_found}, undefined))}
]}.
-spec channels_health_check(resource_status(), data()) -> data().
channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
@ -1097,14 +1106,14 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms)
Channels = Data0#data.added_channels,
ChannelsToChangeStatusFor = [
ChannelId
|| {ChannelId, Status} <- maps:to_list(Channels),
{ChannelId, Config}
|| {ChannelId, #{config := Config} = Status} <- maps:to_list(Channels),
channel_status_is_channel_added(Status)
],
ChannelsWithNewStatuses =
[
{ChannelId, channel_status({?status_connecting, resource_is_connecting})}
|| ChannelId <- ChannelsToChangeStatusFor
{ChannelId, channel_status({?status_connecting, resource_is_connecting}, Config)}
|| {ChannelId, Config} <- ChannelsToChangeStatusFor
],
%% Update the channels map
NewChannels = lists:foldl(
@ -1149,9 +1158,10 @@ channels_health_check(ConnectorStatus, Data0) ->
ConnectorStatus,
ChannelId,
Data1
)},
Config
)}
)}
|| {ChannelId, OldStatus} <- maps:to_list(Data1#data.added_channels)
|| {ChannelId, #{config := Config} = OldStatus} <- maps:to_list(Data1#data.added_channels)
],
%% Raise alarms
_ = lists:foreach(
@ -1218,14 +1228,29 @@ trigger_health_check_for_added_channels(Data0 = #data{hc_workers = HCWorkers0})
start_channel_health_check(Data1, ChannelId)
end.
-spec continue_channel_health_check_connected(channel_id(), channel_status_map(), data()) -> data().
continue_channel_health_check_connected(ChannelId, OldStatus, Data0) ->
-spec continue_channel_health_check_connected(
channel_id(), channel_status_map(), channel_status_map(), data()
) -> data().
continue_channel_health_check_connected(ChannelId, OldStatus, CurrentStatus, Data0) ->
#data{hc_workers = HCWorkers0} = Data0,
#{channel := CHCWorkers0} = HCWorkers0,
CHCWorkers = emqx_utils_maps:deep_remove([ongoing, ChannelId], CHCWorkers0),
Data1 = Data0#data{hc_workers = HCWorkers0#{channel := CHCWorkers}},
%% Remove the added channels with a a status different from connected or connecting
NewStatus = maps:get(ChannelId, Data0#data.added_channels),
case OldStatus =:= CurrentStatus of
true ->
continue_channel_health_check_connected_no_update_during_check(
ChannelId, OldStatus, Data1
);
false ->
%% Channel has been updated while the health check process was working so
%% we should not clear any alarm or remove the channel from the
%% connector
Data1
end.
continue_channel_health_check_connected_no_update_during_check(ChannelId, OldStatus, Data1) ->
%% Remove the added channels with a status different from connected or connecting
NewStatus = maps:get(ChannelId, Data1#data.added_channels),
ChannelsToRemove = [ChannelId || not channel_status_is_channel_added(NewStatus)],
Data = remove_channels_in_list(ChannelsToRemove, Data1, true),
%% Raise/clear alarms
@ -1253,9 +1278,11 @@ spawn_channel_health_check_worker(#data{} = Data, ChannelId) ->
%% separated so it can be spec'ed and placate dialyzer tantrums...
-spec worker_channel_health_check(data(), channel_id()) -> no_return().
worker_channel_health_check(Data, ChannelId) ->
#data{id = ResId, mod = Mod, state = State} = Data,
#data{id = ResId, mod = Mod, state = State, added_channels = Channels} = Data,
ChannelStatus = maps:get(ChannelId, Channels, #{}),
ChannelConfig = maps:get(config, ChannelStatus, undefined),
RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
exit({ok, channel_status(RawStatus)}).
exit({ok, channel_status(RawStatus, ChannelConfig)}).
-spec handle_channel_health_check_worker_down(
data(), {pid(), reference()}, {ok, channel_status_map()}
@ -1267,11 +1294,15 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
added_channels = AddedChannels0
} = Data0,
{ChannelId, CHCWorkers1} = maps:take(WorkerRef, CHCWorkers0),
case ExitResult of
{ok, NewStatus} ->
%% `emqx_resource:call_channel_health_check' catches all exceptions.
AddedChannels = maps:put(ChannelId, NewStatus, AddedChannels0)
end,
%% The channel might have got removed while the health check was going on
CurrentStatus = maps:get(ChannelId, AddedChannels0, channel_not_added),
{AddedChannels, NewStatus} =
handle_channel_health_check_worker_down_new_channels_and_status(
ChannelId,
ExitResult,
CurrentStatus,
AddedChannels0
),
#{ongoing := Ongoing0} = CHCWorkers1,
{PreviousChanStatus, Ongoing1} = maps:take(ChannelId, Ongoing0),
CHCWorkers2 = CHCWorkers1#{ongoing := Ongoing1},
@ -1283,19 +1314,52 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
CHCWorkers = CHCWorkers3#{pending := Rest},
HCWorkers = HCWorkers0#{channel := CHCWorkers},
Data3 = Data2#data{hc_workers = HCWorkers},
Data4 = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
Data4 = continue_channel_health_check_connected(
ChannelId,
PreviousChanStatus,
CurrentStatus,
Data3
),
Data = start_channel_health_check(Data4, NextChannelId),
{keep_state, update_state(Data, Data0), Replies};
#{pending := []} ->
HCWorkers = HCWorkers0#{channel := CHCWorkers3},
Data3 = Data2#data{hc_workers = HCWorkers},
Data = continue_channel_health_check_connected(ChannelId, PreviousChanStatus, Data3),
Data = continue_channel_health_check_connected(
ChannelId,
PreviousChanStatus,
CurrentStatus,
Data3
),
{keep_state, update_state(Data, Data0), Replies}
end.
reply_pending_channel_health_check_callers(
ChannelId, Status, Data0 = #data{hc_pending_callers = Pending0}
%% Decides what to do with the result of a finished channel health check.
%%
%% Returns `{Channels, Status}' where `Status' is the status produced by the
%% health check and `Channels' is the (possibly updated) added-channels map.
%%
%% First clause: the config carried by the checked status is exactly equal to
%% the config in the channel's current status (the repeated `Config' variable
%% enforces this), so the checked status is stored in the map.
handle_channel_health_check_worker_down_new_channels_and_status(
    ChannelId,
    {ok, #{config := Config} = CheckedStatus},
    #{config := Config},
    Channels
) ->
    {Channels#{ChannelId => CheckedStatus}, CheckedStatus};
%% Second clause: the configs differ, or one side has no config at all, which
%% means the channel was removed or updated while the health check was in
%% progress. The map is left untouched. The checked status is still returned
%% because the health check must have been issued before the configuration
%% changed or the channel got removed.
handle_channel_health_check_worker_down_new_channels_and_status(
    _ChannelId,
    {ok, CheckedStatus},
    _CurrentStatus,
    Channels
) ->
    {Channels, CheckedStatus}.
reply_pending_channel_health_check_callers(
ChannelId, Status0, Data0 = #data{hc_pending_callers = Pending0}
) ->
Status = maps:remove(config, Status0),
#{channel := CPending0} = Pending0,
Pending = maps:get(ChannelId, CPending0, []),
Actions = [{reply, From, Status} || From <- Pending],
@ -1367,9 +1431,13 @@ maybe_alarm(_Status, _ResId, Error, Error) ->
maybe_alarm(_Status, ResId, Error, _PrevError) ->
HrError =
case Error of
{error, undefined} -> <<"Unknown reason">>;
{error, Reason} -> emqx_utils:readable_error_msg(Reason);
_ -> emqx_utils:readable_error_msg(Error)
{error, undefined} ->
<<"Unknown reason">>;
{error, Reason} ->
emqx_utils:readable_error_msg(Reason);
_ ->
Error1 = redact_config_from_error_status(Error),
emqx_utils:readable_error_msg(Error1)
end,
emqx_alarm:safe_activate(
ResId,
@ -1378,6 +1446,11 @@ maybe_alarm(_Status, ResId, Error, _PrevError) ->
),
?tp(resource_activate_alarm, #{resource_id => ResId}).
%% Strips the `config' entry from a map that has one, so the config is not
%% included in what the caller goes on to format. Any other term, including a
%% map without a `config' key, is returned unchanged.
redact_config_from_error_status(ErrorStatus) when is_map_key(config, ErrorStatus) ->
    maps:without([config], ErrorStatus);
redact_config_from_error_status(Other) ->
    Other.
-spec maybe_resume_resource_workers(resource_id(), resource_status()) -> ok.
maybe_resume_resource_workers(ResId, ?status_connected) ->
lists:foreach(
@ -1426,6 +1499,11 @@ maybe_reply(Actions, From, Reply) ->
-spec data_record_to_external_map(data()) -> resource_data().
data_record_to_external_map(Data) ->
AddedChannelsWithoutConfigs =
maps:map(
fun(_ChanID, Status) -> maps:remove(config, Status) end,
Data#data.added_channels
),
#{
id => Data#data.id,
error => external_error(Data#data.error),
@ -1435,7 +1513,7 @@ data_record_to_external_map(Data) ->
config => Data#data.config,
status => Data#data.status,
state => Data#data.state,
added_channels => Data#data.added_channels
added_channels => AddedChannelsWithoutConfigs
}.
-spec wait_for_ready(resource_id(), integer()) -> ok | timeout | {error, term()}.
@ -1469,7 +1547,7 @@ safe_call(ResId, Message, Timeout) ->
%% Helper functions for channel status data
channel_status() ->
channel_status_not_added(ChannelConfig) ->
#{
%% The status of the channel. Can be one of the following:
%% - disconnected: the channel is not added to the resource (error may contain the reason)
@ -1479,62 +1557,61 @@ channel_status() ->
%% - connected: the channel is added to the resource, the resource is
%% connected and the on_channel_get_status callback has returned
%% connected. The error field should be undefined.
status => ?status_disconnected,
error => not_added_yet
}.
%% If the channel is added with add_channel/2, the config field will be set to
%% the config. This is useful when doing probing since the config is not stored
%% anywhere else in that case.
channel_status_new_with_config(Config) ->
#{
status => ?status_disconnected,
error => not_added_yet,
config => Config
config => ChannelConfig
}.
channel_status_new_waiting_for_health_check() ->
channel_status_new_waiting_for_health_check(ChannelConfig) ->
#{
status => ?status_connecting,
error => no_health_check_yet
error => no_health_check_yet,
config => ChannelConfig
}.
channel_status({?status_connecting, Error}) ->
channel_status({?status_connecting, Error}, ChannelConfig) ->
#{
status => ?status_connecting,
error => Error
error => Error,
config => ChannelConfig
};
channel_status({?status_disconnected, Error}) ->
channel_status({?status_disconnected, Error}, ChannelConfig) ->
#{
status => ?status_disconnected,
error => Error
error => Error,
config => ChannelConfig
};
channel_status(?status_disconnected) ->
channel_status(?status_disconnected, ChannelConfig) ->
#{
status => ?status_disconnected,
error => <<"Disconnected for unknown reason">>
error => <<"Disconnected for unknown reason">>,
config => ChannelConfig
};
channel_status(?status_connecting) ->
channel_status(?status_connecting, ChannelConfig) ->
#{
status => ?status_connecting,
error => <<"Not connected for unknown reason">>
error => <<"Not connected for unknown reason">>,
config => ChannelConfig
};
channel_status(?status_connected) ->
channel_status(?status_connected, ChannelConfig) ->
#{
status => ?status_connected,
error => undefined
error => undefined,
config => ChannelConfig
};
%% Probably not so useful but it is permitted to set an error even when the
%% status is connected
channel_status({?status_connected, Error}) ->
channel_status({?status_connected, Error}, ChannelConfig) ->
#{
status => ?status_connected,
error => Error
error => Error,
config => ChannelConfig
};
channel_status({error, Reason}) ->
channel_status({error, Reason}, ChannelConfig) ->
#{
status => ?status_disconnected,
error => Reason
error => Reason,
config => ChannelConfig
}.
channel_status_is_channel_added(#{
@ -1548,19 +1625,14 @@ channel_status_is_channel_added(#{
channel_status_is_channel_added(_Status) ->
false.
-spec add_channel_status_if_not_exists(data(), channel_id(), resource_state()) -> data().
add_channel_status_if_not_exists(Data, ChannelId, State) ->
-spec add_or_update_channel_status(data(), channel_id(), map(), resource_state()) -> data().
add_or_update_channel_status(Data, ChannelId, ChannelConfig, State) ->
Channels = Data#data.added_channels,
case maps:is_key(ChannelId, Channels) of
true ->
Data;
false ->
ChannelStatus = channel_status({error, resource_not_operational}),
ChannelStatus = channel_status({error, resource_not_operational}, ChannelConfig),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels}
end.
Data#data{added_channels = NewChannels}.
state_to_status(?state_stopped) -> ?rm_status_stopped;
state_to_status(?state_connected) -> ?status_connected;

View File

@ -0,0 +1 @@
Fixed an issue where updates to an action's configuration would sometimes not take effect until the action was disabled and re-enabled. As a result, an action could keep running with its previous configuration even though the update appeared to have been applied successfully.