fix: also alarm resource down when start resource failed

This commit is contained in:
Shawn 2022-06-01 15:36:51 +08:00
parent 69fba6958b
commit b7f27157e5
2 changed files with 38 additions and 30 deletions

View File

@ -33,7 +33,7 @@
-type create_opts() :: #{
health_check_interval => integer(),
health_check_timeout => integer(),
waiting_connect_complete => integer(),
wait_for_resource_ready => integer(),
auto_retry_interval => integer()
}.
-type after_query() ::

View File

@ -267,13 +267,13 @@ handle_event({call, From}, restart, _State, Data) ->
_ = stop_resource(Data),
start_resource(Data, From);
% Called when the resource is to be started
handle_event({call, From}, start, _State, #data{status = disconnected} = Data) ->
handle_event({call, From}, start, stopped, Data) ->
start_resource(Data, From);
handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]};
% Called when the resource is to be stopped
handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) ->
{next_state, stopped, Data, [{reply, From, ok}]};
handle_event({call, From}, stop, stopped, _Data) ->
{keep_state_and_data, [{reply, From, ok}]};
handle_event({call, From}, stop, _State, Data) ->
Result = stop_resource(Data),
UpdatedData = Data#data{status = disconnected},
@ -281,21 +281,27 @@ handle_event({call, From}, stop, _State, Data) ->
% Called when a resource is to be stopped and removed.
handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_remove_event(From, ClearMetrics, Data);
% Called when the state of the resource is being looked up.
% Called when the state-data of the resource is being looked up.
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
{keep_state_and_data, [{reply, From, Reply}]};
% Connecting state enter
handle_event(internal, try_connect, connecting, Data) ->
start_resource(Data, undefined);
% Called when doing a manually health check.
handle_event({call, From}, health_check, stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) ->
handle_manually_health_check(From, Data);
% State: CONNECTING
handle_event(enter, _OldState, connecting, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
Actions = [{state_timeout, 0, health_check}],
{next_state, connecting, Data, Actions};
% Connecting state health_check timeouts.
{keep_state_and_data, Actions};
handle_event(internal, try_connect, connecting, Data) ->
start_resource(Data, undefined);
handle_event(state_timeout, health_check, connecting, Data) ->
connecting_health_check(Data);
%% The connected state is entered after a successful start of the callback mod
handle_connecting_health_check(Data);
%% State: CONNECTED
%% The connected state is entered after a successful on_start/2 of the callback mod
%% and successful health_checks
handle_event(enter, _OldState, connected, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
@ -303,22 +309,19 @@ handle_event(enter, _OldState, connected, Data) ->
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{next_state, connected, Data, Actions};
handle_event(state_timeout, health_check, connected, Data) ->
perform_connected_health_check(Data);
handle_connected_health_check(Data);
%% State: DISCONNECTED
handle_event(enter, _OldState, disconnected, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
handle_disconnected_state_enter(Data);
handle_event(state_timeout, auto_retry, disconnected, Data) ->
start_resource(Data, undefined);
%% State: STOPPED
%% The stopped state is entered after the resource has been explicitly stopped
handle_event(enter, _OldState, stopped, Data) ->
UpdatedData = Data#data{status = disconnected},
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
{next_state, stopped, UpdatedData};
% Resource has been explicitly stopped, so return that as the error reason.
handle_event({call, From}, health_check, stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) ->
handle_health_check_request(From, Data);
% Ignore all other events
handle_event(EventType, EventData, State, Data) ->
?SLOG(
@ -331,7 +334,7 @@ handle_event(EventType, EventData, State, Data) ->
data => Data
}
),
{next_state, State, Data}.
keep_state_and_data.
%%------------------------------------------------------------------------------
%% internal functions
@ -364,6 +367,7 @@ start_resource(Data, From) ->
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, connecting, UpdatedData, Actions};
{error, Reason} = Err ->
_ = maybe_alarm(disconnected, Data#data.id),
%% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{status = disconnected, error = Reason},
@ -371,25 +375,29 @@ start_resource(Data, From) ->
{next_state, disconnected, UpdatedData, Actions}
end.
stop_resource(#data{state = undefined} = _Data) ->
stop_resource(#data{state = undefined, id = ResId} = _Data) ->
_ = maybe_clear_alarm(ResId),
ok;
stop_resource(Data) ->
Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
%% We don't care the return value of the Mod:on_stop/2.
%% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned.
_ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
_ = maybe_clear_alarm(Data#data.id),
Result.
ok.
proc_name(Id) ->
Module = atom_to_binary(?MODULE),
Connector = <<"_">>,
binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>).
handle_health_check_request(From, Data) ->
handle_manually_health_check(From, Data) ->
with_health_check(Data, fun(Status, UpdatedData) ->
Actions = [{reply, From, {ok, Status}}],
{next_state, Status, UpdatedData, Actions}
end).
connecting_health_check(Data) ->
handle_connecting_health_check(Data) ->
with_health_check(
Data,
fun
@ -403,7 +411,7 @@ connecting_health_check(Data) ->
end
).
perform_connected_health_check(Data) ->
handle_connected_health_check(Data) ->
with_health_check(
Data,
fun
@ -420,18 +428,18 @@ with_health_check(Data, Func) ->
ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
_ = maybe_alarm_resource_down(Status, ResId),
_ = maybe_alarm(Status, ResId),
UpdatedData = Data#data{
state = NewState, status = Status, error = Err
},
ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}),
Func(Status, UpdatedData).
maybe_alarm_resource_down(connected, _ResId) ->
maybe_alarm(connected, _ResId) ->
ok;
maybe_alarm_resource_down(_Status, <<?TEST_ID_PREFIX, _/binary>>) ->
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>) ->
ok;
maybe_alarm_resource_down(_Status, ResId) ->
maybe_alarm(_Status, ResId) ->
emqx_alarm:activate(
ResId,
#{resource_id => ResId, reason => resource_down},