%%-*- mode: erlang -*-
%% EMQ X R3.0 config mapping

%%--------------------------------------------------------------------
%% Cluster
%%--------------------------------------------------------------------

%% @doc Cluster name, stored as an atom.
{mapping, "cluster.name", "ekka.cluster_name", [{datatype, atom}, {default, emqxcl}]}.

%% @doc Cluster discovery strategy. The strategy-specific cluster.* settings
%% below are all mapped to the same "ekka.cluster_discovery" value.
{mapping, "cluster.discovery", "ekka.cluster_discovery", [{datatype, atom}, {default, manual}]}.

%% @doc Autoclean of down nodes from the cluster, given as a duration (ms).
%% No default is declared.
{mapping, "cluster.autoclean", "ekka.cluster_autoclean", [{datatype, {duration, ms}}]}.

%% @doc Cluster autoheal switch (on/off), off by default.
{mapping, "cluster.autoheal", "ekka.cluster_autoheal", [{datatype, flag}, {default, off}]}.

%%--------------------------------------------------------------------
%% Cluster by static node list

%% @doc Seed nodes as one string; the discovery translation splits it on ",".
{mapping, "cluster.static.seeds", "ekka.cluster_discovery", [{datatype, string}]}.

%%--------------------------------------------------------------------
%% Cluster by UDP Multicast

{mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [{datatype, string}, {default, "239.192.0.1"}]}.

%% @doc Port list as one string; the discovery translation splits it on ",".
{mapping, "cluster.mcast.ports", "ekka.cluster_discovery", [{datatype, string}, {default, "4369"}]}.

{mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [{datatype, string}, {default, "0.0.0.0"}]}.

{mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [{datatype, integer}, {default, 255}]}.

{mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [{datatype, flag}, {default, on}]}.

%% NOTE(review): the "ekka.cluster_discovery" translation in this file does
%% not read the three buffer settings below, so their values are dropped.
{mapping, "cluster.mcast.sndbuf", "ekka.cluster_discovery", [{datatype, bytesize}, {default, "16KB"}]}.

{mapping, "cluster.mcast.recbuf", "ekka.cluster_discovery", [{datatype, bytesize}, {default, "16KB"}]}.

{mapping, "cluster.mcast.buffer", "ekka.cluster_discovery", [{datatype, bytesize}, {default, "32KB"}]}.

%%--------------------------------------------------------------------
%% Cluster by DNS A Record

{mapping, "cluster.dns.name", "ekka.cluster_discovery", [{datatype, string}]}.
{mapping, "cluster.dns.app", "ekka.cluster_discovery", [{datatype, string}]}.

%%--------------------------------------------------------------------
%% Cluster using etcd

%% @doc etcd server list as one string; the discovery translation splits it on ",".
{mapping, "cluster.etcd.server", "ekka.cluster_discovery", [{datatype, string}]}.

{mapping, "cluster.etcd.prefix", "ekka.cluster_discovery", [{datatype, string}]}.

%% @doc Node TTL as a duration, converted to milliseconds; one minute by default.
{mapping, "cluster.etcd.node_ttl", "ekka.cluster_discovery",
 [{datatype, {duration, ms}}, {default, "1m"}]}.

%% The cluster.etcd.ssl.* settings are collected by prefix into the
%% ssl_options entry of the etcd discovery options.
{mapping, "cluster.etcd.ssl.keyfile", "ekka.cluster_discovery", [{datatype, string}]}.

{mapping, "cluster.etcd.ssl.certfile", "ekka.cluster_discovery", [{datatype, string}]}.

{mapping, "cluster.etcd.ssl.cacertfile", "ekka.cluster_discovery", [{datatype, string}]}.

%%--------------------------------------------------------------------
%% Cluster on K8s

{mapping, "cluster.k8s.apiserver", "ekka.cluster_discovery", [{datatype, string}]}.

{mapping, "cluster.k8s.service_name", "ekka.cluster_discovery", [{datatype, string}]}.

%% @doc One of ip, dns or hostname.
{mapping, "cluster.k8s.address_type", "ekka.cluster_discovery",
 [{datatype, {enum, [ip, dns, hostname]}}]}.

{mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [{datatype, string}]}.

{mapping, "cluster.k8s.namespace", "ekka.cluster_discovery", [{datatype, string}]}.

{mapping, "cluster.k8s.suffix", "ekka.cluster_discovery", [{datatype, string}, {default, ""}]}.
%% @doc Assembles the "ekka.cluster_discovery" value as {Strategy, Options}.
%% Strategy is read from "cluster.discovery"; Options is the proplist built
%% for that strategy from the matching cluster.<strategy>.* settings, with
%% every entry whose value is `undefined' removed. A strategy that has no
%% clause below fails the translation with a function_clause error.
%% NOTE(review): cluster.mcast.sndbuf/recbuf/buffer are mapped to this
%% setting but are not read here.
{translation, "ekka.cluster_discovery", fun(Conf) ->
    Strategy = cuttlefish:conf_get("cluster.discovery", Conf),
    %% Drops options that carry no value.
    Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
    %% Turns every cluster.etcd.ssl.<name> setting into {name, Value}.
    %% Only evaluated for the etcd strategy.
    EtcdSslOpts = fun() ->
        SslConf = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf),
        lists:map(fun({["cluster", "etcd", "ssl", Name], Value}) ->
                      {list_to_atom(Name), Value}
                  end, SslConf)
    end,
    Options = fun
        (static) ->
            Seeds = string:tokens(cuttlefish:conf_get("cluster.static.seeds", Conf, ""), ","),
            [{seeds, [list_to_atom(S) || S <- Seeds]}];
        (mcast) ->
            {ok, Addr} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.addr", Conf)),
            {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)),
            Ports = [list_to_integer(S)
                     || S <- string:tokens(cuttlefish:conf_get("cluster.mcast.ports", Conf), ",")],
            [{addr, Addr}, {ports, Ports}, {iface, Iface},
             {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)},
             {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}];
        (dns) ->
            [{name, cuttlefish:conf_get("cluster.dns.name", Conf)},
             {app, cuttlefish:conf_get("cluster.dns.app", Conf)}];
        (etcd) ->
            [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")},
             {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")},
             %% The setting is a {duration, ms}; the fallback is one minute in
             %% milliseconds, matching the mapping's "1m" default.
             {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60000)},
             {ssl_options, EtcdSslOpts()}];
        (k8s) ->
            [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)},
             {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)},
             {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)},
             {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)},
             {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)},
             {suffix, cuttlefish:conf_get("cluster.k8s.suffix", Conf, "")}];
        (manual) ->
            []
    end,
    {Strategy, Filter(Options(Strategy))}
end}.
%%--------------------------------------------------------------------
%% Node
%%--------------------------------------------------------------------

%% @doc Node name, passed to the VM as -name.
{mapping, "node.name", "vm_args.-name", [{default, "emqx@127.0.0.1"}]}.

%% @doc Erlang distribution protocol: inet_tcp, inet6_tcp or inet_tls.
%% No default is declared (a former {default, "inet_tcp"} entry is disabled).
{mapping, "node.proto_dist", "vm_args.-proto_dist",
 [{datatype, {enum, [inet_tcp, inet6_tcp, inet_tls]}}, hidden]}.

%% @doc File holding the SSL options, for use when the Erlang distribution runs over SSL.
{mapping, "node.ssl_dist_optfile", "vm_args.-ssl_dist_optfile", [{datatype, string}, hidden]}.

%% @doc Secret cookie shared by distributed Erlang nodes.
{mapping, "node.cookie", "vm_args.-setcookie", [{default, "emqxsecretcookie"}]}.

{mapping, "node.data_dir", "emqx.data_dir", [{datatype, string}]}.

%% @doc Heartbeat, see http://erlang.org/doc/man/heart.html
{mapping, "node.heartbeat", "vm_args.-heart", [{datatype, flag}, hidden]}.

%% @doc Emits the bare -heart flag (empty value) when node.heartbeat is on.
%% An explicit 'off' is rejected; the line has to be commented out instead.
{translation, "vm_args.-heart", fun(Conf) ->
    Heartbeat = cuttlefish:conf_get("node.heartbeat", Conf),
    case Heartbeat of
        false -> cuttlefish:invalid("should be 'on' or comment the line!");
        true -> ""
    end
end}.

%% @doc Async thread pool size (+A), see http://erlang.org/doc/man/erl.html
{mapping, "node.async_threads", "vm_args.+A",
 [{datatype, integer}, {default, 64}, {validators, ["range:0-1024"]}]}.

%% @doc Erlang process limit (+P).
{mapping, "node.process_limit", "vm_args.+P", [{datatype, integer}, {default, 256000}, hidden]}.

%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
%% @doc The number of concurrent ports/sockets.
%% Valid range is 1024-134217727
{mapping, "node.max_ports",
 cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"),
 [{datatype, integer}, {default, 262144}, {validators, ["range4ports"]}]}.

%% @doc Accepts port limits from 1024 to 134217727, both inclusive.
{validator, "range4ports", "must be 1024 to 134217727",
 fun(Ports) -> Ports >= 1024 andalso Ports =< 134217727 end}.
%% @doc Distribution buffer busy limit, see
%% http://www.erlang.org/doc/man/erl.html#%2bzdbbl
{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
  {datatype, bytesize},
  {commented, "32MB"},
  hidden,
  {validators, ["zdbbl_range"]}
]}.

%% @doc Converts node.dist_buffer_size from bytes to kilobytes, rounding up.
%% Yields `undefined' when the setting is absent or is not an integer.
{translation, "vm_args.+zdbbl", fun(Conf) ->
    case cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined) of
        Bytes when is_integer(Bytes) ->
            %% Bytes to kilobytes
            cuttlefish_util:ceiling(Bytes / 1024);
        _ ->
            undefined
    end
