refactor(resource_manager): use macros and better differentiate status from state

Internally in `emqx_resource_manager`, there seems to be many points where the
`gen_statem` states are conflated with resource status, since their names coincide.  While
that works for now, introducing a new `gen_statem` state, an internal state, shouldn't
necessarily imply a new, externally facing resource status.

Here we also introduce the usage of some macros to avoid the pitfalls of making a typo in
a state/status name.
This commit is contained in:
Thales Macedo Garitezi 2023-11-16 11:21:04 -03:00
parent 55621fbb86
commit dc5e3b939c
4 changed files with 127 additions and 94 deletions

View File

@ -201,9 +201,9 @@
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_get_status(resource_id(), resource_state()) -> -callback on_get_status(resource_id(), resource_state()) ->
resource_status() health_check_status()
| {resource_status(), resource_state()} | {health_check_status(), resource_state()}
| {resource_status(), resource_state(), term()}. | {health_check_status(), resource_state(), term()}.
-callback on_get_channel_status(resource_id(), channel_id(), resource_state()) -> -callback on_get_channel_status(resource_id(), channel_id(), resource_state()) ->
channel_status() channel_status()
@ -248,7 +248,7 @@
{error, Reason}; {error, Reason};
C:E:S -> C:E:S ->
{error, #{ {error, #{
execption => C, exception => C,
reason => emqx_utils:redact(E), reason => emqx_utils:redact(E),
stacktrace => emqx_utils:redact(S) stacktrace => emqx_utils:redact(S)
}} }}

View File

