From e12a629498436150f41f599d8f4217bd53a72c06 Mon Sep 17 00:00:00 2001 From: z8674558 Date: Sat, 29 May 2021 19:19:46 +0900 Subject: [PATCH] feat(emqx_schema): add ssl field generator --- src/emqx_schema.erl | 1414 ++++++++++++++++++------------------------- 1 file changed, 597 insertions(+), 817 deletions(-) diff --git a/src/emqx_schema.erl b/src/emqx_schema.erl index c2a161d83..5a0cc210d 100644 --- a/src/emqx_schema.erl +++ b/src/emqx_schema.erl @@ -2,30 +2,6 @@ -include_lib("typerefl/include/types.hrl"). --type rpc_mode() :: sync | async. --type proto_dist() :: inet_tcp | inet6_tcp | inet_tls. --type k8s_address_type() :: ip | dns | hostname. --type port_discovery() :: manual | stateless. --type acl_nomatch() :: allow | deny. --type acl_deny_action() :: ignore | disconnect. --type mqueue_default_priority() :: highest | lowest. --type endpoint() :: integer() | string(). --type listener_peer_cert_tcp() :: cn | tmp. % @todo fix --type listener_peer_cert_ssl() :: cn | dn | crt | pem | md5. --type mqtt_piggyback() :: single | multiple. --type deflate_opts_level() :: none | default | best_compression | best_speed. --type deflate_opts_strategy() :: default | filtered | huffman_only | rle. --type context_takeover() :: takeover | no_takeover. --type verify() :: verify_peer | verify_none. --type session_locking_strategy() :: local | one | quorum | all. --type shared_subscription_strategy() :: random | round_robin | sticky | hash. --type route_lock_type() :: key | tab | global. --type log_format_depth() :: unlimited | integer(). --type log_size() :: atom() | bytesize(). --type overload_kill_restart_after() :: atom() | duration(). --type log_formatter() :: text | json. --type supervisor_reports() :: error | progress. --type log_to() :: file | console | both. -type log_level() :: debug | info | notice | warning | error | critical | alert | emergency | all. -type flag() :: true | false. -type duration() :: integer(). @@ -33,264 +9,266 @@ -type bytesize() :: integer(). -type percent() :: float(). -type file() :: string(). +-type comma_separated_list() :: list(). +-type bar_separated_list() :: list(). +-type ip_port() :: tuple(). -typerefl_from_string({flag/0, emqx_schema, to_flag}). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). -typerefl_from_string({bytesize/0, emqx_schema, to_bytesize}). -typerefl_from_string({percent/0, emqx_schema, to_percent}). +-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}). +-typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}). +-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). % workaround: prevent being recognized as unused functions --export([to_duration/1, to_duration_s/1, to_bytesize/1, to_flag/1, to_percent/1]). +-export([to_duration/1, to_duration_s/1, to_bytesize/1, + to_flag/1, to_percent/1, to_comma_separated_list/1, + to_bar_separated_list/1, to_ip_port/1]). -behaviour(hocon_schema). --reflect_type([ rpc_mode/0, proto_dist/0, k8s_address_type/0, port_discovery/0 - , acl_nomatch/0, acl_deny_action/0, mqueue_default_priority/0 - , endpoint/0, listener_peer_cert_tcp/0, listener_peer_cert_ssl/0 - , mqtt_piggyback/0, deflate_opts_level/0, deflate_opts_strategy/0, context_takeover/0 - , verify/0, session_locking_strategy/0 - , shared_subscription_strategy/0, route_lock_type/0 - , log_format_depth/0, log_size/0, overload_kill_restart_after/0 - , log_formatter/0, supervisor_reports/0 - , log_to/0, log_level/0, flag/0, duration/0, duration_s/0 - , bytesize/0, percent/0, file/0 -]). +-reflect_type([ log_level/0, flag/0, duration/0, duration_s/0, + bytesize/0, percent/0, file/0, + comma_separated_list/0, bar_separated_list/0, ip_port/0]). -export([structs/0, fields/1, translations/0, translation/1]). +-export([t/1, t/3, t/4, ref/1]). +-export([conf_get/2, conf_get/3, keys/2, filter/1]). +-export([ssl/2, tr_ssl/2, tr_password_hash/2]). structs() -> ["cluster", "node", "rpc", "log", "lager", - "acl", "mqtt", "zone", "listener", "module", "broker", - "plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"]. + "acl", "mqtt", "zone", "listener", "module", "broker", + "plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"]. fields("cluster") -> - [ {"name", fun cluster__name/1} - , {"discovery", fun cluster__discovery/1} - , {"autoclean", duration("ekka.cluster_autoclean", undefined)} - , {"autoheal", flag("ekka.cluster_autoheal", false)} - , {"static", ref("static")} - , {"mcast", ref("mcast")} - , {"proto_dist", fun cluster__proto_dist/1} - , {"dns", ref("dns")} - , {"etcd", ref("etcd")} - , {"k8s", ref("k8s")} + [ {"name", t(atom(), "ekka.cluster_name", emqxcl)} + , {"discovery", t(atom(), undefined, manual)} + , {"autoclean", t(duration(), "ekka.cluster_autoclean", undefined)} + , {"autoheal", t(flag(), "ekka.cluster_autoheal", false)} + , {"static", ref("static")} + , {"mcast", ref("mcast")} + , {"proto_dist", t(union([inet_tcp, inet6_tcp, inet_tls]), "ekka.proto_dist", inet_tcp)} + , {"dns", ref("dns")} + , {"etcd", ref("etcd")} + , {"k8s", ref("k8s")} ]; fields("static") -> - [ {"seeds", fun string/1}]; + [ {"seeds", t(comma_separated_list())}]; fields("mcast") -> - [ {"addr", string(undefined, "239.192.0.1")} - , {"ports", string(undefined, "4369")} - , {"iface", string(undefined, "0.0.0.0")} - , {"ttl", integer(undefined, 255)} - , {"loop", flag(undefined, true)} - , {"sndbuf", bytesize(undefined, "16KB")} - , {"recbuf", bytesize(undefined, "16KB")} - , {"buffer", bytesize(undefined, "32KB")} + [ {"addr", t(string(), undefined, "239.192.0.1")} + , {"ports", t(comma_separated_list(), undefined, "4369")} + , {"iface", t(string(), undefined, "0.0.0.0")} + , {"ttl", t(integer(), undefined, 255)} + , {"loop", t(flag(), undefined, true)} + , {"sndbuf", t(bytesize(), undefined, "16KB")} + , {"recbuf", t(bytesize(), undefined, "16KB")} + , {"buffer", t(bytesize(), undefined, "32KB")} ]; fields("dns") -> - [ {"app", fun string/1}]; + [ {"app", t(string())}]; fields("etcd") -> - [ {"server", fun string/1} - , {"prefix", fun string/1} - , {"node_ttl", duration(undefined, "1m")} - , {"ssl", ref("ssl")} + [ {"server", t(comma_separated_list())} + , {"prefix", t(string())} + , {"node_ttl", t(duration(), undefined, "1m")} + , {"ssl", ref("etcd_ssl")} ]; -fields("ssl") -> - [ {"keyfile", fun string/1} - , {"certfile", fun string/1} - , {"cacertfile", fun string/1} - ]; +fields("etcd_ssl") -> + ssl(undefined, #{}); fields("k8s") -> - [ {"apiserver", fun string/1} - , {"service_name", fun string/1} - , {"address_type", fun cluster__k8s__address_type/1} - , {"app_name", fun string/1} - , {"namespace", fun string/1} - , {"suffix", string(undefined, "")} + [ {"apiserver", t(string())} + , {"service_name", t(string())} + , {"address_type", t(union([ip, dns, hostname]))} + , {"app_name", t(string())} + , {"namespace", t(string())} + , {"suffix", t(string(), undefined, "")} ]; fields("node") -> - [ {"name", fun node__name/1} - , {"ssl_dist_optfile", string("vm_args.-ssl_dist_optfile", undefined)} - , {"cookie", fun node__cookie/1} - , {"data_dir", string("emqx.data_dir", undefined)} - , {"heartbeat", flag(undefined, false)} - , {"async_threads", fun node__async_threads/1} - , {"process_limit", integer("vm_args.+P", undefined)} - , {"max_ports", fun node__max_ports/1} - , {"dist_buffer_size", fun node__dist_buffer_size/1} - , {"global_gc_interval", duration_s("emqx.global_gc_interval", undefined)} - , {"fullsweep_after", fun node__fullsweep_after/1} - , {"max_ets_tables", duration("vm_args.+e", 256000)} - , {"crash_dump", fun node__crash_dump/1} - , {"dist_net_ticktime", integer("vm_args.-kernel net_ticktime", undefined)} - , {"dist_listen_min", integer("kernel.inet_dist_listen_min", undefined)} - , {"dist_listen_max", integer("kernel.inet_dist_listen_max", undefined)} - , {"backtrace_depth", integer("emqx.backtrace_depth", 16)} + [ {"name", t(string(), "vm_args.-name", "emqx@127.0.0.1", "NODE_NAME")} + , {"ssl_dist_optfile", t(string(), "vm_args.-ssl_dist_optfile", undefined)} + , {"cookie", t(string(), "vm_args.-setcookie", "emqxsecretcookie", "NODE_COOKIE")} + , {"data_dir", t(string(), "emqx.data_dir", undefined)} + , {"heartbeat", t(flag(), undefined, false)} + , {"async_threads", t(range(1, 1024), "vm_args.+A", undefined)} + , {"process_limit", t(integer(), "vm_args.+P", undefined)} + , {"max_ports", t(range(1024, 134217727), "vm_args.+Q", undefined, "MAX_PORTS")} + , {"dist_buffer_size", fun node__dist_buffer_size/1} + , {"global_gc_interval", t(duration_s(), "emqx.global_gc_interval", undefined)} + , {"fullsweep_after", t(non_neg_integer(), + "vm_args.-env ERL_FULLSWEEP_AFTER", 1000)} + , {"max_ets_tables", t(integer(), "vm_args.+e", 256000)} + , {"crash_dump", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)} + , {"dist_net_ticktime", t(integer(), "vm_args.-kernel net_ticktime", undefined)} + , {"dist_listen_min", t(integer(), "kernel.inet_dist_listen_min", undefined)} + , {"dist_listen_max", t(integer(), "kernel.inet_dist_listen_max", undefined)} + , {"backtrace_depth", t(integer(), "emqx.backtrace_depth", 16)} ]; fields("rpc") -> - [ {"mode", fun rpc__mode/1} - , {"async_batch_size", integer("gen_rpc.max_batch_size", 256)} - , {"port_discovery", fun rpc__port_discovery/1} - , {"tcp_server_port", integer("gen_rpc.tcp_server_port", 5369)} - , {"tcp_client_num", fun rpc__tcp_client_num/1} - , {"connect_timeout", duration("gen_rpc.connect_timeout", "5s")} - , {"send_timeout", duration("gen_rpc.send_timeout", "5s")} - , {"authentication_timeout", duration("gen_rpc.authentication_timeout", "5s")} - , {"call_receive_timeout", duration("gen_rpc.call_receive_timeout", "15s")} - , {"socket_keepalive_idle", duration_s("gen_rpc.socket_keepalive_idle", "7200s")} - , {"socket_keepalive_interval", duration_s("gen_rpc.socket_keepalive_interval", "75s")} - , {"socket_keepalive_count", integer("gen_rpc.socket_keepalive_count", 9)} - , {"socket_sndbuf", bytesize("gen_rpc.socket_sndbuf", "1MB")} - , {"socket_recbuf", bytesize("gen_rpc.socket_recbuf", "1MB")} - , {"socket_buffer", bytesize("gen_rpc.socket_buffer", "1MB")} + [ {"mode", t(union(sync, async), "emqx.rpc_mode", async)} + , {"async_batch_size", t(integer(), "gen_rpc.max_batch_size", 256)} + , {"port_discovery",t(union(manual, stateless), "gen_rpc.port_discovery", stateless)} + , {"tcp_server_port", t(integer(), "gen_rpc.tcp_server_port", 5369)} + , {"tcp_client_num", t(range(0, 255), undefined, 0)} + , {"connect_timeout", t(duration(), "gen_rpc.connect_timeout", "5s")} + , {"send_timeout", t(duration(), "gen_rpc.send_timeout", "5s")} + , {"authentication_timeout", t(duration(), "gen_rpc.authentication_timeout", "5s")} + , {"call_receive_timeout", t(duration(), "gen_rpc.call_receive_timeout", "15s")} + , {"socket_keepalive_idle", t(duration_s(), "gen_rpc.socket_keepalive_idle", "7200s")} + , {"socket_keepalive_interval", t(duration_s(), "gen_rpc.socket_keepalive_interval", "75s")} + , {"socket_keepalive_count", t(integer(), "gen_rpc.socket_keepalive_count", 9)} + , {"socket_sndbuf", t(bytesize(), "gen_rpc.socket_sndbuf", "1MB")} + , {"socket_recbuf", t(bytesize(), "gen_rpc.socket_recbuf", "1MB")} + , {"socket_buffer", t(bytesize(), "gen_rpc.socket_buffer", "1MB")} ]; fields("log") -> - [ {"to", fun log__to/1} - , {"level", fun log__level/1} - , {"time_offset", string(undefined, "system")} - , {"primary_log_level", fun log__primary_log_level/1} - , {"dir", string(undefined,"log")} - , {"file", fun log__file/1} - , {"chars_limit", integer(undefined, -1)} - , {"supervisor_reports", fun log__supervisor_reports/1} - , {"max_depth", fun log__max_depth/1} - , {"formatter", fun log__formatter/1} - , {"single_line", boolean(undefined, true)} - , {"rotation", ref("rotation")} - , {"size", fun log__size/1} - , {"sync_mode_qlen", integer(undefined, 100)} - , {"drop_mode_qlen", integer(undefined, 3000)} - , {"flush_qlen", integer(undefined, 8000)} - , {"overload_kill", flag(undefined, true)} - , {"overload_kill_mem_size", bytesize(undefined, "30MB")} - , {"overload_kill_qlen", integer(undefined, 20000)} - , {"overload_kill_restart_after", fun log__overload_kill_restart_after/1} - , {"burst_limit", string(undefined, "disabled")} - , {"error_logger", fun log__error_logger/1} - , {"debug", ref("additional_log_file")} - , {"info", ref("additional_log_file")} - , {"notice", ref("additional_log_file")} - , {"warning", ref("additional_log_file")} - , {"error", ref("additional_log_file")} - , {"critical", ref("additional_log_file")} - , {"alert", ref("additional_log_file")} - , {"emergency", ref("additional_log_file")} + [ {"to", t(union([file, console, both]), undefined, file)} + , {"level", t(log_level(), undefined, warning)} + , {"time_offset", t(string(), undefined, "system")} + , {"primary_log_level", t(log_level(), undefined, warning)} + , {"dir", t(string(), undefined, "log")} + , {"file", t(file(), undefined, "emqx.log")} + , {"chars_limit", t(integer(), undefined, -1)} + , {"supervisor_reports", t(union([error, progress]), undefined, error)} + , {"max_depth", t(union([infinity, integer()]), + "kernel.error_logger_format_depth", 20)} + , {"formatter", t(union([text, json]), undefined, text)} + , {"single_line", t(boolean(), undefined, true)} + , {"rotation", ref("rotation")} + , {"size", t(union(bytesize(), infinity), undefined, infinity)} + , {"sync_mode_qlen", t(integer(), undefined, 100)} + , {"drop_mode_qlen", t(integer(), undefined, 3000)} + , {"flush_qlen", t(integer(), undefined, 8000)} + , {"overload_kill", t(flag(), undefined, true)} + , {"overload_kill_mem_size", t(bytesize(), undefined, "30MB")} + , {"overload_kill_qlen", t(integer(), undefined, 20000)} + , {"overload_kill_restart_after", t(union(duration(), infinity), undefined, "5s")} + , {"burst_limit", t(comma_separated_list(), undefined, "disabled")} + , {"error_logger", t(atom(), "kernel.error_logger", silent)} + , {"debug", ref("additional_log_file")} + , {"info", ref("additional_log_file")} + , {"notice", ref("additional_log_file")} + , {"warning", ref("additional_log_file")} + , {"error", ref("additional_log_file")} + , {"critical", ref("additional_log_file")} + , {"alert", ref("additional_log_file")} + , {"emergency", ref("additional_log_file")} ]; fields("additional_log_file") -> - [ {"file", fun string/1}]; + [ {"file", t(string())}]; fields("rotation") -> - [ {"enable", flag(undefined, true)} - , {"size", bytesize(undefined, "10MB")} - , {"count", integer(undefined, 5)} + [ {"enable", t(flag(), undefined, true)} + , {"size", t(bytesize(), undefined, "10MB")} + , {"count", t(integer(), undefined, 5)} ]; fields("lager") -> - [ {"handlers", string("lager.handlers", "")} - , {"crash_log", flag("lager.crash_log", false)} + [ {"handlers", t(string(), "lager.handlers", "")} + , {"crash_log", t(flag(), "lager.crash_log", false)} ]; fields("acl") -> - [ {"allow_anonymous", boolean("emqx.allow_anonymous", false)} - , {"acl_nomatch", fun acl_nomatch/1} - , {"acl_file", string("emqx.acl_file", undefined)} - , {"enable_acl_cache", flag("emqx.enable_acl_cache", true)} - , {"acl_cache_ttl", duration("emqx.acl_cache_ttl", "1m")} - , {"acl_cache_max_size", fun acl_cache_max_size/1} - , {"acl_deny_action", fun acl_deny_action/1} - , {"flapping_detect_policy", string(undefined, "30,1m,5m")} + [ {"allow_anonymous", t(boolean(), "emqx.allow_anonymous", false)} + , {"acl_nomatch", t(union(allow, deny), "emqx.acl_nomatch", deny)} + , {"acl_file", t(string(), "emqx.acl_file", undefined)} + , {"enable_acl_cache", t(flag(), "emqx.enable_acl_cache", true)} + , {"acl_cache_ttl", t(duration(), "emqx.acl_cache_ttl", "1m")} + , {"acl_cache_max_size", t(range(1, inf), "emqx.acl_cache_max_size", 32)} + , {"acl_deny_action", t(union(ignore, disconnect), "emqx.acl_deny_action", ignore)} + , {"flapping_detect_policy", t(comma_separated_list(), undefined, "30,1m,5m")} ]; fields("mqtt") -> - [ {"max_packet_size", fun mqtt__max_packet_size/1} - , {"max_clientid_len", integer("emqx.max_clientid_len", 65535)} - , {"max_topic_levels", integer("emqx.max_topic_levels", 0)} - , {"max_qos_allowed", fun mqtt__max_qos_allowed/1} - , {"max_topic_alias", integer("emqx.max_topic_alias", 65535)} - , {"retain_available", boolean("emqx.retain_available", true)} - , {"wildcard_subscription", boolean("emqx.wildcard_subscription", true)} - , {"shared_subscription", boolean("emqx.shared_subscription", true)} - , {"ignore_loop_deliver", boolean("emqx.ignore_loop_deliver", true)} - , {"strict_mode", boolean("emqx.strict_mode", false)} - , {"response_information", string("emqx.response_information", undefined)} + [ {"max_packet_size", t(bytesize(), "emqx.max_packet_size", "1MB", "MAX_PACKET_SIZE")} + , {"max_clientid_len", t(integer(), "emqx.max_clientid_len", 65535)} + , {"max_topic_levels", t(integer(), "emqx.max_topic_levels", 0)} + , {"max_qos_allowed", t(range(0, 2), "emqx.max_qos_allowed", 2)} + , {"max_topic_alias", t(integer(), "emqx.max_topic_alias", 65535)} + , {"retain_available", t(boolean(), "emqx.retain_available", true)} + , {"wildcard_subscription", t(boolean(), "emqx.wildcard_subscription", true)} + , {"shared_subscription", t(boolean(), "emqx.shared_subscription", true)} + , {"ignore_loop_deliver", t(boolean(), "emqx.ignore_loop_deliver", true)} + , {"strict_mode", t(boolean(), "emqx.strict_mode", false)} + , {"response_information", t(string(), "emqx.response_information", undefined)} ]; fields("zone") -> [ {"$name", ref("zone_settings")}]; fields("zone_settings") -> - [ {"idle_timeout", duration(undefined, "15s")} - , {"allow_anonymous", fun boolean/1} - , {"acl_nomatch", fun zones_acl_nomatch/1} - , {"enable_acl", flag(undefined, false)} - , {"acl_deny_action", fun zones_acl_deny_action/1} - , {"enable_ban", flag(undefined, false)} - , {"enable_stats", flag(undefined, false)} - , {"max_packet_size", fun bytesize/1} - , {"max_clientid_len", fun integer/1} - , {"max_topic_levels", fun integer/1} - , {"max_qos_allowed", fun zones_max_qos_allowed/1} - , {"max_topic_alias", fun integer/1} - , {"retain_available", fun boolean/1} - , {"wildcard_subscription", fun boolean/1} - , {"shared_subscription", fun boolean/1} - , {"server_keepalive", fun integer/1} - , {"keepalive_backoff", fun zones_keepalive_backoff/1} - , {"max_subscriptions", integer(undefined, 0)} - , {"upgrade_qos", flag(undefined, false)} - , {"max_inflight", fun zones_max_inflight/1} - , {"retry_interval", duration_s(undefined, "30s")} - , {"max_awaiting_rel", duration(undefined, 0)} - , {"await_rel_timeout", duration_s(undefined, "300s")} - , {"ignore_loop_deliver", fun boolean/1} - , {"session_expiry_interval", duration_s(undefined, "2h")} - , {"max_mqueue_len", integer(undefined, 1000)} - , {"mqueue_priorities", string(undefined, "none")} - , {"mqueue_default_priority", fun zones_mqueue_default_priority/1} - , {"mqueue_store_qos0", boolean(undefined, true)} - , {"enable_flapping_detect", flag(undefined, false)} - , {"rate_limit", ref("rate_limit")} - , {"conn_congestion", ref("conn_congestion")} - , {"quota", ref("quota")} - , {"force_gc_policy", fun string/1} - , {"force_shutdown_policy", string(undefined, "default")} - , {"mountpoint", fun string/1} - , {"use_username_as_clientid", boolean(undefined, false)} - , {"strict_mode", boolean(undefined, false)} - , {"response_information", fun string/1} - , {"bypass_auth_plugins", boolean(undefined, false)} + [ {"idle_timeout", t(duration(), undefined, "15s")} + , {"allow_anonymous", t(boolean())} + , {"acl_nomatch", t(union(allow, deny))} + , {"enable_acl", t(flag(), undefined, false)} + , {"acl_deny_action", t(union(ignore, disconnect), undefined, ignore)} + , {"enable_ban", t(flag(), undefined, false)} + , {"enable_stats", t(flag(), undefined, false)} + , {"max_packet_size", t(bytesize())} + , {"max_clientid_len", t(integer())} + , {"max_topic_levels", t(integer())} + , {"max_qos_allowed", t(range(0, 2))} + , {"max_topic_alias", t(integer())} + , {"retain_available", t(boolean())} + , {"wildcard_subscription", t(boolean())} + , {"shared_subscription", t(boolean())} + , {"server_keepalive", t(integer())} + , {"keepalive_backoff", t(float(), undefined, 0.75)} + , {"max_subscriptions", t(integer(), undefined, 0)} + , {"upgrade_qos", t(flag(), undefined, false)} + , {"max_inflight", t(range(0, 65535))} + , {"retry_interval", t(duration_s(), undefined, "30s")} + , {"max_awaiting_rel", t(duration(), undefined, 0)} + , {"await_rel_timeout", t(duration_s(), undefined, "300s")} + , {"ignore_loop_deliver", t(boolean())} + , {"session_expiry_interval", t(duration_s(), undefined, "2h")} + , {"max_mqueue_len", t(integer(), undefined, 1000)} + , {"mqueue_priorities", t(comma_separated_list(), undefined, "none")} + , {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)} + , {"mqueue_store_qos0", t(boolean(), undefined, true)} + , {"enable_flapping_detect", t(flag(), undefined, false)} + , {"rate_limit", ref("rate_limit")} + , {"conn_congestion", ref("conn_congestion")} + , {"quota", ref("quota")} + , {"force_gc_policy", t(bar_separated_list())} + , {"force_shutdown_policy", t(bar_separated_list(), undefined, "default")} + , {"mountpoint", t(string())} + , {"use_username_as_clientid", t(boolean(), undefined, false)} + , {"strict_mode", t(boolean(), undefined, false)} + , {"response_information", t(string())} + , {"bypass_auth_plugins", t(boolean(), undefined, false)} ]; fields("rate_limit") -> - [ {"conn_messages_in", fun string/1} - , {"conn_bytes_in", fun string/1} + [ {"conn_messages_in", t(comma_separated_list())} + , {"conn_bytes_in", t(comma_separated_list())} ]; fields("conn_congestion") -> - [ {"alarm", flag(undefined, false)} - , {"min_alarm_sustain_duration", duration(undefined, "1m")} + [ {"alarm", t(flag(), undefined, false)} + , {"min_alarm_sustain_duration", t(duration(), undefined, "1m")} ]; fields("quota") -> - [ {"conn_messages_routing", fun string/1} - , {"overall_messages_routing", fun string/1} + [ {"conn_messages_routing", t(comma_separated_list())} + , {"overall_messages_routing", t(comma_separated_list())} ]; fields("listener") -> [ {"tcp", ref("tcp_listener")} - , {"ssl", ref("ssl_listener")} - , {"ws", ref("ws_listener")} - , {"wss", ref("wss_listener")} + , {"ssl", ref("ssl_listener")} + , {"ws", ref("ws_listener")} + , {"wss", ref("wss_listener")} ]; fields("tcp_listener") -> @@ -306,176 +284,166 @@ fields("wss_listener") -> [ {"$name", ref("wss_listener_settings")}]; fields("listener_settings") -> - [ {"endpoint", fun listener_endpoint/1} - , {"acceptors", integer(undefined, 8)} - , {"max_connections", integer(undefined, 1024)} - , {"max_conn_rate", fun integer/1} - , {"active_n", integer(undefined, 100)} - , {"zone", fun string/1} - , {"rate_limit", fun string/1} - , {"access", ref("access")} - , {"proxy_protocol", fun flag/1} - , {"proxy_protocol_timeout", fun duration/1} - , {"backlog", integer(undefined, 1024)} - , {"send_timeout", duration(undefined, "15s")} - , {"send_timeout_close", flag(undefined, true)} - , {"recbuf", fun bytesize/1} - , {"sndbuf", fun bytesize/1} - , {"buffer", fun bytesize/1} - , {"high_watermark", bytesize(undefined, "1MB")} - , {"tune_buffer", fun flag/1} - , {"nodelay", fun boolean/1} - , {"reuseaddr", fun boolean/1} + [ {"endpoint", t(union(ip_port(), integer()))} + , {"acceptors", t(integer(), undefined, 8)} + , {"max_connections", t(integer(), undefined, 1024)} + , {"max_conn_rate", t(integer())} + , {"active_n", t(integer(), undefined, 100)} + , {"zone", t(string())} + , {"rate_limit", t(comma_separated_list())} + , {"access", ref("access")} + , {"proxy_protocol", t(flag())} + , {"proxy_protocol_timeout", t(duration())} + , {"backlog", t(integer(), undefined, 1024)} + , {"send_timeout", t(duration(), undefined, "15s")} + , {"send_timeout_close", t(flag(), undefined, true)} + , {"recbuf", t(bytesize())} + , {"sndbuf", t(bytesize())} + , {"buffer", t(bytesize())} + , {"high_watermark", t(bytesize(), undefined, "1MB")} + , {"tune_buffer", t(flag())} + , {"nodelay", t(boolean())} + , {"reuseaddr", t(boolean())} ]; fields("tcp_listener_settings") -> - [ {"peer_cert_as_username", fun listener_peer_cert_as_username_tcp/1} - , {"peer_cert_as_clientid", fun listener_peer_cert_as_clientid_tcp/1} + [ {"peer_cert_as_username", t(cn)} + , {"peer_cert_as_clientid", t(cn)} ] ++ fields("listener_settings"); fields("ssl_listener_settings") -> - [ {"tls_versions", fun string/1} - , {"ciphers", fun string/1} - , {"psk_ciphers", fun string/1} - , {"handshake_timeout", duration(undefined, "15s")} - , {"depth", integer(undefined, 10)} - , {"key_password", fun string/1} - , {"dhfile", fun string/1} - , {"keyfile", fun string/1} - , {"certfile", fun string/1} - , {"cacertfile", fun string/1} - , {"verify", fun listener_verify/1} - , {"fail_if_no_peer_cert", fun boolean/1} - , {"secure_renegotiate", fun flag/1} - , {"reuse_sessions", flag(undefined, true)} - , {"honor_cipher_order", fun flag/1} - , {"peer_cert_as_username", fun listener_peer_cert_as_username_ssl/1} - , {"peer_cert_as_clientid", fun listener_peer_cert_as_clientid_ssl/1} - ] ++ fields("listener_settings"); + [ {"peer_cert_as_username", t(union([cn, dn, crt, pem, md5]))} + , {"peer_cert_as_clientid", t(union([cn, dn, crt, pem, md5]))} + ] ++ + ssl(undefined, #{handshake_timeout => "15s" + , depth => 10 + , reuse_sessions => true}) ++ fields("listener_settings"); fields("ws_listener_settings") -> - [ {"mqtt_path", string(undefined, "/mqtt")} - , {"fail_if_no_subprotocol", boolean(undefined, true)} - , {"supported_subprotocols", string(undefined, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")} - , {"proxy_address_header", string(undefined, "X-Forwarded-For")} - , {"proxy_port_header", string(undefined, "X-Forwarded-Port")} - , {"compress", fun boolean/1} - , {"deflate_opts", ref("deflate_opts")} - , {"idle_timeout", fun duration/1} - , {"max_frame_size", fun integer/1} - , {"mqtt_piggyback", fun listener_mqtt_piggyback/1} - , {"check_origin_enable", boolean(undefined, false)} - , {"allow_origin_absence", boolean(undefined, true)} - , {"check_origins", fun string/1} + [ {"mqtt_path", t(string(), undefined, "/mqtt")} + , {"fail_if_no_subprotocol", t(boolean(), undefined, true)} + , {"supported_subprotocols", t(string(), undefined, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")} + , {"proxy_address_header", t(string(), undefined, "x-forwarded-for")} + , {"proxy_port_header", t(string(), undefined, "x-forwarded-port")} + , {"compress", t(boolean())} + , {"deflate_opts", ref("deflate_opts")} + , {"idle_timeout", t(duration())} + , {"max_frame_size", t(integer())} + , {"mqtt_piggyback", t(union(single, multiple), undefined, multiple)} + , {"check_origin_enable", t(boolean(), undefined, false)} + , {"allow_origin_absence", t(boolean(), undefined, true)} + , {"check_origins", t(comma_separated_list())} % @fixme ] ++ lists:keydelete("high_watermark", 1, fields("tcp_listener_settings")); fields("wss_listener_settings") -> % @fixme - Settings = lists:ukeymerge(1, fields("ssl_listener_settings"), fields("ws_listener_settings")), - [{K, V} || {K, V} <- Settings, - lists:all(fun(X) -> X =/= K end, ["high_watermark", "handshake_timeout", "dhfile"])]; + Ssl = ssl(undefined, #{depth => 10 + , reuse_sessions => true}) ++ fields("listener_settings"), + Settings = lists:ukeymerge(1, Ssl, fields("ws_listener_settings")), + lists:keydelete("high_watermark", 1, Settings); fields("access") -> - [ {"$id", fun string/1}]; + [ {"$id", t(string(), undefined, undefined)}]; fields("deflate_opts") -> - [ {"level", fun deflate_opts_level/1} - , {"mem_level", fun deflate_opts_mem_level/1} - , {"strategy", fun deflate_opts_strategy/1} - , {"server_context_takeover", fun deflate_opts_server_context_takeover/1} - , {"client_context_takeover", fun deflate_opts_client_context_takeover/1} - , {"server_max_window_bits", fun integer/1} - , {"client_max_window_bits", fun integer/1} + [ {"level", t(union([none, default, best_compression, best_speed]))} + , {"mem_level", t(range(1, 9))} + , {"strategy", t(union([default, filtered, huffman_only, rle]))} + , {"server_context_takeover", t(union(takeover, no_takeover))} + , {"client_context_takeover", t(union(takeover, no_takeover))} + , {"server_max_window_bits", t(integer())} + , {"client_max_window_bits", t(integer())} ]; fields("module") -> - [ {"loaded_file", string("emqx.modules_loaded_file", undefined)} - , {"presence", ref("presence")} - , {"subscription", ref("subscription")} - , {"rewrite", ref("rewrite")} + [ {"loaded_file", t(string(), "emqx.modules_loaded_file", undefined)} + , {"presence", ref("presence")} + , {"subscription", ref("subscription")} + , {"rewrite", ref("rewrite")} ]; fields("presence") -> - [ {"qos", fun module_presence__qos/1}]; + [ {"qos", t(range(0, 2), undefined, 1)}]; fields("subscription") -> - [ {"$id", ref("subscription_settings")} - ]; + [ {"$id", ref("subscription_settings")}]; fields("subscription_settings") -> - [ {"topic", fun string/1} - , {"qos", fun module_subscription_qos/1} - , {"nl", fun module_subscription_nl/1} - , {"rap", fun module_subscription_rap/1} - , {"rh", fun module_subscription_rh/1} + [ {"topic", t(string())} + , {"qos", t(range(0, 2), undefined, 1)} + , {"nl", t(range(0, 1), undefined, 0)} + , {"rap", t(range(0, 1), undefined, 0)} + , {"rh", t(range(0, 2), undefined, 0)} ]; fields("rewrite") -> [ {"rule", ref("rule")} - , {"pub_rule", ref("rule")} - , {"sub_rule", ref("rule")} + , {"pub_rule", ref("rule")} + , {"sub_rule", ref("rule")} ]; fields("rule") -> - [ {"$id", fun string/1}]; + [ {"$id", t(string())}]; fields("plugins") -> - [ {"etc_dir", string("emqx.plugins_etc_dir", undefined)} - , {"loaded_file", string("emqx.plugins_loaded_file", undefined)} - , {"expand_plugins_dir", string("emqx.expand_plugins_dir", undefined)} + [ {"etc_dir", t(string(), "emqx.plugins_etc_dir", undefined)} + , {"loaded_file", t(string(), "emqx.plugins_loaded_file", undefined)} + , {"expand_plugins_dir", t(string(), "emqx.expand_plugins_dir", undefined)} ]; fields("broker") -> - [ {"sys_interval", duration("emqx.broker_sys_interval", "1m")} - , {"sys_heartbeat", duration("emqx.broker_sys_heartbeat", "30s")} - , {"enable_session_registry", flag("emqx.enable_session_registry", true)} - , {"session_locking_strategy", fun broker__session_locking_strategy/1} - , {"shared_subscription_strategy", fun broker__shared_subscription_strategy/1} - , {"shared_dispatch_ack_enabled", boolean("emqx.shared_dispatch_ack_enabled", false)} - , {"route_batch_clean", flag("emqx.route_batch_clean", true)} - , {"perf", ref("perf")} + [ {"sys_interval", t(duration(), "emqx.broker_sys_interval", "1m")} + , {"sys_heartbeat", t(duration(), "emqx.broker_sys_heartbeat", "30s")} + , {"enable_session_registry", t(flag(), "emqx.enable_session_registry", true)} + , {"session_locking_strategy", t(union([local, leader, quorum, all]), + "emqx.session_locking_strategy", quorum)} + , {"shared_subscription_strategy", t(union(random, round_robin), + "emqx.shared_subscription_strategy", round_robin)} + , {"shared_dispatch_ack_enabled", t(boolean(), "emqx.shared_dispatch_ack_enabled", false)} + , {"route_batch_clean", t(flag(), "emqx.route_batch_clean", true)} + , {"perf", ref("perf")} ]; fields("perf") -> - [ {"route_lock_type", fun broker__perf__route_lock_type/1} - , {"trie_compaction", boolean("emqx.trie_compaction", true)} + [ {"route_lock_type", t(union([key, tab, global]), "emqx.route_lock_type", key)} + , {"trie_compaction", t(boolean(), "emqx.trie_compaction", true)} ]; fields("sysmon") -> - [ {"long_gc", duration(undefined, 0)} - , {"long_schedule", duration(undefined, 240)} - , {"large_heap", bytesize(undefined, "8MB")} - , {"busy_dist_port", boolean(undefined, true)} - , {"busy_port", boolean(undefined, false)} + [ {"long_gc", t(duration(), undefined, 0)} + , {"long_schedule", t(duration(), undefined, 240)} + , {"large_heap", t(bytesize(), undefined, "8MB")} + , {"busy_dist_port", t(boolean(), undefined, true)} + , {"busy_port", t(boolean(), undefined, false)} ]; fields("os_mon") -> - [ {"cpu_check_interval", duration_s(undefined, 60)} - , {"cpu_high_watermark", percent(undefined, "80%")} - , {"cpu_low_watermark", percent(undefined, "60%")} - , {"mem_check_interval", duration_s(undefined, 60)} - , {"sysmem_high_watermark", percent(undefined, "70%")} - , {"procmem_high_watermark", percent(undefined, "5%")} + [ {"cpu_check_interval", t(duration_s(), undefined, 60)} + , {"cpu_high_watermark", t(percent(), undefined, "80%")} + , {"cpu_low_watermark", t(percent(), undefined, "60%")} + , {"mem_check_interval", t(duration_s(), undefined, 60)} + , {"sysmem_high_watermark", t(percent(), undefined, "70%")} + , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; fields("vm_mon") -> - [ {"check_interval", duration_s(undefined, 30)} - , {"process_high_watermark", percent(undefined, "80%")} - , {"process_low_watermark", percent(undefined, "60%")} + [ {"check_interval", t(duration_s(), undefined, 30)} + , {"process_high_watermark", t(percent(), undefined, "80%")} + , {"process_low_watermark", t(percent(), undefined, "60%")} ]; fields("alarm") -> - [ {"actions", string(undefined, "log,publish")} - , {"size_limit", integer(undefined, 1000)} - , {"validity_period", duration_s(undefined, "24h")} + [ {"actions", t(comma_separated_list(), undefined, "log,publish")} + , {"size_limit", t(integer(), undefined, 1000)} + , {"validity_period", t(duration_s(), undefined, "24h")} ]; fields("telemetry") -> - [ {"enabled", boolean(undefined, false)} - , {"url", string(undefined, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry")} - , {"report_interval", duration_s(undefined, "7d")} + [ {"enabled", t(boolean(), undefined, false)} + , {"url", t(string(), undefined, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry")} + , {"report_interval", t(duration_s(), undefined, "7d")} ]. @@ -486,63 +454,31 @@ translation("ekka") -> translation("vm_args") -> [ {"+zdbbl", fun tr_zdbbl/1} - , {"-heart", fun tr_heart/1}]; + , {"-heart", fun tr_heart/1}]; translation("gen_rpc") -> [ {"tcp_client_num", fun tr_tcp_client_num/1} - , {"tcp_client_port", fun tr_tcp_client_port/1}]; + , {"tcp_client_port", fun tr_tcp_client_port/1}]; translation("kernel") -> [ {"logger_level", fun tr_logger_level/1} - , {"logger", fun tr_logger/1}]; + , {"logger", fun tr_logger/1}]; translation("emqx") -> [ {"flapping_detect_policy", fun tr_flapping_detect_policy/1} - , {"zones", fun tr_zones/1} - , {"listeners", fun tr_listeners/1} - , {"modules", fun tr_modules/1} - , {"sysmon", fun tr_sysmon/1} - , {"os_mon", fun tr_os_mon/1} - , {"vm_mon", fun tr_vm_mon/1} - , {"alarm", fun tr_alarm/1} - , {"telemetry", fun tr_telemetry/1} + , {"zones", fun tr_zones/1} + , {"listeners", fun tr_listeners/1} + , {"modules", fun tr_modules/1} + , {"sysmon", fun tr_sysmon/1} + , {"os_mon", fun tr_os_mon/1} + , {"vm_mon", fun tr_vm_mon/1} + , {"alarm", fun tr_alarm/1} + , {"telemetry", fun tr_telemetry/1} ]. -cluster__name(mapping) -> "ekka.cluster_name"; -cluster__name(default) -> emqxcl; -cluster__name(type) -> atom(); -cluster__name(_) -> undefined. - -cluster__discovery(default) -> manual; -cluster__discovery(type) -> atom(); -cluster__discovery(_) -> undefined. - tr_cluster__discovery(Conf) -> Strategy = conf_get("cluster.discovery", Conf), - Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, - {Strategy, Filter(options(Strategy, Conf))}. - -%% @doc The erlang distributed protocol -cluster__proto_dist(mapping) -> "ekka.proto_dist"; -cluster__proto_dist(type) -> proto_dist(); -cluster__proto_dist(default) -> inet_tcp; -cluster__proto_dist(_) -> undefined. - -cluster__k8s__address_type(type) -> k8s_address_type(); -cluster__k8s__address_type(_) -> undefined. - -%% @doc Node name -node__name(mapping) -> "vm_args.-name"; -node__name(type) -> string(); -node__name(default) -> "emqx@127.0.0.1"; -node__name(override_env) -> "NODE_NAME"; -node__name(_) -> undefined. - -%% @doc Secret cookie for distributed erlang node -node__cookie(mapping) -> "vm_args.-setcookie"; -node__cookie(default) -> "emqxsecretcookie"; -node__cookie(override_env) -> "NODE_COOKIE"; -node__cookie(X) -> string(X). + {Strategy, filter(options(Strategy, Conf))}. tr_heart(Conf) -> case conf_get("node.heartbeat", Conf) of @@ -551,17 +487,6 @@ tr_heart(Conf) -> _ -> undefined end. -%% @doc More information at: http://erlang.org/doc/man/erl.html -node__async_threads(mapping) -> "vm_args.+A"; -node__async_threads(type) -> range(1, 1024); -node__async_threads(_) -> undefined. - -%% @doc The maximum number of concurrent ports/sockets.Valid range is 1024-134217727 -node__max_ports(mapping) -> "vm_args.+Q"; -node__max_ports(type) -> range(1024, 134217727); -node__max_ports(override_env) -> "MAX_PORTS"; -node__max_ports(_) -> undefined. - %% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl node__dist_buffer_size(type) -> bytesize(); node__dist_buffer_size(validator) -> @@ -578,35 +503,10 @@ node__dist_buffer_size(_) -> undefined. tr_zdbbl(Conf) -> case conf_get("node.dist_buffer_size", Conf) of undefined -> undefined; - X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; + X when is_integer(X) -> ceiling(X / 1024); %% Bytes to Kilobytes; _ -> undefined end. -%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 -node__fullsweep_after(mapping) -> "vm_args.-env ERL_FULLSWEEP_AFTER"; -node__fullsweep_after(type) -> non_neg_integer(); -node__fullsweep_after(default) -> 1000; -node__fullsweep_after(_) -> undefined. - -%% @doc Set the location of crash dumps -node__crash_dump(mapping) -> "vm_args.-env ERL_CRASH_DUMP"; -node__crash_dump(type) -> file(); -node__crash_dump(_) -> undefined. - -rpc__mode(mapping) -> "emqx.rpc_mode"; -rpc__mode(type) -> rpc_mode(); -rpc__mode(default) -> async; -rpc__mode(_) -> undefined. - -rpc__port_discovery(mapping) -> "gen_rpc.port_discovery"; -rpc__port_discovery(type) -> port_discovery(); -rpc__port_discovery(default) -> stateless; -rpc__port_discovery(_) -> undefined. - -rpc__tcp_client_num(type) -> range(0, 255); -rpc__tcp_client_num(default) -> 0; -rpc__tcp_client_num(_) -> undefined. - %% Force client to use server listening port, because we do no provide %% per-node listening port manual mapping from configs. %% i.e. all nodes in the cluster should agree to the same @@ -620,54 +520,8 @@ tr_tcp_client_num(Conf) -> tr_tcp_client_port(Conf) -> conf_get("rpc.tcp_server_port", Conf). -log__to(type) -> log_to(); -log__to(default) -> file; -log__to(_) -> undefined. - -log__level(type) -> log_level(); -log__level(default) -> warning; -log__level(_) -> undefined. - tr_logger_level(Conf) -> conf_get("log.level", Conf). -log__primary_log_level(mapping) -> "kernel.logger_level"; -log__primary_log_level(type) -> log_level(); -log__primary_log_level(default) -> warning; -log__primary_log_level(_) -> undefined. - -log__file(type) -> file(); -log__file(default) -> "emqx.log"; -log__file(_) -> undefined. - -log__supervisor_reports(type) -> supervisor_reports(); -log__supervisor_reports(default) -> error; -log__supervisor_reports(_) -> undefined. - -%% @doc Maximum depth in Erlang term log formattingand message queue inspection. -log__max_depth(mapping) -> "kernel.error_logger_format_depth"; -log__max_depth(type) -> log_format_depth(); -log__max_depth(default) -> 20; -log__max_depth(_) -> undefined. - -%% @doc format logs as JSON objects -log__formatter(type) -> log_formatter(); -log__formatter(default) -> text; -log__formatter(_) -> undefined. - -log__size(type) -> log_size(); -log__size(default) -> infinity; -log__size(_) -> undefined. - -% @todo convert to union -log__overload_kill_restart_after(type) -> duration(); -log__overload_kill_restart_after(default) -> "5s"; -log__overload_kill_restart_after(_) -> undefined. - -log__error_logger(mapping) -> "kernel.error_logger"; -log__error_logger(type) -> atom(); -log__error_logger(default) -> silent; -log__error_logger(_) -> undefined. - tr_logger(Conf) -> LogTo = conf_get("log.to", Conf), LogLevel = conf_get("log.level", Conf), @@ -682,7 +536,7 @@ tr_logger(Conf) -> SingleLine = conf_get("log.single_line", Conf), FmtName = conf_get("log.formatter", Conf), Formatter = formatter(FmtName, CharsLimit, SingleLine), - BurstLimit = string:tokens(conf_get("log.burst_limit", Conf), ", "), + BurstLimit = conf_get("log.burst_limit", Conf), {BustLimitOn, {MaxBurstCount, TimeWindow}} = burst_limit(BurstLimit), FileConf = fun (Filename) -> BasicConf = @@ -749,130 +603,33 @@ tr_logger(Conf) -> DefaultHandler ++ FileHandler ++ AdditionalHandlers. -%% @doc ACL nomatch. -acl_nomatch(mapping) -> "emqx.acl_nomatch"; -acl_nomatch(type) -> acl_nomatch(); -acl_nomatch(default) -> deny; -acl_nomatch(_) -> undefined. - -%% @doc ACL cache size. -acl_cache_max_size(mapping) -> "emqx.acl_cache_max_size"; -acl_cache_max_size(type) -> range(1, inf); -acl_cache_max_size(default) -> 32; -acl_cache_max_size(_) -> undefined. - -%% @doc Action when acl check reject current operation -acl_deny_action(mapping) -> "emqx.acl_deny_action"; -acl_deny_action(type) -> acl_deny_action(); -acl_deny_action(default) -> ignore; -acl_deny_action(_) -> undefined. - tr_flapping_detect_policy(Conf) -> - Policy = conf_get("acl.flapping_detect_policy", Conf), - [Threshold, Duration, Interval] = string:tokens(Policy, ", "), - ParseDuration = fun(S, Dur) -> - case cuttlefish_duration:parse(S, Dur) of - I when is_integer(I) -> I; - {error, Reason} -> error(Reason) + [Threshold, Duration, Interval] = conf_get("acl.flapping_detect_policy", Conf), + ParseDuration = fun(S, F) -> + case F(S) of + {ok, I} -> I; + {error, Reason} -> error({duration, Reason}) end end, #{threshold => list_to_integer(Threshold), - duration => ParseDuration(Duration, ms), - banned_interval => ParseDuration(Interval, s) + duration => ParseDuration(Duration, fun to_duration/1), + banned_interval => ParseDuration(Interval, fun to_duration_s/1) }. -%% @doc Max Packet Size Allowed, 1MB by default. -mqtt__max_packet_size(mapping) -> "emqx.max_packet_size"; -mqtt__max_packet_size(override_env) -> "MAX_PACKET_SIZE"; -mqtt__max_packet_size(type) -> bytesize(); -mqtt__max_packet_size(default) -> "1MB"; -mqtt__max_packet_size(_) -> undefined. - -%% @doc Set the Maximum QoS allowed. -mqtt__max_qos_allowed(mapping) -> "emqx.max_qos_allowed"; -mqtt__max_qos_allowed(type) -> range(0, 2); -mqtt__max_qos_allowed(default) -> 2; -mqtt__max_qos_allowed(_) -> undefined. - -zones_acl_nomatch(type) -> acl_nomatch(); -zones_acl_nomatch(_) -> undefined. - -%% @doc Action when acl check reject current operation -zones_acl_deny_action(type) -> acl_deny_action(); -zones_acl_deny_action(default) -> ignore; -zones_acl_deny_action(_) -> undefined. - -%% @doc Set the Maximum QoS allowed. -zones_max_qos_allowed(type) -> range(0, 2); -zones_max_qos_allowed(_) -> undefined. - -%% @doc Keepalive backoff -zones_keepalive_backoff(type) -> float(); -zones_keepalive_backoff(default) -> 0.75; -zones_keepalive_backoff(_) -> undefined. - -%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.0 is equivalent to maximum allowed -zones_max_inflight(type) -> range(0, 65535); -zones_max_inflight(_) -> undefined. - -%% @doc Default priority for topics not in priority table. -zones_mqueue_default_priority(type) -> mqueue_default_priority(); -zones_mqueue_default_priority(default) -> lowest; -zones_mqueue_default_priority(_) -> undefined. - tr_zones(Conf) -> Names = lists:usort(keys("zone", Conf)), - lists:foldl( - fun(Name, Zones) -> - Zone = keys("zone." ++ Name, Conf), - Mapped = lists:flatten([map_zones(K, conf_get(["zone", Name, K], Conf)) || K <- Zone]), - [{list_to_atom(Name), lists:filter(fun ({K, []}) when K =:= ratelimit; K =:= quota -> false; - ({_, undefined}) -> false; - (_) -> true end, Mapped)} | Zones] - end, [], Names). - -listener_endpoint(type) -> endpoint(); -listener_endpoint(_) -> undefined. - -listener_peer_cert_as_username_tcp(type) -> listener_peer_cert_tcp(); -listener_peer_cert_as_username_tcp(_) -> undefined. - -listener_peer_cert_as_clientid_tcp(type) -> listener_peer_cert_tcp(); -listener_peer_cert_as_clientid_tcp(_) -> undefined. - -listener_peer_cert_as_username_ssl(type) -> listener_peer_cert_ssl(); -listener_peer_cert_as_username_ssl(_) -> undefined. - -listener_peer_cert_as_clientid_ssl(type) -> listener_peer_cert_ssl(); -listener_peer_cert_as_clientid_ssl(_) -> undefined. - -listener_verify(type) -> verify(); -listener_verify(_) -> undefined. - -listener_mqtt_piggyback(type) -> mqtt_piggyback(); -listener_mqtt_piggyback(default) -> multiple; -listener_mqtt_piggyback(_) -> undefined. - -deflate_opts_level(type) -> deflate_opts_level(); -deflate_opts_level(_) -> undefined. - -deflate_opts_mem_level(type) -> range(1, 9); -deflate_opts_mem_level(_) -> undefined. - -deflate_opts_strategy(type) -> deflate_opts_strategy(); -deflate_opts_strategy(_) -> undefined. - -deflate_opts_server_context_takeover(type) -> context_takeover(); -deflate_opts_server_context_takeover(_) -> undefined. - -deflate_opts_client_context_takeover(type) -> context_takeover(); -deflate_opts_client_context_takeover(_) -> undefined. + lists:foldl( + fun(Name, Zones) -> + Zone = keys("zone." ++ Name, Conf), + Mapped = lists:flatten([map_zones(K, conf_get(["zone", Name, K], Conf)) || K <- Zone]), + [{list_to_atom(Name), lists:filter(fun ({K, []}) when K =:= ratelimit; K =:= quota -> false; + ({_, undefined}) -> false; + (_) -> true end, Mapped)} | Zones] + end, [], Names). tr_listeners(Conf) -> - Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, - Atom = fun(undefined) -> undefined; - (B) when is_binary(B)-> binary_to_atom(B); - (S) when is_list(S) -> list_to_atom(S) end, + (B) when is_binary(B)-> binary_to_atom(B); + (S) when is_list(S) -> list_to_atom(S) end, Access = fun(S) -> [A, CIDR] = string:tokens(S, " "), @@ -888,23 +645,19 @@ tr_listeners(Conf) -> RateLimit = fun(undefined) -> undefined; - (Val) -> - [L, D] = string:tokens(Val, ", "), - Limit = case cuttlefish_bytesize:parse(L) of - Sz when is_integer(Sz) -> Sz; - {error, Reason} -> error(Reason) + ([L, D]) -> + Limit = case to_bytesize(L) of + {ok, I0} -> I0; + {error, R0} -> error({bytesize, R0}) end, - Duration = case cuttlefish_duration:parse(D, s) of - Secs when is_integer(Secs) -> Secs; - {error, Reason1} -> error(Reason1) + Duration = case to_duration_s(D) of + {ok, I1} -> I1; + {error, R1} -> error({duration, R1}) end, {Limit, Duration} end, - CheckOrigin = fun(S) -> - Origins = string:tokens(S, ","), - [ list_to_binary(string:trim(O)) || O <- Origins] - end, + CheckOrigin = fun(S) -> [ list_to_binary(string:trim(O)) || O <- S] end, WsOpts = fun(Prefix) -> case conf_get(Prefix ++ ".check_origins", Conf) of @@ -914,7 +667,7 @@ tr_listeners(Conf) -> end, LisOpts = fun(Prefix) -> - Filter([{acceptors, conf_get(Prefix ++ ".acceptors", Conf)}, + filter([{acceptors, conf_get(Prefix ++ ".acceptors", Conf)}, {mqtt_path, conf_get(Prefix ++ ".mqtt_path", Conf)}, {max_connections, conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, conf_get(Prefix ++ ".max_conn_rate", Conf)}, @@ -939,87 +692,40 @@ tr_listeners(Conf) -> {check_origins, WsOpts(Prefix)} | AccOpts(Prefix)]) end, DeflateOpts = fun(Prefix) -> - Filter([{level, conf_get(Prefix ++ ".deflate_opts.level", Conf)}, - {mem_level, conf_get(Prefix ++ ".deflate_opts.mem_level", Conf)}, - {strategy, conf_get(Prefix ++ ".deflate_opts.strategy", Conf)}, - {server_context_takeover, conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf)}, - {client_context_takeover, conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf)}, - {server_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf)}, - {client_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf)}]) + filter([{level, conf_get(Prefix ++ ".deflate_opts.level", Conf)}, + {mem_level, conf_get(Prefix ++ ".deflate_opts.mem_level", Conf)}, + {strategy, conf_get(Prefix ++ ".deflate_opts.strategy", Conf)}, + {server_context_takeover, conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf)}, + {client_context_takeover, conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf)}, + {server_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf)}, + {client_max_windows_bits, conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf)}]) end, TcpOpts = fun(Prefix) -> - Filter([{backlog, conf_get(Prefix ++ ".backlog", Conf)}, - {send_timeout, conf_get(Prefix ++ ".send_timeout", Conf)}, - {send_timeout_close, conf_get(Prefix ++ ".send_timeout_close", Conf)}, - {recbuf, conf_get(Prefix ++ ".recbuf", Conf)}, - {sndbuf, conf_get(Prefix ++ ".sndbuf", Conf)}, - {buffer, conf_get(Prefix ++ ".buffer", Conf)}, - {high_watermark, conf_get(Prefix ++ ".high_watermark", Conf)}, - {nodelay, conf_get(Prefix ++ ".nodelay", Conf, true)}, - {reuseaddr, conf_get(Prefix ++ ".reuseaddr", Conf)}]) - end, - SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, - MapPSKCiphers = fun(PSKCiphers) -> - lists:map( - fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; - ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; - ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; - ("PSK-RC4-SHA") -> {psk, rc4_128, sha} - end, PSKCiphers) - end, - SslOpts = fun(Prefix) -> - Versions = case SplitFun(conf_get(Prefix ++ ".tls_versions", Conf)) of - undefined -> undefined; - L -> [list_to_atom(V) || V <- L] - end, - TLSCiphers = conf_get(Prefix++".ciphers", Conf), - PSKCiphers = conf_get(Prefix++".psk_ciphers", Conf), - Ciphers = - case {TLSCiphers, PSKCiphers} of - {undefined, undefined} -> - cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent"); - {TLSCiphers, undefined} -> - SplitFun(TLSCiphers); - {undefined, PSKCiphers} -> - MapPSKCiphers(SplitFun(PSKCiphers)); - {_TLSCiphers, _PSKCiphers} -> - cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time") - end, - UserLookupFun = - case PSKCiphers of - undefined -> undefined; - _ -> {fun emqx_psk:lookup/3, <<>>} - end, - Filter([{versions, Versions}, - {ciphers, Ciphers}, - {user_lookup_fun, UserLookupFun}, - {handshake_timeout, conf_get(Prefix ++ ".handshake_timeout", Conf)}, - {depth, conf_get(Prefix ++ ".depth", Conf)}, - {password, conf_get(Prefix ++ ".key_password", Conf)}, - {dhfile, conf_get(Prefix ++ ".dhfile", Conf)}, - {keyfile, conf_get(Prefix ++ ".keyfile", Conf)}, - {certfile, conf_get(Prefix ++ ".certfile", Conf)}, - {cacertfile, conf_get(Prefix ++ ".cacertfile", Conf)}, - {verify, conf_get(Prefix ++ ".verify", Conf)}, - {fail_if_no_peer_cert, conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf)}, - {secure_renegotiate, conf_get(Prefix ++ ".secure_renegotiate", Conf)}, - {reuse_sessions, conf_get(Prefix ++ ".reuse_sessions", Conf)}, - {honor_cipher_order, conf_get(Prefix ++ ".honor_cipher_order", Conf)}]) + filter([{backlog, conf_get(Prefix ++ ".backlog", Conf)}, + {send_timeout, conf_get(Prefix ++ ".send_timeout", Conf)}, + {send_timeout_close, conf_get(Prefix ++ ".send_timeout_close", Conf)}, + {recbuf, conf_get(Prefix ++ ".recbuf", Conf)}, + {sndbuf, conf_get(Prefix ++ ".sndbuf", Conf)}, + {buffer, conf_get(Prefix ++ ".buffer", Conf)}, + {high_watermark, conf_get(Prefix ++ ".high_watermark", Conf)}, + {nodelay, conf_get(Prefix ++ ".nodelay", Conf, true)}, + {reuseaddr, conf_get(Prefix ++ ".reuseaddr", Conf)}]) end, - Listen_fix = fun(IpPort) when is_list(IpPort) -> - [Ip, Port] = string:tokens(IpPort, ":"), - case inet:parse_address(Ip) of - {ok, R} -> {R, list_to_integer(Port)}; - _ -> error("failed to parse ip address") - end; - (Other) -> Other end, + SslOpts = fun(Prefix) -> + Opts = tr_ssl(Prefix, Conf), + case lists:keyfind(ciphers, 1, Opts) of + false -> + error(Prefix ++ ".ciphers or " ++ Prefix ++ ".psk_ciphers is absent"); + _ -> + Opts + end end, TcpListeners = fun(Type, Name) -> Prefix = string:join(["listener", Type, Name], "."), ListenOnN = case conf_get(Prefix ++ ".endpoint", Conf) of undefined -> []; - ListenOn -> Listen_fix(ListenOn) + ListenOn -> ListenOn end, [#{ proto => Atom(Type) , name => Name @@ -1039,7 +745,7 @@ tr_listeners(Conf) -> ListenOn -> [#{ proto => Atom(Type) , name => Name - , listen_on => Listen_fix(ListenOn) + , listen_on => ListenOn , opts => [ {deflate_options, DeflateOpts(Prefix)} , {tcp_options, TcpOpts(Prefix)} , {ssl_options, SslOpts(Prefix)} @@ -1047,43 +753,22 @@ tr_listeners(Conf) -> ] } ] - end - end, + end end, lists:flatten([TcpListeners("tcp", Name) || Name <- keys("listener.tcp", Conf)] - ++ [TcpListeners("ws", Name) || Name <- keys("listener.ws", Conf)] - ++ [SslListeners("ssl", Name) || Name <- keys("listener.ssl", Conf)] - ++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]). - -module_presence__qos(type) -> range(0, 2); -module_presence__qos(default) -> 1; -module_presence__qos(_) -> undefined. - -module_subscription_qos(type) -> range(0, 2); -module_subscription_qos(default) -> 1; -module_subscription_qos(_) -> undefined. - -module_subscription_nl(type) -> range(0, 1); -module_subscription_nl(default) -> 0; -module_subscription_nl(_) -> undefined. - -module_subscription_rap(type) -> range(0, 1); -module_subscription_rap(default) -> 0; -module_subscription_rap(_) -> undefined. - -module_subscription_rh(type) -> range(0, 2); -module_subscription_rh(default) -> 0; -module_subscription_rh(_) -> undefined. + ++ [TcpListeners("ws", Name) || Name <- keys("listener.ws", Conf)] + ++ [SslListeners("ssl", Name) || Name <- keys("listener.ssl", Conf)] + ++ [SslListeners("wss", Name) || Name <- keys("listener.wss", Conf)]). tr_modules(Conf) -> Subscriptions = fun() -> List = keys("module.subscription", Conf), TopicList = [{N, conf_get(["module", "subscription", N, "topic"], Conf)}|| N <- List], [{list_to_binary(T), #{ qos => conf_get("module.subscription." ++ N ++ ".qos", Conf, 0), - nl => conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), - rap => conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), - rh => conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) + nl => conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), + rap => conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), + rh => conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) }} || {N, T} <- TopicList] end, Rewrites = fun() -> @@ -1109,92 +794,67 @@ tr_modules(Conf) -> [{emqx_mod_acl_internal, [{acl_file, conf_get("acl.acl_file", Conf)}]}] ]). -broker__session_locking_strategy(mapping) -> "emqx.session_locking_strategy"; -broker__session_locking_strategy(type) -> session_locking_strategy(); -broker__session_locking_strategy(default) -> quorum; -broker__session_locking_strategy(_) -> undefined. - -%% @doc Shared Subscription Dispatch Strategy.randomly pick a subscriber -%% round robin alive subscribers one message after another -%% pick a random subscriber and stick to it -%% hash client ID to a group member -broker__shared_subscription_strategy(mapping) -> "emqx.shared_subscription_strategy"; -broker__shared_subscription_strategy(type) -> shared_subscription_strategy(); -broker__shared_subscription_strategy(default) -> round_robin; -broker__shared_subscription_strategy(_) -> undefined. - -%% @doc Performance toggle for subscribe/unsubscribe wildcard topic. -%% Change this toggle only when there are many wildcard topics. -%% key: mnesia translational updates with per-key locks. recommended for single node setup. -%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup. -%% global: global lock protected updates. recommended for larger cluster. -%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster -broker__perf__route_lock_type(mapping) -> "emqx.route_lock_type"; -broker__perf__route_lock_type(type) -> route_lock_type(); -broker__perf__route_lock_type(default) -> key; -broker__perf__route_lock_type(_) -> undefined. - tr_sysmon(Conf) -> Keys = maps:to_list(conf_get("sysmon", Conf, #{})), [{binary_to_atom(K), maps:get(value, V)} || {K, V} <- Keys]. tr_os_mon(Conf) -> [{cpu_check_interval, conf_get("os_mon.cpu_check_interval", Conf)} - , {cpu_high_watermark, conf_get("os_mon.cpu_high_watermark", Conf) * 100} - , {cpu_low_watermark, conf_get("os_mon.cpu_low_watermark", Conf) * 100} - , {mem_check_interval, conf_get("os_mon.mem_check_interval", Conf)} - , {sysmem_high_watermark, conf_get("os_mon.sysmem_high_watermark", Conf) * 100} - , {procmem_high_watermark, conf_get("os_mon.procmem_high_watermark", Conf) * 100} + , {cpu_high_watermark, conf_get("os_mon.cpu_high_watermark", Conf) * 100} + , {cpu_low_watermark, conf_get("os_mon.cpu_low_watermark", Conf) * 100} + , {mem_check_interval, conf_get("os_mon.mem_check_interval", Conf)} + , {sysmem_high_watermark, conf_get("os_mon.sysmem_high_watermark", Conf) * 100} + , {procmem_high_watermark, conf_get("os_mon.procmem_high_watermark", Conf) * 100} ]. tr_vm_mon(Conf) -> [ {check_interval, conf_get("vm_mon.check_interval", Conf)} - , {process_high_watermark, conf_get("vm_mon.process_high_watermark", Conf) * 100} - , {process_low_watermark, conf_get("vm_mon.process_low_watermark", Conf) * 100} + , {process_high_watermark, conf_get("vm_mon.process_high_watermark", Conf) * 100} + , {process_low_watermark, conf_get("vm_mon.process_low_watermark", Conf) * 100} ]. tr_alarm(Conf) -> - [ {actions, [list_to_atom(Action) || Action <- string:tokens(conf_get("alarm.actions", Conf), ",")]} - , {size_limit, conf_get("alarm.size_limit", Conf)} - , {validity_period, conf_get("alarm.validity_period", Conf)} + [ {actions, [list_to_atom(Action) || Action <- conf_get("alarm.actions", Conf)]} + , {size_limit, conf_get("alarm.size_limit", Conf)} + , {validity_period, conf_get("alarm.validity_period", Conf)} ]. tr_telemetry(Conf) -> [ {enabled, conf_get("telemetry.enabled", Conf)} - , {url, conf_get("telemetry.url", Conf)} - , {report_interval, conf_get("telemetry.report_interval", Conf)} + , {url, conf_get("telemetry.url", Conf)} + , {report_interval, conf_get("telemetry.report_interval", Conf)} ]. %% helpers options(static, Conf) -> - [{seeds, [list_to_atom(S) || S <- string:tokens(conf_get("cluster.static.seeds", Conf, ""), ",")]}]; + [{seeds, [list_to_atom(S) || S <- conf_get("cluster.static.seeds", Conf, "")]}]; options(mcast, Conf) -> {ok, Addr} = inet:parse_address(conf_get("cluster.mcast.addr", Conf)), {ok, Iface} = inet:parse_address(conf_get("cluster.mcast.iface", Conf)), - Ports = [list_to_integer(S) || S <- string:tokens(conf_get("cluster.mcast.ports", Conf), ",")], + Ports = [list_to_integer(S) || S <- conf_get("cluster.mcast.ports", Conf)], [{addr, Addr}, {ports, Ports}, {iface, Iface}, - {ttl, conf_get("cluster.mcast.ttl", Conf, 1)}, - {loop, conf_get("cluster.mcast.loop", Conf, true)}]; + {ttl, conf_get("cluster.mcast.ttl", Conf, 1)}, + {loop, conf_get("cluster.mcast.loop", Conf, true)}]; options(dns, Conf) -> [{name, conf_get("cluster.dns.name", Conf)}, - {app, conf_get("cluster.dns.app", Conf)}]; + {app, conf_get("cluster.dns.app", Conf)}]; options(etcd, Conf) -> Namespace = "cluster.etcd.ssl", SslOpts = fun(C) -> Options = keys(Namespace, C), lists:map(fun(Key) -> {list_to_atom(Key), conf_get([Namespace, Key], Conf)} end, Options) end, - [{server, string:tokens(conf_get("cluster.etcd.server", Conf), ",")}, - {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")}, - {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)}, - {ssl_options, SslOpts(Conf)}]; + [{server, conf_get("cluster.etcd.server", Conf)}, + {prefix, conf_get("cluster.etcd.prefix", Conf, "emqxcl")}, + {node_ttl, conf_get("cluster.etcd.node_ttl", Conf, 60)}, + {ssl_options, filter(SslOpts(Conf))}]; options(k8s, Conf) -> [{apiserver, conf_get("cluster.k8s.apiserver", Conf)}, - {service_name, conf_get("cluster.k8s.service_name", Conf)}, - {address_type, conf_get("cluster.k8s.address_type", Conf, ip)}, - {app_name, conf_get("cluster.k8s.app_name", Conf)}, - {namespace, conf_get("cluster.k8s.namespace", Conf)}, - {suffix, conf_get("cluster.k8s.suffix", Conf, "")}]; + {service_name, conf_get("cluster.k8s.service_name", Conf)}, + {address_type, conf_get("cluster.k8s.address_type", Conf, ip)}, + {app_name, conf_get("cluster.k8s.app_name", Conf)}, + {namespace, conf_get("cluster.k8s.namespace", Conf)}, + {suffix, conf_get("cluster.k8s.suffix", Conf, "")}]; options(manual, _Conf) -> []. @@ -1223,15 +883,15 @@ burst_limit(["disabled"]) -> {false, {20000, 1000}}; burst_limit([Count, Window]) -> {true, {list_to_integer(Count), - case hocon_postprocess:duration(Window) of - Secs when is_integer(Secs) -> Secs; - _ -> error({duration, Window}) + case to_duration(Window) of + {ok, I} -> I; + {error, R} -> error({duration, R}) end}}. %% For creating additional log files for specific log levels. additional_log_files(Conf) -> LogLevel = ["debug", "info", "notice", "warning", - "error", "critical", "alert", "emergency"], + "error", "critical", "alert", "emergency"], additional_log_files(Conf, LogLevel, []). additional_log_files(_Conf, [], Acc) -> @@ -1242,44 +902,52 @@ additional_log_files(Conf, [L | More], Acc) -> F -> additional_log_files(Conf, More, [{L, F} | Acc]) end. -rate_limit(Val) -> - [L, D] = string:tokens(Val, ", "), - Limit = case cuttlefish_bytesize:parse(L) of - Sz when is_integer(Sz) -> Sz; - {error, Reason1} -> error(Reason1) +rate_limit_byte_dur([L, D]) -> + Limit = case to_bytesize(L) of + {ok, I0} -> I0; + {error, R0} -> error({bytesize, R0}) end, - Duration = case cuttlefish_duration:parse(D, s) of - Secs when is_integer(Secs) -> Secs; + Duration = case to_duration_s(D) of + {ok, I1} -> I1; + {error, R1} -> error({duration, R1}) + end, + {Limit, Duration}. + +rate_limit_num_dur([L, D]) -> + Limit = case string:to_integer(L) of + {Int, []} when is_integer(Int) -> Int; + _ -> error("failed to parse bytesize string") + end, + Duration = case to_duration_s(D) of + {ok, I} -> I; {error, Reason} -> error(Reason) end, {Limit, Duration}. map_zones(_, undefined) -> {undefined, undefined}; -map_zones("force_gc_policy", Val) -> - [Count, Bytes] = string:tokens(Val, "| "), - GcPolicy = case cuttlefish_bytesize:parse(Bytes) of +map_zones("force_gc_policy", [Count, Bytes]) -> + GcPolicy = case to_bytesize(Bytes) of {error, Reason} -> - error(Reason); - Bytes1 -> + error({bytesize, Reason}); + {ok, Bytes1} -> #{bytes => Bytes1, count => list_to_integer(Count)} end, {force_gc_policy, GcPolicy}; -map_zones("force_shutdown_policy", "default") -> +map_zones("force_shutdown_policy", ["default"]) -> WordSize = erlang:system_info(wordsize), {DefaultLen, DefaultSize} = case WordSize of 8 -> % arch_64 - {10000, cuttlefish_bytesize:parse("64MB")}; + {10000, hocon_postprocess:bytesize("64MB")}; 4 -> % arch_32 - {1000, cuttlefish_bytesize:parse("32MB")} + {1000, hocon_postprocess:bytesize("32MB")} end, {force_shutdown_policy, #{message_queue_len => DefaultLen, max_heap_size => DefaultSize div WordSize }}; -map_zones("force_shutdown_policy", Val) -> - [Len, Siz] = string:tokens(Val, "| "), +map_zones("force_shutdown_policy", [Len, Siz]) -> WordSize = erlang:system_info(wordsize), MaxSiz = case WordSize of 8 -> % arch_64 @@ -1288,19 +956,19 @@ map_zones("force_shutdown_policy", Val) -> (1 bsl 27) - 1 end, ShutdownPolicy = - case cuttlefish_bytesize:parse(Siz) of + case to_bytesize(Siz) of {error, Reason} -> error(Reason); - Siz1 when Siz1 > MaxSiz -> - cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); - Siz1 -> + {ok, Siz1} when Siz1 > MaxSiz -> + error(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); + {ok, Siz1} -> #{message_queue_len => list_to_integer(Len), max_heap_size => Siz1 div WordSize} end, {force_shutdown_policy, ShutdownPolicy}; map_zones("mqueue_priorities", Val) -> case Val of - "none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE + ["none"] -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE _ -> MqueuePriorities = lists:foldl(fun(T, Acc) -> %% NOTE: space in "= " is intended @@ -1308,7 +976,7 @@ map_zones("mqueue_priorities", Val) -> P = list_to_integer(Prio), (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}), maps:put(iolist_to_binary(Topic), P, Acc) - end, #{}, string:tokens(Val, ",")), + end, #{}, Val), {mqueue_priorities, MqueuePriorities} end; map_zones("mountpoint", Val) -> @@ -1317,45 +985,45 @@ map_zones("response_information", Val) -> {response_information, iolist_to_binary(Val)}; map_zones("rate_limit", Conf) -> Messages = case conf_get("conn_messages_in", #{value => Conf}) of - undefined -> - []; - M -> - [{conn_messages_in, rate_limit(M)}] - end, + undefined -> + []; + M -> + [{conn_messages_in, rate_limit_num_dur(M)}] + end, Bytes = case conf_get("conn_bytes_in", #{value => Conf}) of - undefined -> - []; - B -> - [{conn_bytes_in, rate_limit(B)}] + undefined -> + []; + B -> + [{conn_bytes_in, rate_limit_byte_dur(B)}] end, {ratelimit, Messages ++ Bytes}; map_zones("conn_congestion", Conf) -> Alarm = case conf_get("alarm", #{value => Conf}) of + undefined -> + []; + A -> + [{conn_congestion_alarm_enabled, A}] + end, + MinAlarm = case conf_get("min_alarm_sustain_duration", #{value => Conf}) of undefined -> []; - A -> - [{conn_congestion_alarm_enabled, A}] + M -> + [{conn_congestion_min_alarm_sustain_duration, M}] end, - MinAlarm = case conf_get("min_alarm_sustain_duration", #{value => Conf}) of - undefined -> - []; - M -> - [{conn_congestion_min_alarm_sustain_duration, M}] - end, Alarm ++ MinAlarm; map_zones("quota", Conf) -> Conn = case conf_get("conn_messages_routing", #{value => Conf}) of - undefined -> - []; - C -> - [{conn_messages_routing, rate_limit(C)}] - end, + undefined -> + []; + C -> + [{conn_messages_routing, rate_limit_num_dur(C)}] + end, Overall = case conf_get("overall_messages_routing", #{value => Conf}) of - undefined -> - []; - O -> - [{overall_messages_routing, rate_limit(O)}] - end, + undefined -> + []; + O -> + [{overall_messages_routing, rate_limit_num_dur(O)}] + end, {quota, Conn ++ Overall}; map_zones(Opt, Val) -> {list_to_atom(Opt), Val}. @@ -1382,47 +1050,134 @@ conf_get(Key, Conf, Default) -> V end. +filter(Opts) -> + [{K, V} || {K, V} <- Opts, V =/= undefined]. + +%% generate a ssl field. +%% ssl("emqx", #{"verify" => verify_peer}) will return +%% [ {"cacertfile", t(string(), "emqx.cacertfile", undefined)} +%% , {"certfile", t(string(), "emqx.certfile", undefined)} +%% , {"keyfile", t(string(), "emqx.keyfile", undefined)} +%% , {"verify", t(union(verify_peer, verify_none), "emqx.verify", verify_peer)} +%% , {"server_name_indication", "emqx.server_name_indication", undefined)} +%% ... +ssl(Mapping, Defaults) -> + M = fun (Field) -> + case (Mapping) of + undefined -> undefined; + _ -> Mapping ++ "." ++ Field + end end, + D = fun (Field) -> maps:get(list_to_atom(Field), Defaults, undefined) end, + [ {"enable", t(flag(), M("enable"), D("enable"))} + , {"cacertfile", t(string(), M("cacertfile"), D("cacertfile"))} + , {"certfile", t(string(), M("certfile"), D("certfile"))} + , {"keyfile", t(string(), M("keyfile"), D("keyfile"))} + , {"verify", t(union(verify_peer, verify_none), M("verify"), D("verify"))} + , {"fail_if_no_peer_cert", t(boolean(), M("fail_if_no_peer_cert"), D("fail_if_no_peer_cert"))} + , {"secure_renegotiate", t(flag(), M("secure_renegotiate"), D("secure_renegotiate"))} + , {"reuse_sessions", t(flag(), M("reuse_sessions"), D("reuse_sessions"))} + , {"honor_cipher_order", t(flag(), M("honor_cipher_order"), D("honor_cipher_order"))} + , {"handshake_timeout", t(duration(), M("handshake_timeout"), D("handshake_timeout"))} + , {"depth", t(integer(), M("depth"), D("depth"))} + , {"password", t(string(), M("key_password"), D("key_password"))} + , {"dhfile", t(string(), M("dhfile"), D("dhfile"))} + , {"server_name_indication", t(union(disable, string()), M("server_name_indication"), + D("server_name_indication"))} + , {"tls_versions", t(comma_separated_list(), M("tls_versions"), D("tls_versions"))} + , {"ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))} + , {"psk_ciphers", t(comma_separated_list(), M("ciphers"), D("ciphers"))}]. + +tr_ssl(Field, Conf) -> + Versions = case conf_get([Field, "tls_versions"], Conf) of + undefined -> undefined; + Vs -> [list_to_existing_atom(V) || V <- Vs] + end, + TLSCiphers = conf_get([Field, "ciphers"], Conf), + PSKCiphers = conf_get([Field, "psk_ciphers"], Conf), + Ciphers = ciphers(TLSCiphers, PSKCiphers, Field), + case emqx_schema:conf_get([Field, "enable"], Conf) of + X when X =:= true orelse X =:= undefined -> + filter([{versions, Versions}, + {ciphers, Ciphers}, + {user_lookup_fun, user_lookup_fun(PSKCiphers)}, + {handshake_timeout, conf_get([Field, "handshake_timeout"], Conf)}, + {depth, conf_get([Field, "depth"], Conf)}, + {password, conf_get([Field, "key_password"], Conf)}, + {dhfile, conf_get([Field, "dhfile"], Conf)}, + {keyfile, emqx_schema:conf_get([Field, "keyfile"], Conf)}, + {certfile, emqx_schema:conf_get([Field, "certfile"], Conf)}, + {cacertfile, emqx_schema:conf_get([Field, "cacertfile"], Conf)}, + {verify, emqx_schema:conf_get([Field, "verify"], Conf)}, + {fail_if_no_peer_cert, conf_get([Field, "fail_if_no_peer_cert"], Conf)}, + {secure_renegotiate, conf_get([Field, "secure_renegotiate"], Conf)}, + {reuse_sessions, conf_get([Field, "reuse_sessions"], Conf)}, + {honor_cipher_order, conf_get([Field, "honor_cipher_order"], Conf)}, + {server_name_indication, emqx_schema:conf_get([Field, "server_name_indication"], Conf)} + ]); + _ -> + [] + end. + +map_psk_ciphers(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers). + +ciphers(undefined, undefined, _) -> + undefined; +ciphers(TLSCiphers, undefined, _) -> + TLSCiphers; +ciphers(undefined, PSKCiphers, _) -> + map_psk_ciphers(PSKCiphers); +ciphers(_, _, Field) -> + error(Field ++ ".ciphers and " ++ Field ++ ".psk_ciphers cannot be configured at the same time"). + +user_lookup_fun(undefined) -> + undefined; +user_lookup_fun(_PSKCiphers) -> + {fun emqx_psk:lookup/3, <<>>}. + +tr_password_hash(Field, Conf) -> + case emqx_schema:conf_get([Field, "password_hash"], Conf) of + [Hash] -> list_to_atom(Hash); + [Prefix, Suffix] -> {list_to_atom(Prefix), list_to_atom(Suffix)}; + [Hash, MacFun, Iterations, Dklen] -> {list_to_atom(Hash), list_to_atom(MacFun), + list_to_integer(Iterations), list_to_integer(Dklen)}; + _ -> plain + end. + + %% @private return a list of keys in a parent field -spec(keys(string(), hocon:config()) -> [string()]). keys(Parent, Conf) -> [binary_to_list(B) || B <- maps:keys(conf_get(Parent, Conf, #{}))]. +-spec ceiling(float()) -> integer(). +ceiling(X) -> + T = erlang:trunc(X), + case (X - T) of + Neg when Neg < 0 -> T; + Pos when Pos > 0 -> T + 1; + _ -> T + end. + %% types -duration(type) -> duration(); -duration(_) -> undefined. +t(T) -> + fun (type) -> T; (_) -> undefined end. -flag(type) -> flag(); -flag(_) -> undefined. +t(T, M, D) -> + fun (type) -> T; (mapping) -> M; (default) -> D; (_) -> undefined end. -string(type) -> string(); -string(_) -> undefined. - -integer(type) -> integer(); -integer(_) -> undefined. - -bytesize(type) -> bytesize(); -bytesize(_) -> undefined. - -boolean(type) -> boolean(); -boolean(_) -> undefined. - -duration(M, D) -> - fun (type) -> duration(); (mapping) -> M; (default) -> D; (_) -> undefined end. -duration_s(M, D) -> - fun (type) -> duration_s(); (mapping) -> M; (default) -> D; (_) -> undefined end. -flag(M, D) -> - fun (type) -> flag(); (mapping) -> M; (default) -> D; (_) -> undefined end. -string(M, D) -> - fun (type) -> string(); (mapping) -> M; (default) -> D; (_) -> undefined end. -integer(M, D) -> - fun (type) -> integer(); (mapping) -> M; (default) -> D; (_) -> undefined end. -bytesize(M, D) -> - fun (type) -> bytesize(); (mapping) -> M; (default) -> D; (_) -> undefined end. -boolean(M, D) -> - fun (type) -> boolean(); (mapping) -> M; (default) -> D; (_) -> undefined end. -percent(M, D) -> - fun (type) -> percent(); (mapping) -> M; (default) -> D; (_) -> undefined end. +t(T, M, D, O) -> + fun (type) -> T; + (mapping) -> M; + (default) -> D; + (override_env) -> O; + (_) -> undefined end. ref(Field) -> fun (type) -> Field; (_) -> undefined end. @@ -1431,13 +1186,38 @@ to_flag(Str) -> {ok, hocon_postprocess:onoff(Str)}. to_duration(Str) -> - {ok, hocon_postprocess:duration(Str)}. + case hocon_postprocess:duration(Str) of + I when is_integer(I) -> {ok, I}; + _ -> {error, Str} + end. to_duration_s(Str) -> - {ok, hocon_postprocess:duration(Str) div 1000}. + case hocon_postprocess:duration(Str) of + I when is_integer(I) -> {ok, ceiling(I / 1000)}; + _ -> {error, Str} + end. to_bytesize(Str) -> - {ok, hocon_postprocess:bytesize(Str)}. + case hocon_postprocess:bytesize(Str) of + I when is_integer(I) -> {ok, I}; + _ -> {error, Str} + end. to_percent(Str) -> {ok, hocon_postprocess:percent(Str)}. + +to_comma_separated_list(Str) -> + {ok, string:tokens(Str, ", ")}. + +to_bar_separated_list(Str) -> + {ok, string:tokens(Str, "| ")}. + +to_ip_port(Str) -> + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> {error, Str} + end.