From 4cf181503080fa31169bb449b5128518bb14305b Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 8 Aug 2018 19:23:32 +0800 Subject: [PATCH] Add more configurations for EMQ X R3.0 --- etc/emqx.conf | 928 ++++++++++++++++++++++++++++------------------- priv/emqx.schema | 874 ++++++++++++++++++++++---------------------- 2 files changed, 973 insertions(+), 829 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 555a6d63b..9458e4fed 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -9,7 +9,7 @@ ## Cluster name. ## ## Value: String -cluster.name = emqxcluster +cluster.name = emqxcl ## Cluster auto-discovery strategy. ## @@ -48,7 +48,7 @@ cluster.autoclean = 5m ## Node list of the cluster. ## ## Value: String -## cluster.static.seeds = emq1@127.0.0.1,emq2@127.0.0.1 +## cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1 ##-------------------------------------------------------------------- ## Cluster using IP Multicast. @@ -91,7 +91,7 @@ cluster.autoclean = 5m ## The App name is used to build 'node.name' with IP address. ## ## Value: String -## cluster.dns.app = emq +## cluster.dns.app = emqx ##-------------------------------------------------------------------- ## Cluster using etcd @@ -105,7 +105,7 @@ cluster.autoclean = 5m ## will create a path in etcd: v2/keys/// ## ## Value: String -## cluster.etcd.prefix = emqcl +## cluster.etcd.prefix = emqxcl ## The TTL for node's path in etcd. ## @@ -125,7 +125,7 @@ cluster.autoclean = 5m ## The service name helps lookup EMQ nodes in the cluster. ## ## Value: String -## cluster.k8s.service_name = emq +## cluster.k8s.service_name = emqx ## The address type is used to extract host from k8s service. ## @@ -135,7 +135,7 @@ cluster.autoclean = 5m ## The app name helps build 'node.name'. ## ## Value: String -## cluster.k8s.app_name = emq +## cluster.k8s.app_name = emqx ## Kubernates Namespace ## @@ -143,7 +143,7 @@ cluster.autoclean = 5m ## cluster.k8s.namespace = default ##-------------------------------------------------------------------- -## Node Args +## Node ##-------------------------------------------------------------------- ## Node name. @@ -276,38 +276,54 @@ node.dist_listen_min = 6369 node.dist_listen_max = 6369 ##-------------------------------------------------------------------- -## RPC Args +## RPC ##-------------------------------------------------------------------- -## TCP server port. +## TCP server port for RPC. +## +## Value: Port [1024-65535] rpc.tcp_server_port = 5369 -## Default TCP port for outgoing connections +## TCP port for outgoing RPC connections. +## +## Value: Port [1024-65535] rpc.tcp_client_port = 5369 -## Client connect timeout +## RCP Client connect timeout. +## +## Value: Seconds rpc.connect_timeout = 5000 -## Client and Server send timeout +## TCP send timeout of RPC client and server. +## +## Value: Seconds rpc.send_timeout = 5000 ## Authentication timeout +## +## Value: Seconds rpc.authentication_timeout = 5000 ## Default receive timeout for call() functions +## +## Value: Seconds rpc.call_receive_timeout = 15000 -## Socket keepalive configuration +## Socket idle keepalive. +## +## Value: Seconds rpc.socket_keepalive_idle = 900 -## Seconds between probes +## TCP Keepalive probes interval. +## +## Value: Integer rpc.socket_keepalive_interval = 75 ## Probes lost to close the connection +## +## Value: Integer rpc.socket_keepalive_count = 9 -## TODO: sndbuf, rcvbuf and buffer - ##-------------------------------------------------------------------- ## Log ##-------------------------------------------------------------------- @@ -399,283 +415,35 @@ log.syslog = on ## log.syslog.level = error ##-------------------------------------------------------------------- -## Allow Anonymous Authentication and Default ACL +## Authentication/Access Control ##-------------------------------------------------------------------- -## Allow Anonymous Authentication. -## -## Notice: Disable the option for production deployment. +## Allow anonymous authentication by default if no auth plugins loaded. +## Notice: Disable the option in production deployment! ## ## Value: true | false -mqtt.allow_anonymous = true - -## Default behaviour when ACL nomatch. -## -## Value: allow | deny -mqtt.acl_nomatch = allow +allow_anonymous = true ## Default ACL File. ## ## Value: File Name -mqtt.acl_file = {{ platform_etc_dir }}/acl.conf +acl_file = {{ platform_etc_dir }}/acl.conf -## Whether to cache ACL for publish messages. -## -## Value: true | false -mqtt.cache_acl = true - -##-------------------------------------------------------------------- -## MQTT Protocol -##-------------------------------------------------------------------- - -## Maximum length of MQTT clientId allowed. -## -## Value: Number [23-65535] -mqtt.max_clientid_len = 1024 - -## Maximum MQTT packet size allowed. -## -## Value: Bytes -## -## Default: 64K -mqtt.max_packet_size = 64KB - -## Check if the websocket protocol header is valid. -## Turn off the option when developing WeChat App. +## Whether to enable ACL cache for publish. ## ## Value: on | off -mqtt.websocket_protocol_header = on +enable_acl_cache = on -## Check Websocket Upgrade Header. -## -## Value: on | off -mqtt.websocket_check_upgrade_header = on - -## The backoff for MQTT keepalive timeout. -## EMQ will kick a MQTT connection out until 'Keepalive * backoff * 2' timeout. -## -## Value: Float > 0.5 -mqtt.keepalive_backoff = 0.75 - -##-------------------------------------------------------------------- -## MQTT Connection -##-------------------------------------------------------------------- - -## Force GC the MQTT connections. Value 0 will disable the Force GC. -## -## Value: Number >= 0 -mqtt.conn.force_gc_count = 100 - -##-------------------------------------------------------------------- -## MQTT Client -##-------------------------------------------------------------------- - -## MQTT client idle timeout, specified in seconds. +## The ACL cache age. ## ## Value: Duration -mqtt.client.idle_timeout = 30s +## Default: 5 minute +acl_cache_age = 5m -## TODO: Maximum publish rate of MQTT messages per second. +## The ACL cache size, 0 means no limit. ## -## Value: Number -## mqtt.client.max_publish_rate = 5 - -## Enable per client statistics. -## -## Value: on | off -mqtt.client.enable_stats = off - -##-------------------------------------------------------------------- -## MQTT Session -##-------------------------------------------------------------------- - -## Maximum number of subscriptions allowed, 0 means no limit. -## -## Value: Number -mqtt.session.max_subscriptions = 0 - -## Force to upgrade QoS according to subscription. -## -## Value: on | off -mqtt.session.upgrade_qos = off - -## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked. -## -## Value: Number -mqtt.session.max_inflight = 32 - -## Retry interval for QoS1/2 message delivering. -## -## Value: Duration -mqtt.session.retry_interval = 20s - -## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit. -## -## Value: Number -mqtt.session.max_awaiting_rel = 1000 - -## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. -## -## Value: Duration -mqtt.session.await_rel_timeout = 30s - -## Enable per session statistics. -## -## Value: on | off -mqtt.session.enable_stats = on - -## Session expiration time. -## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 2h, 2 hours -mqtt.session.expiry_interval = 2h - -## Whether to ignore loop delivery of messages. -## -## Value: true | false -## -## Default: false -mqtt.session.ignore_loop_deliver = false - -##-------------------------------------------------------------------- -## MQTT Message Queue -##-------------------------------------------------------------------- - -## Message queue type. -## -## Value: simple | priority -mqtt.mqueue.type = simple - -## Topic priority. Default is 0. -## -## Value: Number [0-255] -## -## mqtt.mqueue.priority = topic/1=10,topic/2=8 - -## Maximum queue length. Enqueued messages when persistent client disconnected, -## or inflight window is full. 0 means no limit. -## -## Value: Number >= 0 -mqtt.mqueue.max_length = 1000 - -## Low-water mark of queued messages. -## -## Value: Percent -mqtt.mqueue.low_watermark = 20% - -## High-water mark of queued messages. -## -## Value: Percent -mqtt.mqueue.high_watermark = 60% - -## Whether to enqueue Qos0 messages. -## -## Value: false | true -mqtt.mqueue.store_qos0 = true - -##-------------------------------------------------------------------- -## MQTT Broker and PubSub -##-------------------------------------------------------------------- - -## System interval of publishing $SYS messages. -## -## Value: Duration -## -## Default: 1m, 1 minute -mqtt.broker.sys_interval = 1m - -## The PubSub pool size. Default value should be same as scheduler numbers. -## -## Value: Number > 1 -mqtt.pubsub.pool_size = 8 - -## TODO: Subscribe asynchronously. -## -## Value: true | false -mqtt.pubsub.async = true - -##-------------------------------------------------------------------- -## MQTT Bridge -##-------------------------------------------------------------------- - -## The pending message queue size of bridge. -## -## Value: Number -mqtt.bridge.max_queue_len = 10000 - -## Ping interval of bridge node. -## -## Value: Duration -## -## Default: 1s, 1 second -mqtt.bridge.ping_down_interval = 1s - -##------------------------------------------------------------------- -## Plugins -##------------------------------------------------------------------- - -## The etc dir for plugins' config. -## -## Value: Folder -mqtt.plugins.etc_dir ={{ platform_etc_dir }}/plugins/ - -## The file to store loaded plugin names. -## -## Value: File -mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins - -## File to store loaded plugin names. -mqtt.plugins.expand_plugins_dir = {{ platform_plugins_dir }}/ - -##-------------------------------------------------------------------- -## Modules -##-------------------------------------------------------------------- - -##-------------------------------------------------------------------- -## Presence Module - -## Enable Presence Module. -## -## Value: on | off -module.presence = on - -## Sets the QoS for presence MQTT message. -## -## Value: 0 | 1 | 2 -module.presence.qos = 1 - -##-------------------------------------------------------------------- -## Subscription Module - -## Enable Subscription Module. -## -## Value: on | off -module.subscription = off - -## Subscribe the Topics automatically when client connected. -## module.subscription.1.topic = $client/%c -## Qos of the subscription: 0 | 1 | 2 -## module.subscription.1.qos = 1 - -## module.subscription.2.topic = $user/%u -## module.subscription.2.qos = 1 - -##-------------------------------------------------------------------- -## Rewrite Module - -## Enable Rewrite Module. -## -## Value: on | off -module.rewrite = off - -## {rewrite, Topic, Re, Dest} -## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 -## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 +## Value: Integer +acl_cache_size = 0 ##-------------------------------------------------------------------- ## Listeners @@ -684,7 +452,7 @@ module.rewrite = off ##-------------------------------------------------------------------- ## MQTT/TCP - External TCP Listener for MQTT Protocol -## listener.tcp. is the IP address and port that the MQTT/TCP +## listener.tcp.$name is the IP address and port that the MQTT/TCP ## listener will bind. ## ## Value: IP:Port | Port @@ -695,37 +463,40 @@ listener.tcp.external = 0.0.0.0:1883 ## The acceptor pool for external MQTT/TCP listener. ## ## Value: Number -listener.tcp.external.acceptors = 16 +listener.tcp.external.acceptors = 8 ## Maximum number of concurrent MQTT/TCP connections. ## ## Value: Number -listener.tcp.external.max_clients = 102400 +listener.tcp.external.max_connections = 1024000 -## Maximum connection per second. +## Maximum external connections per second. ## ## Value: Number listener.tcp.external.max_conn_rate = 1000 ## Zone of the external MQTT/TCP listener belonged to. ## +## See: zone.$name.* +## ## Value: String listener.tcp.external.zone = external -## Mountpoint of the MQTT/TCP Listener. All the topics of this -## listener will be prefixed with the mount point if this option -## is enabled. -## Notice that supports wildcard mount:%c clientid, %u username +## Mountpoint of the MQTT/TCP Listener. All the topics will be prefixed +## with the mountpoint path if this option is enabled. +## +## Variables in mountpoint path: +## - %c: clientid +## - %u: username ## ## Value: String -listener.tcp.external.mountpoint = devicebound/ +## listener.tcp.external.mountpoint = devicebound/ -## Rate limit for the external MQTT/TCP connections. -## Format is 'burst,rate'. +## Rate limit for the external MQTT/TCP connections. Format is 'rate,burst'. ## -## Value: burst,rate -## Unit: KB/sec -listener.tcp.external.rate_limit = 100,10 +## Value: rate,burst +## Unit: Bps +## listener.tcp.external.rate_limit = 1024,4096 ## The access control rules for the MQTT/TCP listener. ## @@ -736,7 +507,7 @@ listener.tcp.external.rate_limit = 100,10 ## Example: allow 192.168.0.0/24 listener.tcp.external.access.1 = allow all -## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed +## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed ## behind HAProxy or Nginx. ## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ @@ -744,18 +515,17 @@ listener.tcp.external.access.1 = allow all ## Value: on | off ## listener.tcp.external.proxy_protocol = on -## Sets the timeout for proxy protocol. EMQ will close the TCP connection +## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## if no proxy protocol packet recevied within the timeout. ## ## Value: Duration ## listener.tcp.external.proxy_protocol_timeout = 3s ## Enable the option for X.509 certificate based authentication. -## EMQ will Use the PP2_SUBTYPE_SSL_CN field in Proxy Protocol V2 -## as MQTT username. +## EMQX will use the common name of certificate as MQTT username. ## -## Value: cn -## listener.tcp.external.peer_cert_as_username = cn +## Value: boolean +## listener.tcp.external.peer_cert_as_username = true ## The TCP backlog defines the maximum length that the queue of pending ## connections can grow to. @@ -778,14 +548,14 @@ listener.tcp.external.send_timeout_close = on ## See: http://erlang.org/doc/man/inet.html ## ## Value: Bytes -## listener.tcp.external.recbuf = 4KB +## listener.tcp.external.recbuf = 2KB ## The TCP send buffer(os kernel) for MQTT connections. ## ## See: http://erlang.org/doc/man/inet.html ## ## Value: Bytes -## listener.tcp.external.sndbuf = 4KB +## listener.tcp.external.sndbuf = 2KB ## The size of the user-level software buffer used by the driver. ## Not to be confused with options sndbuf and recbuf, which correspond @@ -797,7 +567,7 @@ listener.tcp.external.send_timeout_close = on ## See: http://erlang.org/doc/man/inet.html ## ## Value: Bytes -## listener.tcp.external.buffer = 4KB +## listener.tcp.external.buffer = 2KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## @@ -834,7 +604,12 @@ listener.tcp.internal.acceptors = 4 ## Maximum number of concurrent MQTT/TCP connections. ## ## Value: Number -listener.tcp.internal.max_clients = 102400 +listener.tcp.internal.max_connections = 10240000 + +## Maximum internal connections per second. +## +## Value: Number +listener.tcp.internal.max_conn_rate = 1000 ## Zone of the internal MQTT/TCP listener belonged to. ## @@ -843,42 +618,43 @@ listener.tcp.internal.zone = internal ## Mountpoint of the MQTT/TCP Listener. ## -## See: listener.tcp..mountpoint +## See: listener.tcp.$name.mountpoint ## ## Value: String ## listener.tcp.internal.mountpoint = internal/ ## Rate limit for the internal MQTT/TCP connections. ## -## See: listener.tcp..rate_limit +## See: listener.tcp.$name.rate_limit ## -## Value: burst,rate -## listener.tcp.internal.rate_limit = 1000,100 +## Value: rate,burst +## Unit: Bps +## listener.tcp.internal.rate_limit = 1000000,2000000 ## The TCP backlog of internal MQTT/TCP Listener. ## -## See: listener.tcp..backlog +## See: listener.tcp.$name.backlog ## ## Value: Number >= 0 listener.tcp.internal.backlog = 512 ## The TCP send timeout for internal MQTT connections. ## -## See: listener.tcp..send_timeout +## See: listener.tcp.$name.send_timeout ## ## Value: Duration listener.tcp.internal.send_timeout = 5s ## Close the MQTT/TCP connection if send timeout. ## -## See: listener.tcp..send_timeout_close +## See: listener.tcp.$name.send_timeout_close ## ## Value: on | off listener.tcp.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for internal MQTT connections. ## -## See: listener.tcp..recbuf +## See: listener.tcp.$name.recbuf ## ## Value: Bytes ## listener.tcp.internal.recbuf = 16KB @@ -892,21 +668,21 @@ listener.tcp.external.send_timeout_close = on ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp..buffer +## See: listener.tcp.$name.buffer ## ## Value: Bytes ## listener.tcp.internal.buffer = 16KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## -## See: listener.tcp..tune_buffer +## See: listener.tcp.$name.tune_buffer ## ## Value: on | off ## listener.tcp.internal.tune_buffer = off ## The TCP_NODELAY flag for internal MQTT connections. ## -## See: listener.tcp..nodelay +## See: listener.tcp.$name.nodelay ## ## Value: true | false listener.tcp.internal.nodelay = false @@ -919,7 +695,7 @@ listener.tcp.internal.reuseaddr = true ##-------------------------------------------------------------------- ## MQTT/SSL - External SSL Listener for MQTT Protocol -## listener.ssl. is the IP address and port that the MQTT/SSL +## listener.ssl.$name is the IP address and port that the MQTT/SSL ## listener will bind. ## ## Value: IP:Port | Port @@ -935,12 +711,12 @@ listener.ssl.external.acceptors = 16 ## Maximum number of concurrent MQTT/SSL connections. ## ## Value: Number -listener.ssl.external.max_clients = 102400 +listener.ssl.external.max_connections = 102400 ## Maximum MQTT/SSL connections per second. ## ## Value: Number -listener.ssl.external.max_conn_rate = 1000 +listener.ssl.external.max_conn_rate = 500 ## Zone of the external MQTT/SSL listener belonged to. ## @@ -950,31 +726,32 @@ listener.ssl.external.zone = external ## Mountpoint of the MQTT/SSL Listener. ## ## Value: String -## listener.ssl.external.mountpoint = inbound/ +## listener.ssl.external.mountpoint = devicebound/ ## The access control rules for the MQTT/SSL listener. ## -## See: listener.tcp..access +## See: listener.tcp.$name.access ## ## Value: ACL Rule listener.ssl.external.access.1 = allow all ## Rate limit for the external MQTT/SSL connections. ## -## Value: burst,rate -## listener.ssl.external.rate_limit = 100,10 +## Value: rate,burst +## Unit: Bps +## listener.ssl.external.rate_limit = 1024,4096 ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## -## See: listener.tcp..proxy_protocol +## See: listener.tcp.$name.proxy_protocol ## ## Value: on | off ## listener.ssl.external.proxy_protocol = on ## Sets the timeout for proxy protocol. ## -## See: listener.tcp..proxy_protocol_timeout +## See: listener.tcp.$name.proxy_protocol_timeout ## ## Value: Duration ## listener.ssl.external.proxy_protocol_timeout = 3s @@ -1088,64 +865,64 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## Value: on | off ## listener.ssl.external.honor_cipher_order = on -## Use the CN or DN value from the client certificate as a username. +## Use the CN field from the client certificate as a username. ## Notice that 'verify' should be set as 'verify_peer'. ## -## Value: cn | dn +## Value: boolean ## listener.ssl.external.peer_cert_as_username = cn ## TCP backlog for the SSL connection. ## -## See listener.tcp..backlog +## See listener.tcp.$name.backlog ## ## Value: Number >= 0 ## listener.ssl.external.backlog = 1024 ## The TCP send timeout for the SSL connection. ## -## See listener.tcp..send_timeout +## See listener.tcp.$name.send_timeout ## ## Value: Duration ## listener.ssl.external.send_timeout = 15s ## Close the SSL connection if send timeout. ## -## See: listener.tcp..send_timeout_close +## See: listener.tcp.$name.send_timeout_close ## ## Value: on | off ## listener.ssl.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for the SSL connections. ## -## See: listener.tcp..recbuf +## See: listener.tcp.$name.recbuf ## ## Value: Bytes ## listener.ssl.external.recbuf = 4KB ## The TCP send buffer(os kernel) for internal MQTT connections. ## -## See: listener.tcp..sndbuf +## See: listener.tcp.$name.sndbuf ## ## Value: Bytes ## listener.ssl.external.sndbuf = 4KB ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp..buffer +## See: listener.tcp.$name.buffer ## ## Value: Bytes ## listener.ssl.external.buffer = 4KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## -## See: listener.tcp..tune_buffer +## See: listener.tcp.$name.tune_buffer ## ## Value: on | off ## listener.ssl.external.tune_buffer = off ## The TCP_NODELAY flag for SSL connections. ## -## See: listener.tcp..nodelay +## See: listener.tcp.$name.nodelay ## ## Value: true | false ## listener.ssl.external.nodelay = true @@ -1156,9 +933,9 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem listener.ssl.external.reuseaddr = true ##-------------------------------------------------------------------- -## External WebSocket Listener for MQTT Protocol +## External WebSocket listener for MQTT protocol -## listener.ws. is the IP address and port that the MQTT/WebSocket +## listener.ws.$name is the IP address and port that the MQTT/WebSocket ## listener will bind. ## ## Value: IP:Port | Port @@ -1174,13 +951,19 @@ listener.ws.external.acceptors = 4 ## Maximum number of concurrent MQTT/WebSocket connections. ## ## Value: Number -listener.ws.external.max_clients = 102400 +listener.ws.external.max_connections = 102400 ## Maximum MQTT/WebSocket connections per second. ## ## Value: Number listener.ws.external.max_conn_rate = 1000 +## Rate limit for the MQTT/WebSocket connections. +## +## Value: rate,burst +## Unit: Bps +## listener.ws.external.rate_limit = 1024,4096 + ## Zone of the external MQTT/WebSocket listener belonged to. ## ## Value: String @@ -1188,25 +971,30 @@ listener.ws.external.zone = external ## Mountpoint of the MQTT/WebSocket Listener. ## -## See: listener.tcp..mountpoint +## See: listener.tcp.$name.mountpoint ## ## Value: String -## listener.ws.external.mountpoint = external/ +## listener.ws.external.mountpoint = devicebound/ ## The access control for the MQTT/WebSocket listener. ## -## See: listener.tcp..access +## See: listener.tcp.$name.access ## ## Value: ACL Rule listener.ws.external.access.1 = allow all -## Use X-Forwarded-For header for real source IP if the EMQ cluster is +## Verify if the protocol header is valid. Turn off for WeChat MiniApp. +## +## Value: on | off +listener.ws.external.verify_protocol_header = on + +## Use X-Forwarded-For header for real source IP if the EMQ X cluster is ## deployed behind NGINX or HAProxy. ## ## Value: String ## listener.ws.external.proxy_address_header = X-Forwarded-For -## Use X-Forwarded-Port header for real source port if the EMQ cluster is +## Use X-Forwarded-Port header for real source port if the EMQ X cluster is ## deployed behind NGINX or HAProxy. ## ## Value: String @@ -1215,70 +1003,70 @@ listener.ws.external.access.1 = allow all ## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind ## HAProxy or Nginx. ## -## See: listener.tcp..proxy_protocol +## See: listener.tcp.$name.proxy_protocol ## ## Value: on | off ## listener.ws.external.proxy_protocol = on ## Sets the timeout for proxy protocol. ## -## See: listener.tcp..proxy_protocol_timeout +## See: listener.tcp.$name.proxy_protocol_timeout ## ## Value: Duration ## listener.ws.external.proxy_protocol_timeout = 3s ## The TCP backlog of external MQTT/WebSocket Listener. ## -## See: listener.tcp..backlog +## See: listener.tcp.$name.backlog ## ## Value: Number >= 0 listener.ws.external.backlog = 1024 ## The TCP send timeout for external MQTT/WebSocket connections. ## -## See: listener.tcp..send_timeout +## See: listener.tcp.$name.send_timeout ## ## Value: Duration listener.ws.external.send_timeout = 15s ## Close the MQTT/WebSocket connection if send timeout. ## -## See: listener.tcp..send_timeout_close +## See: listener.tcp.$name.send_timeout_close ## ## Value: on | off listener.ws.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp..recbuf +## See: listener.tcp.$name.recbuf ## ## Value: Bytes -## listener.ws.external.recbuf = 4KB +## listener.ws.external.recbuf = 2KB ## The TCP send buffer(os kernel) for external MQTT/WebSocket connections. ## -## See: listener.tcp..sndbuf +## See: listener.tcp.$name.sndbuf ## ## Value: Bytes -## listener.ws.external.sndbuf = 4KB +## listener.ws.external.sndbuf = 2KB ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp..buffer +## See: listener.tcp.$name.buffer ## ## Value: Bytes -## listener.ws.external.buffer = 4KB +## listener.ws.external.buffer = 2KB ## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled. ## -## See: listener.tcp..tune_buffer +## See: listener.tcp.$name.tune_buffer ## ## Value: on | off ## listener.ws.external.tune_buffer = off ## The TCP_NODELAY flag for external MQTT/WebSocket connections. ## -## See: listener.tcp..nodelay +## See: listener.tcp.$name.nodelay ## ## Value: true | false listener.ws.external.nodelay = true @@ -1291,7 +1079,7 @@ listener.ws.external.reuseaddr = true ##-------------------------------------------------------------------- ## External WebSocket/SSL listener for MQTT Protocol -## listener.wss. is the IP address and port that the MQTT/WebSocket/SSL +## listener.wss.$name is the IP address and port that the MQTT/WebSocket/SSL ## listener will bind. ## ## Value: IP:Port | Port @@ -1307,13 +1095,21 @@ listener.wss.external.acceptors = 4 ## Maximum number of concurrent MQTT/Webwocket/SSL connections. ## ## Value: Number -listener.wss.external.max_clients = 64 +listener.wss.external.max_connections = 16 ## Maximum MQTT/WebSocket/SSL connections per second. ## +## See: listener.tcp.$name.max_conn_rate +## ## Value: Number listener.wss.external.max_conn_rate = 1000 +## Rate limit for the MQTT/WebSocket/SSL connections. +## +## Value: rate,burst +## Unit: Bps +## listener.wss.external.rate_limit = 1024,4096 + ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## ## Value: String @@ -1321,18 +1117,23 @@ listener.wss.external.zone = external ## Mountpoint of the MQTT/WebSocket/SSL Listener. ## -## See: listener.tcp..mountpoint +## See: listener.tcp.$name.mountpoint ## ## Value: String -## listener.wss.external.mountpoint = inbound/ +## listener.wss.external.mountpoint = devicebound/ ## The access control rules for the MQTT/WebSocket/SSL listener. ## -## See: listener.tcp..access. +## See: listener.tcp.$name.access. ## ## Value: ACL Rule listener.wss.external.access.1 = allow all +## See: listener.ws.external.verify_protocol_header +## +## Value: on | off +listener.wss.external.verify_protocol_header = on + ## See: listener.ws.external.proxy_address_header ## ## Value: String @@ -1345,138 +1146,138 @@ listener.wss.external.access.1 = allow all ## Enable the Proxy Protocol V1/2 support. ## -## See: listener.tcp..proxy_protocol +## See: listener.tcp.$name.proxy_protocol ## ## Value: on | off ## listener.wss.external.proxy_protocol = on ## Sets the timeout for proxy protocol. ## -## See: listener.tcp..proxy_protocol_timeout +## See: listener.tcp.$name.proxy_protocol_timeout ## ## Value: Duration ## listener.wss.external.proxy_protocol_timeout = 3s ## TLS versions only to protect from POODLE attack. ## -## See: listener.ssl..tls_versions +## See: listener.ssl.$name.tls_versions ## ## Value: String, seperated by ',' ## listener.wss.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 ## TLS Handshake timeout. ## -## See: listener.ssl..handshake_timeout +## See: listener.ssl.$name.handshake_timeout ## ## Value: Duration listener.wss.external.handshake_timeout = 15s ## Path to the file containing the user's private PEM-encoded key. ## -## See: listener.ssl..keyfile +## See: listener.ssl.$name.keyfile ## ## Value: File listener.wss.external.keyfile = {{ platform_etc_dir }}/certs/key.pem ## Path to a file containing the user certificate. ## -## See: listener.ssl..certfile +## See: listener.ssl.$name.certfile ## ## Value: File listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem ## Path to the file containing PEM-encoded CA certificates. ## -## See: listener.ssl..cacert +## See: listener.ssl.$name.cacert ## ## Value: File ## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## See: listener.ssl..dhfile +## See: listener.ssl.$name.dhfile ## ## Value: File ## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem -## See: listener.ssl..vefify +## See: listener.ssl.$name.vefify ## ## Value: vefify_peer | verify_none ## listener.wss.external.verify = verify_peer -## See: listener.ssl..fail_if_no_peer_cert +## See: listener.ssl.$name.fail_if_no_peer_cert ## ## Value: false | true ## listener.wss.external.fail_if_no_peer_cert = true -## See: listener.ssl..ciphers +## See: listener.ssl.$name.ciphers ## ## Value: Ciphers ## listener.wss.external.ciphers = -## See: listener.ssl..secure_renegotiate +## See: listener.ssl.$name.secure_renegotiate ## ## Value: on | off ## listener.wss.external.secure_renegotiate = off -## See: listener.ssl..reuse_sessions +## See: listener.ssl.$name.reuse_sessions ## ## Value: on | off ## listener.wss.external.reuse_sessions = on -## See: listener.ssl..honor_cipher_order +## See: listener.ssl.$name.honor_cipher_order ## ## Value: on | off ## listener.wss.external.honor_cipher_order = on -## See: listener.ssl..peer_cert_as_username +## See: listener.ssl.$name.peer_cert_as_username ## ## Value: cn | dn ## listener.wss.external.peer_cert_as_username = cn ## TCP backlog for the WebSocket/SSL connection. ## -## See: listener.tcp..backlog +## See: listener.tcp.$name.backlog ## ## Value: Number >= 0 listener.wss.external.backlog = 1024 ## The TCP send timeout for the WebSocket/SSL connection. ## -## See: listener.tcp..send_timeout +## See: listener.tcp.$name.send_timeout ## ## Value: Duration listener.wss.external.send_timeout = 15s ## Close the WebSocket/SSL connection if send timeout. ## -## See: listener.tcp..send_timeout_close +## See: listener.tcp.$name.send_timeout_close ## ## Value: on | off listener.wss.external.send_timeout_close = on ## The TCP receive buffer(os kernel) for the WebSocket/SSL connections. ## -## See: listener.tcp..recbuf +## See: listener.tcp.$name.recbuf ## ## Value: Bytes ## listener.wss.external.recbuf = 4KB ## The TCP send buffer(os kernel) for the WebSocket/SSL connections. ## -## See: listener.tcp..sndbuf +## See: listener.tcp.$name.sndbuf ## ## Value: Bytes ## listener.wss.external.sndbuf = 4KB ## The size of the user-level software buffer used by the driver. ## -## See: listener.tcp..buffer +## See: listener.tcp.$name.buffer ## ## Value: Bytes ## listener.wss.external.buffer = 4KB ## The TCP_NODELAY flag for WebSocket/SSL connections. ## -## See: listener.tcp..nodelay +## See: listener.tcp.$name.nodelay ## ## Value: true | false ## listener.wss.external.nodelay = true @@ -1486,9 +1287,376 @@ listener.wss.external.send_timeout_close = on ## Value: true | false listener.wss.external.reuseaddr = true +##-------------------------------------------------------------------- +## Zones +##-------------------------------------------------------------------- + +##-------------------------------------------------------------------- +## External Zone + +## Idle timeout of the external MQTT connections. +## +## Value: duration +zone.external.idle_timeout = 15s + +## Publish limit for the external MQTT connections. +## +## Value: rate,burst +## Default: 10 messages per second, and 100 messages burst. +## zone.external.publish_limit = 10,100 + +## Enable ACL check. +## +## Value: Flag +zone.external.enable_acl = on + +## Enable per connection statistics. +## +## Value: on | off +zone.external.enable_stats = on + +## Maximum MQTT packet size allowed. +## +## Value: Bytes +## Default: 64KB +zone.external.max_packet_size = 1MB + +## Maximum length of MQTT clientId allowed. +## +## Value: Number [23-65535] +zone.external.max_clientid_len = 65535 + +## Maximum topic levels allowed. 0 means no limit. +## +## Value: Number +zone.external.max_topic_levels = 0 + +## Maximum QoS allowed. +## +## Value: 0 | 1 | 2 +zone.external.max_qos_allowed = 2 + +## Maximum Topic Alias, 0 means no limit. +## +## Value: 0-65535 +zone.external.max_topic_alias = 0 + +## Whether the Server supports retained messages. +## +## Value: boolean +zone.external.retain_available = true + +## Whether the Server supports Wildcard Subscriptions +## +## Value: boolean +zone.external.wildcard_subscription = true + +## Whether the Server supports Shared Subscriptions +## +## Value: boolean +zone.external.shared_subscription = true + +## The backoff for MQTT keepalive timeout. The broker will kick a connection out +## until 'Keepalive * backoff * 2' timeout. +## +## Value: Float > 0.5 +zone.external.keepalive_backoff = 0.75 + +## Maximum number of subscriptions allowed, 0 means no limit. +## +## Value: Number +zone.external.max_subscriptions = 0 + +## Force to upgrade QoS according to subscription. +## +## Value: on | off +zone.external.upgrade_qos = off + +## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked. +## +## Value: Number +zone.external.max_inflight = 32 + +## Retry interval for QoS1/2 message delivering. +## +## Value: Duration +zone.external.retry_interval = 20s + +## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit. +## +## Value: Number +zone.external.max_awaiting_rel = 100 + +## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. +## +## Value: Duration +zone.external.await_rel_timeout = 60s + +## Whether to ignore loop delivery of messages. +## +## Value: true | false +## Default: false +zone.external.ignore_loop_deliver = false + +## Default session expiry interval for MQTT V3.1.1 connections. +## +## Value: Duration +## -d: day +## -h: hour +## -m: minute +## -s: second +## +## Default: 2h, 2 hours +zone.external.session_expiry_interval = 2h + +## Maximum queue length. Enqueued messages when persistent client disconnected, +## or inflight window is full. 0 means no limit. +## +## Value: Number >= 0 +zone.external.max_mqueue_len = 1000 + +## Whether to enqueue Qos0 messages. +## +## Value: false | true +zone.external.mqueue_store_qos0 = true + +##-------------------------------------------------------------------- +## Internal Zone + +## Enable per connection stats. +## +## Value: Flag +zone.internal.enable_stats = on + +## Enable ACL check. +## +## Value: Flag +zone.internal.enable_acl = off + +## See zone.$name.wildcard_subscription. +## +## Value: boolean +zone.internal.wildcard_subscription = true + +## See zone.$name.shared_subscription. +## +## Value: boolean +zone.internal.shared_subscription = true + +## See zone.$name.max_subscriptions. +## +## Value: Integer +zone.internal.max_subscriptions = 0 + +## See zone.$name.max_inflight +## +## Value: Number +zone.internal.max_inflight = 32 + +## See zone.$name.max_awaiting_rel +## +## Value: Number +zone.internal.max_awaiting_rel = 100 + +## See zone.$name.max_mqueue_len +## +## Value: Number >= 0 +zone.internal.max_mqueue_len = 1000 + +## Whether to enqueue Qos0 messages. +## +## Value: false | true +zone.internal.mqueue_store_qos0 = true + +##-------------------------------------------------------------------- +## Bridges +##-------------------------------------------------------------------- + +## Bridge Type. +## +## Value: local | remote +bridge.name.type = local + +## Bridge address: node name for local bridge, host:port for remote. +## +## Value: String +## Example: emqx@127.0.0.1, 127.0.0.1:1883 +bridge.name.address = emqx@127.0.0.1 + +## Protocol version of the bridge. +## +## Value: Enum +## - mqtt5 +## - mqtt4 +## - mqtt3 +bridge.name.proto_ver = mqtt4 + +## The ClientId of a remote bridge. +## +## Value: String +bridge.name.client_id = bridge:$name + +## The Clean start flag of a remote bridge. +## +## Value: boolean +bridge.name.clean_start = false + +## The username for a remote bridge. +## +## Value: String +bridge.name.username = user + +## The password for a remote bridge. +## +## Value: String +bridge.name.password = passwd + +## Mountpoint of the bridge. +## +## Value: String +bridge.name.mountpoint = bridge/$name/ + +## PEM-encoded CA certificates of the bridge. +## +## Value: File +bridge.name.cacertfile = cacert.pem + +## SSL Certfile of the bridge. +## +## Value: File +bridge.name.certfile = cert.pem + +## SSL Keyfile of the bridge. +## +## Value: File +bridge.name.keyfile = key.pem + +## SSL Ciphers used by the bridge. +## +## Value: String +bridge.name.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384 + +## TLS versions used by the bridge. +## +## Value: String +bridge.name.tls_versions = tlsv1.2,tlsv1.1,tlsv1 + +## The pending message queue of a bridge. +## +## Value: Number +bridge.name.max_pending_messages = 10000 + +## Ping interval of a down bridge. +## +## Value: Duration +## Default: 10 seconds +bridge.name.keepalive = 10s + +## Subscriptions of the bridge. +## +## Default: 10 seconds +bridge.name.subscription.1.topic = topic1/ +bridge.name.subscription.1.qos = 2 +## bridge.name.subscription.2.topic = topic2/ +## bridge.name.subscription.2.qos = 2 + +##-------------------------------------------------------------------- +## Modules +##-------------------------------------------------------------------- + +##-------------------------------------------------------------------- +## Presence Module + +## Enable Presence Module. +## +## Value: on | off +module.presence = on + +## Sets the QoS for presence MQTT message. +## +## Value: 0 | 1 | 2 +module.presence.qos = 1 + +##-------------------------------------------------------------------- +## Subscription Module + +## Enable Subscription Module. +## +## Value: on | off +module.subscription = off + +## Subscribe the Topics automatically when client connected. +## module.subscription.1.topic = $client/%c +## Qos of the subscription: 0 | 1 | 2 +## module.subscription.1.qos = 1 + +## module.subscription.2.topic = $user/%u +## module.subscription.2.qos = 1 + +##-------------------------------------------------------------------- +## Rewrite Module + +## Enable Rewrite Module. +## +## Value: on | off +module.rewrite = off + +## {rewrite, Topic, Re, Dest} +## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 +## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 + ##------------------------------------------------------------------- +## Plugins +##------------------------------------------------------------------- + +## The etc dir for plugins' config. +## +## Value: Folder +plugins.etc_dir ={{ platform_etc_dir }}/plugins/ + +## The file to store loaded plugin names. +## +## Value: File +plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins + +## File to store loaded plugin names. +plugins.expand_plugins_dir = {{ platform_plugins_dir }}/ + +##-------------------------------------------------------------------- +## Broker +##-------------------------------------------------------------------- + +## System interval of publishing $SYS messages. +## +## Value: Duration +## Default: 1m, 1 minute +broker.sys_interval = 1m + +## Session locking strategy in a cluster. +## +## Value: Enum +## - local +## - one +## - quorum +## - all +broker.session_locking_strategy = quorum + +## Dispatch strategy for shared subscription +## +## Value: Enum +## - random +## - round_robbin +## - hash +broker.shared_subscription_strategy = random + +## Enable batch clean for deleted routes. +## +## Value: Flag +broker.route_batch_clean = on + +##-------------------------------------------------------------------- ## System Monitor -##------------------------------------------------------------------- +##-------------------------------------------------------------------- ## Enable Long GC monitoring. ## Notice: don't enable the monitor in production for: diff --git a/priv/emqx.schema b/priv/emqx.schema index 7ff5fd0a3..3a3267930 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -165,10 +165,10 @@ end}. %%-------------------------------------------------------------------- -%% Erlang Node +%% Node %%-------------------------------------------------------------------- -%% @doc Erlang node name +%% @doc Node name {mapping, "node.name", "vm_args.-name", [ {default, "emqx@127.0.0.1"} ]}. @@ -321,7 +321,7 @@ end}. ]}. %%-------------------------------------------------------------------- -%% RPC Args +%% RPC %%-------------------------------------------------------------------- %% RPC server port. @@ -550,368 +550,39 @@ end}. ]}. %%-------------------------------------------------------------------- -%% Allow Anonymous and Default ACL +%% Authentication/ACL %%-------------------------------------------------------------------- -%% @doc Allow Anonymous -{mapping, "mqtt.allow_anonymous", "emqx.allow_anonymous", [ +%% @doc Allow anonymous authentication. +{mapping, "allow_anonymous", "emqx.allow_anonymous", [ {default, false}, {datatype, {enum, [true, false]}} ]}. -%% @doc ACL nomatch -{mapping, "mqtt.acl_nomatch", "emqx.acl_nomatch", [ - {default, allow}, - {datatype, {enum, [allow, deny]}} -]}. - -%% @doc Default ACL File -{mapping, "mqtt.acl_file", "emqx.acl_file", [ +%% @doc Default ACL file. +{mapping, "acl_file", "emqx.acl_file", [ {datatype, string}, hidden ]}. -%% @doc Cache ACL for PUBLISH -{mapping, "mqtt.cache_acl", "emqx.cache_acl", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- - -%% @doc Set the Max ClientId Length Allowed. -{mapping, "mqtt.max_clientid_len", "emqx.protocol", [ - {default, 1024}, - {datatype, integer} -]}. - -%% @doc Max Packet Size Allowed, 64K by default. -{mapping, "mqtt.max_packet_size", "emqx.protocol", [ - {default, "64KB"}, - {datatype, bytesize} -]}. - -%% @doc Keepalive backoff -{mapping, "mqtt.keepalive_backoff", "emqx.protocol", [ - {default, 1.25}, - {datatype, float} -]}. - -{translation, "emqx.protocol", fun(Conf) -> - [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, - {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, - {keepalive_backoff, cuttlefish:conf_get("mqtt.keepalive_backoff", Conf)}] -end}. - -{mapping, "mqtt.websocket_protocol_header", "emqx.websocket_protocol_header", [ +%% @doc Enable ACL cache for publish. +{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [ {default, on}, {datatype, flag} ]}. -{mapping, "mqtt.websocket_check_upgrade_header", "emqx.websocket_check_upgrade_header", [ - {default, on}, - {datatype, flag} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Connection -%%-------------------------------------------------------------------- - -%% @doc Force the client to GC: integer -{mapping, "mqtt.conn.force_gc_count", "emqx.conn_force_gc_count", [ - {datatype, integer} -]}. - -%%-------------------------------------------------------------------- -%% MQTT Client -%%-------------------------------------------------------------------- - -%% @doc Max Publish Rate of Message -{mapping, "mqtt.client.max_publish_rate", "emqx.client", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Client Idle Timeout. -{mapping, "mqtt.client.idle_timeout", "emqx.client", [ - {default, "30s"}, +%% @doc ACL cache age. +{mapping, "acl_cache_age", "emqx.acl_cache_age", [ + {default, "5m"}, {datatype, {duration, ms}} ]}. -%% @doc Enable Stats of Client. -{mapping, "mqtt.client.enable_stats", "emqx.client", [ - {default, off}, - {datatype, flag} -]}. - -{translation, "emqx.client", fun(Conf) -> - [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)}, - {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, - {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT Session -%%-------------------------------------------------------------------- - -%% @doc Max Number of Subscriptions Allowed -{mapping, "mqtt.session.max_subscriptions", "emqx.session", [ +%% @doc ACL cache size. +{mapping, "acl_cache_size", "emqx.acl_cache_size", [ {default, 0}, {datatype, integer} ]}. -%% @doc Upgrade QoS? -{mapping, "mqtt.session.upgrade_qos", "emqx.session", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. -%% 0 means no limit -{mapping, "mqtt.session.max_inflight", "emqx.session", [ - {default, 100}, - {datatype, integer} -]}. - -%% @doc Retry interval for redelivering QoS1/2 messages. -{mapping, "mqtt.session.retry_interval", "emqx.session", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Max Packets that Awaiting PUBREL, 0 means no limit -{mapping, "mqtt.session.max_awaiting_rel", "emqx.session", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Awaiting PUBREL Timeout -{mapping, "mqtt.session.await_rel_timeout", "emqx.session", [ - {default, "20s"}, - {datatype, {duration, ms}} -]}. - -%% @doc Enable Stats -{mapping, "mqtt.session.enable_stats", "emqx.session", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc Session Expiry Interval -{mapping, "mqtt.session.expiry_interval", "emqx.session", [ - {default, "2h"}, - {datatype, {duration, ms}} -]}. - -%% @doc Ignore message from self publish -{mapping, "mqtt.session.ignore_loop_deliver", "emqx.session", [ - {default, false}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqx.session", fun(Conf) -> - [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, - {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, - {max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, - {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, - {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, - {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, - {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, - {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT MQueue -%%-------------------------------------------------------------------- - -%% @doc Type: simple | priority -{mapping, "mqtt.mqueue.type", "emqx.mqueue", [ - {default, simple}, - {datatype, atom} -]}. - -%% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.mqueue.priority", "emqx.mqueue", [ - {default, ""}, - {datatype, string} -]}. - -%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit. -{mapping, "mqtt.mqueue.max_length", "emqx.mqueue", [ - {default, 0}, - {datatype, integer} -]}. - -%% @doc Low-water mark of queued messages -{mapping, "mqtt.mqueue.low_watermark", "emqx.mqueue", [ - {default, "20%"}, - {datatype, {percent, float}} -]}. - -%% @doc High-water mark of queued messages -{mapping, "mqtt.mqueue.high_watermark", "emqx.mqueue", [ - {default, "60%"}, - {datatype, {percent, float}} -]}. - -%% @doc Queue Qos0 messages? -{mapping, "mqtt.mqueue.store_qos0", "emqx.mqueue", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqx.mqueue", fun(Conf) -> - Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, - {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, - {low_watermark, cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf)}, - {high_watermark, cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf)}, - {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], - case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of - undefined -> Opts; - V -> [{priority, - [begin [T, P] = string:tokens(S, "="), - {T, list_to_integer(P)} - end || S <- string:tokens(V, ",")]} | Opts] - end -end}. - -%%-------------------------------------------------------------------- -%% MQTT Broker -%%-------------------------------------------------------------------- - -{mapping, "mqtt.broker.sys_interval", "emqx.broker_sys_interval", [ - {datatype, {duration, ms}}, - {default, "1m"} -]}. - -%%-------------------------------------------------------------------- -%% MQTT PubSub -%%-------------------------------------------------------------------- - -{mapping, "mqtt.pubsub.pool_size", "emqx.pubsub", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "mqtt.pubsub.async", "emqx.pubsub", [ - {default, true}, - {datatype, {enum, [true, false]}} -]}. - -{translation, "emqx.pubsub", fun(Conf) -> - [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, - {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] -end}. - -%%-------------------------------------------------------------------- -%% MQTT Bridge -%%-------------------------------------------------------------------- - -{mapping, "mqtt.bridge.max_queue_len", "emqx.bridge", [ - {default, 10000}, - {datatype, integer} -]}. - -{mapping, "mqtt.bridge.ping_down_interval", "emqx.bridge", [ - {datatype, {duration, ms}}, - {default, "1s"} -]}. - -{translation, "emqx.bridge", fun(Conf) -> - [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, - {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] -end}. - -%%------------------------------------------------------------------- -%% Plugins -%%------------------------------------------------------------------- - -{mapping, "mqtt.plugins.etc_dir", "emqx.plugins_etc_dir", [ - {datatype, string} -]}. - -{mapping, "mqtt.plugins.loaded_file", "emqx.plugins_loaded_file", [ - {datatype, string} -]}. - -{mapping, "mqtt.plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [ - {datatype, string} -]}. - -%%-------------------------------------------------------------------- -%% Modules -%%-------------------------------------------------------------------- - -{mapping, "module.presence", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "module.presence.qos", "emqx.modules", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:0-2"]} -]}. - -{mapping, "module.subscription", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "module.subscription.$id.topic", "emqx.modules", [ - {datatype, string} -]}. - -{mapping, "module.subscription.$id.qos", "emqx.modules", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:0-2"]} -]}. - -{mapping, "module.rewrite", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "module.rewrite.rule.$id", "emqx.modules", [ - {datatype, string} -]}. - -{translation, "emqx.modules", fun(Conf) -> - Subscriptions = fun() -> - List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), - QosList = [Qos || {_, Qos} <- lists:sort([{I, Qos} || {[_,"subscription", I,"qos"], Qos} <- List])], - TopicList = [iolist_to_binary(Topic) || {_, Topic} <- - lists:sort([{I, Topic} || {[_,"subscription", I, "topic"], Topic} <- List])], - lists:zip(TopicList, QosList) - end, - Rewrites = fun() -> - Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), - lists:map(fun({[_, "rewrite", "rule", I], Rule}) -> - [Topic, Re, Dest] = string:tokens(Rule, " "), - {rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} - end, Rules) - end, - lists:append([ - case cuttlefish:conf_get("module.presence", Conf) of %% Presence - true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}]; - false -> [] - end, - case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription - true -> [{emqx_mod_subscription, Subscriptions()}]; - false -> [] - end, - case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite - true -> [{emqx_mod_rewrite, Rewrites()}]; - false -> [] - end - ]) -end}. - - %%-------------------------------------------------------------------- %% Listeners %%-------------------------------------------------------------------- @@ -928,7 +599,7 @@ end}. {datatype, integer} ]}. -{mapping, "listener.tcp.$name.max_clients", "emqx.listeners", [ +{mapping, "listener.tcp.$name.max_connections", "emqx.listeners", [ {default, 1024}, {datatype, integer} ]}. @@ -963,7 +634,8 @@ end}. ]}. {mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [ - {datatype, {enum, [cn, dn]}} + {default, false}, + {datatype, {enum, [true, false]}} ]}. {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [ @@ -1023,7 +695,7 @@ end}. {datatype, integer} ]}. -{mapping, "listener.ssl.$name.max_clients", "emqx.listeners", [ +{mapping, "listener.ssl.$name.max_connections", "emqx.listeners", [ {default, 1024}, {datatype, integer} ]}. @@ -1168,7 +840,7 @@ end}. {datatype, integer} ]}. -{mapping, "listener.ws.$name.max_clients", "emqx.listeners", [ +{mapping, "listener.ws.$name.max_connections", "emqx.listeners", [ {default, 1024}, {datatype, integer} ]}. @@ -1185,10 +857,20 @@ end}. {datatype, string} ]}. +{mapping, "listener.ws.$name.rate_limit", "emqx.listeners", [ + {default, undefined}, + {datatype, string} +]}. + {mapping, "listener.ws.$name.access.$id", "emqx.listeners", [ {datatype, string} ]}. +{mapping, "listener.ws.$name.verify_protocol_header", "emqx.listeners", [ + {default, on}, + {datatype, flag} +]}. + {mapping, "listener.ws.$name.proxy_address_header", "emqx.listeners", [ {datatype, string}, hidden @@ -1264,7 +946,7 @@ end}. {datatype, integer} ]}. -{mapping, "listener.wss.$name.max_clients", "emqx.listeners", [ +{mapping, "listener.wss.$name.max_connections", "emqx.listeners", [ {default, 1024}, {datatype, integer} ]}. @@ -1285,6 +967,11 @@ end}. {datatype, string} ]}. +{mapping, "listener.wss.$name.verify_protocol_header", "emqx.listeners", [ + {default, on}, + {datatype, flag} +]}. + {mapping, "listener.wss.$name.access.$id", "emqx.listeners", [ {datatype, string} ]}. @@ -1422,16 +1109,23 @@ end}. MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, + Ratelimit = fun(undefined) -> + undefined; + (S) -> + list_to_tuple([list_to_integer(Token) || Token <- string:tokens(S, ",")]) + end, + LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, - {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, + {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, - %%{rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, + {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))}, {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, + {verify_protocol_header, cuttlefish:conf_get(Prefix ++ ".verify_protocol_header", Conf, undefined)}, {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) @@ -1488,124 +1182,406 @@ end}. end end, - ApiListeners = fun(Type, Name) -> - Prefix = string:join(["listener", Type, Name], "."), - case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - SslOpts1 = case SslOpts(Prefix) of [] -> []; SslOpts0 -> [{ssl_options, SslOpts0}] end, - [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}|LisOpts(Prefix)] ++ SslOpts1}] - end - end, - - lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] ++ [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) - ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)] - ++ - [ApiListeners(Type, Name) || {["listener", Type, Name], ListenOn} - <- cuttlefish_variable:filter_by_prefix("listener.api", Conf)]) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. %%-------------------------------------------------------------------- -%% MQTT REST API Listeners +%% Zones +%%-------------------------------------------------------------------- -{mapping, "listener.api.$name", "emqx.listeners", [ - {datatype, [integer, ip]} -]}. - -{mapping, "listener.api.$name.acceptors", "emqx.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.max_clients", "emqx.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.rate_limit", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.access.$id", "emqx.listeners", [ - {datatype, string} -]}. - -{mapping, "listener.api.$name.backlog", "emqx.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "listener.api.$name.send_timeout", "emqx.listeners", [ - {datatype, {duration, ms}}, - {default, "15s"} -]}. - -{mapping, "listener.api.$name.send_timeout_close", "emqx.listeners", [ - {datatype, flag}, - {default, on} -]}. - -{mapping, "listener.api.$name.recbuf", "emqx.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.sndbuf", "emqx.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.buffer", "emqx.listeners", [ - {datatype, bytesize}, - hidden -]}. - -{mapping, "listener.api.$name.tune_buffer", "emqx.listeners", [ - {datatype, flag}, - hidden -]}. - -{mapping, "listener.api.$name.nodelay", "emqx.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.api.$name.reuseaddr", "emqx.listeners", [ - {datatype, {enum, [true, false]}}, - hidden -]}. - -{mapping, "listener.api.$name.handshake_timeout", "emqx.listeners", [ +%% @doc Idle timeout of the MQTT connection. +{mapping, "zone.$name.idle_timeout", "emqx.zones", [ + {default, "15s"}, {datatype, {duration, ms}} ]}. -{mapping, "listener.api.$name.keyfile", "emqx.listeners", [ +%% @doc Enable ACL check. +{mapping, "zone.$name.enable_acl", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Enable per connection statistics. +{mapping, "zone.$name.enable_stats", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Publish limit of the MQTT connections. +{mapping, "zone.$name.publish_limit", "emqx.zones", [ + {default, undefined}, {datatype, string} ]}. -{mapping, "listener.api.$name.certfile", "emqx.listeners", [ - {datatype, string} +%% @doc Max Packet Size Allowed, 64K by default. +{mapping, "zone.$name.max_packet_size", "emqx.zones", [ + {default, "64KB"}, + {datatype, bytesize} ]}. -{mapping, "listener.api.$name.cacertfile", "emqx.listeners", [ - {datatype, string} +%% @doc Set the Max ClientId Length Allowed. +{mapping, "zone.$name.max_clientid_len", "emqx.zones", [ + {default, 65535}, + {datatype, integer} ]}. -{mapping, "listener.api.$name.verify", "emqx.listeners", [ - {datatype, atom} +%% @doc Set the Maximum topic levels. +{mapping, "zone.$name.max_topic_levels", "emqx.zones", [ + {default, 0}, + {datatype, integer} ]}. -{mapping, "listener.api.$name.fail_if_no_peer_cert", "emqx.listeners", [ +%% @doc Set the Maximum QoS allowed. +{mapping, "zone.$name.max_qos_allowed", "emqx.zones", [ + {default, 2}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +%% @doc Set the Maximum topic alias. +{mapping, "zone.$name.max_topic_alias", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Whether the server supports retained messages. +{mapping, "zone.$name.retain_available", "emqx.zones", [ + {default, true}, {datatype, {enum, [true, false]}} ]}. +%% @doc Whether the Server supports Wildcard Subscriptions. +{mapping, "zone.$name.wildcard_subscription", "emqx.zones", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Whether the Server supports Shared Subscriptions. +{mapping, "zone.$name.shared_subscription", "emqx.zones", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Keepalive backoff +{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ + {default, 0.75}, + {datatype, float} +]}. + +%% @doc Max Number of Subscriptions Allowed. +{mapping, "zone.$name.max_subscriptions", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Upgrade QoS according to subscription? +{mapping, "zone.$name.upgrade_qos", "emqx.zones", [ + {default, off}, + {datatype, flag} +]}. + +%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. +%% 0 means no limit +{mapping, "zone.$name.max_inflight", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Retry interval for redelivering QoS1/2 messages. +{mapping, "zone.$name.retry_interval", "emqx.zones", [ + {default, "20s"}, + {datatype, {duration, ms}} +]}. + +%% @doc Max Packets that Awaiting PUBREL, 0 means no limit +{mapping, "zone.$name.max_awaiting_rel", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Awaiting PUBREL timeout +{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [ + {default, "60s"}, + {datatype, {duration, ms}} +]}. + +%% @doc Ignore message from self publish +{mapping, "zone.$name.ignore_loop_deliver", "emqx.zones", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Session Expiry Interval +{mapping, "zone.$name.session_expiry_interval", "emqx.zones", [ + {default, "2h"}, + {datatype, {duration, ms}} +]}. + +%% @doc Max queue length. Enqueued messages when persistent client +%% disconnected, or inflight window is full. 0 means no limit. +{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Queue Qos0 messages? +{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqx.zones", fun(Conf) -> + maps:to_list( + lists:foldl( + fun({["zone", Name, Opt], Val}, Acc) -> + ZName = list_to_atom(Name), + case maps:find(ZName, Acc) of + {ok, Opts} -> + maps:put(ZName, [{list_to_atom(Opt), Val} | Opts], Acc); + error -> + maps:put(ZName, [{list_to_atom(Opt), Val}], Acc) + end + end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("zone.", Conf)))) +end}. + +%%-------------------------------------------------------------------- +%% Bridges +%%-------------------------------------------------------------------- + +{mapping, "bridge.$name.type", "emqx.bridges", [ + {default, local}, + {datatype, {enum, [local,remote]}} +]}. + +{mapping, "bridge.$name.address", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.proto_ver", "emqx.bridges", [ + {datatype, {enum, [mqtt3, mqtt4, mqtt5]}} +]}. + +{mapping, "bridge.$name.client_id", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.clean_start", "emqx.bridges", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "bridge.$name.username", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.password", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.mountpoint", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.cacertfile", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.certfile", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.keyfile", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.ciphers", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.max_pending_messages", "emqx.bridges", [ + {default, 10000}, + {datatype, integer} +]}. + +{mapping, "bridge.$name.keepalive", "emqx.bridges", [ + {default, "10s"}, + {datatype, {duration, s}} +]}. + +{mapping, "bridge.$name.tls_versions", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [ + {datatype, string} +]}. + +{mapping, "bridge.$name.subscription.$id.qos", "emqx.bridges", [ + {datatype, integer} +]}. + +{translation, "emqx.bridges", fun(Conf) -> + + Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, + + IsSsl = fun(cacertfile) -> true; + (certfile) -> true; + (keyfile) -> true; + (ciphers) -> true; + (tls_versions) -> true; + (_Opt) -> false + end, + + Parse = fun(tls_versions, Vers) -> + {versions, [list_to_atom(S) || S <- Split(Vers)]}; + (ciphers, Ciphers) -> + {ciphers, Split(Ciphers)}; + (Opt, Val) -> + {Opt, Val} + end, + + Merge = fun(Opt, Val, Opts) -> + case IsSsl(Opt) of + true -> + SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], + lists:ukeymerge(1, [{ssl_opts, SslOpts}], Opts); + false -> + [{Opt, Val}|Opts] + end + end, + Subscriptions = fun(Name) -> + Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), + lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], + [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) + end, + + maps:to_list( + lists:foldl( + fun({["bridge", Name, Opt], Val}, Acc) -> + maps:update_with(list_to_atom(Name), + fun(Opts) -> + Merge(list_to_atom(Opt), Val, Opts) + end, [{subscriptions, Subscriptions(Name)}], Acc); + (_, Acc) -> Acc + end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf)))) +end}. + +%%-------------------------------------------------------------------- +%% Modules +%%-------------------------------------------------------------------- + +{mapping, "module.presence", "emqx.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "module.presence.qos", "emqx.modules", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.subscription", "emqx.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "module.subscription.$id.topic", "emqx.modules", [ + {datatype, string} +]}. + +{mapping, "module.subscription.$id.qos", "emqx.modules", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.rewrite", "emqx.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "module.rewrite.rule.$id", "emqx.modules", [ + {datatype, string} +]}. + +{translation, "emqx.modules", fun(Conf) -> + Subscriptions = fun() -> + List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), + QosList = [Qos || {_, Qos} <- lists:sort([{I, Qos} || {[_,"subscription", I,"qos"], Qos} <- List])], + TopicList = [iolist_to_binary(Topic) || {_, Topic} <- + lists:sort([{I, Topic} || {[_,"subscription", I, "topic"], Topic} <- List])], + lists:zip(TopicList, QosList) + end, + Rewrites = fun() -> + Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), + lists:map(fun({[_, "rewrite", "rule", I], Rule}) -> + [Topic, Re, Dest] = string:tokens(Rule, " "), + {rewrite, list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} + end, Rules) + end, + lists:append([ + case cuttlefish:conf_get("module.presence", Conf) of %% Presence + true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}]; + false -> [] + end, + case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription + true -> [{emqx_mod_subscription, Subscriptions()}]; + false -> [] + end, + case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite + true -> [{emqx_mod_rewrite, Rewrites()}]; + false -> [] + end + ]) +end}. + +%%------------------------------------------------------------------- +%% Plugins +%%------------------------------------------------------------------- + +{mapping, "plugins.etc_dir", "emqx.plugins_etc_dir", [ + {datatype, string} +]}. + +{mapping, "plugins.loaded_file", "emqx.plugins_loaded_file", [ + {datatype, string} +]}. + +{mapping, "plugins.expand_plugins_dir", "emqx.expand_plugins_dir", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% Broker +%%-------------------------------------------------------------------- + +{mapping, "broker.sys_interval", "emqx.broker_sys_interval", [ + {datatype, {duration, ms}}, + {default, "1m"} +]}. + +{mapping, "broker.session_locking_strategy", "emqx.session_locking_strategy", [ + {default, quorum}, + {datatype, {enum, [local,one,quorum,all]}} +]}. + +{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ + {default, random}, + {datatype, {enum, [random, round_robbin, hash]}} +]}. + +{mapping, "broker.route_batch_clean", "emqx.route_batch_clean", [ + {default, on}, + {datatype, flag} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- @@ -1642,10 +1618,10 @@ end}. ]}. {translation, "emqx.sysmon", fun(Conf) -> - [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, - {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, - {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, - {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, - {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] + [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, + {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, + {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, + {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, + {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}.