end}.

%% @doc Accepts 1024 to 2147482624 bytes, i.e. 1KB to 2097151KB.
{validator, "zdbbl_range", "must be between 1KB and 2097151KB",
 fun(ZDBBL) ->
    %% 2097151KB = 2147482624
    ZDBBL >= 1024 andalso ZDBBL =< 2147482624
 end
}.

%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
  {default, 1000},
  {datatype, integer},
  hidden,
  {validators, ["positive_integer"]}
]}.

%% @doc Accepts zero and above. The validator keeps its historical name
%% because mappings refer to it; the message states what is actually checked.
{validator, "positive_integer", "must be a non-negative integer",
 fun(X) -> X >= 0 end}.

%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
%% R16+ uses +e
%% @doc The ETS table limit
{mapping, "node.max_ets_tables",
 cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [
  {default, 256000},
  {datatype, integer},
  hidden
]}.

%% @doc Set the location of crash dumps
{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
  {default, "{{crash_dump}}"},
  {datatype, file},
  hidden
]}.

%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
  {commented, 60},
  {datatype, integer},
  hidden
]}.

%% @doc Lower bound of the distribution listen port range, see
%% http://www.erlang.org/doc/man/kernel_app.html
{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
  {commented, 6369},
  {datatype, integer},
  hidden
]}.

%% @doc Upper bound of the distribution listen port range.
%% @see node.dist_listen_min
{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
  {commented, 6369},
  {datatype, integer},
  hidden
]}.

%% @doc Integer limit mapped to emqx.max_clients.
{mapping, "node.max_clients", "emqx.max_clients", [
  {default, 1024000},
  {datatype, integer},
  hidden
]}.
%%--------------------------------------------------------------------
%% RPC
%%--------------------------------------------------------------------

%% RPC mode: sync or async.
{mapping, "rpc.mode", "emqx.rpc_mode", [{datatype, {enum, [sync, async]}}, {default, async}]}.

%% Maximum batch size, mapped to gen_rpc.max_batch_size.
{mapping, "rpc.async_batch_size", "gen_rpc.max_batch_size", [{datatype, integer}, {default, 256}]}.

%% RPC server port.
{mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [{datatype, integer}, {default, 5369}]}.

%% Default TCP port for outgoing connections.
{mapping, "rpc.tcp_client_port", "gen_rpc.tcp_client_port", [{datatype, integer}, {default, 5369}]}.

%% TCP client count, validated to be greater than 0 and less than 256
%% (presumably the number of outgoing connections -- confirm against gen_rpc).
{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num",
 [{datatype, integer}, {default, 32}, {validators, ["range:gt_0_lt_256"]}]}.

%% Client connect timeout.
{mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout",
 [{datatype, {duration, ms}}, {default, "5s"}]}.

%% Client and server send timeout.
{mapping, "rpc.send_timeout", "gen_rpc.send_timeout",
 [{datatype, {duration, ms}}, {default, 5000}]}.

%% Authentication timeout.
{mapping, "rpc.authentication_timeout", "gen_rpc.authentication_timeout",
 [{datatype, {duration, ms}}, {default, 5000}]}.

%% Default receive timeout for call() functions.
{mapping, "rpc.call_receive_timeout", "gen_rpc.call_receive_timeout",
 [{datatype, {duration, ms}}, {default, 15000}]}.

%% Socket keepalive idle time.
{mapping, "rpc.socket_keepalive_idle", "gen_rpc.socket_keepalive_idle",
 [{datatype, {duration, s}}, {default, 7200}]}.

%% Seconds between keepalive probes.
{mapping, "rpc.socket_keepalive_interval", "gen_rpc.socket_keepalive_interval",
 [{datatype, {duration, s}}, {default, 75}]}.

%% Number of lost probes after which the connection is closed.
{mapping, "rpc.socket_keepalive_count", "gen_rpc.socket_keepalive_count",
 [{datatype, integer}, {default, 9}]}.

%% Size of the TCP send buffer.
{mapping, "rpc.socket_sndbuf", "gen_rpc.socket_sndbuf", [{datatype, bytesize}, {default, "1MB"}]}.
%% Size of the TCP receive buffer.
{mapping, "rpc.socket_recbuf", "gen_rpc.socket_recbuf", [{datatype, bytesize}, {default, "1MB"}]}.

%% Size of the socket buffer, mapped to gen_rpc.socket_buffer (separate from
%% the send and receive buffers above).
{mapping, "rpc.socket_buffer", "gen_rpc.socket_buffer", [{datatype, bytesize}, {default, "1MB"}]}.

%% Accepts values strictly between 0 and 256.
{validator, "range:gt_0_lt_256", "must greater than 0 and less than 256",
 fun(Num) -> Num > 0 andalso Num < 256 end}.

%%--------------------------------------------------------------------
%% Log
%%--------------------------------------------------------------------

%% Log destination: off, file, console or both.
{mapping, "log.to", "kernel.logger",
 [{datatype, {enum, [off, file, console, both]}}, {default, console}]}.

%% Level applied to the handlers built by the "kernel.logger" translation.
{mapping, "log.level", "kernel.logger",
 [{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}},
  {default, warning}]}.

{mapping, "log.primary_log_level", "kernel.logger_level",
 [{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}},
  {default, warning}]}.

{mapping, "log.logger_sasl_compatible", "kernel.logger_sasl_compatible",
 [{datatype, {enum, [true, false]}}, {default, true}]}.

%% Directory that log file names are joined onto.
{mapping, "log.dir", "kernel.logger", [{datatype, string}, {default, "log"}]}.

{mapping, "log.file", "kernel.logger", [{datatype, file}, {default, "emqx.log"}]}.