@ -1077,9 +1077,11 @@ handle_async_worker_down(Data0, Pid) ->
call_query(QM, Id, Index, Ref, Query, QueryOpts) -> call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}), ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
case emqx_resource_manager:lookup_cached(extract_connector_id(Id)) of case emqx_resource_manager:lookup_cached(extract_connector_id(Id)) of
{ok, _Group, #{status := stopped}} -> %% This seems to be the only place where the `rm_status_stopped' status matters,
%% to distinguish from the `disconnected' status.
{ok, _Group, #{status := ?rm_status_stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled"); ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, #{status := connecting, error := unhealthy_target}} -> {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
{error, {unrecoverable_error, unhealthy_target}}; {error, {unrecoverable_error, unhealthy_target}};
{ok, _Group, Resource} -> {ok, _Group, Resource} ->
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource); do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);

View File

@ -85,7 +85,19 @@
-define(T_OPERATION, 5000). -define(T_OPERATION, 5000).
-define(T_LOOKUP, 1000). -define(T_LOOKUP, 1000).
-define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected). %% `gen_statem' states
%% Note: most of them coincide with resource _status_. We use a different set of macros
%% to avoid mixing those concepts up.
%% Also note: the `stopped' _status_ can only be emitted by `emqx_resource_manager'...
%% Modules implementing `emqx_resource' behavior should not return it.
-define(state_connected, connected).
-define(state_connecting, connecting).
-define(state_disconnected, disconnected).
-define(state_stopped, stopped).
-define(IS_STATUS(ST),
ST =:= ?status_connecting; ST =:= ?status_connected; ST =:= ?status_disconnected
).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
@ -397,12 +409,12 @@ init({DataIn, Opts}) ->
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true -> true ->
%% init the cache so that lookup/1 will always return something %% init the cache so that lookup/1 will always return something
UpdatedData = update_state(Data#data{status = connecting}), UpdatedData = update_state(Data#data{status = ?status_connecting}),
{ok, connecting, UpdatedData, {next_event, internal, start_resource}}; {ok, ?state_connecting, UpdatedData, {next_event, internal, start_resource}};
false -> false ->
%% init the cache so that lookup/1 will always return something %% init the cache so that lookup/1 will always return something
UpdatedData = update_state(Data#data{status = stopped}), UpdatedData = update_state(Data#data{status = ?rm_status_stopped}),
{ok, stopped, UpdatedData} {ok, ?state_stopped, UpdatedData}
end. end.
terminate({shutdown, removed}, _State, _Data) -> terminate({shutdown, removed}, _State, _Data) ->
@ -420,26 +432,26 @@ callback_mode() -> [handle_event_function, state_enter].
% Called during testing to force a specific state % Called during testing to force a specific state
handle_event({call, From}, set_resource_status_connecting, _State, Data) -> handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
UpdatedData = update_state(Data#data{status = connecting}, Data), UpdatedData = update_state(Data#data{status = ?status_connecting}, Data),
{next_state, connecting, UpdatedData, [{reply, From, ok}]}; {next_state, ?state_connecting, UpdatedData, [{reply, From, ok}]};
% Called when the resource is to be restarted % Called when the resource is to be restarted
handle_event({call, From}, restart, _State, Data) -> handle_event({call, From}, restart, _State, Data) ->
DataNext = stop_resource(Data), DataNext = stop_resource(Data),
start_resource(DataNext, From); start_resource(DataNext, From);
% Called when the resource is to be started (also used for manual reconnect) % Called when the resource is to be started (also used for manual reconnect)
handle_event({call, From}, start, State, Data) when handle_event({call, From}, start, State, Data) when
State =:= stopped orelse State =:= ?state_stopped orelse
State =:= disconnected State =:= ?state_disconnected
-> ->
start_resource(Data, From); start_resource(Data, From);
handle_event({call, From}, start, _State, _Data) -> handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
% Called when the resource is to be stopped % Called when the resource is to be stopped
handle_event({call, From}, stop, stopped, _Data) -> handle_event({call, From}, stop, ?state_stopped, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
handle_event({call, From}, stop, _State, Data) -> handle_event({call, From}, stop, _State, Data) ->
UpdatedData = stop_resource(Data), UpdatedData = stop_resource(Data),
{next_state, stopped, update_state(UpdatedData, Data), [{reply, From, ok}]}; {next_state, ?state_stopped, update_state(UpdatedData, Data), [{reply, From, ok}]};
% Called when a resource is to be stopped and removed. % Called when a resource is to be stopped and removed.
handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_remove_event(From, ClearMetrics, Data); handle_remove_event(From, ClearMetrics, Data);
@ -448,10 +460,10 @@ handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
Reply = {ok, Group, data_record_to_external_map(Data)}, Reply = {ok, Group, data_record_to_external_map(Data)},
{keep_state_and_data, [{reply, From, Reply}]}; {keep_state_and_data, [{reply, From, Reply}]};
% Called when doing a manually health check. % Called when doing a manually health check.
handle_event({call, From}, health_check, stopped, _Data) -> handle_event({call, From}, health_check, ?state_stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}], Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions}; {keep_state_and_data, Actions};
handle_event({call, From}, {channel_health_check, _}, stopped, _Data) -> handle_event({call, From}, {channel_health_check, _}, ?state_stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}], Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions}; {keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) -> handle_event({call, From}, health_check, _State, Data) ->
@ -459,47 +471,47 @@ handle_event({call, From}, health_check, _State, Data) ->
handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) -> handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
handle_manually_channel_health_check(From, Data, ChannelId); handle_manually_channel_health_check(From, Data, ChannelId);
% State: CONNECTING % State: CONNECTING
handle_event(enter, _OldState, connecting = State, Data) -> handle_event(enter, _OldState, ?state_connecting = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_status_consistency(State, Data),
{keep_state_and_data, [{state_timeout, 0, health_check}]}; {keep_state_and_data, [{state_timeout, 0, health_check}]};
handle_event(internal, start_resource, connecting, Data) -> handle_event(internal, start_resource, ?state_connecting, Data) ->
start_resource(Data, undefined); start_resource(Data, undefined);
handle_event(state_timeout, health_check, connecting, Data) -> handle_event(state_timeout, health_check, ?state_connecting, Data) ->
handle_connecting_health_check(Data); handle_connecting_health_check(Data);
handle_event( handle_event(
{call, From}, {remove_channel, ChannelId}, connecting = _State, Data {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data
) -> ) ->
handle_remove_channel(From, ChannelId, Data); handle_remove_channel(From, ChannelId, Data);
%% State: CONNECTED %% State: CONNECTED
%% The connected state is entered after a successful on_start/2 of the callback mod %% The connected state is entered after a successful on_start/2 of the callback mod
%% and successful health_checks %% and successful health_checks
handle_event(enter, _OldState, connected = State, Data) -> handle_event(enter, _OldState, ?state_connected = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_status_consistency(State, Data),
_ = emqx_alarm:safe_deactivate(Data#data.id), _ = emqx_alarm:safe_deactivate(Data#data.id),
?tp(resource_connected_enter, #{}), ?tp(resource_connected_enter, #{}),
{keep_state_and_data, health_check_actions(Data)}; {keep_state_and_data, health_check_actions(Data)};
handle_event(state_timeout, health_check, connected, Data) -> handle_event(state_timeout, health_check, ?state_connected, Data) ->
handle_connected_health_check(Data); handle_connected_health_check(Data);
handle_event( handle_event(
{call, From}, {add_channel, ChannelId, Config}, connected = _State, Data {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data
) -> ) ->
handle_add_channel(From, Data, ChannelId, Config); handle_add_channel(From, Data, ChannelId, Config);
handle_event( handle_event(
{call, From}, {remove_channel, ChannelId}, connected = _State, Data {call, From}, {remove_channel, ChannelId}, ?state_connected = _State, Data
) -> ) ->
handle_remove_channel(From, ChannelId, Data); handle_remove_channel(From, ChannelId, Data);
%% State: DISCONNECTED %% State: DISCONNECTED
handle_event(enter, _OldState, disconnected = State, Data) -> handle_event(enter, _OldState, ?state_disconnected = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_status_consistency(State, Data),
?tp(resource_disconnected_enter, #{}), ?tp(resource_disconnected_enter, #{}),
{keep_state_and_data, retry_actions(Data)}; {keep_state_and_data, retry_actions(Data)};
handle_event(state_timeout, auto_retry, disconnected, Data) -> handle_event(state_timeout, auto_retry, ?state_disconnected, Data) ->
?tp(resource_auto_reconnect, #{}), ?tp(resource_auto_reconnect, #{}),
start_resource(Data, undefined); start_resource(Data, undefined);
%% State: STOPPED %% State: STOPPED
%% The stopped state is entered after the resource has been explicitly stopped %% The stopped state is entered after the resource has been explicitly stopped
handle_event(enter, _OldState, stopped = State, Data) -> handle_event(enter, _OldState, ?state_stopped = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_status_consistency(State, Data),
{keep_state_and_data, []}; {keep_state_and_data, []};
%% The following events can be handled in any other state %% The following events can be handled in any other state
handle_event( handle_event(
@ -529,11 +541,11 @@ handle_event(EventType, EventData, State, Data) ->
), ),
keep_state_and_data. keep_state_and_data.
log_state_consistency(State, #data{status = State} = Data) -> log_status_consistency(Status, #data{status = Status} = Data) ->
log_cache_consistency(read_cache(Data#data.id), Data); log_cache_consistency(read_cache(Data#data.id), Data);
log_state_consistency(State, Data) -> log_status_consistency(Status, Data) ->
?tp(warning, "inconsistent_state", #{ ?tp(warning, "inconsistent_status", #{
state => State, status => Status,
data => emqx_utils:redact(Data) data => emqx_utils:redact(Data)
}). }).
@ -591,25 +603,25 @@ start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData1 = Data#data{status = connecting, state = ResourceState}, UpdatedData1 = Data#data{status = ?status_connecting, state = ResourceState},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
UpdatedData2 = add_channels(UpdatedData1), UpdatedData2 = add_channels(UpdatedData1),
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, connecting, update_state(UpdatedData2, Data), Actions}; {next_state, ?state_connecting, update_state(UpdatedData2, Data), Actions};
{error, Reason} = Err -> {error, Reason} = Err ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => "start_resource_failed", msg => "start_resource_failed",
id => Data#data.id, id => Data#data.id,
reason => Reason reason => Reason
}), }),
_ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error), _ = maybe_alarm(?status_disconnected, Data#data.id, Err, Data#data.error),
%% Add channels and raise alarms %% Add channels and raise alarms
NewData1 = channels_health_check(disconnected, add_channels(Data)), NewData1 = channels_health_check(?status_disconnected, add_channels(Data)),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.
NewData2 = NewData1#data{status = disconnected, error = Err}, NewData2 = NewData1#data{status = ?status_disconnected, error = Err},
Actions = maybe_reply(retry_actions(NewData2), From, Err), Actions = maybe_reply(retry_actions(NewData2), From, Err),
{next_state, disconnected, update_state(NewData2, Data), Actions} {next_state, ?state_disconnected, update_state(NewData2, Data), Actions}
end. end.
add_channels(Data) -> add_channels(Data) ->
@ -666,13 +678,13 @@ add_channels_in_list([{ChannelID, ChannelConfig} | Rest], Data) ->
added_channels = NewAddedChannelsMap added_channels = NewAddedChannelsMap
}, },
%% Raise an alarm since the channel could not be added %% Raise an alarm since the channel could not be added
_ = maybe_alarm(disconnected, ChannelID, Error, no_prev_error), _ = maybe_alarm(?status_disconnected, ChannelID, Error, no_prev_error),
add_channels_in_list(Rest, NewData) add_channels_in_list(Rest, NewData)
end. end.
maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped -> maybe_stop_resource(#data{status = Status} = Data) when Status =/= ?rm_status_stopped ->
stop_resource(Data); stop_resource(Data);
maybe_stop_resource(#data{status = stopped} = Data) -> maybe_stop_resource(#data{status = ?rm_status_stopped} = Data) ->
Data. Data.
stop_resource(#data{state = ResState, id = ResId} = Data) -> stop_resource(#data{state = ResState, id = ResId} = Data) ->
@ -691,7 +703,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
end, end,
_ = maybe_clear_alarm(ResId), _ = maybe_clear_alarm(ResId),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
NewData#data{status = stopped}. NewData#data{status = ?rm_status_stopped}.
remove_channels(Data) -> remove_channels(Data) ->
Channels = maps:keys(Data#data.added_channels), Channels = maps:keys(Data#data.added_channels),
@ -706,7 +718,7 @@ remove_channels_in_list([ChannelID | Rest], Data, KeepInChannelMap) ->
true -> true ->
AddedChannelsMap; AddedChannelsMap;
false -> false ->
maybe_clear_alarm(ChannelID), _ = maybe_clear_alarm(ChannelID),
maps:remove(ChannelID, AddedChannelsMap) maps:remove(ChannelID, AddedChannelsMap)
end, end,
case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of case safe_call_remove_channel(Data#data.id, Data#data.mod, Data#data.state, ChannelID) of
@ -858,13 +870,15 @@ handle_connecting_health_check(Data) ->
with_health_check( with_health_check(
Data, Data,
fun fun
(connected, UpdatedData) -> (?status_connected, UpdatedData) ->
{next_state, connected, channels_health_check(connected, UpdatedData)}; {next_state, ?state_connected,
(connecting, UpdatedData) -> channels_health_check(?status_connected, UpdatedData)};
{keep_state, channels_health_check(connecting, UpdatedData), (?status_connecting, UpdatedData) ->
{keep_state, channels_health_check(?status_connecting, UpdatedData),
health_check_actions(UpdatedData)}; health_check_actions(UpdatedData)};
(disconnected, UpdatedData) -> (?status_disconnected, UpdatedData) ->
{next_state, disconnected, channels_health_check(disconnected, UpdatedData)} {next_state, ?state_disconnected,
channels_health_check(?status_disconnected, UpdatedData)}
end end
). ).
@ -872,8 +886,8 @@ handle_connected_health_check(Data) ->
with_health_check( with_health_check(
Data, Data,
fun fun
(connected, UpdatedData0) -> (?status_connected, UpdatedData0) ->
UpdatedData1 = channels_health_check(connected, UpdatedData0), UpdatedData1 = channels_health_check(?status_connected, UpdatedData0),
{keep_state, UpdatedData1, health_check_actions(UpdatedData1)}; {keep_state, UpdatedData1, health_check_actions(UpdatedData1)};
(Status, UpdatedData) -> (Status, UpdatedData) ->
?SLOG(warning, #{ ?SLOG(warning, #{
@ -881,6 +895,10 @@ handle_connected_health_check(Data) ->
id => Data#data.id, id => Data#data.id,
status => Status status => Status
}), }),
%% Note: works because, coincidentally, channel/resource status is a
%% subset of resource manager state... But there should be a conversion
%% between the two here, as resource manager also has `stopped', which is
%% not a valid status at the time of writing.
{next_state, Status, channels_health_check(Status, UpdatedData)} {next_state, Status, channels_health_check(Status, UpdatedData)}
end end
). ).
@ -898,7 +916,8 @@ with_health_check(#data{error = PrevError} = Data, Func) ->
}, },
Func(Status, update_state(UpdatedData, Data)). Func(Status, update_state(UpdatedData, Data)).
channels_health_check(connected = _ResourceStatus, Data0) -> -spec channels_health_check(resource_status(), data()) -> data().
channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
Channels = maps:to_list(Data0#data.added_channels), Channels = maps:to_list(Data0#data.added_channels),
%% All channels with a stutus different from connected or connecting are %% All channels with a stutus different from connected or connecting are
%% not added %% not added
@ -914,7 +933,7 @@ channels_health_check(connected = _ResourceStatus, Data0) ->
%% Now that we have done the adding, we can get the status of all channels %% Now that we have done the adding, we can get the status of all channels
Data2 = channel_status_for_all_channels(Data1), Data2 = channel_status_for_all_channels(Data1),
update_state(Data2, Data0); update_state(Data2, Data0);
channels_health_check(connecting, Data0) -> channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
%% Whenever the resource is connecting: %% Whenever the resource is connecting:
%% 1. Change the status of all added channels to connecting %% 1. Change the status of all added channels to connecting
%% 2. Raise alarms (TODO: if it is a probe we should not raise alarms) %% 2. Raise alarms (TODO: if it is a probe we should not raise alarms)
@ -926,7 +945,7 @@ channels_health_check(connecting, Data0) ->
], ],
ChannelsWithNewStatuses = ChannelsWithNewStatuses =
[ [
{ChannelId, channel_status({connecting, resource_is_connecting})} {ChannelId, channel_status({?status_connecting, resource_is_connecting})}
|| ChannelId <- ChannelsToChangeStatusFor || ChannelId <- ChannelsToChangeStatusFor
], ],
%% Update the channels map %% Update the channels map
@ -945,13 +964,13 @@ channels_health_check(connecting, Data0) ->
%% Raise alarms for all channels %% Raise alarms for all channels
lists:foreach( lists:foreach(
fun({ChannelId, Status, PrevStatus}) -> fun({ChannelId, Status, PrevStatus}) ->
maybe_alarm(connecting, ChannelId, Status, PrevStatus) maybe_alarm(?status_connecting, ChannelId, Status, PrevStatus)
end, end,
ChannelsWithNewAndPrevErrorStatuses ChannelsWithNewAndPrevErrorStatuses
), ),
Data1 = Data0#data{added_channels = NewChannels}, Data1 = Data0#data{added_channels = NewChannels},
update_state(Data1, Data0); update_state(Data1, Data0);
channels_health_check(ResourceStatus, Data0) -> channels_health_check(ConnectorStatus, Data0) ->
%% Whenever the resource is not connected and not connecting: %% Whenever the resource is not connected and not connecting:
%% 1. Remove all added channels %% 1. Remove all added channels
%% 2. Change the status to an error status %% 2. Change the status to an error status
@ -969,7 +988,7 @@ channels_health_check(ResourceStatus, Data0) ->
channel_status( channel_status(
{error, {error,
resource_not_connected_channel_error_msg( resource_not_connected_channel_error_msg(
ResourceStatus, ConnectorStatus,
ChannelId, ChannelId,
Data1 Data1
)} )}
@ -1025,7 +1044,7 @@ channel_status_for_all_channels(Data) ->
%% Raise/clear alarms %% Raise/clear alarms
lists:foreach( lists:foreach(
fun fun
({ID, _OldStatus, #{status := connected}}) -> ({ID, _OldStatus, #{status := ?status_connected}}) ->
_ = maybe_clear_alarm(ID); _ = maybe_clear_alarm(ID);
({ID, OldStatus, NewStatus}) -> ({ID, OldStatus, NewStatus}) ->
_ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus) _ = maybe_alarm(NewStatus, ID, NewStatus, OldStatus)
@ -1071,9 +1090,11 @@ get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatu
Config Config
end. end.
-spec update_state(data()) -> data().
update_state(Data) -> update_state(Data) ->
update_state(Data, undefined). update_state(Data, undefined).
-spec update_state(data(), data() | undefined) -> data().
update_state(DataWas, DataWas) -> update_state(DataWas, DataWas) ->
DataWas; DataWas;
update_state(Data, _DataWas) -> update_state(Data, _DataWas) ->
@ -1083,7 +1104,8 @@ update_state(Data, _DataWas) ->
health_check_interval(Opts) -> health_check_interval(Opts) ->
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
maybe_alarm(connected, _ResId, _Error, _PrevError) -> -spec maybe_alarm(resource_status(), resource_id(), _Error :: term(), _PrevError :: term()) -> ok.
maybe_alarm(?status_connected, _ResId, _Error, _PrevError) ->
ok; ok;
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error, _PrevError) -> maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error, _PrevError) ->
ok; ok;
@ -1095,7 +1117,7 @@ maybe_alarm(_Status, ResId, Error, _PrevError) ->
case Error of case Error of
{error, undefined} -> <<"Unknown reason">>; {error, undefined} -> <<"Unknown reason">>;
{error, Reason} -> emqx_utils:readable_error_msg(Reason); {error, Reason} -> emqx_utils:readable_error_msg(Reason);
Error -> emqx_utils:readable_error_msg(Error) _ -> emqx_utils:readable_error_msg(Error)
end, end,
emqx_alarm:safe_activate( emqx_alarm:safe_activate(
ResId, ResId,
@ -1104,7 +1126,8 @@ maybe_alarm(_Status, ResId, Error, _PrevError) ->
), ),
?tp(resource_activate_alarm, #{resource_id => ResId}). ?tp(resource_activate_alarm, #{resource_id => ResId}).
maybe_resume_resource_workers(ResId, connected) -> -spec maybe_resume_resource_workers(resource_id(), resource_status()) -> ok.
maybe_resume_resource_workers(ResId, ?status_connected) ->
lists:foreach( lists:foreach(
fun emqx_resource_buffer_worker:resume/1, fun emqx_resource_buffer_worker:resume/1,
emqx_resource_buffer_worker_sup:worker_pids(ResId) emqx_resource_buffer_worker_sup:worker_pids(ResId)
@ -1112,6 +1135,7 @@ maybe_resume_resource_workers(ResId, connected) ->
maybe_resume_resource_workers(_, _) -> maybe_resume_resource_workers(_, _) ->
ok. ok.
-spec maybe_clear_alarm(resource_id()) -> ok | {error, not_found}.
maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) -> maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
ok; ok;
maybe_clear_alarm(ResId) -> maybe_clear_alarm(ResId) ->
@ -1132,9 +1156,9 @@ parse_health_check_result({error, Error}, Data) ->
reason => Error reason => Error
} }
), ),
{disconnected, Data#data.state, {error, Error}}. {?status_disconnected, Data#data.state, {error, Error}}.
status_to_error(connected) -> status_to_error(?status_connected) ->
undefined; undefined;
status_to_error(_) -> status_to_error(_) ->
{error, undefined}. {error, undefined}.
@ -1170,9 +1194,9 @@ do_wait_for_ready(_ResId, 0) ->
timeout; timeout;
do_wait_for_ready(ResId, Retry) -> do_wait_for_ready(ResId, Retry) ->
case try_read_cache(ResId) of case try_read_cache(ResId) of
#data{status = connected} -> #data{status = ?status_connected} ->
ok; ok;
#data{status = disconnected, error = Err} -> #data{status = ?status_disconnected, error = Err} ->
{error, external_error(Err)}; {error, external_error(Err)};
_ -> _ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY), timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
@ -1203,7 +1227,7 @@ channel_status() ->
%% - connected: the channel is added to the resource, the resource is %% - connected: the channel is added to the resource, the resource is
%% connected and the on_channel_get_status callback has returned %% connected and the on_channel_get_status callback has returned
%% connected. The error field should be undefined. %% connected. The error field should be undefined.
status => disconnected, status => ?status_disconnected,
error => not_added_yet error => not_added_yet
}. }.
@ -1212,20 +1236,20 @@ channel_status() ->
%% anywhere else in that case. %% anywhere else in that case.
channel_status_new_with_config(Config) -> channel_status_new_with_config(Config) ->
#{ #{
status => disconnected, status => ?status_disconnected,
error => not_added_yet, error => not_added_yet,
config => Config config => Config
}. }.
channel_status_new_waiting_for_health_check() -> channel_status_new_waiting_for_health_check() ->
#{ #{
status => connecting, status => ?status_connecting,
error => no_health_check_yet error => no_health_check_yet
}. }.
channel_status({connecting, Error}) -> channel_status({?status_connecting, Error}) ->
#{ #{
status => connecting, status => ?status_connecting,
error => Error error => Error
}; };
channel_status(?status_disconnected) -> channel_status(?status_disconnected) ->
@ -1233,40 +1257,41 @@ channel_status(?status_disconnected) ->
status => ?status_disconnected, status => ?status_disconnected,
error => <<"Disconnected for unknown reason">> error => <<"Disconnected for unknown reason">>
}; };
channel_status(connecting) -> channel_status(?status_connecting) ->
#{ #{
status => connecting, status => ?status_connecting,
error => <<"Not connected for unknown reason">> error => <<"Not connected for unknown reason">>
}; };
channel_status(connected) -> channel_status(?status_connected) ->
#{ #{
status => connected, status => ?status_connected,
error => undefined error => undefined
}; };
%% Probably not so useful but it is permitted to set an error even when the %% Probably not so useful but it is permitted to set an error even when the
%% status is connected %% status is connected
channel_status({connected, Error}) -> channel_status({?status_connected, Error}) ->
#{ #{
status => connected, status => ?status_connected,
error => Error error => Error
}; };
channel_status({error, Reason}) -> channel_status({error, Reason}) ->
#{ #{
status => disconnected, status => ?status_disconnected,
error => Reason error => Reason
}. }.
channel_status_is_channel_added(#{ channel_status_is_channel_added(#{
status := connected status := ?status_connected
}) -> }) ->
true; true;
channel_status_is_channel_added(#{ channel_status_is_channel_added(#{
status := connecting status := ?status_connecting
}) -> }) ->
true; true;
channel_status_is_channel_added(_Status) -> channel_status_is_channel_added(_Status) ->
false. false.
-spec add_channel_status_if_not_exists(data(), channel_id(), resource_state()) -> data().
add_channel_status_if_not_exists(Data, ChannelId, State) -> add_channel_status_if_not_exists(Data, ChannelId, State) ->
Channels = Data#data.added_channels, Channels = Data#data.added_channels,
case maps:is_key(ChannelId, Channels) of case maps:is_key(ChannelId, Channels) of
@ -1275,6 +1300,12 @@ add_channel_status_if_not_exists(Data, ChannelId, State) ->
false -> false ->
ChannelStatus = channel_status({error, resource_not_operational}), ChannelStatus = channel_status({error, resource_not_operational}),
NewChannels = maps:put(ChannelId, ChannelStatus, Channels), NewChannels = maps:put(ChannelId, ChannelStatus, Channels),
maybe_alarm(State, ChannelId, ChannelStatus, no_prev), ResStatus = state_to_status(State),
maybe_alarm(ResStatus, ChannelId, ChannelStatus, no_prev),
Data#data{added_channels = NewChannels} Data#data{added_channels = NewChannels}
end. end.
state_to_status(?state_stopped) -> ?rm_status_stopped;
state_to_status(?state_connected) -> ?status_connected;
state_to_status(?state_connecting) -> ?status_connecting;
state_to_status(?state_disconnected) -> ?status_disconnected.

