feat(config): apply new config struct to emqx_alarm

This commit is contained in:
Shawn 2021-07-01 15:00:59 +08:00
parent 66aaed1f87
commit db38137d5c
6 changed files with 54 additions and 74 deletions

View File

@ -29,7 +29,7 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
-export([ start_link/1 -export([ start_link/0
, stop/0 , stop/0
]). ]).
@ -75,17 +75,9 @@
}). }).
-record(state, { -record(state, {
actions :: [action()],
size_limit :: non_neg_integer(),
validity_period :: non_neg_integer(),
timer = undefined :: undefined | reference() timer = undefined :: undefined | reference()
}). }).
-type action() :: log | publish | event.
-define(ACTIVATED_ALARM, emqx_activated_alarm). -define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm). -define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
@ -120,8 +112,8 @@ mnesia(copy) ->
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_link(Opts) -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() -> stop() ->
gen_server:stop(?MODULE). gen_server:stop(?MODULE).
@ -158,22 +150,15 @@ get_alarms(deactivated) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
Opts = [{actions, [log, publish]}],
init([Opts]);
init([Opts]) ->
deactivate_all_alarms(), deactivate_all_alarms(),
Actions = proplists:get_value(actions, Opts), ensure_delete_timer(),
SizeLimit = proplists:get_value(size_limit, Opts), {ok, #state{}}.
ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)),
{ok, ensure_delete_timer(#state{actions = Actions,
size_limit = SizeLimit,
validity_period = ValidityPeriod})}.
%% suppress dialyzer warning due to dirty read/write race condition. %% suppress dialyzer warning due to dirty read/write race condition.
%% TODO: change from dirty_read/write to transactional. %% TODO: change from dirty_read/write to transactional.
%% TODO: handle mnesia write errors. %% TODO: handle mnesia write errors.
-dialyzer([{nowarn_function, [handle_call/3]}]). -dialyzer([{nowarn_function, [handle_call/3]}]).
handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Actions}) -> handle_call({activate_alarm, Name, Details}, _From, State) ->
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
[#activated_alarm{name = Name}] -> [#activated_alarm{name = Name}] ->
{reply, {error, already_existed}, State}; {reply, {error, already_existed}, State};
@ -183,17 +168,16 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act
message = normalize_message(Name, Details), message = normalize_message(Name, Details),
activate_at = erlang:system_time(microsecond)}, activate_at = erlang:system_time(microsecond)},
mnesia:dirty_write(?ACTIVATED_ALARM, Alarm), mnesia:dirty_write(?ACTIVATED_ALARM, Alarm),
do_actions(activate, Alarm, Actions), do_actions(activate, Alarm, emqx_config:get([alarm, actions])),
{reply, ok, State} {reply, ok, State}
end; end;
handle_call({deactivate_alarm, Name, Details}, _From, State = #state{ handle_call({deactivate_alarm, Name, Details}, _From, State) ->
actions = Actions, size_limit = SizeLimit}) ->
case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of
[] -> [] ->
{reply, {error, not_found}, State}; {reply, {error, not_found}, State};
[Alarm] -> [Alarm] ->
deactivate_alarm(Details, SizeLimit, Actions, Alarm), deactivate_alarm(Details, Alarm),
{reply, ok, State} {reply, ok, State}
end; end;
@ -223,11 +207,11 @@ handle_cast(Msg, State) ->
?LOG(error, "Unexpected msg: ~p", [Msg]), ?LOG(error, "Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({timeout, TRef, delete_expired_deactivated_alarm}, handle_info({timeout, _TRef, delete_expired_deactivated_alarm}, State) ->
State = #state{timer = TRef, ValidityPeriod = emqx_config:get([alarm, validity_period]),
validity_period = ValidityPeriod}) ->
delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000), delete_expired_deactivated_alarms(erlang:system_time(microsecond) - ValidityPeriod * 1000),
{noreply, ensure_delete_timer(State)}; ensure_delete_timer(),
{noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]), ?LOG(error, "Unexpected info: ~p", [Info]),
@ -243,11 +227,10 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
activate_at = ActivateAt, name = Name, details = Details0, details = Details0, message = Msg0}) ->
message = Msg0}) -> SizeLimit = emqx_config:get([alarm, size_limit]),
case SizeLimit > 0 andalso case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
(mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of
true -> true ->
case mnesia:dirty_first(?DEACTIVATED_ALARM) of case mnesia:dirty_first(?DEACTIVATED_ALARM) of
'$end_of_table' -> ok; '$end_of_table' -> ok;
@ -263,7 +246,7 @@ deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{
erlang:system_time(microsecond)), erlang:system_time(microsecond)),
mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm),
mnesia:dirty_delete(?ACTIVATED_ALARM, Name), mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
do_actions(deactivate, DeActAlarm, Actions). do_actions(deactivate, DeActAlarm, emqx_config:get([alarm, actions])).
make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) ->
#deactivated_alarm{ #deactivated_alarm{
@ -299,9 +282,9 @@ clear_table(TableName) ->
ok ok
end. end.
ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) -> ensure_delete_timer() ->
TRef = emqx_misc:start_timer(ValidityPeriod, delete_expired_deactivated_alarm), emqx_misc:start_timer(emqx_config:get([alarm, validity_period]),
State#state{timer = TRef}. delete_expired_deactivated_alarm).
delete_expired_deactivated_alarms(Checkpoint) -> delete_expired_deactivated_alarms(Checkpoint) ->
delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint). delete_expired_deactivated_alarms(mnesia:dirty_first(?DEACTIVATED_ALARM), Checkpoint).

