diff --git a/apps/emqx/src/emqx_alarm_handler.erl b/apps/emqx/src/emqx_alarm_handler.erl index a69913afd..2307b79db 100644 --- a/apps/emqx/src/emqx_alarm_handler.erl +++ b/apps/emqx/src/emqx_alarm_handler.erl @@ -56,20 +56,22 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) -> init(_) -> {ok, []}. -handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> - emqx_alarm:activate(high_system_memory_usage, #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}), +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> + emqx_alarm:activate(high_system_memory_usage, + #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}), {ok, State}; -handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> - emqx_alarm:activate(high_process_memory_usage, #{pid => Pid, - high_watermark => emqx_os_mon:get_procmem_high_watermark()}), +handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> + emqx_alarm:activate(high_process_memory_usage, + #{pid => list_to_binary(pid_to_list(Pid)), + high_watermark => emqx_os_mon:get_procmem_high_watermark()}), {ok, State}; -handle_event({clear_alarm, system_memory_high_watermark}, State) -> +handle_event({clear_alarm, system_memory_high_watermark}, State) -> emqx_alarm:deactivate(high_system_memory_usage), {ok, State}; -handle_event({clear_alarm, process_memory_high_watermark}, State) -> +handle_event({clear_alarm, process_memory_high_watermark}, State) -> emqx_alarm:deactivate(high_process_memory_usage), {ok, State}; diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index d6579cac9..8768586ce 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -22,7 +22,7 @@ -logger_header("[OS_MON]"). --export([start_link/1]). +-export([start_link/0]). -export([ get_cpu_check_interval/0 , set_cpu_check_interval/1 @@ -51,8 +51,8 @@ -define(OS_MON, ?MODULE). -start_link(Opts) -> - gen_server:start_link({local, ?OS_MON}, ?MODULE, [Opts], []). +start_link() -> + gen_server:start_link({local, ?OS_MON}, ?MODULE, [], []). %%-------------------------------------------------------------------- %% API @@ -88,13 +88,13 @@ get_sysmem_high_watermark() -> memsup:get_sysmem_high_watermark(). set_sysmem_high_watermark(Float) -> - memsup:set_sysmem_high_watermark(Float / 100). + memsup:set_sysmem_high_watermark(Float). get_procmem_high_watermark() -> memsup:get_procmem_high_watermark(). set_procmem_high_watermark(Float) -> - memsup:set_procmem_high_watermark(Float / 100). + memsup:set_procmem_high_watermark(Float). call(Req) -> gen_server:call(?OS_MON, Req, infinity). @@ -103,14 +103,13 @@ call(Req) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Opts]) -> - set_mem_check_interval(proplists:get_value(mem_check_interval, Opts)), - set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts)), - set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts)), - {ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts), - cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts), - cpu_check_interval => proplists:get_value(cpu_check_interval, Opts), - timer => undefined})}. +init([]) -> + Opts = emqx_config:get([sysmon, os]), + set_mem_check_interval(maps:get(mem_check_interval, Opts)), + set_sysmem_high_watermark(maps:get(sysmem_high_watermark, Opts)), + set_procmem_high_watermark(maps:get(procmem_high_watermark, Opts)), + start_check_timer(), + {ok, #{}}. handle_call(get_cpu_check_interval, _From, State) -> {reply, maps:get(cpu_check_interval, State, undefined), State}; @@ -138,32 +137,30 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, Timer, check}, State = #{timer := Timer, - cpu_high_watermark := CPUHighWatermark, - cpu_low_watermark := CPULowWatermark}) -> - NState = +handle_info({timeout, _Timer, check}, State) -> + CPUHighWatermark = emqx_config:get([sysmon, os, cpu_high_watermark]) * 100, + CPULowWatermark = emqx_config:get([sysmon, os, cpu_low_watermark]) * 100, case emqx_vm:cpu_util() of %% TODO: should be improved? - 0 -> - State#{timer := undefined}; + 0 -> ok; Busy when Busy >= CPUHighWatermark -> emqx_alarm:activate(high_cpu_usage, #{usage => Busy, high_watermark => CPUHighWatermark, low_watermark => CPULowWatermark}), - ensure_check_timer(State); + start_check_timer(); Busy when Busy =< CPULowWatermark -> emqx_alarm:deactivate(high_cpu_usage), - ensure_check_timer(State); + start_check_timer(); _Busy -> - ensure_check_timer(State) + start_check_timer() end, - {noreply, NState}; + {noreply, State}; handle_info(Info, State) -> ?LOG(error, "unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #{timer := Timer}) -> - emqx_misc:cancel_timer(Timer). +terminate(_Reason, _State) -> + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -172,8 +169,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -ensure_check_timer(State = #{cpu_check_interval := Interval}) -> +start_check_timer() -> + Interval = emqx_config:get([sysmon, os, cpu_check_interval]), case erlang:system_info(system_architecture) of - "x86_64-pc-linux-musl" -> State; - _ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)} + "x86_64-pc-linux-musl" -> ok; + _ -> emqx_misc:start_timer(timer:seconds(Interval), check) end. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 76993bb94..f4900d36b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -481,7 +481,7 @@ fields("sysmon_os") -> ]; fields("alarm") -> - [ {"actions", t(comma_separated_list(), undefined, "log,publish")} + [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} , {"validity_period", t(duration_s(), undefined, "24h")} ]; diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index 2d816569d..6b71b7807 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -107,12 +107,12 @@ datetime() -> %% @doc Get sys interval -spec(sys_interval() -> pos_integer()). sys_interval() -> - emqx:get_env(broker_sys_interval, 60000). + emqx_config:get([broker, sys_msg_interval]). %% @doc Get sys heatbeat interval -spec(sys_heatbeat_interval() -> pos_integer()). sys_heatbeat_interval() -> - emqx:get_env(broker_sys_heartbeat, 30000). + emqx_config:get([broker, sys_heartbeat_interval]). %% @doc Get sys info -spec(info() -> list(tuple())). diff --git a/apps/emqx/src/emqx_sys_sup.erl b/apps/emqx/src/emqx_sys_sup.erl index e9d968d5a..61342fd0e 100644 --- a/apps/emqx/src/emqx_sys_sup.erl +++ b/apps/emqx/src/emqx_sys_sup.erl @@ -29,8 +29,8 @@ init([]) -> Childs = [child_spec(emqx_sys), child_spec(emqx_alarm), child_spec(emqx_sys_mon), - child_spec(emqx_os_mon, [config(os_mon)]), - child_spec(emqx_vm_mon, [config(vm_mon)])], + child_spec(emqx_os_mon), + child_spec(emqx_vm_mon)], {ok, {{one_for_one, 10, 100}, Childs}}. %%-------------------------------------------------------------------- @@ -48,6 +48,3 @@ child_spec(Mod, Args) -> type => worker, modules => [Mod] }. - -config(Name) -> emqx:get_env(Name, []). - diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index ce34fff43..79b1537d4 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -21,15 +21,7 @@ -include("logger.hrl"). %% APIs --export([start_link/1]). - --export([ get_check_interval/0 - , set_check_interval/1 - , get_process_high_watermark/0 - , set_process_high_watermark/1 - , get_process_low_watermark/0 - , set_process_low_watermark/1 - ]). +-export([start_link/0]). %% gen_server callbacks -export([ init/1 @@ -42,61 +34,19 @@ -define(VM_MON, ?MODULE). -start_link(Opts) -> - gen_server:start_link({local, ?VM_MON}, ?MODULE, [Opts], []). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- - -get_check_interval() -> - call(get_check_interval). - -set_check_interval(Seconds) -> - call({set_check_interval, Seconds}). - -get_process_high_watermark() -> - call(get_process_high_watermark). - -set_process_high_watermark(Float) -> - call({set_process_high_watermark, Float}). - -get_process_low_watermark() -> - call(get_process_low_watermark). - -set_process_low_watermark(Float) -> - call({set_process_low_watermark, Float}). - -call(Req) -> - gen_server:call(?VM_MON, Req, infinity). +start_link() -> + gen_server:start_link({local, ?VM_MON}, ?MODULE, [], []). %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- -init([Opts]) -> - {ok, ensure_check_timer(#{check_interval => proplists:get_value(check_interval, Opts), - process_high_watermark => proplists:get_value(process_high_watermark, Opts), - process_low_watermark => proplists:get_value(process_low_watermark, Opts), - timer => undefined})}. - -handle_call(get_check_interval, _From, State) -> - {reply, maps:get(check_interval, State, undefined), State}; - -handle_call({set_check_interval, Seconds}, _From, State) -> - {reply, ok, State#{check_interval := Seconds}}; - -handle_call(get_process_high_watermark, _From, State) -> - {reply, maps:get(process_high_watermark, State, undefined), State}; - -handle_call({set_process_high_watermark, Float}, _From, State) -> - {reply, ok, State#{process_high_watermark := Float}}; - -handle_call(get_process_low_watermark, _From, State) -> - {reply, maps:get(process_low_watermark, State, undefined), State}; - -handle_call({set_process_low_watermark, Float}, _From, State) -> - {reply, ok, State#{process_low_watermark := Float}}; +init([]) -> + start_check_timer(), + {ok, #{}}. handle_call(Req, _From, State) -> ?LOG(error, "[VM_MON] Unexpected call: ~p", [Req]), @@ -106,10 +56,9 @@ handle_cast(Msg, State) -> ?LOG(error, "[VM_MON] Unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, Timer, check}, - State = #{timer := Timer, - process_high_watermark := ProcHighWatermark, - process_low_watermark := ProcLowWatermark}) -> +handle_info({timeout, _Timer, check}, State) -> + ProcHighWatermark = emqx_config:get([sysmon, vm, process_high_watermark]), + ProcLowWatermark = emqx_config:get([sysmon, vm, process_low_watermark]), ProcessCount = erlang:system_info(process_count), case ProcessCount / erlang:system_info(process_limit) * 100 of Percent when Percent >= ProcHighWatermark -> @@ -121,14 +70,15 @@ handle_info({timeout, Timer, check}, _Precent -> ok end, - {noreply, ensure_check_timer(State)}; + start_check_timer(), + {noreply, State}; handle_info(Info, State) -> ?LOG(error, "[VM_MON] Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #{timer := Timer}) -> - emqx_misc:cancel_timer(Timer). +terminate(_Reason, _State) -> + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -137,5 +87,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -ensure_check_timer(State = #{check_interval := Interval}) -> - State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}. +start_check_timer() -> + Interval = emqx_config:get([sysmon, vm, process_check_interval]), + emqx_misc:start_timer(timer:seconds(Interval), check). diff --git a/apps/emqx_authz/src/emqx_authz_app.erl b/apps/emqx_authz/src/emqx_authz_app.erl index 460d7cbf9..dcce015c7 100644 --- a/apps/emqx_authz/src/emqx_authz_app.erl +++ b/apps/emqx_authz/src/emqx_authz_app.erl @@ -11,7 +11,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_authz_sup:start_link(), - ok = emqx_authz:init(), + %ok = emqx_authz:init(), {ok, Sup}. stop(_State) ->