%%-------------------------------------------------------------------- %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_alarm). -behaviour(gen_server). -include("emqx.hrl"). -include("logger.hrl"). -logger_header("[Alarm Handler]"). %% Mnesia bootstrap -export([mnesia/1]). -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -export([ start_link/1 , stop/0 ]). %% API -export([ activate/1 , activate/2 , deactivate/1 , deactivate/2 , delete_all_deactivated_alarms/0 , get_alarms/0 , get_alarms/1 ]). %% gen_server callbacks -export([ init/1 , handle_call/3 , handle_cast/2 , handle_info/2 , terminate/2 , code_change/3 ]). -record(activated_alarm, { name :: binary() | atom(), details :: map() | list(), message :: binary(), activate_at :: integer() }). -record(deactivated_alarm, { activate_at :: integer(), name :: binary() | atom(), details :: map() | list(), message :: binary(), deactivate_at :: integer() | infinity }). -record(state, { actions :: [action()], size_limit :: non_neg_integer(), validity_period :: non_neg_integer(), timer = undefined :: undefined | reference() }). -type action() :: log | publish | event. -define(ACTIVATED_ALARM, emqx_activated_alarm). -define(DEACTIVATED_ALARM, emqx_deactivated_alarm). -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> ok = ekka_mnesia:create_table(?ACTIVATED_ALARM, [{type, set}, {disc_copies, [node()]}, {local_content, true}, {record_name, activated_alarm}, {attributes, record_info(fields, activated_alarm)}]), ok = ekka_mnesia:create_table(?DEACTIVATED_ALARM, [{type, ordered_set}, {disc_copies, [node()]}, {local_content, true}, {record_name, deactivated_alarm}, {attributes, record_info(fields, deactivated_alarm)}]); mnesia(copy) -> ok = ekka_mnesia:copy_table(?ACTIVATED_ALARM, disc_copies), ok = ekka_mnesia:copy_table(?DEACTIVATED_ALARM, disc_copies). %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- start_link(Opts) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). stop() -> gen_server:stop(?MODULE). activate(Name) -> activate(Name, #{}). activate(Name, Details) -> gen_server:call(?MODULE, {activate_alarm, Name, Details}). deactivate(Name) -> gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}). deactivate(Name, Details) -> gen_server:call(?MODULE, {deactivate_alarm, Name, Details}). delete_all_deactivated_alarms() -> gen_server:call(?MODULE, delete_all_deactivated_alarms). get_alarms() -> get_alarms(all). get_alarms(all) -> gen_server:call(?MODULE, {get_alarms, all}); get_alarms(activated) -> gen_server:call(?MODULE, {get_alarms, activated}); get_alarms(deactivated) -> gen_server:call(?MODULE, {get_alarms, deactivated}). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> Opts = [{actions, [log, publish]}], init([Opts]); init([Opts]) -> deactivate_all_alarms(), Actions = proplists:get_value(actions, Opts), SizeLimit = proplists:get_value(size_limit, Opts), ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)), ok = emqx_alarm_handler:load(), process_flag(trap_exit, true), {ok, ensure_delete_timer(#state{actions = Actions, size_limit = SizeLimit, validity_period = ValidityPeriod})}. %% suppress dialyzer warning due to dirty read/write race condition. %% TODO: change from dirty_read/write to transactional. %% TODO: handle mnesia write errors. -dialyzer([{nowarn_function, [handle_call/3]}]). handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [#activated_alarm{name = Name}] -> {reply, {error, already_existed}, State}; [] -> Alarm = #activated_alarm{name = Name, details = Details, message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), do_actions(activate, Alarm, Actions), {reply, ok, State} end; handle_call({deactivate_alarm, Name, Details}, _From, State = #state{ actions = Actions, size_limit = SizeLimit}) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [] -> {reply, {error, not_found}, State}; [Alarm] -> deactivate_alarm(Details, SizeLimit, Actions, Alarm), {reply, ok, State} end; handle_call(delete_all_deactivated_alarms, _From, State) -> clear_table(?DEACTIVATED_ALARM), {reply, ok, State}; handle_call({get_alarms, all}, _From, State) -> Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM) ++ ets:tab2list(?DEACTIVATED_ALARM)], {reply, Alarms, State}; handle_call({get_alarms, activated}, _From, State) -> Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM)], {reply, Alarms, State}; handle_call({get_alarms, deactivated}, _From, State) -> Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?DEACTIVATED_ALARM)], {reply, Alarms, State}; handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. handle_cast(Msg, State) -> ?LOG(error, "Unexpected msg: ~p", [Msg]), {noreply, State}. handle_info({timeout, TRef, delete_expired_deactivated_alarm}, State = #state{timer = TRef, validity_period = ValidityPeriod}) -> delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000), {noreply, ensure_delete_timer(State)}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> _ = emqx_alarm_handler:unload(), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %%------------------------------------------------------------------------------ %% Internal functions %%------------------------------------------------------------------------------ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ activate_at = ActivateAt, name = Name, details = Details0, message = Msg0}) -> case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of true -> case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; ActivateAt2 -> mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) end; false -> ok end, HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0, erlang:system_time(microsecond)), DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, normalize_message(Name, Details), erlang:system_time(microsecond)), mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mnesia:dirty_delete(?ACTIVATED_ALARM, Name), do_actions(deactivate, DeActAlarm, Actions). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> #deactivated_alarm{ activate_at = ActivateAt, name = Name, details = Details, message = Message, deactivate_at = DeActivateAt}. deactivate_all_alarms() -> lists:foreach( fun(#activated_alarm{name = Name, details = Details, message = Message, activate_at = ActivateAt}) -> mnesia:dirty_write(?DEACTIVATED_ALARM, #deactivated_alarm{ activate_at = ActivateAt, name = Name, details = Details, message = Message, deactivate_at = erlang:system_time(microsecond)}) end, ets:tab2list(?ACTIVATED_ALARM)), clear_table(?ACTIVATED_ALARM). %% Delete all records from the given table, ignore result. clear_table(TableName) -> case mnesia:clear_table(TableName) of {aborted, Reason} -> ?LOG(warning, "Faile to clear table ~p reason: ~p", [TableName, Reason]); {atomic, ok} -> ok end. ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) -> TRef = emqx_misc:start_timer(ValidityPeriod, delete_expired_deactivated_alarm), State#state{timer = TRef}. delete_expired_deactivated_alarms(Checkpoint) -> delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). delete_expired_deactivated_alarms('$end_of_table', _Checkpoint) -> ok; delete_expired_deactivated_alarms(ActivatedAt, Checkpoint) -> case ActivatedAt =< Checkpoint of true -> mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivatedAt), NActivatedAt = mnesia:dirty_next(?DEACTIVATED_ALARM, ActivatedAt), delete_expired_deactivated_alarms(NActivatedAt, Checkpoint); false -> ok end. do_actions(_, _, []) -> ok; do_actions(activate, Alarm = #activated_alarm{name = Name, message = Message}, [log | More]) -> ?LOG(warning, "Alarm ~s is activated, ~s", [Name, Message]), do_actions(activate, Alarm, More); do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) -> ?LOG(warning, "Alarm ~s is deactivated", [Name]), do_actions(deactivate, Alarm, More); do_actions(Operation, Alarm, [publish | More]) -> Topic = topic(Operation), {ok, Payload} = encode_to_json(Alarm), Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true}, #{properties => #{'Content-Type' => <<"application/json">>}}), %% TODO log failed publishes _ = emqx_broker:safe_publish(Message), do_actions(Operation, Alarm, More). encode_to_json(Alarm) -> emqx_json:safe_encode(normalize(Alarm)). topic(activate) -> emqx_topic:systop(<<"alarms/activate">>); topic(deactivate) -> emqx_topic:systop(<<"alarms/deactivate">>). normalize(#activated_alarm{name = Name, details = Details, message = Message, activate_at = ActivateAt}) -> #{name => Name, details => Details, message => Message, activate_at => ActivateAt, deactivate_at => infinity, activated => true}; normalize(#deactivated_alarm{activate_at = ActivateAt, name = Name, details = Details, message = Message, deactivate_at = DeactivateAt}) -> #{name => Name, details => Details, message => Message, activate_at => ActivateAt, deactivate_at => DeactivateAt, activated => false}. normalize_message(Name, no_details) -> list_to_binary(io_lib:format("~p", [Name])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_cpu_usage, #{usage := Usage}) -> list_to_binary(io_lib:format("~p% cpu usage", [Usage])); normalize_message(too_many_processes, #{usage := Usage}) -> list_to_binary(io_lib:format("~p% process usage", [Usage])); normalize_message(license_quota, #{high_watermark := High}) -> iolist_to_binary(["License: the number of connections exceeds ", High, "%"]); normalize_message(license_expiry, #{expiry_at := ExpiryAt}) -> iolist_to_binary(["License will be expired at ", ExpiryAt]); normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); normalize_message(<<"conn_congestion/", Info/binary>>, _) -> list_to_binary(io_lib:format("connection congested: ~s", [Info])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>.