%% -1 is translated to `unlimited'.
{mapping, "log.chars_limit", "kernel.logger", [{datatype, integer}, {default, -1}]}.

{mapping, "log.rotation.size", "kernel.logger", [{datatype, bytesize}, {default, "10MB"}]}.

{mapping, "log.rotation.count", "kernel.logger", [{datatype, integer}, {default, 5}]}.

%% Extra log file for one specific level, e.g. log.<level>.file.
{mapping, "log.$level.file", "kernel.logger", [{datatype, file}]}.

{mapping, "log.sasl", "sasl.sasl_error_logger", [{datatype, flag}, {default, off}, hidden]}.

{mapping, "log.error_logger", "kernel.error_logger",
 [{datatype, {enum, [silent]}}, {default, silent}, hidden]}.

%% disable lager
{mapping, "lager.handlers", "lager.handlers", [{default, []}, hidden]}.

{mapping, "lager.crash_log", "lager.crash_log", [{datatype, flag}, {default, off}, hidden]}.
%% Sets kernel.logger_level from "log.level".
%% NOTE(review): the value of log.primary_log_level itself is not read here,
%% and the fun takes three arguments -- confirm that the cuttlefish version
%% in use calls translations with this arity.
{translation, "kernel.logger_level", fun(_, _, Conf) ->
    cuttlefish:conf_get("log.level", Conf)
end}.

%% Builds the kernel logger handler list from the log.* settings:
%%   * the `default' handler, writing to standard_io when log.to is console
%%     or both, otherwise declared as `undefined';
%%   * a `file' disk-log handler when log.to is file or both;
%%   * one `file_for_<level>' disk-log handler per log.<level>.file setting,
%%     also only when log.to is file or both.
{translation, "kernel.logger", fun(Conf) ->
    Target = cuttlefish:conf_get("log.to", Conf),
    MainLevel = cuttlefish:conf_get("log.level", Conf),
    ToConsole = (Target =:= console orelse Target =:= both),
    ToFile = (Target =:= file orelse Target =:= both),
    CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of
                     -1 -> unlimited;
                     Limit -> Limit
                 end,
    Formatter = {emqx_logger_formatter,
                 #{template =>
                       [time, " [", level, "] ",
                        {client_id,
                         [{peername,
                           [client_id, "@", peername, " "],
                           [client_id, " "]}],
                         [{peername,
                           [peername, " "],
                           []}]},
                        msg, "\n"],
                   chars_limit => CharsLimit}},
    %% Wrap-log settings for a file placed under log.dir.
    RotatingFile = fun(Name) ->
        #{type => wrap,
          file => filename:join(cuttlefish:conf_get("log.dir", Conf), Name),
          max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
          max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
    end,
    %% One disk-log handler entry.
    DiskHandler = fun(Id, Lvl, File) ->
        {handler, Id, logger_disk_log_h,
         #{level => Lvl,
           config => RotatingFile(File),
           formatter => Formatter,
           filesync_repeat_interval => no_repeat}}
    end,
    %% Default logger, which outputs to the console.
    ConsoleHandlers =
        case ToConsole of
            true ->
                [{handler, default, logger_std_h,
                  #{level => MainLevel,
                    config => #{type => standard_io},
                    formatter => Formatter}}];
            false ->
                [{handler, default, undefined}]
        end,
    %% Main file logger.
    FileHandlers =
        case ToFile of
            true -> [DiskHandler(file, MainLevel, cuttlefish:conf_get("log.file", Conf))];
            false -> []
        end,
    %% Additional log files for specific log levels.
    PerLevelFiles = lists:foldl(
        fun({[_, Lvl, _] = Key, File}, Acc) when ToFile ->
               case cuttlefish_variable:is_fuzzy_match(Key, ["log", "$level", "file"]) of
                   true -> [{Lvl, File} | Acc];
                   false -> Acc
               end;
           ({_Key, _Value}, Acc) ->
               Acc
        end, [], Conf),
    PerLevelHandlers =
        [DiskHandler(list_to_atom("file_for_" ++ Lvl), list_to_atom(Lvl), File)
         || {Lvl, File} <- PerLevelFiles],
    ConsoleHandlers ++ FileHandlers ++ PerLevelHandlers
end}.

%%--------------------------------------------------------------------
%% Authentication/ACL
%%--------------------------------------------------------------------

%% @doc Allow anonymous authentication.
{mapping, "allow_anonymous", "emqx.allow_anonymous",
 [{datatype, {enum, [true, false]}}, {default, false}]}.

%% @doc ACL nomatch: allow or deny.
{mapping, "acl_nomatch", "emqx.acl_nomatch", [{datatype, {enum, [allow, deny]}}, {default, deny}]}.

%% @doc Default ACL file.
{mapping, "acl_file", "emqx.acl_file", [{datatype, string}, hidden]}.

%% @doc Enable ACL cache for publish.
{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [{datatype, flag}, {default, on}]}.

%% @doc ACL cache time-to-live.
{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [{datatype, {duration, ms}}, {default, "1m"}]}.

%% @doc ACL cache size; must be greater than 0.
{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size",
 [{datatype, integer}, {default, 32}, {validators, ["range:gt_0"]}]}.

%% @doc Action taken when an ACL check rejects the current operation.
{mapping, "acl_deny_action", "emqx.acl_deny_action",
 [{datatype, {enum, [ignore, disconnect]}}, {default, ignore}]}.

%% @doc Time interval for cleaning flapping records.
{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval",
 [{datatype, {duration, ms}}]}.

%% Accepts values strictly greater than 0.
{validator, "range:gt_0", "must greater than 0", fun(Num) -> Num > 0 end}.
%%--------------------------------------------------------------------
%% MQTT Protocol
%%--------------------------------------------------------------------

%% @doc Max packet size allowed, 1MB by default.
{mapping, "mqtt.max_packet_size", "emqx.max_packet_size",
 [{datatype, bytesize}, {default, "1MB"}]}.

%% @doc Max client id length allowed.
{mapping, "mqtt.max_clientid_len", "emqx.max_clientid_len",
 [{datatype, integer}, {default, 65535}]}.

%% @doc Maximum topic levels.
{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [{datatype, integer}, {default, 0}]}.

%% @doc Maximum QoS allowed.
{mapping, "mqtt.max_qos_allowed", "emqx.max_qos_allowed",
 [{datatype, integer}, {default, 2}, {validators, ["range:0-2"]}]}.

%% @doc Maximum topic alias.
{mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [{datatype, integer}, {default, 0}]}.

%% @doc Whether the server supports MQTT retained messages.
{mapping, "mqtt.retain_available", "emqx.mqtt_retain_available",
 [{datatype, {enum, [true, false]}}, {default, true}]}.

%% @doc Whether the server supports MQTT wildcard subscriptions.
{mapping, "mqtt.wildcard_subscription", "emqx.mqtt_wildcard_subscription",
 [{datatype, {enum, [true, false]}}, {default, true}]}.

%% @doc Whether the server supports MQTT shared subscriptions.
{mapping, "mqtt.shared_subscription", "emqx.mqtt_shared_subscription",
 [{datatype, {enum, [true, false]}}, {default, true}]}.

%% @doc Whether to ignore loop delivery of messages (for MQTT v3.1.1).
{mapping, "mqtt.ignore_loop_deliver", "emqx.ignore_loop_deliver",
 [{datatype, {enum, [true, false]}}, {default, true}]}.

%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------

%% @doc Idle timeout of the MQTT connection.
{mapping, "zone.$name.idle_timeout", "emqx.zones", [{datatype, {duration, ms}}, {default, "15s"}]}.
{mapping, "zone.$name.allow_anonymous", "emqx.zones", [{datatype, {enum, [true, false]}}]}.

{mapping, "zone.$name.acl_nomatch", "emqx.zones", [{datatype, {enum, [allow, deny]}}]}.

%% @doc Enable ACL check.
{mapping, "zone.$name.enable_acl", "emqx.zones", [{datatype, flag}, {default, off}]}.

%% @doc Action taken when an ACL check rejects the current operation.
{mapping, "zone.$name.acl_deny_action", "emqx.zones",
 [{datatype, {enum, [ignore, disconnect]}}, {default, ignore}]}.

%% @doc Enable ban.
{mapping, "zone.$name.enable_ban", "emqx.zones", [{datatype, flag}, {default, off}]}.

%% @doc Enable per connection statistics.
{mapping, "zone.$name.enable_stats", "emqx.zones", [{datatype, flag}, {default, off}]}.

%% @doc Publish limit of the MQTT connections, written as "<count>,<duration>".
{mapping, "zone.$name.publish_limit", "emqx.zones", [{datatype, string}]}.

%% @doc Max packet size allowed in the zone. No default is declared in this
%% mapping (an earlier comment mentioned 64K -- confirm where that applies).
{mapping, "zone.$name.max_packet_size", "emqx.zones", [{datatype, bytesize}]}.

%% @doc Max client id length allowed.
{mapping, "zone.$name.max_clientid_len", "emqx.zones", [{datatype, integer}]}.

%% @doc Maximum topic levels.
{mapping, "zone.$name.max_topic_levels", "emqx.zones", [{datatype, integer}]}.

%% @doc Maximum QoS allowed.
{mapping, "zone.$name.max_qos_allowed", "emqx.zones",
 [{datatype, integer}, {validators, ["range:0-2"]}]}.

%% @doc Maximum topic alias.
{mapping, "zone.$name.max_topic_alias", "emqx.zones", [{datatype, integer}]}.

%% @doc Whether the server supports retained messages.
{mapping, "zone.$name.retain_available", "emqx.zones", [{datatype, {enum, [true, false]}}]}.

%% @doc Whether the server supports wildcard subscriptions.
{mapping, "zone.$name.wildcard_subscription", "emqx.zones", [{datatype, {enum, [true, false]}}]}.

%% @doc Whether the server supports shared subscriptions.
{mapping, "zone.$name.shared_subscription", "emqx.zones", [{datatype, {enum, [true, false]}}]}.
%% @doc Server keepalive.
{mapping, "zone.$name.server_keepalive", "emqx.zones", [{datatype, integer}]}.

%% @doc Keepalive backoff.
{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [{datatype, float}, {default, 0.75}]}.

%% @doc Max number of subscriptions allowed.
{mapping, "zone.$name.max_subscriptions", "emqx.zones", [{datatype, integer}, {default, 0}]}.

%% @doc Upgrade QoS according to subscription?
{mapping, "zone.$name.upgrade_qos", "emqx.zones", [{datatype, flag}, {default, off}]}.

%% @doc Max number of QoS 1 and 2 messages that can be "inflight" at one time.
%% 0 means no limit.
{mapping, "zone.$name.max_inflight", "emqx.zones", [{datatype, integer}, {default, 0}]}.

%% @doc Retry interval for redelivering QoS 1/2 messages.
{mapping, "zone.$name.retry_interval", "emqx.zones",
 [{datatype, {duration, ms}}, {default, "20s"}]}.

%% @doc Max number of packets awaiting PUBREL, 0 means no limit.
{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [{datatype, integer}, {default, 0}]}.

%% @doc Awaiting PUBREL timeout.
{mapping, "zone.$name.await_rel_timeout", "emqx.zones",
 [{datatype, {duration, ms}}, {default, "300s"}]}.

%% @doc Ignore loop delivery of messages.
{mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [{datatype, {enum, [true, false]}}]}.

%% @doc Session expiry interval.
{mapping, "zone.$name.session_expiry_interval", "emqx.zones",
 [{datatype, {duration, s}}, {default, "2h"}]}.

%% @doc Max queue length. Messages are enqueued when a persistent client is
%% disconnected or the inflight window is full. 0 means no limit.
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [{datatype, integer}, {default, 1000}]}.

%% @doc Topic priorities: comma separated topic=priority pairs, where the
%% priority should be an integer in the range 1-255 (inclusive), 1 being the
%% lowest and 255 being the highest.
%% The default value `none` indicates no priority table, hence all messages
%% are treated equally, which means either highest ('infinity') or lowest (0)
%% depending on the mqueue_default_priority config.
%% NOTE(review): the "emqx.zones" translation in this file rejects only
%% priorities below 0 or above 255, so 0 is accepted as well.
{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [{datatype, string}, {default, "none"}]}.

%% @doc Default priority for topics not in the priority table.
{mapping, "zone.$name.mqueue_default_priority", "emqx.zones",
 [{datatype, {enum, [highest, lowest]}}, {default, lowest}]}.

%% @doc Queue QoS 0 messages?
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones",
 [{datatype, {enum, [true, false]}}, {default, true}]}.

{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [{datatype, flag}]}.

%% Written as "<count>,<duration>"; parsed by the "emqx.zones" translation.
{mapping, "zone.$name.flapping_threshold", "emqx.zones", [{datatype, string}]}.

{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones",
 [{datatype, {duration, s}}]}.

%% @doc Force connection/session process GC after this number of
%% messages | bytes passed through.
%% Numbers delimited by `|'. Zero or negative is to disable.
{mapping, "zone.$name.force_gc_policy", "emqx.zones", [{datatype, string}, {default, "0 | 0MB"}]}.

