diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index ede072cad..f4948bf1e 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -22,7 +22,7 @@ %% This rebar.config is necessary because the app may be used as a %% `git_subdir` dependency in other projects. {deps, [ - {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}}, + {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.0"}}}, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 9ef410299..a9419b27e 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -35,6 +35,9 @@ deactivate/1, deactivate/2, deactivate/3, + ensure_deactivated/1, + ensure_deactivated/2, + ensure_deactivated/3, delete_all_deactivated_alarms/0, get_alarms/0, get_alarms/1, @@ -113,6 +116,28 @@ activate(Name, Details) -> activate(Name, Details, Message) -> gen_server:call(?MODULE, {activate_alarm, Name, Details, Message}). +-spec ensure_deactivated(binary() | atom()) -> ok. +ensure_deactivated(Name) -> + ensure_deactivated(Name, no_details). + +-spec ensure_deactivated(binary() | atom(), atom() | map()) -> ok. +ensure_deactivated(Name, Data) -> + ensure_deactivated(Name, Data, <<>>). + +-spec ensure_deactivated(binary() | atom(), atom() | map(), iodata()) -> ok. +ensure_deactivated(Name, Data, Message) -> + %% this duplicates the dirty read in handle_call, + %% intention is to avoid making gen_server calls when there is no alarm + case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of + [] -> + ok; + _ -> + case deactivate(Name, Data, Message) of + {error, not_found} -> ok; + Other -> Other + end + end. + -spec deactivate(binary() | atom()) -> ok | {error, not_found}. deactivate(Name) -> deactivate(Name, no_details, <<"">>). 
diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index 2ba280f44..b5967a21d 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -56,18 +56,6 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) -> init(_) -> {ok, []}. -handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> - HighWatermark = emqx_os_mon:get_sysmem_high_watermark(), - Message = to_bin("System memory usage is higher than ~p%", [HighWatermark]), - emqx_alarm:activate( - high_system_memory_usage, - #{ - high_watermark => HighWatermark, - percent => emqx_os_mon:current_sysmem_percent() - }, - Message - ), - {ok, State}; handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> HighWatermark = emqx_os_mon:get_procmem_high_watermark(), Message = to_bin("Process memory usage is higher than ~p%", [HighWatermark]), @@ -80,11 +68,8 @@ handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> Message ), {ok, State}; -handle_event({clear_alarm, system_memory_high_watermark}, State) -> - _ = emqx_alarm:deactivate(high_system_memory_usage), - {ok, State}; handle_event({clear_alarm, process_memory_high_watermark}, State) -> - _ = emqx_alarm:deactivate(high_process_memory_usage), + emqx_alarm:ensure_deactivated(high_process_memory_usage), {ok, State}; handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> #{node := Node, runq_length := Len} = Info, @@ -92,7 +77,7 @@ handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) -> emqx_alarm:activate(runq_overload, Info, Message), {ok, State}; handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) -> - _ = emqx_alarm:deactivate(runq_overload), + emqx_alarm:ensure_deactivated(runq_overload), {ok, State}; handle_event(_, State) -> {ok, State}. 
diff --git a/apps/emqx/src/emqx_congestion.erl b/apps/emqx/src/emqx_congestion.erl index f8448b106..1e25ab391 100644 --- a/apps/emqx/src/emqx_congestion.erl +++ b/apps/emqx/src/emqx_congestion.erl @@ -115,7 +115,7 @@ do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> ok = remove_alarm_sent_at(Reason), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), Message = io_lib:format("connection congested: ~0p", [AlarmDetails]), - emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message), + emqx_alarm:ensure_deactivated(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message), ok. is_tcp_congested(Socket, Transport) -> diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index eafc8f1fd..609ce9002 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -65,10 +65,10 @@ set_mem_check_interval(Seconds) -> memsup:set_check_interval(Seconds div 60000). get_sysmem_high_watermark() -> - memsup:get_sysmem_high_watermark(). + gen_server:call(?OS_MON, ?FUNCTION_NAME, infinity). set_sysmem_high_watermark(Float) -> - memsup:set_sysmem_high_watermark(Float). + gen_server:call(?OS_MON, {?FUNCTION_NAME, Float}, infinity). get_procmem_high_watermark() -> memsup:get_procmem_high_watermark(). @@ -77,37 +77,34 @@ set_procmem_high_watermark(Float) -> memsup:set_procmem_high_watermark(Float). current_sysmem_percent() -> - case erlang:whereis(memsup) of - undefined -> - undefined; - _Pid -> - {Total, Allocated, _Worst} = memsup:get_memory_data(), - case Total =/= 0 of - true -> - erlang:floor((Allocated / Total) * 10000) / 100; - false -> - undefined - end - end. + Ratio = load_ctl:get_memory_usage(), + erlang:floor(Ratio * 10000) / 100. 
%%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> + %% memsup is not reliable, ignore + memsup:set_sysmem_high_watermark(1.0), #{ sysmem_high_watermark := SysHW, procmem_high_watermark := PHW, mem_check_interval := MCI } = emqx:get_config([sysmon, os]), - set_sysmem_high_watermark(SysHW), set_procmem_high_watermark(PHW), set_mem_check_interval(MCI), - ensure_system_memory_alarm(SysHW), - _ = start_check_timer(), - {ok, #{}}. + update_mem_alarm_stauts(SysHW), + _ = start_mem_check_timer(), + _ = start_cpu_check_timer(), + {ok, #{sysmem_high_watermark => SysHW}}. +handle_call(get_sysmem_high_watermark, _From, #{sysmem_high_watermark := HWM} = State) -> + {reply, HWM, State}; +handle_call({set_sysmem_high_watermark, New}, _From, #{sysmem_high_watermark := _Old} = State) -> + ok = update_mem_alarm_stauts(New), + {reply, ok, State#{sysmem_high_watermark := New}}; handle_call(Req, _From, State) -> {reply, {error, {unexpected_call, Req}}, State}. @@ -115,43 +112,40 @@ handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), {noreply, State}. -handle_info({timeout, _Timer, check}, State) -> +handle_info({timeout, _Timer, mem_check}, #{sysmem_high_watermark := HWM} = State) -> + ok = update_mem_alarm_stauts(HWM), + ok = start_mem_check_timer(), + {noreply, State}; +handle_info({timeout, _Timer, cpu_check}, State) -> CPUHighWatermark = emqx:get_config([sysmon, os, cpu_high_watermark]) * 100, CPULowWatermark = emqx:get_config([sysmon, os, cpu_low_watermark]) * 100, - %% TODO: should be improved? 
- _ = - case emqx_vm:cpu_util() of - 0 -> - ok; - Busy when Busy > CPUHighWatermark -> - Usage = list_to_binary(io_lib:format("~.2f%", [Busy])), - Message = <<Usage/binary, " cpu usage">>, - emqx_alarm:activate( - high_cpu_usage, - #{ - usage => Usage, - high_watermark => CPUHighWatermark, - low_watermark => CPULowWatermark - }, - Message - ), - start_check_timer(); - Busy when Busy < CPULowWatermark -> - Usage = list_to_binary(io_lib:format("~.2f%", [Busy])), - Message = <<Usage/binary, " cpu usage">>, - emqx_alarm:deactivate( - high_cpu_usage, - #{ - usage => Usage, - high_watermark => CPUHighWatermark, - low_watermark => CPULowWatermark - }, - Message - ), - start_check_timer(); - _Busy -> - start_check_timer() - end, + case emqx_vm:cpu_util() of + 0 -> + ok; + Busy when Busy > CPUHighWatermark -> + _ = emqx_alarm:activate( + high_cpu_usage, + #{ + usage => Busy, + high_watermark => CPUHighWatermark, + low_watermark => CPULowWatermark + }, + usage_msg(Busy, cpu) + ); + Busy when Busy < CPULowWatermark -> + ok = emqx_alarm:ensure_deactivated( + high_cpu_usage, + #{ + usage => Busy, + high_watermark => CPUHighWatermark, + low_watermark => CPULowWatermark + }, + usage_msg(Busy, cpu) + ); + _Busy -> + ok + end, + ok = start_cpu_check_timer(), {noreply, State}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), @@ -167,26 +161,66 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -start_check_timer() -> +start_cpu_check_timer() -> Interval = emqx:get_config([sysmon, os, cpu_check_interval]), case erlang:system_info(system_architecture) of "x86_64-pc-linux-musl" -> ok; - _ -> emqx_misc:start_timer(Interval, check) + _ -> start_timer(Interval, cpu_check) end. -%% At startup, memsup starts first and checks for memory alarms, -%% but emqx_alarm_handler is not yet used instead of alarm_handler, -%% so alarm_handler is used directly for notification (normally emqx_alarm_handler should be used). 
-%%The internal memsup will no longer trigger events that have been alerted, -%% and there is no exported function to remove the alerted flag, -%% so it can only be checked again at startup. - -ensure_system_memory_alarm(HW) when HW =< 1.0 andalso HW >= 0 -> - case current_sysmem_percent() of - Usage when Usage > (HW * 100) -> - gen_event:notify( - alarm_handler, {set_alarm, {system_memory_high_watermark, []}} - ); - _ -> +start_mem_check_timer() -> + Interval = emqx:get_config([sysmon, os, mem_check_interval]), + IsSupported = + case os:type() of + {unix, linux} -> + true; + _ -> + %% sorry macOS and Windows, not supported for now + false + end, + case is_integer(Interval) andalso IsSupported of + true -> + start_timer(Interval, mem_check); + false -> ok end. + +start_timer(Interval, Msg) -> + _ = emqx_misc:start_timer(Interval, Msg), + ok. + +update_mem_alarm_stauts(HWM) when HWM > 1.0 orelse HWM < 0.0 -> + ?SLOG(warning, #{msg => "discarded_out_of_range_mem_alarm_threshold", value => HWM}), + ok = emqx_alarm:ensure_deactivated( + high_system_memory_usage, + #{}, + <<"Deactivated mem usage alarm due to out of range threshold">> + ); +update_mem_alarm_stauts(HWM0) -> + HWM = HWM0 * 100, + Usage = current_sysmem_percent(), + case Usage > HWM of + true -> + _ = emqx_alarm:activate( + high_system_memory_usage, + #{ + usage => Usage, + high_watermark => HWM + }, + usage_msg(Usage, mem) + ); + _ -> + ok = emqx_alarm:ensure_deactivated( + high_system_memory_usage, + #{ + usage => Usage, + high_watermark => HWM + }, + usage_msg(Usage, mem) + ) + end, + ok. + +usage_msg(Usage, What) -> + %% divide by 1.0 to ensure a floating-point number, as ~.2f does not accept integers + iolist_to_binary(io_lib:format("~.2f% ~p usage", [Usage / 1.0, What])). 
diff --git a/apps/emqx/src/emqx_sys_mon.erl b/apps/emqx/src/emqx_sys_mon.erl index 697ced06e..78da63057 100644 --- a/apps/emqx/src/emqx_sys_mon.erl +++ b/apps/emqx/src/emqx_sys_mon.erl @@ -195,7 +195,7 @@ handle_partition_event({partition, {occurred, Node}}) -> emqx_alarm:activate(partition, #{occurred => Node}, Message); handle_partition_event({partition, {healed, Node}}) -> Message = io_lib:format("Partition healed at node ~ts", [Node]), - emqx_alarm:deactivate(partition, no_details, Message). + emqx_alarm:ensure_deactivated(partition, no_details, Message). suppress(Key, SuccFun, State = #{events := Events}) -> case lists:member(Key, Events) of diff --git a/apps/emqx/src/emqx_vm.erl b/apps/emqx/src/emqx_vm.erl index 61cc55e47..731d05844 100644 --- a/apps/emqx/src/emqx_vm.erl +++ b/apps/emqx/src/emqx_vm.erl @@ -233,8 +233,7 @@ mem_info() -> [{total_memory, Total}, {used_memory, Total - Free}]. ftos(F) -> - S = io_lib:format("~.2f", [F]), - S. + io_lib:format("~.2f", [F / 1.0]). %%%% erlang vm scheduler_usage fun copied from recon scheduler_usage(Interval) when is_integer(Interval) -> diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index c7484c741..299c20c28 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -77,7 +77,7 @@ handle_info({timeout, _Timer, check}, State) -> Percent when Percent < ProcLowWatermark -> Usage = io_lib:format("~p%", [Percent * 100]), Message = [Usage, " process usage"], - emqx_alarm:deactivate( + emqx_alarm:ensure_deactivated( too_many_processes, #{ usage => Usage, diff --git a/apps/emqx/test/emqx_os_mon_SUITE.erl b/apps/emqx/test/emqx_os_mon_SUITE.erl index 38bc2acf2..c558669af 100644 --- a/apps/emqx/test/emqx_os_mon_SUITE.erl +++ b/apps/emqx/test/emqx_os_mon_SUITE.erl @@ -33,8 +33,6 @@ init_per_suite(Config) -> {cpu_check_interval, 1}, {cpu_high_watermark, 5}, {cpu_low_watermark, 80}, - {mem_check_interval, 60}, - {sysmem_high_watermark, 70}, {procmem_high_watermark, 5} ]); (_) -> 
@@ -53,9 +51,9 @@ t_api(_) -> ?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122000)), ?assertEqual(120000, emqx_os_mon:get_mem_check_interval()), - ?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()), + ?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()), ?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)), - ?assertEqual(80, emqx_os_mon:get_sysmem_high_watermark()), + ?assertEqual(0.8, emqx_os_mon:get_sysmem_high_watermark()), ?assertEqual(5, emqx_os_mon:get_procmem_high_watermark()), ?assertEqual(ok, emqx_os_mon:set_procmem_high_watermark(0.11)), diff --git a/apps/emqx_resource/src/emqx_resource_health_check.erl b/apps/emqx_resource/src/emqx_resource_health_check.erl index 265592582..88f3c3bb9 100644 --- a/apps/emqx_resource/src/emqx_resource_health_check.erl +++ b/apps/emqx_resource/src/emqx_resource_health_check.erl @@ -77,7 +77,7 @@ health_check(Name) -> {Pid, begin_health_check} -> case emqx_resource:health_check(Name) of ok -> - emqx_alarm:deactivate(Name); + emqx_alarm:ensure_deactivated(Name); {error, _} -> emqx_alarm:activate( Name, diff --git a/lib-ee/emqx_license/src/emqx_license_checker.erl b/lib-ee/emqx_license/src/emqx_license_checker.erl index 473ca0965..5840d8917 100644 --- a/lib-ee/emqx_license/src/emqx_license_checker.erl +++ b/lib-ee/emqx_license/src/emqx_license_checker.erl @@ -197,7 +197,7 @@ expiry_early_alarm(License) -> Date = iolist_to_binary(io_lib:format("~B~2..0B~2..0B", [Y, M, D])), ?OK(emqx_alarm:activate(license_expiry, #{expiry_at => Date})); false -> - ?OK(emqx_alarm:deactivate(license_expiry)) + ?OK(emqx_alarm:ensure_deactivated(license_expiry)) end. 
print_warnings(Warnings) -> diff --git a/lib-ee/emqx_license/src/emqx_license_resources.erl b/lib-ee/emqx_license/src/emqx_license_resources.erl index 96300d60b..551601923 100644 --- a/lib-ee/emqx_license/src/emqx_license_resources.erl +++ b/lib-ee/emqx_license/src/emqx_license_resources.erl @@ -103,7 +103,7 @@ connection_quota_early_alarm({ok, #{max_connections := Max}}) when is_integer(Ma ]), ?OK(emqx_alarm:activate(license_quota, #{high_watermark => HighPercent}, Message)) end, - Count < Max * Low andalso ?OK(emqx_alarm:deactivate(license_quota)); + Count < Max * Low andalso ?OK(emqx_alarm:ensure_deactivated(license_quota)); connection_quota_early_alarm(_Limits) -> ok. diff --git a/mix.exs b/mix.exs index bea43bf90..bb0960f60 100644 --- a/mix.exs +++ b/mix.exs @@ -46,7 +46,7 @@ defmodule EMQXUmbrella.MixProject do # we need several overrides here because dependencies specify # other exact versions, and not ranges. [ - {:lc, github: "emqx/lc", tag: "0.2.1"}, + {:lc, github: "emqx/lc", tag: "0.3.0"}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, {:ehttpc, github: "emqx/ehttpc", tag: "0.2.0"}, diff --git a/rebar.config b/rebar.config index 692a5dfed..11e36b5e1 100644 --- a/rebar.config +++ b/rebar.config @@ -44,7 +44,7 @@ {post_hooks,[]}. {deps, - [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.1"}}} + [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.0"}}} , {redbug, "2.0.7"} , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}