View File

@ -160,6 +160,7 @@ save_configs(RootKeys, RawConf) ->
% end, MappedEnvs). % end, MappedEnvs).
save_config_to_emqx(Conf, RawConf) -> save_config_to_emqx(Conf, RawConf) ->
?LOG(debug, "set config: ~p", [Conf]),
emqx_config:put(emqx_config:unsafe_atom_key_map(Conf)), emqx_config:put(emqx_config:unsafe_atom_key_map(Conf)),
emqx_config:put_raw(RawConf). emqx_config:put_raw(RawConf).

View File

@ -27,14 +27,15 @@ start_link() ->
init([]) -> init([]) ->
{ok, {{one_for_one, 10, 100}, {ok, {{one_for_one, 10, 100},
[child_spec(emqx_global_gc, worker), %% always start emqx_config_handler first to load the emqx.conf to emqx_config
child_spec(emqx_pool_sup, supervisor), [ child_spec(emqx_config_handler, worker)
child_spec(emqx_hooks, worker), , child_spec(emqx_global_gc, worker)
child_spec(emqx_stats, worker), , child_spec(emqx_pool_sup, supervisor)
child_spec(emqx_metrics, worker), , child_spec(emqx_hooks, worker)
child_spec(emqx_ctl, worker), , child_spec(emqx_stats, worker)
child_spec(emqx_zone, worker), , child_spec(emqx_metrics, worker)
child_spec(emqx_config_handler, worker) , child_spec(emqx_ctl, worker)
, child_spec(emqx_zone, worker)
]}}. ]}}.
child_spec(M, Type) -> child_spec(M, Type) ->

View File

