diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 6aa3cb95d..056f36050 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -42,7 +42,9 @@ get_alarms/0, get_alarms/1, format/1, - format/2 + format/2, + safe_activate/3, + safe_deactivate/1 ]). %% gen_server callbacks @@ -57,7 +59,6 @@ %% Internal exports (RPC) -export([ - create_activate_alarm/3, do_get_alarms/0 ]). @@ -123,6 +124,9 @@ activate(Name, Details) -> activate(Name, Details, Message) -> gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}). +safe_activate(Name, Details, Message) -> + safe_call({activate_alarm, Name, Details, Message}). + -spec ensure_deactivated(binary() | atom()) -> ok. ensure_deactivated(Name) -> ensure_deactivated(Name, no_details). @@ -155,6 +159,9 @@ deactivate(Name, Details) -> deactivate(Name, Details, Message) -> gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}). +safe_deactivate(Name) -> + safe_call({deactivate_alarm, Name, no_details, <<"">>}). + -spec delete_all_deactivated_alarms() -> ok. delete_all_deactivated_alarms() -> gen_server:call(?MODULE, delete_all_deactivated_alarms). @@ -218,17 +225,12 @@ init([]) -> {ok, #{}, get_validity_period()}. handle_call({activate_alarm, Name, Details, Message}, _From, State) -> - Res = mria:transaction( - mria:local_content_shard(), - fun ?MODULE:create_activate_alarm/3, - [Name, Details, Message] - ), - case Res of - {atomic, Alarm} -> + case create_activate_alarm(Name, Details, Message) of + {ok, Alarm} -> do_actions(activate, Alarm, emqx:get_config([alarm, actions])), {reply, ok, State, get_validity_period()}; - {aborted, Reason} -> - {reply, Reason, State, get_validity_period()} + Err -> + {reply, Err, State, get_validity_period()} end; handle_call({deactivate_alarm, Name, Details, Message}, _From, State) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of @@ -283,9 +285,9 @@ get_validity_period() -> emqx:get_config([alarm, validity_period]). 
create_activate_alarm(Name, Details, Message) -> - case mnesia:read(?ACTIVATED_ALARM, Name) of + case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [#activated_alarm{name = Name}] -> - mnesia:abort({error, already_existed}); + {error, already_existed}; [] -> Alarm = #activated_alarm{ name = Name, @@ -293,8 +295,8 @@ create_activate_alarm(Name, Details, Message) -> message = normalize_message(Name, iolist_to_binary(Message)), activate_at = erlang:system_time(microsecond) }, - ok = mnesia:write(?ACTIVATED_ALARM, Alarm, write), - Alarm + ok = mria:dirty_write(?ACTIVATED_ALARM, Alarm), + {ok, Alarm} end. do_get_alarms() -> @@ -474,3 +476,19 @@ normalize_message(Name, <<"">>) -> list_to_binary(io_lib:format("~p", [Name])); normalize_message(_Name, Message) -> Message. + +safe_call(Req) -> + try + gen_server:call(?MODULE, Req) + catch + _:{timeout, _} = Reason -> + ?SLOG(warning, #{msg => "emqx_alarm_safe_call_timeout", reason => Reason}), + {error, timeout}; + _:Reason:St -> + ?SLOG(error, #{ + msg => "emqx_alarm_safe_call_exception", + reason => Reason, + stacktrace => St + }), + {error, Reason} + end. 
diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 7ecf56c18..877b35fff 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -375,7 +375,7 @@ handle_event(state_timeout, health_check, connecting, Data) -> %% and successful health_checks handle_event(enter, _OldState, connected = State, Data) -> ok = log_state_consistency(State, Data), - _ = emqx_alarm:deactivate(Data#data.id), + _ = emqx_alarm:safe_deactivate(Data#data.id), ?tp(resource_connected_enter, #{}), {keep_state_and_data, health_check_actions(Data)}; handle_event(state_timeout, health_check, connected, Data) -> @@ -511,10 +511,10 @@ start_resource(Data, From) -> id => Data#data.id, reason => Reason }), - _ = maybe_alarm(disconnected, Data#data.id, Data#data.error), + _ = maybe_alarm(disconnected, Data#data.id, Err, Data#data.error), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{status = disconnected, error = Reason}, + UpdatedData = Data#data{status = disconnected, error = Err}, Actions = maybe_reply(retry_actions(UpdatedData), From, Err), {next_state, disconnected, update_state(UpdatedData, Data), Actions} end. 
@@ -582,11 +582,11 @@ handle_connected_health_check(Data) -> with_health_check(#data{state = undefined} = Data, Func) -> Func(disconnected, Data); -with_health_check(Data, Func) -> +with_health_check(#data{error = PrevError} = Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), - _ = maybe_alarm(Status, ResId, Err), + _ = maybe_alarm(Status, ResId, Err, PrevError), ok = maybe_resume_resource_workers(ResId, Status), UpdatedData = Data#data{ state = NewState, status = Status, error = Err @@ -605,21 +605,25 @@ update_state(Data, _DataWas) -> health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). -maybe_alarm(connected, _ResId, _Error) -> +maybe_alarm(connected, _ResId, _Error, _PrevError) -> ok; -maybe_alarm(_Status, <>, _Error) -> +maybe_alarm(_Status, <>, _Error, _PrevError) -> ok; -maybe_alarm(_Status, ResId, Error) -> +%% Assume that alarm is already active +maybe_alarm(_Status, _ResId, Error, Error) -> + ok; +maybe_alarm(_Status, ResId, Error, _PrevError) -> HrError = case Error of - undefined -> <<"Unknown reason">>; - _Else -> emqx_utils:readable_error_msg(Error) + {error, undefined} -> <<"Unknown reason">>; + {error, Reason} -> emqx_utils:readable_error_msg(Reason) end, - emqx_alarm:activate( + emqx_alarm:safe_activate( ResId, #{resource_id => ResId, reason => resource_down}, <<"resource down: ", HrError/binary>> - ). + ), + ?tp(resource_activate_alarm, #{resource_id => ResId}). maybe_resume_resource_workers(ResId, connected) -> lists:foreach( @@ -632,14 +636,14 @@ maybe_resume_resource_workers(_, _) -> maybe_clear_alarm(<>) -> ok; maybe_clear_alarm(ResId) -> - emqx_alarm:deactivate(ResId). + emqx_alarm:safe_deactivate(ResId). 
parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> - {Status, Data#data.state, undefined}; + {Status, Data#data.state, status_to_error(Status)}; parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) -> - {Status, NewState, undefined}; + {Status, NewState, status_to_error(Status)}; parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) -> - {Status, NewState, Error}; + {Status, NewState, {error, Error}}; parse_health_check_result({error, Error}, Data) -> ?SLOG( error, @@ -649,7 +653,16 @@ parse_health_check_result({error, Error}, Data) -> reason => Error } ), - {disconnected, Data#data.state, Error}. + {disconnected, Data#data.state, {error, Error}}. + +status_to_error(connected) -> + undefined; +status_to_error(_) -> + {error, undefined}. + +%% Compatibility +external_error({error, Reason}) -> Reason; +external_error(Other) -> Other. maybe_reply(Actions, undefined, _Reply) -> Actions; @@ -660,7 +673,7 @@ maybe_reply(Actions, From, Reply) -> data_record_to_external_map(Data) -> #{ id => Data#data.id, - error => Data#data.error, + error => external_error(Data#data.error), mod => Data#data.mod, callback_mode => Data#data.callback_mode, query_mode => Data#data.query_mode, @@ -679,8 +692,8 @@ do_wait_for_ready(ResId, Retry) -> case read_cache(ResId) of {_Group, #data{status = connected}} -> ok; - {_Group, #data{status = disconnected, error = Reason}} -> - {error, Reason}; + {_Group, #data{status = disconnected, error = Err}} -> + {error, external_error(Err)}; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), do_wait_for_ready(ResId, Retry - 1) diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 5be854e93..96e22c6b6 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -62,6 +62,7 @@ set_callback_mode(Mode) -> persistent_term:put(?CM_KEY, Mode). 
on_start(_InstId, #{create_error := true}) -> + ?tp(connector_demo_start_error, #{}), error("some error"); on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), @@ -243,6 +244,7 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid} {ok, Pid}. on_get_status(_InstId, #{health_check_error := true}) -> + ?tp(connector_demo_health_check_error, #{}), disconnected; on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e098c2e1c..f8ddd56b5 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2635,7 +2635,6 @@ t_call_mode_uncoupled_from_query_mode(_Config) -> Trace2 ) ), - ok end ). @@ -2796,6 +2795,42 @@ t_late_call_reply(_Config) -> ), ok. +t_resource_create_error_activate_alarm_once(_) -> + do_t_resource_activate_alarm_once( + #{name => test_resource, create_error => true}, + connector_demo_start_error + ). + +t_resource_health_check_error_activate_alarm_once(_) -> + do_t_resource_activate_alarm_once( + #{name => test_resource, health_check_error => true}, + connector_demo_health_check_error + ). + +do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) -> + ?check_trace( + begin + ?wait_async_action( + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + ResourceConfig, + #{auto_restart_interval => 100, health_check_interval => 100} + ), + #{?snk_kind := resource_activate_alarm, resource_id := ?ID} + ), + ?assertMatch([#{activated := true, name := ?ID}], emqx_alarm:get_alarms(activated)), + {ok, SubRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := SubscribeEvent}), 4, 7000 + ), + ?assertMatch({ok, [_, _, _, _]}, snabbkaffe:receive_events(SubRef)) + end, + fun(Trace) -> + ?assertMatch([_], ?of_kind(resource_activate_alarm, Trace)) + end + ). 
+ %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10407.en.md b/changes/ce/fix-10407.en.md new file mode 100644 index 000000000..d9df9ce69 --- /dev/null +++ b/changes/ce/fix-10407.en.md @@ -0,0 +1,7 @@ +Improve 'emqx_alarm' performance by using Mnesia dirty operations and avoiding +unnecessary calls from 'emqx_resource_manager' to reactivate alarms that have already been activated. +Use the new safe 'emqx_alarm' API to activate/deactivate alarms to ensure that 'emqx_resource_manager' +doesn't crash because of alarm timeouts. +The crashes were possible when the following conditions co-occurred: + - a relatively high number of failing resources, e.g. bridges, tried to activate alarms on recurring errors; + - the system experienced a very high load.