Merge pull request #10407 from SergeTupchiy/EMQX-9529-resource-manager-crash-on-alarm-timeout

fix(emqx_resource): call emqx_alarm safely and don't reactivate alarm on reoccurring errors
This commit is contained in:
SergeTupchiy 2023-04-20 17:53:56 +03:00 committed by GitHub
commit b38ae7f78f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 111 additions and 36 deletions

View File

@ -42,7 +42,9 @@
get_alarms/0, get_alarms/0,
get_alarms/1, get_alarms/1,
format/1, format/1,
format/2 format/2,
safe_activate/3,
safe_deactivate/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -57,7 +59,6 @@
%% Internal exports (RPC) %% Internal exports (RPC)
-export([ -export([
create_activate_alarm/3,
do_get_alarms/0 do_get_alarms/0
]). ]).
@ -123,6 +124,9 @@ activate(Name, Details) ->
activate(Name, Details, Message) -> activate(Name, Details, Message) ->
gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}). gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}).
safe_activate(Name, Details, Message) ->
safe_call({activate_alarm, Name, Details, Message}).
-spec ensure_deactivated(binary() | atom()) -> ok. -spec ensure_deactivated(binary() | atom()) -> ok.
ensure_deactivated(Name) -> ensure_deactivated(Name) ->
ensure_deactivated(Name, no_details). ensure_deactivated(Name, no_details).
@ -155,6 +159,9 @@ deactivate(Name, Details) ->
deactivate(Name, Details, Message) -> deactivate(Name, Details, Message) ->
gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}). gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}).
safe_deactivate(Name) ->
safe_call({deactivate_alarm, Name, no_details, <<"">>}).
-spec delete_all_deactivated_alarms() -> ok. -spec delete_all_deactivated_alarms() -> ok.
delete_all_deactivated_alarms() -> delete_all_deactivated_alarms() ->
gen_server:call(?MODULE, delete_all_deactivated_alarms). gen_server:call(?MODULE, delete_all_deactivated_alarms).
@ -218,17 +225,12 @@ init([]) ->
{ok, #{}, get_validity_period()}. {ok, #{}, get_validity_period()}.
handle_call({activate_alarm, Name, Details, Message}, _From, State) -> handle_call({activate_alarm, Name, Details, Message}, _From, State) ->
Res = mria:transaction( case create_activate_alarm(Name, Details, Message) of
mria:local_content_shard(), {ok, Alarm} ->
fun ?MODULE:create_activate_alarm/3,
[Name, Details, Message]
),
case Res of
{atomic, Alarm} ->
do_actions(activate, Alarm, emqx:get_config([alarm, actions])), do_actions(activate, Alarm, emqx:get_config([alarm, actions])),
{reply, ok, State, get_validity_period()}; {reply, ok, State, get_validity_period()};
{aborted, Reason} -> Err ->
{reply, Reason, State, get_validity_period()} {reply, Err, State, get_validity_period()}
end; end;
handle_call({deactivate_alarm, Name, Details, Message}, _From, State) -> handle_call({deactivate_alarm, Name, Details, Message}, _From, State) ->
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
@ -283,9 +285,9 @@ get_validity_period() ->
emqx:get_config([alarm, validity_period]). emqx:get_config([alarm, validity_period]).
create_activate_alarm(Name, Details, Message) -> create_activate_alarm(Name, Details, Message) ->
case mnesia:read(?ACTIVATED_ALARM, Name) of case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
[#activated_alarm{name = Name}] -> [#activated_alarm{name = Name}] ->
mnesia:abort({error, already_existed}); {error, already_existed};
[] -> [] ->
Alarm = #activated_alarm{ Alarm = #activated_alarm{
name = Name, name = Name,
@ -293,8 +295,8 @@ create_activate_alarm(Name, Details, Message) ->
message = normalize_message(Name, iolist_to_binary(Message)), message = normalize_message(Name, iolist_to_binary(Message)),
activate_at = erlang:system_time(microsecond) activate_at = erlang:system_time(microsecond)
}, },
ok = mnesia:write(?ACTIVATED_ALARM, Alarm, write), ok = mria:dirty_write(?ACTIVATED_ALARM, Alarm),
Alarm {ok, Alarm}
end. end.
do_get_alarms() -> do_get_alarms() ->
@ -474,3 +476,19 @@ normalize_message(Name, <<"">>) ->
list_to_binary(io_lib:format("~p", [Name])); list_to_binary(io_lib:format("~p", [Name]));
normalize_message(_Name, Message) -> normalize_message(_Name, Message) ->
Message. Message.
safe_call(Req) ->
try
gen_server:call(?MODULE, Req)
catch
_:{timeout, _} = Reason ->
?SLOG(warning, #{msg => "emqx_alarm_safe_call_timeout", reason => Reason}),
{error, timeout};
_:Reason:St ->
?SLOG(error, #{
msg => "emqx_alarm_safe_call_exception",
reason => Reason,
stacktrace => St
}),
{error, Reason}
end.