View File

@ -115,7 +115,7 @@ t_create_remove(_) ->
?assertNot(is_process_alive(Pid)) ?assertNot(is_process_alive(Pid))
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -174,7 +174,7 @@ t_create_remove_local(_) ->
?assertNot(is_process_alive(Pid)) ?assertNot(is_process_alive(Pid))
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -219,7 +219,7 @@ t_do_not_start_after_created(_) ->
?assertNot(is_process_alive(Pid2)) ?assertNot(is_process_alive(Pid2))
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -855,7 +855,7 @@ t_healthy_timeout(_) ->
?assertEqual(ok, emqx_resource:remove_local(?ID)) ?assertEqual(ok, emqx_resource:remove_local(?ID))
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -894,7 +894,7 @@ t_healthy(_) ->
?assertEqual(ok, emqx_resource:remove_local(?ID)) ?assertEqual(ok, emqx_resource:remove_local(?ID))
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -1006,7 +1006,7 @@ t_stop_start(_) ->
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -1064,7 +1064,7 @@ t_stop_start_local(_) ->
?assert(is_process_alive(Pid1)) ?assert(is_process_alive(Pid1))
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).
@ -1269,7 +1269,7 @@ t_health_check_disconnected(_) ->
) )
end, end,
fun(Trace) -> fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)), ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace)) ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end end
). ).