From 5aa4565e84a2f300fc77e92021c82bb59abea3eb Mon Sep 17 00:00:00 2001 From: Turtle Date: Thu, 10 Jun 2021 11:29:31 +0800 Subject: [PATCH] chore: remove lager schema info --- priv/emqx.schema | 2470 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2470 insertions(+) create mode 100644 priv/emqx.schema diff --git a/priv/emqx.schema b/priv/emqx.schema new file mode 100644 index 000000000..2d67c9677 --- /dev/null +++ b/priv/emqx.schema @@ -0,0 +1,2470 @@ +%%-*- mode: erlang -*- +%% EMQ X R4.0 config mapping + +%%-------------------------------------------------------------------- +%% Cluster +%%-------------------------------------------------------------------- + +%% @doc Cluster name +{mapping, "cluster.name", "ekka.cluster_name", [ + {default, emqxcl}, + {datatype, atom} +]}. + +%% @doc Cluster discovery +{mapping, "cluster.discovery", "ekka.cluster_discovery", [ + {default, manual}, + {datatype, atom} +]}. + +%% @doc Clean down node from the cluster +{mapping, "cluster.autoclean", "ekka.cluster_autoclean", [ + {datatype, {duration, ms}} +]}. + +%% @doc Cluster autoheal +{mapping, "cluster.autoheal", "ekka.cluster_autoheal", [ + {datatype, flag}, + {default, off} +]}. + +%%-------------------------------------------------------------------- +%% Cluster by static node list + +{mapping, "cluster.static.seeds", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Cluster by UDP Multicast + +{mapping, "cluster.mcast.addr", "ekka.cluster_discovery", [ + {default, "239.192.0.1"}, + {datatype, string} +]}. + +{mapping, "cluster.mcast.ports", "ekka.cluster_discovery", [ + {default, "4369"}, + {datatype, string} +]}. + +{mapping, "cluster.mcast.iface", "ekka.cluster_discovery", [ + {datatype, string}, + {default, "0.0.0.0"} +]}. + +{mapping, "cluster.mcast.ttl", "ekka.cluster_discovery", [ + {datatype, integer}, + {default, 255} +]}. + +{mapping, "cluster.mcast.loop", "ekka.cluster_discovery", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "cluster.mcast.sndbuf", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "16KB"} +]}. + +{mapping, "cluster.mcast.recbuf", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "16KB"} +]}. + +{mapping, "cluster.mcast.buffer", "ekka.cluster_discovery", [ + {datatype, bytesize}, + {default, "32KB"} +]}. + +%%-------------------------------------------------------------------- +%% Cluster by DNS A Record + +{mapping, "cluster.dns.name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +%% @doc The erlang distributed protocol +{mapping, "cluster.proto_dist", "ekka.proto_dist", [ + {default, "inet_tcp"}, + {datatype, {enum, [inet_tcp, inet6_tcp, inet_tls]}}, + hidden +]}. + +{mapping, "cluster.dns.app", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Cluster using etcd + +{mapping, "cluster.etcd.server", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.prefix", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.node_ttl", "ekka.cluster_discovery", [ + {datatype, {duration, ms}}, + {default, "1m"} +]}. + +{mapping, "cluster.etcd.ssl.keyfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.ssl.certfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.etcd.ssl.cacertfile", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Cluster on K8s + +{mapping, "cluster.k8s.apiserver", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.service_name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.address_type", "ekka.cluster_discovery", [ + {datatype, {enum, [ip, dns, hostname]}} +]}. + +{mapping, "cluster.k8s.app_name", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.namespace", "ekka.cluster_discovery", [ + {datatype, string} +]}. + +{mapping, "cluster.k8s.suffix", "ekka.cluster_discovery", [ + {datatype, string}, + {default, ""} + ]}. + +{translation, "ekka.cluster_discovery", fun(Conf) -> + Strategy = cuttlefish:conf_get("cluster.discovery", Conf), + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + IpPort = fun(S) -> + [Addr, Port] = string:tokens(S, ":"), + {ok, Ip} = inet:parse_address(Addr), + {Ip, Port} + end, + Options = fun(static) -> + [{seeds, [list_to_atom(S) || S <- string:tokens(cuttlefish:conf_get("cluster.static.seeds", Conf, ""), ",")]}]; + (mcast) -> + {ok, Addr} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.addr", Conf)), + {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)), + Ports = [list_to_integer(S) || S <- string:tokens(cuttlefish:conf_get("cluster.mcast.ports", Conf), ",")], + [{addr, Addr}, {ports, Ports}, {iface, Iface}, + {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)}, + {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; + (dns) -> + [{name, cuttlefish:conf_get("cluster.dns.name", Conf)}, + {app, cuttlefish:conf_get("cluster.dns.app", Conf)}]; + (etcd) -> + SslOpts = fun(Conf) -> + Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf), + lists:map(fun({["cluster", "etcd", "ssl", Name], Value}) -> + {list_to_atom(Name), Value} + end, Options) + end, + [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")}, + {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")}, + {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}, + {ssl_options, SslOpts(Conf)}]; + (k8s) -> + [{apiserver, cuttlefish:conf_get("cluster.k8s.apiserver", Conf)}, + {service_name, cuttlefish:conf_get("cluster.k8s.service_name", Conf)}, + {address_type, cuttlefish:conf_get("cluster.k8s.address_type", Conf, ip)}, + {app_name, cuttlefish:conf_get("cluster.k8s.app_name", Conf)}, + {namespace, cuttlefish:conf_get("cluster.k8s.namespace", Conf)}, + {suffix, cuttlefish:conf_get("cluster.k8s.suffix", Conf, "")}]; + (manual) -> + [ ] + end, + {Strategy, Filter(Options(Strategy))} +end}. + +%%-------------------------------------------------------------------- +%% Node +%%-------------------------------------------------------------------- + +%% @doc Node name +{mapping, "node.name", "vm_args.-name", [ + {default, "emqx@127.0.0.1"}, + {override_env, "NODE_NAME"} +]}. + +%% @doc Specify SSL Options in the file if using SSL for erlang distribution +{mapping, "node.ssl_dist_optfile", "vm_args.-ssl_dist_optfile", [ + {datatype, string}, + hidden +]}. + +%% @doc Secret cookie for distributed erlang node +{mapping, "node.cookie", "vm_args.-setcookie", [ + {default, "emqxsecretcookie"}, + {override_env, "NODE_COOKIE"} +]}. + +{mapping, "node.data_dir", "emqx.data_dir", [ + {datatype, string} +]}. + +%% @doc http://erlang.org/doc/man/heart.html +{mapping, "node.heartbeat", "vm_args.-heart", [ + {datatype, flag}, + hidden +]}. + +{translation, "vm_args.-heart", fun(Conf) -> + case cuttlefish:conf_get("node.heartbeat", Conf) of + true -> ""; + false -> cuttlefish:invalid("should be 'on' or comment the line!") + end +end}. + +%% @doc More information at: http://erlang.org/doc/man/erl.html +{mapping, "node.async_threads", "vm_args.+A", [ + {datatype, integer}, + {validators, ["range:0-1024"]} +]}. + +%% @doc Erlang Process Limit +{mapping, "node.process_limit", "vm_args.+P", [ + {datatype, integer}, + hidden +]}. + +%% @doc The maximum number of concurrent ports/sockets. +%% Valid range is 1024-134217727 +{mapping, "node.max_ports", "vm_args.+Q", [ + {datatype, integer}, + {validators, ["range4ports"]}, + {override_env, "MAX_PORTS"} +]}. + +{validator, "range4ports", "must be 1024 to 134217727", + fun(X) -> X >= 1024 andalso X =< 134217727 end}. + +%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl +{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ + {datatype, bytesize}, + {commented, "32MB"}, + hidden, + {validators, ["zdbbl_range"]} +]}. + +{translation, "vm_args.+zdbbl", + fun(Conf) -> + ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined), + case ZDBBL of + undefined -> undefined; + X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; + _ -> undefined + end + end}. + +{validator, "zdbbl_range", "must be between 1KB and 2097151KB", + fun(ZDBBL) -> + %% 2097151KB = 2147482624 + ZDBBL >= 1024 andalso ZDBBL =< 2147482624 + end +}. + +%% @doc Global GC Interval +{mapping, "node.global_gc_interval", "emqx.global_gc_interval", [ + {datatype, {duration, s}} +]}. + +%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 +{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ + {default, 1000}, + {datatype, integer}, + hidden, + {validators, ["positive_integer"]} +]}. + +{validator, "positive_integer", "must be a positive integer", + fun(X) -> X >= 0 end}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, +%% R16+ uses +e +%% @doc The ETS table limit +{mapping, "node.max_ets_tables", + cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ + {default, 256000}, + {datatype, integer}, + hidden +]}. + +%% @doc Set the location of crash dumps +{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ + {default, "{{crash_dump}}"}, + {datatype, file}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime +{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ + {datatype, integer}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html +{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ + {commented, 6369}, + {datatype, integer}, + hidden +]}. + +%% @see node.dist_listen_min +{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ + {commented, 6369}, + {datatype, integer}, + hidden +]}. + +{mapping, "node.backtrace_depth", "emqx.backtrace_depth", [ + {default, 16}, + {datatype, integer} +]}. + +%%-------------------------------------------------------------------- +%% RPC +%%-------------------------------------------------------------------- + +%% RPC Mode. +{mapping, "rpc.mode", "emqx.rpc_mode", [ + {default, async}, + {datatype, {enum, [sync, async]}} +]}. + +{mapping, "rpc.async_batch_size", "gen_rpc.max_batch_size", [ + {default, 256}, + {datatype, integer} +]}. + +{mapping, "rpc.port_discovery", "gen_rpc.port_discovery", [ + {default, stateless}, + {datatype, {enum, [manual, stateless]}} +]}. + +%% RPC server port. +{mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [ + {default, 5369}, + {datatype, integer} +]}. + +%% Number of tcp connections when connecting to RPC server +{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:gt_0_lt_256"]} +]}. + +{translation, "gen_rpc.tcp_client_num", fun(Conf) -> + case cuttlefish:conf_get("rpc.tcp_client_num", Conf) of + 0 -> max(1, erlang:system_info(schedulers) div 2); + V -> V + end +end}. + +%% Client connect timeout +{mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [ + {default, "5s"}, + {datatype, {duration, ms}} +]}. + +%% Client and Server send timeout +{mapping, "rpc.send_timeout", "gen_rpc.send_timeout", [ + {default, 5000}, + {datatype, {duration, ms}} +]}. + +%% Authentication timeout +{mapping, "rpc.authentication_timeout", "gen_rpc.authentication_timeout", [ + {default, 5000}, + {datatype, {duration, ms}} +]}. + +%% Default receive timeout for call() functions +{mapping, "rpc.call_receive_timeout", "gen_rpc.call_receive_timeout", [ + {default, 15000}, + {datatype, {duration, ms}} +]}. + +%% Socket keepalive configuration +{mapping, "rpc.socket_keepalive_idle", "gen_rpc.socket_keepalive_idle", [ + {default, 7200}, + {datatype, {duration, s}} +]}. + +%% Seconds between probes +{mapping, "rpc.socket_keepalive_interval", "gen_rpc.socket_keepalive_interval", [ + {default, 75}, + {datatype, {duration, s}} +]}. + +%% Probes lost to close the connection +{mapping, "rpc.socket_keepalive_count", "gen_rpc.socket_keepalive_count", [ + {default, 9}, + {datatype, integer} +]}. + +%% Size of TCP send buffer +{mapping, "rpc.socket_sndbuf", "gen_rpc.socket_sndbuf", [ + {default, "1MB"}, + {datatype, bytesize} +]}. + +%% Size of TCP receive buffer +{mapping, "rpc.socket_recbuf", "gen_rpc.socket_recbuf", [ + {default, "1MB"}, + {datatype, bytesize} +]}. + +%% Size of TCP receive buffer +{mapping, "rpc.socket_buffer", "gen_rpc.socket_buffer", [ + {default, "1MB"}, + {datatype, bytesize} +]}. + +{validator, "range:gt_0_lt_256", "must greater than 0 and less than 256", + fun(X) -> X >= 0 andalso X < 256 end +}. + +%% Force client to use server listening port, because we do no provide +%% per-node listening port manual mapping from configs. +%% i.e. all nodes in the cluster should agree to the same +%% listening port number. +{translation, "gen_rpc.tcp_client_port", fun(_, _, Conf) -> + cuttlefish:conf_get("rpc.tcp_server_port", Conf) +end}. + +%%-------------------------------------------------------------------- +%% Log +%%-------------------------------------------------------------------- + +{mapping, "log.to", "kernel.logger", [ + {default, file}, + {datatype, {enum, [file, console, both]}} +]}. + +{mapping, "log.level", "kernel.logger", [ + {default, warning}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} +]}. + +{mapping, "log.primary_log_level", "kernel.logger_level", [ + {default, warning}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} +]}. + +{mapping, "log.dir", "kernel.logger", [ + {default, "log"}, + {datatype, string} +]}. + +{mapping, "log.file", "kernel.logger", [ + {default, "emqx.log"}, + {datatype, file} +]}. + +{mapping, "log.chars_limit", "kernel.logger", [ + {default, -1}, + {datatype, integer} +]}. + +{mapping, "log.supervisor_reports", "kernel.logger", [ + {default, error}, + {datatype, {enum, [error, progress]}}, + hidden +]}. + +%% @doc Maximum depth in Erlang term log formatting +%% and message queue inspection. +{mapping, "log.max_depth", "kernel.error_logger_format_depth", [ + {default, 20}, + {datatype, [{enum, [unlimited]}, integer]} +]}. + +%% @doc format logs as JSON objects +{mapping, "log.formatter", "kernel.logger", [ + {default, text}, + {datatype, {enum, [text, json]}} +]}. + +%% @doc format logs in a single line. +{mapping, "log.single_line", "kernel.logger", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "log.rotation", "kernel.logger", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.rotation.size", "kernel.logger", [ + {default, "10MB"}, + {datatype, bytesize} +]}. + +{mapping, "log.size", "kernel.logger", [ + {default, infinity}, + {datatype, [bytesize, atom]} +]}. + +{mapping, "log.rotation.count", "kernel.logger", [ + {default, 5}, + {datatype, integer} +]}. + +{mapping, "log.$level.file", "kernel.logger", [ + {datatype, file} +]}. + +{mapping, "log.sync_mode_qlen", "kernel.logger", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "log.drop_mode_qlen", "kernel.logger", [ + {default, 3000}, + {datatype, integer} +]}. + +{mapping, "log.flush_qlen", "kernel.logger", [ + {default, 8000}, + {datatype, integer} +]}. + +{mapping, "log.overload_kill", "kernel.logger", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.overload_kill_mem_size", "kernel.logger", [ + {default, "30MB"}, + {datatype, bytesize} +]}. + +{mapping, "log.overload_kill_qlen", "kernel.logger", [ + {default, 20000}, + {datatype, integer} +]}. + +{mapping, "log.overload_kill_restart_after", "kernel.logger", [ + {default, "5s"}, + {datatype, [{duration, ms}, atom]} +]}. + +{mapping, "log.burst_limit", "kernel.logger", [ + {default, "disabled"}, + {datatype, string} +]}. + +{mapping, "log.error_logger", "kernel.error_logger", [ + {default, silent}, + {datatype, {enum, [silent]}}, + hidden +]}. + +{translation, "kernel.logger_level", fun(_, _, Conf) -> + cuttlefish:conf_get("log.level", Conf) +end}. + +{translation, "kernel.logger", fun(Conf) -> + LogTo = cuttlefish:conf_get("log.to", Conf), + LogLevel = cuttlefish:conf_get("log.level", Conf), + LogType = case cuttlefish:conf_get("log.rotation", Conf) of + true -> wrap; + false -> halt + end, + CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of + -1 -> unlimited; + V -> V + end, + SingleLine = cuttlefish:conf_get("log.single_line", Conf), + FmtName = cuttlefish:conf_get("log.formatter", Conf), + Formatter = + case FmtName of + json -> + {emqx_logger_jsonfmt, + #{chars_limit => CharsLimit, + single_line => SingleLine + }}; + text -> + {emqx_logger_textfmt, + #{template => + [time," [",level,"] ", + {clientid, + [{peername, + [clientid,"@",peername," "], + [clientid, " "]}], + [{peername, + [peername," "], + []}]}, + msg,"\n"], + chars_limit => CharsLimit, + single_line => SingleLine + }} + end, + {BustLimitOn, {MaxBurstCount, TimeWindow}} = + case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of + ["disabled"] -> {false, {20000, 1000}}; + [Count, Window] -> + {true, {list_to_integer(Count), + case cuttlefish_duration:parse(Window, ms) of + Secs when is_integer(Secs) -> Secs; + {error, Reason1} -> error(Reason1) + end}} + end, + FileConf = fun(Filename) -> + BasicConf = + #{type => LogType, + file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename), + max_no_files => cuttlefish:conf_get("log.rotation.count", Conf), + sync_mode_qlen => cuttlefish:conf_get("log.sync_mode_qlen", Conf), + drop_mode_qlen => cuttlefish:conf_get("log.drop_mode_qlen", Conf), + flush_qlen => cuttlefish:conf_get("log.flush_qlen", Conf), + overload_kill_enable => cuttlefish:conf_get("log.overload_kill", Conf), + overload_kill_qlen => cuttlefish:conf_get("log.overload_kill_qlen", Conf), + overload_kill_mem_size => cuttlefish:conf_get("log.overload_kill_mem_size", Conf), + overload_kill_restart_after => cuttlefish:conf_get("log.overload_kill_restart_after", Conf), + burst_limit_enable => BustLimitOn, + burst_limit_max_count => MaxBurstCount, + burst_limit_window_time => TimeWindow + }, + MaxNoBytes = case LogType of + wrap -> cuttlefish:conf_get("log.rotation.size", Conf); + halt -> cuttlefish:conf_get("log.size", Conf) + end, + BasicConf#{max_no_bytes => MaxNoBytes} + end, + + Filters = case cuttlefish:conf_get("log.supervisor_reports", Conf) of + error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]; + progress -> [] + end, + + %% For the default logger that outputs to console + DefaultHandler = + if LogTo =:= console orelse LogTo =:= both -> + [{handler, console, logger_std_h, + #{level => LogLevel, + config => #{type => standard_io}, + formatter => Formatter, + filters => Filters + } + }]; + true -> + [{handler, default, undefined}] + end, + + %% For the file logger + FileHandler = + if LogTo =:= file orelse LogTo =:= both -> + [{handler, file, logger_disk_log_h, + #{level => LogLevel, + config => FileConf(cuttlefish:conf_get("log.file", Conf)), + formatter => Formatter, + filesync_repeat_interval => no_repeat, + filters => Filters + }}]; + true -> [] + end, + + %% For creating additional log files for specific log levels. + AdditionalLogFiles = + lists:foldl( + fun({[_, Level, _] = K, Filename}, Acc) when LogTo =:= file; LogTo =:= both -> + case cuttlefish_variable:is_fuzzy_match(K, ["log", "$level", "file"]) of + true -> [{Level, Filename} | Acc]; + false -> Acc + end; + ({_K, _V}, Acc) -> + Acc + end, [], Conf), + AdditionalHandlers = + [{handler, list_to_atom("file_for_"++Level), logger_disk_log_h, + #{level => list_to_atom(Level), + config => FileConf(Filename), + formatter => Formatter, + filesync_repeat_interval => no_repeat}} + || {Level, Filename} <- AdditionalLogFiles], + + DefaultHandler ++ FileHandler ++ AdditionalHandlers +end}. + +%%-------------------------------------------------------------------- +%% Authentication/ACL +%%-------------------------------------------------------------------- + +%% @doc Allow anonymous authentication. +{mapping, "allow_anonymous", "emqx.allow_anonymous", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc ACL nomatch. +{mapping, "acl_nomatch", "emqx.acl_nomatch", [ + {default, deny}, + {datatype, {enum, [allow, deny]}} +]}. + +%% @doc Default ACL file. +{mapping, "acl_file", "emqx.acl_file", [ + {datatype, string}, + hidden +]}. + +%% @doc Enable ACL cache for publish. +{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [ + {default, on}, + {datatype, flag} +]}. + +%% @doc ACL cache time-to-live. +{mapping, "acl_cache_ttl", "emqx.acl_cache_ttl", [ + {default, "1m"}, + {datatype, {duration, ms}} +]}. + +%% @doc ACL cache size. +{mapping, "acl_cache_max_size", "emqx.acl_cache_max_size", [ + {default, 32}, + {datatype, integer}, + {validators, ["range:gt_0"]} +]}. + +%% @doc Action when acl check reject current operation +{mapping, "acl_deny_action", "emqx.acl_deny_action", [ + {default, ignore}, + {datatype, {enum, [ignore, disconnect]}} +]}. + +%% @doc Flapping detect policy +{mapping, "flapping_detect_policy", "emqx.flapping_detect_policy", [ + {datatype, string}, + {default, "30,1m,5m"} +]}. + +{translation, "emqx.flapping_detect_policy", fun(Conf) -> + Policy = cuttlefish:conf_get("flapping_detect_policy", Conf), + [Threshold, Duration, Interval] = string:tokens(Policy, ", "), + ParseDuration = fun(S, Dur) -> + case cuttlefish_duration:parse(S, Dur) of + I when is_integer(I) -> I; + {error, Reason} -> error(Reason) + end + end, + #{threshold => list_to_integer(Threshold), + duration => ParseDuration(Duration, ms), + banned_interval => ParseDuration(Interval, s) + } +end}. + +{validator, "range:gt_0", "must greater than 0", + fun(X) -> X > 0 end +}. + +%%-------------------------------------------------------------------- +%% MQTT Protocol +%%-------------------------------------------------------------------- + +%% @doc Max Packet Size Allowed, 1MB by default. +{mapping, "mqtt.max_packet_size", "emqx.max_packet_size", [ + {default, "1MB"}, + {datatype, bytesize}, + {override_env, "MAX_PACKET_SIZE"} +]}. + +%% @doc Set the Max ClientId Length Allowed. +{mapping, "mqtt.max_clientid_len", "emqx.max_clientid_len", [ + {default, 65535}, + {datatype, integer} +]}. + +%% @doc Set the Maximum topic levels. +{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Set the Maximum QoS allowed. +{mapping, "mqtt.max_qos_allowed", "emqx.max_qos_allowed", [ + {default, 2}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +%% @doc Set the Maximum Topic Alias. +{mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [ + {default, 65535}, + {datatype, integer} +]}. + +%% @doc Whether the server supports MQTT retained messages. +{mapping, "mqtt.retain_available", "emqx.retain_available", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether the Server supports MQTT Wildcard Subscriptions. +{mapping, "mqtt.wildcard_subscription", "emqx.wildcard_subscription", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether the Server supports MQTT Shared Subscriptions. +{mapping, "mqtt.shared_subscription", "emqx.shared_subscription", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether to ignore loop delivery of messages.(for mqtt v3.1.1) +{mapping, "mqtt.ignore_loop_deliver", "emqx.ignore_loop_deliver", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether to parse the MQTT frame in strict mode +{mapping, "mqtt.strict_mode", "emqx.strict_mode", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Specify the response information returned to the client +{mapping, "mqtt.response_information", "emqx.response_information", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Zones +%%-------------------------------------------------------------------- + +%% @doc Idle timeout of the MQTT connection. +{mapping, "zone.$name.idle_timeout", "emqx.zones", [ + {default, "15s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "zone.$name.allow_anonymous", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "zone.$name.acl_nomatch", "emqx.zones", [ + {datatype, {enum, [allow, deny]}} +]}. + +%% @doc Enable ACL check. +{mapping, "zone.$name.enable_acl", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Action when acl check reject current operation +{mapping, "zone.$name.acl_deny_action", "emqx.zones", [ + {default, ignore}, + {datatype, {enum, [ignore, disconnect]}} +]}. + +%% @doc Enable Ban. +{mapping, "zone.$name.enable_ban", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Enable per connection statistics. +{mapping, "zone.$name.enable_stats", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Publish limit of the MQTT connections. +{mapping, "zone.$name.publish_limit", "emqx.zones", [ + {datatype, string} +]}. + +%% @doc Max Packet Size Allowed, 64K by default. +{mapping, "zone.$name.max_packet_size", "emqx.zones", [ + {datatype, bytesize} +]}. + +%% @doc Set the Max ClientId Length Allowed. +{mapping, "zone.$name.max_clientid_len", "emqx.zones", [ + {datatype, integer} +]}. + +%% @doc Set the Maximum topic levels. +{mapping, "zone.$name.max_topic_levels", "emqx.zones", [ + {datatype, integer} +]}. + +%% @doc Set the Maximum QoS allowed. +{mapping, "zone.$name.max_qos_allowed", "emqx.zones", [ + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +%% @doc Set the Maximum topic alias. +{mapping, "zone.$name.max_topic_alias", "emqx.zones", [ + {datatype, integer} +]}. + +%% @doc Whether the server supports retained messages. +{mapping, "zone.$name.retain_available", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether the Server supports Wildcard Subscriptions. +{mapping, "zone.$name.wildcard_subscription", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether the Server supports Shared Subscriptions. +{mapping, "zone.$name.shared_subscription", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + +%% @doc Server Keepalive +{mapping, "zone.$name.server_keepalive", "emqx.zones", [ + {datatype, integer} +]}. + +%% @doc Keepalive backoff +{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ + {default, 0.75}, + {datatype, float} +]}. + +%% @doc Max Number of Subscriptions Allowed. +{mapping, "zone.$name.max_subscriptions", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Upgrade QoS according to subscription? +{mapping, "zone.$name.upgrade_qos", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. +%% 0 is equivalent to maximum allowed +{mapping, "zone.$name.max_inflight", "emqx.zones", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:1-65535"]} +]}. + +%% @doc Retry interval for redelivering QoS1/2 messages. +{mapping, "zone.$name.retry_interval", "emqx.zones", [ + {default, "30s"}, + {datatype, {duration, s}} +]}. + +%% @doc Max Packets that Awaiting PUBREL, 0 means no limit +{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Awaiting PUBREL timeout +{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [ + {default, "300s"}, + {datatype, {duration, s}} +]}. + +%% @doc Ignore loop delivery of messages +{mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [ + {datatype, {enum, [true, false]}} +]}. + +%% @doc Session Expiry Interval +{mapping, "zone.$name.session_expiry_interval", "emqx.zones", [ + {default, "2h"}, + {datatype, {duration, s}} +]}. + +%% @doc Max queue length. Enqueued messages when persistent client +%% disconnected, or inflight window is full. 0 means no limit. +{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [ + {default, 1000}, + {datatype, integer} +]}. + +%% @doc Topic Priorities, comma separated topic=priority pairs, +%% where priority should be integer in range 1-255 (inclusive) +%% 1 being the lowest and 255 being the highest. +%% default value `none` to indicate no priority table, hence all +%% messages are treated equal, which means either highest ('infinity'), +%% or lowest (0) depending on mqueue_default_priority config. +{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [ + {default, "none"}, + {datatype, string} +]}. + +%% @doc Default priority for topics not in priority table. +{mapping, "zone.$name.mqueue_default_priority", "emqx.zones", [ + {default, lowest}, + {datatype, {enum, [highest, lowest]}} +]}. + +%% @doc Queue Qos0 messages? +{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [ + {datatype, flag}, + {default, off} +]}. + +{mapping, "zone.$name.rate_limit.conn_messages_in", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.rate_limit.conn_bytes_in", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.conn_congestion.alarm", "emqx.zones", [ + {datatype, flag}, + {default, off} +]}. + +{mapping, "zone.$name.conn_congestion.min_alarm_sustain_duration", "emqx.zones", [ + {default, "1m"}, + {datatype, {duration, ms}} +]}. + +{mapping, "zone.$name.quota.conn_messages_routing", "emqx.zones", [ + {datatype, string} +]}. + +{mapping, "zone.$name.quota.overall_messages_routing", "emqx.zones", [ + {datatype, string} +]}. + +%% @doc Force connection/session process GC after this number of +%% messages | bytes passed through. +%% Numbers delimited by `|'. Zero or negative is to disable. +{mapping, "zone.$name.force_gc_policy", "emqx.zones", [ + {datatype, string} + ]}. + +%% @doc Max message queue length and total heap size to force shutdown +%% connection/session process. +%% Message queue here is the Erlang process mailbox, but not the number +%% of queued MQTT messages of QoS 1 and 2. +%% Zero or negative is to disable. +{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [ + {default, "default"}, + {datatype, string} +]}. + +{mapping, "zone.$name.mountpoint", "emqx.zones", [ + {datatype, string} +]}. + +%% @doc Use username replace client id +{mapping, "zone.$name.use_username_as_clientid", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether to parse the MQTT frame in strict mode +{mapping, "zone.$name.strict_mode", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Specify the response information returned to the client +{mapping, "zone.$name.response_information", "emqx.zones", [ + {datatype, string} +]}. + +%% @doc Whether to bypass the authentication step +{mapping, "zone.$name.bypass_auth_plugins", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqx.zones", fun(Conf) -> + Ratelimit = fun(Val) -> + [L, D] = string:tokens(Val, ", "), + Limit = case cuttlefish_bytesize:parse(L) of + Sz when is_integer(Sz) -> Sz; + {error, Reason1} -> error(Reason1) + end, + Duration = case cuttlefish_duration:parse(D, s) of + Secs when is_integer(Secs) -> Secs; + {error, Reason} -> error(Reason) + end, + {Limit, Duration} + end, + Mapping = fun(["publish_limit"], Val) -> + %% XXX: Deprecated at v4.2 + {publish_limit, Ratelimit(Val)}; + (["force_gc_policy"], Val) -> + [Count, Bytes] = string:tokens(Val, "| "), + GcPolicy = case cuttlefish_bytesize:parse(Bytes) of + {error, Reason} -> + error(Reason); + Bytes1 -> + #{bytes => Bytes1, + count => list_to_integer(Count)} + end, + {force_gc_policy, GcPolicy}; + (["force_shutdown_policy"], "default") -> + {DefaultLen, DefaultSize} = + case WordSize = erlang:system_info(wordsize) of + 8 -> % arch_64 + {10000, cuttlefish_bytesize:parse("64MB")}; + 4 -> % arch_32 + {1000, cuttlefish_bytesize:parse("32MB")} + end, + {force_shutdown_policy, #{message_queue_len => DefaultLen, + max_heap_size => DefaultSize div WordSize + }}; + (["force_shutdown_policy"], Val) -> + [Len, Siz] = string:tokens(Val, "| "), + MaxSiz = case WordSize = erlang:system_info(wordsize) of + 8 -> % arch_64 + (1 bsl 59) - 1; + 4 -> % arch_32 + (1 bsl 27) - 1 + end, + ShutdownPolicy = + case cuttlefish_bytesize:parse(Siz) of + {error, Reason} -> + error(Reason); + Siz1 when Siz1 > MaxSiz -> + cuttlefish:invalid(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz])); + Siz1 -> + #{message_queue_len => list_to_integer(Len), + max_heap_size => Siz1 div WordSize} + end, + {force_shutdown_policy, ShutdownPolicy}; + (["mqueue_priorities"], Val) -> + case Val of + "none" -> {mqueue_priorities, none}; % NO_PRIORITY_TABLE + _ -> + MqueuePriorities = lists:foldl(fun(T, Acc) -> + %% NOTE: space in "= " is intended + [Topic, Prio] = string:tokens(T, "= "), + P = list_to_integer(Prio), + (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}), + maps:put(iolist_to_binary(Topic), P, Acc) + end, #{}, string:tokens(Val, ",")), + {mqueue_priorities, MqueuePriorities} + end; + (["mountpoint"], Val) -> + {mountpoint, iolist_to_binary(Val)}; + (["response_information"], Val) -> + {response_information, iolist_to_binary(Val)}; + (["rate_limit", "conn_messages_in"], Val) -> + {ratelimit, {conn_messages_in, Ratelimit(Val)}}; + (["rate_limit", "conn_bytes_in"], Val) -> + {ratelimit, {conn_bytes_in, Ratelimit(Val)}}; + (["conn_congestion", "alarm"], Val) -> + {conn_congestion_alarm_enabled, Val}; + (["conn_congestion", "min_alarm_sustain_duration"], Val) -> + {conn_congestion_min_alarm_sustain_duration, Val}; + (["quota", "conn_messages_routing"], Val) -> + {quota, {conn_messages_routing, Ratelimit(Val)}}; + (["quota", "overall_messages_routing"], Val) -> + {quota, {overall_messages_routing, Ratelimit(Val)}}; + ([Opt], Val) -> + {list_to_atom(Opt), Val} + end, + maps:to_list( + lists:foldl( + fun({["zone", Name | Opt], Val}, Zones) -> + NVal = Mapping(Opt, Val), + maps:update_with(list_to_atom(Name), + fun(Opts) -> + case NVal of + {Key, Rl} when Key == ratelimit; + Key == quota -> + Rls = proplists:get_value(Key, Opts, []), + lists:keystore(Key, 1, Opts, {Key, [Rl|Rls]}); + _ -> + [NVal|Opts] + end + end, [NVal], Zones) + end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf)))) +end}. + +%%-------------------------------------------------------------------- +%% Listeners +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% TCP Listeners + +{mapping, "listener.tcp.$name", "emqx.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.tcp.$name.acceptors", "emqx.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.tcp.$name.max_connections", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.tcp.$name.max_conn_rate", "emqx.listeners", [ + {datatype, integer} +]}. + +{mapping, "listener.tcp.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "listener.tcp.$name.zone", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.rate_limit", "emqx.listeners", [ + {default, undefined}, + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.access.$id", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqx.listeners", [ + {datatype, {duration, ms}} +]}. + +%% The proxy-protocol protocol can get the certificate CN through tcp +{mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [ + {datatype, {enum, [cn]}} +]}. + +%% The proxy-protocol protocol can get the certificate CN through tcp +{mapping, "listener.tcp.$name.peer_cert_as_clientid", "emqx.listeners", [ + {datatype, {enum, [cn]}} +]}. + +{mapping, "listener.tcp.$name.backlog", "emqx.listeners", [ + {datatype, integer}, + {default, 1024} +]}. + +{mapping, "listener.tcp.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.tcp.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "listener.tcp.$name.recbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.tcp.$name.sndbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.tcp.$name.buffer", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.tcp.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + +{mapping, "listener.tcp.$name.tune_buffer", "emqx.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.tcp.$name.nodelay", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.tcp.$name.reuseaddr", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% SSL Listeners + +{mapping, "listener.ssl.$name", "emqx.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.ssl.$name.acceptors", "emqx.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.max_connections", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.max_conn_rate", "emqx.listeners", [ + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.zone", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.rate_limit", "emqx.listeners", [ + {default, undefined}, + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.access.$id", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqx.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.ssl.$name.backlog", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ssl.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "listener.ssl.$name.recbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.sndbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.buffer", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.high_watermark", "emqx.listeners", [ + {datatype, bytesize}, + {default, "1MB"} + ]}. + +{mapping, "listener.ssl.$name.tune_buffer", "emqx.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ssl.$name.nodelay", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ssl.$name.reuseaddr", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ssl.$name.tls_versions", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.ciphers", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.psk_ciphers", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.handshake_timeout", "emqx.listeners", [ + {default, "15s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.ssl.$name.depth", "emqx.listeners", [ + {default, 10}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.key_password", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.dhfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.keyfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.certfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.cacertfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.verify", "emqx.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqx.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "listener.ssl.$name.secure_renegotiate", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.reuse_sessions", "emqx.listeners", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.honor_cipher_order", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.peer_cert_as_username", "emqx.listeners", [ + {datatype, {enum, [cn, dn, crt, pem, md5]}} +]}. + +{mapping, "listener.ssl.$name.peer_cert_as_clientid", "emqx.listeners", [ + {datatype, {enum, [cn, dn, crt, pem, md5]}} +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket Listeners + +{mapping, "listener.ws.$name", "emqx.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} +]}. + +{mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.max_connections", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.max_conn_rate", "emqx.listeners", [ + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.zone", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.rate_limit", "emqx.listeners", [ + {default, undefined}, + {datatype, string} +]}. + +{mapping, "listener.ws.$name.access.$id", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.fail_if_no_subprotocol", "emqx.listeners", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "listener.ws.$name.supported_subprotocols", "emqx.listeners", [ + {default, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5"}, + {datatype, string} +]}. + +{mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [ + {default, "X-Forwarded-For"}, + {datatype, string} +]}. + +{mapping, "listener.ws.$name.proxy_port_header", "emqx.listeners", [ + {default, "X-Forwarded-Port"}, + {datatype, string} +]}. + +{mapping, "listener.ws.$name.proxy_protocol", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ws.$name.proxy_protocol_timeout", "emqx.listeners", [ + {datatype, {duration, ms}} +]}. + +{mapping, "listener.ws.$name.backlog", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.ws.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "listener.ws.$name.recbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.sndbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.buffer", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.tune_buffer", "emqx.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ws.$name.nodelay", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ws.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.ws.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.ws.$name.mqtt_piggyback", "emqx.listeners", [ + {datatype, {enum, [single, multiple]}}, + {default, multiple}, + hidden +]}. + +{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [ + {datatype, {enum, [cn]}} +]}. + +{mapping, "listener.ws.$name.peer_cert_as_clientid", "emqx.listeners", [ + {datatype, {enum, [cn]}} +]}. + +{mapping, "listener.ws.$name.check_origin_enable", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + {default, false}, + hidden +]}. + +{mapping, "listener.ws.$name.allow_origin_absence", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + {default, true}, + hidden +]}. + +{mapping, "listener.ws.$name.check_origins", "emqx.listeners", [ + {datatype, string}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket/SSL Listeners + +{mapping, "listener.wss.$name", "emqx.listeners", [ + {datatype, [integer, ip]} +]}. + +{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} +]}. + +{mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "listener.wss.$name.max_connections", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.wss.$name.max_conn_rate", "emqx.listeners", [ + {datatype, integer} +]}. + +{mapping, "listener.wss.$name.active_n", "emqx.listeners", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "listener.wss.$name.zone", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.rate_limit", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.fail_if_no_subprotocol", "emqx.listeners", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "listener.wss.$name.supported_subprotocols", "emqx.listeners", [ + {default, "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5"}, + {datatype, string} +]}. + +{mapping, "listener.wss.$name.access.$id", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.proxy_address_header", "emqx.listeners", [ + {default, "X-Forwarded-For"}, + {datatype, string} +]}. + +{mapping, "listener.wss.$name.proxy_port_header", "emqx.listeners", [ + {default, "X-Forwarded-Port"}, + {datatype, string} +]}. + +{mapping, "listener.wss.$name.proxy_protocol", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.wss.$name.proxy_protocol_timeout", "emqx.listeners", [ + {datatype, {duration, ms}} +]}. + +%%{mapping, "listener.wss.$name.handshake_timeout", "emqx.listeners", [ +%% {default, "15s"}, +%% {datatype, {duration, ms}} +%%]}. + +{mapping, "listener.wss.$name.backlog", "emqx.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.wss.$name.send_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + {default, "15s"} +]}. + +{mapping, "listener.wss.$name.send_timeout_close", "emqx.listeners", [ + {datatype, flag}, + {default, on} +]}. + +{mapping, "listener.wss.$name.recbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.sndbuf", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.buffer", "emqx.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.tune_buffer", "emqx.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.wss.$name.nodelay", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.tls_versions", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.ciphers", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.psk_ciphers", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.keyfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.certfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.cacertfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.dhfile", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.depth", "emqx.listeners", [ + {default, 10}, + {datatype, integer} +]}. + +{mapping, "listener.wss.$name.key_password", "emqx.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.verify", "emqx.listeners", [ + {datatype, atom} +]}. + +{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqx.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "listener.wss.$name.secure_renegotiate", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.wss.$name.reuse_sessions", "emqx.listeners", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "listener.wss.$name.honor_cipher_order", "emqx.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.wss.$name.peer_cert_as_username", "emqx.listeners", [ + {datatype, {enum, [cn, dn, crt, pem, md5]}} +]}. + +{mapping, "listener.wss.$name.peer_cert_as_clientid", "emqx.listeners", [ + {datatype, {enum, [cn, dn, crt, pem, md5]}} +]}. + +{mapping, "listener.wss.$name.compress", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.level", "emqx.listeners", [ + {datatype, {enum, [none, default, best_compression, best_speed]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.mem_level", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:1-9"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.strategy", "emqx.listeners", [ + {datatype, {enum, [default, filtered, huffman_only, rle]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_context_takeover", "emqx.listeners", [ + {datatype, {enum, [takeover, no_takeover]}}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.server_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.deflate_opts.client_max_window_bits", "emqx.listeners", [ + {datatype, integer}, + {validators, ["range:8-15"]}, + hidden +]}. + +{mapping, "listener.wss.$name.idle_timeout", "emqx.listeners", [ + {datatype, {duration, ms}}, + hidden +]}. + +{mapping, "listener.wss.$name.max_frame_size", "emqx.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "listener.wss.$name.mqtt_piggyback", "emqx.listeners", [ + {datatype, {enum, [single, multiple]}}, + {default, multiple}, + hidden +]}. + +{mapping, "listener.wss.$name.check_origin_enable", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + {default, false}, + hidden +]}. + +{mapping, "listener.wss.$name.allow_origin_absence", "emqx.listeners", [ + {datatype, {enum, [true, false]}}, + {default, true}, + hidden +]}. + +{mapping, "listener.wss.$name.check_origins", "emqx.listeners", [ + {datatype, string}, + hidden +]}. + +{translation, "emqx.listeners", fun(Conf) -> + + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + + Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, + + Access = fun(S) -> + [A, CIDR] = string:tokens(S, " "), + {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} + end, + + AccOpts = fun(Prefix) -> + case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of + [] -> []; + Rules -> [{access_rules, [Access(Rule) || {_, Rule} <- Rules]}] + end + end, + + RateLimit = fun(undefined) -> + undefined; + (Val) -> + [L, D] = string:tokens(Val, ", "), + Limit = case cuttlefish_bytesize:parse(L) of + Sz when is_integer(Sz) -> Sz; + {error, Reason} -> error(Reason) + end, + Duration = case cuttlefish_duration:parse(D, s) of + Secs when is_integer(Secs) -> Secs; + {error, Reason1} -> error(Reason1) + end, + {Limit, Duration} + end, + + CheckOrigin = fun(S) -> + Origins = string:tokens(S, ","), + [ list_to_binary(string:trim(O)) || O <- Origins] + end, + + WsOpts = fun(Prefix) -> + case cuttlefish_variable:filter_by_prefix(Prefix ++ ".check_origins", Conf) of + [] -> undefined; + Rules -> + OriginList = [CheckOrigin(Rule) || {_, Rule} <- Rules], + lists:flatten(OriginList) + end + end, + + LisOpts = fun(Prefix) -> + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, + {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, + {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, + {active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)}, + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, + {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, + {rate_limit, RateLimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))}, + {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_address_header, list_to_binary(string:lowercase(cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, "")))}, + {proxy_port_header, list_to_binary(string:lowercase(cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, "")))}, + {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, + {fail_if_no_subprotocol, cuttlefish:conf_get(Prefix ++ ".fail_if_no_subprotocol", Conf, undefined)}, + {supported_subprotocols, string:tokens(cuttlefish:conf_get(Prefix ++ ".supported_subprotocols", Conf, ""), ", ")}, + {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, + {peer_cert_as_clientid, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_clientid", Conf, undefined)}, + {compress, cuttlefish:conf_get(Prefix ++ ".compress", Conf, undefined)}, + {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}, + {max_frame_size, cuttlefish:conf_get(Prefix ++ ".max_frame_size", Conf, undefined)}, + {mqtt_piggyback, cuttlefish:conf_get(Prefix ++ ".mqtt_piggyback", Conf, undefined)}, + {check_origin_enable, cuttlefish:conf_get(Prefix ++ ".check_origin_enable", Conf, undefined)}, + {allow_origin_absence, cuttlefish:conf_get(Prefix ++ ".allow_origin_absence", Conf, undefined)}, + {check_origins, WsOpts(Prefix)} | AccOpts(Prefix)]) + end, + DeflateOpts = fun(Prefix) -> + Filter([{level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.level", Conf, undefined)}, + {mem_level, cuttlefish:conf_get(Prefix ++ ".deflate_opts.mem_level", Conf, undefined)}, + {strategy, cuttlefish:conf_get(Prefix ++ ".deflate_opts.strategy", Conf, undefined)}, + {server_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_context_takeover", Conf, undefined)}, + {client_context_takeover, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_context_takeover", Conf, undefined)}, + {server_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.server_max_window_bits", Conf, undefined)}, + {client_max_windows_bits, cuttlefish:conf_get(Prefix ++ ".deflate_opts.client_max_window_bits", Conf, undefined)}]) + end, + TcpOpts = fun(Prefix) -> + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)}, + {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {high_watermark, cuttlefish:conf_get(Prefix ++ ".high_watermark", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}, + {reuseaddr, cuttlefish:conf_get(Prefix ++ ".reuseaddr", Conf, undefined)}]) + end, + SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, + MapPSKCiphers = fun(PSKCiphers) -> + lists:map( + fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; + ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; + ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; + ("PSK-RC4-SHA") -> {psk, rc4_128, sha} + end, PSKCiphers) + end, + SslOpts = fun(Prefix) -> + Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of + undefined -> undefined; + L -> [list_to_atom(V) || V <- L] + end, + TLSCiphers = cuttlefish:conf_get(Prefix++".ciphers", Conf, undefined), + PSKCiphers = cuttlefish:conf_get(Prefix++".psk_ciphers", Conf, undefined), + Ciphers = + case {TLSCiphers, PSKCiphers} of + {undefined, undefined} -> + cuttlefish:invalid(Prefix++".ciphers or "++Prefix++".psk_ciphers is absent"); + {TLSCiphers, undefined} -> + SplitFun(TLSCiphers); + {undefined, PSKCiphers} -> + MapPSKCiphers(SplitFun(PSKCiphers)); + {_TLSCiphers, _PSKCiphers} -> + cuttlefish:invalid(Prefix++".ciphers and "++Prefix++".psk_ciphers cannot be configured at the same time") + end, + UserLookupFun = + case PSKCiphers of + undefined -> undefined; + _ -> {fun emqx_psk:lookup/3, <<>>} + end, + Filter([{versions, Versions}, + {ciphers, Ciphers}, + {user_lookup_fun, UserLookupFun}, + {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {depth, cuttlefish:conf_get(Prefix ++ ".depth", Conf, undefined)}, + {password, cuttlefish:conf_get(Prefix ++ ".key_password", Conf, undefined)}, + {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, + {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, + {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, + {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, + {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, + {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) + end, + + Listen_fix = fun({Ip, Port}) -> case inet:parse_address(Ip) of + {ok, R} -> {R, Port}; + _ -> {Ip, Port} + end; + (Other) -> Other + end, + + TcpListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + ListenOnN = case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> []; + ListenOn -> Listen_fix(ListenOn) + end, + [#{ proto => Atom(Type) + , name => Name + , listen_on => ListenOnN + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] + end, + SslListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [#{ proto => Atom(Type) + , name => Name + , listen_on => Listen_fix(ListenOn) + , opts => [ {deflate_options, DeflateOpts(Prefix)} + , {tcp_options, TcpOpts(Prefix)} + , {ssl_options, SslOpts(Prefix)} + | LisOpts(Prefix) + ] + } + ] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] + ++ + [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) +end}. + +%%-------------------------------------------------------------------- +%% Modules +%%-------------------------------------------------------------------- + +{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [ + {datatype, string} +]}. + +{mapping, "module.presence.qos", "emqx.modules", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.subscription.$id.topic", "emqx.modules", [ + {datatype, string} +]}. + +{mapping, "module.subscription.$id.qos", "emqx.modules", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.subscription.$id.nl", "emqx.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-1"]} +]}. + +{mapping, "module.subscription.$id.rap", "emqx.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-1"]} +]}. + +{mapping, "module.subscription.$id.rh", "emqx.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.rewrite.rule.$id", "emqx.modules", [ + {datatype, string} +]}. + +{mapping, "module.rewrite.pub.rule.$id", "emqx.modules", [ + {datatype, string} +]}. + +{mapping, "module.rewrite.sub.rule.$id", "emqx.modules", [ + {datatype, string} +]}. + +{translation, "emqx.modules", fun(Conf, _, Conf1) -> + Subscriptions = fun() -> + List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), + TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List], + [{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0), + nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), + rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), + rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) + }} || {N, T} <- TopicList] + end, + Rewrites = fun() -> + Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), + PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf), + SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf), + TotalRules = lists:append( + [ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules, + [ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules + ), + lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) -> + [Topic, Re, Dest] = string:tokens(Rule, " "), + {rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} + end, TotalRules) + end, + lists:append([ + [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], + [{emqx_mod_subscription, Subscriptions()}], + [{emqx_mod_rewrite, Rewrites()}], + [{emqx_mod_topic_metrics, []}], + [{emqx_mod_delayed, []}], + [{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] + ]) +end}. + +%%------------------------------------------------------------------- +%% Plugins +%%------------------------------------------------------------------- + +{mapping, "plugins.etc_dir", "emqx.plugins_etc_dir", [ + {datatype, string} +]}. + +{mapping, "plugins.loaded_file", "emqx.plugins_loaded_file", [ + {datatype, string} +]}. + +{mapping, "plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Broker +%%-------------------------------------------------------------------- + +{mapping, "broker.sys_interval", "emqx.broker_sys_interval", [ + {datatype, {duration, ms}}, + {default, "1m"} +]}. + +{mapping, "broker.sys_heartbeat", "emqx.broker_sys_heartbeat", [ + {datatype, {duration, ms}}, + {default, "30s"} +]}. + +{mapping, "broker.enable_session_registry", "emqx.enable_session_registry", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "broker.session_locking_strategy", "emqx.session_locking_strategy", [ + {default, quorum}, + {datatype, {enum, [local,leader,quorum,all]}} +]}. + +%% @doc Shared Subscription Dispatch Strategy. +{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ + {default, round_robin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash, %% hash client ID to a group member + hash_clientid, + hash_topic + ]}} +]}. + +%% @doc Enable or disable shared dispatch acknowledgement for QoS1 and QoS2 messages +{mapping, "broker.shared_dispatch_ack_enabled", "emqx.shared_dispatch_ack_enabled", + [ {default, false}, + {datatype, {enum, [true, false]}} + ]}. + +{mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [ + {default, on}, + {datatype, flag} +]}. + +%% @doc Performance toggle for subscribe/unsubscribe wildcard topic. +%% Change this toggle only when there are many wildcard topics. +%% key: mnesia translational updates with per-key locks. recommended for single node setup. +%% tab: mnesia translational updates with table lock. recommended for multi-nodes setup. +%% global: global lock protected updates. recommended for larger cluster. +%% NOTE: when changing from/to 'global' lock, it requires all nodes in the cluster +%% +{mapping, "broker.perf.route_lock_type", "emqx.route_lock_type", [ + {default, key}, + {datatype, {enum, [key, tab, global]}} +]}. + +%% @doc Enable trie path compaction. +%% Enabling it significantly improves wildcard topic subscribe rate, +%% if wildcard topics have unique prefixes like: 'sensor/{{id}}/+/', +%% where ID is unique per subscriber. +%% +%% Topic match performance (when publishing) may degrade if messages +%% are mostly published to topics with large number of levels. +%% +%% NOTE: This is a cluster-wide configuration. +%% It rquires all nodes to be stopped before changing it. +{mapping, "broker.perf.trie_compaction", "emqx.trie_compaction", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%%-------------------------------------------------------------------- +%% System Monitor +%%-------------------------------------------------------------------- + +%% @doc Long GC, don't monitor in production mode for: +%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +{mapping, "sysmon.long_gc", "emqx.sysmon", [ + {default, 0}, + {datatype, [integer, {duration, ms}]} +]}. + +%% @doc Long Schedule(ms) +{mapping, "sysmon.long_schedule", "emqx.sysmon", [ + {default, 240}, + {datatype, [integer, {duration, ms}]} +]}. + +%% @doc Large Heap +{mapping, "sysmon.large_heap", "emqx.sysmon", [ + {default, "8MB"}, + {datatype, bytesize} +]}. + +%% @doc Monitor Busy Port +{mapping, "sysmon.busy_port", "emqx.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Monitor Busy Dist Port +{mapping, "sysmon.busy_dist_port", "emqx.sysmon", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqx.sysmon", fun(Conf) -> + Configs = cuttlefish_variable:filter_by_prefix("sysmon", Conf), + [{list_to_atom(Name), Value} || {[_, Name], Value} <- Configs] +end}. + +%%-------------------------------------------------------------------- +%% Operating System Monitor +%%-------------------------------------------------------------------- + +{mapping, "os_mon.cpu_check_interval", "emqx.os_mon", [ + {default, 60}, + {datatype, {duration, s}} +]}. + +{mapping, "os_mon.cpu_high_watermark", "emqx.os_mon", [ + {default, "80%"}, + {datatype, {percent, float}} +]}. + +{mapping, "os_mon.cpu_low_watermark", "emqx.os_mon", [ + {default, "60%"}, + {datatype, {percent, float}} +]}. + +{mapping, "os_mon.mem_check_interval", "emqx.os_mon", [ + {default, 60}, + {datatype, {duration, s}} +]}. + +{mapping, "os_mon.sysmem_high_watermark", "emqx.os_mon", [ + {default, "70%"}, + {datatype, {percent, float}} +]}. + +{mapping, "os_mon.procmem_high_watermark", "emqx.os_mon", [ + {default, "5%"}, + {datatype, {percent, float}} +]}. + +{translation, "emqx.os_mon", fun(Conf) -> + [{cpu_check_interval, cuttlefish:conf_get("os_mon.cpu_check_interval", Conf)}, + {cpu_high_watermark, cuttlefish:conf_get("os_mon.cpu_high_watermark", Conf) * 100}, + {cpu_low_watermark, cuttlefish:conf_get("os_mon.cpu_low_watermark", Conf) * 100}, + {mem_check_interval, cuttlefish:conf_get("os_mon.mem_check_interval", Conf)}, + {sysmem_high_watermark, cuttlefish:conf_get("os_mon.sysmem_high_watermark", Conf) * 100}, + {procmem_high_watermark, cuttlefish:conf_get("os_mon.procmem_high_watermark", Conf) * 100}] +end}. + +%%-------------------------------------------------------------------- +%% VM Monitor +%%-------------------------------------------------------------------- +{mapping, "vm_mon.check_interval", "emqx.vm_mon", [ + {default, 30}, + {datatype, {duration, s}} +]}. + +{mapping, "vm_mon.process_high_watermark", "emqx.vm_mon", [ + {default, "80%"}, + {datatype, {percent, float}} +]}. + +{mapping, "vm_mon.process_low_watermark", "emqx.vm_mon", [ + {default, "60%"}, + {datatype, {percent, float}} +]}. + +{translation, "emqx.vm_mon", fun(Conf) -> + [{check_interval, cuttlefish:conf_get("vm_mon.check_interval", Conf)}, + {process_high_watermark, cuttlefish:conf_get("vm_mon.process_high_watermark", Conf) * 100}, + {process_low_watermark, cuttlefish:conf_get("vm_mon.process_low_watermark", Conf) * 100}] +end}. + +%%-------------------------------------------------------------------- +%% Alarm +%%-------------------------------------------------------------------- +{mapping, "alarm.actions", "emqx.alarm", [ + {default, "log,publish"}, + {datatype, string} +]}. + +{mapping, "alarm.size_limit", "emqx.alarm", [ + {default, 1000}, + {datatype, integer} +]}. + +{mapping, "alarm.validity_period", "emqx.alarm", [ + {default, "24h"}, + {datatype, {duration, s}} +]}. + +{translation, "emqx.alarm", fun(Conf) -> + [{actions, [list_to_atom(Action) || Action <- string:tokens(cuttlefish:conf_get("alarm.actions", Conf), ",")]}, + {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)}, + {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% Telemetry +%%-------------------------------------------------------------------- +{mapping, "telemetry.enabled", "emqx.telemetry", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "telemetry.url", "emqx.telemetry", [ + {default, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry"}, + {datatype, string} +]}. + +{mapping, "telemetry.report_interval", "emqx.telemetry", [ + {default, "7d"}, + {datatype, {duration, s}} +]}. + +{translation, "emqx.telemetry", fun(Conf) -> + [ {enabled, cuttlefish:conf_get("telemetry.enabled", Conf)} + , {url, cuttlefish:conf_get("telemetry.url", Conf)} + , {report_interval, cuttlefish:conf_get("telemetry.report_interval", Conf)} + ] +end}.