@ -9,7 +9,6 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. -type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all.
-type flag() :: true | false.
-type duration() :: integer(). -type duration() :: integer().
-type duration_s() :: integer(). -type duration_s() :: integer().
-type duration_ms() :: integer(). -type duration_ms() :: integer().
@ -22,7 +21,6 @@
-type bar_separated_list() :: list(). -type bar_separated_list() :: list().
-type ip_port() :: tuple(). -type ip_port() :: tuple().
-typerefl_from_string({flag/0, emqx_schema, to_flag}).
-typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). -typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}).
@ -37,13 +35,13 @@
% workaround: prevent being recognized as unused functions % workaround: prevent being recognized as unused functions
-export([to_duration/1, to_duration_s/1, to_duration_ms/1, -export([to_duration/1, to_duration_s/1, to_duration_ms/1,
to_bytesize/1, to_wordsize/1, to_bytesize/1, to_wordsize/1,
to_flag/1, to_percent/1, to_comma_separated_list/1, to_percent/1, to_comma_separated_list/1,
to_bar_separated_list/1, to_ip_port/1, to_bar_separated_list/1, to_ip_port/1,
to_comma_separated_atoms/1]). to_comma_separated_atoms/1]).
-behaviour(hocon_schema). -behaviour(hocon_schema).
-reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0, -reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0,
bytesize/0, wordsize/0, percent/0, file/0, bytesize/0, wordsize/0, percent/0, file/0,
comma_separated_list/0, bar_separated_list/0, ip_port/0, comma_separated_list/0, bar_separated_list/0, ip_port/0,
comma_separated_atoms/0]). comma_separated_atoms/0]).
@ -75,7 +73,7 @@ fields("cluster") ->
, {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]), , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]),
undefined, manual)} undefined, manual)}
, {"autoclean", t(duration(), "ekka.cluster_autoclean", undefined)} , {"autoclean", t(duration(), "ekka.cluster_autoclean", undefined)}
, {"autoheal", t(flag(), "ekka.cluster_autoheal", false)} , {"autoheal", t(boolean(), "ekka.cluster_autoheal", false)}
, {"static", ref("static")} , {"static", ref("static")}
, {"mcast", ref("mcast")} , {"mcast", ref("mcast")}
, {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)} , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)}
@ -94,7 +92,7 @@ fields("mcast") ->
, {"ports", t(comma_separated_list(), undefined, "4369")} , {"ports", t(comma_separated_list(), undefined, "4369")}
, {"iface", t(string(), undefined, "0.0.0.0")} , {"iface", t(string(), undefined, "0.0.0.0")}
, {"ttl", t(integer(), undefined, 255)} , {"ttl", t(integer(), undefined, 255)}
, {"loop", t(flag(), undefined, true)} , {"loop", t(boolean(), undefined, true)}
, {"sndbuf", t(bytesize(), undefined, "16KB")} , {"sndbuf", t(bytesize(), undefined, "16KB")}
, {"recbuf", t(bytesize(), undefined, "16KB")} , {"recbuf", t(bytesize(), undefined, "16KB")}
, {"buffer", t(bytesize(), undefined, "32KB")} , {"buffer", t(bytesize(), undefined, "32KB")}
@ -183,7 +181,7 @@ fields("log") ->
]; ];
fields("console_handler") -> fields("console_handler") ->
[ {"enable", t(flag(), undefined, false)} [ {"enable", t(boolean(), undefined, false)}
, {"level", t(log_level(), undefined, warning)} , {"level", t(log_level(), undefined, warning)}
]; ];
@ -199,26 +197,26 @@ fields("log_file_handler") ->
]; ];
fields("log_rotation") -> fields("log_rotation") ->
[ {"enable", t(flag(), undefined, true)} [ {"enable", t(boolean(), undefined, true)}
, {"count", t(range(1, 2048), undefined, 10)} , {"count", t(range(1, 2048), undefined, 10)}
]; ];
fields("log_overload_kill") -> fields("log_overload_kill") ->
[ {"enable", t(flag(), undefined, true)} [ {"enable", t(boolean(), undefined, true)}
, {"mem_size", t(bytesize(), undefined, "30MB")} , {"mem_size", t(bytesize(), undefined, "30MB")}
, {"qlen", t(integer(), undefined, 20000)} , {"qlen", t(integer(), undefined, 20000)}
, {"restart_after", t(union(duration(), infinity), undefined, "5s")} , {"restart_after", t(union(duration(), infinity), undefined, "5s")}
]; ];
fields("log_burst_limit") -> fields("log_burst_limit") ->
[ {"enable", t(flag(), undefined, true)} [ {"enable", t(boolean(), undefined, true)}
, {"max_count", t(integer(), undefined, 10000)} , {"max_count", t(integer(), undefined, 10000)}
, {"window_time", t(duration(), undefined, "1s")} , {"window_time", t(duration(), undefined, "1s")}
]; ];
fields("lager") -> fields("lager") ->
[ {"handlers", t(string(), "lager.handlers", "")} [ {"handlers", t(string(), "lager.handlers", "")}
, {"crash_log", t(flag(), "lager.crash_log", false)} , {"crash_log", t(boolean(), "lager.crash_log", false)}
]; ];
fields("stats") -> fields("stats") ->
@ -258,7 +256,7 @@ fields("mqtt") ->
, {"server_keepalive", maybe_disabled(integer())} , {"server_keepalive", maybe_disabled(integer())}
, {"keepalive_backoff", t(float(), undefined, 0.75)} , {"keepalive_backoff", t(float(), undefined, 0.75)}
, {"max_subscriptions", maybe_infinity(integer())} , {"max_subscriptions", maybe_infinity(integer())}
, {"upgrade_qos", t(flag(), undefined, false)} , {"upgrade_qos", t(boolean(), undefined, false)}
, {"max_inflight", t(range(1, 65535))} , {"max_inflight", t(range(1, 65535))}
, {"retry_interval", t(duration_s(), undefined, "30s")} , {"retry_interval", t(duration_s(), undefined, "30s")}
, {"max_awaiting_rel", maybe_infinity(duration())} , {"max_awaiting_rel", maybe_infinity(duration())}
@ -329,7 +327,7 @@ fields("force_shutdown") ->
]; ];
fields("conn_congestion") -> fields("conn_congestion") ->
[ {"enable_alarm", t(flag(), undefined, false)} [ {"enable_alarm", t(boolean(), undefined, false)}
, {"min_alarm_sustain_duration", t(duration(), undefined, "1m")} , {"min_alarm_sustain_duration", t(duration(), undefined, "1m")}
]; ];
@ -380,11 +378,11 @@ fields("tcp_opts") ->
[ {"active_n", t(integer(), undefined, 100)} [ {"active_n", t(integer(), undefined, 100)}
, {"backlog", t(integer(), undefined, 1024)} , {"backlog", t(integer(), undefined, 1024)}
, {"send_timeout", t(duration(), undefined, "15s")} , {"send_timeout", t(duration(), undefined, "15s")}
, {"send_timeout_close", t(flag(), undefined, true)} , {"send_timeout_close", t(boolean(), undefined, true)}
, {"recbuf", t(bytesize())} , {"recbuf", t(bytesize())}
, {"sndbuf", t(bytesize())} , {"sndbuf", t(bytesize())}
, {"buffer", t(bytesize())} , {"buffer", t(bytesize())}
, {"tune_buffer", t(flag())} , {"tune_buffer", t(boolean())}
, {"high_watermark", t(bytesize(), undefined, "1MB")} , {"high_watermark", t(bytesize(), undefined, "1MB")}
, {"nodelay", t(boolean())} , {"nodelay", t(boolean())}
, {"reuseaddr", t(boolean())} , {"reuseaddr", t(boolean())}
@ -444,11 +442,11 @@ fields("plugins") ->
fields("broker") -> fields("broker") ->
[ {"sys_msg_interval", maybe_disabled(duration(), "1m")} [ {"sys_msg_interval", maybe_disabled(duration(), "1m")}
, {"sys_heartbeat_interval", maybe_disabled(duration(), "30s")} , {"sys_heartbeat_interval", maybe_disabled(duration(), "30s")}
, {"enable_session_registry", t(flag(), undefined, true)} , {"enable_session_registry", t(boolean(), undefined, true)}
, {"session_locking_strategy", t(union([local, leader, quorum, all]), undefined, quorum)} , {"session_locking_strategy", t(union([local, leader, quorum, all]), undefined, quorum)}
, {"shared_subscription_strategy", t(union(random, round_robin), undefined, round_robin)} , {"shared_subscription_strategy", t(union(random, round_robin), undefined, round_robin)}
, {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)}
, {"route_batch_clean", t(flag(), undefined, true)} , {"route_batch_clean", t(boolean(), undefined, true)}
, {"perf", ref("perf")} , {"perf", ref("perf")}
]; ];
@ -504,7 +502,7 @@ mqtt_listener() ->
, {"max_connections", maybe_infinity(integer(), infinity)} , {"max_connections", maybe_infinity(integer(), infinity)}
, {"rate_limit", ref("rate_limit")} , {"rate_limit", ref("rate_limit")}
, {"access_rules", t(hoconsc:array(string()))} , {"access_rules", t(hoconsc:array(string()))}
, {"proxy_protocol", t(flag())} , {"proxy_protocol", t(boolean(), undefined, false)}
, {"proxy_protocol_timeout", t(duration())} , {"proxy_protocol_timeout", t(duration())}
]. ].
@ -628,15 +626,15 @@ filter(Opts) ->
%% ...] %% ...]
ssl(Defaults) -> ssl(Defaults) ->
D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end,
[ {"enable", t(flag(), undefined, D("enable"))} [ {"enable", t(boolean(), undefined, D("enable"))}
, {"cacertfile", t(string(), undefined, D("cacertfile"))} , {"cacertfile", t(string(), undefined, D("cacertfile"))}
, {"certfile", t(string(), undefined, D("certfile"))} , {"certfile", t(string(), undefined, D("certfile"))}
, {"keyfile", t(string(), undefined, D("keyfile"))} , {"keyfile", t(string(), undefined, D("keyfile"))}
, {"verify", t(union(verify_peer, verify_none), undefined, D("verify"))} , {"verify", t(union(verify_peer, verify_none), undefined, D("verify"))}
, {"fail_if_no_peer_cert", t(boolean(), undefined, D("fail_if_no_peer_cert"))} , {"fail_if_no_peer_cert", t(boolean(), undefined, D("fail_if_no_peer_cert"))}
, {"secure_renegotiate", t(flag(), undefined, D("secure_renegotiate"))} , {"secure_renegotiate", t(boolean(), undefined, D("secure_renegotiate"))}
, {"reuse_sessions", t(flag(), undefined, D("reuse_sessions"))} , {"reuse_sessions", t(boolean(), undefined, D("reuse_sessions"))}
, {"honor_cipher_order", t(flag(), undefined, D("honor_cipher_order"))} , {"honor_cipher_order", t(boolean(), undefined, D("honor_cipher_order"))}
, {"handshake_timeout", t(duration(), undefined, D("handshake_timeout"))} , {"handshake_timeout", t(duration(), undefined, D("handshake_timeout"))}
, {"depth", t(integer(), undefined, D("depth"))} , {"depth", t(integer(), undefined, D("depth"))}
, {"password", hoconsc:t(string(), #{default => D("key_password"), , {"password", hoconsc:t(string(), #{default => D("key_password"),
@ -764,9 +762,6 @@ maybe_infinity(T, Default) ->
maybe_sth(What, Type, Default) -> maybe_sth(What, Type, Default) ->
t(union([What, Type]), undefined, Default). t(union([What, Type]), undefined, Default).
to_flag(Str) ->
{ok, hocon_postprocess:onoff(Str)}.
to_duration(Str) -> to_duration(Str) ->
case hocon_postprocess:duration(Str) of case hocon_postprocess:duration(Str) of
I when is_integer(I) -> {ok, I}; I when is_integer(I) -> {ok, I};

View File

@ -27,7 +27,7 @@ start_link() ->
init([]) -> init([]) ->
Childs = [child_spec(emqx_sys), Childs = [child_spec(emqx_sys),
child_spec(emqx_alarm, [config(alarm)]), child_spec(emqx_alarm),
child_spec(emqx_sys_mon, [config(sysmon)]), child_spec(emqx_sys_mon, [config(sysmon)]),
child_spec(emqx_os_mon, [config(os_mon)]), child_spec(emqx_os_mon, [config(os_mon)]),
child_spec(emqx_vm_mon, [config(vm_mon)])], child_spec(emqx_vm_mon, [config(vm_mon)])],

View File

@ -582,7 +582,7 @@ case "$1" in
# set before generate_config # set before generate_config
if [ "${_EMQX_START_MODE:-}" = '' ]; then if [ "${_EMQX_START_MODE:-}" = '' ]; then
export EMQX_CONSOLE_HANDLER__ENABLE="${EMQX_CONSOLE_HANDLER__ENABLE:-true}" export EMQX_LOG__CONSOLE_HANDLER__ENABLE="${EMQX_LOG__CONSOLE_HANDLER__ENABLE:-true}"
fi fi
#generate app.config and vm.args #generate app.config and vm.args
@ -626,7 +626,7 @@ case "$1" in
# or other supervision services # or other supervision services
# set before generate_config # set before generate_config
export EMQX_CONSOLE_HANDLER__ENABLE="${EMQX_CONSOLE_HANDLER__ENABLE:-true}" export EMQX_LOG__CONSOLE_HANDLER__ENABLE="${EMQX_LOG__CONSOLE_HANDLER__ENABLE:-true}"
#generate app.config and vm.args #generate app.config and vm.args
generate_config generate_config