From db38137d5c8f52d4ab46c26504c258cd46ac8516 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 1 Jul 2021 15:00:59 +0800 Subject: [PATCH] feat(config): apply new config struct to emqx_alarm --- apps/emqx/src/emqx_alarm.erl | 59 ++++++++++----------------- apps/emqx/src/emqx_config_handler.erl | 1 + apps/emqx/src/emqx_kernel_sup.erl | 17 ++++---- apps/emqx/src/emqx_schema.erl | 45 +++++++++----------- apps/emqx/src/emqx_sys_sup.erl | 2 +- bin/emqx | 4 +- 6 files changed, 54 insertions(+), 74 deletions(-) diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 62ce1af8b..9fa4b3c3a 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -29,7 +29,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --export([ start_link/1 +-export([ start_link/0 , stop/0 ]). @@ -75,17 +75,9 @@ }). -record(state, { - actions :: [action()], - - size_limit :: non_neg_integer(), - - validity_period :: non_neg_integer(), - timer = undefined :: undefined | reference() }). --type action() :: log | publish | event. - -define(ACTIVATED_ALARM, emqx_activated_alarm). -define(DEACTIVATED_ALARM, emqx_deactivated_alarm). @@ -120,8 +112,8 @@ mnesia(copy) -> %% API %%-------------------------------------------------------------------- -start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). stop() -> gen_server:stop(?MODULE). @@ -158,22 +150,15 @@ get_alarms(deactivated) -> %%-------------------------------------------------------------------- init([]) -> - Opts = [{actions, [log, publish]}], - init([Opts]); -init([Opts]) -> deactivate_all_alarms(), - Actions = proplists:get_value(actions, Opts), - SizeLimit = proplists:get_value(size_limit, Opts), - ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)), - {ok, ensure_delete_timer(#state{actions = Actions, - size_limit = SizeLimit, - validity_period = ValidityPeriod})}. + ensure_delete_timer(), + {ok, #state{}}. %% suppress dialyzer warning due to dirty read/write race condition. %% TODO: change from dirty_read/write to transactional. %% TODO: handle mnesia write errors. -dialyzer([{nowarn_function, [handle_call/3]}]). -handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) -> +handle_call({activate_alarm, Name, Details}, _From, State) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [#activated_alarm{name = Name}] -> {reply, {error, already_existed}, State}; @@ -183,17 +168,16 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act message = normalize_message(Name, Details), activate_at = erlang:system_time(microsecond)}, mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), - do_actions(activate, Alarm, Actions), + do_actions(activate, Alarm, emqx_config:get([alarm, actions])), {reply, ok, State} end; -handle_call({deactivate_alarm, Name, Details}, _From, State = #state{ - actions = Actions, size_limit = SizeLimit}) -> +handle_call({deactivate_alarm, Name, Details}, _From, State) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [] -> {reply, {error, not_found}, State}; [Alarm] -> - deactivate_alarm(Details, SizeLimit, Actions, Alarm), + deactivate_alarm(Details, Alarm), {reply, ok, State} end; @@ -223,11 +207,11 @@ handle_cast(Msg, State) -> ?LOG(error, "Unexpected msg: ~p", [Msg]), {noreply, State}. -handle_info({timeout, TRef, delete_expired_deactivated_alarm}, - State = #state{timer = TRef, - validity_period = ValidityPeriod}) -> +handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, State) -> + ValidityPeriod = emqx_config:get([alarm, validity_period]), delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000), - {noreply, ensure_delete_timer(State)}; + ensure_delete_timer(), + {noreply, State}; handle_info(Info, State) -> ?LOG(error, "Unexpected info: ~p", [Info]), @@ -243,11 +227,10 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ - activate_at = ActivateAt, name = Name, details = Details0, - message = Msg0}) -> - case SizeLimit > 0 andalso - (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of +deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name, + details = Details0, message = Msg0}) -> + SizeLimit = emqx_config:get([alarm, size_limit]), + case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of true -> case mnesia:dirty_first(?DEACTIVATED_ALARM) of '$end_of_table' -> ok; @@ -263,7 +246,7 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ erlang:system_time(microsecond)), mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mnesia:dirty_delete(?ACTIVATED_ALARM, Name), - do_actions(deactivate, DeActAlarm, Actions). + do_actions(deactivate, DeActAlarm, emqx_config:get([alarm, actions])). make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> #deactivated_alarm{ @@ -299,9 +282,9 @@ clear_table(TableName) -> ok end. -ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) -> - TRef = emqx_misc:start_timer(ValidityPeriod, delete_expired_deactivated_alarm), - State#state{timer = TRef}. +ensure_delete_timer() -> + emqx_misc:start_timer(emqx_config:get([alarm, validity_period]), + delete_expired_deactivated_alarm). delete_expired_deactivated_alarms(Checkpoint) -> delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index bc915d778..138521929 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -160,6 +160,7 @@ save_configs(RootKeys, RawConf) -> % end, MappedEnvs). save_config_to_emqx(Conf, RawConf) -> + ?LOG(debug, "set config: ~p", [Conf]), emqx_config:put(emqx_config:unsafe_atom_key_map(Conf)), emqx_config:put_raw(RawConf). diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 4e29431e2..1c6b0617e 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -27,14 +27,15 @@ start_link() -> init([]) -> {ok, {{one_for_one, 10, 100}, - [child_spec(emqx_global_gc, worker), - child_spec(emqx_pool_sup, supervisor), - child_spec(emqx_hooks, worker), - child_spec(emqx_stats, worker), - child_spec(emqx_metrics, worker), - child_spec(emqx_ctl, worker), - child_spec(emqx_zone, worker), - child_spec(emqx_config_handler, worker) + %% always start emqx_config_handler first to load the emqx.conf to emqx_config + [ child_spec(emqx_config_handler, worker) + , child_spec(emqx_global_gc, worker) + , child_spec(emqx_pool_sup, supervisor) + , child_spec(emqx_hooks, worker) + , child_spec(emqx_stats, worker) + , child_spec(emqx_metrics, worker) + , child_spec(emqx_ctl, worker) + , child_spec(emqx_zone, worker) ]}}. child_spec(M, Type) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3ec691bc0..76993bb94 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -9,7 +9,6 @@ -include_lib("typerefl/include/types.hrl"). -type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. --type flag() :: true | false. -type duration() :: integer(). -type duration_s() :: integer(). -type duration_ms() :: integer(). @@ -22,7 +21,6 @@ -type bar_separated_list() :: list(). -type ip_port() :: tuple(). --typerefl_from_string({flag/0, emqx_schema, to_flag}). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). -typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). @@ -37,13 +35,13 @@ % workaround: prevent being recognized as unused functions -export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1, to_wordsize/1, - to_flag/1, to_percent/1, to_comma_separated_list/1, + to_percent/1, to_comma_separated_list/1, to_bar_separated_list/1, to_ip_port/1, to_comma_separated_atoms/1]). -behaviour(hocon_schema). --reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0, +-reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0, bytesize/0, wordsize/0, percent/0, file/0, comma_separated_list/0, bar_separated_list/0, ip_port/0, comma_separated_atoms/0]). @@ -75,7 +73,7 @@ fields("cluster") -> , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]), undefined, manual)} , {"autoclean", t(duration(), "ekka.cluster_autoclean", undefined)} - , {"autoheal", t(flag(), "ekka.cluster_autoheal", false)} + , {"autoheal", t(boolean(), "ekka.cluster_autoheal", false)} , {"static", ref("static")} , {"mcast", ref("mcast")} , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)} @@ -94,7 +92,7 @@ fields("mcast") -> , {"ports", t(comma_separated_list(), undefined, "4369")} , {"iface", t(string(), undefined, "0.0.0.0")} , {"ttl", t(integer(), undefined, 255)} - , {"loop", t(flag(), undefined, true)} + , {"loop", t(boolean(), undefined, true)} , {"sndbuf", t(bytesize(), undefined, "16KB")} , {"recbuf", t(bytesize(), undefined, "16KB")} , {"buffer", t(bytesize(), undefined, "32KB")} @@ -183,7 +181,7 @@ fields("log") -> ]; fields("console_handler") -> - [ {"enable", t(flag(), undefined, false)} + [ {"enable", t(boolean(), undefined, false)} , {"level", t(log_level(), undefined, warning)} ]; @@ -199,26 +197,26 @@ fields("log_file_handler") -> ]; fields("log_rotation") -> - [ {"enable", t(flag(), undefined, true)} + [ {"enable", t(boolean(), undefined, true)} , {"count", t(range(1, 2048), undefined, 10)} ]; fields("log_overload_kill") -> - [ {"enable", t(flag(), undefined, true)} + [ {"enable", t(boolean(), undefined, true)} , {"mem_size", t(bytesize(), undefined, "30MB")} , {"qlen", t(integer(), undefined, 20000)} , {"restart_after", t(union(duration(), infinity), undefined, "5s")} ]; fields("log_burst_limit") -> - [ {"enable", t(flag(), undefined, true)} + [ {"enable", t(boolean(), undefined, true)} , {"max_count", t(integer(), undefined, 10000)} , {"window_time", t(duration(), undefined, "1s")} ]; fields("lager") -> [ {"handlers", t(string(), "lager.handlers", "")} - , {"crash_log", t(flag(), "lager.crash_log", false)} + , {"crash_log", t(boolean(), "lager.crash_log", false)} ]; fields("stats") -> @@ -258,7 +256,7 @@ fields("mqtt") -> , {"server_keepalive", maybe_disabled(integer())} , {"keepalive_backoff", t(float(), undefined, 0.75)} , {"max_subscriptions", maybe_infinity(integer())} - , {"upgrade_qos", t(flag(), undefined, false)} + , {"upgrade_qos", t(boolean(), undefined, false)} , {"max_inflight", t(range(1, 65535))} , {"retry_interval", t(duration_s(), undefined, "30s")} , {"max_awaiting_rel", maybe_infinity(duration())} @@ -329,7 +327,7 @@ fields("force_shutdown") -> ]; fields("conn_congestion") -> - [ {"enable_alarm", t(flag(), undefined, false)} + [ {"enable_alarm", t(boolean(), undefined, false)} , {"min_alarm_sustain_duration", t(duration(), undefined, "1m")} ]; @@ -380,11 +378,11 @@ fields("tcp_opts") -> [ {"active_n", t(integer(), undefined, 100)} , {"backlog", t(integer(), undefined, 1024)} , {"send_timeout", t(duration(), undefined, "15s")} - , {"send_timeout_close", t(flag(), undefined, true)} + , {"send_timeout_close", t(boolean(), undefined, true)} , {"recbuf", t(bytesize())} , {"sndbuf", t(bytesize())} , {"buffer", t(bytesize())} - , {"tune_buffer", t(flag())} + , {"tune_buffer", t(boolean())} , {"high_watermark", t(bytesize(), undefined, "1MB")} , {"nodelay", t(boolean())} , {"reuseaddr", t(boolean())} @@ -444,11 +442,11 @@ fields("plugins") -> fields("broker") -> [ {"sys_msg_interval", maybe_disabled(duration(), "1m")} , {"sys_heartbeat_interval", maybe_disabled(duration(), "30s")} - , {"enable_session_registry", t(flag(), undefined, true)} + , {"enable_session_registry", t(boolean(), undefined, true)} , {"session_locking_strategy", t(union([local, leader, quorum, all]), undefined, quorum)} , {"shared_subscription_strategy", t(union(random, round_robin), undefined, round_robin)} , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} - , {"route_batch_clean", t(flag(), undefined, true)} + , {"route_batch_clean", t(boolean(), undefined, true)} , {"perf", ref("perf")} ]; @@ -504,7 +502,7 @@ mqtt_listener() -> , {"max_connections", maybe_infinity(integer(), infinity)} , {"rate_limit", ref("rate_limit")} , {"access_rules", t(hoconsc:array(string()))} - , {"proxy_protocol", t(flag())} + , {"proxy_protocol", t(boolean(), undefined, false)} , {"proxy_protocol_timeout", t(duration())} ]. @@ -628,15 +626,15 @@ filter(Opts) -> %% ...] ssl(Defaults) -> D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, - [ {"enable", t(flag(), undefined, D("enable"))} + [ {"enable", t(boolean(), undefined, D("enable"))} , {"cacertfile", t(string(), undefined, D("cacertfile"))} , {"certfile", t(string(), undefined, D("certfile"))} , {"keyfile", t(string(), undefined, D("keyfile"))} , {"verify", t(union(verify_peer, verify_none), undefined, D("verify"))} , {"fail_if_no_peer_cert", t(boolean(), undefined, D("fail_if_no_peer_cert"))} - , {"secure_renegotiate", t(flag(), undefined, D("secure_renegotiate"))} - , {"reuse_sessions", t(flag(), undefined, D("reuse_sessions"))} - , {"honor_cipher_order", t(flag(), undefined, D("honor_cipher_order"))} + , {"secure_renegotiate", t(boolean(), undefined, D("secure_renegotiate"))} + , {"reuse_sessions", t(boolean(), undefined, D("reuse_sessions"))} + , {"honor_cipher_order", t(boolean(), undefined, D("honor_cipher_order"))} , {"handshake_timeout", t(duration(), undefined, D("handshake_timeout"))} , {"depth", t(integer(), undefined, D("depth"))} , {"password", hoconsc:t(string(), #{default => D("key_password"), @@ -764,9 +762,6 @@ maybe_infinity(T, Default) -> maybe_sth(What, Type, Default) -> t(union([What, Type]), undefined, Default). -to_flag(Str) -> - {ok, hocon_postprocess:onoff(Str)}. - to_duration(Str) -> case hocon_postprocess:duration(Str) of I when is_integer(I) -> {ok, I}; diff --git a/apps/emqx/src/emqx_sys_sup.erl b/apps/emqx/src/emqx_sys_sup.erl index 50d086156..265184f05 100644 --- a/apps/emqx/src/emqx_sys_sup.erl +++ b/apps/emqx/src/emqx_sys_sup.erl @@ -27,7 +27,7 @@ start_link() -> init([]) -> Childs = [child_spec(emqx_sys), - child_spec(emqx_alarm, [config(alarm)]), + child_spec(emqx_alarm), child_spec(emqx_sys_mon, [config(sysmon)]), child_spec(emqx_os_mon, [config(os_mon)]), child_spec(emqx_vm_mon, [config(vm_mon)])], diff --git a/bin/emqx b/bin/emqx index 70b878715..22593ea82 100755 --- a/bin/emqx +++ b/bin/emqx @@ -582,7 +582,7 @@ case "$1" in # set before generate_config if [ "${_EMQX_START_MODE:-}" = '' ]; then - export EMQX_CONSOLE_HANDLER__ENABLE="${EMQX_CONSOLE_HANDLER__ENABLE:-true}" + export EMQX_LOG__CONSOLE_HANDLER__ENABLE="${EMQX_LOG__CONSOLE_HANDLER__ENABLE:-true}" fi #generate app.config and vm.args @@ -626,7 +626,7 @@ case "$1" in # or other supervision services # set before generate_config - export EMQX_CONSOLE_HANDLER__ENABLE="${EMQX_CONSOLE_HANDLER__ENABLE:-true}" + export EMQX_LOG__CONSOLE_HANDLER__ENABLE="${EMQX_LOG__CONSOLE_HANDLER__ENABLE:-true}" #generate app.config and vm.args generate_config