View File

@ -375,7 +375,7 @@ handle_event(state_timeout, health_check, connecting, Data) ->
%% and successful health_checks %% and successful health_checks
handle_event(enter, _OldState, connected = State, Data) -> handle_event(enter, _OldState, connected = State, Data) ->
ok = log_state_consistency(State, Data), ok = log_state_consistency(State, Data),
_ = emqx_alarm:deactivate(Data#data.id), _ = emqx_alarm:safe_deactivate(Data#data.id),
?tp(resource_connected_enter, #{}), ?tp(resource_connected_enter, #{}),
{keep_state_and_data, health_check_actions(Data)}; {keep_state_and_data, health_check_actions(Data)};
handle_event(state_timeout, health_check, connected, Data) -> handle_event(state_timeout, health_check, connected, Data) ->
@ -511,10 +511,10 @@ start_resource(Data, From) ->
id => Data#data.id, id => Data#data.id,
reason => Reason reason => Reason
}), }),
_ = maybe_alarm(disconnected, Data#data.id, Data#data.error), _ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{status = disconnected, error = Reason}, UpdatedData = Data#data{status = disconnected, error = Err},
Actions = maybe_reply(retry_actions(UpdatedData), From, Err), Actions = maybe_reply(retry_actions(UpdatedData), From, Err),
{next_state, disconnected, update_state(UpdatedData, Data), Actions} {next_state, disconnected, update_state(UpdatedData, Data), Actions}
end. end.
@ -582,11 +582,11 @@ handle_connected_health_check(Data) ->
with_health_check(#data{state = undefined} = Data, Func) -> with_health_check(#data{state = undefined} = Data, Func) ->
Func(disconnected, Data); Func(disconnected, Data);
with_health_check(Data, Func) -> with_health_check(#data{error = PrevError} = Data, Func) ->
ResId = Data#data.id, ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data), {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
_ = maybe_alarm(Status, ResId, Err), _ = maybe_alarm(Status, ResId, Err, PrevError),
ok = maybe_resume_resource_workers(ResId, Status), ok = maybe_resume_resource_workers(ResId, Status),
UpdatedData = Data#data{ UpdatedData = Data#data{
state = NewState, status = Status, error = Err state = NewState, status = Status, error = Err
@ -605,21 +605,25 @@ update_state(Data, _DataWas) ->
health_check_interval(Opts) -> health_check_interval(Opts) ->
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
maybe_alarm(connected, _ResId, _Error) -> maybe_alarm(connected, _ResId, _Error, _PrevError) ->
ok; ok;
maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error) -> maybe_alarm(_Status, <<?TEST_ID_PREFIX, _/binary>>, _Error, _PrevError) ->
ok; ok;
maybe_alarm(_Status, ResId, Error) -> %% Assume that alarm is already active
maybe_alarm(_Status, _ResId, Error, Error) ->
ok;
maybe_alarm(_Status, ResId, Error, _PrevError) ->
HrError = HrError =
case Error of case Error of
undefined -> <<"Unknown reason">>; {error, undefined} -> <<"Unknown reason">>;
_Else -> emqx_utils:readable_error_msg(Error) {error, Reason} -> emqx_utils:readable_error_msg(Reason)
end, end,
emqx_alarm:activate( emqx_alarm:safe_activate(
ResId, ResId,
#{resource_id => ResId, reason => resource_down}, #{resource_id => ResId, reason => resource_down},
<<"resource down: ", HrError/binary>> <<"resource down: ", HrError/binary>>
). ),
?tp(resource_activate_alarm, #{resource_id => ResId}).
maybe_resume_resource_workers(ResId, connected) -> maybe_resume_resource_workers(ResId, connected) ->
lists:foreach( lists:foreach(
@ -632,14 +636,14 @@ maybe_resume_resource_workers(_, _) ->
maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) -> maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
ok; ok;
maybe_clear_alarm(ResId) -> maybe_clear_alarm(ResId) ->
emqx_alarm:deactivate(ResId). emqx_alarm:safe_deactivate(ResId).
parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
{Status, Data#data.state, undefined}; {Status, Data#data.state, status_to_error(Status)};
parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) -> parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
{Status, NewState, undefined}; {Status, NewState, status_to_error(Status)};
parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) -> parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
{Status, NewState, Error}; {Status, NewState, {error, Error}};
parse_health_check_result({error, Error}, Data) -> parse_health_check_result({error, Error}, Data) ->
?SLOG( ?SLOG(
error, error,
@ -649,7 +653,16 @@ parse_health_check_result({error, Error}, Data) ->
reason => Error reason => Error
} }
), ),
{disconnected, Data#data.state, Error}. {disconnected, Data#data.state, {error, Error}}.
status_to_error(connected) ->
undefined;
status_to_error(_) ->
{error, undefined}.
%% Compatibility
external_error({error, Reason}) -> Reason;
external_error(Other) -> Other.
maybe_reply(Actions, undefined, _Reply) -> maybe_reply(Actions, undefined, _Reply) ->
Actions; Actions;
@ -660,7 +673,7 @@ maybe_reply(Actions, From, Reply) ->
data_record_to_external_map(Data) -> data_record_to_external_map(Data) ->
#{ #{
id => Data#data.id, id => Data#data.id,
error => Data#data.error, error => external_error(Data#data.error),
mod => Data#data.mod, mod => Data#data.mod,
callback_mode => Data#data.callback_mode, callback_mode => Data#data.callback_mode,
query_mode => Data#data.query_mode, query_mode => Data#data.query_mode,
@ -679,8 +692,8 @@ do_wait_for_ready(ResId, Retry) ->
case read_cache(ResId) of case read_cache(ResId) of
{_Group, #data{status = connected}} -> {_Group, #data{status = connected}} ->
ok; ok;
{_Group, #data{status = disconnected, error = Reason}} -> {_Group, #data{status = disconnected, error = Err}} ->
{error, Reason}; {error, external_error(Err)};
_ -> _ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY), timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
do_wait_for_ready(ResId, Retry - 1) do_wait_for_ready(ResId, Retry - 1)

View File

@ -62,6 +62,7 @@ set_callback_mode(Mode) ->
persistent_term:put(?CM_KEY, Mode). persistent_term:put(?CM_KEY, Mode).
on_start(_InstId, #{create_error := true}) -> on_start(_InstId, #{create_error := true}) ->
?tp(connector_demo_start_error, #{}),
error("some error"); error("some error");
on_start(InstId, #{name := Name} = Opts) -> on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false), Register = maps:get(register, Opts, false),
@ -243,6 +244,7 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
{ok, Pid}. {ok, Pid}.
on_get_status(_InstId, #{health_check_error := true}) -> on_get_status(_InstId, #{health_check_error := true}) ->
?tp(connector_demo_health_check_error, #{}),
disconnected; disconnected;
on_get_status(_InstId, #{pid := Pid}) -> on_get_status(_InstId, #{pid := Pid}) ->
timer:sleep(300), timer:sleep(300),

View File

@ -2635,7 +2635,6 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
Trace2 Trace2
) )
), ),
ok ok
end end
). ).
@ -2796,6 +2795,42 @@ t_late_call_reply(_Config) ->
), ),
ok. ok.
t_resource_create_error_activate_alarm_once(_) ->
do_t_resource_activate_alarm_once(
#{name => test_resource, create_error => true},
connector_demo_start_error
).
t_resource_health_check_error_activate_alarm_once(_) ->
do_t_resource_activate_alarm_once(
#{name => test_resource, health_check_error => true},
connector_demo_health_check_error
).
do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
?check_trace(
begin
?wait_async_action(
emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
ResourceConfig,
#{auto_restart_interval => 100, health_check_interval => 100}
),
#{?snk_kind := resource_activate_alarm, resource_id := ?ID}
),
?assertMatch([#{activated := true, name := ?ID}], emqx_alarm:get_alarms(activated)),
{ok, SubRef} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := SubscribeEvent}), 4, 7000
),
?assertMatch({ok, [_, _, _, _]}, snabbkaffe:receive_events(SubRef))
end,
fun(Trace) ->
?assertMatch([_], ?of_kind(resource_activate_alarm, Trace))
end
).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -0,0 +1,7 @@
Improve 'emqx_alarm' performance by using Mnesia dirty operations and avoiding
unnecessary calls from 'emqx_resource_manager' to reactivate alarms that have been already activated.
Use new safe 'emqx_alarm' API to activate/deactivate alarms to ensure that emqx_resource_manager
doesn't crash because of alarm timeouts.
The crashes were possible when the following conditions co-occurred:
- a relatively high number of failing resources, e.g. bridges tried to activate alarms on re-occurring errors;
- the system experienced a very high load.