From c5f0091b5dd3315d6e23aed89b88c20ce561168d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 24 Aug 2021 18:48:25 +0800 Subject: [PATCH] refactor(config): rework - config struct for zones and listeners ``` listeners.tcp.default { bind = "0.0.0.0:1883" acceptors = 16 max_connections = 1024000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" tcp.backlog = 1024 tcp.buffer = 4KB } listeners.ssl.default { bind = "0.0.0.0:8883" acceptors = 16 max_connections = 512000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] ssl.keyfile = "etc/certs/key.pem" ssl.certfile = "etc/certs/cert.pem" ssl.cacertfile = "etc/certs/cacert.pem" tcp.backlog = 1024 tcp.buffer = 4KB } listeners.quic.default { bind = "0.0.0.0:14567" acceptors = 16 max_connections = 1024000 keyfile = "etc/certs/key.pem" certfile = "etc/certs/cert.pem" mountpoint = "" } listeners.ws.default { bind = "0.0.0.0:8083" acceptors = 16 max_connections = 1024000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" tcp.backlog = 1024 tcp.buffer = 4KB websocket.idle_timeout = 86400s } listeners.wss.default { bind = "0.0.0.0:8084" acceptors = 16 max_connections = 512000 access_rules = [ "allow all" ] proxy_protocol = false proxy_protocol_timeout = 3s mountpoint = "" ssl.keyfile = "etc/certs/key.pem" ssl.certfile = "etc/certs/cert.pem" ssl.cacertfile = "etc/certs/cacert.pem" tcp.backlog = 1024 tcp.buffer = 4KB websocket.idle_timeout = 86400s } ``` ``` zones.default { } ``` --- apps/emqx/etc/emqx.conf | 2069 ++++++++--------- apps/emqx/src/emqx_config.erl | 36 +- apps/emqx/src/emqx_listeners.erl | 203 +- apps/emqx/src/emqx_schema.erl | 63 +- .../src/emqx_mgmt_api_listeners.erl | 4 +- 5 files changed, 1145 insertions(+), 1230 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 50ddbfcde..251f2d0a5 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1,3 +1,991 @@ +##================================================================== +## Listeners +##================================================================== +## MQTT/TCP - TCP Listeners for MQTT Protocol +## syntax: listeners.tcp. +## example: listeners.tcp.my_tcp_listener +listeners.tcp.default { + ## The IP address and port that the listener will bind. + ## + ## @doc listeners.tcp..bind + ## ValueType: IPAddress | Port | IPAddrPort + ## Required: true + ## Examples: 1883, 127.0.0.1:1883, ::1:1883 + bind = "0.0.0.0:1883" + + ## The configuration zone this listener is using. + ## If not set, the global configs are used for this listener. + ## + ## See `zones.` for more details. + ## + ## @doc listeners.tcp..zone + ## ValueType: String + ## Required: false + #zone = default + + ## The size of the acceptor pool for this listener. + ## + ## @doc listeners.tcp..acceptors + ## ValueType: Number + ## Default: 16 + acceptors = 16 + + ## Maximum number of concurrent connections. + ## + ## @doc listeners.tcp..max_connections + ## ValueType: Number | infinity + ## Default: infinity + max_connections = 1024000 + + ## The access control rules for this listener. + ## + ## See: https://github.com/emqtt/esockd#allowdeny + ## + ## @doc listeners.tcp..access_rules + ## ValueType: Array + ## Default: [] + ## Examples: + ## access_rules: [ + ## "deny 192.168.0.0/24", + ## "all all" + ## ] + access_rules = [ + "allow all" + ] + + ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed + ## behind HAProxy or Nginx. + ## + ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ + ## + ## @doc listeners.tcp..proxy_protocol + ## ValueType: Boolean + ## Default: false + proxy_protocol = false + + ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection + ## if no proxy protocol packet recevied within the timeout. + ## + ## @doc listeners.tcp..proxy_protocol_timeout + ## ValueType: Duration + ## Default: 3s + proxy_protocol_timeout = 3s + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + ## The prefixed string will be removed from the topic name when the message + ## is delivered to the subscriber. The mountpoint is a way that users can use + ## to implement isolation of message routing between different listeners. + ## + ## For example if a clientA subscribes to "t" with `listeners.tcp..mqtt.mountpoint` + ## set to "some_tenant", then the client accually subscribes to the topic + ## "some_tenant/t". Similarly if another clientB (connected to the same listener + ## with the clientA) send a message to topic "t", the message is accually route + ## to all the clients subscribed "some_tenant/t", so clientA will receive the + ## message, with topic name "t". + ## + ## Set to "" to disable the feature. + ## + ## Variables in mountpoint string: + ## - %c: clientid + ## - %u: username + ## + ## @doc listeners.tcp..mqtt.mountpoint + ## ValueType: String + ## Default: "" + mountpoint = "" + + ## TCP options + ## See ${example_common_tcp_options} for more information + tcp.backlog = 1024 + tcp.buffer = 4KB +} + +## MQTT/SSL - SSL Listeners for MQTT Protocol +## syntax: listeners.ssl. +## example: listeners.ssl.my_ssl_listener +listeners.ssl.default { + ## The IP address and port that the listener will bind. + ## + ## @doc listeners.ssl..bind + ## ValueType: IPAddress | Port | IPAddrPort + ## Required: true + ## Examples: 8883, 127.0.0.1:8883, ::1:8883 + bind = "0.0.0.0:8883" + + ## The configuration zone this listener is using. + ## If not set, the global configs are used for this listener. + ## + ## See `zones.` for more details. + ## + ## @doc listeners.ssl..zone + ## ValueType: String + ## Required: false + #zone = default + + ## The size of the acceptor pool for this listener. + ## + ## @doc listeners.ssl..acceptors + ## ValueType: Number + ## Default: 16 + acceptors = 16 + + ## Maximum number of concurrent connections. + ## + ## @doc listeners.ssl..max_connections + ## ValueType: Number | infinity + ## Default: infinity + max_connections = 512000 + + ## The access control rules for this listener. + ## + ## See: https://github.com/emqtt/esockd#allowdeny + ## + ## @doc listeners.ssl..access_rules + ## ValueType: Array + ## Default: [] + ## Examples: + ## access_rules: [ + ## "deny 192.168.0.0/24", + ## "all all" + ## ] + access_rules = [ + "allow all" + ] + + ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed + ## behind HAProxy or Nginx. + ## + ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ + ## + ## @doc listeners.ssl..proxy_protocol + ## ValueType: Boolean + ## Default: true + proxy_protocol = false + + ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection + ## if no proxy protocol packet recevied within the timeout. + ## + ## @doc listeners.ssl..proxy_protocol_timeout + ## ValueType: Duration + ## Default: 3s + proxy_protocol_timeout = 3s + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + ## The prefixed string will be removed from the topic name when the message + ## is delivered to the subscriber. The mountpoint is a way that users can use + ## to implement isolation of message routing between different listeners. + ## + ## For example if a clientA subscribes to "t" with `listeners.ssl..mqtt.mountpoint` + ## set to "some_tenant", then the client accually subscribes to the topic + ## "some_tenant/t". Similarly if another clientB (connected to the same listener + ## with the clientA) send a message to topic "t", the message is accually route + ## to all the clients subscribed "some_tenant/t", so clientA will receive the + ## message, with topic name "t". + ## + ## Set to "" to disable the feature. + ## + ## Variables in mountpoint string: + ## - %c: clientid + ## - %u: username + ## + ## @doc listeners.ssl..mqtt.mountpoint + ## ValueType: String + ## Default: "" + mountpoint = "" + + ## SSL options + ## See ${example_common_ssl_options} for more information + ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] + ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" + ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" + ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + + ## TCP options + ## See ${example_common_tcp_options} for more information + tcp.backlog = 1024 + tcp.buffer = 4KB +} + +## MQTT/QUIC - QUIC Listeners for MQTT Protocol +## syntax: listeners.quic. +## example: listeners.quic.my_quic_listener +listeners.quic.default { + ## The IP address and port that the listener will bind. + ## + ## @doc listeners.quic..bind + ## ValueType: IPAddress | Port | IPAddrPort + ## Required: true + ## Examples: 14567, 127.0.0.1:14567, ::1:14567 + bind = "0.0.0.0:14567" + + ## The configuration zone this listener is using. + ## If not set, the global configs are used for this listener. + ## + ## See `zones.` for more details. + ## + ## @doc listeners.quic..zone + ## ValueType: String + ## Required: false + #zone = default + + ## The size of the acceptor pool for this listener. + ## + ## @doc listeners.quic..acceptors + ## ValueType: Number + ## Default: 16 + acceptors = 16 + + ## Maximum number of concurrent connections. + ## + ## @doc listeners.quic..max_connections + ## ValueType: Number | infinity + ## Default: infinity + max_connections = 1024000 + + ## Path to the file containing the user's private PEM-encoded key. + ## + ## @doc listeners.quic..keyfile + ## ValueType: String + ## Default: "{{ platform_etc_dir }}/certs/key.pem" + keyfile = "{{ platform_etc_dir }}/certs/key.pem" + + ## Path to a file containing the user certificate. + ## + ## @doc listeners.quic..certfile + ## ValueType: String + ## Default: "{{ platform_etc_dir }}/certs/cert.pem" + certfile = "{{ platform_etc_dir }}/certs/cert.pem" + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + ## The prefixed string will be removed from the topic name when the message + ## is delivered to the subscriber. The mountpoint is a way that users can use + ## to implement isolation of message routing between different listeners. + ## + ## For example if a clientA subscribes to "t" with `listeners.quic..mqtt.mountpoint` + ## set to "some_tenant", then the client accually subscribes to the topic + ## "some_tenant/t". Similarly if another clientB (connected to the same listener + ## with the clientA) send a message to topic "t", the message is accually route + ## to all the clients subscribed "some_tenant/t", so clientA will receive the + ## message, with topic name "t". + ## + ## Set to "" to disable the feature. + ## + ## Variables in mountpoint string: + ## - %c: clientid + ## - %u: username + ## + ## @doc listeners.quic..mqtt.mountpoint + ## ValueType: String + ## Default: "" + mountpoint = "" +} + +## MQTT/WS - Websocket Listeners for MQTT Protocol +## syntax: listeners.ws. +## example: listeners.ws.my_ws_listener +listeners.ws.default { + ## The IP address and port that the listener will bind. + ## + ## @doc listeners.ws..bind + ## ValueType: IPAddress | Port | IPAddrPort + ## Required: true + ## Examples: 8083, 127.0.0.1:8083, ::1:8083 + bind = "0.0.0.0:8083" + + ## The configuration zone this listener is using. + ## If not set, the global configs are used for this listener. + ## + ## See `zones.` for more details. + ## + ## @doc listeners.ws..zone + ## ValueType: String + ## Required: false + #zone = default + + ## The size of the acceptor pool for this listener. + ## + ## @doc listeners.ws..acceptors + ## ValueType: Number + ## Default: 16 + acceptors = 16 + + ## Maximum number of concurrent connections. + ## + ## @doc listeners.ws..max_connections + ## ValueType: Number | infinity + ## Default: infinity + max_connections = 1024000 + + ## The access control rules for this listener. + ## + ## See: https://github.com/emqtt/esockd#allowdeny + ## + ## @doc listeners.ws..access_rules + ## ValueType: Array + ## Default: [] + ## Examples: + ## access_rules: [ + ## "deny 192.168.0.0/24", + ## "all all" + ## ] + access_rules = [ + "allow all" + ] + + ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed + ## behind HAProxy or Nginx. + ## + ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ + ## + ## @doc listeners.ws..proxy_protocol + ## ValueType: Boolean + ## Default: true + proxy_protocol = false + + ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection + ## if no proxy protocol packet recevied within the timeout. + ## + ## @doc listeners.ws..proxy_protocol_timeout + ## ValueType: Duration + ## Default: 3s + proxy_protocol_timeout = 3s + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + ## The prefixed string will be removed from the topic name when the message + ## is delivered to the subscriber. The mountpoint is a way that users can use + ## to implement isolation of message routing between different listeners. + ## + ## For example if a clientA subscribes to "t" with `listeners.ws..mqtt.mountpoint` + ## set to "some_tenant", then the client accually subscribes to the topic + ## "some_tenant/t". Similarly if another clientB (connected to the same listener + ## with the clientA) send a message to topic "t", the message is accually route + ## to all the clients subscribed "some_tenant/t", so clientA will receive the + ## message, with topic name "t". + ## + ## Set to "" to disable the feature. + ## + ## Variables in mountpoint string: + ## - %c: clientid + ## - %u: username + ## + ## @doc listeners.ws..mqtt.mountpoint + ## ValueType: String + ## Default: "" + mountpoint = "" + + ## TCP options + ## See ${example_common_tcp_options} for more information + tcp.backlog = 1024 + tcp.buffer = 4KB + + ## Websocket options + ## See ${example_common_websocket_options} for more information + websocket.idle_timeout = 86400s +} + +## MQTT/WSS - WebSocket Secure Listeners for MQTT Protocol +## syntax: listeners.wss. +## example: listeners.wss.my_wss_listener +listeners.wss.default { + ## The IP address and port that the listener will bind. + ## + ## @doc listeners.wss..bind + ## ValueType: IPAddress | Port | IPAddrPort + ## Required: true + ## Examples: 8084, 127.0.0.1:8084, ::1:8084 + bind = "0.0.0.0:8084" + + ## The configuration zone this listener is using. + ## If not set, the global configs are used for this listener. + ## + ## See `zones.` for more details. + ## + ## @doc listeners.wss..zone + ## ValueType: String + ## Required: false + #zone = default + + ## The size of the acceptor pool for this listener. + ## + ## @doc listeners.wss..acceptors + ## ValueType: Number + ## Default: 16 + acceptors = 16 + + ## Maximum number of concurrent connections. + ## + ## @doc listeners.wss..max_connections + ## ValueType: Number | infinity + ## Default: infinity + max_connections = 512000 + + ## The access control rules for this listener. + ## + ## See: https://github.com/emqtt/esockd#allowdeny + ## + ## @doc listeners.wss..access_rules + ## ValueType: Array + ## Default: [] + ## Examples: + ## access_rules: [ + ## "deny 192.168.0.0/24", + ## "all all" + ## ] + access_rules = [ + "allow all" + ] + + ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed + ## behind HAProxy or Nginx. + ## + ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ + ## + ## @doc listeners.wss..proxy_protocol + ## ValueType: Boolean + ## Default: true + proxy_protocol = false + + ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection + ## if no proxy protocol packet recevied within the timeout. + ## + ## @doc listeners.wss..proxy_protocol_timeout + ## ValueType: Duration + ## Default: 3s + proxy_protocol_timeout = 3s + + ## When publishing or subscribing, prefix all topics with a mountpoint string. + ## The prefixed string will be removed from the topic name when the message + ## is delivered to the subscriber. The mountpoint is a way that users can use + ## to implement isolation of message routing between different listeners. + ## + ## For example if a clientA subscribes to "t" with `listeners.wss..mqtt.mountpoint` + ## set to "some_tenant", then the client accually subscribes to the topic + ## "some_tenant/t". Similarly if another clientB (connected to the same listener + ## with the clientA) send a message to topic "t", the message is accually route + ## to all the clients subscribed "some_tenant/t", so clientA will receive the + ## message, with topic name "t". + ## + ## Set to "" to disable the feature. + ## + ## Variables in mountpoint string: + ## - %c: clientid + ## - %u: username + ## + ## @doc listeners.wss..mqtt.mountpoint + ## ValueType: String + ## Default: "" + mountpoint = "" + + ## SSL options + ## See ${example_common_ssl_options} for more information + ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" + ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" + ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" + + ## TCP options + ## See ${example_common_tcp_options} for more information + tcp.backlog = 1024 + tcp.buffer = 4KB + + ## Websocket options + ## See ${example_common_websocket_options} for more information + websocket.idle_timeout = 86400s +} + +## Enable per connection statistics. +## +## @doc stats.enable +## ValueType: Boolean +## Default: true +stats.enable = true + +authorization { + ## Behaviour after not matching a rule. + ## + ## @doc authorization.no_match + ## ValueType: allow | deny + ## Default: allow + no_match: allow + + ## The action when authorization check reject current operation + ## + ## @doc authorization.deny_action + ## ValueType: ignore | disconnect + ## Default: ignore + deny_action: ignore + + ## Whether to enable Authorization cache. + ## + ## If enabled, Authorization roles for each client will be cached in the memory + ## + ## @doc authorization.cache.enable + ## ValueType: Boolean + ## Default: true + cache.enable: true + + ## The maximum count of Authorization entries can be cached for a client. + ## + ## @doc authorization.cache.max_size + ## ValueType: Integer + ## Range: [0, 1048576] + ## Default: 32 + cache.max_size: 32 + + ## The time after which an Authorization cache entry will be deleted + ## + ## @doc authorization.cache.ttl + ## ValueType: Duration + ## Default: 1m + cache.ttl: 1m +} + +mqtt { + ## How long time the MQTT connection will be disconnected if the + ## TCP connection is established but MQTT CONNECT has not been + ## received. + ## + ## @doc mqtt.idle_timeout + ## ValueType: Duration + ## Default: 15s + idle_timeout = 15s + + ## Maximum MQTT packet size allowed. + ## + ## @doc mqtt.max_packet_size + ## ValueType: Bytes + ## Default: 1MB + max_packet_size = 1MB + + ## Maximum length of MQTT clientId allowed. + ## + ## @doc mqtt.max_clientid_len + ## ValueType: Integer + ## Range: [23, 65535] + ## Default: 65535 + max_clientid_len = 65535 + + ## Maximum topic levels allowed. + ## + ## @doc mqtt.max_topic_levels + ## ValueType: Integer + ## Range: [1, 65535] + ## Default: 65535 + max_topic_levels = 65535 + + ## Maximum QoS allowed. + ## + ## @doc mqtt.max_qos_allowed + ## ValueType: 0 | 1 | 2 + ## Default: 2 + max_qos_allowed = 2 + + ## Maximum Topic Alias, 0 means no topic alias supported. + ## + ## @doc mqtt.max_topic_alias + ## ValueType: Integer + ## Range: [0, 65535] + ## Default: 65535 + max_topic_alias = 65535 + + ## Whether the Server supports MQTT retained messages. + ## + ## @doc mqtt.retain_available + ## ValueType: Boolean + ## Default: true + retain_available = true + + ## Whether the Server supports MQTT Wildcard Subscriptions + ## + ## @doc mqtt.wildcard_subscription + ## ValueType: Boolean + ## Default: true + wildcard_subscription = true + + ## Whether the Server supports MQTT Shared Subscriptions. + ## + ## @doc mqtt.shared_subscription + ## ValueType: Boolean + ## Default: true + shared_subscription = true + + ## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) + ## + ## @doc mqtt.ignore_loop_deliver + ## ValueType: Boolean + ## Default: false + ignore_loop_deliver = false + + ## Whether to parse the MQTT frame in strict mode + ## + ## @doc mqtt.strict_mode + ## ValueType: Boolean + ## Default: false + strict_mode = false + + ## Specify the response information returned to the client + ## + ## This feature is disabled if is set to "" + ## + ## @doc mqtt.response_information + ## ValueType: String + ## Default: "" + response_information = "" + + ## Server Keep Alive of MQTT 5.0 + ## + ## @doc mqtt.server_keepalive + ## ValueType: Number | disabled + ## Default: disabled + server_keepalive = disabled + + ## The backoff for MQTT keepalive timeout. The broker will kick a connection out + ## until 'Keepalive * backoff * 2' timeout. + ## + ## @doc mqtt.keepalive_backoff + ## ValueType: Float + ## Range: (0.5, 1] + ## Default: 0.75 + keepalive_backoff = 0.75 + + ## Maximum number of subscriptions allowed. + ## + ## @doc mqtt.max_subscriptions + ## ValueType: Integer | infinity + ## Range: [1, infinity) + ## Default: infinity + max_subscriptions = infinity + + ## Force to upgrade QoS according to subscription. + ## + ## @doc mqtt.upgrade_qos + ## ValueType: Boolean + ## Default: false + upgrade_qos = false + + ## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked. + ## + ## @doc mqtt.max_inflight + ## ValueType: Integer + ## Range: [1, 65535] + ## Default: 32 + max_inflight = 32 + + ## Retry interval for QoS1/2 message delivering. + ## + ## @doc mqtt.retry_interval + ## ValueType: Duration + ## Default: 30s + retry_interval = 30s + + ## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL. + ## + ## @doc mqtt.max_awaiting_rel + ## ValueType: Integer | infinity + ## Range: [1, infinity) + ## Default: 100 + max_awaiting_rel = 100 + + ## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. + ## + ## @doc mqtt.await_rel_timeout + ## ValueType: Duration + ## Default: 300s + await_rel_timeout = 300s + + ## Default session expiry interval for MQTT V3.1.1 connections. + ## + ## @doc mqtt.session_expiry_interval + ## ValueType: Duration + ## Default: 2h + session_expiry_interval = 2h + + ## Maximum queue length. Enqueued messages when persistent client disconnected, + ## or inflight window is full. + ## + ## @doc mqtt.max_mqueue_len + ## ValueType: Integer | infinity + ## Range: [0, infinity) + ## Default: 1000 + max_mqueue_len = 1000 + + ## Topic priorities. + ## + ## There's no priority table by default, hence all messages + ## are treated equal. + ## + ## Priority number [1-255] + ## + ## NOTE: comma and equal signs are not allowed for priority topic names + ## NOTE: Messages for topics not in the priority table are treated as + ## either highest or lowest priority depending on the configured + ## value for mqtt.mqueue_default_priority + ## + ## @doc mqtt.mqueue_priorities + ## ValueType: Map | disabled + ## Examples: + ## To configure "topic/1" > "topic/2": + ## mqueue_priorities: {"topic/1": 10, "topic/2": 8} + ## Default: disabled + mqueue_priorities = disabled + + ## Default to highest priority for topics not matching priority table + ## + ## @doc mqtt.mqueue_default_priority + ## ValueType: highest | lowest + ## Default: lowest + mqueue_default_priority = lowest + + ## Whether to enqueue QoS0 messages. + ## + ## @doc mqtt.mqueue_store_qos0 + ## ValueType: Boolean + ## Default: true + mqueue_store_qos0 = true + + ## Whether use username replace client id + ## + ## @doc mqtt.use_username_as_clientid + ## ValueType: Boolean + ## Default: false + use_username_as_clientid = false + + ## Use the CN, DN or CRT field from the client certificate as a username. + ## Only works for SSL connection. + ## + ## @doc mqtt.peer_cert_as_username + ## ValueType: cn | dn | crt | disabled + ## Default: disabled + peer_cert_as_username = disabled + + ## Use the CN, DN or CRT field from the client certificate as a clientid. + ## Only works for SSL connection. + ## + ## @doc mqtt.peer_cert_as_clientid + ## ValueType: cn | dn | crt | disabled + ## Default: disabled + peer_cert_as_clientid = disabled +} + +flapping_detect { + ## Enable Flapping Detection. + ## + ## This config controls the allowed maximum number of CONNECT received + ## from the same clientid in a time frame defined by `window_time`. + ## After the limit is reached, successive CONNECT requests are forbidden + ## (banned) until the end of the time period defined by `ban_time`. + ## + ## @doc flapping_detect.enable + ## ValueType: Boolean + ## Default: true + enable = false + + ## The max disconnect allowed of a MQTT Client in `window_time` + ## + ## @doc flapping_detect.max_count + ## ValueType: Integer + ## Default: 15 + max_count = 15 + + ## The time window for flapping detect + ## + ## @doc flapping_detect.window_time + ## ValueType: Duration + ## Default: 1m + window_time = 1m + + ## How long the clientid will be banned + ## + ## @doc flapping_detect.ban_time + ## ValueType: Duration + ## Default: 5m + ban_time = 5m + +} + +force_shutdown { + ## Enable force_shutdown + ## + ## @doc force_shutdown.enable + ## ValueType: Boolean + ## Default: true + enable = true + + ## Max message queue length + ## @doc force_shutdown.max_message_queue_len + ## ValueType: Integer + ## Range: (0, infinity) + ## Default: 1000 + max_message_queue_len = 1000 + + ## Total heap size + ## + ## @doc force_shutdown.max_heap_size + ## ValueType: Size + ## Default: 32MB + max_heap_size = 32MB +} + +force_gc { + ## Force the MQTT connection process GC after this number of + ## messages or bytes passed through. + ## + ## @doc force_gc.enable + ## ValueType: Boolean + ## Default: true + enable = true + + ## GC the process after how many messages received + ## @doc force_gc.max_message_queue_len + ## ValueType: Integer + ## Range: (0, infinity) + ## Default: 16000 + count = 16000 + + ## GC the process after how much bytes passed through + ## + ## @doc force_gc.bytes + ## ValueType: Size + ## Default: 16MB + bytes = 16MB +} + +conn_congestion { + ## Whether to alarm the congested connections. + ## + ## Sometimes the mqtt connection (usually an MQTT subscriber) may + ## get "congested" because there're too many packets to sent. + ## The socket trys to buffer the packets until the buffer is + ## full. If more packets comes after that, the packets will be + ## "pending" in a queue and we consider the connection is + ## "congested". + ## + ## Enable this to send an alarm when there's any bytes pending in + ## the queue. You could set the `sndbuf` to a larger value if the + ## alarm is triggered too often. + ## + ## The name of the alarm is of format "conn_congestion//". + ## Where the is the client-id of the congested MQTT connection. + ## And the is the username or "unknown_user" of not provided by the client. + ## + ## @doc conn_congestion.enable_alarm + ## ValueType: Boolean + ## Default: true + enable_alarm = true + + ## Won't clear the congested alarm in how long time. + ## The alarm is cleared only when there're no pending bytes in + ## the queue, and also it has been `min_alarm_sustain_duration` + ## time since the last time we considered the connection is "congested". + ## + ## This is to avoid clearing and sending the alarm again too often. + ## + ## @doc conn_congestion.min_alarm_sustain_duration + ## ValueType: Duration + ## Default: 1m + min_alarm_sustain_duration = 1m +} + +rate_limit { + ## Maximum connections per second. + ## + ## @doc zones..max_conn_rate + ## ValueType: Number | infinity + ## Default: 1000 + ## Examples: + ## max_conn_rate: 1000 + max_conn_rate = 1000 + + ## Message limit for the a external MQTT connection. + ## + ## @doc rate_limit.conn_messages_in + ## ValueType: String | infinity + ## Default: infinity + ## Examples: 100 messages per 10 seconds. + ## conn_messages_in: "100,10s" + conn_messages_in = "100,10s" + + ## Limit the rate of receiving packets for a MQTT connection. + ## The rate is counted by bytes of packets per second. + ## + ## The connection won't accept more messages if the messages come + ## faster than the limit. + ## + ## @doc rate_limit.conn_bytes_in + ## ValueType: String | infinity + ## Default: infinity + ## Examples: 100KB incoming per 10 seconds. + ## conn_bytes_in: "100KB,10s" + ## + conn_bytes_in = "100KB,10s" +} + +quota { + ## Messages quota for the each of external MQTT connection. + ## This value consumed by the number of recipient on a message. + ## + ## @doc quota.conn_messages_routing + ## ValueType: String | infinity + ## Default: infinity + ## Examples: 100 messaegs per 1s: + ## quota.conn_messages_routing: "100,1s" + conn_messages_routing = "100,1s" + + ## Messages quota for the all of external MQTT connections. + ## This value consumed by the number of recipient on a message. + ## + ## @doc quota.overall_messages_routing + ## ValueType: String | infinity + ## Default: infinity + ## Examples: 200000 messages per 1s: + ## quota.overall_messages_routing: "200000,1s" + ## + overall_messages_routing = "200000,1s" +} + +##================================================================== +## Zones +##================================================================== +## A zone contains a set of configurations for listeners. +## +## A zone can be used by a listener via `listener...zone`. +## +## The configs defined in zones will override the global configs with the same key. +## +## For example given the following config: +## +## ``` +## a { +## b: 1, c: 1 +## } +## +## zone.my_zone { +## a { +## b:2 +## } +## } +## ``` +## +## The global config "a" is overridden by the configs "a" inside the zone "my_zone". +## If there is a listener uses the zone "my_zone", the value of config "a" will be: +## `{b:2, c: 1}`. +## Note that although the default value of `a.c` is `0`, the global value is used. +## i.e. configs in the zone have no default values. To overridde `a.c` we must configure +## it explicitly in the zone. +## +## All the global configs that can be overridden in zones are: +## - `stats.*` +## - `mqtt.*` +## - `authorization.*` +## - `flapping_detect.*` +## - `force_shutdown.*` +## - `conn_congestion.*` +## +## syntax: zones. +## example: zones.my_zone +zones.default { + +} + ##================================================================== ## Broker ##================================================================== @@ -88,1087 +1076,6 @@ broker { perf.trie_compaction = true } - -authorization { - ## Behaviour after not matching a rule. - ## - ## @doc authorization.no_match - ## ValueType: allow | deny - ## Default: allow - no_match: allow - - ## The action when authorization check reject current operation - ## - ## @doc authorization.deny_action - ## ValueType: ignore | disconnect - ## Default: ignore - deny_action: ignore - - ## Whether to enable Authorization cache. - ## - ## If enabled, Authorization roles for each client will be cached in the memory - ## - ## @doc authorization.cache.enable - ## ValueType: Boolean - ## Default: true - cache.enable: true - - ## The maximum count of Authorization entries can be cached for a client. - ## - ## @doc authorization.cache.max_size - ## ValueType: Integer - ## Range: [0, 1048576] - ## Default: 32 - cache.max_size: 32 - - ## The time after which an Authorization cache entry will be deleted - ## - ## @doc authorization.cache.ttl - ## ValueType: Duration - ## Default: 1m - cache.ttl: 1m -} - - -##================================================================== -## Zones and Listeners -##================================================================== -## A zone contains a set of configurations for listeners. -## -## The configurations defined in zone can be overridden by the ones -## defined in listeners with the same key. -## -## For example given the following config: -## ``` -## -## zone.x { -## a: {b:1, c: 1} -## listeners.y { -## a: {b: 2} -## } -## } -## ``` -## The config "a" in zone "x" is overridden by the configs inside -## the listener "y". So the value of config "a" in listener "y" -## is `a: {b:2, c: 1}`. -## -## All the configs that can be set in zones and be overridden in listenser are: -## - `auth.*` -## - `stats.*` -## - `mqtt.*` -## - `flapping_detect.*` -## - `force_shutdown.*` -## - `conn_congestion.*` -## -## Syntax: zones. {} -zones.default { - ## Enable authentication - ## - ## @doc zones..auth.enable - ## ValueType: Boolean - ## Default: false - auth.enable = false - - ## Enable per connection statistics. - ## - ## @doc zones..stats.enable - ## ValueType: Boolean - ## Default: true - stats.enable = true - - ## Maximum number of concurrent connections in this zone. - ## - ## This value must be larger than the sum of `max_connections` set - ## in the listeners under this zone. - ## - ## @doc zones..overall_max_connections - ## ValueType: Number | infinity - ## Default: infinity - overall_max_connections = infinity - - mqtt { - ## When publishing or subscribing, prefix all topics with a mountpoint string. - ## The prefixed string will be removed from the topic name when the message - ## is delivered to the subscriber. The mountpoint is a way that users can use - ## to implement isolation of message routing between different listeners. - ## - ## For example if a clientA subscribes to "t" with `zones..mqtt.mountpoint` - ## set to "some_tenant", then the client accually subscribes to the topic - ## "some_tenant/t". Similarly if another clientB (connected to the same listener - ## with the clientA) send a message to topic "t", the message is accually route - ## to all the clients subscribed "some_tenant/t", so clientA will receive the - ## message, with topic name "t". - ## - ## Set to "" to disable the feature. - ## - ## Variables in mountpoint string: - ## - %c: clientid - ## - %u: username - ## - ## @doc zones..listeners..mountpoint - ## ValueType: String - ## Default: "" - mountpoint = "" - - ## How long time the MQTT connection will be disconnected if the - ## TCP connection is established but MQTT CONNECT has not been - ## received. - ## - ## @doc zones..mqtt.idle_timeout - ## ValueType: Duration - ## Default: 15s - idle_timeout = 15s - - ## Maximum MQTT packet size allowed. - ## - ## @doc zones..mqtt.max_packet_size - ## ValueType: Bytes - ## Default: 1MB - max_packet_size = 1MB - - ## Maximum length of MQTT clientId allowed. - ## - ## @doc zones..mqtt.max_clientid_len - ## ValueType: Integer - ## Range: [23, 65535] - ## Default: 65535 - max_clientid_len = 65535 - - ## Maximum topic levels allowed. - ## - ## @doc zones..mqtt.max_topic_levels - ## ValueType: Integer - ## Range: [1, 65535] - ## Default: 65535 - max_topic_levels = 65535 - - ## Maximum QoS allowed. - ## - ## @doc zones..mqtt.max_qos_allowed - ## ValueType: 0 | 1 | 2 - ## Default: 2 - max_qos_allowed = 2 - - ## Maximum Topic Alias, 0 means no topic alias supported. - ## - ## @doc zones..mqtt.max_topic_alias - ## ValueType: Integer - ## Range: [0, 65535] - ## Default: 65535 - max_topic_alias = 65535 - - ## Whether the Server supports MQTT retained messages. - ## - ## @doc zones..mqtt.retain_available - ## ValueType: Boolean - ## Default: true - retain_available = true - - ## Whether the Server supports MQTT Wildcard Subscriptions - ## - ## @doc zones..mqtt.wildcard_subscription - ## ValueType: Boolean - ## Default: true - wildcard_subscription = true - - ## Whether the Server supports MQTT Shared Subscriptions. - ## - ## @doc zones..mqtt.shared_subscription - ## ValueType: Boolean - ## Default: true - shared_subscription = true - - ## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) - ## - ## @doc zones..mqtt.ignore_loop_deliver - ## ValueType: Boolean - ## Default: false - ignore_loop_deliver = false - - ## Whether to parse the MQTT frame in strict mode - ## - ## @doc zones..mqtt.strict_mode - ## ValueType: Boolean - ## Default: false - strict_mode = false - - ## Specify the response information returned to the client - ## - ## This feature is disabled if is set to "" - ## - ## @doc zones..mqtt.response_information - ## ValueType: String - ## Default: "" - response_information = "" - - ## Server Keep Alive of MQTT 5.0 - ## - ## @doc zones..mqtt.server_keepalive - ## ValueType: Number | disabled - ## Default: disabled - server_keepalive = disabled - - ## The backoff for MQTT keepalive timeout. The broker will kick a connection out - ## until 'Keepalive * backoff * 2' timeout. - ## - ## @doc zones..mqtt.keepalive_backoff - ## ValueType: Float - ## Range: (0.5, 1] - ## Default: 0.75 - keepalive_backoff = 0.75 - - ## Maximum number of subscriptions allowed. - ## - ## @doc zones..mqtt.max_subscriptions - ## ValueType: Integer | infinity - ## Range: [1, infinity) - ## Default: infinity - max_subscriptions = infinity - - ## Force to upgrade QoS according to subscription. - ## - ## @doc zones..mqtt.upgrade_qos - ## ValueType: Boolean - ## Default: false - upgrade_qos = false - - ## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked. - ## - ## @doc zones..mqtt.max_inflight - ## ValueType: Integer - ## Range: [1, 65535] - ## Default: 32 - max_inflight = 32 - - ## Retry interval for QoS1/2 message delivering. - ## - ## @doc zones..mqtt.retry_interval - ## ValueType: Duration - ## Default: 30s - retry_interval = 30s - - ## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL. - ## - ## @doc zones..mqtt.max_awaiting_rel - ## ValueType: Integer | infinity - ## Range: [1, infinity) - ## Default: 100 - max_awaiting_rel = 100 - - ## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. - ## - ## @doc zones..mqtt.await_rel_timeout - ## ValueType: Duration - ## Default: 300s - await_rel_timeout = 300s - - ## Default session expiry interval for MQTT V3.1.1 connections. - ## - ## @doc zones..mqtt.session_expiry_interval - ## ValueType: Duration - ## Default: 2h - session_expiry_interval = 2h - - ## Maximum queue length. Enqueued messages when persistent client disconnected, - ## or inflight window is full. - ## - ## @doc zones..mqtt.max_mqueue_len - ## ValueType: Integer | infinity - ## Range: [0, infinity) - ## Default: 1000 - max_mqueue_len = 1000 - - ## Topic priorities. - ## - ## There's no priority table by default, hence all messages - ## are treated equal. - ## - ## Priority number [1-255] - ## - ## NOTE: comma and equal signs are not allowed for priority topic names - ## NOTE: Messages for topics not in the priority table are treated as - ## either highest or lowest priority depending on the configured - ## value for mqtt.mqueue_default_priority - ## - ## @doc zones..mqtt.mqueue_priorities - ## ValueType: Map | disabled - ## Examples: - ## To configure "topic/1" > "topic/2": - ## mqueue_priorities: {"topic/1": 10, "topic/2": 8} - ## Default: disabled - mqueue_priorities = disabled - - ## Default to highest priority for topics not matching priority table - ## - ## @doc zones..mqtt.mqueue_default_priority - ## ValueType: highest | lowest - ## Default: lowest - mqueue_default_priority = lowest - - ## Whether to enqueue QoS0 messages. - ## - ## @doc zones..mqtt.mqueue_store_qos0 - ## ValueType: Boolean - ## Default: true - mqueue_store_qos0 = true - - ## Whether use username replace client id - ## - ## @doc zones..mqtt.use_username_as_clientid - ## ValueType: Boolean - ## Default: false - use_username_as_clientid = false - - ## Use the CN, DN or CRT field from the client certificate as a username. - ## Only works for SSL connection. - ## - ## @doc zones..mqtt.peer_cert_as_username - ## ValueType: cn | dn | crt | disabled - ## Default: disabled - peer_cert_as_username = disabled - - ## Use the CN, DN or CRT field from the client certificate as a clientid. - ## Only works for SSL connection. - ## - ## @doc zones..mqtt.peer_cert_as_clientid - ## ValueType: cn | dn | crt | disabled - ## Default: disabled - peer_cert_as_clientid = disabled - - } - - flapping_detect { - ## Enable Flapping Detection. - ## - ## This config controls the allowed maximum number of CONNECT received - ## from the same clientid in a time frame defined by `window_time`. - ## After the limit is reached, successive CONNECT requests are forbidden - ## (banned) until the end of the time period defined by `ban_time`. - ## - ## @doc zones..flapping_detect.enable - ## ValueType: Boolean - ## Default: true - enable = false - - ## The max disconnect allowed of a MQTT Client in `window_time` - ## - ## @doc zones..flapping_detect.max_count - ## ValueType: Integer - ## Default: 15 - max_count = 15 - - ## The time window for flapping detect - ## - ## @doc zones..flapping_detect.window_time - ## ValueType: Duration - ## Default: 1m - window_time = 1m - - ## How long the clientid will be banned - ## - ## @doc zones..flapping_detect.ban_time - ## ValueType: Duration - ## Default: 5m - ban_time = 5m - - } - - force_shutdown { - ## Enable force_shutdown - ## - ## @doc zones..force_shutdown.enable - ## ValueType: Boolean - ## Default: true - enable = true - - ## Max message queue length - ## @doc zones..force_shutdown.max_message_queue_len - ## ValueType: Integer - ## Range: (0, infinity) - ## Default: 1000 - max_message_queue_len = 1000 - - ## Total heap size - ## - ## @doc zones..force_shutdown.max_heap_size - ## ValueType: Size - ## Default: 32MB - max_heap_size = 32MB - } - - force_gc { - ## Force the MQTT connection process GC after this number of - ## messages or bytes passed through. - ## - ## @doc zones..force_gc.enable - ## ValueType: Boolean - ## Default: true - enable = true - - ## GC the process after how many messages received - ## @doc zones..force_gc.max_message_queue_len - ## ValueType: Integer - ## Range: (0, infinity) - ## Default: 16000 - count = 16000 - - ## GC the process after how much bytes passed through - ## - ## @doc zones..force_gc.bytes - ## ValueType: Size - ## Default: 16MB - bytes = 16MB - } - - conn_congestion { - ## Whether to alarm the congested connections. - ## - ## Sometimes the mqtt connection (usually an MQTT subscriber) may - ## get "congested" because there're too many packets to sent. - ## The socket trys to buffer the packets until the buffer is - ## full. If more packets comes after that, the packets will be - ## "pending" in a queue and we consider the connection is - ## "congested". - ## - ## Enable this to send an alarm when there's any bytes pending in - ## the queue. You could set the `sndbuf` to a larger value if the - ## alarm is triggered too often. - ## - ## The name of the alarm is of format "conn_congestion//". - ## Where the is the client-id of the congested MQTT connection. - ## And the is the username or "unknown_user" of not provided by the client. - ## - ## @doc zones..conn_congestion.enable_alarm - ## ValueType: Boolean - ## Default: true - enable_alarm = true - - ## Won't clear the congested alarm in how long time. - ## The alarm is cleared only when there're no pending bytes in - ## the queue, and also it has been `min_alarm_sustain_duration` - ## time since the last time we considered the connection is "congested". - ## - ## This is to avoid clearing and sending the alarm again too often. - ## - ## @doc zones..conn_congestion.min_alarm_sustain_duration - ## ValueType: Duration - ## Default: 1m - min_alarm_sustain_duration = 1m - } - - listeners.mqtt_tcp - #${example_common_tcp_options} # common options can be written in a separate config entry and reference it from here. - { - - ## The type of the listener. - ## - ## @doc zones..listeners..type - ## ValueType: tcp | ws - ## - tcp: MQTT over TCP - ## - ws: MQTT over Websocket - ## - quic: MQTT over QUIC - ## Required: true - type = tcp - - ## The IP address and port that the listener will bind. - ## - ## @doc zones..listeners..bind - ## ValueType: IPAddress | Port | IPAddrPort - ## Required: true - ## Examples: 1883, 127.0.0.1:1883, ::1:1883 - bind = "0.0.0.0:1883" - - ## The size of the acceptor pool for this listener. - ## - ## @doc zones..listeners..acceptors - ## ValueType: Number - ## Default: 16 - acceptors = 16 - - ## Maximum number of concurrent connections. - ## - ## @doc zones..listeners..max_connections - ## ValueType: Number | infinity - ## Default: infinity - max_connections = 1024000 - - ## The access control rules for this listener. - ## - ## See: https://github.com/emqtt/esockd#allowdeny - ## - ## @doc zones..listeners..access_rules - ## ValueType: Array - ## Default: [] - ## Examples: - ## access_rules: [ - ## "deny 192.168.0.0/24", - ## "all all" - ## ] - access_rules = [ - "allow all" - ] - - ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed - ## behind HAProxy or Nginx. - ## - ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ - ## - ## @doc zones..listeners..proxy_protocol - ## ValueType: Boolean - ## Default: false - proxy_protocol = false - - ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection - ## if no proxy protocol packet recevied within the timeout. - ## - ## @doc zones..listeners..proxy_protocol_timeout - ## ValueType: Duration - ## Default: 3s - proxy_protocol_timeout = 3s - - rate_limit { - ## Maximum connections per second. - ## - ## @doc zones..max_conn_rate - ## ValueType: Number | infinity - ## Default: 1000 - ## Examples: - ## max_conn_rate: 1000 - max_conn_rate = 1000 - - ## Message limit for the a external MQTT connection. - ## - ## @doc zones..rate_limit.conn_messages_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messages per 10 seconds. - ## conn_messages_in: "100,10s" - conn_messages_in = "100,10s" - - ## Limit the rate of receiving packets for a MQTT connection. - ## The rate is counted by bytes of packets per second. - ## - ## The connection won't accept more messages if the messages come - ## faster than the limit. - ## - ## @doc zones..rate_limit.conn_bytes_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100KB incoming per 10 seconds. - ## conn_bytes_in: "100KB,10s" - ## - conn_bytes_in = "100KB,10s" - - ## Messages quota for the each of external MQTT connection. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.conn_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messaegs per 1s: - ## quota.conn_messages_routing: "100,1s" - quota.conn_messages_routing = "100,1s" - - ## Messages quota for the all of external MQTT connections. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.overall_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 200000 messages per 1s: - ## quota.overall_messages_routing: "200000,1s" - ## - quota.overall_messages_routing = "200000,1s" - } - - ## TCP options - ## See ${example_common_tcp_options} for more information - tcp.backlog = 1024 - tcp.buffer = 4KB - } - - ## MQTT/SSL - SSL Listener for MQTT Protocol - listeners.mqtt_ssl - #${example_common_tcp_options} ${example_common_ssl_options} # common options can be written in a separate config entry and reference it from here. - { - - ## The type of the listener. - ## - ## @doc zones..listeners..type - ## ValueType: tcp | ws - ## - tcp: MQTT over TCP - ## - ws: MQTT over Websocket - ## - quic: MQTT over QUIC - ## Required: true - type = tcp - - ## The IP address and port that the listener will bind. - ## - ## @doc zones..listeners..bind - ## ValueType: IPAddress | Port | IPAddrPort - ## Required: true - ## Examples: 8883, 127.0.0.1:8883, ::1:8883 - bind = "0.0.0.0:8883" - - ## The size of the acceptor pool for this listener. - ## - ## @doc zones..listeners..acceptors - ## ValueType: Number - ## Default: 16 - acceptors = 16 - - ## Maximum number of concurrent connections. - ## - ## @doc zones..listeners..max_connections - ## ValueType: Number | infinity - ## Default: infinity - max_connections = 512000 - - ## The access control rules for this listener. - ## - ## See: https://github.com/emqtt/esockd#allowdeny - ## - ## @doc zones..listeners..access_rules - ## ValueType: Array - ## Default: [] - ## Examples: - ## access_rules: [ - ## "deny 192.168.0.0/24", - ## "all all" - ## ] - access_rules = [ - "allow all" - ] - - ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed - ## behind HAProxy or Nginx. - ## - ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ - ## - ## @doc zones..listeners..proxy_protocol - ## ValueType: Boolean - ## Default: true - proxy_protocol = false - - ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection - ## if no proxy protocol packet recevied within the timeout. - ## - ## @doc zones..listeners..proxy_protocol_timeout - ## ValueType: Duration - ## Default: 3s - proxy_protocol_timeout = 3s - - rate_limit { - ## Maximum connections per second. - ## - ## @doc zones..max_conn_rate - ## ValueType: Number | infinity - ## Default: 1000 - ## Examples: - ## max_conn_rate: 1000 - max_conn_rate = 1000 - - ## Message limit for the a external MQTT connection. - ## - ## @doc zones..rate_limit.conn_messages_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messages per 10 seconds. - ## conn_messages_in: "100,10s" - conn_messages_in = "100,10s" - - ## Limit the rate of receiving packets for a MQTT connection. - ## The rate is counted by bytes of packets per second. - ## - ## The connection won't accept more messages if the messages come - ## faster than the limit. - ## - ## @doc zones..rate_limit.conn_bytes_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100KB incoming per 10 seconds. - ## conn_bytes_in: "100KB,10s" - ## - conn_bytes_in = "100KB,10s" - - ## Messages quota for the each of external MQTT connection. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.conn_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messaegs per 1s: - ## quota.conn_messages_routing: "100,1s" - quota.conn_messages_routing = "100,1s" - - ## Messages quota for the all of external MQTT connections. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.overall_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 200000 messages per 1s: - ## quota.overall_messages_routing: "200000,1s" - ## - quota.overall_messages_routing = "200000,1s" - } - - ## SSL options - ## See ${example_common_ssl_options} for more information - ssl.enable = true - ssl.versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] - ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" - ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" - ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" - - ## TCP options - ## See ${example_common_tcp_options} for more information - tcp.backlog = 1024 - tcp.buffer = 4KB - } - - listeners.mqtt_quic - { - ## The type of the listener. - ## - ## @doc zones..listeners..type - ## ValueType: tcp | ws - ## - tcp: MQTT over TCP - ## - ws: MQTT over Websocket - ## - quic: MQTT over QUIC - ## Required: true - type = quic - - ## The IP address and port that the listener will bind. - ## - ## @doc zones..listeners..bind - ## ValueType: IPAddress | Port | IPAddrPort - ## Required: true - ## Examples: 14567, 127.0.0.1:14567, ::1:14567 - bind = "0.0.0.0:14567" - - ## The size of the acceptor pool for this listener. - ## - ## @doc zones..listeners..acceptors - ## ValueType: Number - ## Default: 16 - acceptors = 16 - - ## Maximum number of concurrent connections. - ## - ## @doc zones..listeners..max_connections - ## ValueType: Number | infinity - ## Default: infinity - max_connections = 1024000 - - ## Path to the file containing the user's private PEM-encoded key. - ## - ## @doc zones..listeners..keyfile - ## ValueType: String - ## Default: "{{ platform_etc_dir }}/certs/key.pem" - keyfile = "{{ platform_etc_dir }}/certs/key.pem" - - ## Path to a file containing the user certificate. - ## - ## @doc zones..listeners..certfile - ## ValueType: String - ## Default: "{{ platform_etc_dir }}/certs/cert.pem" - certfile = "{{ platform_etc_dir }}/certs/cert.pem" - } - - listeners.mqtt_ws - #${example_common_tcp_options} ${example_common_websocket_options} # common options can be written in a separate config entry and reference it from here. - { - - ## The type of the listener. - ## - ## @doc zones..listeners..type - ## ValueType: tcp | ws - ## - tcp: MQTT over TCP - ## - ws: MQTT over Websocket - ## - quic: MQTT over QUIC - ## Required: true - type = ws - - ## The IP address and port that the listener will bind. - ## - ## @doc zones..listeners..bind - ## ValueType: IPAddress | Port | IPAddrPort - ## Required: true - ## Examples: 8083, 127.0.0.1:8083, ::1:8083 - bind = "0.0.0.0:8083" - - ## The size of the acceptor pool for this listener. - ## - ## @doc zones..listeners..acceptors - ## ValueType: Number - ## Default: 16 - acceptors = 16 - - ## Maximum number of concurrent connections. - ## - ## @doc zones..listeners..max_connections - ## ValueType: Number | infinity - ## Default: infinity - max_connections = 1024000 - - ## The access control rules for this listener. - ## - ## See: https://github.com/emqtt/esockd#allowdeny - ## - ## @doc zones..listeners..access_rules - ## ValueType: Array - ## Default: [] - ## Examples: - ## access_rules: [ - ## "deny 192.168.0.0/24", - ## "all all" - ## ] - access_rules = [ - "allow all" - ] - - ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed - ## behind HAProxy or Nginx. - ## - ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ - ## - ## @doc zones..listeners..proxy_protocol - ## ValueType: Boolean - ## Default: true - proxy_protocol = false - - ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection - ## if no proxy protocol packet recevied within the timeout. - ## - ## @doc zones..listeners..proxy_protocol_timeout - ## ValueType: Duration - ## Default: 3s - proxy_protocol_timeout = 3s - - rate_limit { - ## Maximum connections per second. - ## - ## @doc zones..max_conn_rate - ## ValueType: Number | infinity - ## Default: 1000 - ## Examples: - ## max_conn_rate: 1000 - max_conn_rate = 1000 - - ## Message limit for the a external MQTT connection. - ## - ## @doc zones..rate_limit.conn_messages_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messages per 10 seconds. - ## conn_messages_in: "100,10s" - conn_messages_in = "100,10s" - - ## Limit the rate of receiving packets for a MQTT connection. - ## The rate is counted by bytes of packets per second. - ## - ## The connection won't accept more messages if the messages come - ## faster than the limit. - ## - ## @doc zones..rate_limit.conn_bytes_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100KB incoming per 10 seconds. - ## conn_bytes_in: "100KB,10s" - ## - conn_bytes_in = "100KB,10s" - - ## Messages quota for the each of external MQTT connection. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.conn_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messaegs per 1s: - ## quota.conn_messages_routing: "100,1s" - quota.conn_messages_routing = "100,1s" - - ## Messages quota for the all of external MQTT connections. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.overall_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 200000 messages per 1s: - ## quota.overall_messages_routing: "200000,1s" - ## - quota.overall_messages_routing = "200000,1s" - } - - ## TCP options - ## See ${example_common_tcp_options} for more information - tcp.backlog = 1024 - tcp.buffer = 4KB - - ## Websocket options - ## See ${example_common_websocket_options} for more information - websocket.idle_timeout = 86400s - } - - listeners.mqtt_wss - #${example_common_tcp_options} ${example_common_ssl_options} ${example_common_websocket_options} # common options can be written in a separate config entry and reference it from here. - { - - ## The type of the listener. - ## - ## @doc zones..listeners..type - ## ValueType: tcp | ws - ## - tcp: MQTT over TCP - ## - ws: MQTT over Websocket - ## - quic: MQTT over QUIC - ## Required: true - type = ws - - ## The IP address and port that the listener will bind. - ## - ## @doc zones..listeners..bind - ## ValueType: IPAddress | Port | IPAddrPort - ## Required: true - ## Examples: 8084, 127.0.0.1:8084, ::1:8084 - bind = "0.0.0.0:8084" - - ## The size of the acceptor pool for this listener. - ## - ## @doc zones..listeners..acceptors - ## ValueType: Number - ## Default: 16 - acceptors = 16 - - ## Maximum number of concurrent connections. - ## - ## @doc zones..listeners..max_connections - ## ValueType: Number | infinity - ## Default: infinity - max_connections = 512000 - - ## The access control rules for this listener. - ## - ## See: https://github.com/emqtt/esockd#allowdeny - ## - ## @doc zones..listeners..access_rules - ## ValueType: Array - ## Default: [] - ## Examples: - ## access_rules: [ - ## "deny 192.168.0.0/24", - ## "all all" - ## ] - access_rules = [ - "allow all" - ] - - ## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed - ## behind HAProxy or Nginx. - ## - ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ - ## - ## @doc zones..listeners..proxy_protocol - ## ValueType: Boolean - ## Default: true - proxy_protocol = false - - ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection - ## if no proxy protocol packet recevied within the timeout. - ## - ## @doc zones..listeners..proxy_protocol_timeout - ## ValueType: Duration - ## Default: 3s - proxy_protocol_timeout = 3s - - rate_limit { - ## Maximum connections per second. - ## - ## @doc zones..max_conn_rate - ## ValueType: Number | infinity - ## Default: 1000 - ## Examples: - ## max_conn_rate: 1000 - max_conn_rate = 1000 - - ## Message limit for the a external MQTT connection. - ## - ## @doc zones..rate_limit.conn_messages_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messages per 10 seconds. - ## conn_messages_in: "100,10s" - conn_messages_in = "100,10s" - - ## Limit the rate of receiving packets for a MQTT connection. - ## The rate is counted by bytes of packets per second. - ## - ## The connection won't accept more messages if the messages come - ## faster than the limit. - ## - ## @doc zones..rate_limit.conn_bytes_in - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100KB incoming per 10 seconds. - ## conn_bytes_in: "100KB,10s" - ## - conn_bytes_in = "100KB,10s" - - ## Messages quota for the each of external MQTT connection. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.conn_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 100 messaegs per 1s: - ## quota.conn_messages_routing: "100,1s" - quota.conn_messages_routing = "100,1s" - - ## Messages quota for the all of external MQTT connections. - ## This value consumed by the number of recipient on a message. - ## - ## @doc zones..rate_limit.quota.overall_messages_routing - ## ValueType: String | infinity - ## Default: infinity - ## Examples: 200000 messages per 1s: - ## quota.overall_messages_routing: "200000,1s" - ## - quota.overall_messages_routing = "200000,1s" - } - - ## SSL options - ## See ${example_common_ssl_options} for more information - ssl.enable = true - ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" - ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" - ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" - - ## TCP options - ## See ${example_common_tcp_options} for more information - tcp.backlog = 1024 - tcp.buffer = 4KB - - ## Websocket options - ## See ${example_common_websocket_options} for more information - websocket.idle_timeout = 86400s - } - -} - -#This is an example zone which has less "strict" settings. -#It's useful to clients connecting the broker from trusted networks. -zones.internal { - auth.enable = false - listeners.mqtt_internal { - type = tcp - bind = "127.0.0.1:11883" - acceptors = 4 - max_connections = 1024000 - tcp.active_n = 1000 - tcp.backlog = 512 - } -} - ##================================================================== ## System Monitor ##================================================================== diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 101abbd2b..516831600 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -58,7 +58,6 @@ -export([ get_zone_conf/2 , get_zone_conf/3 , put_zone_conf/3 - , find_zone_conf/2 ]). -export([ get_listener_conf/3 @@ -72,7 +71,7 @@ -define(PERSIS_SCHEMA_MODS, {?MODULE, schema_mods}). -define(PERSIS_KEY(TYPE, ROOT), {?MODULE, TYPE, ROOT}). -define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]). --define(LISTENER_CONF_PATH(ZONE, LISTENER, PATH), [zones, ZONE, listeners, LISTENER | PATH]). +-define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]). -define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL), try [atom(Key) || Key <- PATH] of @@ -151,37 +150,40 @@ find_raw(KeyPath) -> -spec get_zone_conf(atom(), emqx_map_lib:config_key_path()) -> term(). get_zone_conf(Zone, KeyPath) -> - ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath)). + case find(?ZONE_CONF_PATH(Zone, KeyPath)) of + {not_found, _, _} -> %% not found in zones, try to find the global config + ?MODULE:get(KeyPath); + {ok, Value} -> Value + end. -spec get_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> term(). get_zone_conf(Zone, KeyPath, Default) -> - ?MODULE:get(?ZONE_CONF_PATH(Zone, KeyPath), Default). + case find(?ZONE_CONF_PATH(Zone, KeyPath)) of + {not_found, _, _} -> %% not found in zones, try to find the global config + ?MODULE:get(KeyPath, Default); + {ok, Value} -> Value + end. -spec put_zone_conf(atom(), emqx_map_lib:config_key_path(), term()) -> ok. put_zone_conf(Zone, KeyPath, Conf) -> ?MODULE:put(?ZONE_CONF_PATH(Zone, KeyPath), Conf). --spec find_zone_conf(atom(), emqx_map_lib:config_key_path()) -> - {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. -find_zone_conf(Zone, KeyPath) -> - find(?ZONE_CONF_PATH(Zone, KeyPath)). - -spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> term(). -get_listener_conf(Zone, Listener, KeyPath) -> - ?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)). +get_listener_conf(Type, Listener, KeyPath) -> + ?MODULE:get(?LISTENER_CONF_PATH(Type, Listener, KeyPath)). -spec get_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> term(). -get_listener_conf(Zone, Listener, KeyPath, Default) -> - ?MODULE:get(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Default). +get_listener_conf(Type, Listener, KeyPath, Default) -> + ?MODULE:get(?LISTENER_CONF_PATH(Type, Listener, KeyPath), Default). -spec put_listener_conf(atom(), atom(), emqx_map_lib:config_key_path(), term()) -> ok. -put_listener_conf(Zone, Listener, KeyPath, Conf) -> - ?MODULE:put(?LISTENER_CONF_PATH(Zone, Listener, KeyPath), Conf). +put_listener_conf(Type, Listener, KeyPath, Conf) -> + ?MODULE:put(?LISTENER_CONF_PATH(Type, Listener, KeyPath), Conf). -spec find_listener_conf(atom(), atom(), emqx_map_lib:config_key_path()) -> {ok, term()} | {not_found, emqx_map_lib:config_key_path(), term()}. -find_listener_conf(Zone, Listener, KeyPath) -> - find(?LISTENER_CONF_PATH(Zone, Listener, KeyPath)). +find_listener_conf(Type, Listener, KeyPath) -> + find(?LISTENER_CONF_PATH(Type, Listener, KeyPath)). -spec put(map()) -> ok. put(Config) -> diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index f39c11305..375a5c990 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -43,18 +43,14 @@ list() -> [{listener_id(ZoneName, LName), LConf} || {ZoneName, LName, LConf} <- do_list()]. do_list() -> - Zones = maps:to_list(emqx:get_config([zones], #{})), - lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]). + Listeners = maps:to_list(emqx:get_config([listeners], #{})), + lists:append([list(Type, maps:to_list(Conf)) || {Type, Conf} <- Listeners]). -list(ZoneName, ZoneConf) -> - Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})), - [ - begin - Conf = merge_zone_and_listener_confs(ZoneConf, LConf), - Running = is_running(listener_id(ZoneName, LName), Conf), - {ZoneName , LName, maps:put(running, Running, Conf)} - end - || {LName, LConf} <- Listeners, is_map(LConf)]. +list(Type, Conf) -> + [begin + Running = is_running(Type, listener_id(Type, LName), LConf), + {Type, LName, maps:put(running, Running, LConf)} + end || {LName, LConf} <- Conf, is_map(LConf)]. -spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}. is_running(ListenerId) -> @@ -65,7 +61,7 @@ is_running(ListenerId) -> [] -> {error, not_found} end. -is_running(ListenerId, #{type := tcp, bind := ListenOn})-> +is_running(Type, ListenerId, #{bind := ListenOn}) when Type =:= tcp; Type =:= ssl -> try esockd:listener({ListenerId, ListenOn}) of Pid when is_pid(Pid)-> true @@ -73,7 +69,7 @@ is_running(ListenerId, #{type := tcp, bind := ListenOn})-> false end; -is_running(ListenerId, #{type := ws})-> +is_running(Type, ListenerId, _Conf) when Type =:= ws; Type =:= wss -> try Info = ranch:info(ListenerId), proplists:get_value(status, Info) =:= running @@ -81,8 +77,8 @@ is_running(ListenerId, #{type := ws})-> false end; -is_running(_ListenerId, #{type := quic})-> -%% TODO: quic support +is_running(quic, _ListenerId, _Conf)-> + %% TODO: quic support {error, no_found}. %% @doc Start all listeners. @@ -95,23 +91,56 @@ start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). -spec start_listener(atom(), atom(), map()) -> ok | {error, term()}. -start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) -> - case do_start_listener(ZoneName, ListenerName, Conf) of +start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> + case do_start_listener(Type, ListenerName, Conf) of {ok, {skipped, Reason}} when Reason =:= listener_disabled; Reason =:= quic_app_missing -> - console_print("- Skip - starting ~s listener ~s on ~s ~n due to ~p", - [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]); + console_print("- Skip - starting listener ~s on ~s ~n due to ~p", + [listener_id(Type, ListenerName), format_addr(Bind), Reason]); {ok, _} -> - console_print("Start ~s listener ~s on ~s successfully.~n", - [Type, listener_id(ZoneName, ListenerName), format(Bind)]); + console_print("Start listener ~s on ~s successfully.~n", + [listener_id(Type, ListenerName), format_addr(Bind)]); {error, {already_started, Pid}} -> {error, {already_started, Pid}}; {error, Reason} -> - ?ELOG("Failed to start ~s listener ~s on ~s: ~0p~n", - [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]), + ?ELOG("Failed to start listener ~s on ~s: ~0p~n", + [listener_id(Type, ListenerName), format_addr(Bind), Reason]), error(Reason) end. +%% @doc Restart all listeners +-spec(restart() -> ok). +restart() -> + foreach_listeners(fun restart_listener/3). + +-spec(restart_listener(atom()) -> ok | {error, term()}). +restart_listener(ListenerId) -> + apply_on_listener(ListenerId, fun restart_listener/3). + +-spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}). +restart_listener(Type, ListenerName, Conf) -> + case stop_listener(Type, ListenerName, Conf) of + ok -> start_listener(Type, ListenerName, Conf); + Error -> Error + end. + +%% @doc Stop all listeners. +-spec(stop() -> ok). +stop() -> + foreach_listeners(fun stop_listener/3). + +-spec(stop_listener(atom()) -> ok | {error, term()}). +stop_listener(ListenerId) -> + apply_on_listener(ListenerId, fun stop_listener/3). + +-spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}). +stop_listener(Type, ListenerName, #{type := tcp, bind := ListenOn}) -> + esockd:close(listener_id(Type, ListenerName), ListenOn); +stop_listener(Type, ListenerName, #{type := ws}) -> + cowboy:stop_listener(listener_id(Type, ListenerName)); +stop_listener(Type, ListenerName, #{type := quic}) -> + quicer:stop_listener(listener_id(Type, ListenerName)). + -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). -else. @@ -121,27 +150,28 @@ console_print(_Fmt, _Args) -> ok. %% Start MQTT/TCP listener -spec(do_start_listener(atom(), atom(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}). -do_start_listener(_ZoneName, _ListenerName, #{enabled := false}) -> +do_start_listener(_Type, _ListenerName, #{enabled := false}) -> {ok, {skipped, listener_disabled}}; -do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) -> - esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)), +do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) + when Type == tcp; Type == ssl -> + esockd:open(listener_id(Type, ListenerName), ListenOn, merge_default(esockd_opts(Type, Opts)), {emqx_connection, start_link, - [#{zone => ZoneName, listener => ListenerName}]}); + [#{type => Type, listener => ListenerName, + zone => zone(Opts)}]}); %% Start MQTT/WS listener -do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) -> - Id = listener_id(ZoneName, ListenerName), - RanchOpts = ranch_opts(ListenOn, Opts), - WsOpts = ws_opts(ZoneName, ListenerName, Opts), - case is_ssl(Opts) of - false -> - cowboy:start_clear(Id, RanchOpts, WsOpts); - true -> - cowboy:start_tls(Id, RanchOpts, WsOpts) +do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) + when Type == ws; Type == wss -> + Id = listener_id(Type, ListenerName), + RanchOpts = ranch_opts(Type, ListenOn, Opts), + WsOpts = ws_opts(Type, ListenerName, Opts), + case Type of + ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); + wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) end; %% Start MQTT/QUIC listener -do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Opts) -> +do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> case [ A || {quicer, _, _} = A<-application:which_applications() ] of [_] -> %% @fixme unsure why we need reopen lib and reopen config. @@ -152,48 +182,48 @@ do_start_listener(ZoneName, ListenerName, #{type := quic, bind := ListenOn} = Op , {key, maps:get(keyfile, Opts)} , {alpn, ["mqtt"]} , {conn_acceptors, maps:get(acceptors, Opts, DefAcceptors)} - , {idle_timeout_ms, emqx_config:get_zone_conf(ZoneName, [mqtt, idle_timeout])} + , {idle_timeout_ms, emqx_config:get_zone_conf(zone(Opts), + [mqtt, idle_timeout])} ], ConnectionOpts = #{conn_callback => emqx_quic_connection , peer_unidi_stream_count => 1 , peer_bidi_stream_count => 10 - , zone => ZoneName + , zone => zone(Opts) + , type => quic , listener => ListenerName }, StreamOpts = [], - quicer:start_listener(listener_id(ZoneName, ListenerName), + quicer:start_listener(listener_id(quic, ListenerName), port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts}); [] -> {ok, {skipped, quic_app_missing}} end. -esockd_opts(Opts0) -> +esockd_opts(Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Opts2 = case emqx_map_lib:deep_get([rate_limit, max_conn_rate], Opts0) of infinity -> Opts1; Rate -> Opts1#{max_conn_rate => Rate} end, Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))}, - maps:to_list(case is_ssl(Opts0) of - false -> - Opts3#{tcp_options => tcp_opts(Opts0)}; - true -> - Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)} + maps:to_list(case Type of + tcp -> Opts3#{tcp_options => tcp_opts(Opts0)}; + ssl -> Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)} end). -ws_opts(ZoneName, ListenerName, Opts) -> +ws_opts(Type, ListenerName, Opts) -> WsPaths = [{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection, - #{zone => ZoneName, listener => ListenerName}}], + #{zone => zone(Opts), type => Type, listener => ListenerName}}], Dispatch = cowboy_router:compile([{'_', WsPaths}]), ProxyProto = maps:get(proxy_protocol, Opts, false), #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. -ranch_opts(ListenOn, Opts) -> +ranch_opts(Type, ListenOn, Opts) -> NumAcceptors = maps:get(acceptors, Opts, 4), MaxConnections = maps:get(max_connections, Opts, 1024), - SocketOpts = case is_ssl(Opts) of - true -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts)); - false -> tcp_opts(Opts) + SocketOpts = case Type of + wss -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts)); + ws -> tcp_opts(Opts) end, #{num_acceptors => NumAcceptors, max_connections => MaxConnections, @@ -217,39 +247,6 @@ esockd_access_rules(StrRules) -> end, [Access(R) || R <- StrRules]. -%% @doc Restart all listeners --spec(restart() -> ok). -restart() -> - foreach_listeners(fun restart_listener/3). - --spec(restart_listener(atom()) -> ok | {error, term()}). -restart_listener(ListenerId) -> - apply_on_listener(ListenerId, fun restart_listener/3). - --spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}). -restart_listener(ZoneName, ListenerName, Conf) -> - case stop_listener(ZoneName, ListenerName, Conf) of - ok -> start_listener(ZoneName, ListenerName, Conf); - Error -> Error - end. - -%% @doc Stop all listeners. --spec(stop() -> ok). -stop() -> - foreach_listeners(fun stop_listener/3). - --spec(stop_listener(atom()) -> ok | {error, term()}). -stop_listener(ListenerId) -> - apply_on_listener(ListenerId, fun stop_listener/3). - --spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}). -stop_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn}) -> - esockd:close(listener_id(ZoneName, ListenerName), ListenOn); -stop_listener(ZoneName, ListenerName, #{type := ws}) -> - cowboy:stop_listener(listener_id(ZoneName, ListenerName)); -stop_listener(ZoneName, ListenerName, #{type := quic}) -> - quicer:stop_listener(listener_id(ZoneName, ListenerName)). - merge_default(Options) -> case lists:keytake(tcp_options, 1, Options) of {value, {tcp_options, TcpOpts}, Options1} -> @@ -258,15 +255,15 @@ merge_default(Options) -> [{tcp_options, ?MQTT_SOCKOPTS} | Options] end. -format(Port) when is_integer(Port) -> +format_addr(Port) when is_integer(Port) -> io_lib:format("0.0.0.0:~w", [Port]); -format({Addr, Port}) when is_list(Addr) -> +format_addr({Addr, Port}) when is_list(Addr) -> io_lib:format("~s:~w", [Addr, Port]); -format({Addr, Port}) when is_tuple(Addr) -> +format_addr({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). -listener_id(ZoneName, ListenerName) -> - list_to_atom(lists:append([atom_to_list(ZoneName), ":", atom_to_list(ListenerName)])). +listener_id(Type, ListenerName) -> + list_to_atom(lists:append([atom_to_list(Type), ":", atom_to_list(ListenerName)])). decode_listener_id(Id) -> try @@ -276,6 +273,9 @@ decode_listener_id(Id) -> _ : _ -> error({invalid_listener_id, Id}) end. +zone(Opts) -> + maps:get(zone, Opts, undefined). + ssl_opts(Opts) -> maps:to_list( emqx_tls_lib:drop_tls13_for_old_otp( @@ -287,9 +287,6 @@ tcp_opts(Opts) -> maps:without([active_n], maps:get(tcp, Opts, #{}))). -is_ssl(Opts) -> - emqx_map_lib:deep_get([ssl, enable], Opts, false). - foreach_listeners(Do) -> lists:foreach( fun({ZoneName, LName, LConf}) -> @@ -298,21 +295,13 @@ foreach_listeners(Do) -> has_enabled_listener_conf_by_type(Type) -> lists:any( - fun({_Zone, _LName, LConf}) when is_map(LConf) -> - Type =:= maps:get(type, LConf) andalso - maps:get(enabled, LConf, true) + fun({Type0, _LName, LConf}) when is_map(LConf) -> + Type =:= Type0 andalso maps:get(enabled, LConf, true) end, do_list()). -%% merge the configs in zone and listeners in a manner that -%% all config entries in the listener are prior to the ones in the zone. -merge_zone_and_listener_confs(ZoneConf, ListenerConf) -> - ConfsInZonesOnly = [listeners, overall_max_connections], - BaseConf = maps:without(ConfsInZonesOnly, ZoneConf), - emqx_map_lib:deep_merge(BaseConf, ListenerConf). - apply_on_listener(ListenerId, Do) -> - {ZoneName, ListenerName} = decode_listener_id(ListenerId), - case emqx_config:find_listener_conf(ZoneName, ListenerName, []) of - {not_found, _, _} -> error({listener_config_not_found, ZoneName, ListenerName}); - {ok, Conf} -> Do(ZoneName, ListenerName, Conf) + {Type, ListenerName} = decode_listener_id(ListenerId), + case emqx_config:find_listener_conf(Type, ListenerName, []) of + {not_found, _, _} -> error({listener_config_not_found, Type, ListenerName}); + {ok, Conf} -> Do(Type, ListenerName, Conf) end. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index cf18f1256..866e14d48 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -70,18 +70,17 @@ -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([ssl/1]). -structs() -> ["zones", "listeners", "broker", "plugins", "sysmon", "alarm", "authorization"]. +structs() -> ["zones", "mqtt", "flapping_detect", "force_shutdown", "force_gc", + "conn_congestion", "rate_limit", "quota", "listeners", "broker", "plugins", + "sysmon", "alarm", "authorization"]. fields("stats") -> [ {"enable", t(boolean(), undefined, true)} ]; -fields("auth") -> - [ {"enable", t(boolean(), undefined, false)} - ]; - fields("authorization") -> [ {"no_match", t(union(allow, deny), undefined, allow)} + , {"enable", t(boolean(), undefined, true)} , {"deny_action", t(union(ignore, disconnect), undefined, ignore)} , {"cache", ref("authorization_cache")} ]; @@ -93,8 +92,7 @@ fields("authorization_cache") -> ]; fields("mqtt") -> - [ {"mountpoint", t(binary(), undefined, <<>>)} - , {"idle_timeout", maybe_infinity(duration(), "15s")} + [ {"idle_timeout", maybe_infinity(duration(), "15s")} , {"max_packet_size", t(bytesize(), undefined, "1MB")} , {"max_clientid_len", t(range(23, 65535), undefined, 65535)} , {"max_topic_levels", t(range(1, 65535), undefined, 65535)} @@ -129,13 +127,11 @@ fields("zones") -> fields("zone_settings") -> [ {"mqtt", ref("mqtt")} - , {"auth", ref("auth")} , {"stats", ref("stats")} , {"flapping_detect", ref("flapping_detect")} , {"force_shutdown", ref("force_shutdown")} , {"conn_congestion", ref("conn_congestion")} , {"force_gc", ref("force_gc")} - , {"overall_max_connections", maybe_infinity(integer())} , {"listeners", t("listeners")} ]; @@ -143,10 +139,10 @@ fields("rate_limit") -> [ {"max_conn_rate", maybe_infinity(integer(), 1000)} , {"conn_messages_in", maybe_infinity(comma_separated_list())} , {"conn_bytes_in", maybe_infinity(comma_separated_list())} - , {"quota", ref("rate_limit_quota")} + , {"quota", ref("quota")} ]; -fields("rate_limit_quota") -> +fields("quota") -> [ {"conn_messages_routing", maybe_infinity(comma_separated_list())} , {"overall_messages_routing", maybe_infinity(comma_separated_list())} ]; @@ -190,30 +186,51 @@ fields("force_gc") -> ]; fields("listeners") -> - [ {"$name", hoconsc:union( - [ disabled - , hoconsc:ref("mqtt_tcp_listener") - , hoconsc:ref("mqtt_ws_listener") - , hoconsc:ref("mqtt_quic_listener") - ])} + [ {"tcp", ref("t_tcp_listeners")} + , {"ssl", ref("t_ssl_listeners")} + , {"ws", ref("t_ws_listeners")} + , {"wss", ref("t_wss_listeners")} + , {"quic", ref("t_quic_listeners")} + ]; + +fields("t_tcp_listeners") -> + [ {"$name", ref("mqtt_tcp_listener")} + ]; +fields("t_ssl_listeners") -> + [ {"$name", ref("mqtt_ssl_listener")} + ]; +fields("t_ws_listeners") -> + [ {"$name", ref("mqtt_ws_listener")} + ]; +fields("t_wss_listeners") -> + [ {"$name", ref("mqtt_wss_listener")} + ]; +fields("t_quic_listeners") -> + [ {"$name", ref("mqtt_quic_listener")} ]; fields("mqtt_tcp_listener") -> - [ {"type", t(tcp)} - , {"tcp", ref("tcp_opts")} + [ {"tcp", ref("tcp_opts")} + ] ++ mqtt_listener(); + +fields("mqtt_ssl_listener") -> + [ {"tcp", ref("tcp_opts")} , {"ssl", ref("ssl_opts")} ] ++ mqtt_listener(); fields("mqtt_ws_listener") -> - [ {"type", t(ws)} - , {"tcp", ref("tcp_opts")} + [ {"tcp", ref("tcp_opts")} + , {"websocket", ref("ws_opts")} + ] ++ mqtt_listener(); + +fields("mqtt_wss_listener") -> + [ {"tcp", ref("tcp_opts")} , {"ssl", ref("ssl_opts")} , {"websocket", ref("ws_opts")} ] ++ mqtt_listener(); fields("mqtt_quic_listener") -> [ {"enabled", t(boolean(), undefined, true)} - , {"type", t(quic)} , {"certfile", t(string(), undefined, undefined)} , {"keyfile", t(string(), undefined, undefined)} , {"ciphers", t(comma_separated_list(), undefined, "TLS_AES_256_GCM_SHA384," @@ -332,6 +349,8 @@ base_listener() -> , {"acceptors", t(integer(), undefined, 16)} , {"max_connections", maybe_infinity(integer(), infinity)} , {"rate_limit", ref("rate_limit")} + , {"mountpoint", t(binary(), undefined, <<>>)} + , {"zone", t(binary(), undefined, undefined)} ]. %% utils diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index b085db597..b7a69afe1 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -153,13 +153,11 @@ param_path_node() -> }. param_path_id() -> - {Example,_} = hd(emqx_mgmt:list_listeners(node())), #{ name => id, in => path, schema => #{type => string}, - required => true, - example => Example + required => true }. param_path_operation()->