feat(config): make the listeners up

This commit is contained in:
Shawn 2021-07-02 22:17:22 +08:00
parent 8dcb5ceb86
commit 8b3fcde380
16 changed files with 305 additions and 427 deletions

View File

@ -817,25 +817,25 @@ broker {
## - `conn_congestion.*`
## - `overall_max_connections`
##
## Syntax: zone.<zone-name> {}
zone.default {
## Syntax: zones.<zone-name> {}
zones.default {
## Enable authentication
##
## @doc zone.<name>.auth.enable
## @doc zones.<name>.auth.enable
## ValueType: Boolean
## Default: false
auth.enable: false
## Enable per connection statistics.
##
## @doc zone.<name>.stats.enable
## @doc zones.<name>.stats.enable
## ValueType: Boolean
## Default: true
stats.enable: true
## Maximum number of concurrent connections.
##
## @doc zone.<name>.listeners.<name>.overall_max_connections
## @doc zones.<name>.overall_max_connections
## ValueType: Number | infinity
## Default: infinity
overall_max_connections: infinity
@ -846,7 +846,7 @@ zone.default {
## is delivered to the subscriber. The mountpoint is a way that users can use
## to implement isolation of message routing between different listeners.
##
## For example if a clientA subscribes to "t" with `zone.<name>.mqtt.mountpoint`
## For example if a clientA subscribes to "t" with `zones.<name>.mqtt.mountpoint`
## set to "some_tenant", then the client accually subscribes to the topic
## "some_tenant/t". Similarly if another clientB (connected to the same listener
## with the clientA) send a message to topic "t", the message is accually route
@ -859,7 +859,7 @@ zone.default {
## - %c: clientid
## - %u: username
##
## @doc zone.<name>.listeners.<name>.mountpoint
## @doc zones.<name>.listeners.<name>.mountpoint
## ValueType: String
## Default: ""
mountpoint: ""
@ -868,21 +868,21 @@ zone.default {
## TCP connection is established but MQTT CONNECT has not been
## received.
##
## @doc zone.<name>.mqtt.idle_timeout
## @doc zones.<name>.mqtt.idle_timeout
## ValueType: Duration | infinity
## Default: 15s
idle_timeout: 15s
## Maximum MQTT packet size allowed.
##
## @doc zone.<name>.mqtt.max_packet_size
## @doc zones.<name>.mqtt.max_packet_size
## ValueType: Bytes | infinity
## Default: 1MB
max_packet_size: 1MB
## Maximum length of MQTT clientId allowed.
##
## @doc zone.<name>.mqtt.max_clientid_len
## @doc zones.<name>.mqtt.max_clientid_len
## ValueType: Integer
## Range: [23, 65535]
## Default: 65535
@ -890,7 +890,7 @@ zone.default {
## Maximum topic levels allowed.
##
## @doc zone.<name>.mqtt.max_topic_levels
## @doc zones.<name>.mqtt.max_topic_levels
## ValueType: Integer
## Range: [1, 65535]
## Default: 65535
@ -898,14 +898,14 @@ zone.default {
## Maximum QoS allowed.
##
## @doc zone.<name>.mqtt.max_qos_allowed
## @doc zones.<name>.mqtt.max_qos_allowed
## ValueType: 0 | 1 | 2
## Default: 2
max_qos_allowed: 2
## Maximum Topic Alias, 0 means no topic alias supported.
##
## @doc zone.<name>.mqtt.max_topic_alias
## @doc zones.<name>.mqtt.max_topic_alias
## ValueType: Integer
## Range: [0, 65535]
## Default: 65535
@ -913,35 +913,35 @@ zone.default {
## Whether the Server supports MQTT retained messages.
##
## @doc zone.<name>.mqtt.retain_available
## @doc zones.<name>.mqtt.retain_available
## ValueType: Boolean
## Default: true
retain_available: true
## Whether the Server supports MQTT Wildcard Subscriptions
##
## @doc zone.<name>.mqtt.wildcard_subscription
## @doc zones.<name>.mqtt.wildcard_subscription
## ValueType: Boolean
## Default: true
wildcard_subscription: true
## Whether the Server supports MQTT Shared Subscriptions.
##
## @doc zone.<name>.mqtt.shared_subscription
## @doc zones.<name>.mqtt.shared_subscription
## ValueType: Boolean
## Default: true
shared_subscription: true
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## @doc zone.<name>.mqtt.ignore_loop_deliver
## @doc zones.<name>.mqtt.ignore_loop_deliver
## ValueType: Boolean
## Default: false
ignore_loop_deliver: false
## Whether to parse the MQTT frame in strict mode
##
## @doc zone.<name>.mqtt.strict_mode
## @doc zones.<name>.mqtt.strict_mode
## ValueType: Boolean
## Default: false
strict_mode: false
@ -950,14 +950,14 @@ zone.default {
##
## This feature is disabled if not set
##
## @doc zone.<name>.mqtt.response_information
## @doc zones.<name>.mqtt.response_information
## ValueType: String
## Default: ""
response_information: ""
## Server Keep Alive of MQTT 5.0
##
## @doc zone.<name>.mqtt.server_keepalive
## @doc zones.<name>.mqtt.server_keepalive
## ValueType: Number | disabled
## Default: disabled
server_keepalive: disabled
@ -965,7 +965,7 @@ zone.default {
## The backoff for MQTT keepalive timeout. The broker will kick a connection out
## until 'Keepalive * backoff * 2' timeout.
##
## @doc zone.<name>.mqtt.keepalive_backoff
## @doc zones.<name>.mqtt.keepalive_backoff
## ValueType: Float
## Range: (0.5, 1]
## Default: 0.75
@ -973,7 +973,7 @@ zone.default {
## Maximum number of subscriptions allowed.
##
## @doc zone.<name>.mqtt.max_subscriptions
## @doc zones.<name>.mqtt.max_subscriptions
## ValueType: Integer | infinity
## Range: [1, )
## Default: infinity
@ -981,14 +981,14 @@ zone.default {
## Force to upgrade QoS according to subscription.
##
## @doc zone.<name>.mqtt.upgrade_qos
## @doc zones.<name>.mqtt.upgrade_qos
## ValueType: Boolean
## Default: false
upgrade_qos: false
## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked.
##
## @doc zone.<name>.mqtt.max_inflight
## @doc zones.<name>.mqtt.max_inflight
## ValueType: Integer
## Range: [1, 65535]
## Default: 32
@ -996,14 +996,14 @@ zone.default {
## Retry interval for QoS1/2 message delivering.
##
## @doc zone.<name>.mqtt.retry_interval
## @doc zones.<name>.mqtt.retry_interval
## ValueType: Duration
## Default: 30s
retry_interval: 30s
## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL.
##
## @doc zone.<name>.mqtt.max_awaiting_rel
## @doc zones.<name>.mqtt.max_awaiting_rel
## ValueType: Integer | infinity
## Range: [1, )
## Default: 100
@ -1011,14 +1011,14 @@ zone.default {
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
##
## @doc zone.<name>.mqtt.await_rel_timeout
## @doc zones.<name>.mqtt.await_rel_timeout
## ValueType: Duration
## Default: 300s
await_rel_timeout: 300s
## Default session expiry interval for MQTT V3.1.1 connections.
##
## @doc zone.<name>.mqtt.session_expiry_interval
## @doc zones.<name>.mqtt.session_expiry_interval
## ValueType: Duration
## Default: 2h
session_expiry_interval: 2h
@ -1026,7 +1026,7 @@ zone.default {
## Maximum queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full.
##
## @doc zone.<name>.mqtt.max_mqueue_len
## @doc zones.<name>.mqtt.max_mqueue_len
## ValueType: Integer | infinity
## Range: [0, )
## Default: 1000
@ -1042,7 +1042,7 @@ zone.default {
## either highest or lowest priority depending on the configured
## value for mqtt.mqueue_default_priority
##
## @doc zone.<name>.mqtt.mqueue_priorities
## @doc zones.<name>.mqtt.mqueue_priorities
## ValueType: Array<TopicName>
## Examples:
## To configure "t/1" > "t/2" > "t/3":
@ -1052,21 +1052,21 @@ zone.default {
## Default to highest priority for topics not matching priority table
##
## @doc zone.<name>.mqtt.mqueue_default_priority
## @doc zones.<name>.mqtt.mqueue_default_priority
## ValueType: highest | lowest
## Default: highest
mqueue_default_priority: highest
## Whether to enqueue QoS0 messages.
##
## @doc zone.<name>.mqtt.mqueue_store_qos0
## @doc zones.<name>.mqtt.mqueue_store_qos0
## ValueType: Boolean
## Default: true
mqueue_store_qos0: true
## Whether use username replace client id
##
## @doc zone.<name>.mqtt.use_username_as_clientid
## @doc zones.<name>.mqtt.use_username_as_clientid
## ValueType: Boolean
## Default: false
use_username_as_clientid: false
@ -1074,7 +1074,7 @@ zone.default {
## Use the CN, DN or CRT field from the client certificate as a username.
## Only works for SSL connection.
##
## @doc zone.<name>.mqtt.peer_cert_as_username
## @doc zones.<name>.mqtt.peer_cert_as_username
## ValueType: cn | dn | crt | disabled
## Default: disabled
peer_cert_as_username: disabled
@ -1082,7 +1082,7 @@ zone.default {
## Use the CN, DN or CRT field from the client certificate as a clientid.
## Only works for SSL connection.
##
## @doc zone.<name>.mqtt.peer_cert_as_clientid
## @doc zones.<name>.mqtt.peer_cert_as_clientid
## ValueType: cn | dn | crt | disabled
## Default: disabled
peer_cert_as_clientid: disabled
@ -1093,14 +1093,14 @@ zone.default {
## Enable ACL check.
##
## @doc zone.<name>.acl.enable
## @doc zones.<name>.acl.enable
## ValueType: Boolean
## Default: false
enable: false
## The action when acl check reject current operation
##
## @doc zone.<name>.acl.deny_action
## @doc zones.<name>.acl.deny_action
## ValueType: ignore | disconnect
## Default: ignore
deny_action: ignore
@ -1109,14 +1109,14 @@ zone.default {
##
## If enabled, ACLs roles for each client will be cached in the memory
##
## @doc zone.<name>.acl.cache.enable
## @doc zones.<name>.acl.cache.enable
## ValueType: Boolean
## Default: true
cache.enable: true
## The maximum count of ACL entries can be cached for a client.
##
## @doc zone.<name>.acl.cache.max_size
## @doc zones.<name>.acl.cache.max_size
## ValueType: Integer
## Range: [0, 1048576]
## Default: 32
@ -1124,7 +1124,7 @@ zone.default {
## The time after which an ACL cache entry will be deleted
##
## @doc zone.<name>.acl.cache.ttl
## @doc zones.<name>.acl.cache.ttl
## ValueType: Duration
## Default: 1m
cache.ttl: 1m
@ -1138,28 +1138,28 @@ zone.default {
## After the limit is reached, successive CONNECT requests are forbidden
## (banned) until the end of the time period defined by `ban_time`.
##
## @doc zone.<name>.flapping_detect.enable
## @doc zones.<name>.flapping_detect.enable
## ValueType: Boolean
## Default: true
enable: true
## The max disconnect allowed of a MQTT Client in `window_time`
##
## @doc zone.<name>.flapping_detect.max_count
## @doc zones.<name>.flapping_detect.max_count
## ValueType: Integer
## Default: 15
max_count: 15
## The time window for flapping detect
##
## @doc zone.<name>.flapping_detect.window_time
## @doc zones.<name>.flapping_detect.window_time
## ValueType: Duration
## Default: 1m
window_time: 1m
## How long the clientid will be banned
##
## @doc zone.<name>.flapping_detect.ban_time
## @doc zones.<name>.flapping_detect.ban_time
## ValueType: Duration
## Default: 5m
ban_time: 5m
@ -1169,13 +1169,13 @@ zone.default {
force_shutdown: {
## Enable force_shutdown
##
## @doc zone.<name>.force_shutdown.enable
## @doc zones.<name>.force_shutdown.enable
## ValueType: Boolean
## Default: true
enable: true
## Max message queue length
## @doc zone.<name>.force_shutdown.max_message_queue_len
## @doc zones.<name>.force_shutdown.max_message_queue_len
## ValueType: Integer
## Range: (0, )
## Default: 1000
@ -1183,7 +1183,7 @@ zone.default {
## Total heap size
##
## @doc zone.<name>.force_shutdown.max_heap_size
## @doc zones.<name>.force_shutdown.max_heap_size
## ValueType: Size
## Default: 32MB
max_heap_size: 32MB
@ -1193,13 +1193,13 @@ zone.default {
## Force the MQTT connection process GC after this number of
## messages or bytes passed through.
##
## @doc zone.<name>.force_gc.enable
## @doc zones.<name>.force_gc.enable
## ValueType: Boolean
## Default: true
enable: true
## GC the process after how many messages received
## @doc zone.<name>.force_gc.max_message_queue_len
## @doc zones.<name>.force_gc.max_message_queue_len
## ValueType: Integer
## Range: (0, )
## Default: 16000
@ -1207,7 +1207,7 @@ zone.default {
## GC the process after how much bytes passed through
##
## @doc zone.<name>.force_gc.bytes
## @doc zones.<name>.force_gc.bytes
## ValueType: Size
## Default: 16MB
bytes: 16MB
@ -1231,7 +1231,7 @@ zone.default {
## Where the <ClientID> is the client-id of the congested MQTT connection.
## And the <Username> is the username or "unknown_user" of not provided by the client.
##
## @doc zone.<name>.conn_congestion.enable_alarm
## @doc zones.<name>.conn_congestion.enable_alarm
## ValueType: Boolean
## Default: true
enable_alarm: true
@ -1243,7 +1243,7 @@ zone.default {
##
## This is to avoid clearing and sending the alarm again too often.
##
## @doc zone.<name>.conn_congestion.min_alarm_sustain_duration
## @doc zones.<name>.conn_congestion.min_alarm_sustain_duration
## ValueType: Duration
## Default: 1m
min_alarm_sustain_duration: 1m
@ -1256,7 +1256,7 @@ zone.default {
## The type of the listener.
##
## @doc zone.<name>.listeners.<name>.type
## @doc zones.<name>.listeners.<name>.type
## ValueType: tcp | ws
## - tcp: MQTT over TCP
## - ws: MQTT over Websocket
@ -1265,7 +1265,7 @@ zone.default {
## The IP address and port that the listener will bind.
##
## @doc zone.<name>.listeners.<name>.bind
## @doc zones.<name>.listeners.<name>.bind
## ValueType: IPAddress | Port | IPAddrPort
## Required: true
## Examples: 1883, 127.0.0.1:1883, ::1:1883
@ -1273,14 +1273,14 @@ zone.default {
## The size of the acceptor pool for this listener.
##
## @doc zone.<name>.listeners.<name>.acceptors
## @doc zones.<name>.listeners.<name>.acceptors
## ValueType: Number
## Default: 16
acceptors: 16
## Maximum number of concurrent connections.
##
## @doc zone.<name>.listeners.<name>.max_connections
## @doc zones.<name>.listeners.<name>.max_connections
## ValueType: Number | infinity
## Default: infinity
max_connections: 1024000
@ -1289,7 +1289,7 @@ zone.default {
##
## See: https://github.com/emqtt/esockd#allowdeny
##
## @doc zone.<name>.listeners.<name>.access_rules
## @doc zones.<name>.listeners.<name>.access_rules
## ValueType: Array<AccessRules>
## Default: []
## Examples:
@ -1306,7 +1306,7 @@ zone.default {
##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
##
## @doc zone.<name>.listeners.<name>.proxy_protocol
## @doc zones.<name>.listeners.<name>.proxy_protocol
## ValueType: Boolean
## Default: false
proxy_protocol: false
@ -1314,7 +1314,7 @@ zone.default {
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
## if no proxy protocol packet recevied within the timeout.
##
## @doc zone.<name>.listeners.<name>.proxy_protocol_timeout
## @doc zones.<name>.listeners.<name>.proxy_protocol_timeout
## ValueType: Duration
## Default: 3s
proxy_protocol_timeout: 3s
@ -1322,7 +1322,7 @@ zone.default {
rate_limit {
## Maximum connections per second.
##
## @doc zone.<name>.max_conn_rate
## @doc zones.<name>.max_conn_rate
## ValueType: Number | infinity
## Default: 1000
## Examples:
@ -1331,7 +1331,7 @@ zone.default {
## Message limit for the a external MQTT connection.
##
## @doc zone.<name>.rate_limit.conn_messages_in
## @doc zones.<name>.rate_limit.conn_messages_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messages per 10 seconds.
@ -1344,7 +1344,7 @@ zone.default {
## The connection won't accept more messages if the messages come
## faster than the limit.
##
## @doc zone.<name>.rate_limit.conn_bytes_in
## @doc zones.<name>.rate_limit.conn_bytes_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100KB incoming per 10 seconds.
@ -1355,7 +1355,7 @@ zone.default {
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.conn_messages_routing
## @doc zones.<name>.rate_limit.quota.conn_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messaegs per 1s:
@ -1365,7 +1365,7 @@ zone.default {
## Messages quota for the all of external MQTT connections.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.overall_messages_routing
## @doc zones.<name>.rate_limit.quota.overall_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 200000 messages per 1s:
@ -1387,7 +1387,7 @@ zone.default {
## The type of the listener.
##
## @doc zone.<name>.listeners.<name>.type
## @doc zones.<name>.listeners.<name>.type
## ValueType: tcp | ws
## - tcp: MQTT over TCP
## - ws: MQTT over Websocket
@ -1396,7 +1396,7 @@ zone.default {
## The IP address and port that the listener will bind.
##
## @doc zone.<name>.listeners.<name>.bind
## @doc zones.<name>.listeners.<name>.bind
## ValueType: IPAddress | Port | IPAddrPort
## Required: true
## Examples: 8883, 127.0.0.1:8883, ::1:8883
@ -1404,14 +1404,14 @@ zone.default {
## The size of the acceptor pool for this listener.
##
## @doc zone.<name>.listeners.<name>.acceptors
## @doc zones.<name>.listeners.<name>.acceptors
## ValueType: Number
## Default: 16
acceptors: 16
## Maximum number of concurrent connections.
##
## @doc zone.<name>.listeners.<name>.max_connections
## @doc zones.<name>.listeners.<name>.max_connections
## ValueType: Number | infinity
## Default: infinity
max_connections: 512000
@ -1420,7 +1420,7 @@ zone.default {
##
## See: https://github.com/emqtt/esockd#allowdeny
##
## @doc zone.<name>.listeners.<name>.access_rules
## @doc zones.<name>.listeners.<name>.access_rules
## ValueType: Array<AccessRules>
## Default: []
## Examples:
@ -1437,15 +1437,15 @@ zone.default {
##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
##
## @doc zone.<name>.listeners.<name>.proxy_protocol
## @doc zones.<name>.listeners.<name>.proxy_protocol
## ValueType: Boolean
## Default: true
proxy_protocol: true
proxy_protocol: false
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
## if no proxy protocol packet recevied within the timeout.
##
## @doc zone.<name>.listeners.<name>.proxy_protocol_timeout
## @doc zones.<name>.listeners.<name>.proxy_protocol_timeout
## ValueType: Duration
## Default: 3s
proxy_protocol_timeout: 3s
@ -1453,7 +1453,7 @@ zone.default {
rate_limit {
## Maximum connections per second.
##
## @doc zone.<name>.max_conn_rate
## @doc zones.<name>.max_conn_rate
## ValueType: Number | infinity
## Default: 1000
## Examples:
@ -1462,7 +1462,7 @@ zone.default {
## Message limit for the a external MQTT connection.
##
## @doc zone.<name>.rate_limit.conn_messages_in
## @doc zones.<name>.rate_limit.conn_messages_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messages per 10 seconds.
@ -1475,7 +1475,7 @@ zone.default {
## The connection won't accept more messages if the messages come
## faster than the limit.
##
## @doc zone.<name>.rate_limit.conn_bytes_in
## @doc zones.<name>.rate_limit.conn_bytes_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100KB incoming per 10 seconds.
@ -1486,7 +1486,7 @@ zone.default {
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.conn_messages_routing
## @doc zones.<name>.rate_limit.quota.conn_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messaegs per 1s:
@ -1496,7 +1496,7 @@ zone.default {
## Messages quota for the all of external MQTT connections.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.overall_messages_routing
## @doc zones.<name>.rate_limit.quota.overall_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 200000 messages per 1s:
@ -1508,6 +1508,7 @@ zone.default {
## SSL options
## See ${example_common_ssl_options} for more information
ssl.enable: true
ssl.versions: ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem"
ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem"
ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
@ -1524,7 +1525,7 @@ zone.default {
## The type of the listener.
##
## @doc zone.<name>.listeners.<name>.type
## @doc zones.<name>.listeners.<name>.type
## ValueType: tcp | ws
## - tcp: MQTT over TCP
## - ws: MQTT over Websocket
@ -1533,7 +1534,7 @@ zone.default {
## The IP address and port that the listener will bind.
##
## @doc zone.<name>.listeners.<name>.bind
## @doc zones.<name>.listeners.<name>.bind
## ValueType: IPAddress | Port | IPAddrPort
## Required: true
## Examples: 8083, 127.0.0.1:8083, ::1:8083
@ -1541,14 +1542,14 @@ zone.default {
## The size of the acceptor pool for this listener.
##
## @doc zone.<name>.listeners.<name>.acceptors
## @doc zones.<name>.listeners.<name>.acceptors
## ValueType: Number
## Default: 16
acceptors: 16
## Maximum number of concurrent connections.
##
## @doc zone.<name>.listeners.<name>.max_connections
## @doc zones.<name>.listeners.<name>.max_connections
## ValueType: Number | infinity
## Default: infinity
max_connections: 1024000
@ -1557,7 +1558,7 @@ zone.default {
##
## See: https://github.com/emqtt/esockd#allowdeny
##
## @doc zone.<name>.listeners.<name>.access_rules
## @doc zones.<name>.listeners.<name>.access_rules
## ValueType: Array<AccessRules>
## Default: []
## Examples:
@ -1574,15 +1575,15 @@ zone.default {
##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
##
## @doc zone.<name>.listeners.<name>.proxy_protocol
## @doc zones.<name>.listeners.<name>.proxy_protocol
## ValueType: Boolean
## Default: true
proxy_protocol: true
proxy_protocol: false
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
## if no proxy protocol packet recevied within the timeout.
##
## @doc zone.<name>.listeners.<name>.proxy_protocol_timeout
## @doc zones.<name>.listeners.<name>.proxy_protocol_timeout
## ValueType: Duration
## Default: 3s
proxy_protocol_timeout: 3s
@ -1590,7 +1591,7 @@ zone.default {
rate_limit {
## Maximum connections per second.
##
## @doc zone.<name>.max_conn_rate
## @doc zones.<name>.max_conn_rate
## ValueType: Number | infinity
## Default: 1000
## Examples:
@ -1599,7 +1600,7 @@ zone.default {
## Message limit for the a external MQTT connection.
##
## @doc zone.<name>.rate_limit.conn_messages_in
## @doc zones.<name>.rate_limit.conn_messages_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messages per 10 seconds.
@ -1612,7 +1613,7 @@ zone.default {
## The connection won't accept more messages if the messages come
## faster than the limit.
##
## @doc zone.<name>.rate_limit.conn_bytes_in
## @doc zones.<name>.rate_limit.conn_bytes_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100KB incoming per 10 seconds.
@ -1623,7 +1624,7 @@ zone.default {
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.conn_messages_routing
## @doc zones.<name>.rate_limit.quota.conn_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messaegs per 1s:
@ -1633,7 +1634,7 @@ zone.default {
## Messages quota for the all of external MQTT connections.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.overall_messages_routing
## @doc zones.<name>.rate_limit.quota.overall_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 200000 messages per 1s:
@ -1658,7 +1659,7 @@ zone.default {
## The type of the listener.
##
## @doc zone.<name>.listeners.<name>.type
## @doc zones.<name>.listeners.<name>.type
## ValueType: tcp | ws
## - tcp: MQTT over TCP
## - ws: MQTT over Websocket
@ -1667,7 +1668,7 @@ zone.default {
## The IP address and port that the listener will bind.
##
## @doc zone.<name>.listeners.<name>.bind
## @doc zones.<name>.listeners.<name>.bind
## ValueType: IPAddress | Port | IPAddrPort
## Required: true
## Examples: 8084, 127.0.0.1:8084, ::1:8084
@ -1675,14 +1676,14 @@ zone.default {
## The size of the acceptor pool for this listener.
##
## @doc zone.<name>.listeners.<name>.acceptors
## @doc zones.<name>.listeners.<name>.acceptors
## ValueType: Number
## Default: 16
acceptors: 16
## Maximum number of concurrent connections.
##
## @doc zone.<name>.listeners.<name>.max_connections
## @doc zones.<name>.listeners.<name>.max_connections
## ValueType: Number | infinity
## Default: infinity
max_connections: 512000
@ -1691,7 +1692,7 @@ zone.default {
##
## See: https://github.com/emqtt/esockd#allowdeny
##
## @doc zone.<name>.listeners.<name>.access_rules
## @doc zones.<name>.listeners.<name>.access_rules
## ValueType: Array<AccessRules>
## Default: []
## Examples:
@ -1708,15 +1709,15 @@ zone.default {
##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
##
## @doc zone.<name>.listeners.<name>.proxy_protocol
## @doc zones.<name>.listeners.<name>.proxy_protocol
## ValueType: Boolean
## Default: true
proxy_protocol: true
proxy_protocol: false
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
## if no proxy protocol packet recevied within the timeout.
##
## @doc zone.<name>.listeners.<name>.proxy_protocol_timeout
## @doc zones.<name>.listeners.<name>.proxy_protocol_timeout
## ValueType: Duration
## Default: 3s
proxy_protocol_timeout: 3s
@ -1724,7 +1725,7 @@ zone.default {
rate_limit {
## Maximum connections per second.
##
## @doc zone.<name>.max_conn_rate
## @doc zones.<name>.max_conn_rate
## ValueType: Number | infinity
## Default: 1000
## Examples:
@ -1733,7 +1734,7 @@ zone.default {
## Message limit for the a external MQTT connection.
##
## @doc zone.<name>.rate_limit.conn_messages_in
## @doc zones.<name>.rate_limit.conn_messages_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messages per 10 seconds.
@ -1746,7 +1747,7 @@ zone.default {
## The connection won't accept more messages if the messages come
## faster than the limit.
##
## @doc zone.<name>.rate_limit.conn_bytes_in
## @doc zones.<name>.rate_limit.conn_bytes_in
## ValueType: String | infinity
## Default: infinity
## Examples: 100KB incoming per 10 seconds.
@ -1757,7 +1758,7 @@ zone.default {
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.conn_messages_routing
## @doc zones.<name>.rate_limit.quota.conn_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 100 messaegs per 1s:
@ -1767,7 +1768,7 @@ zone.default {
## Messages quota for the all of external MQTT connections.
## This value consumed by the number of recipient on a message.
##
## @doc zone.<name>.rate_limit.quota.overall_messages_routing
## @doc zones.<name>.rate_limit.quota.overall_messages_routing
## ValueType: String | infinity
## Default: infinity
## Examples: 200000 messages per 1s:
@ -1797,7 +1798,7 @@ zone.default {
#This is an example zone which has less "strict" settings.
#It's useful to clients connecting the broker from trusted networks.
zone.internal {
zones.internal {
acl.enable: false
auth.enable: false
listeners.mqtt_internal: {
@ -1805,7 +1806,7 @@ zone.internal {
bind: "127.0.0.1:11883"
acceptors: 4
max_connections: 1024000
tcp.active_n: 1000
tcp.active: 1000
tcp.backlog: 512
}
}
@ -1958,10 +1959,10 @@ example_common_tcp_options {
##
## See: https://erlang.org/doc/man/inet.html#setopts-2
##
## @doc listeners.<name>.tcp.active_n
## @doc listeners.<name>.tcp.active
## ValueType: Number
## Default: 100
tcp.active_n: 100
tcp.active: 100
## TCP backlog defines the maximum length that the queue of
## pending connections can grow to.
@ -2072,10 +2073,10 @@ example_common_ssl_options {
## TLS versions only to protect from POODLE attack.
##
## @doc listeners.<name>.ssl.tls_versions
## @doc listeners.<name>.ssl.versions
## ValueType: Array<TLSVersion>
## Default: [tlsv1.2,tlsv1.1,tlsv1]
ssl.tls_versions: [tlsv1.2,tlsv1.1,tlsv1]
## Default: ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
ssl.versions: ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
## TLS Handshake timeout.
##
@ -2189,17 +2190,9 @@ example_common_ssl_options {
##
## @doc listeners.<name>.ssl.ciphers
## ValueType: Array<Cipher>
## Default: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA]
ssl.ciphers: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA]
## Default: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA,PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA]
ssl.ciphers: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA,PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA]
## Ciphers for TLS PSK. See 'https://tools.ietf.org/html/rfc4279#section-2'.
##
## Note that 'ciphers' and 'psk_ciphers' cannot be configured at the same time.
##
## @doc listeners.<name>.ssl.psk_ciphers
## ValueType: Array<Cipher>
## Default: [PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA]
ssl.psk_ciphers: [PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA]
}
## Socket options for websocket connections

View File

@ -1107,7 +1107,7 @@ listener.tcp.external.max_conn_rate = 1000
## Specify the {active, N} option for the external MQTT/TCP Socket.
##
## Value: Number
listener.tcp.external.active_n = 100
listener.tcp.external.active = 100
## Zone of the external MQTT/TCP listener belonged to.
##
@ -1247,7 +1247,7 @@ listener.tcp.internal.max_conn_rate = 1000
## Specify the {active, N} option for the internal MQTT/TCP Socket.
##
## Value: Number
listener.tcp.internal.active_n = 1000
listener.tcp.internal.active = 1000
## Zone of the internal MQTT/TCP listener belonged to.
##
@ -1344,7 +1344,7 @@ listener.ssl.external.max_conn_rate = 500
## Specify the {active, N} option for the internal MQTT/SSL Socket.
##
## Value: Number
listener.ssl.external.active_n = 100
listener.ssl.external.active = 100
## Zone of the external MQTT/SSL listener belonged to.
##
@ -1610,7 +1610,7 @@ listener.ws.external.max_conn_rate = 1000
## Simulate the {active, N} option for the MQTT/WebSocket connections.
##
## Value: Number
listener.ws.external.active_n = 100
listener.ws.external.active = 100
## Zone of the external MQTT/WebSocket listener belonged to.
##
@ -1879,7 +1879,7 @@ listener.wss.external.max_conn_rate = 1000
## Simulate the {active, N} option for the MQTT/WebSocket/SSL connections.
##
## Value: Number
listener.wss.external.active_n = 100
listener.wss.external.active = 100
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
##

View File

@ -84,7 +84,7 @@
%% Sock State
sockstate :: emqx_types:sockstate(),
%% The {active, N} option
active_n :: pos_integer(),
active :: pos_integer(),
%% Limiter
limiter :: maybe(emqx_limiter:limiter()),
%% Limit Timer
@ -108,7 +108,7 @@
-type(state() :: #state{}).
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -165,7 +165,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname;
info(sockstate, #state{sockstate = SockSt}) ->
SockSt;
info(active_n, #state{active_n = ActiveN}) ->
info(active, #state{active = ActiveN}) ->
ActiveN;
info(stats_timer, #state{stats_timer = StatsTimer}) ->
StatsTimer;
@ -254,7 +254,7 @@ init_state(Transport, Socket, Options) ->
conn_mod => ?MODULE
},
Zone = proplists:get_value(zone, Options),
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
ActiveN = proplists:get_value(active, Options, ?ACTIVE_N),
PubLimit = emqx_zone:publish_limit(Zone),
BytesIn = proplists:get_value(rate_limit, Options),
RateLimit = emqx_zone:ratelimit(Zone),
@ -272,7 +272,7 @@ init_state(Transport, Socket, Options) ->
peername = Peername,
sockname = Sockname,
sockstate = idle,
active_n = ActiveN,
active = ActiveN,
limiter = Limiter,
parse_state = ParseState,
serialize = Serialize,
@ -452,12 +452,12 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
#state{active_n = ActiveN} = State) ->
#state{active = ActiveN} = State) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pubs),
@ -800,7 +800,7 @@ activate_socket(State = #state{sockstate = blocked}) ->
{ok, State};
activate_socket(State = #state{transport = Transport,
socket = Socket,
active_n = N}) ->
active = N}) ->
case Transport:setopts(Socket, [{active, N}]) of
ok -> {ok, State#state{sockstate = running}};
Error -> Error

View File

@ -21,7 +21,6 @@
%% APIs
-export([ start/0
, ensure_all_started/0
, restart/0
, stop/0
]).
@ -29,88 +28,35 @@
-export([ start_listener/1
, start_listener/3
, stop_listener/1
, stop_listener/3
, restart_listener/1
, restart_listener/3
]).
-export([ find_id_by_listen_on/1
, find_by_listen_on/1
, find_by_id/1
, identifier/1
, format_listen_on/1
]).
-type(listener() :: #{ name := binary()
, proto := esockd:proto()
, listen_on := esockd:listen_on()
, opts := [esockd:option()]
}).
%% @doc Find listener identifier by listen-on.
%% Return empty string (binary) if listener is not found in config.
-spec(find_id_by_listen_on(esockd:listen_on()) -> binary() | false).
find_id_by_listen_on(ListenOn) ->
case find_by_listen_on(ListenOn) of
false -> false;
L -> identifier(L)
end.
%% @doc Find listener by listen-on.
%% Return 'false' if not found.
-spec(find_by_listen_on(esockd:listen_on()) -> listener() | false).
find_by_listen_on(ListenOn) ->
find_by_listen_on(ListenOn, emqx:get_env(listeners, [])).
%% @doc Find listener by identifier.
%% Return 'false' if not found.
-spec(find_by_id(string() | binary()) -> listener() | false).
find_by_id(Id) ->
find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])).
%% @doc Return the ID of the given listener.
-spec identifier(listener()) -> binary().
identifier(#{proto := Proto, name := Name}) ->
identifier(Proto, Name).
%% @doc Start all listeners.
-spec(start() -> ok).
start() ->
lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])).
lists:foreach(fun({ZoneName, ZoneConf}) ->
lists:foreach(fun({LName, LConf}) ->
start_listener(ZoneName, LName, LConf)
end, maps:to_list(maps:get(listeners, ZoneConf, #{})))
end, maps:to_list(emqx_config:get([zones], #{}))).
%% @doc Ensure all configured listeners are started.
%% Raise exception if any of them failed to start.
-spec(ensure_all_started() -> ok).
ensure_all_started() ->
ensure_all_started(emqx:get_env(listeners, []), []).
-spec(start_listener(atom()) -> ok).
start_listener(Id) ->
{ZoneName, ListenerName} = decode_listener_id(Id),
start_listener(ZoneName, ListenerName,
emqx_config:get([zones, ZoneName, listeners, ListenerName])).
ensure_all_started([], []) -> ok;
ensure_all_started([], Failed) -> error(Failed);
ensure_all_started([L | Rest], Results) ->
#{proto := Proto, listen_on := ListenOn, opts := Options} = L,
NewResults =
case start_listener(Proto, ListenOn, Options) of
{ok, _Pid} ->
Results;
{error, {already_started, _Pid}} ->
Results;
{error, Reason} ->
[{identifier(L), Reason} | Results]
end,
ensure_all_started(Rest, NewResults).
%% @doc Format address:port for logging.
-spec(format_listen_on(esockd:listen_on()) -> [char()]).
format_listen_on(ListenOn) -> format(ListenOn).
-spec(start_listener(listener()) -> ok).
start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) ->
ID = identifier(Proto, Name),
case start_listener(Proto, ListenOn, Options) of
-spec(start_listener(atom(), atom(), map()) -> ok).
start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) ->
case do_start_listener(ZoneName, ListenerName, Conf) of
{ok, _} ->
console_print("Start ~s listener on ~s successfully.~n", [ID, format(ListenOn)]);
console_print("Start ~s listener ~s on ~s successfully.~n",
[Type, listener_id(ZoneName, ListenerName), format(Bind)]);
{error, Reason} ->
io:format(standard_error, "Failed to start mqtt listener ~s on ~s: ~0p~n",
[ID, format(ListenOn), Reason]),
io:format(standard_error, "Failed to start ~s listener ~s on ~s: ~0p~n",
[Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]),
error(Reason)
end.
@ -122,124 +68,105 @@ console_print(_Fmt, _Args) -> ok.
-endif.
%% Start MQTT/TCP listener
-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
-spec(do_start_listener(atom(), atom(), map())
-> {ok, pid()} | {error, term()}).
start_listener(tcp, ListenOn, Options) ->
start_mqtt_listener('mqtt:tcp', ListenOn, Options);
%% Start MQTT/TLS listener
start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls ->
start_mqtt_listener('mqtt:ssl', ListenOn, Options);
do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) ->
esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)),
{emqx_connection, start_link, [ZoneName, ListenerName]});
%% Start MQTT/WS listener
start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn,
ranch_opts(Options), ws_opts(Options));
%% Start MQTT/WSS listener
start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn,
ranch_opts(Options), ws_opts(Options)).
replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)].
drop_tls13_for_old_otp(Options) ->
case proplists:get_value(ssl_options, Options) of
undefined -> Options;
SslOpts ->
SslOpts1 = emqx_tls_lib:drop_tls13_for_old_otp(SslOpts),
replace(Options, ssl_options, SslOpts1)
do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) ->
Id = listener_id(ZoneName, ListenerName),
RanchOpts = ranch_opts(Opts),
WsOpts = ws_opts(ZoneName, ListenerName, Opts),
case is_ssl(Opts) of
false ->
cowboy:start_clear(Id, with_port(ListenOn, RanchOpts), WsOpts);
true ->
cowboy:start_tls(Id, with_port(ListenOn, RanchOpts), WsOpts)
end.
start_mqtt_listener(Name, ListenOn, Options0) ->
Options = drop_tls13_for_old_otp(Options0),
SockOpts = esockd:parse_opt(Options),
esockd:open(Name, ListenOn, merge_default(SockOpts),
{emqx_connection, start_link, [Options -- SockOpts]}).
esockd_opts(Opts0) ->
Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
Opts2 = case emqx_config:deep_get([rate_limit, max_conn_rate], Opts0) of
infinity -> Opts1;
Rate -> Opts1#{max_conn_rate => Rate}
end,
Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))},
maps:to_list(Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)}).
start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) ->
Start(ws_name(Name, ListenOn), with_port(ListenOn, RanchOpts), ProtoOpts).
mqtt_path(Options) ->
proplists:get_value(mqtt_path, Options, "/mqtt").
ws_opts(Options) ->
WsPaths = [{mqtt_path(Options), emqx_ws_connection, Options}],
ws_opts(ZoneName, ListenerName, Opts) ->
WsPaths = [{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection,
#{zone => ZoneName, listener => ListenerName}}],
Dispatch = cowboy_router:compile([{'_', WsPaths}]),
ProxyProto = proplists:get_value(proxy_protocol, Options, false),
ProxyProto = maps:get(proxy_protocol, Opts, false),
#{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
ranch_opts(Options0) ->
Options = drop_tls13_for_old_otp(Options0),
NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_connections, Options, 1024),
TcpOptions = proplists:get_value(tcp_options, Options, []),
RanchOpts = #{num_acceptors => NumAcceptors,
ranch_opts(Opts) ->
NumAcceptors = maps:get(acceptors, Opts, 4),
MaxConnections = maps:get(max_connections, Opts, 1024),
#{num_acceptors => NumAcceptors,
max_connections => MaxConnections,
socket_opts => TcpOptions},
case proplists:get_value(ssl_options, Options) of
undefined -> RanchOpts;
SslOptions -> RanchOpts#{socket_opts => TcpOptions ++ SslOptions}
end.
handshake_timeout => maps:get(handshake_timeout, Opts, 15000),
socket_opts => case is_ssl(Opts) of
true -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts));
false -> tcp_opts(Opts)
end}.
with_port(Port, Opts = #{socket_opts := SocketOption}) when is_integer(Port) ->
Opts#{socket_opts => [{port, Port}| SocketOption]};
with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) ->
Opts#{socket_opts => [{ip, Addr}, {port, Port}| SocketOption]}.
esockd_access_rules(StrRules) ->
Access = fun(S) ->
[A, CIDR] = string:tokens(S, " "),
{list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end}
end,
[Access(R) || R <- StrRules].
%% @doc Restart all listeners
-spec(restart() -> ok).
restart() ->
lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
lists:foreach(fun({ZoneName, ZoneConf}) ->
lists:foreach(fun({LName, LConf}) ->
restart_listener(ZoneName, LName, LConf)
end, maps:to_list(maps:get(listeners, ZoneConf, #{})))
end, maps:to_list(emqx_config:get([zones], #{}))).
-spec(restart_listener(listener() | string() | binary()) -> ok | {error, any()}).
restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) ->
restart_listener(Proto, ListenOn, Options);
restart_listener(Identifier) ->
case emqx_listeners:find_by_id(Identifier) of
false -> {error, {no_such_listener, Identifier}};
Listener -> restart_listener(Listener)
-spec(restart_listener(atom()) -> ok | {error, any()}).
restart_listener(ListenerID) ->
{ZoneName, ListenerName} = decode_listener_id(ListenerID),
restart_listener(ZoneName, ListenerName,
emqx_config:get([zones, ZoneName, listeners, ListenerName])).
-spec(restart_listener(atom(), atom(), map()) -> ok | {error, any()}).
restart_listener(ZoneName, ListenerName, Conf) ->
case stop_listener(ZoneName, ListenerName, Conf) of
ok -> start_listener(ZoneName, ListenerName, Conf);
Error -> Error
end.
-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) ->
ok | {error, any()}).
restart_listener(tcp, ListenOn, _Options) ->
esockd:reopen('mqtt:tcp', ListenOn);
restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls ->
esockd:reopen('mqtt:ssl', ListenOn);
restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
_ = cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)),
ok(start_listener(Proto, ListenOn, Options));
restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
_ = cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)),
ok(start_listener(Proto, ListenOn, Options));
restart_listener(Proto, ListenOn, _Opts) ->
esockd:reopen(Proto, ListenOn).
ok({ok, _}) -> ok;
ok(Other) -> Other.
%% @doc Stop all listeners.
-spec(stop() -> ok).
stop() ->
lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])).
lists:foreach(fun({ZoneName, ZoneConf}) ->
lists:foreach(fun({LName, LConf}) ->
stop_listener(ZoneName, LName, LConf)
end, maps:to_list(maps:get(listeners, ZoneConf, #{})))
end, maps:to_list(emqx_config:get([zones], #{}))).
-spec(stop_listener(listener()) -> ok | {error, term()}).
stop_listener(#{proto := Proto, listen_on := ListenOn, opts := Opts}) ->
stop_listener(Proto, ListenOn, Opts).
-spec(stop_listener(atom()) -> ok | {error, term()}).
stop_listener(ListenerID) ->
{ZoneName, ListenerName} = decode_listener_id(ListenerID),
stop_listener(ZoneName, ListenerName,
emqx_config:get([zones, ZoneName, listeners, ListenerName])).
-spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
-> ok | {error, term()}).
stop_listener(tcp, ListenOn, _Opts) ->
esockd:close('mqtt:tcp', ListenOn);
stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls ->
esockd:close('mqtt:ssl', ListenOn);
stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws ->
cowboy:stop_listener(ws_name('mqtt:ws', ListenOn));
stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss ->
cowboy:stop_listener(ws_name('mqtt:wss', ListenOn));
stop_listener(Proto, ListenOn, _Opts) ->
esockd:close(Proto, ListenOn).
-spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}).
stop_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn}) ->
esockd:close(listener_id(ZoneName, ListenerName), ListenOn);
stop_listener(ZoneName, ListenerName, #{type := ws}) ->
cowboy:stop_listener(listener_id(ZoneName, ListenerName)).
merge_default(Options) ->
case lists:keytake(tcp_options, 1, Options) of
@ -256,23 +183,23 @@ format({Addr, Port}) when is_list(Addr) ->
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
ws_name(Name, {_Addr, Port}) ->
ws_name(Name, Port);
ws_name(Name, Port) ->
list_to_atom(lists:concat([Name, ":", Port])).
listener_id(ZoneName, ListenerName) ->
list_to_atom(lists:append([atom_to_list(ZoneName), ":", atom_to_list(ListenerName)])).
identifier(Proto, Name) when is_atom(Proto) ->
identifier(atom_to_list(Proto), Name);
identifier(Proto, Name) ->
iolist_to_binary(["mqtt", ":", Proto, ":", Name]).
find_by_listen_on(_ListenOn, []) -> false;
find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L;
find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest).
find_by_id(_Id, []) -> false;
find_by_id(Id, [L | Rest]) ->
case identifier(L) =:= Id of
true -> L;
false -> find_by_id(Id, Rest)
decode_listener_id(Id) ->
case string:split(atom_to_list(Id), ":", leading) of
[Zone, Listen] -> {list_to_atom(Zone), list_to_atom(Listen)};
_ -> error({invalid_listener_id, Id})
end.
ssl_opts(Opts) ->
maps:to_list(
emqx_tls_lib:drop_tls13_for_old_otp(
maps:without([enable],
maps:get(ssl, Opts, #{})))).
tcp_opts(Opts) ->
maps:to_list(maps:get(tcp, Opts, #{})).
is_ssl(Opts) ->
emqx_config:deep_get([ssl, enable], Opts, false).

View File

@ -20,6 +20,7 @@
-type comma_separated_atoms() :: [atom()].
-type bar_separated_list() :: list().
-type ip_port() :: tuple().
-type cipher() :: map().
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
-typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}).
@ -30,6 +31,7 @@
-typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}).
-typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}).
-typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}).
-typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}).
-typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}).
% workaround: prevent being recognized as unused functions
@ -37,6 +39,7 @@
to_bytesize/1, to_wordsize/1,
to_percent/1, to_comma_separated_list/1,
to_bar_separated_list/1, to_ip_port/1,
to_erl_cipher_suite/1,
to_comma_separated_atoms/1]).
-behaviour(hocon_schema).
@ -44,18 +47,19 @@
-reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0,
bytesize/0, wordsize/0, percent/0, file/0,
comma_separated_list/0, bar_separated_list/0, ip_port/0,
cipher/0,
comma_separated_atoms/0]).
-export([structs/0, fields/1, translations/0, translation/1]).
-export([t/1, t/3, t/4, ref/1]).
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([ssl/1, tr_ssl/2, tr_password_hash/2]).
-export([ssl/1]).
%% will be used by emqx_ct_helper to find the dependent apps
-export([includes/0]).
structs() -> ["cluster", "node", "rpc", "log", "lager",
"acl", "mqtt", "zone", "listeners", "module", "broker",
"acl", "mqtt", "zones", "listeners", "module", "broker",
"plugins", "sysmon", "alarm", "telemetry"]
++ includes().
@ -271,7 +275,7 @@ fields("mqtt") ->
, {"peer_cert_as_clientid", maybe_disabled(union([cn, dn, crt, pem, md5]))}
];
fields("zone") ->
fields("zones") ->
[ {"$name", ref("zone_settings")}];
fields("zone_settings") ->
@ -368,14 +372,14 @@ fields("ws_opts") ->
"mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")}
, {"check_origin_enable", t(boolean(), undefined, false)}
, {"allow_origin_absence", t(boolean(), undefined, true)}
, {"check_origins", t(comma_separated_list())}
, {"check_origins", t(hoconsc:array(binary()), undefined, [])}
, {"proxy_address_header", t(string(), undefined, "x-forwarded-for")}
, {"proxy_port_header", t(string(), undefined, "x-forwarded-port")}
, {"deflate_opts", ref("deflate_opts")}
];
fields("tcp_opts") ->
[ {"active_n", t(integer(), undefined, 100)}
[ {"active", t(integer(), undefined, 100)}
, {"backlog", t(integer(), undefined, 1024)}
, {"send_timeout", t(duration(), undefined, "15s")}
, {"send_timeout_close", t(boolean(), undefined, true)}
@ -391,7 +395,9 @@ fields("tcp_opts") ->
fields("ssl_opts") ->
ssl(#{handshake_timeout => "15s"
, depth => 10
, reuse_sessions => true});
, reuse_sessions => true
, versions => default_tls_vsns()
});
fields("deflate_opts") ->
[ {"level", t(union([none, default, best_compression, best_speed]))}
@ -643,72 +649,19 @@ ssl(Defaults) ->
, {"dhfile", t(string(), undefined, D("dhfile"))}
, {"server_name_indication", t(union(disable, string()), undefined,
D("server_name_indication"))}
, {"tls_versions", t(comma_separated_list(), undefined, D("tls_versions"))}
, {"ciphers", t(comma_separated_list(), undefined, D("ciphers"))}
, {"psk_ciphers", t(comma_separated_list(), undefined, D("ciphers"))}].
tr_ssl(Field, Conf) ->
Versions = case conf_get([Field, "tls_versions"], Conf) of
undefined -> undefined;
Vs -> [list_to_existing_atom(V) || V <- Vs]
end,
TLSCiphers = conf_get([Field, "ciphers"], Conf),
PSKCiphers = conf_get([Field, "psk_ciphers"], Conf),
Ciphers = ciphers(TLSCiphers, PSKCiphers, Field),
case emqx_schema:conf_get([Field, "enable"], Conf) of
X when X =:= true orelse X =:= undefined ->
filter([{versions, Versions},
{ciphers, Ciphers},
{user_lookup_fun, user_lookup_fun(PSKCiphers)},
{handshake_timeout, conf_get([Field, "handshake_timeout"], Conf)},
{depth, conf_get([Field, "depth"], Conf)},
{password, conf_get([Field, "key_password"], Conf)},
{dhfile, conf_get([Field, "dhfile"], Conf)},
{keyfile, emqx_schema:conf_get([Field, "keyfile"], Conf)},
{certfile, emqx_schema:conf_get([Field, "certfile"], Conf)},
{cacertfile, emqx_schema:conf_get([Field, "cacertfile"], Conf)},
{verify, emqx_schema:conf_get([Field, "verify"], Conf)},
{fail_if_no_peer_cert, conf_get([Field, "fail_if_no_peer_cert"], Conf)},
{secure_renegotiate, conf_get([Field, "secure_renegotiate"], Conf)},
{reuse_sessions, conf_get([Field, "reuse_sessions"], Conf)},
{honor_cipher_order, conf_get([Field, "honor_cipher_order"], Conf)},
{server_name_indication, emqx_schema:conf_get([Field, "server_name_indication"], Conf)}
]);
_ ->
[]
end.
map_psk_ciphers(PSKCiphers) ->
lists:map(
fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha};
("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha};
("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha};
("PSK-RC4-SHA") -> {psk, rc4_128, sha}
end, PSKCiphers).
ciphers(undefined, undefined, _) ->
undefined;
ciphers(TLSCiphers, undefined, _) ->
TLSCiphers;
ciphers(undefined, PSKCiphers, _) ->
map_psk_ciphers(PSKCiphers);
ciphers(_, _, Field) ->
error(Field ++ ".ciphers and " ++ Field ++ ".psk_ciphers cannot be configured at the same time").
user_lookup_fun(undefined) ->
undefined;
user_lookup_fun(_PSKCiphers) ->
{fun emqx_psk:lookup/3, <<>>}.
tr_password_hash(Field, Conf) ->
case emqx_schema:conf_get([Field, "password_hash"], Conf) of
[Hash] -> list_to_atom(Hash);
[Prefix, Suffix] -> {list_to_atom(Prefix), list_to_atom(Suffix)};
[Hash, MacFun, Iterations, Dklen] -> {list_to_atom(Hash), list_to_atom(MacFun),
list_to_integer(Iterations), list_to_integer(Dklen)};
_ -> plain
end.
, {"versions", #{ type => list(atom())
, default => maps:get(versions, Defaults, default_tls_vsns())
, converter => fun (Vsns) -> [tls_vsn(V) || V <- Vsns] end
}}
, {"ciphers", t(hoconsc:array(string()), undefined, D("ciphers"))}
, {"user_lookup_fun", t(any(), undefined, {fun emqx_psk:lookup/3, <<>>})}
].
default_tls_vsns() -> [<<"tlsv1.3">>, <<"tlsv1.2">>, <<"tlsv1.1">>, <<"tlsv1">>].
tls_vsn(<<"tlsv1.3">>) -> 'tlsv1.3';
tls_vsn(<<"tlsv1.2">>) -> 'tlsv1.2';
tls_vsn(<<"tlsv1.1">>) -> 'tlsv1.1';
tls_vsn(<<"tlsv1">>) -> 'tlsv1'.
%% @private return a list of keys in a parent field
-spec(keys(string(), hocon:config()) -> [string()]).
@ -814,3 +767,9 @@ to_ip_port(Str) ->
end;
_ -> {error, Str}
end.
to_erl_cipher_suite(Str) ->
case ssl:str_to_suite(Str) of
{error, Reason} -> error({invalid_cipher, Reason});
Cipher -> Cipher
end.

View File

@ -161,17 +161,16 @@ drop_tls13_for_old_otp(SslOpts) ->
, "TLS_AES_128_CCM_8_SHA256"
]).
drop_tls13(SslOpts0) ->
SslOpts1 = case proplists:get_value(versions, SslOpts0) of
undefined -> SslOpts0;
Vsns -> replace(SslOpts0, versions, Vsns -- ['tlsv1.3'])
SslOpts1 = case maps:find(versions, SslOpts0) of
error -> SslOpts0;
{ok, Vsns} -> SslOpts0#{versions => (Vsns -- ['tlsv1.3'])}
end,
case proplists:get_value(ciphers, SslOpts1) of
undefined -> SslOpts1;
Ciphers -> replace(SslOpts1, ciphers, Ciphers -- ?TLSV13_EXCLUSIVE_CIPHERS)
case maps:find(ciphers, SslOpts1) of
error -> SslOpts1;
{ok, Ciphers} ->
SslOpts1#{ciphers => Ciphers -- ?TLSV13_EXCLUSIVE_CIPHERS}
end.
replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)].
-if(?OTP_RELEASE > 22).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -181,13 +180,13 @@ drop_tls13_test() ->
?assert(lists:member('tlsv1.3', Versions)),
Ciphers = default_ciphers(),
?assert(has_tlsv13_cipher(Ciphers)),
Opts0 = [{versions, Versions}, {ciphers, Ciphers}, other, {bool, true}],
Opts0 = #{versions => Versions, ciphers => Ciphers, other => true},
Opts = drop_tls13(Opts0),
?assertNot(lists:member('tlsv1.3', proplists:get_value(versions, Opts))),
?assertNot(has_tlsv13_cipher(proplists:get_value(ciphers, Opts))).
?assertNot(lists:member('tlsv1.3', maps:get(versions, Opts, undefined))),
?assertNot(has_tlsv13_cipher(maps:get(ciphers, Opts, undefined))).
drop_tls13_no_versions_cipers_test() ->
Opts0 = [other, {bool, true}],
Opts0 = #{other => 0, bool => true},
Opts = drop_tls13(Opts0),
?_assertEqual(Opts0, Opts).

View File

@ -62,8 +62,8 @@
sockname :: emqx_types:peername(),
%% Sock state
sockstate :: emqx_types:sockstate(),
%% Simulate the active_n opt
active_n :: pos_integer(),
%% Simulate the active opt
active :: pos_integer(),
%% MQTT Piggyback
mqtt_piggyback :: single | multiple,
%% Limiter
@ -93,7 +93,7 @@
-type(ws_cmd() :: {active, boolean()}|close).
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@ -124,7 +124,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname;
info(sockstate, #state{sockstate = SockSt}) ->
SockSt;
info(active_n, #state{active_n = ActiveN}) ->
info(active, #state{active = ActiveN}) ->
ActiveN;
info(limiter, #state{limiter = Limiter}) ->
maybe_apply(fun emqx_limiter:info/1, Limiter);
@ -293,7 +293,7 @@ websocket_init([Req, Opts]) ->
BytesIn = proplists:get_value(rate_limit, Opts),
RateLimit = emqx_zone:ratelimit(Zone),
Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit),
ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
ActiveN = proplists:get_value(active, Opts, ?ACTIVE_N),
MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple),
FrameOpts = emqx_zone:mqtt_frame_options(Zone),
ParseState = emqx_frame:initial_parse_state(FrameOpts),
@ -309,7 +309,7 @@ websocket_init([Req, Opts]) ->
{ok, #state{peername = Peername,
sockname = Sockname,
sockstate = running,
active_n = ActiveN,
active = ActiveN,
mqtt_piggyback = MQTTPiggyback,
limiter = Limiter,
parse_state = ParseState,
@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) ->
return(check_oom(run_gc(Stats, State)));
websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN}) ->
State = #state{active = ActiveN}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
@ -551,7 +551,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
%% Handle incoming packet
%%--------------------------------------------------------------------
handle_incoming(Packet, State = #state{active_n = ActiveN})
handle_incoming(Packet, State = #state{active = ActiveN})
when is_record(Packet, mqtt_packet) ->
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
ok = inc_incoming_stats(Packet),
@ -586,7 +586,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
%% Handle outgoing packets
%%--------------------------------------------------------------------
handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) ->
handle_outgoing(Packets, State = #state{active = ActiveN, mqtt_piggyback = MQTTPiggyback}) ->
IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
Oct = iolist_size(IoData),
ok = inc_sent_stats(length(Packets), Oct),

View File

@ -120,7 +120,7 @@ t_info(_) ->
end
end),
#{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertMatch(#{active_n := 100,
?assertMatch(#{active := 100,
peername := {{127,0,0,1},3456},
sockname := {{127,0,0,1},1883},
sockstate := idle,
@ -219,8 +219,8 @@ t_handle_msg_deliver(_) ->
t_handle_msg_inet_reply(_) ->
ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end),
?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))),
?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))),
?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active => 0}))),
?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active => 100}))),
?assertMatch({stop, {shutdown, for_testing}, _St},
handle_msg({inet_reply, for_testing, {error, for_testing}}, st())).
@ -386,7 +386,7 @@ t_start_link_exit_on_activate(_) ->
t_get_conn_info(_) ->
with_conn(fun(CPid) ->
#{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertEqual(#{active_n => 100,
?assertEqual(#{active => 100,
peername => {{127,0,0,1},3456},
sockname => {{127,0,0,1},1883},
sockstate => running,

View File

@ -265,7 +265,7 @@ t_connect_idle_timeout(_) ->
t_connect_limit_timeout(_) ->
ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]),
meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1;
meck:expect(proplists, get_value, fun(active, _Options, _Default) -> 1;
(Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3])
end),

View File

@ -118,7 +118,7 @@ t_info(_) ->
end),
#{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info),
#{socktype := ws,
active_n := 100,
active := 100,
peername := {{127,0,0,1}, 3456},
sockname := {{127,0,0,1}, 18083},
sockstate := running

View File

@ -303,7 +303,7 @@ sockinfo(#state{peername = Peername}) ->
peername => Peername,
sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock?
sockstate => running,
active_n => 1
active => 1
}.
%% copies from emqx_channel:info/1

View File

@ -49,7 +49,7 @@ exproto.listener.protoname.max_conn_rate = 1000
## Specify the {active, N} option for the external MQTT/TCP Socket.
##
## Value: Number
exproto.listener.protoname.active_n = 100
exproto.listener.protoname.active = 100
## Idle timeout
##

View File

@ -78,7 +78,7 @@ end}.
{datatype, integer}
]}.
{mapping, "exproto.listener.$proto.active_n", "emqx_exproto.listeners", [
{mapping, "exproto.listener.$proto.active", "emqx_exproto.listeners", [
{default, 100},
{datatype, integer}
]}.
@ -250,7 +250,7 @@ end}.
end,
ConnOpts = fun(Prefix) ->
Filter([{active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)},
Filter([{active, cuttlefish:conf_get(Prefix ++ ".active", Conf, undefined)},
{idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}])
end,

View File

@ -61,7 +61,7 @@
%% Sock State
sockstate :: emqx_types:sockstate(),
%% The {active, N} option
active_n :: pos_integer(),
active :: pos_integer(),
%% BACKW: e4.2.0-e4.2.1
%% We should remove it
sendfun :: function() | undefined,
@ -84,7 +84,7 @@
-type(state() :: #state{}).
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -137,7 +137,7 @@ info(sockname, #state{sockname = Sockname}) ->
Sockname;
info(sockstate, #state{sockstate = SockSt}) ->
SockSt;
info(active_n, #state{active_n = ActiveN}) ->
info(active, #state{active = ActiveN}) ->
ActiveN.
-spec(stats(pid()|state()) -> emqx_types:stats()).
@ -240,7 +240,7 @@ init_state(WrappedSock, Peername, Options) ->
conn_mod => ?MODULE
},
ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
ActiveN = proplists:get_value(active, Options, ?ACTIVE_N),
%% FIXME:
%%Limiter = emqx_limiter:init(Options),
@ -255,7 +255,7 @@ init_state(WrappedSock, Peername, Options) ->
peername = Peername,
sockname = Sockname,
sockstate = idle,
active_n = ActiveN,
active = ActiveN,
sendfun = undefined,
limiter = undefined,
channel = Channel,
@ -403,13 +403,13 @@ handle_msg({Passive, _Sock}, State)
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN}) ->
State = #state{active = ActiveN}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
%% TODO: Who will deliver this message?
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pkt) > ActiveN of
true ->
Pubs = emqx_pd:reset_counter(outgoing_pkt),
@ -652,7 +652,7 @@ activate_socket(State = #state{sockstate = closed}) ->
activate_socket(State = #state{sockstate = blocked}) ->
{ok, State};
activate_socket(State = #state{socket = Socket,
active_n = N}) ->
active = N}) ->
%% FIXME: Works on dtls/udp ???
%% How to hanlde buffer?
case esockd_setopts(Socket, [{active, N}]) of

View File

@ -459,7 +459,7 @@ sockinfo(#lwm2m_state{peername = Peername}) ->
peername => Peername,
sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock?
sockstate => running,
active_n => 1
active => 1
}.
%% copies from emqx_channel:info/1

View File

@ -97,7 +97,7 @@
pending_topic_ids = #{} :: pending_msgs()
}).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).