diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 271558f6d..bb3a588a9 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -15,7 +15,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.14.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 812e52f9b..fe4439aaa 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -49,6 +49,10 @@ -typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). +-export([ validate_heap_size/1 + , parse_user_lookup_fun/1 + ]). + % workaround: prevent being recognized as unused functions -export([to_duration/1, to_duration_s/1, to_duration_ms/1, to_bytesize/1, to_wordsize/1, @@ -65,204 +69,539 @@ cipher/0, comma_separated_atoms/0]). --export([roots/0, fields/1]). --export([t/1, t/3, t/4, ref/1]). +-export([namespace/0, roots/0, fields/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([ssl/1]). +namespace() -> undefined. + roots() -> - ["zones", "mqtt", "flapping_detect", "force_shutdown", "force_gc", - "conn_congestion", "rate_limit", "quota", "listeners", "broker", "plugins", - "stats", "sysmon", "alarm", "authorization"]. + ["zones", + "mqtt", + "flapping_detect", + "force_shutdown", + "force_gc", + "conn_congestion", + "rate_limit", + "quota", + {"listeners", + sc(ref("listeners"), + #{ desc => "MQTT listeners identified by their protocol type and assigned names. " + "The listeners enabled by default are named with 'default'"}) + }, + "broker", + "plugins", + "stats", + "sysmon", + "alarm" + ]. fields("stats") -> - [ {"enable", t(boolean(), undefined, true)} + [ {"enable", + sc(boolean(), + #{ default => true + })} ]; fields("authorization") -> - [ {"no_match", t(union(allow, deny), undefined, allow)} - , {"deny_action", t(union(ignore, disconnect), undefined, ignore)} - , {"cache", ref("authorization_cache")} + [ {"no_match", + sc(union(allow, deny), + #{ default => allow + })} + , {"deny_action", + sc(union(ignore, disconnect), + #{ default => ignore + })} + , {"cache", + sc(ref("authorization_cache"), + #{ + }) + } ]; fields("authorization_cache") -> - [ {"enable", t(boolean(), undefined, true)} - , {"max_size", t(range(1, 1048576), undefined, 32)} - , {"ttl", t(duration(), undefined, "1m")} + [ {"enable", + sc(boolean(), + #{ default => true + }) + } + , {"max_size", + sc(range(1, 1048576), + #{ default => 32 + }) + } + , {"ttl", + sc(duration(), + #{ default => "1m" + }) + } ]; fields("mqtt") -> - [ {"idle_timeout", maybe_infinity(duration(), "15s")} - , {"max_packet_size", t(bytesize(), undefined, "1MB")} - , {"max_clientid_len", t(range(23, 65535), undefined, 65535)} - , {"max_topic_levels", t(range(1, 65535), undefined, 65535)} - , {"max_qos_allowed", t(range(0, 2), undefined, 2)} - , {"max_topic_alias", t(range(0, 65535), undefined, 65535)} - , {"retain_available", t(boolean(), undefined, true)} - , {"wildcard_subscription", t(boolean(), undefined, true)} - , {"shared_subscription", t(boolean(), undefined, true)} - , {"ignore_loop_deliver", t(boolean(), undefined, false)} - , {"strict_mode", t(boolean(), undefined, false)} - , {"response_information", t(string(), undefined, "")} - , {"server_keepalive", maybe_disabled(integer())} - , {"keepalive_backoff", t(float(), undefined, 0.75)} - , {"max_subscriptions", maybe_infinity(range(1, inf))} - , {"upgrade_qos", t(boolean(), undefined, false)} - , {"max_inflight", t(range(1, 65535), undefined, 32)} - , {"retry_interval", t(duration(), undefined, "30s")} - , {"max_awaiting_rel", maybe_infinity(integer(), 100)} - , {"await_rel_timeout", t(duration(), undefined, "300s")} - , {"session_expiry_interval", t(duration(), undefined, "2h")} - , {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)} - , {"mqueue_priorities", maybe_disabled(map())} - , {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)} - , {"mqueue_store_qos0", t(boolean(), undefined, true)} - , {"use_username_as_clientid", t(boolean(), undefined, false)} - , {"peer_cert_as_username", maybe_disabled(union([cn, dn, crt, pem, md5]))} - , {"peer_cert_as_clientid", maybe_disabled(union([cn, dn, crt, pem, md5]))} + [ {"idle_timeout", + sc(hoconsc:union([infinity, duration()]), + #{ default => "15s" + })} + , {"max_packet_size", + sc(bytesize(), + #{ default => "1MB" + })} + , {"max_clientid_len", + sc(range(23, 65535), + #{ default => 65535 + })} + , {"max_topic_levels", + sc(range(1, 65535), + #{ default => 65535 + })} + , {"max_qos_allowed", + sc(range(0, 2), + #{ default => 2 + })} + , {"max_topic_alias", + sc(range(0, 65535), + #{ default => 65535 + })} + , {"retain_available", + sc(boolean(), + #{ default => true + })} + , {"wildcard_subscription", + sc(boolean(), + #{ default => true + })} + , {"shared_subscription", + sc(boolean(), + #{ default => true + })} + , {"ignore_loop_deliver", + sc(boolean(), + #{ default => false + })} + , {"strict_mode", + sc(boolean(), + #{default => false + }) + } + , {"response_information", + sc(string(), + #{default => "" + }) + } + , {"server_keepalive", + sc(hoconsc:union([integer(), disabled]), + #{ default => disabled + }) + } + , {"keepalive_backoff", + sc(float(), + #{default => 0.75 + }) + } + , {"max_subscriptions", + sc(hoconsc:union([range(1, inf), infinity]), + #{ default => infinity + }) + } + , {"upgrade_qos", + sc(boolean(), + #{ default => false + }) + } + , {"max_inflight", + sc(range(1, 65535), + #{ default => 32 + }) + } + , {"retry_interval", + sc(duration(), + #{default => "30s" + }) + } + , {"max_awaiting_rel", + sc(hoconsc:union([integer(), infinity]), + #{ default => 100 + }) + } + , {"await_rel_timeout", + sc(duration(), + #{ default => "300s" + }) + } + , {"session_expiry_interval", + sc(duration(), + #{ default => "2h" + }) + } + , {"max_mqueue_len", + sc(hoconsc:union([range(0, inf), infinity]), + #{ default => 1000 + }) + } + , {"mqueue_priorities", + sc(hoconsc:union([map(), disabled]), + #{ default => disabled + }) + } + , {"mqueue_default_priority", + sc(union(highest, lowest), + #{ default => lowest + }) + } + , {"mqueue_store_qos0", + sc(boolean(), + #{ default => true + }) + } + , {"use_username_as_clientid", + sc(boolean(), + #{ default => false + }) + } + , {"peer_cert_as_username", + sc(hoconsc:union([disabled, cn, dn, crt, pem, md5]), + #{ default => disabled + })} + , {"peer_cert_as_clientid", + sc(hoconsc:union([disabled, cn, dn, crt, pem, md5]), + #{ default => disabled + })} ]; fields("zones") -> - [ {"$name", ref("zone_settings")}]; + [ {"$name", + sc(ref("zone_settings"), + #{ + } + )}]; fields("zone_settings") -> Fields = ["mqtt", "stats", "authorization", "flapping_detect", "force_shutdown", "conn_congestion", "rate_limit", "quota", "force_gc"], - [{F, ref("strip_default:" ++ F)} || F <- Fields]; + [{F, ref(emqx_zone_schema, F)} || F <- Fields]; fields("rate_limit") -> - [ {"max_conn_rate", maybe_infinity(integer(), 1000)} - , {"conn_messages_in", maybe_infinity(comma_separated_list())} - , {"conn_bytes_in", maybe_infinity(comma_separated_list())} + [ {"max_conn_rate", + sc(hoconsc:union([infinity, integer()]), + #{ default => 1000 + }) + } + , {"conn_messages_in", + sc(hoconsc:union([infinity, comma_separated_list()]), + #{ default => infinity + }) + } + , {"conn_bytes_in", + sc(hoconsc:union([infinity, comma_separated_list()]), + #{ default => infinity + }) + } ]; fields("quota") -> - [ {"conn_messages_routing", maybe_infinity(comma_separated_list())} - , {"overall_messages_routing", maybe_infinity(comma_separated_list())} + [ {"conn_messages_routing", + sc(hoconsc:union([infinity, comma_separated_list()]), + #{ default => infinity + }) + } + , {"overall_messages_routing", + sc(hoconsc:union([infinity, comma_separated_list()]), + #{ default => infinity + }) + } ]; fields("flapping_detect") -> - [ {"enable", t(boolean(), undefined, false)} - , {"max_count", t(integer(), undefined, 15)} - , {"window_time", t(duration(), undefined, "1m")} - , {"ban_time", t(duration(), undefined, "5m")} + [ {"enable", + sc(boolean(), + #{ default => false + })} + , {"max_count", + sc(integer(), + #{ default => 15 + })} + , {"window_time", + sc(duration(), + #{ default => "1m" + })} + , {"ban_time", + sc(duration(), + #{ default => "5m" + })} ]; fields("force_shutdown") -> - [ {"enable", t(boolean(), undefined, true)} - , {"max_message_queue_len", t(range(0, inf), undefined, 1000)} - , {"max_heap_size", t(wordsize(), undefined, "32MB", undefined, - fun(Siz) -> - MaxSiz = case erlang:system_info(wordsize) of - 8 -> % arch_64 - (1 bsl 59) - 1; - 4 -> % arch_32 - (1 bsl 27) - 1 - end, - case Siz > MaxSiz of - true -> - error(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); - false -> - ok - end - end)} + [ {"enable", + sc(boolean(), + #{ default => true})} + , {"max_message_queue_len", + sc(range(0, inf), + #{ default => 1000 + })} + , {"max_heap_size", + sc(wordsize(), + #{ default => "32MB", + validator => fun ?MODULE:validate_heap_size/1 + })} ]; fields("conn_congestion") -> - [ {"enable_alarm", t(boolean(), undefined, false)} - , {"min_alarm_sustain_duration", t(duration(), undefined, "1m")} + [ {"enable_alarm", + sc(boolean(), + #{ default => false + })} + , {"min_alarm_sustain_duration", + sc(duration(), + #{ default => "1m" + })} ]; fields("force_gc") -> - [ {"enable", t(boolean(), undefined, true)} - , {"count", t(range(0, inf), undefined, 16000)} - , {"bytes", t(bytesize(), undefined, "16MB")} + [ {"enable", + sc(boolean(), + #{ default => true + })} + , {"count", + sc(range(0, inf), + #{ default => 16000 + })} + , {"bytes", + sc(bytesize(), + #{ default => "16MB" + })} ]; fields("listeners") -> - [ {"tcp", ref("t_tcp_listeners")} - , {"ssl", ref("t_ssl_listeners")} - , {"ws", ref("t_ws_listeners")} - , {"wss", ref("t_wss_listeners")} - , {"quic", ref("t_quic_listeners")} + [ {"tcp", + sc(ref("tcp_listeners"), + #{ desc => "TCP listeners" + }) + } + , {"ssl", + sc(ref("ssl_listeners"), + #{ desc => "SSL listeners" + }) + } + , {"ws", + sc(ref("ws_listeners"), + #{ desc => "HTTP websocket listeners" + }) + } + , {"wss", + sc(ref("wss_listeners"), + #{ desc => "HTTPS websocket listeners" + }) + } + , {"quic", + sc(ref("quic_listeners"), + #{ desc => "QUIC listeners" + }) + } ]; -fields("t_tcp_listeners") -> +fields("tcp_listeners") -> [ {"$name", ref("mqtt_tcp_listener")} ]; -fields("t_ssl_listeners") -> +fields("ssl_listeners") -> [ {"$name", ref("mqtt_ssl_listener")} ]; -fields("t_ws_listeners") -> +fields("ws_listeners") -> [ {"$name", ref("mqtt_ws_listener")} ]; -fields("t_wss_listeners") -> +fields("wss_listeners") -> [ {"$name", ref("mqtt_wss_listener")} ]; -fields("t_quic_listeners") -> +fields("quic_listeners") -> [ {"$name", ref("mqtt_quic_listener")} ]; fields("mqtt_tcp_listener") -> - [ {"tcp", ref("tcp_opts")} + [ {"tcp", + sc(ref("tcp_opts"), + #{ desc => "TCP listener options" + }) + } ] ++ mqtt_listener(); fields("mqtt_ssl_listener") -> - [ {"tcp", ref("tcp_opts")} - , {"ssl", ref("ssl_opts")} + [ {"tcp", + sc(ref("tcp_opts"), + #{}) + } + , {"ssl", + sc(ref("listener_ssl_opts"), + #{}) + } ] ++ mqtt_listener(); fields("mqtt_ws_listener") -> - [ {"tcp", ref("tcp_opts")} - , {"websocket", ref("ws_opts")} + [ {"tcp", + sc(ref("tcp_opts"), + #{}) + } + , {"websocket", + sc(ref("ws_opts"), + #{}) + } ] ++ mqtt_listener(); fields("mqtt_wss_listener") -> - [ {"tcp", ref("tcp_opts")} - , {"ssl", ref("ssl_opts")} - , {"websocket", ref("ws_opts")} + [ {"tcp", + sc(ref("tcp_opts"), + #{}) + } + , {"ssl", + sc(ref("listener_ssl_opts"), + #{}) + } + , {"websocket", + sc(ref("ws_opts"), + #{}) + } ] ++ mqtt_listener(); fields("mqtt_quic_listener") -> - [ {"enabled", t(boolean(), undefined, true)} - , {"certfile", t(string(), undefined, undefined)} - , {"keyfile", t(string(), undefined, undefined)} - , {"ciphers", t(comma_separated_list(), undefined, "TLS_AES_256_GCM_SHA384," - "TLS_AES_128_GCM_SHA256,TLS_CHACHA20_POLY1305_SHA256")} - , {"idle_timeout", t(duration(), undefined, "15s")} + [ {"enabled", + sc(boolean(), + #{ default => true + }) + } + , {"certfile", + sc(string(), + #{}) + } + , {"keyfile", + sc(string(), + #{}) + } + , {"ciphers", + sc(comma_separated_list(), + #{ default => "TLS_AES_256_GCM_SHA384,TLS_AES_128_GCM_SHA256," + "TLS_CHACHA20_POLY1305_SHA256" + })} + , {"idle_timeout", + sc(duration(), + #{ default => "15s" + }) + } ] ++ base_listener(); fields("ws_opts") -> - [ {"mqtt_path", t(string(), undefined, "/mqtt")} - , {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)} - , {"compress", t(boolean(), undefined, false)} - , {"idle_timeout", t(duration(), undefined, "15s")} - , {"max_frame_size", maybe_infinity(integer())} - , {"fail_if_no_subprotocol", t(boolean(), undefined, true)} - , {"supported_subprotocols", t(comma_separated_list(), undefined, - "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")} - , {"check_origin_enable", t(boolean(), undefined, false)} - , {"allow_origin_absence", t(boolean(), undefined, true)} - , {"check_origins", t(hoconsc:array(binary()), undefined, [])} - , {"proxy_address_header", t(string(), undefined, "x-forwarded-for")} - , {"proxy_port_header", t(string(), undefined, "x-forwarded-port")} - , {"deflate_opts", ref("deflate_opts")} + [ {"mqtt_path", + sc(string(), + #{ default => "/mqtt" + }) + } + , {"mqtt_piggyback", + sc(hoconsc:union([single, multiple]), + #{ default => multiple + }) + } + , {"compress", + sc(boolean(), + #{ default => false + }) + } + , {"idle_timeout", + sc(duration(), + #{ default => "15s" + }) + } + , {"max_frame_size", + sc(hoconsc:union([infinity, integer()]), + #{ default => infinity + }) + } + , {"fail_if_no_subprotocol", + sc(boolean(), + #{ default => true + }) + } + , {"supported_subprotocols", + sc(comma_separated_list(), + #{ default => "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5" + }) + } + , {"check_origin_enable", + sc(boolean(), + #{ default => false + }) + } + , {"allow_origin_absence", + sc(boolean(), + #{ default => true + }) + } + , {"check_origins", + sc(hoconsc:array(binary()), + #{ default => [] + }) + } + , {"proxy_address_header", + sc(string(), + #{ default => "x-forwarded-for" + }) + } + , {"proxy_port_header", + sc(string(), + #{ default => "x-forwarded-port" + }) + } + , {"deflate_opts", + sc(ref("deflate_opts"), + #{}) + } ]; fields("tcp_opts") -> - [ {"active_n", t(integer(), undefined, 100)} - , {"backlog", t(integer(), undefined, 1024)} - , {"send_timeout", t(duration(), undefined, "15s")} - , {"send_timeout_close", t(boolean(), undefined, true)} - , {"recbuf", t(bytesize())} - , {"sndbuf", t(bytesize())} - , {"buffer", t(bytesize())} - , {"high_watermark", t(bytesize(), undefined, "1MB")} - , {"nodelay", t(boolean(), undefined, false)} - , {"reuseaddr", t(boolean(), undefined, true)} + [ {"active_n", + sc(integer(), + #{ default => 100 + }) + } + , {"backlog", + sc(integer(), + #{ default => 1024 + }) + } + , {"send_timeout", + sc(duration(), + #{ default => "15s" + }) + } + , {"send_timeout_close", + sc(boolean(), + #{ default => true + }) + } + , {"recbuf", + sc(bytesize(), + #{}) + } + , {"sndbuf", + sc(bytesize(), + #{}) + } + , {"buffer", + sc(bytesize(), + #{}) + } + , {"high_watermark", + sc(bytesize(), + #{ default => "1MB"}) + } + , {"nodelay", + sc(boolean(), + #{ default => false}) + } + , {"reuseaddr", + sc(boolean(), + #{ default => true + }) + } ]; -fields("ssl_opts") -> +fields("listener_ssl_opts") -> ssl(#{handshake_timeout => "15s" , depth => 10 , reuse_sessions => true @@ -271,82 +610,237 @@ fields("ssl_opts") -> }); fields("deflate_opts") -> - [ {"level", t(union([none, default, best_compression, best_speed]))} - , {"mem_level", t(range(1, 9), undefined, 8)} - , {"strategy", t(union([default, filtered, huffman_only, rle]))} - , {"server_context_takeover", t(union(takeover, no_takeover))} - , {"client_context_takeover", t(union(takeover, no_takeover))} - , {"server_max_window_bits", t(range(8, 15), undefined, 15)} - , {"client_max_window_bits", t(range(8, 15), undefined, 15)} + [ {"level", + sc(hoconsc:union([none, default, best_compression, best_speed]), + #{}) + } + , {"mem_level", + sc(range(1, 9), + #{ default => 8 + }) + } + , {"strategy", + sc(hoconsc:union([default, filtered, huffman_only, rle]), + #{}) + } + , {"server_context_takeover", + sc(hoconsc:union([takeover, no_takeover]), + #{}) + } + , {"client_context_takeover", + sc(hoconsc:union([takeover, no_takeover]), + #{}) + } + , {"server_max_window_bits", + sc(range(8, 15), + #{ default => 15 + }) + } + , {"client_max_window_bits", + sc(range(8, 15), + #{ default => 15 + }) + } ]; fields("plugins") -> - [ {"expand_plugins_dir", t(string())} + [ {"expand_plugins_dir", + sc(string(), + #{}) + } ]; fields("broker") -> - [ {"sys_msg_interval", maybe_disabled(duration(), "1m")} - , {"sys_heartbeat_interval", maybe_disabled(duration(), "30s")} - , {"enable_session_registry", t(boolean(), undefined, true)} - , {"session_locking_strategy", t(union([local, leader, quorum, all]), undefined, quorum)} - , {"shared_subscription_strategy", t(union(random, round_robin), undefined, round_robin)} - , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} - , {"route_batch_clean", t(boolean(), undefined, true)} - , {"perf", ref("perf")} + [ {"sys_msg_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "1m" + }) + } + , {"sys_heartbeat_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "30s" + }) + } + , {"enable_session_registry", + sc(boolean(), + #{ default => true + }) + } + , {"session_locking_strategy", + sc(hoconsc:union([local, leader, quorum, all]), + #{ default => quorum + }) + } + , {"shared_subscription_strategy", + sc(hoconsc:union([random, round_robin]), + #{ default => round_robin + }) + } + , {"shared_dispatch_ack_enabled", + sc(boolean(), + #{ default => false + }) + } + , {"route_batch_clean", + sc(boolean(), + #{ default => true + })} + , {"perf", + sc(ref("broker_perf"), + #{ desc => "Broker performance tuning pamaters" + }) + } ]; -fields("perf") -> - [ {"route_lock_type", t(union([key, tab, global]), undefined, key)} - , {"trie_compaction", t(boolean(), undefined, true)} +fields("broker_perf") -> + [ {"route_lock_type", + sc(hoconsc:union([key, tab, global]), + #{ default => key + })} + , {"trie_compaction", + sc(boolean(), + #{ default => true + })} ]; fields("sysmon") -> - [ {"vm", ref("sysmon_vm")} - , {"os", ref("sysmon_os")} + [ {"vm", + sc(ref("sysmon_vm"), + #{}) + } + , {"os", + sc(ref("sysmon_os"), + #{}) + } ]; fields("sysmon_vm") -> - [ {"process_check_interval", t(duration(), undefined, "30s")} - , {"process_high_watermark", t(percent(), undefined, "80%")} - , {"process_low_watermark", t(percent(), undefined, "60%")} - , {"long_gc", maybe_disabled(duration())} - , {"long_schedule", maybe_disabled(duration(), "240ms")} - , {"large_heap", maybe_disabled(bytesize(), "32MB")} - , {"busy_dist_port", t(boolean(), undefined, true)} - , {"busy_port", t(boolean(), undefined, true)} + [ {"process_check_interval", + sc(duration(), + #{ default => "30s" + }) + } + , {"process_high_watermark", + sc(percent(), + #{ default => "80%" + }) + } + , {"process_low_watermark", + sc(percent(), + #{ default => "60%" + }) + } + , {"long_gc", + sc(hoconsc:union([disabled, duration()]), + #{}) + } + , {"long_schedule", + sc(hoconsc:union([disabled, duration()]), + #{ default => "240ms" + }) + } + , {"large_heap", + sc(hoconsc:union([disabled, bytesize()]), + #{default => "32MB"}) + } + , {"busy_dist_port", + sc(boolean(), + #{ default => true + }) + } + , {"busy_port", + sc(boolean(), + #{ default => true + })} ]; fields("sysmon_os") -> - [ {"cpu_check_interval", t(duration(), undefined, "60s")} - , {"cpu_high_watermark", t(percent(), undefined, "80%")} - , {"cpu_low_watermark", t(percent(), undefined, "60%")} - , {"mem_check_interval", maybe_disabled(duration(), "60s")} - , {"sysmem_high_watermark", t(percent(), undefined, "70%")} - , {"procmem_high_watermark", t(percent(), undefined, "5%")} + [ {"cpu_check_interval", + sc(duration(), + #{ default => "60s"}) + } + , {"cpu_high_watermark", + sc(percent(), + #{ default => "80%" + }) + } + , {"cpu_low_watermark", + sc(percent(), + #{ default => "60%" + }) + } + , {"mem_check_interval", + sc(hoconsc:union([disabled, duration()]), + #{ default => "60s" + })} + , {"sysmem_high_watermark", + sc(percent(), + #{ default => "70%" + }) + } + , {"procmem_high_watermark", + sc(percent(), + #{ default => "5%" + }) + } ]; fields("alarm") -> - [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} - , {"size_limit", t(integer(), undefined, 1000)} - , {"validity_period", t(duration(), undefined, "24h")} - ]; - -fields("strip_default:" ++ Name) -> - strip_default(fields(Name)). + [ {"actions", + sc(hoconsc:array(atom()), + #{ default => [log, publish] + }) + } + , {"size_limit", + sc(integer(), + #{ default => 1000 + }) + } + , {"validity_period", + sc(duration(), + #{ default => "24h" + }) + } + ]. mqtt_listener() -> base_listener() ++ - [ {"access_rules", t(hoconsc:array(string()))} - , {"proxy_protocol", t(boolean(), undefined, false)} - , {"proxy_protocol_timeout", t(duration())} + [ {"access_rules", + sc(hoconsc:array(string()), + #{}) + } + , {"proxy_protocol", + sc(boolean(), + #{ default => false + }) + } + , {"proxy_protocol_timeout", + sc(duration(), + #{}) + } ]. base_listener() -> - [ {"bind", hoconsc:t(union(ip_port(), integer()), #{nullable => false})} - , {"acceptors", t(integer(), undefined, 16)} - , {"max_connections", maybe_infinity(integer(), infinity)} - , {"mountpoint", t(binary(), undefined, <<>>)} - , {"zone", t(atom(), undefined, default)} + [ {"bind", + sc(hoconsc:union([ip_port(), integer()]), + #{ nullable => false + })} + , {"acceptors", + sc(integer(), + #{ default => 16 + })} + , {"max_connections", + sc(hoconsc:union([infinity, integer()]), + #{ default => infinity + })} + , {"mountpoint", + sc(binary(), + #{ default => <<>> + })} + , {"zone", + sc(atom(), + #{ default => 'default' + })} ]. %% utils @@ -372,43 +866,101 @@ conf_get(Key, Conf, Default) -> filter(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined]. -%% generate a ssl field. -%% ssl(#{"verify" => verify_peer}) will return: -%% [ {"cacertfile", t(string(), undefined, undefined)} -%% , {"certfile", t(string(), undefined, undefined)} -%% , {"keyfile", t(string(), undefined, undefined)} -%% , {"verify", t(union(verify_peer, verify_none), undefined, verify_peer)} -%% , {"server_name_indication", undefined, undefined)} -%% ...] ssl(Defaults) -> D = fun (Field) -> maps:get(to_atom(Field), Defaults, undefined) end, - [ {"enable", t(boolean(), undefined, D("enable"))} - , {"cacertfile", t(string(), undefined, D("cacertfile"))} - , {"certfile", t(string(), undefined, D("certfile"))} - , {"keyfile", t(string(), undefined, D("keyfile"))} - , {"verify", t(union(verify_peer, verify_none), undefined, D("verify"))} - , {"fail_if_no_peer_cert", t(boolean(), undefined, D("fail_if_no_peer_cert"))} - , {"secure_renegotiate", t(boolean(), undefined, D("secure_renegotiate"))} - , {"reuse_sessions", t(boolean(), undefined, D("reuse_sessions"))} - , {"honor_cipher_order", t(boolean(), undefined, D("honor_cipher_order"))} - , {"handshake_timeout", t(duration(), undefined, D("handshake_timeout"))} - , {"depth", t(integer(), undefined, D("depth"))} - , {"password", hoconsc:t(string(), #{default => D("key_password"), - sensitive => true - })} - , {"dhfile", t(string(), undefined, D("dhfile"))} - , {"server_name_indication", t(union(disable, string()), undefined, - D("server_name_indication"))} - , {"versions", #{ type => list(atom()) - , default => maps:get(versions, Defaults, default_tls_vsns()) - , converter => fun (Vsns) -> [tls_vsn(V) || V <- Vsns] end - }} - , {"ciphers", t(hoconsc:array(string()), undefined, D("ciphers"))} - , {"user_lookup_fun", t(any(), undefined, {fun emqx_psk:lookup/3, <<>>})} + [ {"enable", + sc(boolean(), + #{ default => D("enable") + }) + } + , {"cacertfile", + sc(string(), + #{ default => D("cacertfile") + }) + } + , {"certfile", + sc(string(), + #{ default => D("certfile") + }) + } + , {"keyfile", + sc(string(), + #{ default => D("keyfile") + }) + } + , {"verify", + sc(hoconsc:union([verify_peer, verify_none]), + #{ default => D("verify") + }) + } + , {"fail_if_no_peer_cert", + sc(boolean(), + #{ default => D("fail_if_no_peer_cert") + }) + } + , {"secure_renegotiate", + sc(boolean(), + #{ default => D("secure_renegotiate") + }) + } + , {"reuse_sessions", + sc(boolean(), + #{ default => D("reuse_sessions") + }) + } + , {"honor_cipher_order", + sc(boolean(), + #{ default => D("honor_cipher_order") + }) + } + , {"handshake_timeout", + sc(duration(), + #{ default => D("handshake_timeout") + }) + } + , {"depth", + sc(integer(), + #{default => D("depth") + }) + } + , {"password", + sc(string(), + #{ default => D("key_password") + , sensitive => true + }) + } + , {"dhfile", + sc(string(), + #{ default => D("dhfile") + }) + } + , {"server_name_indication", + sc(hoconsc:union([disable, string()]), + #{ default => D("server_name_indication") + }) + } + , {"versions", + sc(typerefl:alias("string", list(atom())), + #{ default => maps:get(versions, Defaults, default_tls_vsns()) + , converter => fun (Vsns) -> [tls_vsn(iolist_to_binary(V)) || V <- Vsns] end + }) + } + , {"ciphers", + sc(hoconsc:array(string()), + #{ default => D("ciphers") + }) + } + , {"user_lookup_fun", + sc(typerefl:alias("string", any()), + #{ default => "emqx_psk:lookup" + , converter => fun ?MODULE:parse_user_lookup_fun/1 + }) + } ]. %% on erl23.2.7.2-emqx-2, sufficient_crypto_support('tlsv1.3') -> false default_tls_vsns() -> [<<"tlsv1.2">>, <<"tlsv1.1">>, <<"tlsv1">>]. + tls_vsn(<<"tlsv1.3">>) -> 'tlsv1.3'; tls_vsn(<<"tlsv1.2">>) -> 'tlsv1.2'; tls_vsn(<<"tlsv1.1">>) -> 'tlsv1.1'; @@ -451,40 +1003,11 @@ ceiling(X) -> %% types -t(Type) -> hoconsc:t(Type). +sc(Type, Meta) -> hoconsc:mk(Type, Meta). -t(Type, Mapping, Default) -> - hoconsc:t(Type, #{mapping => Mapping, default => Default}). +ref(Field) -> hoconsc:ref(?MODULE, Field). -t(Type, Mapping, Default, OverrideEnv) -> - hoconsc:t(Type, #{ mapping => Mapping - , default => Default - , override_env => OverrideEnv - }). - -t(Type, Mapping, Default, OverrideEnv, Validator) -> - hoconsc:t(Type, #{ mapping => Mapping - , default => Default - , override_env => OverrideEnv - , validator => Validator - }). - -ref(Field) -> hoconsc:t(hoconsc:ref(?MODULE, Field)). - -maybe_disabled(T) -> - maybe_sth(disabled, T, disabled). - -maybe_disabled(T, Default) -> - maybe_sth(disabled, T, Default). - -maybe_infinity(T) -> - maybe_sth(infinity, T, infinity). - -maybe_infinity(T, Default) -> - maybe_sth(infinity, T, Default). - -maybe_sth(What, Type, Default) -> - t(union([What, Type]), undefined, Default). +ref(Module, Field) -> hoconsc:ref(Module, Field). to_duration(Str) -> case hocon_postprocess:duration(Str) of @@ -545,22 +1068,26 @@ to_erl_cipher_suite(Str) -> Cipher -> Cipher end. -strip_default(Fields) -> - [do_strip_default(F) || F <- Fields]. - -do_strip_default({Name, #{type := {ref, Ref}}}) -> - {Name, nullable_no_def(ref("strip_default:" ++ Ref))}; -do_strip_default({Name, #{type := {ref, _Mod, Ref}}}) -> - {Name, nullable_no_def(ref("strip_default:" ++ Ref))}; -do_strip_default({Name, Type}) -> - {Name, nullable_no_def(Type)}. - -nullable_no_def(Type) when is_map(Type) -> - Type#{default => undefined, nullable => true}. - to_atom(Atom) when is_atom(Atom) -> Atom; to_atom(Str) when is_list(Str) -> list_to_atom(Str); to_atom(Bin) when is_binary(Bin) -> binary_to_atom(Bin, utf8). + +validate_heap_size(Siz) -> + MaxSiz = case erlang:system_info(wordsize) of + 8 -> % arch_64 + (1 bsl 59) - 1; + 4 -> % arch_32 + (1 bsl 27) - 1 + end, + case Siz > MaxSiz of + true -> error(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); + false -> ok + end. +parse_user_lookup_fun(StrConf) -> + [ModStr, FunStr] = string:tokens(StrConf, ":"), + Mod = list_to_atom(ModStr), + Fun = list_to_atom(FunStr), + {fun Mod:Fun/3, <<>>}. diff --git a/apps/emqx/src/emqx_zone_schema.erl b/apps/emqx/src/emqx_zone_schema.erl new file mode 100644 index 000000000..013ffb22f --- /dev/null +++ b/apps/emqx/src/emqx_zone_schema.erl @@ -0,0 +1,34 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_zone_schema). + +-export([namespace/0, roots/0, fields/1]). + +namespace() -> zone. + +roots() -> []. + +%% zone schemas are clones from the same name from root level +%% only not allowed to have default values. +fields(Name) -> + [{N, no_default(Sc)} || {N, Sc} <- emqx_schema:fields(Name)]. + +%% no default values for zone settings +no_default(Sc) -> + fun(default) -> undefined; + (Other) -> hocon_schema:field_schema(Sc, Other) + end. diff --git a/apps/emqx_authn/src/emqx_authn_schema.erl b/apps/emqx_authn/src/emqx_authn_schema.erl index de0de9fcc..bceedb6bb 100644 --- a/apps/emqx_authn/src/emqx_authn_schema.erl +++ b/apps/emqx_authn/src/emqx_authn_schema.erl @@ -21,7 +21,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). @@ -32,6 +33,8 @@ -export([ authenticators/1 ]). +namespace() -> authn. + roots() -> [ "authentication" ]. fields("authentication") -> diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 0ca281aa0..d7902d824 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -21,7 +21,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). @@ -74,6 +75,8 @@ mnesia(copy) -> %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:scram:builtin_db". + roots() -> [config]. fields(config) -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index c5cbc0f02..080b71ab1 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -22,7 +22,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 , validations/0 ]). @@ -37,9 +38,11 @@ %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:http". + roots() -> - [ {config, {union, [ hoconsc:t(get) - , hoconsc:t(post) + [ {config, {union, [ hoconsc:ref(?MODULE, get) + , hoconsc:ref(?MODULE, post) ]}} ]. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl index bc26bf70e..1ce10a2cc 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl @@ -20,7 +20,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). @@ -34,10 +35,12 @@ %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:jwt". + roots() -> - [ {config, {union, [ hoconsc:t('hmac-based') - , hoconsc:t('public-key') - , hoconsc:t('jwks') + [ {config, {union, [ hoconsc:mk('hmac-based') + , hoconsc:mk('public-key') + , hoconsc:mk('jwks') ]}} ]. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index c525efbf1..efe974145 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -21,7 +21,7 @@ -behaviour(hocon_schema). --export([ roots/0, fields/1 ]). +-export([ namespace/0, roots/0, fields/1 ]). -export([ create/1 , update/2 @@ -79,6 +79,8 @@ mnesia(copy) -> %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:builtin_db". + roots() -> [config]. fields(config) -> @@ -102,7 +104,8 @@ user_id_type(type) -> user_id_type(); user_id_type(default) -> username; user_id_type(_) -> undefined. -password_hash_algorithm(type) -> {union, [hoconsc:ref(bcrypt), hoconsc:ref(other_algorithms)]}; +password_hash_algorithm(type) -> hoconsc:union([hoconsc:ref(?MODULE, bcrypt), + hoconsc:ref(?MODULE, other_algorithms)]); password_hash_algorithm(default) -> #{<<"name">> => sha256}; password_hash_algorithm(_) -> undefined. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index 11411b70f..d272fe05b 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -22,7 +22,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). @@ -36,10 +37,12 @@ %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:mongodb". + roots() -> - [ {config, {union, [ hoconsc:t(standalone) - , hoconsc:t('replica-set') - , hoconsc:t('sharded-cluster') + [ {config, {union, [ hoconsc:mk(standalone) + , hoconsc:mk('replica-set') + , hoconsc:mk('sharded-cluster') ]}} ]. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 3cafdb94e..c94798aa6 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -22,7 +22,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). @@ -36,6 +37,8 @@ %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:mysql". + roots() -> [config]. fields(config) -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index 5c21d3d6c..6875c5cb9 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -23,7 +23,7 @@ -behaviour(hocon_schema). --export([ roots/0, fields/1 ]). +-export([ namespace/0, roots/0, fields/1 ]). -export([ create/1 , update/2 @@ -35,6 +35,8 @@ %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:postgres". + roots() -> [config]. fields(config) -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index 1b090b007..6c5a81652 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -22,7 +22,8 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). @@ -36,10 +37,11 @@ %% Hocon Schema %%------------------------------------------------------------------------------ +namespace() -> "authn:redis". roots() -> - [ {config, {union, [ hoconsc:t(standalone) - , hoconsc:t(cluster) - , hoconsc:t(sentinel) + [ {config, {union, [ hoconsc:mk(standalone) + , hoconsc:mk(cluster) + , hoconsc:mk(sentinel) ]}} ]. diff --git a/apps/emqx_authz/src/emqx_authz_schema.erl b/apps/emqx_authz/src/emqx_authz_schema.erl index 251e40fe6..0645990a8 100644 --- a/apps/emqx_authz/src/emqx_authz_schema.erl +++ b/apps/emqx_authz/src/emqx_authz_schema.erl @@ -13,11 +13,14 @@ -type permission() :: allow | deny. -type url() :: emqx_http_lib:uri_map(). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1 ]). -roots() -> ["authorization"]. +namespace() -> authz. + +roots() -> []. fields("authorization") -> [ {sources, #{type => union_array( diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl index 92420e217..5b781455d 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe_schema.erl @@ -19,25 +19,30 @@ -include_lib("typerefl/include/types.hrl"). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). +namespace() -> "auto_subscribe". + roots() -> ["auto_subscribe"]. fields("auto_subscribe") -> - [ {topics, hoconsc:array(hoconsc:ref(?MODULE, "topic"))}]; + [ {topics, hoconsc:array(hoconsc:ref(?MODULE, "topic"))} + ]; fields("topic") -> - [ {topic, emqx_schema:t(binary())} - , {qos, t(hoconsc:union([0, 1, 2]), 0)} - , {rh, t(hoconsc:union([0, 1, 2]), 0)} - , {rap, t(hoconsc:union([0, 1]), 0)} - , {nl, t(hoconsc:union([0, 1]), 0)} + [ {topic, sc(binary(), #{})} + , {qos, sc(typerefl:union([0, 1, 2]), #{default => 0})} + , {rh, sc(typerefl:union([0, 1, 2]), #{default => 0})} + , {rap, sc(typerefl:union([0, 1]), #{default => 0})} + , {nl, sc(typerefl:union([0, 1]), #{default => 0})} ]. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -t(Type, Default) -> - hoconsc:t(Type, #{default => Default}). + +sc(Type, Meta) -> + hoconsc:mk(Type, Meta). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl index 925bfa403..f370af277 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl @@ -20,55 +20,60 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). +namespace() -> "bridge_mqtt". + roots() -> [array("bridge_mqtt")]. -array(Name) -> {Name, hoconsc:array(hoconsc:ref(Name))}. +array(Name) -> {Name, hoconsc:array(hoconsc:ref(?MODULE, Name))}. fields("bridge_mqtt") -> - [ {name, emqx_schema:t(string(), undefined, true)} + [ {name, sc(string(), #{default => true})} , {start_type, fun start_type/1} , {forwards, fun forwards/1} - , {forward_mountpoint, emqx_schema:t(string())} - , {reconnect_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} - , {batch_size, emqx_schema:t(integer(), undefined, 100)} - , {queue, emqx_schema:t(hoconsc:ref(?MODULE, "queue"))} - , {config, hoconsc:union([hoconsc:ref(?MODULE, "mqtt"), hoconsc:ref(?MODULE, "rpc")])} + , {forward_mountpoint, sc(string(), #{})} + , {reconnect_interval, sc(emqx_schema:duration_ms(), #{default => "30s"})} + , {batch_size, sc(integer(), #{default => 100})} + , {queue, sc(hoconsc:ref(?MODULE, "queue"), #{})} + , {config, sc(hoconsc:union([hoconsc:ref(?MODULE, "mqtt"), + hoconsc:ref(?MODULE, "rpc")]), + #{})} ]; fields("mqtt") -> [ {conn_type, fun conn_type/1} - , {address, emqx_schema:t(string(), undefined, "127.0.0.1:1883")} + , {address, sc(string(), #{default => "127.0.0.1:1883"})} , {proto_ver, fun proto_ver/1} - , {bridge_mode, emqx_schema:t(boolean(), undefined, true)} - , {clientid, emqx_schema:t(string())} - , {username, emqx_schema:t(string())} - , {password, emqx_schema:t(string())} - , {clean_start, emqx_schema:t(boolean(), undefined, true)} - , {keepalive, emqx_schema:t(integer(), undefined, 300)} - , {subscriptions, hoconsc:array("subscriptions")} - , {receive_mountpoint, emqx_schema:t(string())} - , {retry_interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "30s")} - , {max_inflight, emqx_schema:t(integer(), undefined, 32)} + , {bridge_mode, sc(boolean(), #{default => true})} + , {clientid, sc(string(), #{})} + , {username, sc(string(), #{})} + , {password, sc(string(), #{})} + , {clean_start, sc(boolean(), #{default => true})} + , {keepalive, sc(integer(), #{default => 300})} + , {subscriptions, sc(hoconsc:array(hoconsc:ref(?MODULE, "subscriptions")), #{})} + , {receive_mountpoint, sc(string(), #{})} + , {retry_interval, sc(emqx_schema:duration_ms(), #{default => "30s"})} + , {max_inflight, sc(integer(), #{default => 32})} ]; fields("rpc") -> [ {conn_type, fun conn_type/1} - , {node, emqx_schema:t(atom(), undefined, 'emqx@127.0.0.1')} + , {node, sc(atom(), #{default => 'emqx@127.0.0.1'})} ]; fields("subscriptions") -> [ {topic, #{type => binary(), nullable => false}} - , {qos, emqx_schema:t(integer(), undefined, 1)} + , {qos, sc(integer(), #{default => 1})} ]; fields("queue") -> [ {replayq_dir, hoconsc:union([boolean(), string()])} - , {replayq_seg_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "100MB")} - , {replayq_offload_mode, emqx_schema:t(boolean(), undefined, false)} - , {replayq_max_total_bytes, emqx_schema:t(emqx_schema:bytesize(), undefined, "1024MB")} + , {replayq_seg_bytes, sc(emqx_schema:bytesize(), #{default => "100MB"})} + , {replayq_offload_mode, sc(boolean(), #{default => false})} + , {replayq_max_total_bytes, sc(emqx_schema:bytesize(), #{default => "1024MB"})} ]. conn_type(type) -> hoconsc:enum([mqtt, rpc]); @@ -85,3 +90,5 @@ start_type(_) -> undefined. forwards(type) -> hoconsc:array(binary()); forwards(default) -> []; forwards(_) -> undefined. + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index 88dfb2b72..c95679f32 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -82,10 +82,7 @@ mongo_fields() -> , {auth_source, #{type => binary(), nullable => true}} , {database, fun emqx_connector_schema_lib:database/1} - , {topology, #{type => hoconsc:ref(?MODULE, topology), - default => #{}}} - %% TODO: Does the ref type support nullable=ture ? - % nullable => true}} + , {topology, #{type => hoconsc:ref(?MODULE, topology)}} ] ++ emqx_connector_schema_lib:ssl_fields(). @@ -178,7 +175,7 @@ do_start(InstId, Opts0, Config = #{mongo_type := Type, ]; false -> [{ssl, false}] end, - Topology= maps:get(topology, Config, #{}), + Topology= maps:get(topology, Config, #{}), Opts = Opts0 ++ [{pool_size, PoolSize}, {options, init_topology_options(maps:to_list(Topology), [])}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index 7dfbc923b..3ba3dc803 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -27,19 +27,19 @@ fields("emqx_dashboard") -> hoconsc:ref(?MODULE, "https")]))} , {default_username, fun default_username/1} , {default_password, fun default_password/1} - , {sample_interval, emqx_schema:t(emqx_schema:duration_s(), undefined, "10s")} - , {token_expired_time, emqx_schema:t(emqx_schema:duration(), undefined, "30m")} + , {sample_interval, sc(emqx_schema:duration_s(), #{default => "10s"})} + , {token_expired_time, sc(emqx_schema:duration(), #{default => "30m"})} ]; fields("http") -> [ {"protocol", hoconsc:enum([http, https])} - , {"port", emqx_schema:t(integer(), undefined, 18083)} - , {"num_acceptors", emqx_schema:t(integer(), undefined, 4)} - , {"max_connections", emqx_schema:t(integer(), undefined, 512)} - , {"backlog", emqx_schema:t(integer(), undefined, 1024)} - , {"send_timeout", emqx_schema:t(emqx_schema:duration(), undefined, "5s")} - , {"inet6", emqx_schema:t(boolean(), undefined, false)} - , {"ipv6_v6only", emqx_schema:t(boolean(), undefined, false)} + , {"port", hoconsc:mk(integer(), #{default => 18083})} + , {"num_acceptors", sc(integer(), #{default => 4})} + , {"max_connections", sc(integer(), #{default => 512})} + , {"backlog", sc(integer(), #{default => 1024})} + , {"send_timeout", sc(emqx_schema:duration(), #{default => "5s"})} + , {"inet6", sc(boolean(), #{default => false})} + , {"ipv6_v6only", sc(boolean(), #{dfeault => false})} ]; fields("https") -> @@ -54,3 +54,5 @@ default_password(type) -> string(); default_password(default) -> "public"; default_password(nullable) -> false; default_password(_) -> undefined. + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl index 69f53d6c1..e3c6d8ee9 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl @@ -22,5 +22,5 @@ fields(ldap) -> connector_fields(ldap). connector_fields(DB) -> Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])), - [{name, hoconsc:t(typerefl:binary())}, + [{name, hoconsc:mk(typerefl:binary())}, {type, #{type => DB}}] ++ Mod:roots(). diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index 16fd93fa0..64d39eb52 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -32,43 +32,57 @@ -reflect_type([duration/0]). --export([roots/0, fields/1]). +-export([namespace/0, roots/0, fields/1]). --export([t/1, t/3, t/4, ref/1]). +namespace() -> exhook. roots() -> [exhook]. fields(exhook) -> - [ {request_failed_action, t(union([deny, ignore]), undefined, deny)} - , {request_timeout, t(duration(), undefined, "5s")} - , {auto_reconnect, t(union([false, duration()]), undefined, "60s")} - , {servers, t(hoconsc:array(ref(servers)), undefined, [])} + [ {request_failed_action, + sc(union([deny, ignore]), + #{default => deny})} + , {request_timeout, + sc(duration(), + #{default => "5s"})} + , {auto_reconnect, + sc(union([false, duration()]), + #{ default => "60s" + })} + , {servers, + sc(hoconsc:array(ref(servers)), + #{default => []})} ]; fields(servers) -> - [ {name, string()} - , {url, string()} - , {ssl, t(ref(ssl_conf_group))} + [ {name, + sc(string(), + #{})} + , {url, + sc(string(), + #{})} + , {ssl, + sc(ref(ssl_conf), + #{})} ]; -fields(ssl_conf_group) -> - [ {cacertfile, string()} - , {certfile, string()} - , {keyfile, string()} +fields(ssl_conf) -> + [ {cacertfile, + sc(string(), + #{}) + } + , {certfile, + sc(string(), + #{}) + } + , {keyfile, + sc(string(), + #{})} ]. %% types -t(Type) -> #{type => Type}. - -t(Type, Mapping, Default) -> - hoconsc:t(Type, #{mapping => Mapping, default => Default}). - -t(Type, Mapping, Default, OverrideEnv) -> - hoconsc:t(Type, #{ mapping => Mapping - , default => Default - , override_env => OverrideEnv - }). +sc(Type, Meta) -> Meta#{type => Type}. ref(Field) -> hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 9ab26e480..da73b85ee 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -43,148 +43,149 @@ , ip_port/0 ]). --export([roots/0 , fields/1]). --export([t/1, t/3, t/4, ref/1]). +-export([namespace/0, roots/0 , fields/1]). + +namespace() -> gateway. roots() -> [gateway]. fields(gateway) -> - [{stomp, t(ref(stomp_structs))}, - {mqttsn, t(ref(mqttsn_structs))}, - {coap, t(ref(coap_structs))}, - {lwm2m, t(ref(lwm2m_structs))}, - {exproto, t(ref(exproto_structs))} + [{stomp, sc(ref(stomp_structs))}, + {mqttsn, sc(ref(mqttsn_structs))}, + {coap, sc(ref(coap_structs))}, + {lwm2m, sc(ref(lwm2m_structs))}, + {exproto, sc(ref(exproto_structs))} ]; fields(stomp_structs) -> - [ {frame, t(ref(stomp_frame))} - , {listeners, t(ref(tcp_listener_group))} + [ {frame, sc(ref(stomp_frame))} + , {listeners, sc(ref(tcp_listener_group))} ] ++ gateway_common_options(); fields(stomp_frame) -> - [ {max_headers, t(integer(), undefined, 10)} - , {max_headers_length, t(integer(), undefined, 1024)} - , {max_body_length, t(integer(), undefined, 8192)} + [ {max_headers, sc(integer(), undefined, 10)} + , {max_headers_length, sc(integer(), undefined, 1024)} + , {max_body_length, sc(integer(), undefined, 8192)} ]; fields(mqttsn_structs) -> - [ {gateway_id, t(integer())} - , {broadcast, t(boolean())} - , {enable_qos3, t(boolean())} + [ {gateway_id, sc(integer())} + , {broadcast, sc(boolean())} + , {enable_qos3, sc(boolean())} , {predefined, hoconsc:array(ref(mqttsn_predefined))} - , {listeners, t(ref(udp_listener_group))} + , {listeners, sc(ref(udp_listener_group))} ] ++ gateway_common_options(); fields(mqttsn_predefined) -> - [ {id, t(integer())} - , {topic, t(binary())} + [ {id, sc(integer())} + , {topic, sc(binary())} ]; fields(coap_structs) -> - [ {heartbeat, t(duration(), undefined, <<"30s">>)} - , {notify_type, t(union([non, con, qos]), undefined, qos)} - , {subscribe_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} - , {publish_qos, t(union([qos0, qos1, qos2, coap]), undefined, coap)} - , {listeners, t(ref(udp_listener_group))} + [ {heartbeat, sc(duration(), undefined, <<"30s">>)} + , {notify_type, sc(union([non, con, qos]), undefined, qos)} + , {subscribe_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)} + , {publish_qos, sc(union([qos0, qos1, qos2, coap]), undefined, coap)} + , {listeners, sc(ref(udp_listener_group))} ] ++ gateway_common_options(); fields(lwm2m_structs) -> - [ {xml_dir, t(binary())} - , {lifetime_min, t(duration())} - , {lifetime_max, t(duration())} - , {qmode_time_windonw, t(integer())} - , {auto_observe, t(boolean())} - , {mountpoint, t(string())} - , {update_msg_publish_condition, t(union([always, contains_object_list]))} - , {translators, t(ref(translators))} - , {listeners, t(ref(udp_listener_group))} + [ {xml_dir, sc(binary())} + , {lifetime_min, sc(duration())} + , {lifetime_max, sc(duration())} + , {qmode_time_windonw, sc(integer())} + , {auto_observe, sc(boolean())} + , {mountpoint, sc(string())} + , {update_msg_publish_condition, sc(union([always, contains_object_list]))} + , {translators, sc(ref(translators))} + , {listeners, sc(ref(udp_listener_group))} ] ++ gateway_common_options(); fields(exproto_structs) -> - [ {server, t(ref(exproto_grpc_server))} - , {handler, t(ref(exproto_grpc_handler))} - , {listeners, t(ref(udp_tcp_listener_group))} + [ {server, sc(ref(exproto_grpc_server))} + , {handler, sc(ref(exproto_grpc_handler))} + , {listeners, sc(ref(udp_tcp_listener_group))} ] ++ gateway_common_options(); fields(exproto_grpc_server) -> - [ {bind, t(union(ip_port(), integer()))} + [ {bind, sc(union(ip_port(), integer()))} %% TODO: ssl options ]; fields(exproto_grpc_handler) -> - [ {address, t(binary())} + [ {address, sc(binary())} %% TODO: ssl ]; fields(clientinfo_override) -> - [ {username, t(binary())} - , {password, t(binary())} - , {clientid, t(binary())} + [ {username, sc(binary())} + , {password, sc(binary())} + , {clientid, sc(binary())} ]; fields(translators) -> - [ {command, t(ref(translator))} - , {response, t(ref(translator))} - , {notify, t(ref(translator))} - , {register, t(ref(translator))} - , {update, t(ref(translator))} + [ {command, sc(ref(translator))} + , {response, sc(ref(translator))} + , {notify, sc(ref(translator))} + , {register, sc(ref(translator))} + , {update, sc(ref(translator))} ]; fields(translator) -> - [ {topic, t(binary())} - , {qos, t(range(0, 2))} + [ {topic, sc(binary())} + , {qos, sc(range(0, 2))} ]; fields(udp_listener_group) -> - [ {udp, t(ref(udp_listener))} - , {dtls, t(ref(dtls_listener))} + [ {udp, sc(ref(udp_listener))} + , {dtls, sc(ref(dtls_listener))} ]; fields(tcp_listener_group) -> - [ {tcp, t(ref(tcp_listener))} - , {ssl, t(ref(ssl_listener))} + [ {tcp, sc(ref(tcp_listener))} + , {ssl, sc(ref(ssl_listener))} ]; fields(udp_tcp_listener_group) -> - [ {udp, t(ref(udp_listener))} - , {dtls, t(ref(dtls_listener))} - , {tcp, t(ref(tcp_listener))} - , {ssl, t(ref(ssl_listener))} + [ {udp, sc(ref(udp_listener))} + , {dtls, sc(ref(dtls_listener))} + , {tcp, sc(ref(tcp_listener))} + , {ssl, sc(ref(ssl_listener))} ]; fields(tcp_listener) -> - [ {"$name", t(ref(tcp_listener_settings))}]; + [ {"$name", sc(ref(tcp_listener_settings))}]; fields(ssl_listener) -> - [ {"$name", t(ref(ssl_listener_settings))}]; + [ {"$name", sc(ref(ssl_listener_settings))}]; fields(udp_listener) -> - [ {"$name", t(ref(udp_listener_settings))}]; + [ {"$name", sc(ref(udp_listener_settings))}]; fields(dtls_listener) -> - [ {"$name", t(ref(dtls_listener_settings))}]; + [ {"$name", sc(ref(dtls_listener_settings))}]; fields(listener_settings) -> - [ {enable, t(boolean(), undefined, true)} - , {bind, t(union(ip_port(), integer()))} - , {acceptors, t(integer(), undefined, 8)} - , {max_connections, t(integer(), undefined, 1024)} - , {max_conn_rate, t(integer())} - , {active_n, t(integer(), undefined, 100)} - %, {rate_limit, t(comma_separated_list())} - , {access, t(ref(access))} - , {proxy_protocol, t(boolean())} - , {proxy_protocol_timeout, t(duration())} - , {backlog, t(integer(), undefined, 1024)} - , {send_timeout, t(duration(), undefined, <<"15s">>)} - , {send_timeout_close, t(boolean(), undefined, true)} - , {recbuf, t(bytesize())} - , {sndbuf, t(bytesize())} - , {buffer, t(bytesize())} - , {high_watermark, t(bytesize(), undefined, <<"1MB">>)} - , {tune_buffer, t(boolean())} - , {nodelay, t(boolean())} - , {reuseaddr, t(boolean())} + [ {enable, sc(boolean(), undefined, true)} + , {bind, sc(union(ip_port(), integer()))} + , {acceptors, sc(integer(), undefined, 8)} + , {max_connections, sc(integer(), undefined, 1024)} + , {max_conn_rate, sc(integer())} + , {active_n, sc(integer(), undefined, 100)} + %, {rate_limit, sc(comma_separated_list())} + , {access, sc(ref(access))} + , {proxy_protocol, sc(boolean())} + , {proxy_protocol_timeout, sc(duration())} + , {backlog, sc(integer(), undefined, 1024)} + , {send_timeout, sc(duration(), undefined, <<"15s">>)} + , {send_timeout_close, sc(boolean(), undefined, true)} + , {recbuf, sc(bytesize())} + , {sndbuf, sc(bytesize())} + , {buffer, sc(bytesize())} + , {high_watermark, sc(bytesize(), undefined, <<"1MB">>)} + , {tune_buffer, sc(boolean())} + , {nodelay, sc(boolean())} + , {reuseaddr, sc(boolean())} ]; fields(tcp_listener_settings) -> @@ -242,12 +243,12 @@ authentication() -> ]). gateway_common_options() -> - [ {enable, t(boolean(), undefined, true)} - , {enable_stats, t(boolean(), undefined, true)} - , {idle_timeout, t(duration(), undefined, <<"30s">>)} - , {mountpoint, t(binary())} - , {clientinfo_override, t(ref(clientinfo_override))} - , {authentication, t(authentication(), undefined, undefined)} + [ {enable, sc(boolean(), undefined, true)} + , {enable_stats, sc(boolean(), undefined, true)} + , {idle_timeout, sc(duration(), undefined, <<"30s">>)} + , {mountpoint, sc(binary())} + , {clientinfo_override, sc(ref(clientinfo_override))} + , {authentication, sc(authentication(), undefined, undefined)} ]. %%-------------------------------------------------------------------- @@ -255,16 +256,10 @@ gateway_common_options() -> %% types -t(Type) -> #{type => Type}. +sc(Type) -> #{type => Type}. -t(Type, Mapping, Default) -> - hoconsc:t(Type, #{mapping => Mapping, default => Default}). - -t(Type, Mapping, Default, OverrideEnv) -> - hoconsc:t(Type, #{ mapping => Mapping - , default => Default - , override_env => OverrideEnv - }). +sc(Type, Mapping, Default) -> + hoconsc:mk(Type, #{mapping => Mapping, default => Default}). ref(Field) -> hoconsc:ref(?MODULE, Field). @@ -273,10 +268,10 @@ ref(Field) -> %% generate a ssl field. %% ssl("emqx", #{"verify" => verify_peer}) will return -%% [ {"cacertfile", t(string(), "emqx.cacertfile", undefined)} -%% , {"certfile", t(string(), "emqx.certfile", undefined)} -%% , {"keyfile", t(string(), "emqx.keyfile", undefined)} -%% , {"verify", t(union(verify_peer, verify_none), "emqx.verify", verify_peer)} +%% [ {"cacertfile", sc(string(), "emqx.cacertfile", undefined)} +%% , {"certfile", sc(string(), "emqx.certfile", undefined)} +%% , {"keyfile", sc(string(), "emqx.keyfile", undefined)} +%% , {"verify", sc(union(verify_peer, verify_none), "emqx.verify", verify_peer)} %% , {"server_name_indication", "emqx.server_name_indication", undefined)} %% ... ssl(Mapping, Defaults) -> @@ -286,24 +281,24 @@ ssl(Mapping, Defaults) -> _ -> Mapping ++ "." ++ Field end end, D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, - [ {"enable", t(boolean(), M("enable"), D("enable"))} - , {"cacertfile", t(binary(), M("cacertfile"), D("cacertfile"))} - , {"certfile", t(binary(), M("certfile"), D("certfile"))} - , {"keyfile", t(binary(), M("keyfile"), D("keyfile"))} - , {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))} - , {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))} - , {"secure_renegotiate", t(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))} - , {"reuse_sessions", t(boolean(), M("reuse_sessions"), D("reuse_sessions"))} - , {"honor_cipher_order", t(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))} - , {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))} - , {"depth", t(integer(), M("depth"), D("depth"))} - , {"password", hoconsc:t(binary(), #{mapping => M("key_password"), - default => D("key_password"), - sensitive => true + [ {"enable", sc(boolean(), M("enable"), D("enable"))} + , {"cacertfile", sc(binary(), M("cacertfile"), D("cacertfile"))} + , {"certfile", sc(binary(), M("certfile"), D("certfile"))} + , {"keyfile", sc(binary(), M("keyfile"), D("keyfile"))} + , {"verify", sc(union(verify_peer, verify_none), M("verify"), D("verify"))} + , {"fail_if_no_peer_cert", sc(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))} + , {"secure_renegotiate", sc(boolean(), M("secure_renegotiate"), D("secure_renegotiate"))} + , {"reuse_sessions", sc(boolean(), M("reuse_sessions"), D("reuse_sessions"))} + , {"honor_cipher_order", sc(boolean(), M("honor_cipher_order"), D("honor_cipher_order"))} + , {"handshake_timeout", sc(duration(), M("handshake_timeout"), D("handshake_timeout"))} + , {"depth", sc(integer(), M("depth"), D("depth"))} + , {"password", hoconsc:mk(binary(), #{ mapping => M("key_password") + , default => D("key_password") + , sensitive => true })} - , {"dhfile", t(binary(), M("dhfile"), D("dhfile"))} - , {"server_name_indication", t(union(disable, binary()), M("server_name_indication"), + , {"dhfile", sc(binary(), M("dhfile"), D("dhfile"))} + , {"server_name_indication", sc(union(disable, binary()), M("server_name_indication"), D("server_name_indication"))} - , {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))} - , {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))} - , {"psk_ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))}]. + , {"tls_versions", sc(comma_separated_list(), M("tls_versions"), D("tls_versions"))} + , {"ciphers", sc(comma_separated_list(), M("ciphers"), D("ciphers"))} + , {"psk_ciphers", sc(comma_separated_list(), M("ciphers"), D("ciphers"))}]. diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index c25ab8139..a3e7e9388 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -23,6 +23,7 @@ -dialyzer(no_fail_call). -include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). -type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. -type file() :: string(). @@ -34,8 +35,7 @@ file/0, cipher/0]). --export([roots/0, fields/1, translations/0, translation/1]). --export([t/1, t/3, t/4, ref/1]). +-export([namespace/0, roots/0, fields/1, translations/0, translation/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). %% Static apps which merge their configs into the merged emqx.conf @@ -58,167 +58,406 @@ , emqx_exhook_schema ]). -%% TODO: add a test case to ensure the list elements are unique +namespace() -> undefined. + roots() -> - ["cluster", "node", "rpc", "log"] - ++ lists:flatmap(fun(Mod) -> Mod:roots() end, ?MERGED_CONFIGS). + ["cluster", "node", "rpc", "log"] ++ lists:flatmap(fun roots/1, ?MERGED_CONFIGS). fields("cluster") -> - [ {"name", t(atom(), "ekka.cluster_name", emqxcl)} - , {"discovery_strategy", t(union([manual, static, mcast, dns, etcd, k8s]), - undefined, manual)} - , {"autoclean", t(emqx_schema:duration(), "ekka.cluster_autoclean", "5m")} - , {"autoheal", t(boolean(), "ekka.cluster_autoheal", true)} - , {"static", ref("static")} - , {"mcast", ref("mcast")} - , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)} - , {"dns", ref("dns")} - , {"etcd", ref("etcd")} - , {"k8s", ref("k8s")} - , {"db_backend", t(union([mnesia, rlog]), "ekka.db_backend", mnesia)} - , {"rlog", ref("rlog")} + [ {"name", + sc(atom(), + #{ mapping => "ekka.cluster_name" + , default => emqxcl + })} + , {"discovery_strategy", + sc(union([manual, static, mcast, dns, etcd, k8s]), + #{ default => manual + })} + , {"autoclean", + sc(emqx_schema:duration(), + #{ mapping => "ekka.cluster_autoclean" + , default => "5m" + })} + , {"autoheal", + sc(boolean(), + #{ mapping => "ekka.cluster_autoheal" + , default => true + })} + , {"static", + sc(ref(cluster_static), + #{})} + , {"mcast", + sc(ref(cluster_mcast), + #{})} + , {"proto_dist", + sc(union([inet_tcp, inet6_tcp, inet_tls]), + #{ mapping => "ekka.proto_dist" + , default => inet_tcp + })} + , {"dns", + sc(ref(cluster_dns), + #{})} + , {"etcd", + sc(ref(cluster_etcd), + #{})} + , {"k8s", + sc(ref(cluster_k8s), + #{})} + , {"db_backend", + sc(union([mnesia, rlog]), + #{ mapping => "ekka.db_backend" + , default => mnesia + })} + , {"rlog", + sc(ref("rlog"), + #{})} ]; -fields("static") -> - [ {"seeds", t(hoconsc:array(string()), undefined, [])}]; - -fields("mcast") -> - [ {"addr", t(string(), undefined, "239.192.0.1")} - , {"ports", t(hoconsc:array(integer()), undefined, [4369, 4370])} - , {"iface", t(string(), undefined, "0.0.0.0")} - , {"ttl", t(range(0, 255), undefined, 255)} - , {"loop", t(boolean(), undefined, true)} - , {"sndbuf", t(emqx_schema:bytesize(), undefined, "16KB")} - , {"recbuf", t(emqx_schema:bytesize(), undefined, "16KB")} - , {"buffer", t(emqx_schema:bytesize(), undefined, "32KB")} +fields(cluster_static) -> + [ {"seeds", + sc(hoconsc:array(string()), + #{ default => [] + })} ]; -fields("dns") -> - [ {"name", t(string(), undefined, "localhost")} - , {"app", t(string(), undefined, "emqx")}]; - -fields("etcd") -> - [ {"server", t(emqx_schema:comma_separated_list())} - , {"prefix", t(string(), undefined, "emqxcl")} - , {"node_ttl", t(emqx_schema:duration(), undefined, "1m")} - , {"ssl", ref("etcd_ssl")} +fields(cluster_mcast) -> + [ {"addr", + sc(string(), + #{ default => "239.192.0.1" + })} + , {"ports", + sc(hoconsc:array(integer()), + #{ default => [4369, 4370] + })} + , {"iface", + sc(string(), + #{ default => "0.0.0.0" + })} + , {"ttl", + sc(range(0, 255), + #{ default => 255 + })} + , {"loop", + sc(boolean(), + #{ default => true + })} + , {"sndbuf", + sc(emqx_schema:bytesize(), + #{ default => "16KB" + })} + , {"recbuf", + sc(emqx_schema:bytesize(), + #{ default => "16KB" + })} + , {"buffer", + sc(emqx_schema:bytesize(), + #{ default =>"32KB" + })} ]; -fields("etcd_ssl") -> +fields(cluster_dns) -> + [ {"name", + sc(string(), + #{ default => "localhost" + })} + , {"app", + sc(string(), + #{ default => "emqx" + })} + ]; + +fields(cluster_etcd) -> + [ {"server", + sc(emqx_schema:comma_separated_list(), + #{})} + , {"prefix", + sc(string(), + #{ default => "emqxcl" + })} + , {"node_ttl", + sc(emqx_schema:duration(), + #{ default => "1m" + })} + , {"ssl", + sc(ref(etcd_ssl_opts), + #{})} + ]; + +fields(etcd_ssl_opts) -> emqx_schema:ssl(#{}); -fields("k8s") -> - [ {"apiserver", t(string())} - , {"service_name", t(string(), undefined, "emqx")} - , {"address_type", t(union([ip, dns, hostname]))} - , {"app_name", t(string(), undefined, "emqx")} - , {"namespace", t(string(), undefined, "default")} - , {"suffix", t(string(), undefined, "pod.local")} +fields(cluster_k8s) -> + [ {"apiserver", + sc(string(), + #{})} + , {"service_name", + sc(string(), + #{ default => "emqx" + })} + , {"address_type", + sc(union([ip, dns, hostname]), + #{})} + , {"app_name", + sc(string(), + #{ default => "emqx" + })} + , {"namespace", + sc(string(), + #{ default => "default" + })} + , {"suffix", + sc(string(), + #{default => "pod.local" + })} ]; fields("rlog") -> - [ {"role", t(union([core, replicant]), "ekka.node_role", core)} - , {"core_nodes", t(emqx_schema:comma_separated_atoms(), "ekka.core_nodes", [])} + [ {"role", + sc(union([core, replicant]), + #{ mapping => "ekka.node_role" + , default => core + })} + , {"core_nodes", + sc(emqx_schema:comma_separated_atoms(), + #{ mapping => "ekka.core_nodes" + , default => [] + })} ]; fields("node") -> - [ {"name", hoconsc:t(string(), #{default => "emqx@127.0.0.1", - override_env => "EMQX_NODE_NAME" - })} - , {"cookie", hoconsc:t(string(), #{mapping => "vm_args.-setcookie", - default => "emqxsecretcookie", - sensitive => true, - override_env => "EMQX_NODE_COOKIE" - })} - , {"data_dir", hoconsc:t(string(), #{nullable => false})} - , {"config_files", t(list(string()), "emqx.config_files", undefined)} - , {"global_gc_interval", t(emqx_schema:duration(), undefined, "15m")} - , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)} - , {"dist_net_ticktime", t(emqx_schema:duration(), "vm_args.-kernel net_ticktime", "2m")} - , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} - , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} - , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} - , {"cluster_call", ref("cluster_call")} + [ {"name", + sc(string(), + #{ default => "emqx@127.0.0.1" + , override_env => "EMQX_NODE_NAME" + })} + , {"cookie", + sc(string(), + #{ mapping => "vm_args.-setcookie", + default => "emqxsecretcookie", + sensitive => true, + override_env => "EMQX_NODE_COOKIE" + })} + , {"data_dir", + sc(string(), + #{ nullable => false + })} + , {"config_files", + sc(list(string()), + #{ mapping => "emqx.config_files" + , default => undefined + })} + , {"global_gc_interval", + sc(emqx_schema:duration(), + #{ default => "15m" + })} + , {"crash_dump_dir", + sc(file(), + #{ mapping => "vm_args.-env ERL_CRASH_DUMP" + })} + , {"dist_net_ticktime", + sc(emqx_schema:duration(), + #{ mapping => "vm_args.-kernel net_ticktime" + , default => "2m" + })} + , {"dist_listen_min", + sc(range(1024, 65535), + #{ mapping => "kernel.inet_dist_listen_min" + , default => 6369 + })} + , {"dist_listen_max", + sc(range(1024, 65535), + #{ mapping => "kernel.inet_dist_listen_max" + , default => 6369 + })} + , {"backtrace_depth", + sc(integer(), + #{ mapping => "emqx_machine.backtrace_depth" + , default => 23 + })} + , {"cluster_call", + sc(ref("cluster_call"), + #{} + )} ]; - fields("cluster_call") -> - [ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")} - , {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)} - , {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")} + [ {"retry_interval", + sc(emqx_schema:duration(), + #{ mapping => "emqx_machine.retry_interval" + , default => "1s" + })} + , {"max_history", + sc(range(1, 500), + #{mapping => "emqx_machine.max_history", + default => 100 + })} + , {"cleanup_interval", + sc(emqx_schema:duration(), + #{mapping => "emqx_machine.cleanup_interval", + default => "5m" + })} ]; fields("rpc") -> - [ {"mode", t(union(sync, async), undefined, async)} - , {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)} - , {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)} - , {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)} - , {"tcp_client_num", t(range(1, 256), undefined, 1)} - , {"connect_timeout", t(emqx_schema:duration(), "gen_rpc.connect_timeout", "5s")} - , {"send_timeout", t(emqx_schema:duration(), "gen_rpc.send_timeout", "5s")} - , {"authentication_timeout", t(emqx_schema:duration(), "gen_rpc.authentication_timeout", "5s")} - , {"call_receive_timeout", t(emqx_schema:duration(), "gen_rpc.call_receive_timeout", "15s")} - , {"socket_keepalive_idle", t(emqx_schema:duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")} - , {"socket_keepalive_interval", t(emqx_schema:duration_s(), "gen_rpc.socket_keepalive_interval", "75s")} - , {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)} - , {"socket_sndbuf", t(emqx_schema:bytesize(), "gen_rpc.socket_sndbuf", "1MB")} - , {"socket_recbuf", t(emqx_schema:bytesize(), "gen_rpc.socket_recbuf", "1MB")} - , {"socket_buffer", t(emqx_schema:bytesize(), "gen_rpc.socket_buffer", "1MB")} + [ {"mode", + sc(union(sync, async), + #{ default => async + })} + , {"async_batch_size", + sc(integer(), + #{ mapping => "gen_rpc.max_batch_size" + , default => 256 + })} + , {"port_discovery", + sc(union(manual, stateless), + #{ mapping => "gen_rpc.port_discovery" + , default => stateless + })} + , {"tcp_server_port", + sc(integer(), + #{ mapping => "gen_rpc.tcp_server_port" + , default => 5369 + })} + , {"tcp_client_num", + sc(range(1, 256), + #{ default => 1 + })} + , {"connect_timeout", + sc(emqx_schema:duration(), + #{ mapping => "gen_rpc.connect_timeout", + default => "5s" + })} + , {"send_timeout", + sc(emqx_schema:duration(), + #{ mapping => "gen_rpc.send_timeout" + , default => "5s" + })} + , {"authentication_timeout", + sc(emqx_schema:duration(), + #{ mapping=> "gen_rpc.authentication_timeout" + , default => "5s" + })} + , {"call_receive_timeout", + sc(emqx_schema:duration(), + #{ mapping => "gen_rpc.call_receive_timeout" + , default => "15s" + })} + , {"socket_keepalive_idle", + sc(emqx_schema:duration_s(), + #{ mapping => "gen_rpc.socket_keepalive_idle" + , default => "7200s" + })} + , {"socket_keepalive_interval", + sc(emqx_schema:duration_s(), + #{ mapping => "gen_rpc.socket_keepalive_interval", + default => "75s" + })} + , {"socket_keepalive_count", + sc(integer(), + #{ mapping => "gen_rpc.socket_keepalive_count" + , default => 9 + })} + , {"socket_sndbuf", + sc(emqx_schema:bytesize(), + #{ mapping => "gen_rpc.socket_sndbuf" + , default => "1MB" + })} + , {"socket_recbuf", + sc(emqx_schema:bytesize(), + #{ mapping => "gen_rpc.socket_recbuf" + , default => "1MB" + })} + , {"socket_buffer", + sc(emqx_schema:bytesize(), + #{ mapping => "gen_rpc.socket_buffer" + , default => "1MB" + })} ]; fields("log") -> [ {"console_handler", ref("console_handler")} - , {"file_handlers", ref("file_handlers")} - , {"error_logger", t(atom(), "kernel.error_logger", silent)} + , {"file_handlers", + sc(ref("file_handlers"), + #{})} + , {"error_logger", + sc(atom(), + #{mapping => "kernel.error_logger", + default => silent})} ]; fields("console_handler") -> - [ {"enable", t(boolean(), undefined, false)} + [ {"enable", + sc(boolean(), + #{ default => false + })} ] ++ log_handler_common_confs(); fields("file_handlers") -> - [ {"$name", ref("log_file_handler")} + [ {"$name", + sc(ref("log_file_handler"), + #{})} ]; fields("log_file_handler") -> - [ {"file", t(file(), undefined, undefined)} - , {"rotation", ref("log_rotation")} - , {"max_size", #{type => union([infinity, emqx_schema:bytesize()]), - default => "10MB"}} + [ {"file", + sc(file(), + #{})} + , {"rotation", + sc(ref("log_rotation"), + #{})} + , {"max_size", + sc(union([infinity, emqx_schema:bytesize()]), + #{ default => "10MB" + })} ] ++ log_handler_common_confs(); fields("log_rotation") -> - [ {"enable", t(boolean(), undefined, true)} - , {"count", t(range(1, 2048), undefined, 10)} + [ {"enable", + sc(boolean(), + #{ default => true + })} + , {"count", + sc(range(1, 2048), + #{ default => 10 + })} ]; fields("log_overload_kill") -> - [ {"enable", t(boolean(), undefined, true)} - , {"mem_size", t(emqx_schema:bytesize(), undefined, "30MB")} - , {"qlen", t(integer(), undefined, 20000)} - , {"restart_after", t(union(emqx_schema:duration(), infinity), undefined, "5s")} + [ {"enable", + sc(boolean(), + #{ default => true + })} + , {"mem_size", + sc(emqx_schema:bytesize(), + #{ default => "30MB" + })} + , {"qlen", + sc(integer(), + #{ default => 20000 + })} + , {"restart_after", + sc(union(emqx_schema:duration(), infinity), + #{ default => "5s" + })} ]; fields("log_burst_limit") -> - [ {"enable", t(boolean(), undefined, true)} - , {"max_count", t(integer(), undefined, 10000)} - , {"window_time", t(emqx_schema:duration(), undefined, "1s")} + [ {"enable", + sc(boolean(), + #{ default => true + })} + , {"max_count", + sc(integer(), + #{ default => 10000 + })} + , {"window_time", + sc(emqx_schema:duration(), + #{default => "1s"})} ]; fields("authorization") -> emqx_schema:fields("authorization") ++ - emqx_authz_schema:fields("authorization"); - -fields(Name) -> - find_field(Name, ?MERGED_CONFIGS). - -find_field(Name, []) -> - error({unknown_config_struct_field, Name}); -find_field(Name, [SchemaModule | Rest]) -> - case lists:member(bin(Name), hocon_schema:root_names(SchemaModule)) of - true -> SchemaModule:fields(Name); - false -> find_field(Name, Rest) - end. + emqx_authz_schema:fields("authorization"). translations() -> ["ekka", "kernel", "emqx"]. @@ -302,20 +541,52 @@ tr_logger(Conf) -> [{handler, default, undefined}] ++ ConsoleHandler ++ FileHandlers. log_handler_common_confs() -> - [ {"level", t(log_level(), undefined, warning)} - , {"time_offset", t(string(), undefined, "system")} - , {"chars_limit", #{type => hoconsc:union([unlimited, range(1, inf)]), - default => unlimited - }} - , {"formatter", t(union([text, json]), undefined, text)} - , {"single_line", t(boolean(), undefined, true)} - , {"sync_mode_qlen", t(integer(), undefined, 100)} - , {"drop_mode_qlen", t(integer(), undefined, 3000)} - , {"flush_qlen", t(integer(), undefined, 8000)} - , {"overload_kill", ref("log_overload_kill")} - , {"burst_limit", ref("log_burst_limit")} - , {"supervisor_reports", t(union([error, progress]), undefined, error)} - , {"max_depth", t(union([unlimited, integer()]), undefined, 100)} + [ {"level", + sc(log_level(), + #{ default => warning + })} + , {"time_offset", + sc(string(), + #{ default => "system" + })} + , {"chars_limit", + sc(hoconsc:union([unlimited, range(1, inf)]), + #{ default => unlimited + })} + , {"formatter", + sc(union([text, json]), + #{ default => text + })} + , {"single_line", + sc(boolean(), + #{ default => true + })} + , {"sync_mode_qlen", + sc(integer(), + #{ default => 100 + })} + , {"drop_mode_qlen", + sc(integer(), + #{ default => 3000 + })} + , {"flush_qlen", + sc(integer(), + #{ default => 8000 + })} + , {"overload_kill", + sc(ref("log_overload_kill"), + #{})} + , {"burst_limit", + sc(ref("log_burst_limit"), + #{})} + , {"supervisor_reports", + sc(union([error, progress]), + #{ default => error + })} + , {"max_depth", + sc(union([unlimited, integer()]), + #{ default => 100 + })} ]. log_handler_conf(Conf) -> @@ -424,18 +695,9 @@ keys(Parent, Conf) -> %% types -t(Type) -> hoconsc:t(Type). +sc(Type, Meta) -> hoconsc:mk(Type, Meta). -t(Type, Mapping, Default) -> - hoconsc:t(Type, #{mapping => Mapping, default => Default}). - -t(Type, Mapping, Default, OverrideEnv) -> - hoconsc:t(Type, #{ mapping => Mapping - , default => Default - , override_env => OverrideEnv - }). - -ref(Field) -> hoconsc:t(hoconsc:ref(Field)). +ref(Field) -> hoconsc:ref(?MODULE, Field). options(static, Conf) -> [{seeds, [to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, [])]}]; @@ -475,6 +737,6 @@ to_atom(Str) when is_list(Str) -> to_atom(Bin) when is_binary(Bin) -> binary_to_atom(Bin, utf8). -bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); -bin(Bin) when is_binary(Bin) -> Bin; -bin(L) when is_list(L) -> iolist_to_binary(L). +roots(Module) -> + lists:map(fun({_BinName, Root}) -> Root end, + maps:to_list(hocon_schema:roots(Module))). diff --git a/apps/emqx_management/src/emqx_management_schema.erl b/apps/emqx_management/src/emqx_management_schema.erl index d21f0e106..fce71ad7b 100644 --- a/apps/emqx_management/src/emqx_management_schema.erl +++ b/apps/emqx_management/src/emqx_management_schema.erl @@ -19,9 +19,12 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). +namespace() -> management. + roots() -> []. fields(_) -> []. diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 7a6b72a8a..15f6ab901 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -20,9 +20,12 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). +namespace() -> modules. + roots() -> ["delayed", "recon", @@ -33,32 +36,34 @@ roots() -> fields(Name) when Name =:= "recon"; Name =:= "telemetry" -> - [ {enable, emqx_schema:t(boolean(), undefined, false)} + [ {enable, hoconsc:mk(boolean(), #{default => false})} ]; fields("delayed") -> - [ {enable, emqx_schema:t(boolean(), undefined, false)} - , {max_delayed_messages, emqx_schema:t(integer())} + [ {enable, hoconsc:mk(boolean(), #{default => false})} + , {max_delayed_messages, sc(integer(), #{})} ]; fields("rewrite") -> [ {action, hoconsc:enum([publish, subscribe])} - , {source_topic, emqx_schema:t(binary())} - , {re, emqx_schema:t(binary())} - , {dest_topic, emqx_schema:t(binary())} + , {source_topic, sc(binary(), #{})} + , {re, sc(binary(), #{})} + , {dest_topic, sc(binary(), #{})} ]; fields("event_message") -> - [ {"$event/client_connected", emqx_schema:t(boolean(), undefined, false)} - , {"$event/client_disconnected", emqx_schema:t(boolean(), undefined, false)} - , {"$event/client_subscribed", emqx_schema:t(boolean(), undefined, false)} - , {"$event/client_unsubscribed", emqx_schema:t(boolean(), undefined, false)} - , {"$event/message_delivered", emqx_schema:t(boolean(), undefined, false)} - , {"$event/message_acked", emqx_schema:t(boolean(), undefined, false)} - , {"$event/message_dropped", emqx_schema:t(boolean(), undefined, false)} + [ {"$event/client_connected", sc(boolean(), #{default => false})} + , {"$event/client_disconnected", sc(boolean(), #{default => false})} + , {"$event/client_subscribed", sc(boolean(), #{default => false})} + , {"$event/client_unsubscribed", sc(boolean(), #{default => false})} + , {"$event/message_delivered", sc(boolean(), #{default => false})} + , {"$event/message_acked", sc(boolean(), #{default => false})} + , {"$event/message_dropped", sc(boolean(), #{default => false})} ]; fields("topic_metrics") -> - [{topic, emqx_schema:t(binary())}]. + [{topic, sc(binary(), #{})}]. -array(Name) -> {Name, hoconsc:array(hoconsc:ref(Name))}. +array(Name) -> {Name, hoconsc:array(hoconsc:ref(?MODULE, Name))}. + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl index 47630b58d..922de6238 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_schema.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_schema.erl @@ -19,13 +19,18 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). +namespace() -> "prometheus". + roots() -> ["prometheus"]. fields("prometheus") -> - [ {push_gateway_server, emqx_schema:t(string())} - , {interval, emqx_schema:t(emqx_schema:duration_ms(), undefined, "15s")} - , {enable, emqx_schema:t(boolean(), undefined, false)} + [ {push_gateway_server, sc(string(), #{})} + , {interval, sc(emqx_schema:duration_ms(), #{default => "15s"})} + , {enable, sc(boolean(), #{default => false})} ]. + +sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index e2acc7fe7..55cfa2fcc 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -4,40 +4,40 @@ -export([roots/0, fields/1]). --define(TYPE(Type), hoconsc:t(Type)). +-define(TYPE(Type), hoconsc:mk(Type)). roots() -> ["emqx_retainer"]. fields("emqx_retainer") -> - [ {enable, t(boolean(), false)} - , {msg_expiry_interval, t(emqx_schema:duration_ms(), "0s")} - , {msg_clear_interval, t(emqx_schema:duration_ms(), "0s")} + [ {enable, sc(boolean(), false)} + , {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")} + , {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")} , {flow_control, ?TYPE(hoconsc:ref(?MODULE, flow_control))} - , {max_payload_size, t(emqx_schema:bytesize(), "1MB")} + , {max_payload_size, sc(emqx_schema:bytesize(), "1MB")} , {config, config()} ]; fields(mnesia_config) -> [ {type, ?TYPE(hoconsc:union([built_in_database]))} - , {storage_type, t(hoconsc:union([ram, disc, disc_only]), ram)} - , {max_retained_messages, t(integer(), 0, fun is_pos_integer/1)} + , {storage_type, sc(hoconsc:union([ram, disc, disc_only]), ram)} + , {max_retained_messages, sc(integer(), 0, fun is_pos_integer/1)} ]; fields(flow_control) -> - [ {max_read_number, t(integer(), 0, fun is_pos_integer/1)} - , {msg_deliver_quota, t(integer(), 0, fun is_pos_integer/1)} - , {quota_release_interval, t(emqx_schema:duration_ms(), "0ms")} + [ {max_read_number, sc(integer(), 0, fun is_pos_integer/1)} + , {msg_deliver_quota, sc(integer(), 0, fun is_pos_integer/1)} + , {quota_release_interval, sc(emqx_schema:duration_ms(), "0ms")} ]. %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -t(Type, Default) -> - hoconsc:t(Type, #{default => Default}). +sc(Type, Default) -> + hoconsc:mk(Type, #{default => Default}). -t(Type, Default, Validator) -> - hoconsc:t(Type, #{default => Default, - validator => Validator}). +sc(Type, Default, Validator) -> + hoconsc:mk(Type, #{default => Default, + validator => Validator}). is_pos_integer(V) -> V >= 0. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl index a9646ae1e..2614fb8b1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl @@ -20,10 +20,13 @@ -behaviour(hocon_schema). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). -roots() -> ["emqx_rule_engine"]. +namespace() -> rule_engine. -fields("emqx_rule_engine") -> - [{ignore_sys_message, emqx_schema:t(boolean(), undefined, true)}]. +roots() -> ["rule_engine"]. + +fields("rule_engine") -> + [{ignore_sys_message, hoconsc:mk(boolean(), #{default => true})}]. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 8154170b0..a0960df25 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -594,5 +594,6 @@ printable_maps(Headers) -> end, #{}, Headers). ignore_sys_message(#message{flags = Flags}) -> + ConfigRootKey = emqx_rule_engine_schema:namespace(), maps:get(sys, Flags, false) andalso - emqx:get_config([emqx_rule_engine, ignore_sys_message]). + emqx:get_config([ConfigRootKey, ignore_sys_message]). diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 906f55a4c..72b245f4a 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -6,15 +6,18 @@ -export([to_ip_port/1]). --export([ roots/0 +-export([ namespace/0 + , roots/0 , fields/1]). -typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}). +namespace() -> "statsd". + roots() -> ["statsd"]. fields("statsd") -> - [ {enable, emqx_schema:t(boolean(), undefined, false)} + [ {enable, hoconsc:mk(boolean(), #{default => false})} , {server, fun server/1} , {sample_time_interval, fun duration_ms/1} , {flush_time_interval, fun duration_ms/1} diff --git a/rebar.config b/rebar.config index 4e622b738..690e23bc7 100644 --- a/rebar.config +++ b/rebar.config @@ -60,7 +60,7 @@ , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.14.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.15.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.4.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} ]}.