diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index cf419edc5..f7d3418ca 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -48,6 +48,12 @@ %% Queue topic -define(QUEUE, <<"$queue/">>). +%%-------------------------------------------------------------------- +%% alarms +%%-------------------------------------------------------------------- +-define(ACTIVATED_ALARM, emqx_activated_alarm). +-define(DEACTIVATED_ALARM, emqx_deactivated_alarm). + %%-------------------------------------------------------------------- %% Message and Delivery %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 2585494eb..403308a68 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -17,7 +17,6 @@ -module(emqx_alarm). -behaviour(gen_server). --behaviour(emqx_config_handler). -include("emqx.hrl"). -include("logger.hrl"). @@ -27,22 +26,19 @@ -boot_mnesia({mnesia, [boot]}). --export([post_config_update/4]). - --export([ start_link/0 - , stop/0 +-export([start_link/0 ]). - --export([format/1]). - %% API -export([ activate/1 , activate/2 + , activate/3 , deactivate/1 , deactivate/2 + , deactivate/3 , delete_all_deactivated_alarms/0 , get_alarms/0 , get_alarms/1 + , format/1 ]). %% gen_server callbacks @@ -56,34 +52,19 @@ -record(activated_alarm, { name :: binary() | atom(), - details :: map() | list(), - message :: binary(), - activate_at :: integer() }). -record(deactivated_alarm, { activate_at :: integer(), - name :: binary() | atom(), - details :: map() | list(), - message :: binary(), - deactivate_at :: integer() | infinity }). --record(state, { - timer :: reference() - }). - --define(ACTIVATED_ALARM, emqx_activated_alarm). - --define(DEACTIVATED_ALARM, emqx_deactivated_alarm). - -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -114,20 +95,23 @@ mnesia(boot) -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -stop() -> - gen_server:stop(?MODULE). - activate(Name) -> activate(Name, #{}). activate(Name, Details) -> - gen_server:call(?MODULE, {activate_alarm, Name, Details}). + activate(Name, Details, <<"">>). + +activate(Name, Details, Message) -> + gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}). deactivate(Name) -> - gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}). + deactivate(Name, no_details, <<"">>). deactivate(Name, Details) -> - gen_server:call(?MODULE, {deactivate_alarm, Name, Details}). + deactivate(Name, Details, <<"">>). + +deactivate(Name, Details, Message) -> + gen_server:call(?MODULE, {deactivate_alarm, Name, Details, Message}). delete_all_deactivated_alarms() -> gen_server:call(?MODULE, delete_all_deactivated_alarms). @@ -144,10 +128,6 @@ get_alarms(activated) -> get_alarms(deactivated) -> gen_server:call(?MODULE, {get_alarms, deactivated}). -post_config_update(_, #{validity_period := Period0}, _OldConf, _AppEnv) -> - ?MODULE ! {update_timer, Period0}, - ok. - format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> Now = erlang:system_time(microsecond), #{ @@ -159,7 +139,7 @@ format(#activated_alarm{name = Name, message = Message, activate_at = At, detail details => Details }; format(#deactivated_alarm{name = Name, message = Message, activate_at = At, details = Details, - deactivate_at = DAt}) -> + deactivate_at = DAt}) -> #{ node => node(), name => Name, @@ -168,9 +148,7 @@ format(#deactivated_alarm{name = Name, message = Message, activate_at = At, deta activate_at => to_rfc3339(At), deactivate_at => to_rfc3339(DAt), details => Details - }; -format(_) -> - {error, unknow_alarm}. + }. to_rfc3339(Timestamp) -> list_to_binary(calendar:system_time_to_rfc3339(Timestamp div 1000, [{unit, millisecond}])). @@ -180,85 +158,72 @@ to_rfc3339(Timestamp) -> %%-------------------------------------------------------------------- init([]) -> - _ = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]), + ok = mria:wait_for_tables([?ACTIVATED_ALARM, ?DEACTIVATED_ALARM]), deactivate_all_alarms(), - ok = emqx_config_handler:add_handler([alarm], ?MODULE), - {ok, #state{timer = ensure_timer(undefined, get_validity_period())}}. + {ok, #{}, get_validity_period()}. -%% suppress dialyzer warning due to dirty read/write race condition. -%% TODO: change from dirty_read/write to transactional. -%% TODO: handle mnesia write errors. --dialyzer([{nowarn_function, [handle_call/3]}]). -handle_call({activate_alarm, Name, Details}, _From, State) -> - case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of - [#activated_alarm{name = Name}] -> - {reply, {error, already_existed}, State}; - [] -> - Alarm = #activated_alarm{name = Name, - details = Details, - message = normalize_message(Name, Details), - activate_at = erlang:system_time(microsecond)}, - mria:dirty_write(?ACTIVATED_ALARM, Alarm), +handle_call({activate_alarm, Name, Details, Message}, _From, State) -> + Res = mria:transaction(mria:local_content_shard(), + fun create_activate_alarm/3, + [Name, Details, Message]), + case Res of + {atomic, Alarm} -> do_actions(activate, Alarm, emqx:get_config([alarm, actions])), - {reply, ok, State} + {reply, ok, State, get_validity_period()}; + {aborted, Reason} -> + {reply, Reason, State, get_validity_period()} end; -handle_call({deactivate_alarm, Name, Details}, _From, State) -> +handle_call({deactivate_alarm, Name, Details, Message}, _From, State) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [] -> {reply, {error, not_found}, State}; [Alarm] -> - deactivate_alarm(Details, Alarm), - {reply, ok, State} + deactivate_alarm(Alarm, Details, Message), + {reply, ok, State, get_validity_period()} end; handle_call(delete_all_deactivated_alarms, _From, State) -> clear_table(?DEACTIVATED_ALARM), - {reply, ok, State}; + {reply, ok, State, get_validity_period()}; handle_call({get_alarms, all}, _From, State) -> {atomic, Alarms} = mria:ro_transaction( - ?COMMON_SHARD, + mria:local_content_shard(), fun() -> [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM) ++ ets:tab2list(?DEACTIVATED_ALARM)] end), - {reply, Alarms, State}; + {reply, Alarms, State, get_validity_period()}; handle_call({get_alarms, activated}, _From, State) -> Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?ACTIVATED_ALARM)], - {reply, Alarms, State}; + {reply, Alarms, State, get_validity_period()}; handle_call({get_alarms, deactivated}, _From, State) -> Alarms = [normalize(Alarm) || Alarm <- ets:tab2list(?DEACTIVATED_ALARM)], - {reply, Alarms, State}; + {reply, Alarms, State, get_validity_period()}; -handle_call(Req, _From, State) -> - ?SLOG(error, #{msg => "unexpected_call", call => Req}), - {reply, ignored, State}. +handle_call(Req, From, State) -> + ?SLOG(error, #{msg => "unexpected_call", call_req => Req, from => From}), + {reply, ignored, State, get_validity_period()}. handle_cast(Msg, State) -> - ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), - {noreply, State}. + ?SLOG(error, #{msg => "unexpected_cast", cast_req => Msg}), + {noreply, State, get_validity_period()}. -handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, - #state{timer = TRef} = State) -> +handle_info(timeout, State) -> Period = get_validity_period(), delete_expired_deactivated_alarms(erlang:system_time(microsecond) - Period * 1000), - {noreply, State#state{timer = ensure_timer(TRef, Period)}}; - -handle_info({update_timer, Period}, #state{timer = TRef} = State) -> - ?SLOG(warning, #{msg => "validity_timer_updated", period => Period}), - {noreply, State#state{timer = ensure_timer(TRef, Period)}}; + {noreply, State, Period}; handle_info(Info, State) -> - ?SLOG(error, #{msg => "unexpected_info", info => Info}), - {noreply, State}. + ?SLOG(error, #{msg => "unexpected_info", info_req => Info}), + {noreply, State, get_validity_period()}. terminate(_Reason, _State) -> - ok = emqx_config_handler:remove_handler([alarm]), ok. code_change(_OldVsn, State, _Extra) -> @@ -271,8 +236,21 @@ code_change(_OldVsn, State, _Extra) -> get_validity_period() -> emqx:get_config([alarm, validity_period]). -deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name, - details = Details0, message = Msg0}) -> +create_activate_alarm(Name, Details, Message) -> + case mnesia:read(?ACTIVATED_ALARM, Name) of + [#activated_alarm{name = Name}] -> + mnesia:abort({error, already_existed}); + [] -> + Alarm = #activated_alarm{name = Name, + details = Details, + message = normalize_message(Name, iolist_to_binary(Message)), + activate_at = erlang:system_time(microsecond)}, + ok = mnesia:write(?ACTIVATED_ALARM, Alarm, write), + Alarm + end. + +deactivate_alarm(#activated_alarm{activate_at = ActivateAt, name = Name, + details = Details0, message = Msg0}, Details, Message) -> SizeLimit = emqx:get_config([alarm, size_limit]), case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of true -> @@ -286,7 +264,7 @@ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0, erlang:system_time(microsecond)), DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, - normalize_message(Name, Details), + normalize_message(Name, iolist_to_binary(Message)), erlang:system_time(microsecond)), mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mria:dirty_delete(?ACTIVATED_ALARM, Name), @@ -329,13 +307,6 @@ clear_table(TableName) -> ok end. -ensure_timer(OldTRef, Period) -> - _ = case is_reference(OldTRef) of - true -> erlang:cancel_timer(OldTRef); - false -> ok - end, - emqx_misc:start_timer(Period, delete_expired_deactivated_alarm). - delete_expired_deactivated_alarms(Checkpoint) -> delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). @@ -368,16 +339,12 @@ do_actions(deactivate, Alarm = #deactivated_alarm{name = Name}, [log | More]) -> do_actions(deactivate, Alarm, More); do_actions(Operation, Alarm, [publish | More]) -> Topic = topic(Operation), - {ok, Payload} = encode_to_json(Alarm), + {ok, Payload} = emqx_json:safe_encode(normalize(Alarm)), Message = emqx_message:make(?MODULE, 0, Topic, Payload, #{sys => true}, #{properties => #{'Content-Type' => <<"application/json">>}}), - %% TODO log failed publishes _ = emqx_broker:safe_publish(Message), do_actions(Operation, Alarm, More). -encode_to_json(Alarm) -> - emqx_json:safe_encode(normalize(Alarm)). - topic(activate) -> emqx_topic:systop(<<"alarms/activate">>); topic(deactivate) -> @@ -405,25 +372,6 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, deactivate_at => DeactivateAt, activated => false}. -normalize_message(Name, no_details) -> +normalize_message(Name, <<"">>) -> list_to_binary(io_lib:format("~p", [Name])); -normalize_message(runq_overload, #{node := Node, runq_length := Len}) -> - list_to_binary(io_lib:format("VM is overloaded on node: ~p: ~p", [Node, Len])); -normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> - list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); -normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> - list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [HighWatermark])); -normalize_message(high_cpu_usage, #{usage := Usage}) -> - list_to_binary(io_lib:format("~ts cpu usage", [Usage])); -normalize_message(too_many_processes, #{usage := Usage}) -> - list_to_binary(io_lib:format("~ts process usage", [Usage])); -normalize_message(cluster_rpc_apply_failed, #{tnx_id := TnxId}) -> - list_to_binary(io_lib:format("cluster_rpc_apply_failed:~w", [TnxId])); -normalize_message(partition, #{occurred := Node}) -> - list_to_binary(io_lib:format("Partition occurs at node ~ts", [Node])); -normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> - list_to_binary(io_lib:format("Resource ~ts(~ts) is down", [Type, ID])); -normalize_message(<<"conn_congestion/", Info/binary>>, _) -> - list_to_binary(io_lib:format("connection congested: ~ts", [Info])); -normalize_message(_Name, _UnknownDetails) -> - <<"Unknown alarm">>. +normalize_message(_Name, Message) -> Message. diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index 4cf699895..5290404b3 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -57,14 +57,18 @@ init(_) -> {ok, []}. handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + HighWatermark = emqx_os_mon:get_sysmem_high_watermark(), + Message = to_bin("System memory usage is higher than ~p%", [HighWatermark]), emqx_alarm:activate(high_system_memory_usage, - #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}), + #{high_watermark => HighWatermark}, Message), {ok, State}; handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> + HighWatermark = emqx_os_mon:get_procmem_high_watermark(), + Message = to_bin("Process memory usage is higher than ~p%", [HighWatermark]), emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)), - high_watermark => emqx_os_mon:get_procmem_high_watermark()}), + high_watermark => HighWatermark}, Message), {ok, State}; handle_event({clear_alarm, system_memory_high_watermark}, State) -> @@ -76,7 +80,9 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) -> {ok, State}; handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> - emqx_alarm:activate(runq_overload, Info), + #{node := Node, runq_length := Len} = Info, + Message = to_bin("VM is overloaded on node: ~p: ~p", [Node, Len]), + emqx_alarm:activate(runq_overload, Info, Message), {ok, State}; handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> @@ -96,3 +102,6 @@ terminate(swap, _State) -> {emqx_alarm_handler, []}; terminate(_, _) -> ok. + +to_bin(Format, Args) -> + io_lib:format(Format, Args). diff --git a/apps/emqx/src/emqx_congestion.erl b/apps/emqx/src/emqx_congestion.erl index 170c6bc69..783f4ee4a 100644 --- a/apps/emqx/src/emqx_congestion.erl +++ b/apps/emqx/src/emqx_congestion.erl @@ -78,13 +78,15 @@ cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> do_alarm_congestion(Socket, Transport, Channel, Reason) -> ok = update_alarm_sent_at(Reason), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), - emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), + Message = io_lib:format("connection congested: ~ts", [AlarmDetails]), + emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message), ok. do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> ok = remove_alarm_sent_at(Reason), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), - emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), + Message = io_lib:format("connection congested: ~ts", [AlarmDetails]), + emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message), ok. is_tcp_congested(Socket, Transport) -> diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index 24795c7ba..e0cfac7af 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -96,12 +96,26 @@ handle_info({timeout, _Timer, check}, State) -> _ = case emqx_vm:cpu_util() of %% TODO: should be improved? 0 -> ok; Busy when Busy >= CPUHighWatermark -> - emqx_alarm:activate(high_cpu_usage, #{usage => io_lib:format("~p%", [Busy]), - high_watermark => CPUHighWatermark, - low_watermark => CPULowWatermark}), + Usage = io_lib:format("~p%", [Busy]), + Message = [Usage, " cpu usage"], + emqx_alarm:activate(high_cpu_usage, + #{ + usage => Usage, + high_watermark => CPUHighWatermark, + low_watermark => CPULowWatermark + }, + Message), start_check_timer(); Busy when Busy =< CPULowWatermark -> - emqx_alarm:deactivate(high_cpu_usage), + Usage = io_lib:format("~p%", [Busy]), + Message = [Usage, " cpu usage"], + emqx_alarm:deactivate(high_cpu_usage, + #{ + usage => Usage, + high_watermark => CPUHighWatermark, + low_watermark => CPULowWatermark + }, + Message), start_check_timer(); _Busy -> start_check_timer() diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 99a7ed59b..e319dbe15 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -51,6 +51,7 @@ -export([ validate_heap_size/1 , parse_user_lookup_fun/1 + , validate_alarm_actions/1 ]). % workaround: prevent being recognized as unused functions @@ -889,17 +890,34 @@ fields("sysmon_os") -> fields("alarm") -> [ {"actions", sc(hoconsc:array(atom()), - #{ default => [log, publish] + #{ default => [log, publish], + validator => fun ?MODULE:validate_alarm_actions/1, + example => [log, publish], + desc => + """The actions triggered when the alarm is activated.<\br> +Currently supports two actions, 'log' and 'publish'. +'log' is to write the alarm to log (console or file). +'publish' is to publish the alarm as an MQTT message to the system topics: +$SYS/brokers/emqx@xx.xx.xx.x/alarms/activate and $SYS/brokers/emqx@xx.xx.xx.x/alarms/deactivate""" }) } , {"size_limit", - sc(integer(), - #{ default => 1000 + sc(range(1, 3000), + #{ default => 1000, + example => 1000, + desc => + """The maximum total number of deactivated alarms to keep as history.
+When this limit is exceeded, the oldest deactivated alarms are deleted to cap the total number. +""" }) } , {"validity_period", sc(duration(), - #{ default => "24h" + #{ default => "24h", + example => "24h", + desc => + """Retention time of deactivated alarms. Alarms are not deleted immediately when deactivated, but after the retention time. + """ }) } ]. @@ -1345,6 +1363,14 @@ validate_heap_size(Siz) -> true -> error(io_lib:format("force_shutdown_policy: heap-size ~ts is too large", [Siz])); false -> ok end. + +validate_alarm_actions(Actions) -> + UnSupported = lists:filter(fun(Action) -> Action =/= log andalso Action =/= publish end, Actions), + case UnSupported of + [] -> ok; + Error -> {error, Error} + end. + parse_user_lookup_fun(StrConf) -> [ModStr, FunStr] = string:tokens(str(StrConf), ":"), Mod = list_to_atom(ModStr), diff --git a/apps/emqx/src/emqx_sys_mon.erl b/apps/emqx/src/emqx_sys_mon.erl index 7d798060f..cdc4677f3 100644 --- a/apps/emqx/src/emqx_sys_mon.erl +++ b/apps/emqx/src/emqx_sys_mon.erl @@ -170,9 +170,11 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- handle_partition_event({partition, {occurred, Node}}) -> - emqx_alarm:activate(partition, #{occurred => Node}); -handle_partition_event({partition, {healed, _Node}}) -> - emqx_alarm:deactivate(partition). + Message = io_lib:format("Partition occurs at node ~ts", [Node]), + emqx_alarm:activate(partition, #{occurred => Node}, Message); +handle_partition_event({partition, {healed, Node}}) -> + Message = io_lib:format("Partition healed at node ~ts", [Node]), + emqx_alarm:deactivate(partition, no_details, Message). suppress(Key, SuccFun, State = #{events := Events}) -> case lists:member(Key, Events) of diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index 703aca52f..9a30e71f2 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -62,12 +62,23 @@ handle_info({timeout, _Timer, check}, State) -> ProcessCount = erlang:system_info(process_count), case ProcessCount / erlang:system_info(process_limit) of Percent when Percent >= ProcHighWatermark -> - emqx_alarm:activate(too_many_processes, #{ - usage => io_lib:format("~p%", [Percent*100]), - high_watermark => ProcHighWatermark, - low_watermark => ProcLowWatermark}); + Usage = io_lib:format("~p%", [Percent*100]), + Message = [Usage, " process usage"], + emqx_alarm:activate(too_many_processes, + #{ + usage => Usage, + high_watermark => ProcHighWatermark, + low_watermark => ProcLowWatermark}, + Message); Percent when Percent < ProcLowWatermark -> - emqx_alarm:deactivate(too_many_processes); + Usage = io_lib:format("~p%", [Percent*100]), + Message = [Usage, " process usage"], + emqx_alarm:deactivate(too_many_processes, + #{ + usage => Usage, + high_watermark => ProcHighWatermark, + low_watermark => ProcLowWatermark}, + Message); _Precent -> ok end, diff --git a/apps/emqx/test/emqx_alarm_SUITE.erl b/apps/emqx/test/emqx_alarm_SUITE.erl index 0a720ffc1..b542250b3 100644 --- a/apps/emqx/test/emqx_alarm_SUITE.erl +++ b/apps/emqx/test/emqx_alarm_SUITE.erl @@ -32,16 +32,12 @@ init_per_testcase(t_size_limit, Config) -> <<"size_limit">> => 2 }), Config; -init_per_testcase(t_validity_period, Config) -> +init_per_testcase(_, Config) -> emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), {ok, _} = emqx:update_config([alarm], #{ <<"validity_period">> => <<"1s">> }), - Config; -init_per_testcase(_, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), Config. end_per_testcase(_, _Config) -> @@ -86,17 +82,77 @@ t_size_limit(_) -> ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), emqx_alarm:delete_all_deactivated_alarms(). -t_validity_period(_) -> - ok = emqx_alarm:activate(a), - ok = emqx_alarm:deactivate(a), +t_validity_period(_Config) -> + ok = emqx_alarm:activate(a, #{msg => "Request frequency is too high"}, <<"Reach Rate Limit">>), + ok = emqx_alarm:deactivate(a, #{msg => "Request frequency returns to normal"}), ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + %% call with unknown msg + ?assertEqual(ignored, gen_server:call(emqx_alarm, unknown_alarm)), ct:sleep(3000), ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))). +t_validity_period_1(_Config) -> + ok = emqx_alarm:activate(a, #{msg => "Request frequency is too high"}, <<"Reach Rate Limit">>), + ok = emqx_alarm:deactivate(a, #{msg => "Request frequency returns to normal"}), + ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + %% info with unknown msg + erlang:send(emqx_alarm, unknown_alarm), + ct:sleep(3000), + ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))). + +t_validity_period_2(_Config) -> + ok = emqx_alarm:activate(a, #{msg => "Request frequency is too high"}, <<"Reach Rate Limit">>), + ok = emqx_alarm:deactivate(a, #{msg => "Request frequency returns to normal"}), + ?assertNotEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))), + %% cast with unknown msg + gen_server:cast(emqx_alarm, unknown_alarm), + ct:sleep(3000), + ?assertEqual({error, not_found}, get_alarm(a, emqx_alarm:get_alarms(deactivated))). + +-record(activated_alarm, { + name :: binary() | atom(), + details :: map() | list(), + message :: binary(), + activate_at :: integer() +}). + +-record(deactivated_alarm, { + activate_at :: integer(), + name :: binary() | atom(), + details :: map() | list(), + message :: binary(), + deactivate_at :: integer() | infinity +}). + +t_format(_Config) -> + Name = test_alarm, + Message = "test_msg", + At = erlang:system_time(microsecond), + Details = "test_details", + Node = node(), + Activate = #activated_alarm{name = Name, message = Message, activate_at = At, details = Details}, + #{ + node := Node, + name := Name, + message := Message, + duration := 0, + details := Details + } = emqx_alarm:format(Activate), + Deactivate = #deactivated_alarm{name = Name, message = Message, activate_at = At, details = Details, + deactivate_at = At}, + #{ + node := Node, + name := Name, + message := Message, + duration := 0, + details := Details + } = emqx_alarm:format(Deactivate), + ok. + + get_alarm(Name, [Alarm = #{name := Name} | _More]) -> Alarm; get_alarm(Name, [_Alarm | More]) -> get_alarm(Name, More); get_alarm(_Name, []) -> {error, not_found}. - diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 4187b35aa..153800414 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -320,21 +320,23 @@ apply_mfa(TnxId, {M, F, A}) -> end, Meta = #{tnx_id => TnxId, module => M, function => F, args => ?TO_BIN(A)}, IsSuccess = is_success(Res), - log_and_alarm(IsSuccess, Res, Meta), + log_and_alarm(IsSuccess, Res, Meta, TnxId), {IsSuccess, Res}. is_success(ok) -> true; is_success({ok, _}) -> true; is_success(_) -> false. -log_and_alarm(true, Res, Meta) -> +log_and_alarm(true, Res, Meta, TnxId) -> OkMeta = Meta#{msg => <<"succeeded to apply MFA">>, result => Res}, ?SLOG(debug, OkMeta), - emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)}); -log_and_alarm(false, Res, Meta) -> + Message = ["cluster_rpc_apply_failed:", integer_to_binary(TnxId)], + emqx_alarm:deactivate(cluster_rpc_apply_failed, OkMeta#{result => ?TO_BIN(Res)}, Message); +log_and_alarm(false, Res, Meta, TnxId) -> NotOkMeta = Meta#{msg => <<"failed to apply MFA">>, result => Res}, ?SLOG(error, NotOkMeta), - emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}). + Message = ["cluster_rpc_apply_failed:", integer_to_binary(TnxId)], + emqx_alarm:activate(cluster_rpc_apply_failed, NotOkMeta#{result => ?TO_BIN(Res)}, Message). wait_for_all_nodes_commit(TnxId, Delay, Remain) -> case lagging_node(TnxId) of diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index cb79151ce..4e689916a 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -43,8 +43,8 @@ init_per_suite(Config) -> ok = ekka:start(), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), meck:new(emqx_alarm, [non_strict, passthrough, no_link]), - meck:expect(emqx_alarm, activate, 2, ok), - meck:expect(emqx_alarm, deactivate, 2, ok), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), Config. end_per_suite(_Config) -> @@ -122,17 +122,21 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), Now = erlang:system_time(millisecond), + ct:pal("111:~p~n", [ets:tab2list(cluster_rpc_commit)]), {M, F, A} = {?MODULE, failed_on_other_recover_after_5_second, [erlang:whereis(?NODE1), Now]}, - {ok, _, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), - {ok, _, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), + {ok, 1, ok} = emqx_cluster_rpc:multicall(M, F, A, 1, 1000), + ct:pal("222:~p~n", [ets:tab2list(cluster_rpc_commit)]), + {ok, 2, ok} = emqx_cluster_rpc:multicall(io, format, ["test"], 1, 1000), + ct:pal("333:~p~n", [ets:tab2list(cluster_rpc_commit)]), + ct:pal("444:~p~n", [emqx_cluster_rpc:status()]), {atomic, [Status|L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ?assertEqual({io, format, ["test"]}, maps:get(mfa, Status)), ?assertEqual(node(), maps:get(node, Status)), - sleep(2300), + ct:sleep(2300), {atomic, [Status1]} = emqx_cluster_rpc:status(), ?assertEqual(Status, Status1), - sleep(3600), + ct:sleep(3600), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), Pid = self(), @@ -161,7 +165,7 @@ t_del_stale_mfa(_Config) -> {ok, TnxId, ok} = emqx_cluster_rpc:multicall(M, F, A), TnxId end || _ <- Keys2], ?assertEqual(Keys2, Ids2), - sleep(1200), + ct:sleep(1200), [begin ?assertEqual({aborted, not_found}, emqx_cluster_rpc:query(I)) end || I <- lists:seq(1, 50)], @@ -177,7 +181,7 @@ t_skip_failed_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), - sleep(180), + ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], @@ -250,8 +254,3 @@ failed_on_other_recover_after_5_second(Pid, CreatedAt) -> false -> ok end end. - -sleep(Ms) -> - receive _ -> ok - after Ms -> timeout - end. diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 1c8aeaf83..64514a7aa 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -362,7 +362,7 @@ typename_to_spec("timeout()", _Mod) -> #{<<"oneOf">> => [#{type => string, examp #{type => integer, example => 100}], example => infinity}; typename_to_spec("bytesize()", _Mod) -> #{type => string, example => <<"32MB">>}; typename_to_spec("wordsize()", _Mod) -> #{type => string, example => <<"1024KB">>}; -typename_to_spec("map()", _Mod) -> #{type => string, example => <<>>}; +typename_to_spec("map()", _Mod) -> #{type => object, example => #{}}; typename_to_spec("comma_separated_list()", _Mod) -> #{type => string, example => <<"item1,item2">>}; typename_to_spec("comma_separated_atoms()", _Mod) -> #{type => string, example => <<"item1,item2">>}; typename_to_spec("pool_type()", _Mod) -> #{type => string, enum => [random, hash], example => hash}; diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl index 92f411da7..dff491225 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl @@ -172,7 +172,7 @@ t_complicated_type(_Config) -> [#{example => infinity, type => string}, #{example => 100, type => integer}]}}, {<<"bytesize">>, #{example => <<"32MB">>, type => string}}, {<<"wordsize">>, #{example => <<"1024KB">>, type => string}}, - {<<"maps">>, #{example => <<>>, type => string}}, + {<<"maps">>, #{example => #{}, type => object}}, {<<"comma_separated_list">>, #{example => <<"item1,item2">>, type => string}}, {<<"comma_separated_atoms">>, #{example => <<"item1,item2">>, type => string}}, {<<"log_level">>, diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index c4e49a616..38d923742 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -18,65 +18,76 @@ -behaviour(minirest_api). --export([api_spec/0]). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-export([api_spec/0, paths/0, schema/1, fields/1]). -export([alarms/2]). %% internal export (for query) --export([ query/4 - ]). - -%% notice: from emqx_alarms --define(ACTIVATED_ALARM, emqx_activated_alarm). --define(DEACTIVATED_ALARM, emqx_deactivated_alarm). - --import(emqx_mgmt_util, [ object_array_schema/2 - , schema/1 - , properties/1 - ]). +-export([query/4]). api_spec() -> - {[alarms_api()], []}. + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). -properties() -> - properties([ - {node, string, <<"Alarm in node">>}, - {name, string, <<"Alarm name">>}, - {message, string, <<"Alarm readable information">>}, - {details, object}, - {duration, integer, <<"Alarms duration time; UNIX time stamp, millisecond">>}, - {activate_at, string, <<"Alarms activate time, RFC 3339">>}, - {deactivate_at, string, <<"Nullable, alarms deactivate time, RFC 3339">>} - ]). +paths() -> + ["/alarms"]. -alarms_api() -> - Metadata = #{ +schema("/alarms") -> + #{ + operationId => alarms, get => #{ description => <<"EMQ X alarms">>, - parameters => emqx_mgmt_util:page_params() ++ [#{ - name => activated, - in => query, - description => <<"All alarms, if not specified">>, - required => false, - schema => #{type => boolean, default => true} - }], + parameters => [ + hoconsc:ref(emqx_dashboard_swagger, page), + hoconsc:ref(emqx_dashboard_swagger, limit), + {activated, hoconsc:mk(boolean(), #{in => query, + desc => <<"All alarms, if not specified">>, + nullable => true})} + ], responses => #{ - <<"200">> => - object_array_schema(properties(), <<"List all alarms">>)}}, - delete => #{ + 200 => [ + {data, hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, alarm)), #{})}, + {meta, hoconsc:mk(hoconsc:ref(?MODULE, meta), #{})} + ] + } + }, + delete => #{ description => <<"Remove all deactivated alarms">>, responses => #{ - <<"200">> => - schema(<<"Remove all deactivated alarms ok">>)}}}, - {"/alarms", Metadata, alarms}. + 200 => <<"Remove all deactivated alarms ok">> + } + } + }. +fields(alarm) -> + [ + {node, hoconsc:mk(binary(), #{desc => <<"Alarm in node">>, example => atom_to_list(node())})}, + {name, hoconsc:mk(binary(), #{desc => <<"Alarm name">>, example => <<"high_system_memory_usage">>})}, + {message, hoconsc:mk(binary(), #{desc => <<"Alarm readable information">>, + example => <<"System memory usage is higher than 70%">>})}, + {details, hoconsc:mk(map(), #{desc => <<"Alarm details information">>, + example => #{<<"high_watermark">> => 70}})}, + {duration, hoconsc:mk(integer(), #{desc => <<"Alarms duration time; UNIX time stamp, millisecond">>, + example => 297056})}, + {activate_at, hoconsc:mk(binary(), #{desc => <<"Alarms activate time, RFC 3339">>, + example => <<"2021-10-25T11:52:52.548+08:00">>})}, + {deactivate_at, hoconsc:mk(binary(), #{desc => <<"Nullable, alarms deactivate time, RFC 3339">>, + example => <<"2021-10-31T10:52:52.548+08:00">>})} + ]; + +fields(meta) -> + emqx_dashboard_swagger:fields(page) ++ + emqx_dashboard_swagger:fields(limit) ++ + [{count, hoconsc:mk(integer(), #{example => 1})}]. %%%============================================================================================== %% parameters trans alarms(get, #{query_string := Qs}) -> Table = - case maps:get(<<"activated">>, Qs, <<"true">>) of - <<"true">> -> ?ACTIVATED_ALARM; - <<"false">> -> ?DEACTIVATED_ALARM + case maps:get(<<"activated">>, Qs, true) of + true -> ?ACTIVATED_ALARM; + false -> ?DEACTIVATED_ALARM end, Response = emqx_mgmt_api:cluster_query(Qs, Table, [], {?MODULE, query}), emqx_mgmt_util:generate_response(Response);