%% @doc Max message queue length and total heap size to force shutdown of the
%% connection/session process.
%% Message queue here is the Erlang process mailbox, not the number of
%% queued MQTT messages of QoS 1 and 2.
%% Zero or negative is to disable.
{mapping, "zone.$name.force_shutdown_policy", "emqx.zones",
 [{datatype, string}, {default, "default"}]}.

%% Converted to a binary by the "emqx.zones" translation.
{mapping, "zone.$name.mountpoint", "emqx.zones", [{datatype, string}]}.

%% @doc Use the username as the client id.
{mapping, "zone.$name.use_username_as_clientid", "emqx.zones",
 [{datatype, {enum, [true, false]}}, {default, false}]}.
%% @doc Builds the "emqx.zones" value: a list of {ZoneName, Options} pairs,
%% one per zone found under the "zone." prefix. Every "zone.<name>.<opt>"
%% setting becomes one {Key, Value} entry in that zone's option list.
%% Most options pass through as {list_to_atom(Opt), Val}; the clauses of
%% Mapping below rename or parse the ones that need it:
%%   * retain_available / wildcard_subscription / shared_subscription are
%%     renamed with an mqtt_ prefix;
%%   * flapping_threshold "<count>,<duration>" -> {Count, Seconds};
%%   * publish_limit "<count>,<duration>" -> {Count / Seconds, Count};
%%   * force_gc_policy "<count>|<bytesize>" -> #{count, bytes};
%%   * force_shutdown_policy "default" or "<len>|<bytesize>"
%%     -> #{message_queue_len, max_heap_size};
%%   * mqueue_priorities "none" or "topic=prio,..." -> none | #{Topic => Prio};
%%   * mountpoint -> binary.
{translation, "emqx.zones", fun(Conf) ->
    Mapping = fun
        ("retain_available", Val) ->
            {mqtt_retain_available, Val};
        ("flapping_threshold", Val) ->
            [Limit, Duration] = string:tokens(Val, ", "),
            FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of
                Min when is_integer(Min) ->
                    {list_to_integer(Limit), Min};
                {error, Reason} ->
                    error(Reason)
            end,
            {flapping_threshold, FlappingThreshold};
        ("wildcard_subscription", Val) ->
            {mqtt_wildcard_subscription, Val};
        ("shared_subscription", Val) ->
            {mqtt_shared_subscription, Val};
        ("publish_limit", Val) ->
            [Limit, Duration] = string:tokens(Val, ", "),
            PubLimit = case cuttlefish_duration:parse(Duration, s) of
                Secs when is_integer(Secs), Secs > 0 ->
                    {list_to_integer(Limit) / Secs, list_to_integer(Limit)};
                Secs when is_integer(Secs) ->
                    %% A zero duration would otherwise surface as a bare
                    %% badarith from the division above.
                    cuttlefish:invalid("publish_limit: duration must be greater than 0");
                {error, Reason} ->
                    error(Reason)
            end,
            {publish_limit, PubLimit};
        ("force_gc_policy", Val) ->
            [Count, Bytes] = string:tokens(Val, "| "),
            GcPolicy = case cuttlefish_bytesize:parse(Bytes) of
                {error, Reason} ->
                    error(Reason);
                Bytes1 ->
                    #{bytes => Bytes1, count => list_to_integer(Count)}
            end,
            {force_gc_policy, GcPolicy};
        ("force_shutdown_policy", "default") ->
            %% The built-in default depends on the VM word size.
            {DefaultLen, DefaultSize} =
                case erlang:system_info(wordsize) of
                    8 -> % arch_64
                        {8000, cuttlefish_bytesize:parse("800MB")};
                    4 -> % arch_32
                        {1000, cuttlefish_bytesize:parse("100MB")}
                end,
            {force_shutdown_policy, #{message_queue_len => DefaultLen,
                                      max_heap_size => DefaultSize}};
        ("force_shutdown_policy", Val) ->
            [Len, Siz] = string:tokens(Val, "| "),
            %% Largest heap size accepted for the VM word size.
            MaxSiz = case erlang:system_info(wordsize) of
                8 -> % arch_64
                    (1 bsl 59) - 1;
                4 -> % arch_32
                    (1 bsl 27) - 1
            end,
            ShutdownPolicy = case cuttlefish_bytesize:parse(Siz) of
                {error, Reason} ->
                    error(Reason);
                Siz1 when Siz1 > MaxSiz ->
                    cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
                Siz1 ->
                    #{message_queue_len => list_to_integer(Len),
                      max_heap_size => Siz1}
            end,
            {force_shutdown_policy, ShutdownPolicy};
        ("mqueue_priorities", Val) ->
            case Val of
                "none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE
                _ ->
                    MqueuePriorities = lists:foldl(fun(T, Acc) ->
                        %% NOTE: space in "= " is intended
                        [Topic, Prio] = string:tokens(T, "= "),
                        P = list_to_integer(Prio),
                        %% NOTE(review): the setting is documented as 1-255,
                        %% but 0 passes this check too; left unchanged so
                        %% existing configs keep working.
                        (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
                        maps:put(iolist_to_binary(Topic), P, Acc)
                    end, #{}, string:tokens(Val, ",")),
                    {mqueue_priorities, MqueuePriorities}
            end;
        ("mountpoint", Val) ->
            {mountpoint, iolist_to_binary(Val)};
        (Opt, Val) ->
            {list_to_atom(Opt), Val}
    end,
    maps:to_list(
      lists:foldl(
        fun({["zone", Name, Opt], Val}, Zones) ->
            %% Translate the option once and reuse the result for both the
            %% "first option of this zone" and the "zone already seen" cases.
            Entry = Mapping(Opt, Val),
            maps:update_with(list_to_atom(Name),
                             fun(Opts) -> [Entry | Opts] end,
                             [Entry], Zones)
        end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf))))
