refactor(config): emqx.conf for 5.0

This commit is contained in:
Shawn 2021-06-30 09:39:03 +08:00
parent d5eb37c537
commit 02c9a3163b
3 changed files with 4853 additions and 2702 deletions

File diff suppressed because it is too large Load Diff

2467
apps/emqx/etc/emqx.conf.old Normal file

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,7 @@
-type duration_s() :: integer(). -type duration_s() :: integer().
-type duration_ms() :: integer(). -type duration_ms() :: integer().
-type bytesize() :: integer(). -type bytesize() :: integer().
-type wordsize() :: bytesize().
-type percent() :: float(). -type percent() :: float().
-type file() :: string(). -type file() :: string().
-type comma_separated_list() :: list(). -type comma_separated_list() :: list().
@ -26,6 +27,7 @@
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
-typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}). -typerefl_from_string({duration_ms/0, emqx_schema, to_duration_ms}).
-typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}). -typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}).
-typerefl_from_string({wordsize/0, emqx_schema, to_wordsize}).
-typerefl_from_string({percent/0, emqx_schema, to_percent}). -typerefl_from_string({percent/0, emqx_schema, to_percent}).
-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}). -typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
-typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}). -typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}).
@ -33,7 +35,8 @@
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
% workaround: prevent being recognized as unused functions % workaround: prevent being recognized as unused functions
-export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1, -export([to_duration/1, to_duration_s/1, to_duration_ms/1,
to_bytesize/1, to_wordsize/1,
to_flag/1, to_percent/1, to_comma_separated_list/1, to_flag/1, to_percent/1, to_comma_separated_list/1,
to_bar_separated_list/1, to_ip_port/1, to_bar_separated_list/1, to_ip_port/1,
to_comma_separated_atoms/1]). to_comma_separated_atoms/1]).
@ -41,21 +44,21 @@
-behaviour(hocon_schema). -behaviour(hocon_schema).
-reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0, -reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, duration_ms/0,
bytesize/0, percent/0, file/0, bytesize/0, wordsize/0, percent/0, file/0,
comma_separated_list/0, bar_separated_list/0, ip_port/0, comma_separated_list/0, bar_separated_list/0, ip_port/0,
comma_separated_atoms/0]). comma_separated_atoms/0]).
-export([structs/0, fields/1, translations/0, translation/1]). -export([structs/0, fields/1, translations/0, translation/1]).
-export([t/1, t/3, t/4, ref/1]). -export([t/1, t/3, t/4, ref/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([ssl/2, tr_ssl/2, tr_password_hash/2]). -export([ssl/1, tr_ssl/2, tr_password_hash/2]).
%% will be used by emqx_ct_helper to find the dependent apps %% will be used by emqx_ct_helper to find the dependent apps
-export([includes/0]). -export([includes/0]).
structs() -> ["cluster", "node", "rpc", "log", "lager", structs() -> ["cluster", "node", "rpc", "log", "lager",
"acl", "mqtt", "zone", "listener", "module", "broker", "acl", "mqtt", "zone", "listeners", "module", "broker",
"plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"] "plugins", "sysmon", "alarm", "telemetry"]
++ includes(). ++ includes().
-ifdef(TEST). -ifdef(TEST).
@ -69,7 +72,8 @@ includes() ->
fields("cluster") -> fields("cluster") ->
[ {"name", t(atom(), "ekka.cluster_name", emqxcl)} [ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
, {"discovery", t(atom(), undefined, manual)} , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]),
undefined, manual)}
, {"autoclean", t(duration(), "ekka.cluster_autoclean", undefined)} , {"autoclean", t(duration(), "ekka.cluster_autoclean", undefined)}
, {"autoheal", t(flag(), "ekka.cluster_autoheal", false)} , {"autoheal", t(flag(), "ekka.cluster_autoheal", false)}
, {"static", ref("static")} , {"static", ref("static")}
@ -83,7 +87,7 @@ fields("cluster") ->
]; ];
fields("static") -> fields("static") ->
[ {"seeds", t(comma_separated_list())}]; [ {"seeds", t(list(string()))}];
fields("mcast") -> fields("mcast") ->
[ {"addr", t(string(), undefined, "239.192.0.1")} [ {"addr", t(string(), undefined, "239.192.0.1")}
@ -97,7 +101,8 @@ fields("mcast") ->
]; ];
fields("dns") -> fields("dns") ->
[ {"app", t(string())}]; [ {"name", t(string())}
, {"app", t(string())}];
fields("etcd") -> fields("etcd") ->
[ {"server", t(comma_separated_list())} [ {"server", t(comma_separated_list())}
@ -107,7 +112,7 @@ fields("etcd") ->
]; ];
fields("etcd_ssl") -> fields("etcd_ssl") ->
ssl(undefined, #{}); ssl(#{});
fields("k8s") -> fields("k8s") ->
[ {"apiserver", t(string())} [ {"apiserver", t(string())}
@ -125,7 +130,6 @@ fields("rlog") ->
fields("node") -> fields("node") ->
[ {"name", t(string(), "vm_args.-name", "emqx@127.0.0.1", "EMQX_NODE_NAME")} [ {"name", t(string(), "vm_args.-name", "emqx@127.0.0.1", "EMQX_NODE_NAME")}
, {"ssl_dist_optfile", t(string(), "vm_args.-ssl_dist_optfile", undefined)}
, {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie", , {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie",
default => "emqxsecretcookie", default => "emqxsecretcookie",
sensitive => true, sensitive => true,
@ -133,16 +137,8 @@ fields("node") ->
})} })}
, {"data_dir", t(string(), "emqx.data_dir", undefined)} , {"data_dir", t(string(), "emqx.data_dir", undefined)}
, {"etc_dir", t(string(), "emqx.etc_dir", undefined)} , {"etc_dir", t(string(), "emqx.etc_dir", undefined)}
, {"heartbeat", t(flag(), undefined, false)}
, {"async_threads", t(range(1, 1024), "vm_args.+A", undefined)}
, {"process_limit", t(integer(), "vm_args.+P", undefined)}
, {"max_ports", t(range(1024, 134217727), "vm_args.+Q", undefined, "EMQX_MAX_PORTS")}
, {"dist_buffer_size", fun node__dist_buffer_size/1}
, {"global_gc_interval", t(duration_s(), "emqx.global_gc_interval", undefined)} , {"global_gc_interval", t(duration_s(), "emqx.global_gc_interval", undefined)}
, {"fullsweep_after", t(non_neg_integer(), , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
"vm_args.-env ERL_FULLSWEEP_AFTER", 1000)}
, {"max_ets_tables", t(integer(), "vm_args.+e", 256000)}
, {"crash_dump", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
, {"dist_net_ticktime", t(integer(), "vm_args.-kernel net_ticktime", undefined)} , {"dist_net_ticktime", t(integer(), "vm_args.-kernel net_ticktime", undefined)}
, {"dist_listen_min", t(integer(), "kernel.inet_dist_listen_min", undefined)} , {"dist_listen_min", t(integer(), "kernel.inet_dist_listen_min", undefined)}
, {"dist_listen_max", t(integer(), "kernel.inet_dist_listen_max", undefined)} , {"dist_listen_max", t(integer(), "kernel.inet_dist_listen_max", undefined)}
@ -215,50 +211,40 @@ fields("lager") ->
, {"crash_log", t(flag(), "lager.crash_log", false)} , {"crash_log", t(flag(), "lager.crash_log", false)}
]; ];
fields("stats") ->
[ {"enable", t(boolean(), undefined, true)}
];
fields("auth") ->
[ {"enable", t(boolean(), undefined, false)}
];
fields("acl") -> fields("acl") ->
[ {"allow_anonymous", t(boolean(), "emqx.allow_anonymous", false)} [ {"enable", t(boolean(), undefined, false)}
, {"acl_nomatch", t(union(allow, deny), "emqx.acl_nomatch", deny)} , {"cache", ref("acl_cache")}
, {"acl_file", t(string(), "emqx.acl_file", undefined)} , {"deny_action", t(union(ignore, disconnect), undefined, ignore)}
, {"enable_acl_cache", t(flag(), "emqx.enable_acl_cache", true)} ];
, {"acl_cache_ttl", t(duration(), "emqx.acl_cache_ttl", "1m")}
, {"acl_cache_max_size", t(range(1, inf), "emqx.acl_cache_max_size", 32)} fields("acl_cache") ->
, {"acl_deny_action", t(union(ignore, disconnect), "emqx.acl_deny_action", ignore)} [ {"enable", t(boolean(), undefined, true)}
, {"flapping_detect_policy", t(comma_separated_list(), undefined, "30,1m,5m")} , {"max_size", t(range(1, 1048576), undefined, 32)}
, {"ttl", t(duration(), undefined, "1m")}
]; ];
fields("mqtt") -> fields("mqtt") ->
[ {"max_packet_size", t(bytesize(), "emqx.max_packet_size", "1MB", "EMQX_MAX_PACKET_SIZE")} [ {"mountpoint", t(binary(), undefined, <<"">>)}
, {"max_clientid_len", t(integer(), "emqx.max_clientid_len", 65535)} , {"idle_timeout", t(duration(), undefined, "15s")}
, {"max_topic_levels", t(integer(), "emqx.max_topic_levels", 0)} , {"max_packet_size", t(bytesize(), undefined, "1MB")}
, {"max_qos_allowed", t(range(0, 2), "emqx.max_qos_allowed", 2)} , {"max_clientid_len", t(integer(), undefined, 65535)}
, {"max_topic_alias", t(integer(), "emqx.max_topic_alias", 65535)} , {"max_topic_levels", t(integer(), undefined, 0)}
, {"retain_available", t(boolean(), "emqx.retain_available", true)} , {"max_qos_allowed", t(range(0, 2), undefined, 2)}
, {"wildcard_subscription", t(boolean(), "emqx.wildcard_subscription", true)} , {"max_topic_alias", t(integer(), undefined, 65535)}
, {"shared_subscription", t(boolean(), "emqx.shared_subscription", true)} , {"retain_available", t(boolean(), undefined, true)}
, {"ignore_loop_deliver", t(boolean(), "emqx.ignore_loop_deliver", true)} , {"wildcard_subscription", t(boolean(), undefined, true)}
, {"strict_mode", t(boolean(), "emqx.strict_mode", false)} , {"shared_subscription", t(boolean(), undefined, true)}
, {"response_information", t(string(), "emqx.response_information", undefined)} , {"ignore_loop_deliver", t(boolean())}
]; , {"strict_mode", t(boolean(), undefined, false)}
, {"response_information", t(string(), undefined, undefined)}
fields("zone") ->
[ {"$name", ref("zone_settings")}];
fields("zone_settings") ->
[ {"idle_timeout", t(duration(), undefined, "15s")}
, {"allow_anonymous", t(boolean())}
, {"acl_nomatch", t(union(allow, deny))}
, {"enable_acl", t(flag(), undefined, false)}
, {"acl_deny_action", t(union(ignore, disconnect), undefined, ignore)}
, {"enable_ban", t(flag(), undefined, false)}
, {"enable_stats", t(flag(), undefined, false)}
, {"max_packet_size", t(bytesize())}
, {"max_clientid_len", t(integer())}
, {"max_topic_levels", t(integer())}
, {"max_qos_allowed", t(range(0, 2))}
, {"max_topic_alias", t(integer())}
, {"retain_available", t(boolean())}
, {"wildcard_subscription", t(boolean())}
, {"shared_subscription", t(boolean())}
, {"server_keepalive", t(integer())} , {"server_keepalive", t(integer())}
, {"keepalive_backoff", t(float(), undefined, 0.75)} , {"keepalive_backoff", t(float(), undefined, 0.75)}
, {"max_subscriptions", t(integer(), undefined, 0)} , {"max_subscriptions", t(integer(), undefined, 0)}
@ -267,121 +253,150 @@ fields("zone_settings") ->
, {"retry_interval", t(duration_s(), undefined, "30s")} , {"retry_interval", t(duration_s(), undefined, "30s")}
, {"max_awaiting_rel", t(duration(), undefined, 0)} , {"max_awaiting_rel", t(duration(), undefined, 0)}
, {"await_rel_timeout", t(duration_s(), undefined, "300s")} , {"await_rel_timeout", t(duration_s(), undefined, "300s")}
, {"ignore_loop_deliver", t(boolean())}
, {"session_expiry_interval", t(duration_s(), undefined, "2h")} , {"session_expiry_interval", t(duration_s(), undefined, "2h")}
, {"max_mqueue_len", t(integer(), undefined, 1000)} , {"max_mqueue_len", t(integer(), undefined, 1000)}
, {"mqueue_priorities", t(comma_separated_list(), undefined, "none")} , {"mqueue_priorities", t(comma_separated_list(), undefined, "none")}
, {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)} , {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}
, {"mqueue_store_qos0", t(boolean(), undefined, true)} , {"mqueue_store_qos0", t(boolean(), undefined, true)}
, {"enable_flapping_detect", t(flag(), undefined, false)}
, {"rate_limit", ref("rate_limit")}
, {"conn_congestion", ref("conn_congestion")}
, {"quota", ref("quota")}
, {"force_gc_policy", t(bar_separated_list())}
, {"force_shutdown_policy", t(bar_separated_list(), undefined, "default")}
, {"mountpoint", t(string())}
, {"use_username_as_clientid", t(boolean(), undefined, false)} , {"use_username_as_clientid", t(boolean(), undefined, false)}
, {"strict_mode", t(boolean(), undefined, false)} , {"peer_cert_as_username", maybe_disabled(union([cn, dn, crt, pem, md5]))}
, {"response_information", t(string())} , {"peer_cert_as_clientid", maybe_disabled(union([cn, dn, crt, pem, md5]))}
, {"bypass_auth_plugins", t(boolean(), undefined, false)} ];
fields("zone") ->
[ {"$name", ref("zone_settings")}];
fields("zone_settings") ->
[ {"mqtt", ref("mqtt")}
, {"acl", ref("acl")}
, {"auth", ref("auth")}
, {"stats", ref("stats")}
, {"flapping_detect", ref("flapping_detect")}
, {"force_shutdown", ref("force_shutdown")}
, {"conn_congestion", ref("conn_congestion")}
, {"force_gc", ref("force_gc")}
, {"overall_max_connections", t(integer(), undefined, 2048000)}
, {"listeners", t("listeners")}
]; ];
fields("rate_limit") -> fields("rate_limit") ->
[ {"conn_messages_in", t(comma_separated_list())} [ {"max_conn_rate", maybe_infinity(integer(), 1000)}
, {"conn_messages_in", t(comma_separated_list())}
, {"conn_bytes_in", t(comma_separated_list())} , {"conn_bytes_in", t(comma_separated_list())}
, {"quota", ref("rate_limit_quota")}
]; ];
fields("conn_congestion") -> fields("rate_limit_quota") ->
[ {"alarm", t(flag(), undefined, false)}
, {"min_alarm_sustain_duration", t(duration(), undefined, "1m")}
];
fields("quota") ->
[ {"conn_messages_routing", t(comma_separated_list())} [ {"conn_messages_routing", t(comma_separated_list())}
, {"overall_messages_routing", t(comma_separated_list())} , {"overall_messages_routing", t(comma_separated_list())}
]; ];
fields("listener") -> fields("flapping_detect") ->
[ {"tcp", ref("tcp_listener")} [ {"enable", t(boolean(), undefined, true)}
, {"ssl", ref("ssl_listener")} , {"max_count", t(integer(), undefined, 15)}
, {"ws", ref("ws_listener")} , {"window_time", t(duration(), undefined, "1m")}
, {"wss", ref("wss_listener")} , {"ban_time", t(duration(), undefined, "5m")}
]; ];
fields("tcp_listener") -> fields("force_shutdown") ->
[ {"$name", ref("tcp_listener_settings")}]; [ {"enable", t(boolean(), undefined, true)}
, {"max_message_queue_len", t(range(0, inf), undefined, 1000)}
, {"max_heap_size", t(wordsize(), undefined, "32MB", undefined,
fun(Siz) ->
MaxSiz = case erlang:system_info(wordsize) of
8 -> % arch_64
(1 bsl 59) - 1;
4 -> % arch_32
(1 bsl 27) - 1
end,
case Siz > MaxSiz of
true ->
error(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
false ->
ok
end
end)}
];
fields("ssl_listener") -> fields("conn_congestion") ->
[ {"$name", ref("ssl_listener_settings")}]; [ {"enable_alarm", t(flag(), undefined, false)}
, {"min_alarm_sustain_duration", t(duration(), undefined, "1m")}
];
fields("ws_listener") -> fields("force_gc") ->
[ {"$name", ref("ws_listener_settings")}]; [ {"enable", t(boolean(), undefined, true)}
, {"count", t(range(0, inf), undefined, 16000)}
, {"bytes", t(bytesize(), undefined, "16MB")}
];
fields("wss_listener") -> fields("listeners") ->
[ {"$name", ref("wss_listener_settings")}]; [ {"$name", hoconsc:union(
[ ref("mqtt_tcp_listener")
, ref("mqtt_ssl_listener")
, ref("mqtt_ws_listener")
, ref("mqtt_wss_listener")
])}
];
fields("listener_settings") -> fields("mqtt_tcp_listener") ->
[ {"endpoint", t(union(ip_port(), integer()))} [ {"type", t(typerefl:atom(tcp))}
, {"acceptors", t(integer(), undefined, 8)} , {"tcp", ref("tcp_opts")}
, {"max_connections", t(integer(), undefined, 1024)} ] ++ mqtt_listener();
, {"max_conn_rate", t(integer())}
, {"active_n", t(integer(), undefined, 100)} fields("mqtt_ssl_listener") ->
, {"zone", t(string())} [ {"type", t(typerefl:atom(ssl))}
, {"rate_limit", t(comma_separated_list())} , {"ssl", ref("ssl_opts")}
, {"access", ref("access")} , {"tcp", ref("tcp_opts")}
, {"proxy_protocol", t(flag())} ] ++ mqtt_listener();
, {"proxy_protocol_timeout", t(duration())}
fields("mqtt_ws_listener") ->
[ {"type", t(typerefl:atom(ws))}
, {"tcp", ref("tcp_opts")}
, {"websocket", ref("ws_opts")}
] ++ mqtt_listener();
fields("mqtt_wss_listener") ->
[ {"type", t(typerefl:atom(ws))}
, {"tcp", ref("tcp_opts")}
, {"ssl", ref("ssl_opts")}
, {"websocket", ref("ws_opts")}
] ++ mqtt_listener();
fields("ws_opts") ->
[ {"mqtt_path", t(string(), undefined, "/mqtt")}
, {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)}
, {"compress", t(boolean())}
, {"idle_timeout", t(duration())}
, {"max_frame_size", t(integer())}
, {"fail_if_no_subprotocol", t(boolean(), undefined, true)}
, {"supported_subprotocols", t(string(), undefined,
"mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")}
, {"check_origin_enable", t(boolean(), undefined, false)}
, {"allow_origin_absence", t(boolean(), undefined, true)}
, {"check_origins", t(comma_separated_list())}
, {"proxy_address_header", t(string(), undefined, "x-forwarded-for")}
, {"proxy_port_header", t(string(), undefined, "x-forwarded-port")}
, {"deflate_opts", ref("deflate_opts")}
];
fields("tcp_opts") ->
[ {"active_n", t(integer(), undefined, 100)}
, {"backlog", t(integer(), undefined, 1024)} , {"backlog", t(integer(), undefined, 1024)}
, {"send_timeout", t(duration(), undefined, "15s")} , {"send_timeout", t(duration(), undefined, "15s")}
, {"send_timeout_close", t(flag(), undefined, true)} , {"send_timeout_close", t(flag(), undefined, true)}
, {"recbuf", t(bytesize())} , {"recbuf", t(bytesize())}
, {"sndbuf", t(bytesize())} , {"sndbuf", t(bytesize())}
, {"buffer", t(bytesize())} , {"buffer", t(bytesize())}
, {"high_watermark", t(bytesize(), undefined, "1MB")}
, {"tune_buffer", t(flag())} , {"tune_buffer", t(flag())}
, {"high_watermark", t(bytesize(), undefined, "1MB")}
, {"nodelay", t(boolean())} , {"nodelay", t(boolean())}
, {"reuseaddr", t(boolean())} , {"reuseaddr", t(boolean())}
]; ];
fields("tcp_listener_settings") -> fields("ssl_opts") ->
[ {"peer_cert_as_username", t(cn)} ssl(#{handshake_timeout => "15s"
, {"peer_cert_as_clientid", t(cn)} , depth => 10
] ++ fields("listener_settings"); , reuse_sessions => true});
fields("ssl_listener_settings") ->
[ {"peer_cert_as_username", t(union([cn, dn, crt, pem, md5]))}
, {"peer_cert_as_clientid", t(union([cn, dn, crt, pem, md5]))}
] ++
ssl(undefined, #{handshake_timeout => "15s"
, depth => 10
, reuse_sessions => true}) ++ fields("listener_settings");
fields("ws_listener_settings") ->
[ {"mqtt_path", t(string(), undefined, "/mqtt")}
, {"fail_if_no_subprotocol", t(boolean(), undefined, true)}
, {"supported_subprotocols", t(string(), undefined, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")}
, {"proxy_address_header", t(string(), undefined, "x-forwarded-for")}
, {"proxy_port_header", t(string(), undefined, "x-forwarded-port")}
, {"compress", t(boolean())}
, {"deflate_opts", ref("deflate_opts")}
, {"idle_timeout", t(duration())}
, {"max_frame_size", t(integer())}
, {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)}
, {"check_origin_enable", t(boolean(), undefined, false)}
, {"allow_origin_absence", t(boolean(), undefined, true)}
, {"check_origins", t(comma_separated_list())}
% @fixme
] ++ lists:keydelete("high_watermark", 1, fields("tcp_listener_settings"));
fields("wss_listener_settings") ->
% @fixme
Ssl = ssl(undefined, #{depth => 10
, reuse_sessions => true}) ++ fields("listener_settings"),
Settings = lists:ukeymerge(1, Ssl, fields("ws_listener_settings")),
lists:keydelete("high_watermark", 1, Settings);
fields("access") ->
[ {"$id", t(string(), undefined, undefined)}];
fields("deflate_opts") -> fields("deflate_opts") ->
[ {"level", t(union([none, default, best_compression, best_speed]))} [ {"level", t(union([none, default, best_compression, best_speed]))}
@ -414,7 +429,6 @@ fields("subscription_settings") ->
, {"rh", t(range(0, 2), undefined, 0)} , {"rh", t(range(0, 2), undefined, 0)}
]; ];
fields("rewrite") -> fields("rewrite") ->
[ {"rule", ref("rule")} [ {"rule", ref("rule")}
, {"pub_rule", ref("rule")} , {"pub_rule", ref("rule")}
@ -431,15 +445,13 @@ fields("plugins") ->
]; ];
fields("broker") -> fields("broker") ->
[ {"sys_interval", t(duration(), "emqx.broker_sys_interval", "1m")} [ {"sys_msg_interval", maybe_disabled(duration(), "1m")}
, {"sys_heartbeat", t(duration(), "emqx.broker_sys_heartbeat", "30s")} , {"sys_heartbeat_interval", maybe_disabled(duration(), "30s")}
, {"enable_session_registry", t(flag(), "emqx.enable_session_registry", true)} , {"enable_session_registry", t(flag(), undefined, true)}
, {"session_locking_strategy", t(union([local, leader, quorum, all]), , {"session_locking_strategy", t(union([local, leader, quorum, all]), undefined, quorum)}
"emqx.session_locking_strategy", quorum)} , {"shared_subscription_strategy", t(union(random, round_robin), undefined, round_robin)}
, {"shared_subscription_strategy", t(union(random, round_robin), , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)}
"emqx.shared_subscription_strategy", round_robin)} , {"route_batch_clean", t(flag(), undefined, true)}
, {"shared_dispatch_ack_enabled", t(boolean(), "emqx.shared_dispatch_ack_enabled", false)}
, {"route_batch_clean", t(flag(), "emqx.route_batch_clean", true)}
, {"perf", ref("perf")} , {"perf", ref("perf")}
]; ];
@ -449,14 +461,22 @@ fields("perf") ->
]; ];
fields("sysmon") -> fields("sysmon") ->
[ {"long_gc", t(duration(), undefined, 0)} [ {"vm", ref("sysmon_vm")}
, {"long_schedule", t(duration(), undefined, 240)} , {"os", ref("sysmon_os")}
, {"large_heap", t(bytesize(), undefined, "8MB")} ];
fields("sysmon_vm") ->
[ {"process_check_interval", t(duration_s(), undefined, 30)}
, {"process_high_watermark", t(percent(), undefined, "80%")}
, {"process_low_watermark", t(percent(), undefined, "60%")}
, {"long_gc", maybe_disabled(duration())}
, {"long_schedule", maybe_disabled(duration(), 240)}
, {"large_heap", maybe_disabled(bytesize(), "8MB")}
, {"busy_dist_port", t(boolean(), undefined, true)} , {"busy_dist_port", t(boolean(), undefined, true)}
, {"busy_port", t(boolean(), undefined, false)} , {"busy_port", t(boolean(), undefined, false)}
]; ];
fields("os_mon") -> fields("sysmon_os") ->
[ {"cpu_check_interval", t(duration_s(), undefined, 60)} [ {"cpu_check_interval", t(duration_s(), undefined, 60)}
, {"cpu_high_watermark", t(percent(), undefined, "80%")} , {"cpu_high_watermark", t(percent(), undefined, "80%")}
, {"cpu_low_watermark", t(percent(), undefined, "60%")} , {"cpu_low_watermark", t(percent(), undefined, "60%")}
@ -465,12 +485,6 @@ fields("os_mon") ->
, {"procmem_high_watermark", t(percent(), undefined, "5%")} , {"procmem_high_watermark", t(percent(), undefined, "5%")}
]; ];
fields("vm_mon") ->
[ {"check_interval", t(duration_s(), undefined, 30)}
, {"process_high_watermark", t(percent(), undefined, "80%")}
, {"process_low_watermark", t(percent(), undefined, "60%")}
];
fields("alarm") -> fields("alarm") ->
[ {"actions", t(comma_separated_list(), undefined, "log,publish")} [ {"actions", t(comma_separated_list(), undefined, "log,publish")}
, {"size_limit", t(integer(), undefined, 1000)} , {"size_limit", t(integer(), undefined, 1000)}
@ -487,6 +501,15 @@ fields(ExtraField) ->
Mod = list_to_atom(ExtraField++"_schema"), Mod = list_to_atom(ExtraField++"_schema"),
Mod:fields(ExtraField). Mod:fields(ExtraField).
mqtt_listener() ->
[ {"bind", t(union(ip_port(), integer()))}
, {"acceptors", t(integer(), undefined, 8)}
, {"max_connections", t(integer(), undefined, 1024000)}
, {"access", t(list(string()))}
, {"proxy_protocol", t(flag())}
, {"proxy_protocol_timeout", t(duration())}
].
translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"]. translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"].
translation("ekka") -> translation("ekka") ->
@ -509,9 +532,6 @@ translation("emqx") ->
, {"zones", fun tr_zones/1} , {"zones", fun tr_zones/1}
, {"listeners", fun tr_listeners/1} , {"listeners", fun tr_listeners/1}
, {"modules", fun tr_modules/1} , {"modules", fun tr_modules/1}
, {"sysmon", fun tr_sysmon/1}
, {"os_mon", fun tr_os_mon/1}
, {"vm_mon", fun tr_vm_mon/1}
, {"alarm", fun tr_alarm/1} , {"alarm", fun tr_alarm/1}
, {"telemetry", fun tr_telemetry/1} , {"telemetry", fun tr_telemetry/1}
]. ].
@ -527,19 +547,6 @@ tr_heart(Conf) ->
_ -> undefined _ -> undefined
end. end.
%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
node__dist_buffer_size(type) -> bytesize();
node__dist_buffer_size(validator) ->
fun(ZDBBL) ->
case ZDBBL >= 1024 andalso ZDBBL =< 2147482624 of
true ->
ok;
false ->
{error, "must be between 1KB and 2097151KB"}
end
end;
node__dist_buffer_size(_) -> undefined.
tr_zdbbl(Conf) -> tr_zdbbl(Conf) ->
case conf_get("node.dist_buffer_size", Conf) of case conf_get("node.dist_buffer_size", Conf) of
undefined -> undefined; undefined -> undefined;
@ -834,25 +841,6 @@ tr_modules(Conf) ->
[{emqx_mod_acl_internal, [{acl_file, conf_get("acl.acl_file", Conf)}]}] [{emqx_mod_acl_internal, [{acl_file, conf_get("acl.acl_file", Conf)}]}]
]). ]).
tr_sysmon(Conf) ->
Keys = maps:to_list(conf_get("sysmon", Conf, #{})),
[{binary_to_atom(K), maps:get(value, V)} || {K, V} <- Keys].
tr_os_mon(Conf) ->
[{cpu_check_interval, conf_get("os_mon.cpu_check_interval", Conf)}
, {cpu_high_watermark, conf_get("os_mon.cpu_high_watermark", Conf) * 100}
, {cpu_low_watermark, conf_get("os_mon.cpu_low_watermark", Conf) * 100}
, {mem_check_interval, conf_get("os_mon.mem_check_interval", Conf)}
, {sysmem_high_watermark, conf_get("os_mon.sysmem_high_watermark", Conf) * 100}
, {procmem_high_watermark, conf_get("os_mon.procmem_high_watermark", Conf) * 100}
].
tr_vm_mon(Conf) ->
[ {check_interval, conf_get("vm_mon.check_interval", Conf)}
, {process_high_watermark, conf_get("vm_mon.process_high_watermark", Conf) * 100}
, {process_low_watermark, conf_get("vm_mon.process_low_watermark", Conf) * 100}
].
tr_alarm(Conf) -> tr_alarm(Conf) ->
[ {actions, [list_to_atom(Action) || Action <- conf_get("alarm.actions", Conf)]} [ {actions, [list_to_atom(Action) || Action <- conf_get("alarm.actions", Conf)]}
, {size_limit, conf_get("alarm.size_limit", Conf)} , {size_limit, conf_get("alarm.size_limit", Conf)}
@ -966,46 +954,7 @@ rate_limit_num_dur([L, D]) ->
map_zones(_, undefined) -> map_zones(_, undefined) ->
{undefined, undefined}; {undefined, undefined};
map_zones("force_gc_policy", [Count, Bytes]) ->
GcPolicy = case to_bytesize(Bytes) of
{error, Reason} ->
error({bytesize, Reason});
{ok, Bytes1} ->
#{bytes => Bytes1,
count => list_to_integer(Count)}
end,
{force_gc_policy, GcPolicy};
map_zones("force_shutdown_policy", ["default"]) ->
WordSize = erlang:system_info(wordsize),
{DefaultLen, DefaultSize} =
case WordSize of
8 -> % arch_64
{10000, hocon_postprocess:bytesize("64MB")};
4 -> % arch_32
{1000, hocon_postprocess:bytesize("32MB")}
end,
{force_shutdown_policy, #{message_queue_len => DefaultLen,
max_heap_size => DefaultSize div WordSize
}};
map_zones("force_shutdown_policy", [Len, Siz]) ->
WordSize = erlang:system_info(wordsize),
MaxSiz = case WordSize of
8 -> % arch_64
(1 bsl 59) - 1;
4 -> % arch_32
(1 bsl 27) - 1
end,
ShutdownPolicy =
case to_bytesize(Siz) of
{error, Reason} ->
error(Reason);
{ok, Siz1} when Siz1 > MaxSiz ->
error(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
{ok, Siz1} ->
#{message_queue_len => list_to_integer(Len),
max_heap_size => Siz1 div WordSize}
end,
{force_shutdown_policy, ShutdownPolicy};
map_zones("mqueue_priorities", Val) -> map_zones("mqueue_priorities", Val) ->
case Val of case Val of
["none"] -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE ["none"] -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
@ -1019,8 +968,6 @@ map_zones("mqueue_priorities", Val) ->
end, #{}, Val), end, #{}, Val),
{mqueue_priorities, MqueuePriorities} {mqueue_priorities, MqueuePriorities}
end; end;
map_zones("mountpoint", Val) ->
{mountpoint, iolist_to_binary(Val)};
map_zones("response_information", Val) -> map_zones("response_information", Val) ->
{response_information, iolist_to_binary(Val)}; {response_information, iolist_to_binary(Val)};
map_zones("rate_limit", Conf) -> map_zones("rate_limit", Conf) ->
@ -1094,41 +1041,34 @@ filter(Opts) ->
[{K, V} || {K, V} <- Opts, V =/= undefined]. [{K, V} || {K, V} <- Opts, V =/= undefined].
%% generate a ssl field. %% generate a ssl field.
%% ssl("emqx", #{"verify" => verify_peer}) will return %% ssl(#{"verify" => verify_peer}) will return:
%% [ {"cacertfile", t(string(), "emqx.cacertfile", undefined)} %% [ {"cacertfile", t(string(), undefined, undefined)}
%% , {"certfile", t(string(), "emqx.certfile", undefined)} %% , {"certfile", t(string(), undefined, undefined)}
%% , {"keyfile", t(string(), "emqx.keyfile", undefined)} %% , {"keyfile", t(string(), undefined, undefined)}
%% , {"verify", t(union(verify_peer, verify_none), "emqx.verify", verify_peer)} %% , {"verify", t(union(verify_peer, verify_none), undefined, verify_peer)}
%% , {"server_name_indication", "emqx.server_name_indication", undefined)} %% , {"server_name_indication", undefined, undefined)}
%% ... %% ...]
ssl(Mapping, Defaults) -> ssl(Defaults) ->
M = fun (Field) ->
case (Mapping) of
undefined -> undefined;
_ -> Mapping ++ "." ++ Field
end end,
D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end,
[ {"enable", t(flag(), M("enable"), D("enable"))} [ {"cacertfile", t(string(), undefined, D("cacertfile"))}
, {"cacertfile", t(string(), M("cacertfile"), D("cacertfile"))} , {"certfile", t(string(), undefined, D("certfile"))}
, {"certfile", t(string(), M("certfile"), D("certfile"))} , {"keyfile", t(string(), undefined, D("keyfile"))}
, {"keyfile", t(string(), M("keyfile"), D("keyfile"))} , {"verify", t(union(verify_peer, verify_none), undefined, D("verify"))}
, {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))} , {"fail_if_no_peer_cert", t(boolean(), undefined, D("fail_if_no_peer_cert"))}
, {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))} , {"secure_renegotiate", t(flag(), undefined, D("secure_renegotiate"))}
, {"secure_renegotiate", t(flag(), M("secure_renegotiate"), D("secure_renegotiate"))} , {"reuse_sessions", t(flag(), undefined, D("reuse_sessions"))}
, {"reuse_sessions", t(flag(), M("reuse_sessions"), D("reuse_sessions"))} , {"honor_cipher_order", t(flag(), undefined, D("honor_cipher_order"))}
, {"honor_cipher_order", t(flag(), M("honor_cipher_order"), D("honor_cipher_order"))} , {"handshake_timeout", t(duration(), undefined, D("handshake_timeout"))}
, {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))} , {"depth", t(integer(), undefined, D("depth"))}
, {"depth", t(integer(), M("depth"), D("depth"))} , {"password", hoconsc:t(string(), #{default => D("key_password"),
, {"password", hoconsc:t(string(), #{mapping => M("key_password"),
default => D("key_password"),
sensitive => true sensitive => true
})} })}
, {"dhfile", t(string(), M("dhfile"), D("dhfile"))} , {"dhfile", t(string(), undefined, D("dhfile"))}
, {"server_name_indication", t(union(disable, string()), M("server_name_indication"), , {"server_name_indication", t(union(disable, string()), undefined,
D("server_name_indication"))} D("server_name_indication"))}
, {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))} , {"tls_versions", t(comma_separated_list(), undefined, D("tls_versions"))}
, {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))} , {"ciphers", t(comma_separated_list(), undefined, D("ciphers"))}
, {"psk_ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))}]. , {"psk_ciphers", t(comma_separated_list(), undefined, D("ciphers"))}].
tr_ssl(Field, Conf) -> tr_ssl(Field, Conf) ->
Versions = case conf_get([Field, "tls_versions"], Conf) of Versions = case conf_get([Field, "tls_versions"], Conf) of
@ -1220,9 +1160,31 @@ t(Type, Mapping, Default, OverrideEnv) ->
, override_env => OverrideEnv , override_env => OverrideEnv
}). }).
t(Type, Mapping, Default, OverrideEnv, Validator) ->
hoconsc:t(Type, #{ mapping => Mapping
, default => Default
, override_env => OverrideEnv
, validator => Validator
}).
ref(Field) -> ref(Field) ->
fun (type) -> Field; (_) -> undefined end. fun (type) -> Field; (_) -> undefined end.
maybe_disabled(T) ->
maybe_sth(disabled, T, disabled).
maybe_disabled(T, Default) ->
maybe_sth(disabled, T, Default).
% maybe_infinity(T) ->
% maybe_sth(infinity, T, infinity).
maybe_infinity(T, Default) ->
maybe_sth(infinity, T, Default).
maybe_sth(What, Type, Default) ->
t(union([What, Type]), undefined, Default).
to_flag(Str) -> to_flag(Str) ->
{ok, hocon_postprocess:onoff(Str)}. {ok, hocon_postprocess:onoff(Str)}.
@ -1250,6 +1212,13 @@ to_bytesize(Str) ->
_ -> {error, Str} _ -> {error, Str}
end. end.
to_wordsize(Str) ->
WordSize = erlang:system_info(wordsize),
case to_bytesize(Str) of
{ok, Bytes} -> Bytes div WordSize;
Error -> Error
end.
to_percent(Str) -> to_percent(Str) ->
{ok, hocon_postprocess:percent(Str)}. {ok, hocon_postprocess:percent(Str)}.