perf(emqx_resource): don't reactivate alarms on reoccurring errors
Avoid unnecessary calls to activate an alarm if it has been already activated. Fixes: EMQX-9529/#10357
This commit is contained in:
parent
3f18c5e2e3
commit
b5eda9f0d1
|
@ -511,10 +511,10 @@ start_resource(Data, From) ->
|
||||||
id => Data#data.id,
|
id => Data#data.id,
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
_ = maybe_alarm(disconnected, Data#data.id, Data#data.error),
|
_ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error),
|
||||||
%% Keep track of the error reason why the connection did not work
|
%% Keep track of the error reason why the connection did not work
|
||||||
%% so that the Reason can be returned when the verification call is made.
|
%% so that the Reason can be returned when the verification call is made.
|
||||||
UpdatedData = Data#data{status = disconnected, error = Reason},
|
UpdatedData = Data#data{status = disconnected, error = Err},
|
||||||
Actions = maybe_reply(retry_actions(UpdatedData), From, Err),
|
Actions = maybe_reply(retry_actions(UpdatedData), From, Err),
|
||||||
{next_state, disconnected, update_state(UpdatedData, Data), Actions}
|
{next_state, disconnected, update_state(UpdatedData, Data), Actions}
|
||||||
end.
|
end.
|
||||||
|
@ -582,11 +582,11 @@ handle_connected_health_check(Data) ->
|
||||||
|
|
||||||
with_health_check(#data{state = undefined} = Data, Func) ->
|
with_health_check(#data{state = undefined} = Data, Func) ->
|
||||||
Func(disconnected, Data);
|
Func(disconnected, Data);
|
||||||
with_health_check(Data, Func) ->
|
with_health_check(#data{error = PrevError} = Data, Func) ->
|
||||||
ResId = Data#data.id,
|
ResId = Data#data.id,
|
||||||
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
|
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
|
||||||
{Status, NewState, Err} = parse_health_check_result(HCRes, Data),
|
{Status, NewState, Err} = parse_health_check_result(HCRes, Data),
|
||||||
_ = maybe_alarm(Status, ResId, Err),
|
_ = maybe_alarm(Status, ResId, Err, PrevError),
|
||||||
ok = maybe_resume_resource_workers(ResId, Status),
|
ok = maybe_resume_resource_workers(ResId, Status),
|
||||||
UpdatedData = Data#data{
|
UpdatedData = Data#data{
|
||||||
state = NewState, status = Status, error = Err
|
state = NewState, status = Status, error = Err
|
||||||
|
@ -605,21 +605,25 @@ update_state(Data, _DataWas) ->
|
||||||
health_check_interval(Opts) ->
|
health_check_interval(Opts) ->
|
||||||
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
|
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
|
||||||
|
|
||||||
maybe_alarm(connected, _ResId, _Error) ->
|
maybe_alarm(connected, _ResId, _Error, _PrevError) ->
|
||||||
ok;
|
ok;
|
||||||
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error) ->
|
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error, _PrevError) ->
|
||||||
ok;
|
ok;
|
||||||
maybe_alarm(_Status, ResId, Error) ->
|
%% Assume that alarm is already active
|
||||||
|
maybe_alarm(_Status, _ResId, Error, Error) ->
|
||||||
|
ok;
|
||||||
|
maybe_alarm(_Status, ResId, Error, _PrevError) ->
|
||||||
HrError =
|
HrError =
|
||||||
case Error of
|
case Error of
|
||||||
undefined -> <<"Unknown reason">>;
|
{error, undefined} -> <<"Unknown reason">>;
|
||||||
_Else -> emqx_utils:readable_error_msg(Error)
|
{error, Reason} -> emqx_utils:readable_error_msg(Reason)
|
||||||
end,
|
end,
|
||||||
emqx_alarm:activate(
|
emqx_alarm:activate(
|
||||||
ResId,
|
ResId,
|
||||||
#{resource_id => ResId, reason => resource_down},
|
#{resource_id => ResId, reason => resource_down},
|
||||||
<<"resource down: ", HrError/binary>>
|
<<"resource down: ", HrError/binary>>
|
||||||
).
|
),
|
||||||
|
?tp(resource_activate_alarm, #{resource_id => ResId}).
|
||||||
|
|
||||||
maybe_resume_resource_workers(ResId, connected) ->
|
maybe_resume_resource_workers(ResId, connected) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
@ -635,11 +639,11 @@ maybe_clear_alarm(ResId) ->
|
||||||
emqx_alarm:deactivate(ResId).
|
emqx_alarm:deactivate(ResId).
|
||||||
|
|
||||||
parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
|
parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
|
||||||
{Status, Data#data.state, undefined};
|
{Status, Data#data.state, status_to_error(Status)};
|
||||||
parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
|
parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
|
||||||
{Status, NewState, undefined};
|
{Status, NewState, status_to_error(Status)};
|
||||||
parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
|
parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
|
||||||
{Status, NewState, Error};
|
{Status, NewState, {error, Error}};
|
||||||
parse_health_check_result({error, Error}, Data) ->
|
parse_health_check_result({error, Error}, Data) ->
|
||||||
?SLOG(
|
?SLOG(
|
||||||
error,
|
error,
|
||||||
|
@ -649,7 +653,16 @@ parse_health_check_result({error, Error}, Data) ->
|
||||||
reason => Error
|
reason => Error
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
{disconnected, Data#data.state, Error}.
|
{disconnected, Data#data.state, {error, Error}}.
|
||||||
|
|
||||||
|
status_to_error(connected) ->
|
||||||
|
undefined;
|
||||||
|
status_to_error(_) ->
|
||||||
|
{error, undefined}.
|
||||||
|
|
||||||
|
%% Compatibility
|
||||||
|
external_error({error, Reason}) -> Reason;
|
||||||
|
external_error(Other) -> Other.
|
||||||
|
|
||||||
maybe_reply(Actions, undefined, _Reply) ->
|
maybe_reply(Actions, undefined, _Reply) ->
|
||||||
Actions;
|
Actions;
|
||||||
|
@ -660,7 +673,7 @@ maybe_reply(Actions, From, Reply) ->
|
||||||
data_record_to_external_map(Data) ->
|
data_record_to_external_map(Data) ->
|
||||||
#{
|
#{
|
||||||
id => Data#data.id,
|
id => Data#data.id,
|
||||||
error => Data#data.error,
|
error => external_error(Data#data.error),
|
||||||
mod => Data#data.mod,
|
mod => Data#data.mod,
|
||||||
callback_mode => Data#data.callback_mode,
|
callback_mode => Data#data.callback_mode,
|
||||||
query_mode => Data#data.query_mode,
|
query_mode => Data#data.query_mode,
|
||||||
|
@ -679,8 +692,8 @@ do_wait_for_ready(ResId, Retry) ->
|
||||||
case read_cache(ResId) of
|
case read_cache(ResId) of
|
||||||
{_Group, #data{status = connected}} ->
|
{_Group, #data{status = connected}} ->
|
||||||
ok;
|
ok;
|
||||||
{_Group, #data{status = disconnected, error = Reason}} ->
|
{_Group, #data{status = disconnected, error = Err}} ->
|
||||||
{error, Reason};
|
{error, external_error(Err)};
|
||||||
_ ->
|
_ ->
|
||||||
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
|
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
|
||||||
do_wait_for_ready(ResId, Retry - 1)
|
do_wait_for_ready(ResId, Retry - 1)
|
||||||
|
|
|
@ -62,6 +62,7 @@ set_callback_mode(Mode) ->
|
||||||
persistent_term:put(?CM_KEY, Mode).
|
persistent_term:put(?CM_KEY, Mode).
|
||||||
|
|
||||||
on_start(_InstId, #{create_error := true}) ->
|
on_start(_InstId, #{create_error := true}) ->
|
||||||
|
?tp(connector_demo_start_error, #{}),
|
||||||
error("some error");
|
error("some error");
|
||||||
on_start(InstId, #{name := Name} = Opts) ->
|
on_start(InstId, #{name := Name} = Opts) ->
|
||||||
Register = maps:get(register, Opts, false),
|
Register = maps:get(register, Opts, false),
|
||||||
|
@ -243,6 +244,7 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
on_get_status(_InstId, #{health_check_error := true}) ->
|
on_get_status(_InstId, #{health_check_error := true}) ->
|
||||||
|
?tp(connector_demo_health_check_error, #{}),
|
||||||
disconnected;
|
disconnected;
|
||||||
on_get_status(_InstId, #{pid := Pid}) ->
|
on_get_status(_InstId, #{pid := Pid}) ->
|
||||||
timer:sleep(300),
|
timer:sleep(300),
|
||||||
|
|
|
@ -2635,7 +2635,6 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
|
||||||
Trace2
|
Trace2
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
@ -2796,6 +2795,42 @@ t_late_call_reply(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_resource_create_error_activate_alarm_once(_) ->
|
||||||
|
do_t_resource_activate_alarm_once(
|
||||||
|
#{name => test_resource, create_error => true},
|
||||||
|
connector_demo_start_error
|
||||||
|
).
|
||||||
|
|
||||||
|
t_resource_health_check_error_activate_alarm_once(_) ->
|
||||||
|
do_t_resource_activate_alarm_once(
|
||||||
|
#{name => test_resource, health_check_error => true},
|
||||||
|
connector_demo_health_check_error
|
||||||
|
).
|
||||||
|
|
||||||
|
do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_resource:create_local(
|
||||||
|
?ID,
|
||||||
|
?DEFAULT_RESOURCE_GROUP,
|
||||||
|
?TEST_RESOURCE,
|
||||||
|
ResourceConfig,
|
||||||
|
#{auto_restart_interval => 100, health_check_interval => 100}
|
||||||
|
),
|
||||||
|
#{?snk_kind := resource_activate_alarm, resource_id := ?ID}
|
||||||
|
),
|
||||||
|
?assertMatch([#{activated := true, name := ?ID}], emqx_alarm:get_alarms(activated)),
|
||||||
|
{ok, SubRef} = snabbkaffe:subscribe(
|
||||||
|
?match_event(#{?snk_kind := SubscribeEvent}), 4, 7000
|
||||||
|
),
|
||||||
|
?assertMatch({ok, [_, _, _, _]}, snabbkaffe:receive_events(SubRef))
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch([_], ?of_kind(resource_activate_alarm, Trace))
|
||||||
|
end
|
||||||
|
).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
Loading…
Reference in New Issue