end}.

%%--------------------------------------------------------------------
%% Listeners
%%--------------------------------------------------------------------

%%--------------------------------------------------------------------
%% TCP Listeners

%% Listen address: either a port number or an ip:port pair.
{mapping, "listener.tcp.$name", "emqx.listeners", [
  {datatype, [integer, ip]}
]}.

{mapping, "listener.tcp.$name.acceptors", "emqx.listeners", [
  {default, 8},
  {datatype, integer}
]}.

{mapping, "listener.tcp.$name.max_connections", "emqx.listeners", [
  {default, 1024},
  {datatype, integer}
]}.

{mapping, "listener.tcp.$name.max_conn_rate", "emqx.listeners", [
  {datatype, integer}
]}.

{mapping, "listener.tcp.$name.active_n", "emqx.listeners", [
  {default, 100},
  {datatype, integer}
]}.

%% Name of the zone the listener belongs to.
{mapping, "listener.tcp.$name.zone", "emqx.listeners", [
  {datatype, string}
]}.

{mapping, "listener.tcp.$name.rate_limit", "emqx.listeners", [
  {default, undefined},
  {datatype, string}
]}.

{mapping, "listener.tcp.$name.access.$id", "emqx.listeners", [
  {datatype, string}
]}.

{mapping, "listener.tcp.$name.proxy_protocol", "emqx.listeners", [
  {datatype, flag}
]}.

{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqx.listeners", [
  {datatype, {duration, ms}}
]}.
%% One of cn, dn or crt.
{mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners",
 [{datatype, {enum, [cn, dn, crt]}}]}.

%% Socket options of the TCP listener.
{mapping, "listener.tcp.$name.backlog", "emqx.listeners", [{datatype, integer}, {default, 1024}]}.

{mapping, "listener.tcp.$name.send_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}, {default, "15s"}]}.

{mapping, "listener.tcp.$name.send_timeout_close", "emqx.listeners",
 [{datatype, flag}, {default, on}]}.

{mapping, "listener.tcp.$name.recbuf", "emqx.listeners", [{datatype, bytesize}, hidden]}.

{mapping, "listener.tcp.$name.sndbuf", "emqx.listeners", [{datatype, bytesize}, hidden]}.

{mapping, "listener.tcp.$name.buffer", "emqx.listeners", [{datatype, bytesize}, hidden]}.

{mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [{datatype, flag}, hidden]}.

{mapping, "listener.tcp.$name.nodelay", "emqx.listeners",
 [{datatype, {enum, [true, false]}}, hidden]}.

{mapping, "listener.tcp.$name.reuseaddr", "emqx.listeners",
 [{datatype, {enum, [true, false]}}, hidden]}.

%%--------------------------------------------------------------------
%% SSL Listeners

%% Listen address: either a port number or an ip:port pair.
{mapping, "listener.ssl.$name", "emqx.listeners", [{datatype, [integer, ip]}]}.

{mapping, "listener.ssl.$name.acceptors", "emqx.listeners", [{datatype, integer}, {default, 8}]}.

{mapping, "listener.ssl.$name.max_connections", "emqx.listeners",
 [{datatype, integer}, {default, 1024}]}.

{mapping, "listener.ssl.$name.max_conn_rate", "emqx.listeners", [{datatype, integer}]}.

{mapping, "listener.ssl.$name.active_n", "emqx.listeners", [{datatype, integer}, {default, 100}]}.

%% Name of the zone the listener belongs to.
{mapping, "listener.ssl.$name.zone", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.rate_limit", "emqx.listeners",
 [{datatype, string}, {default, undefined}]}.

{mapping, "listener.ssl.$name.access.$id", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.proxy_protocol", "emqx.listeners", [{datatype, flag}]}.
{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}]}.

%% Socket options of the SSL listener.
{mapping, "listener.ssl.$name.backlog", "emqx.listeners", [{datatype, integer}, {default, 1024}]}.

{mapping, "listener.ssl.$name.send_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}, {default, "15s"}]}.

{mapping, "listener.ssl.$name.send_timeout_close", "emqx.listeners",
 [{datatype, flag}, {default, on}]}.

{mapping, "listener.ssl.$name.recbuf", "emqx.listeners", [{datatype, bytesize}, hidden]}.

{mapping, "listener.ssl.$name.sndbuf", "emqx.listeners", [{datatype, bytesize}, hidden]}.

{mapping, "listener.ssl.$name.buffer", "emqx.listeners", [{datatype, bytesize}, hidden]}.

{mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [{datatype, flag}, hidden]}.

{mapping, "listener.ssl.$name.nodelay", "emqx.listeners",
 [{datatype, {enum, [true, false]}}, hidden]}.

{mapping, "listener.ssl.$name.reuseaddr", "emqx.listeners",
 [{datatype, {enum, [true, false]}}, hidden]}.

%% TLS options of the SSL listener.
{mapping, "listener.ssl.$name.tls_versions", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.ciphers", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.psk_ciphers", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.handshake_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}, {default, "15s"}]}.

{mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.keyfile", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.certfile", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.cacertfile", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ssl.$name.verify", "emqx.listeners", [{datatype, atom}]}.

{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqx.listeners",
 [{datatype, {enum, [true, false]}}]}.
{mapping, "listener.ssl.$name.secure_renegotiate", "emqx.listeners", [{datatype, flag}]}.

{mapping, "listener.ssl.$name.reuse_sessions", "emqx.listeners",
 [{datatype, flag}, {default, on}]}.

{mapping, "listener.ssl.$name.honor_cipher_order", "emqx.listeners", [{datatype, flag}]}.

%% One of cn, dn or crt.
{mapping, "listener.ssl.$name.peer_cert_as_username", "emqx.listeners",
 [{datatype, {enum, [cn, dn, crt]}}]}.

%%--------------------------------------------------------------------
%% MQTT/WebSocket Listeners

