Merge pull request #9856 from zhongwencool/alarm-percent-decimals

fix: /api/nodes is timeout when emqx in high load
This commit is contained in:
zhongwencool 2023-02-01 19:40:53 +08:00 committed by GitHub
commit 13511d2782
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 376 additions and 65 deletions

View File

@ -325,19 +325,20 @@ deactivate_alarm(
false ->
ok
end,
Now = erlang:system_time(microsecond),
HistoryAlarm = make_deactivated_alarm(
ActivateAt,
Name,
Details0,
Msg0,
erlang:system_time(microsecond)
Now
),
DeActAlarm = make_deactivated_alarm(
ActivateAt,
Name,
Details,
normalize_message(Name, iolist_to_binary(Message)),
erlang:system_time(microsecond)
Now
),
mria:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
mria:dirty_delete(?ACTIVATED_ALARM, Name),

View File

@ -93,9 +93,9 @@ init([]) ->
%% memsup is not reliable, ignore
memsup:set_sysmem_high_watermark(1.0),
SysHW = init_os_monitor(),
_ = start_mem_check_timer(),
_ = start_cpu_check_timer(),
{ok, #{sysmem_high_watermark => SysHW}}.
MemRef = start_mem_check_timer(),
CpuRef = start_cpu_check_timer(),
{ok, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}}.
init_os_monitor() ->
init_os_monitor(emqx:get_config([sysmon, os])).
@ -125,13 +125,15 @@ handle_cast(Msg, State) ->
handle_info({timeout, _Timer, mem_check}, #{sysmem_high_watermark := HWM} = State) ->
ok = update_mem_alarm_status(HWM),
ok = start_mem_check_timer(),
{noreply, State};
Ref = start_mem_check_timer(),
{noreply, State#{mem_time_ref => Ref}};
handle_info({timeout, _Timer, cpu_check}, State) ->
CPUHighWatermark = emqx:get_config([sysmon, os, cpu_high_watermark]) * 100,
CPULowWatermark = emqx:get_config([sysmon, os, cpu_low_watermark]) * 100,
case emqx_vm:cpu_util() of
0 ->
CPUVal = emqx_vm:cpu_util(),
case CPUVal of
%% 0 or 0.0
Busy when Busy == 0 ->
ok;
Busy when Busy > CPUHighWatermark ->
_ = emqx_alarm:activate(
@ -156,11 +158,14 @@ handle_info({timeout, _Timer, cpu_check}, State) ->
_Busy ->
ok
end,
ok = start_cpu_check_timer(),
{noreply, State};
handle_info({monitor_conf_update, OS}, _State) ->
Ref = start_cpu_check_timer(),
{noreply, State#{cpu_time_ref => Ref}};
handle_info({monitor_conf_update, OS}, State) ->
cancel_outdated_timer(State),
SysHW = init_os_monitor(OS),
{noreply, #{sysmem_high_watermark => SysHW}};
MemRef = start_mem_check_timer(),
CpuRef = start_cpu_check_timer(),
{noreply, #{sysmem_high_watermark => SysHW, mem_time_ref => MemRef, cpu_time_ref => CpuRef}};
handle_info(Info, State) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{noreply, State}.
@ -174,11 +179,15 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
cancel_outdated_timer(#{mem_time_ref := MemRef, cpu_time_ref := CpuRef}) ->
emqx_misc:cancel_timer(MemRef),
emqx_misc:cancel_timer(CpuRef),
ok.
start_cpu_check_timer() ->
Interval = emqx:get_config([sysmon, os, cpu_check_interval]),
case erlang:system_info(system_architecture) of
"x86_64-pc-linux-musl" -> ok;
"x86_64-pc-linux-musl" -> undefined;
_ -> start_timer(Interval, cpu_check)
end.
@ -191,12 +200,11 @@ start_mem_check_timer() ->
true ->
start_timer(Interval, mem_check);
false ->
ok
undefined
end.
start_timer(Interval, Msg) ->
_ = emqx_misc:start_timer(Interval, Msg),
ok.
emqx_misc:start_timer(Interval, Msg).
update_mem_alarm_status(HWM) when HWM > 1.0 orelse HWM < 0.0 ->
?SLOG(warning, #{msg => "discarded_out_of_range_mem_alarm_threshold", value => HWM}),
@ -223,7 +231,7 @@ do_update_mem_alarm_status(HWM0) ->
},
usage_msg(Usage, mem)
);
_ ->
false ->
ok = emqx_alarm:ensure_deactivated(
high_system_memory_usage,
#{
@ -236,5 +244,5 @@ do_update_mem_alarm_status(HWM0) ->
ok.
usage_msg(Usage, What) ->
%% devide by 1.0 to ensure float point number
%% divide by 1.0 to ensure float point number
iolist_to_binary(io_lib:format("~.2f% ~p usage", [Usage / 1.0, What])).

View File

@ -175,9 +175,9 @@ schedulers() ->
loads() ->
[
{load1, ftos(avg1() / 256)},
{load5, ftos(avg5() / 256)},
{load15, ftos(avg15() / 256)}
{load1, load(avg1())},
{load5, load(avg5())},
{load15, load(avg15())}
].
system_info_keys() -> ?SYSTEM_INFO_KEYS.
@ -232,9 +232,6 @@ mem_info() ->
Free = proplists:get_value(free_memory, Dataset),
[{total_memory, Total}, {used_memory, Total - Free}].
ftos(F) ->
io_lib:format("~.2f", [F / 1.0]).
%%%% erlang vm scheduler_usage fun copied from recon
scheduler_usage(Interval) when is_integer(Interval) ->
%% We start and stop the scheduler_wall_time system flag
@ -391,18 +388,32 @@ cpu_util() ->
compat_windows(Fun) ->
case os:type() of
{win32, nt} ->
0;
0.0;
_Type ->
case catch Fun() of
Val when is_float(Val) -> floor(Val * 100) / 100;
Val when is_number(Val) -> Val;
_Error -> 0
_Error -> 0.0
end
end.
%% @doc Return on which Eralng/OTP the current vm is running.
%% NOTE: This API reads a file, do not use it in critical code paths.
load(Avg) ->
floor((Avg / 256) * 100) / 100.
%% @doc Return on which Erlang/OTP the current vm is running.
%% The dashboard's /api/nodes endpoint will call this function frequently.
%% we should avoid reading file every time.
%% The OTP version never changes at runtime expect upgrade erts,
%% so we cache it in a persistent term for performance.
get_otp_version() ->
read_otp_version().
case persistent_term:get(emqx_otp_version, undefined) of
undefined ->
OtpVsn = read_otp_version(),
persistent_term:put(emqx_otp_version, OtpVsn),
OtpVsn;
OtpVsn when is_binary(OtpVsn) ->
OtpVsn
end.
read_otp_version() ->
ReleasesDir = filename:join([code:root_dir(), "releases"]),
@ -416,6 +427,8 @@ read_otp_version() ->
%% running tests etc.
OtpMajor = erlang:system_info(otp_release),
OtpVsnFile = filename:join([ReleasesDir, OtpMajor, "OTP_VERSION"]),
{ok, Vsn} = file:read_file(OtpVsnFile),
Vsn
case file:read_file(OtpVsnFile) of
{ok, Vsn} -> Vsn;
{error, enoent} -> list_to_binary(OtpMajor)
end
end.

View File

@ -63,7 +63,7 @@ handle_info({timeout, _Timer, check}, State) ->
ProcessCount = erlang:system_info(process_count),
case ProcessCount / erlang:system_info(process_limit) of
Percent when Percent > ProcHighWatermark ->
Usage = io_lib:format("~p%", [Percent * 100]),
Usage = usage(Percent),
Message = [Usage, " process usage"],
emqx_alarm:activate(
too_many_processes,
@ -75,7 +75,7 @@ handle_info({timeout, _Timer, check}, State) ->
Message
);
Percent when Percent < ProcLowWatermark ->
Usage = io_lib:format("~p%", [Percent * 100]),
Usage = usage(Percent),
Message = [Usage, " process usage"],
emqx_alarm:ensure_deactivated(
too_many_processes,
@ -108,3 +108,6 @@ code_change(_OldVsn, State, _Extra) ->
start_check_timer() ->
Interval = emqx:get_config([sysmon, vm, process_check_interval]),
emqx_misc:start_timer(Interval, check).
usage(Percent) ->
integer_to_list(floor(Percent * 100)) ++ "%".

View File

@ -25,25 +25,43 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps(
[],
fun
(emqx) ->
application:set_env(emqx, os_mon, [
{cpu_check_interval, 1},
{cpu_high_watermark, 5},
{cpu_low_watermark, 80},
{procmem_high_watermark, 5}
]);
(_) ->
ok
end
),
emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(t_cpu_check_alarm, Config) ->
SysMon = emqx_config:get([sysmon, os], #{}),
emqx_config:put([sysmon, os], SysMon#{
cpu_high_watermark => 0.9,
cpu_low_watermark => 0,
%% 200ms
cpu_check_interval => 200
}),
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
Config;
init_per_testcase(t_sys_mem_check_alarm, Config) ->
case os:type() of
{unix, linux} ->
SysMon = emqx_config:get([sysmon, os], #{}),
emqx_config:put([sysmon, os], SysMon#{
sysmem_high_watermark => 0.51,
%% 200ms
mem_check_interval => 200
}),
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
Config;
_ ->
Config
end;
init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
t_api(_) ->
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30000)),
@ -67,3 +85,106 @@ t_api(_) ->
emqx_os_mon ! ignored,
gen_server:stop(emqx_os_mon),
ok.
t_sys_mem_check_alarm(Config) ->
case os:type() of
{unix, linux} ->
do_sys_mem_check_alarm(Config);
_ ->
skip
end.
do_sys_mem_check_alarm(_Config) ->
emqx_config:put([sysmon, os, mem_check_interval], 200),
emqx_os_mon:update(emqx_config:get([sysmon, os])),
Mem = 0.52345,
Usage = floor(Mem * 10000) / 100,
emqx_common_test_helpers:with_mock(
load_ctl,
get_memory_usage,
fun() -> Mem end,
fun() ->
timer:sleep(500),
Alarms = emqx_alarm:get_alarms(activated),
?assert(
emqx_vm_mon_SUITE:is_existing(
high_system_memory_usage, emqx_alarm:get_alarms(activated)
),
#{
load_ctl_memory => load_ctl:get_memory_usage(),
config => emqx_config:get([sysmon, os]),
process => sys:get_state(emqx_os_mon),
alarms => Alarms
}
),
[
#{
activate_at := _,
activated := true,
deactivate_at := infinity,
details := #{high_watermark := 51.0, usage := RealUsage},
message := Msg,
name := high_system_memory_usage
}
] =
lists:filter(
fun
(#{name := high_system_memory_usage}) -> true;
(_) -> false
end,
Alarms
),
?assert(RealUsage >= Usage, {RealUsage, Usage}),
?assert(is_binary(Msg)),
emqx_config:put([sysmon, os, sysmem_high_watermark], 0.99999),
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
timer:sleep(600),
Activated = emqx_alarm:get_alarms(activated),
?assertNot(
emqx_vm_mon_SUITE:is_existing(high_system_memory_usage, Activated),
#{activated => Activated, process_state => sys:get_state(emqx_os_mon)}
)
end
).
t_cpu_check_alarm(_) ->
CpuUtil = 90.12345,
Usage = floor(CpuUtil * 100) / 100,
emqx_common_test_helpers:with_mock(
cpu_sup,
util,
fun() -> CpuUtil end,
fun() ->
timer:sleep(500),
Alarms = emqx_alarm:get_alarms(activated),
?assert(
emqx_vm_mon_SUITE:is_existing(high_cpu_usage, emqx_alarm:get_alarms(activated))
),
[
#{
activate_at := _,
activated := true,
deactivate_at := infinity,
details := #{high_watermark := 90.0, low_watermark := 0, usage := RealUsage},
message := Msg,
name := high_cpu_usage
}
] =
lists:filter(
fun
(#{name := high_cpu_usage}) -> true;
(_) -> false
end,
Alarms
),
?assert(RealUsage >= Usage, {RealUsage, Usage}),
?assert(is_binary(Msg)),
emqx_config:put([sysmon, os, cpu_high_watermark], 1),
emqx_config:put([sysmon, os, cpu_low_watermark], 0.96),
timer:sleep(500),
?assertNot(
emqx_vm_mon_SUITE:is_existing(high_cpu_usage, emqx_alarm:get_alarms(activated))
)
end
).

View File

@ -24,7 +24,24 @@
all() -> emqx_common_test_helpers:all(?MODULE).
t_load(_Config) ->
?assertMatch([{load1, _}, {load5, _}, {load15, _}], emqx_vm:loads()).
lists:foreach(
fun({Avg, LoadKey, Int}) ->
emqx_common_test_helpers:with_mock(
cpu_sup,
Avg,
fun() -> Int end,
fun() ->
Load = proplists:get_value(LoadKey, emqx_vm:loads()),
?assertEqual(Int / 256, Load)
end
)
end,
[{avg1, load1, 0}, {avg5, load5, 128}, {avg15, load15, 256}]
),
?assertMatch(
[{load1, _}, {load5, _}, {load15, _}],
emqx_vm:loads()
).
t_systeminfo(_Config) ->
?assertEqual(

View File

@ -23,13 +23,13 @@
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_testcase(t_alarms, Config) ->
init_per_testcase(t_too_many_processes_alarm, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
emqx_config:put([sysmon, vm], #{
process_high_watermark => 0,
process_low_watermark => 0,
%% 1s
%% 100ms
process_check_interval => 100
}),
ok = supervisor:terminate_child(emqx_sys_sup, emqx_vm_mon),
@ -43,9 +43,29 @@ init_per_testcase(_, Config) ->
end_per_testcase(_, _Config) ->
emqx_common_test_helpers:stop_apps([]).
t_alarms(_) ->
t_too_many_processes_alarm(_) ->
timer:sleep(500),
Alarms = emqx_alarm:get_alarms(activated),
?assert(is_existing(too_many_processes, emqx_alarm:get_alarms(activated))),
?assertMatch(
[
#{
activate_at := _,
activated := true,
deactivate_at := infinity,
details := #{high_watermark := 0, low_watermark := 0, usage := "0%"},
message := <<"0% process usage">>,
name := too_many_processes
}
],
lists:filter(
fun
(#{name := too_many_processes}) -> true;
(_) -> false
end,
Alarms
)
),
emqx_config:put([sysmon, vm, process_high_watermark], 70),
emqx_config:put([sysmon, vm, process_low_watermark], 60),
timer:sleep(500),

View File

@ -126,7 +126,7 @@ lookup_node(Node) ->
node_info() ->
{UsedRatio, Total} = get_sys_memory(),
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
Info = maps:from_list(emqx_vm:loads()),
BrokerInfo = emqx_sys:info(),
Info#{
node => node(),
@ -150,7 +150,7 @@ node_info() ->
get_sys_memory() ->
case os:type() of
{unix, linux} ->
load_ctl:get_sys_memory();
emqx_mgmt_cache:get_sys_memory();
_ ->
{0, 0}
end.

View File

@ -159,18 +159,18 @@ fields(node_info) ->
)},
{load1,
mk(
string(),
#{desc => <<"CPU average load in 1 minute">>, example => "2.66"}
float(),
#{desc => <<"CPU average load in 1 minute">>, example => 2.66}
)},
{load5,
mk(
string(),
#{desc => <<"CPU average load in 5 minute">>, example => "2.66"}
float(),
#{desc => <<"CPU average load in 5 minute">>, example => 2.66}
)},
{load15,
mk(
string(),
#{desc => <<"CPU average load in 15 minute">>, example => "2.66"}
float(),
#{desc => <<"CPU average load in 15 minute">>, example => 2.66}
)},
{max_fds,
mk(

View File

@ -0,0 +1,108 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_cache).
-behaviour(gen_server).
-define(SYS_MEMORY_KEY, sys_memory).
-define(EXPIRED_MS, 3000).
%% -100ms to early update cache
-define(REFRESH_MS, ?EXPIRED_MS - 100).
-define(DEFAULT_BAD_MEMORY, {0, 0}).
-export([start_link/0, get_sys_memory/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
get_sys_memory() ->
case get_memory_from_cache() of
{ok, CacheMem} ->
erlang:send(?MODULE, refresh_sys_memory),
CacheMem;
stale ->
get_sys_memory_sync()
end.
get_sys_memory_sync() ->
try
gen_server:call(?MODULE, get_sys_memory, ?EXPIRED_MS)
catch
exit:{timeout, _} ->
?DEFAULT_BAD_MEMORY
end.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
ets:new(?MODULE, [set, named_table, public, {keypos, 1}]),
{ok, #{latest_refresh => 0}}.
handle_call(get_sys_memory, _From, State) ->
{Mem, NewState} = refresh_sys_memory(State),
{reply, Mem, NewState};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(refresh_sys_memory, State) ->
{_, NewState} = refresh_sys_memory(State),
{noreply, NewState};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
refresh_sys_memory(State = #{latest_refresh := LatestRefresh}) ->
Now = now_millisecond(),
case Now - LatestRefresh >= ?REFRESH_MS of
true ->
do_refresh_sys_memory(Now, State);
false ->
case get_memory_from_cache() of
stale -> do_refresh_sys_memory(Now, State);
{ok, Mem} -> {Mem, State}
end
end.
do_refresh_sys_memory(RefreshAt, State) ->
NewMem = load_ctl:get_sys_memory(),
NewExpiredAt = now_millisecond() + ?EXPIRED_MS,
ets:insert(?MODULE, {?SYS_MEMORY_KEY, {NewMem, NewExpiredAt}}),
{NewMem, State#{latest_refresh => RefreshAt}}.
get_memory_from_cache() ->
case ets:lookup(?MODULE, ?SYS_MEMORY_KEY) of
[] ->
stale;
[{_, {Mem, ExpiredAt}}] ->
case now_millisecond() < ExpiredAt of
true -> {ok, Mem};
false -> stale
end
end.
now_millisecond() ->
erlang:system_time(millisecond).

View File

@ -315,7 +315,7 @@ vm([]) ->
vm(["all"]) ->
[vm([Name]) || Name <- ["load", "memory", "process", "io", "ports"]];
vm(["load"]) ->
[emqx_ctl:print("cpu/~-20s: ~ts~n", [L, V]) || {L, V} <- emqx_vm:loads()];
[emqx_ctl:print("cpu/~-20s: ~w~n", [L, V]) || {L, V} <- emqx_vm:loads()];
vm(["memory"]) ->
[emqx_ctl:print("memory/~-17s: ~w~n", [Cat, Val]) || {Cat, Val} <- erlang:memory()];
vm(["process"]) ->

View File

@ -26,4 +26,21 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {{one_for_one, 1, 5}, []}}.
Workers =
case os:type() of
{unix, linux} ->
[child_spec(emqx_mgmt_cache, 5000, worker)];
_ ->
[]
end,
{ok, {{one_for_one, 1, 5}, Workers}}.
child_spec(Mod, Shutdown, Type) ->
#{
id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => Shutdown,
type => Type,
modules => [Mod]
}.

View File

@ -40,6 +40,9 @@ t_alarms_api(_) ->
get_alarms(1, true),
get_alarms(1, false).
t_alarm_cpu(_) ->
ok.
t_delete_alarms_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["alarms"]),
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Path),

View File

@ -24,11 +24,11 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
Config.
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
init_per_testcase(t_log_path, Config) ->
emqx_config_logger:add_handler(),
@ -152,7 +152,7 @@ cluster(Specs) ->
Env = [{emqx, boot_modules, []}],
emqx_common_test_helpers:emqx_cluster(Specs, [
{env, Env},
{apps, [emqx_conf]},
{apps, [emqx_conf, emqx_management]},
{load_schema, false},
{join_to, true},
{env_handler, fun