From 2d67bb3fb67cb8544dd401699d93ad01d67f59b2 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Sun, 29 Jan 2023 10:25:28 +0800 Subject: [PATCH 01/12] fix: /api/nodes is timeout if emqx in high load --- apps/emqx/src/emqx_os_mon.erl | 8 +- apps/emqx/src/emqx_vm.erl | 11 ++- apps/emqx/src/emqx_vm_mon.erl | 7 +- apps/emqx_management/src/emqx_mgmt.erl | 2 +- apps/emqx_management/src/emqx_mgmt_sup.erl | 13 ++- .../src/emqx_mgmt_sys_memory.erl | 79 +++++++++++++++++++ 6 files changed, 109 insertions(+), 11 deletions(-) create mode 100644 apps/emqx_management/src/emqx_mgmt_sys_memory.erl diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index a06f56a4c..5c6987ea0 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -130,8 +130,10 @@ handle_info({timeout, _Timer, mem_check}, #{sysmem_high_watermark := HWM} = Stat handle_info({timeout, _Timer, cpu_check}, State) -> CPUHighWatermark = emqx:get_config([sysmon, os, cpu_high_watermark]) * 100, CPULowWatermark = emqx:get_config([sysmon, os, cpu_low_watermark]) * 100, - case emqx_vm:cpu_util() of - 0 -> + CPUVal = emqx_vm:cpu_util(), + case CPUVal of + %% 0 or 0.0 + Busy when Busy == 0 -> ok; Busy when Busy > CPUHighWatermark -> _ = emqx_alarm:activate( @@ -236,5 +238,5 @@ do_update_mem_alarm_status(HWM0) -> ok. usage_msg(Usage, What) -> - %% devide by 1.0 to ensure float point number + %% divide by 1.0 to ensure float point number iolist_to_binary(io_lib:format("~.2f% ~p usage", [Usage / 1.0, What])). diff --git a/apps/emqx/src/emqx_vm.erl b/apps/emqx/src/emqx_vm.erl index cf1a9dc08..fc94a461a 100644 --- a/apps/emqx/src/emqx_vm.erl +++ b/apps/emqx/src/emqx_vm.erl @@ -232,8 +232,10 @@ mem_info() -> Free = proplists:get_value(free_memory, Dataset), [{total_memory, Total}, {used_memory, Total - Free}]. -ftos(F) -> - io_lib:format("~.2f", [F / 1.0]). +ftos(F) when is_float(F) -> + float_to_binary(F, [{decimals, 2}]); +ftos(F) when is_integer(F) -> + ftos(F / 1.0). 
%%%% erlang vm scheduler_usage fun copied from recon scheduler_usage(Interval) when is_integer(Interval) -> @@ -391,11 +393,12 @@ cpu_util() -> compat_windows(Fun) -> case os:type() of {win32, nt} -> - 0; + 0.0; _Type -> case catch Fun() of + Val when is_float(Val) -> floor(Val * 100) / 100; Val when is_number(Val) -> Val; - _Error -> 0 + _Error -> 0.0 end end. diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index 5447e94e9..1327a1bb0 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -63,7 +63,7 @@ handle_info({timeout, _Timer, check}, State) -> ProcessCount = erlang:system_info(process_count), case ProcessCount / erlang:system_info(process_limit) of Percent when Percent > ProcHighWatermark -> - Usage = io_lib:format("~p%", [Percent * 100]), + Usage = usage(Percent), Message = [Usage, " process usage"], emqx_alarm:activate( too_many_processes, @@ -75,7 +75,7 @@ handle_info({timeout, _Timer, check}, State) -> Message ); Percent when Percent < ProcLowWatermark -> - Usage = io_lib:format("~p%", [Percent * 100]), + Usage = usage(Percent), Message = [Usage, " process usage"], emqx_alarm:ensure_deactivated( too_many_processes, @@ -108,3 +108,6 @@ code_change(_OldVsn, State, _Extra) -> start_check_timer() -> Interval = emqx:get_config([sysmon, vm, process_check_interval]), emqx_misc:start_timer(Interval, check). + +usage(Percent) -> + integer_to_list(floor(Percent * 100)) ++ "%". diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 6b38e8ca0..09adde8bd 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -150,7 +150,7 @@ node_info() -> get_sys_memory() -> case os:type() of {unix, linux} -> - load_ctl:get_sys_memory(); + emqx_mgmt_sys_memory:get_sys_memory(); _ -> {0, 0} end. 
diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index 329532fa1..fa49c02a6 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -26,4 +26,15 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_one, 1, 5}, []}}. + LC = child_spec(emqx_mgmt_sys_memory, 5000, worker), + {ok, {{one_for_one, 1, 5}, [LC]}}. + +child_spec(Mod, Shutdown, Type) -> + #{ + id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => Shutdown, + type => Type, + modules => [Mod] + }. diff --git a/apps/emqx_management/src/emqx_mgmt_sys_memory.erl b/apps/emqx_management/src/emqx_mgmt_sys_memory.erl new file mode 100644 index 000000000..d393caabe --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_sys_memory.erl @@ -0,0 +1,79 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_sys_memory). + +-behaviour(gen_server). +-define(SYS_MEMORY_CACHE_KEY, ?MODULE). +-define(TIMEOUT, 3000). + +-export([start_link/0, get_sys_memory/0, get_sys_memory/1]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). 
+ +get_sys_memory() -> + get_sys_memory(?TIMEOUT). + +get_sys_memory(Timeout) -> + try + gen_server:call(?MODULE, get_sys_memory, Timeout) + catch + exit:{timeout, _} -> + get_memory_from_cache() + end. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + {ok, #{last_time => 0}}. + +handle_call(get_sys_memory, _From, State = #{last_time := LastTime}) -> + Now = erlang:system_time(millisecond), + case Now - LastTime >= ?TIMEOUT of + true -> + Memory = load_ctl:get_sys_memory(), + persistent_term:put(?SYS_MEMORY_CACHE_KEY, Memory), + {reply, Memory, State#{last_time => Now}}; + false -> + {reply, get_memory_from_cache(), State} + end; +handle_call(_Request, _From, State = #{}) -> + {reply, ok, State}. + +handle_cast(_Request, State = #{}) -> + {noreply, State}. + +handle_info(_Info, State = #{}) -> + {noreply, State}. + +terminate(_Reason, _State = #{}) -> + ok. + +code_change(_OldVsn, State = #{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +get_memory_from_cache() -> + persistent_term:get(?SYS_MEMORY_CACHE_KEY, {0, 0}). From 5783127c3017da8a5855daf8ad37763aae3f52e7 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Sun, 29 Jan 2023 11:01:43 +0800 Subject: [PATCH 02/12] test: cpu_sup:load mock test --- apps/emqx/src/emqx_vm.erl | 4 +--- apps/emqx/test/emqx_vm_SUITE.erl | 16 +++++++++++++++- .../test/emqx_mgmt_api_alarms_SUITE.erl | 3 +++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_vm.erl b/apps/emqx/src/emqx_vm.erl index fc94a461a..c1096f611 100644 --- a/apps/emqx/src/emqx_vm.erl +++ b/apps/emqx/src/emqx_vm.erl @@ -233,9 +233,7 @@ mem_info() -> [{total_memory, Total}, {used_memory, Total - Free}]. ftos(F) when is_float(F) -> - float_to_binary(F, [{decimals, 2}]); -ftos(F) when is_integer(F) -> - ftos(F / 1.0). 
+ float_to_binary(F, [{decimals, 2}]). %%%% erlang vm scheduler_usage fun copied from recon scheduler_usage(Interval) when is_integer(Interval) -> diff --git a/apps/emqx/test/emqx_vm_SUITE.erl b/apps/emqx/test/emqx_vm_SUITE.erl index f9809361b..9115c5ab4 100644 --- a/apps/emqx/test/emqx_vm_SUITE.erl +++ b/apps/emqx/test/emqx_vm_SUITE.erl @@ -24,7 +24,21 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_load(_Config) -> - ?assertMatch([{load1, _}, {load5, _}, {load15, _}], emqx_vm:loads()). + lists:foreach( + fun(Avg, Int) -> + emqx_common_test_helpers:with_mock( + cpu_sup, + Avg, + fun() -> Int end, + fun() -> + Load = proplists:get_value(Avg, emqx_vm:loads()), + ?assertEqual(Int / 1.0, Load) + end + ), + ?assertMatch([{load1, _}, {load5, _}, {load15, _}], emqx_vm:loads()) + end, + [{load1, 1}, {load5, 5}, {load15, 15}] + ). t_systeminfo(_Config) -> ?assertEqual( diff --git a/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl index adff41214..2c61651bf 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl @@ -40,6 +40,9 @@ t_alarms_api(_) -> get_alarms(1, true), get_alarms(1, false). +t_alarm_cpu(_) -> + ok. + t_delete_alarms_api(_) -> Path = emqx_mgmt_api_test_util:api_path(["alarms"]), {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Path), From 6162f90610758c14e3d280869660e70e2857ccdd Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Sat, 28 Jan 2023 11:53:54 +0800 Subject: [PATCH 03/12] fix: don't crash when OTP_VERSION file is missing --- apps/emqx/src/emqx_vm.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_vm.erl b/apps/emqx/src/emqx_vm.erl index c1096f611..50582a2cc 100644 --- a/apps/emqx/src/emqx_vm.erl +++ b/apps/emqx/src/emqx_vm.erl @@ -400,7 +400,7 @@ compat_windows(Fun) -> end end. -%% @doc Return on which Eralng/OTP the current vm is running. 
+%% @doc Return on which Erlang/OTP the current vm is running. %% NOTE: This API reads a file, do not use it in critical code paths. get_otp_version() -> read_otp_version(). @@ -417,6 +417,8 @@ read_otp_version() -> %% running tests etc. OtpMajor = erlang:system_info(otp_release), OtpVsnFile = filename:join([ReleasesDir, OtpMajor, "OTP_VERSION"]), - {ok, Vsn} = file:read_file(OtpVsnFile), - Vsn + case file:read_file(OtpVsnFile) of + {ok, Vsn} -> Vsn; + {error, enoent} -> list_to_binary(OtpMajor) + end end. From 0b19be074c7856c9f52a53364f05145b65eda5ed Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Sat, 28 Jan 2023 12:07:50 +0800 Subject: [PATCH 04/12] feat: cache OTP_VERSION in persistent_term --- apps/emqx/src/emqx_vm.erl | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_vm.erl b/apps/emqx/src/emqx_vm.erl index 50582a2cc..7da49016d 100644 --- a/apps/emqx/src/emqx_vm.erl +++ b/apps/emqx/src/emqx_vm.erl @@ -401,9 +401,19 @@ compat_windows(Fun) -> end. %% @doc Return on which Erlang/OTP the current vm is running. -%% NOTE: This API reads a file, do not use it in critical code paths. +%% The dashboard's /api/nodes endpoint will call this function frequently. +%% we should avoid reading file every time. +%% The OTP version never changes at runtime except when erts is upgraded, +%% so we cache it in a persistent term for performance. get_otp_version() -> - read_otp_version(). + case persistent_term:get(emqx_otp_version, undefined) of + undefined -> + OtpVsn = read_otp_version(), + persistent_term:put(emqx_otp_version, OtpVsn), + OtpVsn; + OtpVsn when is_binary(OtpVsn) -> + OtpVsn + end. 
read_otp_version() -> ReleasesDir = filename:join([code:root_dir(), "releases"]), From b6e6315b5076c39a2c394623651fdfc44dc3b1a9 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 11:53:49 +0800 Subject: [PATCH 05/12] feat: change loads from string to float --- apps/emqx/src/emqx_alarm.erl | 5 +- apps/emqx/src/emqx_os_mon.erl | 34 +++-- apps/emqx/src/emqx_vm.erl | 12 +- apps/emqx/test/emqx_os_mon_SUITE.erl | 142 ++++++++++++++++-- apps/emqx/test/emqx_vm_SUITE.erl | 15 +- apps/emqx/test/emqx_vm_mon_SUITE.erl | 26 +++- apps/emqx_management/src/emqx_mgmt.erl | 2 +- .../src/emqx_mgmt_api_nodes.erl | 12 +- .../src/emqx_mgmt_sys_memory.erl | 2 +- .../test/emqx_mgmt_api_nodes_SUITE.erl | 4 +- 10 files changed, 199 insertions(+), 55 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 209715a85..84c40ef2a 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -325,19 +325,20 @@ deactivate_alarm( false -> ok end, + Now = erlang:system_time(microsecond), HistoryAlarm = make_deactivated_alarm( ActivateAt, Name, Details0, Msg0, - erlang:system_time(microsecond) + Now ), DeActAlarm = make_deactivated_alarm( ActivateAt, Name, Details, normalize_message(Name, iolist_to_binary(Message)), - erlang:system_time(microsecond) + Now ), mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mria:dirty_delete(?ACTIVATED_ALARM, Name), diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index 5c6987ea0..c5ce35bf9 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -93,9 +93,9 @@ init([]) -> %% memsup is not reliable, ignore memsup:set_sysmem_high_watermark(1.0), SysHW = init_os_monitor(), - _ = start_mem_check_timer(), - _ = start_cpu_check_timer(), - {ok, #{sysmem_high_watermark => SysHW}}. + MemRef = start_mem_check_timer(), + CpuRef = start_cpu_check_timer(), + {ok, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}}. 
init_os_monitor() -> init_os_monitor(emqx:get_config([sysmon, os])). @@ -125,8 +125,8 @@ handle_cast(Msg, State) -> handle_info({timeout, _Timer, mem_check}, #{sysmem_high_watermark := HWM} = State) -> ok = update_mem_alarm_status(HWM), - ok = start_mem_check_timer(), - {noreply, State}; + Ref = start_mem_check_timer(), + {noreply, State#{mem_time_ref => Ref}}; handle_info({timeout, _Timer, cpu_check}, State) -> CPUHighWatermark = emqx:get_config([sysmon, os, cpu_high_watermark]) * 100, CPULowWatermark = emqx:get_config([sysmon, os, cpu_low_watermark]) * 100, @@ -158,11 +158,14 @@ handle_info({timeout, _Timer, cpu_check}, State) -> _Busy -> ok end, - ok = start_cpu_check_timer(), - {noreply, State}; -handle_info({monitor_conf_update, OS}, _State) -> + Ref = start_cpu_check_timer(), + {noreply, State#{cpu_time_ref => Ref}}; +handle_info({monitor_conf_update, OS}, State) -> + cancel_outdated_timer(State), SysHW = init_os_monitor(OS), - {noreply, #{sysmem_high_watermark => SysHW}}; + MemRef = start_mem_check_timer(), + CpuRef = start_cpu_check_timer(), + {noreply, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}}; handle_info(Info, State) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {noreply, State}. @@ -176,11 +179,15 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- +cancel_outdated_timer(#{mem_time_ref := MemRef, cpu_time_ref := CpuRef}) -> + emqx_misc:cancel_timer(MemRef), + emqx_misc:cancel_timer(CpuRef), + ok. start_cpu_check_timer() -> Interval = emqx:get_config([sysmon, os, cpu_check_interval]), case erlang:system_info(system_architecture) of - "x86_64-pc-linux-musl" -> ok; + "x86_64-pc-linux-musl" -> undefined; _ -> start_timer(Interval, cpu_check) end. 
@@ -193,12 +200,11 @@ start_mem_check_timer() -> true -> start_timer(Interval, mem_check); false -> - ok + undefined end. start_timer(Interval, Msg) -> - _ = emqx_misc:start_timer(Interval, Msg), - ok. + emqx_misc:start_timer(Interval, Msg). update_mem_alarm_status(HWM) when HWM > 1.0 orelse HWM < 0.0 -> ?SLOG(warning, #{msg => "discarded_out_of_range_mem_alarm_threshold", value => HWM}), @@ -225,7 +231,7 @@ do_update_mem_alarm_status(HWM0) -> }, usage_msg(Usage, mem) ); - _ -> + false -> ok = emqx_alarm:ensure_deactivated( high_system_memory_usage, #{ diff --git a/apps/emqx/src/emqx_vm.erl b/apps/emqx/src/emqx_vm.erl index 7da49016d..f80d18a3a 100644 --- a/apps/emqx/src/emqx_vm.erl +++ b/apps/emqx/src/emqx_vm.erl @@ -175,9 +175,9 @@ schedulers() -> loads() -> [ - {load1, ftos(avg1() / 256)}, - {load5, ftos(avg5() / 256)}, - {load15, ftos(avg15() / 256)} + {load1, load(avg1())}, + {load5, load(avg5())}, + {load15, load(avg15())} ]. system_info_keys() -> ?SYSTEM_INFO_KEYS. @@ -232,9 +232,6 @@ mem_info() -> Free = proplists:get_value(free_memory, Dataset), [{total_memory, Total}, {used_memory, Total - Free}]. -ftos(F) when is_float(F) -> - float_to_binary(F, [{decimals, 2}]). - %%%% erlang vm scheduler_usage fun copied from recon scheduler_usage(Interval) when is_integer(Interval) -> %% We start and stop the scheduler_wall_time system flag @@ -400,6 +397,9 @@ compat_windows(Fun) -> end end. +load(Avg) -> + floor((Avg / 256) * 100) / 100. + %% @doc Return on which Erlang/OTP the current vm is running. %% The dashboard's /api/nodes endpoint will call this function frequently. %% we should avoid reading file every time. diff --git a/apps/emqx/test/emqx_os_mon_SUITE.erl b/apps/emqx/test/emqx_os_mon_SUITE.erl index 8729bbdb6..0c5a1f261 100644 --- a/apps/emqx/test/emqx_os_mon_SUITE.erl +++ b/apps/emqx/test/emqx_os_mon_SUITE.erl @@ -25,25 +25,44 @@ all() -> emqx_common_test_helpers:all(?MODULE). 
init_per_suite(Config) -> emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps( - [], - fun - (emqx) -> - application:set_env(emqx, os_mon, [ - {cpu_check_interval, 1}, - {cpu_high_watermark, 5}, - {cpu_low_watermark, 80}, - {procmem_high_watermark, 5} - ]); - (_) -> - ok - end - ), + emqx_common_test_helpers:start_apps([]), Config. end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). +init_per_testcase(t_cpu_check_alarm, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + SysMon = emqx_config:get([sysmon, os], #{}), + emqx_config:put([sysmon, os], SysMon#{ + cpu_high_watermark => 0.9, + cpu_low_watermark => 0, + %% 200ms + cpu_check_interval => 200 + }), + ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon), + {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon), + Config; +init_per_testcase(t_sys_mem_check_alarm, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + SysMon = emqx_config:get([sysmon, os], #{}), + emqx_config:put([sysmon, os], SysMon#{ + sysmem_high_watermark => 0.51, + %% 200ms + mem_check_interval => 200 + }), + ok = meck:new(os, [non_strict, no_link, no_history, passthrough, unstick]), + ok = meck:expect(os, type, fun() -> {unix, linux} end), + ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon), + {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon), + Config; +init_per_testcase(_, Config) -> + emqx_common_test_helpers:boot_modules(all), + emqx_common_test_helpers:start_apps([]), + Config. + t_api(_) -> ?assertEqual(60000, emqx_os_mon:get_mem_check_interval()), ?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30000)), @@ -67,3 +86,98 @@ t_api(_) -> emqx_os_mon ! ignored, gen_server:stop(emqx_os_mon), ok. 
+ +t_sys_mem_check_alarm(_) -> + emqx_config:put([sysmon, os, mem_check_interval], 200), + emqx_os_mon:update(emqx_config:get([sysmon, os])), + Mem = 0.52345, + Usage = floor(Mem * 10000) / 100, + emqx_common_test_helpers:with_mock( + load_ctl, + get_memory_usage, + fun() -> Mem end, + fun() -> + timer:sleep(500), + Alarms = emqx_alarm:get_alarms(activated), + ?assert( + emqx_vm_mon_SUITE:is_existing( + high_system_memory_usage, emqx_alarm:get_alarms(activated) + ), + #{ + load_ctl_memory => load_ctl:get_memory_usage(), + config => emqx_config:get([sysmon, os]), + process => sys:get_state(emqx_os_mon), + alarms => Alarms + } + ), + [ + #{ + activate_at := _, + activated := true, + deactivate_at := infinity, + details := #{high_watermark := 51.0, usage := RealUsage}, + message := Msg, + name := high_system_memory_usage + } + ] = + lists:filter( + fun + (#{name := high_system_memory_usage}) -> true; + (_) -> false + end, + Alarms + ), + ?assert(RealUsage >= Usage, {RealUsage, Usage}), + ?assert(is_binary(Msg)), + emqx_config:put([sysmon, os, sysmem_high_watermark], 0.99999), + ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon), + {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon), + timer:sleep(600), + Activated = emqx_alarm:get_alarms(activated), + ?assertNot( + emqx_vm_mon_SUITE:is_existing(high_system_memory_usage, Activated), + #{activated => Activated, process_state => sys:get_state(emqx_os_mon)} + ) + end + ). 
+ +t_cpu_check_alarm(_) -> + CpuUtil = 90.12345, + Usage = floor(CpuUtil * 100) / 100, + emqx_common_test_helpers:with_mock( + cpu_sup, + util, + fun() -> CpuUtil end, + fun() -> + timer:sleep(500), + Alarms = emqx_alarm:get_alarms(activated), + ?assert( + emqx_vm_mon_SUITE:is_existing(high_cpu_usage, emqx_alarm:get_alarms(activated)) + ), + [ + #{ + activate_at := _, + activated := true, + deactivate_at := infinity, + details := #{high_watermark := 90.0, low_watermark := 0, usage := RealUsage}, + message := Msg, + name := high_cpu_usage + } + ] = + lists:filter( + fun + (#{name := high_cpu_usage}) -> true; + (_) -> false + end, + Alarms + ), + ?assert(RealUsage >= Usage, {RealUsage, Usage}), + ?assert(is_binary(Msg)), + emqx_config:put([sysmon, os, cpu_high_watermark], 1), + emqx_config:put([sysmon, os, cpu_low_watermark], 0.96), + timer:sleep(500), + ?assertNot( + emqx_vm_mon_SUITE:is_existing(high_cpu_usage, emqx_alarm:get_alarms(activated)) + ) + end + ). diff --git a/apps/emqx/test/emqx_vm_SUITE.erl b/apps/emqx/test/emqx_vm_SUITE.erl index 9115c5ab4..35f37a41e 100644 --- a/apps/emqx/test/emqx_vm_SUITE.erl +++ b/apps/emqx/test/emqx_vm_SUITE.erl @@ -25,19 +25,22 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_load(_Config) -> lists:foreach( - fun(Avg, Int) -> + fun({Avg, LoadKey, Int}) -> emqx_common_test_helpers:with_mock( cpu_sup, Avg, fun() -> Int end, fun() -> - Load = proplists:get_value(Avg, emqx_vm:loads()), - ?assertEqual(Int / 1.0, Load) + Load = proplists:get_value(LoadKey, emqx_vm:loads()), + ?assertEqual(Int / 256, Load) end - ), - ?assertMatch([{load1, _}, {load5, _}, {load15, _}], emqx_vm:loads()) + ) end, - [{load1, 1}, {load5, 5}, {load15, 15}] + [{avg1, load1, 0}, {avg5, load5, 128}, {avg15, load15, 256}] + ), + ?assertMatch( + [{load1, _}, {load5, _}, {load15, _}], + emqx_vm:loads() ). 
t_systeminfo(_Config) -> diff --git a/apps/emqx/test/emqx_vm_mon_SUITE.erl b/apps/emqx/test/emqx_vm_mon_SUITE.erl index 140a00010..ceeffafb5 100644 --- a/apps/emqx/test/emqx_vm_mon_SUITE.erl +++ b/apps/emqx/test/emqx_vm_mon_SUITE.erl @@ -23,13 +23,13 @@ all() -> emqx_common_test_helpers:all(?MODULE). -init_per_testcase(t_alarms, Config) -> +init_per_testcase(t_too_many_processes_alarm, Config) -> emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), emqx_config:put([sysmon, vm], #{ process_high_watermark => 0, process_low_watermark => 0, - %% 1s + %% 100ms process_check_interval => 100 }), ok = supervisor:terminate_child(emqx_sys_sup, emqx_vm_mon), @@ -43,9 +43,29 @@ init_per_testcase(_, Config) -> end_per_testcase(_, _Config) -> emqx_common_test_helpers:stop_apps([]). -t_alarms(_) -> +t_too_many_processes_alarm(_) -> timer:sleep(500), + Alarms = emqx_alarm:get_alarms(activated), ?assert(is_existing(too_many_processes, emqx_alarm:get_alarms(activated))), + ?assertMatch( + [ + #{ + activate_at := _, + activated := true, + deactivate_at := infinity, + details := #{high_watermark := 0, low_watermark := 0, usage := "0%"}, + message := <<"0% process usage">>, + name := too_many_processes + } + ], + lists:filter( + fun + (#{name := too_many_processes}) -> true; + (_) -> false + end, + Alarms + ) + ), emqx_config:put([sysmon, vm, process_high_watermark], 70), emqx_config:put([sysmon, vm, process_low_watermark], 60), timer:sleep(500), diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 09adde8bd..f794ef01d 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -126,7 +126,7 @@ lookup_node(Node) -> node_info() -> {UsedRatio, Total} = get_sys_memory(), - Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]), + Info = maps:from_list(emqx_vm:loads()), BrokerInfo = emqx_sys:info(), Info#{ node => node(), diff --git 
a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index 64ef3c1ef..cb8d37609 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -159,18 +159,18 @@ fields(node_info) -> )}, {load1, mk( - string(), - #{desc => <<"CPU average load in 1 minute">>, example => "2.66"} + float(), + #{desc => <<"CPU average load in 1 minute">>, example => 2.66} )}, {load5, mk( - string(), - #{desc => <<"CPU average load in 5 minute">>, example => "2.66"} + float(), + #{desc => <<"CPU average load in 5 minute">>, example => 2.66} )}, {load15, mk( - string(), - #{desc => <<"CPU average load in 15 minute">>, example => "2.66"} + float(), + #{desc => <<"CPU average load in 15 minute">>, example => 2.66} )}, {max_fds, mk( diff --git a/apps/emqx_management/src/emqx_mgmt_sys_memory.erl b/apps/emqx_management/src/emqx_mgmt_sys_memory.erl index d393caabe..cc4f987b5 100644 --- a/apps/emqx_management/src/emqx_mgmt_sys_memory.erl +++ b/apps/emqx_management/src/emqx_mgmt_sys_memory.erl @@ -17,7 +17,7 @@ -behaviour(gen_server). -define(SYS_MEMORY_CACHE_KEY, ?MODULE). --define(TIMEOUT, 3000). +-define(TIMEOUT, 2200). -export([start_link/0, get_sys_memory/0, get_sys_memory/1]). -export([ diff --git a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl index 2bbdf938d..a0dbb9314 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl @@ -24,11 +24,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite([emqx_conf]), + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), Config. end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite([emqx_conf]). + emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). 
init_per_testcase(t_log_path, Config) -> emqx_config_logger:add_handler(), From c2bdb9faa7d5b10e0d00de48685ecf0e2b57920b Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Mon, 30 Jan 2023 12:48:28 +0800 Subject: [PATCH 06/12] test: multiple_nodes case failed --- apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl index a0dbb9314..03b0ea2d9 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_nodes_SUITE.erl @@ -152,7 +152,7 @@ cluster(Specs) -> Env = [{emqx, boot_modules, []}], emqx_common_test_helpers:emqx_cluster(Specs, [ {env, Env}, - {apps, [emqx_conf]}, + {apps, [emqx_conf, emqx_management]}, {load_schema, false}, {join_to, true}, {env_handler, fun From 7a9f47726718175249bc86f77a140e96b95a9075 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 1 Feb 2023 00:51:34 +0800 Subject: [PATCH 07/12] feat: use emqx_mgmt_cache to cache sys_memory --- apps/emqx_management/src/emqx_mgmt_cache.erl | 104 ++++++++++++++++++ apps/emqx_management/src/emqx_mgmt_sup.erl | 2 +- .../src/emqx_mgmt_sys_memory.erl | 79 ------------- 3 files changed, 105 insertions(+), 80 deletions(-) create mode 100644 apps/emqx_management/src/emqx_mgmt_cache.erl delete mode 100644 apps/emqx_management/src/emqx_mgmt_sys_memory.erl diff --git a/apps/emqx_management/src/emqx_mgmt_cache.erl b/apps/emqx_management/src/emqx_mgmt_cache.erl new file mode 100644 index 000000000..37f8e1367 --- /dev/null +++ b/apps/emqx_management/src/emqx_mgmt_cache.erl @@ -0,0 +1,104 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_cache). + +-behaviour(gen_server). + +-define(SYS_MEMORY_KEY, sys_memory). +-define(EXPIRED_MS, 3000). +%% -100ms to early update cache +-define(REFRESH_MS, ?EXPIRED_MS - 100). +-define(DEFAULT_BAD_MEMORY, {0, 0}). + +-export([start_link/0, get_sys_memory/0]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +get_sys_memory() -> + Now = now_millisecond(), + {CacheMem, ExpiredAt} = get_memory_from_cache(), + case Now > ExpiredAt of + true -> + erlang:send(?MODULE, fresh_sys_memory), + CacheMem; + %% stale cache value, try to recalculate + false -> + get_sys_memory_sync() + end. + +get_sys_memory_sync() -> + try + gen_server:call(?MODULE, get_sys_memory, ?EXPIRED_MS) + catch + exit:{timeout, _} -> + ?DEFAULT_BAD_MEMORY + end. + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + ets:new(?MODULE, [set, named_table, public, {keypos, 1}]), + {ok, #{fresh_at => 0}}. + +handle_call(get_sys_memory, _From, State) -> + {Mem, NewState} = fresh_sys_memory(State), + {reply, Mem, NewState}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(fresh_sys_memory, State) -> + {_, NewState} = fresh_sys_memory(State), + {noreply, NewState}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +fresh_sys_memory(State = #{fresh_at := LastFreshAt}) -> + Now = now_millisecond(), + {Mem, ExpiredAt} = get_memory_from_cache(), + case Now >= ExpiredAt orelse Now - LastFreshAt >= ?REFRESH_MS of + true -> + %% NOTE: the expiry time is not derived from Now, because + %% load_ctl:get_sys_memory/0 may be a heavy operation, + %% so take a fresh timestamp after get_sys_memory/0 returns. + NewMem = load_ctl:get_sys_memory(), + NewExpiredAt = now_millisecond() + ?EXPIRED_MS, + ets:insert(?MODULE, {?SYS_MEMORY_KEY, {NewMem, NewExpiredAt}}), + {NewMem, State#{fresh_at => Now}}; + false -> + {Mem, State} + end. + +get_memory_from_cache() -> + case ets:lookup(?MODULE, ?SYS_MEMORY_KEY) of + [] -> {?DEFAULT_BAD_MEMORY, 0}; + [{_, CacheVal}] -> CacheVal + end. + +now_millisecond() -> + erlang:system_time(millisecond). diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index fa49c02a6..2d9a9ba8a 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -26,7 +26,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - LC = child_spec(emqx_mgmt_sys_memory, 5000, worker), + LC = child_spec(emqx_mgmt_cache, 5000, worker), {ok, {{one_for_one, 1, 5}, [LC]}}. child_spec(Mod, Shutdown, Type) -> diff --git a/apps/emqx_management/src/emqx_mgmt_sys_memory.erl b/apps/emqx_management/src/emqx_mgmt_sys_memory.erl deleted file mode 100644 index cc4f987b5..000000000 --- a/apps/emqx_management/src/emqx_mgmt_sys_memory.erl +++ /dev/null @@ -1,79 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. 
-%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_mgmt_sys_memory). - --behaviour(gen_server). --define(SYS_MEMORY_CACHE_KEY, ?MODULE). --define(TIMEOUT, 2200). - --export([start_link/0, get_sys_memory/0, get_sys_memory/1]). --export([ - init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3 -]). - -get_sys_memory() -> - get_sys_memory(?TIMEOUT). - -get_sys_memory(Timeout) -> - try - gen_server:call(?MODULE, get_sys_memory, Timeout) - catch - exit:{timeout, _} -> - get_memory_from_cache() - end. - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -init([]) -> - {ok, #{last_time => 0}}. - -handle_call(get_sys_memory, _From, State = #{last_time := LastTime}) -> - Now = erlang:system_time(millisecond), - case Now - LastTime >= ?TIMEOUT of - true -> - Memory = load_ctl:get_sys_memory(), - persistent_term:put(?SYS_MEMORY_CACHE_KEY, Memory), - {reply, Memory, State#{last_time => Now}}; - false -> - {reply, get_memory_from_cache(), State} - end; -handle_call(_Request, _From, State = #{}) -> - {reply, ok, State}. - -handle_cast(_Request, State = #{}) -> - {noreply, State}. - -handle_info(_Info, State = #{}) -> - {noreply, State}. - -terminate(_Reason, _State = #{}) -> - ok. - -code_change(_OldVsn, State = #{}, _Extra) -> - {ok, State}. 
- -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -get_memory_from_cache() -> - persistent_term:get(?SYS_MEMORY_CACHE_KEY, {0, 0}). From 71f00f2962a3cbd6c7bd50f2503b3c8cbf41932c Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 1 Feb 2023 10:07:09 +0800 Subject: [PATCH 08/12] test: ctl suite failed --- apps/emqx_management/src/emqx_mgmt_cli.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 0e7506a0b..442d5c7de 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -315,7 +315,7 @@ vm([]) -> vm(["all"]) -> [vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]]; vm(["load"]) -> - [emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()]; + [emqx_ctl:print("cpu/~-20s: ~w~n", [L, V]) || {L, V} <- emqx_vm:loads()]; vm(["memory"]) -> [emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()]; vm(["process"]) -> From 56b9238645bf39072fec1f36e5cf69d4799b3538 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 1 Feb 2023 12:21:34 +0800 Subject: [PATCH 09/12] fix: only cache sys_memory in linux --- apps/emqx_management/src/emqx_mgmt.erl | 2 +- apps/emqx_management/src/emqx_mgmt_sup.erl | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index f794ef01d..814b39cdc 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -150,7 +150,7 @@ node_info() -> get_sys_memory() -> case os:type() of {unix, linux} -> - emqx_mgmt_sys_memory:get_sys_memory(); + emqx_mgmt_cache:get_sys_memory(); _ -> {0, 0} end. 
diff --git a/apps/emqx_management/src/emqx_mgmt_sup.erl b/apps/emqx_management/src/emqx_mgmt_sup.erl index 2d9a9ba8a..713ff87dc 100644 --- a/apps/emqx_management/src/emqx_mgmt_sup.erl +++ b/apps/emqx_management/src/emqx_mgmt_sup.erl @@ -26,8 +26,14 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - LC = child_spec(emqx_mgmt_cache, 5000, worker), - {ok, {{one_for_one, 1, 5}, [LC]}}. + Workers = + case os:type() of + {unix, linux} -> + [child_spec(emqx_mgmt_cache, 5000, worker)]; + _ -> + [] + end, + {ok, {{one_for_one, 1, 5}, Workers}}. child_spec(Mod, Shutdown, Type) -> #{ From ced55719ef2fd8b93fe021c6434fcfe219a0e58d Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 1 Feb 2023 14:47:41 +0800 Subject: [PATCH 10/12] chore: only run t_sys_mem_check ct in linux --- apps/emqx/test/emqx_os_mon_SUITE.erl | 39 ++++++++++++++++------------ 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/apps/emqx/test/emqx_os_mon_SUITE.erl b/apps/emqx/test/emqx_os_mon_SUITE.erl index 0c5a1f261..0538d949a 100644 --- a/apps/emqx/test/emqx_os_mon_SUITE.erl +++ b/apps/emqx/test/emqx_os_mon_SUITE.erl @@ -32,8 +32,6 @@ end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([]). 
init_per_testcase(t_cpu_check_alarm, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), SysMon = emqx_config:get([sysmon, os], #{}), emqx_config:put([sysmon, os], SysMon#{ cpu_high_watermark => 0.9, @@ -45,19 +43,20 @@ init_per_testcase(t_cpu_check_alarm, Config) -> {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon), Config; init_per_testcase(t_sys_mem_check_alarm, Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - SysMon = emqx_config:get([sysmon, os], #{}), - emqx_config:put([sysmon, os], SysMon#{ - sysmem_high_watermark => 0.51, - %% 200ms - mem_check_interval => 200 - }), - ok = meck:new(os, [non_strict, no_link, no_history, passthrough, unstick]), - ok = meck:expect(os, type, fun() -> {unix, linux} end), - ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon), - {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon), - Config; + case os:type() of + {unix, linux} -> + SysMon = emqx_config:get([sysmon, os], #{}), + emqx_config:put([sysmon, os], SysMon#{ + sysmem_high_watermark => 0.51, + %% 200ms + mem_check_interval => 200 + }), + ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon), + {ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon), + Config; + _ -> + Config + end; init_per_testcase(_, Config) -> emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:start_apps([]), @@ -87,7 +86,15 @@ t_api(_) -> gen_server:stop(emqx_os_mon), ok. -t_sys_mem_check_alarm(_) -> +t_sys_mem_check_alarm(Config) -> + case os:type() of + {unix, linux} -> + do_sys_mem_check_alarm(Config); + _ -> + skip + end. 
+ +do_sys_mem_check_alarm(_Config) -> emqx_config:put([sysmon, os, mem_check_interval], 200), emqx_os_mon:update(emqx_config:get([sysmon, os])), Mem = 0.52345, From 3c4d09a752cb4debbf72929511f47e56a6720bb3 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 1 Feb 2023 16:00:55 +0800 Subject: [PATCH 11/12] fix: get_memory_cache return {ok, Mem} | stale --- apps/emqx_management/src/emqx_mgmt_cache.erl | 40 +++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_cache.erl b/apps/emqx_management/src/emqx_mgmt_cache.erl index 37f8e1367..05736a33c 100644 --- a/apps/emqx_management/src/emqx_mgmt_cache.erl +++ b/apps/emqx_management/src/emqx_mgmt_cache.erl @@ -27,14 +27,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). get_sys_memory() -> - Now = now_millisecond(), - {CacheMem, ExpiredAt} = get_memory_from_cache(), - case Now > ExpiredAt of - true -> + case get_memory_from_cache() of + {ok, CacheMem} -> erlang:send(?MODULE, fresh_sys_memory), CacheMem; - %% stale cache value, try to recalculate - false -> + stale -> get_sys_memory_sync() end. @@ -80,24 +77,31 @@ code_change(_OldVsn, State, _Extra) -> fresh_sys_memory(State = #{fresh_at := LastFreshAt}) -> Now = now_millisecond(), - {Mem, ExpiredAt} = get_memory_from_cache(), - case Now >= ExpiredAt orelse Now - LastFreshAt >= ?REFRESH_MS of + case Now - LastFreshAt >= ?REFRESH_MS of true -> - %% NOTE: Now /= UpdateAt, because - %% load_ctl:get_sys_memory/0 maybe a heavy operation, - %% so record update_at timestamp after get_sys_memory/0. - NewMem = load_ctl:get_sys_memory(), - NewExpiredAt = now_millisecond() + ?EXPIRED_MS, - ets:insert(?MODULE, {?SYS_MEMORY_KEY, {NewMem, NewExpiredAt}}), - {NewMem, State#{fresh_at => Now}}; + do_fresh_sys_memory(Now, State); false -> - {Mem, State} + case get_memory_from_cache() of + stale -> do_fresh_sys_memory(Now, State); + {ok, Mem} -> {Mem, State} + end end. 
+do_fresh_sys_memory(FreshAt, State) -> + NewMem = load_ctl:get_sys_memory(), + NewExpiredAt = now_millisecond() + ?EXPIRED_MS, + ets:insert(?MODULE, {?SYS_MEMORY_KEY, {NewMem, NewExpiredAt}}), + {NewMem, State#{fresh_at => FreshAt}}. + get_memory_from_cache() -> case ets:lookup(?MODULE, ?SYS_MEMORY_KEY) of - [] -> {?DEFAULT_BAD_MEMORY, 0}; - [{_, CacheVal}] -> CacheVal + [] -> + stale; + [{_, {Mem, ExpiredAt}}] -> + case now_millisecond() < ExpiredAt of + true -> {ok, Mem}; + false -> stale + end end. now_millisecond() -> From 638291503675fc1fa33a06f55790e23bdd2fa826 Mon Sep 17 00:00:00 2001 From: Zhongwen Deng Date: Wed, 1 Feb 2023 17:36:30 +0800 Subject: [PATCH 12/12] chore: replace fresh by refresh --- apps/emqx_management/src/emqx_mgmt_cache.erl | 22 ++++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_cache.erl b/apps/emqx_management/src/emqx_mgmt_cache.erl index 05736a33c..9b3cd4f56 100644 --- a/apps/emqx_management/src/emqx_mgmt_cache.erl +++ b/apps/emqx_management/src/emqx_mgmt_cache.erl @@ -29,7 +29,7 @@ get_sys_memory() -> case get_memory_from_cache() of {ok, CacheMem} -> - erlang:send(?MODULE, fresh_sys_memory), + erlang:send(?MODULE, refresh_sys_memory), CacheMem; stale -> get_sys_memory_sync() @@ -48,10 +48,10 @@ start_link() -> init([]) -> ets:new(?MODULE, [set, named_table, public, {keypos, 1}]), - {ok, #{fresh_at => 0}}. + {ok, #{latest_refresh => 0}}. handle_call(get_sys_memory, _From, State) -> - {Mem, NewState} = fresh_sys_memory(State), + {Mem, NewState} = refresh_sys_memory(State), {reply, Mem, NewState}; handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -59,8 +59,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Request, State) -> {noreply, State}.
-handle_info(fresh_sys_memory, State) -> - {_, NewState} = fresh_sys_memory(State), +handle_info(refresh_sys_memory, State) -> + {_, NewState} = refresh_sys_memory(State), {noreply, NewState}; handle_info(_Info, State) -> {noreply, State}. @@ -75,23 +75,23 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== -fresh_sys_memory(State = #{fresh_at := LastFreshAt}) -> +refresh_sys_memory(State = #{latest_refresh := LatestRefresh}) -> Now = now_millisecond(), - case Now - LastFreshAt >= ?REFRESH_MS of + case Now - LatestRefresh >= ?REFRESH_MS of true -> - do_fresh_sys_memory(Now, State); + do_refresh_sys_memory(Now, State); false -> case get_memory_from_cache() of - stale -> do_fresh_sys_memory(Now, State); + stale -> do_refresh_sys_memory(Now, State); {ok, Mem} -> {Mem, State} end end. -do_fresh_sys_memory(FreshAt, State) -> +do_refresh_sys_memory(RefreshAt, State) -> NewMem = load_ctl:get_sys_memory(), NewExpiredAt = now_millisecond() + ?EXPIRED_MS, ets:insert(?MODULE, {?SYS_MEMORY_KEY, {NewMem, NewExpiredAt}}), - {NewMem, State#{fresh_at => FreshAt}}. + {NewMem, State#{latest_refresh => RefreshAt}}. get_memory_from_cache() -> case ets:lookup(?MODULE, ?SYS_MEMORY_KEY) of