%% Listen address: either a port number or an ip:port pair.
{mapping, "listener.ws.$name", "emqx.listeners", [{datatype, [integer, ip]}]}.

{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners",
 [{datatype, string}, {default, "/mqtt"}]}.

{mapping, "listener.ws.$name.acceptors", "emqx.listeners", [{datatype, integer}, {default, 8}]}.

{mapping, "listener.ws.$name.max_connections", "emqx.listeners",
 [{datatype, integer}, {default, 1024}]}.

{mapping, "listener.ws.$name.max_conn_rate", "emqx.listeners", [{datatype, integer}]}.

%% Name of the zone the listener belongs to.
{mapping, "listener.ws.$name.zone", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ws.$name.rate_limit", "emqx.listeners",
 [{datatype, string}, {default, undefined}]}.

{mapping, "listener.ws.$name.access.$id", "emqx.listeners", [{datatype, string}]}.

{mapping, "listener.ws.$name.verify_protocol_header", "emqx.listeners",
 [{datatype, flag}, {default, on}]}.

{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners",
 [{datatype, string}, hidden]}.

{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [{datatype, string}, hidden]}.

{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [{datatype, flag}]}.

{mapping, "listener.ws.$name.proxy_protocol_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}]}.

%% Socket options of the WebSocket listener.
{mapping, "listener.ws.$name.backlog", "emqx.listeners", [{datatype, integer}, {default, 1024}]}.

{mapping, "listener.ws.$name.send_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}, {default, "15s"}]}.
%% WebSocket listener mappings (continued). Every key in this group is
%% collected by the "emqx.listeners" translation.

%% Read into the listener's `tcp_options`.
{mapping, "listener.ws.$name.send_timeout_close", "emqx.listeners", [
  {datatype, flag},
  {default, on}
]}.

{mapping, "listener.ws.$name.recbuf", "emqx.listeners", [
  {datatype, bytesize},
  hidden
]}.

{mapping, "listener.ws.$name.sndbuf", "emqx.listeners", [
  {datatype, bytesize},
  hidden
]}.

{mapping, "listener.ws.$name.buffer", "emqx.listeners", [
  {datatype, bytesize},
  hidden
]}.

%% Read into the listener's top-level option list.
{mapping, "listener.ws.$name.tune_buffer", "emqx.listeners", [
  {datatype, flag},
  hidden
]}.

%% Read into the listener's `tcp_options`.
{mapping, "listener.ws.$name.nodelay", "emqx.listeners", [
  {datatype, {enum, [true, false]}},
  hidden
]}.

%% Read into the listener's top-level option list.
{mapping, "listener.ws.$name.compress", "emqx.listeners", [
  {datatype, {enum, [true, false]}},
  hidden
]}.

%% The deflate_opts.* keys are read into the listener's `deflate_options`.
{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [
  {datatype, {enum, [none, default, best_compression, best_speed]}},
  hidden
]}.

{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [
  {datatype, integer},
  {validators, ["range:1-9"]},
  hidden
]}.

{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [
  {datatype, {enum, [default, filtered, huffman_only, rle]}},
  hidden
]}.

{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
  {datatype, {enum, [takeover, no_takeover]}},
  hidden
]}.

{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
  {datatype, {enum, [takeover, no_takeover]}},
  hidden
]}.

%% The window-bits keys carry the same "range:8-15" validator that the
%% corresponding listener.wss.* mappings declare, so ws and wss accept
%% the same values.
{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
  {datatype, integer},
  {validators, ["range:8-15"]},
  hidden
]}.

{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
  {datatype, integer},
  {validators, ["range:8-15"]},
  hidden
]}.

%% Read into the listener's top-level option list.
{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [
  {datatype, {duration, ms}},
  hidden
]}.

{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [
  {datatype, integer},
  hidden
]}.
%%--------------------------------------------------------------------
%% MQTT/WebSocket/SSL Listeners

%% Listen address: either a bare port or an ip:port pair.
{mapping, "listener.wss.$name", "emqx.listeners",
 [{datatype, [integer, ip]}]}.

{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners",
 [{datatype, string},
  {default, "/mqtt"}]}.

{mapping, "listener.wss.$name.acceptors", "emqx.listeners",
 [{datatype, integer},
  {default, 8}]}.

{mapping, "listener.wss.$name.max_connections", "emqx.listeners",
 [{datatype, integer},
  {default, 1024}]}.

{mapping, "listener.wss.$name.max_conn_rate", "emqx.listeners",
 [{datatype, integer}]}.

%% Converted to an atom by the "emqx.listeners" translation.
{mapping, "listener.wss.$name.zone", "emqx.listeners",
 [{datatype, string}]}.

%% Comma-separated integers; the translation turns them into a tuple.
{mapping, "listener.wss.$name.rate_limit", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.verify_protocol_header", "emqx.listeners",
 [{datatype, flag},
  {default, on}]}.

%% One access rule per $id; the translation expects two
%% space-separated tokens in each rule.
{mapping, "listener.wss.$name.access.$id", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners",
 [{datatype, string},
  hidden]}.

{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners",
 [{datatype, string},
  hidden]}.

{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners",
 [{datatype, flag}]}.

{mapping, "listener.wss.$name.proxy_protocol_timeout", "emqx.listeners",
 [{datatype, {duration, ms}}]}.

%%{mapping, "listener.wss.$name.handshake_timeout", "emqx.listeners", [
%%  {default, "15s"},
%%  {datatype, {duration, ms}}
%%]}.

%% Read into the listener's `tcp_options`.
{mapping, "listener.wss.$name.backlog", "emqx.listeners",
 [{datatype, integer},
  {default, 1024}]}.

{mapping, "listener.wss.$name.send_timeout", "emqx.listeners",
 [{datatype, {duration, ms}},
  {default, "15s"}]}.

{mapping, "listener.wss.$name.send_timeout_close", "emqx.listeners",
 [{datatype, flag},
  {default, on}]}.

{mapping, "listener.wss.$name.recbuf", "emqx.listeners",
 [{datatype, bytesize},
  hidden]}.

{mapping, "listener.wss.$name.sndbuf", "emqx.listeners",
 [{datatype, bytesize},
  hidden]}.
%% WebSocket/SSL listener mappings (continued). Every key in this group
%% is collected by the "emqx.listeners" translation.

%% Read into the listener's `tcp_options`.
{mapping, "listener.wss.$name.buffer", "emqx.listeners",
 [{datatype, bytesize},
  hidden]}.

%% Read into the listener's top-level option list.
{mapping, "listener.wss.$name.tune_buffer", "emqx.listeners",
 [{datatype, flag},
  hidden]}.

%% Read into the listener's `tcp_options`.
{mapping, "listener.wss.$name.nodelay", "emqx.listeners",
 [{datatype, {enum, [true, false]}},
  hidden]}.

%% Read into the listener's `ssl_options`.
%% `tls_versions`, `ciphers` and `psk_ciphers` are comma-separated lists
%% that the translation splits.
{mapping, "listener.wss.$name.tls_versions", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.ciphers", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.psk_ciphers", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.keyfile", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.certfile", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.cacertfile", "emqx.listeners",
 [{datatype, string}]}.

{mapping, "listener.wss.$name.verify", "emqx.listeners",
 [{datatype, atom}]}.

{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqx.listeners",
 [{datatype, {enum, [true, false]}}]}.

{mapping, "listener.wss.$name.secure_renegotiate", "emqx.listeners",
 [{datatype, flag}]}.

{mapping, "listener.wss.$name.reuse_sessions", "emqx.listeners",
 [{datatype, flag},
  {default, on}]}.

{mapping, "listener.wss.$name.honor_cipher_order", "emqx.listeners",
 [{datatype, flag}]}.

%% Read into the listener's top-level option list.
{mapping, "listener.wss.$name.peer_cert_as_username", "emqx.listeners",
 [{datatype, {enum, [cn, dn, crt]}}]}.

{mapping, "listener.wss.$name.compress", "emqx.listeners",
 [{datatype, {enum, [true, false]}},
  hidden]}.

%% The deflate_opts.* keys are read into the listener's `deflate_options`.
{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners",
 [{datatype, {enum, [none, default, best_compression, best_speed]}},
  hidden]}.

{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners",
 [{datatype, integer},
  {validators, ["range:1-9"]},
  hidden]}.

{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners",
 [{datatype, {enum, [default, filtered, huffman_only, rle]}},
  hidden]}.
%% Remaining WebSocket/SSL deflate and connection mappings.
{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [
  {datatype, {enum, [takeover, no_takeover]}},
  hidden
]}.

{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [
  {datatype, {enum, [takeover, no_takeover]}},
  hidden
]}.

{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [
  {datatype, integer},
  {validators, ["range:8-15"]},
  hidden
]}.

{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [
  {datatype, integer},
  {validators, ["range:8-15"]},
  hidden
]}.

{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [
  {datatype, {duration, ms}},
  hidden
]}.

{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [
  {datatype, integer},
  hidden
]}.

%% Builds the `emqx.listeners` value: a flat list of
%% `{Type, ListenOn, Options}` tuples, one per configured
%% `listener.<type>.<name>` key. tcp and ws listeners come first,
%% followed by ssl and wss listeners.
%%
%% Options always contain `deflate_options` and `tcp_options`; ssl and
%% wss listeners additionally get `ssl_options`. Any option that is not
%% configured (value `undefined`) is left out of its option list.
%%
%% Raises via cuttlefish:invalid/1 when an ssl/wss listener has neither
%% or both of `ciphers` / `psk_ciphers`, or names an unknown PSK cipher.
{translation, "emqx.listeners", fun(Conf) ->

    %% Drop every option whose value is `undefined`.
    Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,

    %% String -> atom, passing `undefined` through untouched.
    Atom = fun(undefined) -> undefined;
              (S)         -> list_to_atom(S)
           end,

    %% Parse one access rule made of exactly two space-separated tokens.
    %% The first token becomes an atom; the second is kept as a string
    %% unless it is "all", which becomes the atom `all`.
    Access = fun(S) ->
                 [A, CIDR] = string:tokens(S, " "),
                 {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end}
             end,

    %% `[{access_rules, Rules}]` when any `<Prefix>.access.*` key is set,
    %% otherwise `[]`.
    AccOpts = fun(Prefix) ->
                  case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of
                      []    -> [];
                      Rules -> [{access_rules, [Access(Rule) || {_, Rule} <- Rules]}]
                  end
              end,

    %% Comma-separated integers -> tuple of integers.
    Ratelimit = fun(undefined) -> undefined;
                   (S) -> list_to_tuple([list_to_integer(Token) || Token <- string:tokens(S, ",")])
                end,

    %% Top-level listener options. `acceptors` and `max_connections` are
    %% read without a fallback; every other key falls back to `undefined`
    %% and is then removed by Filter.
    LisOpts = fun(Prefix) ->
                  Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
                          {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)},
                          {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)},
                          {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)},
                          {active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)},
                          {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},
                          {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
                          {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},
                          {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
                          {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
                          {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)},
                          {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
                          {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
                          {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)},
                          {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)},
                          {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)},
                          {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)}
                          | AccOpts(Prefix)])
              end,

    %% WebSocket compression options. The emitted keys are spelled
    %% exactly like the `deflate_opts.*` config keys; the window-bits
    %% options were previously emitted as `..._windows_bits`, which is
    %% not the `..._window_bits` name of the permessage-deflate options.
    DeflateOpts = fun(Prefix) ->
                      Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)},
                              {mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)},
                              {strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)},
                              {server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)},
                              {client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)},
                              {server_max_window_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)},
                              {client_max_window_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}])
                  end,

    %% Socket options. `nodelay` is the only key with a non-undefined
    %% fallback (true).
    TcpOpts = fun(Prefix) ->
                  Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
                          {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
                          {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
                          {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
                          {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
                          {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
                          {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)},
                          {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}])
              end,

    %% Split a comma-separated string, passing `undefined` through.
    SplitFun = fun(undefined) -> undefined;
                  (S)         -> string:tokens(S, ",")
               end,

    %% Map PSK cipher names to cipher-suite tuples. An unrecognised name
    %% is a configuration error and is reported through
    %% cuttlefish:invalid/1 rather than a bare function_clause crash.
    MapPSKCiphers = fun(PSKCiphers) ->
                        lists:map(
                          fun("PSK-AES128-CBC-SHA")   -> {psk, aes_128_cbc, sha};
                             ("PSK-AES256-CBC-SHA")   -> {psk, aes_256_cbc, sha};
                             ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
                             ("PSK-RC4-SHA")          -> {psk, rc4_128, sha};
                             (Other) -> cuttlefish:invalid("unsupported PSK cipher: " ++ Other)
                          end, PSKCiphers)
                    end,

    %% TLS options. Exactly one of `ciphers` / `psk_ciphers` must be set.
    SslOpts = fun(Prefix) ->
                  Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
                                 undefined -> undefined;
                                 L -> [list_to_atom(V) || V <- L]
                             end,
                  TLSCiphers = cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined),
                  PSKCiphers = cuttlefish:conf_get(Prefix ++ ".psk_ciphers", Conf, undefined),
                  Ciphers =
                      case {TLSCiphers, PSKCiphers} of
                          {undefined, undefined} ->
                              cuttlefish:invalid(Prefix ++ ".ciphers or " ++ Prefix ++ ".psk_ciphers is absent");
                          {TLSCiphers, undefined} ->
                              SplitFun(TLSCiphers);
                          {undefined, PSKCiphers} ->
                              MapPSKCiphers(SplitFun(PSKCiphers));
                          {_TLSCiphers, _PSKCiphers} ->
                              cuttlefish:invalid(Prefix ++ ".ciphers and " ++ Prefix ++ ".psk_ciphers cannot be configured at the same time")
                      end,
                  %% A PSK lookup callback is only supplied when PSK ciphers are in use.
                  UserLookupFun =
                      case PSKCiphers of
                          undefined -> undefined;
                          _ -> {fun emqx_psk:lookup/3, <<>>}
                      end,
                  Filter([{versions, Versions},
                          {ciphers, Ciphers},
                          {user_lookup_fun, UserLookupFun},
                          {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)},
                          {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)},
                          {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
                          {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
                          {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
                          {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)},
                          {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)},
                          {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)},
                          {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)},
                          {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}])
              end,

    %% One-element list describing a tcp/ws listener, or [] when the
    %% listen-on key is unset.
    TcpListeners = fun(Type, Name) ->
                       Prefix = string:join(["listener", Type, Name], "."),
                       case cuttlefish:conf_get(Prefix, Conf, undefined) of
                           undefined ->
                               [];
                           ListenOn ->
                               [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
                                                        {tcp_options, TcpOpts(Prefix)}
                                                        | LisOpts(Prefix)]}]
                       end
                   end,

    %% Same as TcpListeners, plus `ssl_options`, for ssl/wss listeners.
    SslListeners = fun(Type, Name) ->
                       Prefix = string:join(["listener", Type, Name], "."),
                       case cuttlefish:conf_get(Prefix, Conf, undefined) of
                           undefined ->
                               [];
                           ListenOn ->
                               [{Atom(Type), ListenOn, [{deflate_options, DeflateOpts(Prefix)},
                                                        {tcp_options, TcpOpts(Prefix)},
                                                        {ssl_options, SslOpts(Prefix)}
                                                        | LisOpts(Prefix)]}]
                       end
                   end,

    %% Only three-token keys ("listener.<type>.<name>") match the
    %% generator pattern, so each listener is visited exactly once.
    lists:flatten([TcpListeners(Type, Name)
                   || {["listener", Type, Name], _ListenOn}
                          <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf)
                             ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)]
                  ++
                  [SslListeners(Type, Name)
                   || {["listener", Type, Name], _ListenOn}
                          <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf)
                             ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
end}.

%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------

{mapping, "module.presence", "emqx.modules", [
  {default, off},
  {datatype, flag}
]}.

{mapping, "module.presence.qos", "emqx.modules", [
  {default, 1},
  {datatype, integer},
  {validators, ["range:0-2"]}
]}.

{mapping, "module.subscription", "emqx.modules", [
  {default, off},
  {datatype, flag}
]}.

{mapping, "module.subscription.$id.topic", "emqx.modules", [
  {datatype, string}
]}.
{mapping, "module.subscription.$id.qos", "emqx.modules",
 [{datatype, integer},
  {default, 1},
  {validators, ["range:0-2"]}]}.

{mapping, "module.rewrite", "emqx.modules",
 [{datatype, flag},
  {default, off}]}.

%% One rewrite rule per $id; the translation expects three
%% space-separated tokens in each rule.
{mapping, "module.rewrite.rule.$id", "emqx.modules",
 [{datatype, string}]}.

%% Builds the `emqx.modules` value: a list with one `{Module, Args}`
%% entry for each of presence, subscription and rewrite whose
%% `module.<name>` flag is on, in that order.
{translation, "emqx.modules", fun(Conf) ->

    %% [{TopicBinary, QoS}] for emqx_mod_subscription. Topics and QoS
    %% values are each ordered by their $id and then paired by position.
    Subscriptions =
        fun() ->
            Vars = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
            %% Values of one per-id field, sorted by id.
            ById = fun(Field) ->
                       Tagged = [{Id, Val} || {[_, "subscription", Id, F], Val} <- Vars,
                                              F =:= Field],
                       [Val || {_, Val} <- lists:sort(Tagged)]
                   end,
            Qoses  = ById("qos"),
            Topics = [iolist_to_binary(T) || T <- ById("topic")],
            lists:zip(Topics, Qoses)
        end,

    %% [{rewrite, Topic, Re, Dest}] (all binaries) for emqx_mod_rewrite.
    Rewrites =
        fun() ->
            ToRule = fun({[_, "rewrite", "rule", _Id], Spec}) ->
                         [Topic, Re, Dest] = string:tokens(Spec, " "),
                         {rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
                     end,
            lists:map(ToRule, cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf))
        end,

    %% [Entry()] when the flag at Key is true, [] when it is false.
    %% Entry is a thunk so module arguments are only computed for
    %% enabled modules.
    IfEnabled =
        fun(Key, Entry) ->
            case cuttlefish:conf_get(Key, Conf) of
                true  -> [Entry()];
                false -> []
            end
        end,

    lists:append([
        %% Presence
        IfEnabled("module.presence",
                  fun() ->
                      {emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}
                  end),
        %% Subscription
        IfEnabled("module.subscription",
                  fun() -> {emqx_mod_subscription, Subscriptions()} end),
        %% Rewrite
        IfEnabled("module.rewrite",
                  fun() -> {emqx_mod_rewrite, Rewrites()} end)
    ])
end}.

%%-------------------------------------------------------------------
%% Plugins
%%-------------------------------------------------------------------

{mapping, "plugins.etc_dir", "emqx.plugins_etc_dir",
 [{datatype, string}]}.

{mapping, "plugins.loaded_file", "emqx.plugins_loaded_file",
 [{datatype, string}]}.

{mapping, "plugins.expand_plugins_dir", "emqx.expand_plugins_dir",
 [{datatype, string}]}.
%%--------------------------------------------------------------------
%% Broker
%%--------------------------------------------------------------------

{mapping, "broker.sys_interval", "emqx.broker_sys_interval",
 [{datatype, {duration, ms}},
  {default, "1m"}]}.

{mapping, "broker.sys_heartbeat", "emqx.broker_sys_heartbeat",
 [{datatype, {duration, ms}},
  {default, "30s"}]}.

{mapping, "broker.enable_session_registry", "emqx.enable_session_registry",
 [{datatype, flag},
  {default, on}]}.

{mapping, "broker.session_locking_strategy", "emqx.session_locking_strategy",
 [{datatype, {enum, [local, one, quorum, all]}},
  {default, quorum}]}.

%% @doc Shared Subscription Dispatch Strategy.
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy",
 [{datatype,
   {enum, [
           %% randomly pick a subscriber
           random,
           %% round robin alive subscribers one message after another
           round_robin,
           %% pick a random subscriber and stick to it
           sticky,
           %% hash client ID to a group member
           hash
          ]}},
  {default, round_robin}]}.

%% @doc Enable or disable shared dispatch acknowledgement for QoS1 and QoS2 messages
{mapping, "broker.shared_dispatch_ack_enabled", "emqx.shared_dispatch_ack_enabled",
 [{datatype, {enum, [true, false]}},
  {default, false}]}.

{mapping, "broker.route_batch_clean", "emqx.route_batch_clean",
 [{datatype, flag},
  {default, on}]}.

%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------

%% @doc Long GC, don't monitor in production mode for:
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
{mapping, "sysmon.long_gc", "emqx.sysmon",
 [{datatype, [integer, {duration, ms}]},
  {default, 0}]}.

%% @doc Long Schedule(ms)
{mapping, "sysmon.long_schedule", "emqx.sysmon",
 [{datatype, [integer, {duration, ms}]},
  {default, 240}]}.
%% @doc Large Heap
{mapping, "sysmon.large_heap", "emqx.sysmon",
 [{datatype, bytesize},
  {default, "8MB"}]}.

%% @doc Monitor Busy Port
{mapping, "sysmon.busy_port", "emqx.sysmon",
 [{datatype, {enum, [true, false]}},
  {default, false}]}.

%% @doc Monitor Busy Dist Port
{mapping, "sysmon.busy_dist_port", "emqx.sysmon",
 [{datatype, {enum, [true, false]}},
  {default, true}]}.

%% Turns every two-segment `sysmon.<name>` setting into a
%% `{name, Value}` pair, with the name converted to an atom.
{translation, "emqx.sysmon", fun(Conf) ->
    Settings = cuttlefish_variable:filter_by_prefix("sysmon", Conf),
    [{list_to_atom(Key), Val} || {[_, Key], Val} <- Settings]
end}.

%%--------------------------------------------------------------------
%% Operating System Monitor
%%--------------------------------------------------------------------

{mapping, "os_mon.cpu_check_interval", "emqx.os_mon",
 [{datatype, {duration, s}},
  {default, 60}]}.

{mapping, "os_mon.cpu_high_watermark", "emqx.os_mon",
 [{datatype, {percent, float}},
  {default, "80%"}]}.

{mapping, "os_mon.cpu_low_watermark", "emqx.os_mon",
 [{datatype, {percent, float}},
  {default, "60%"}]}.

{mapping, "os_mon.mem_check_interval", "emqx.os_mon",
 [{datatype, {duration, s}},
  {default, 60}]}.

{mapping, "os_mon.sysmem_high_watermark", "emqx.os_mon",
 [{datatype, {percent, float}},
  {default, "70%"}]}.

{mapping, "os_mon.procmem_high_watermark", "emqx.os_mon",
 [{datatype, {percent, float}},
  {default, "5%"}]}.

%% Turns every two-segment `os_mon.<name>` setting into a
%% `{name, Value}` pair, with the name converted to an atom.
{translation, "emqx.os_mon", fun(Conf) ->
    Settings = cuttlefish_variable:filter_by_prefix("os_mon", Conf),
    [{list_to_atom(Key), Val} || {[_, Key], Val} <- Settings]
end}.

%%--------------------------------------------------------------------
%% VM Monitor
%%--------------------------------------------------------------------

{mapping, "vm_mon.check_interval", "emqx.vm_mon",
 [{datatype, {duration, s}},
  {default, 30}]}.

{mapping, "vm_mon.process_high_watermark", "emqx.vm_mon",
 [{datatype, {percent, float}},
  {default, "80%"}]}.
{mapping, "vm_mon.process_low_watermark", "emqx.vm_mon",
 [{datatype, {percent, float}},
  {default, "60%"}]}.

%% Turns every two-segment `vm_mon.<name>` setting into a
%% `{name, Value}` pair, with the name converted to an atom.
{translation, "emqx.vm_mon", fun(Conf) ->
    Settings = cuttlefish_variable:filter_by_prefix("vm_mon", Conf),
    [{list_to_atom(Key), Val} || {[_, Key], Val} <- Settings]
end}.