From 8b3fcde380f60139d4ef422b7d64c602a74fdd8e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 2 Jul 2021 22:17:22 +0800 Subject: [PATCH] feat(config): make the listeners up --- apps/emqx/etc/emqx.conf | 233 +++++++------- apps/emqx/etc/emqx.conf.old | 10 +- apps/emqx/src/emqx_connection.erl | 16 +- apps/emqx/src/emqx_listeners.erl | 291 +++++++----------- apps/emqx/src/emqx_schema.erl | 101 ++---- apps/emqx/src/emqx_tls_lib.erl | 23 +- apps/emqx/src/emqx_ws_connection.erl | 18 +- apps/emqx/test/emqx_connection_SUITE.erl | 8 +- .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 2 +- apps/emqx/test/emqx_ws_connection_SUITE.erl | 2 +- apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl | 2 +- apps/emqx_exproto/etc/emqx_exproto.conf | 2 +- apps/emqx_exproto/priv/emqx_exproto.schema | 4 +- apps/emqx_exproto/src/emqx_exproto_conn.erl | 16 +- apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl | 2 +- apps/emqx_sn/src/emqx_sn_gateway.erl | 2 +- 16 files changed, 305 insertions(+), 427 deletions(-) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 3ea51ec3f..0dcb7285e 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -817,25 +817,25 @@ broker { ## - `conn_congestion.*` ## - `overall_max_connections` ## -## Syntax: zone. {} -zone.default { +## Syntax: zones. {} +zones.default { ## Enable authentication ## - ## @doc zone..auth.enable + ## @doc zones..auth.enable ## ValueType: Boolean ## Default: false auth.enable: false ## Enable per connection statistics. ## - ## @doc zone..stats.enable + ## @doc zones..stats.enable ## ValueType: Boolean ## Default: true stats.enable: true ## Maximum number of concurrent connections. ## - ## @doc zone..listeners..overall_max_connections + ## @doc zones..overall_max_connections ## ValueType: Number | infinity ## Default: infinity overall_max_connections: infinity @@ -846,7 +846,7 @@ zone.default { ## is delivered to the subscriber. The mountpoint is a way that users can use ## to implement isolation of message routing between different listeners. ## - ## For example if a clientA subscribes to "t" with `zone..mqtt.mountpoint` + ## For example if a clientA subscribes to "t" with `zones..mqtt.mountpoint` ## set to "some_tenant", then the client accually subscribes to the topic ## "some_tenant/t". Similarly if another clientB (connected to the same listener ## with the clientA) send a message to topic "t", the message is accually route @@ -859,7 +859,7 @@ zone.default { ## - %c: clientid ## - %u: username ## - ## @doc zone..listeners..mountpoint + ## @doc zones..listeners..mountpoint ## ValueType: String ## Default: "" mountpoint: "" @@ -868,21 +868,21 @@ zone.default { ## TCP connection is established but MQTT CONNECT has not been ## received. ## - ## @doc zone..mqtt.idle_timeout + ## @doc zones..mqtt.idle_timeout ## ValueType: Duration | infinity ## Default: 15s idle_timeout: 15s ## Maximum MQTT packet size allowed. ## - ## @doc zone..mqtt.max_packet_size + ## @doc zones..mqtt.max_packet_size ## ValueType: Bytes | infinity ## Default: 1MB max_packet_size: 1MB ## Maximum length of MQTT clientId allowed. ## - ## @doc zone..mqtt.max_clientid_len + ## @doc zones..mqtt.max_clientid_len ## ValueType: Integer ## Range: [23, 65535] ## Default: 65535 @@ -890,7 +890,7 @@ zone.default { ## Maximum topic levels allowed. ## - ## @doc zone..mqtt.max_topic_levels + ## @doc zones..mqtt.max_topic_levels ## ValueType: Integer ## Range: [1, 65535] ## Default: 65535 @@ -898,14 +898,14 @@ zone.default { ## Maximum QoS allowed. ## - ## @doc zone..mqtt.max_qos_allowed + ## @doc zones..mqtt.max_qos_allowed ## ValueType: 0 | 1 | 2 ## Default: 2 max_qos_allowed: 2 ## Maximum Topic Alias, 0 means no topic alias supported. ## - ## @doc zone..mqtt.max_topic_alias + ## @doc zones..mqtt.max_topic_alias ## ValueType: Integer ## Range: [0, 65535] ## Default: 65535 @@ -913,35 +913,35 @@ zone.default { ## Whether the Server supports MQTT retained messages. ## - ## @doc zone..mqtt.retain_available + ## @doc zones..mqtt.retain_available ## ValueType: Boolean ## Default: true retain_available: true ## Whether the Server supports MQTT Wildcard Subscriptions ## - ## @doc zone..mqtt.wildcard_subscription + ## @doc zones..mqtt.wildcard_subscription ## ValueType: Boolean ## Default: true wildcard_subscription: true ## Whether the Server supports MQTT Shared Subscriptions. ## - ## @doc zone..mqtt.shared_subscription + ## @doc zones..mqtt.shared_subscription ## ValueType: Boolean ## Default: true shared_subscription: true ## Whether to ignore loop delivery of messages.(for mqtt v3.1.1) ## - ## @doc zone..mqtt.ignore_loop_deliver + ## @doc zones..mqtt.ignore_loop_deliver ## ValueType: Boolean ## Default: false ignore_loop_deliver: false ## Whether to parse the MQTT frame in strict mode ## - ## @doc zone..mqtt.strict_mode + ## @doc zones..mqtt.strict_mode ## ValueType: Boolean ## Default: false strict_mode: false @@ -950,14 +950,14 @@ zone.default { ## ## This feature is disabled if not set ## - ## @doc zone..mqtt.response_information + ## @doc zones..mqtt.response_information ## ValueType: String ## Default: "" response_information: "" ## Server Keep Alive of MQTT 5.0 ## - ## @doc zone..mqtt.server_keepalive + ## @doc zones..mqtt.server_keepalive ## ValueType: Number | disabled ## Default: disabled server_keepalive: disabled @@ -965,7 +965,7 @@ zone.default { ## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## until 'Keepalive * backoff * 2' timeout. ## - ## @doc zone..mqtt.keepalive_backoff + ## @doc zones..mqtt.keepalive_backoff ## ValueType: Float ## Range: (0.5, 1] ## Default: 0.75 @@ -973,7 +973,7 @@ zone.default { ## Maximum number of subscriptions allowed. ## - ## @doc zone..mqtt.max_subscriptions + ## @doc zones..mqtt.max_subscriptions ## ValueType: Integer | infinity ## Range: [1, ) ## Default: infinity @@ -981,14 +981,14 @@ zone.default { ## Force to upgrade QoS according to subscription. ## - ## @doc zone..mqtt.upgrade_qos + ## @doc zones..mqtt.upgrade_qos ## ValueType: Boolean ## Default: false upgrade_qos: false ## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked. ## - ## @doc zone..mqtt.max_inflight + ## @doc zones..mqtt.max_inflight ## ValueType: Integer ## Range: [1, 65535] ## Default: 32 @@ -996,14 +996,14 @@ zone.default { ## Retry interval for QoS1/2 message delivering. ## - ## @doc zone..mqtt.retry_interval + ## @doc zones..mqtt.retry_interval ## ValueType: Duration ## Default: 30s retry_interval: 30s ## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL. ## - ## @doc zone..mqtt.max_awaiting_rel + ## @doc zones..mqtt.max_awaiting_rel ## ValueType: Integer | infinity ## Range: [1, ) ## Default: 100 @@ -1011,14 +1011,14 @@ zone.default { ## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. ## - ## @doc zone..mqtt.await_rel_timeout + ## @doc zones..mqtt.await_rel_timeout ## ValueType: Duration ## Default: 300s await_rel_timeout: 300s ## Default session expiry interval for MQTT V3.1.1 connections. ## - ## @doc zone..mqtt.session_expiry_interval + ## @doc zones..mqtt.session_expiry_interval ## ValueType: Duration ## Default: 2h session_expiry_interval: 2h @@ -1026,7 +1026,7 @@ zone.default { ## Maximum queue length. Enqueued messages when persistent client disconnected, ## or inflight window is full. ## - ## @doc zone..mqtt.max_mqueue_len + ## @doc zones..mqtt.max_mqueue_len ## ValueType: Integer | infinity ## Range: [0, ) ## Default: 1000 @@ -1042,7 +1042,7 @@ zone.default { ## either highest or lowest priority depending on the configured ## value for mqtt.mqueue_default_priority ## - ## @doc zone..mqtt.mqueue_priorities + ## @doc zones..mqtt.mqueue_priorities ## ValueType: Array ## Examples: ## To configure "t/1" > "t/2" > "t/3": @@ -1052,21 +1052,21 @@ zone.default { ## Default to highest priority for topics not matching priority table ## - ## @doc zone..mqtt.mqueue_default_priority + ## @doc zones..mqtt.mqueue_default_priority ## ValueType: highest | lowest ## Default: highest mqueue_default_priority: highest ## Whether to enqueue QoS0 messages. ## - ## @doc zone..mqtt.mqueue_store_qos0 + ## @doc zones..mqtt.mqueue_store_qos0 ## ValueType: Boolean ## Default: true mqueue_store_qos0: true ## Whether use username replace client id ## - ## @doc zone..mqtt.use_username_as_clientid + ## @doc zones..mqtt.use_username_as_clientid ## ValueType: Boolean ## Default: false use_username_as_clientid: false @@ -1074,7 +1074,7 @@ zone.default { ## Use the CN, DN or CRT field from the client certificate as a username. ## Only works for SSL connection. ## - ## @doc zone..mqtt.peer_cert_as_username + ## @doc zones..mqtt.peer_cert_as_username ## ValueType: cn | dn | crt | disabled ## Default: disabled peer_cert_as_username: disabled @@ -1082,7 +1082,7 @@ zone.default { ## Use the CN, DN or CRT field from the client certificate as a clientid. ## Only works for SSL connection. ## - ## @doc zone..mqtt.peer_cert_as_clientid + ## @doc zones..mqtt.peer_cert_as_clientid ## ValueType: cn | dn | crt | disabled ## Default: disabled peer_cert_as_clientid: disabled @@ -1093,14 +1093,14 @@ zone.default { ## Enable ACL check. ## - ## @doc zone..acl.enable + ## @doc zones..acl.enable ## ValueType: Boolean ## Default: false enable: false ## The action when acl check reject current operation ## - ## @doc zone..acl.deny_action + ## @doc zones..acl.deny_action ## ValueType: ignore | disconnect ## Default: ignore deny_action: ignore @@ -1109,14 +1109,14 @@ zone.default { ## ## If enabled, ACLs roles for each client will be cached in the memory ## - ## @doc zone..acl.cache.enable + ## @doc zones..acl.cache.enable ## ValueType: Boolean ## Default: true cache.enable: true ## The maximum count of ACL entries can be cached for a client. ## - ## @doc zone..acl.cache.max_size + ## @doc zones..acl.cache.max_size ## ValueType: Integer ## Range: [0, 1048576] ## Default: 32 @@ -1124,7 +1124,7 @@ zone.default { ## The time after which an ACL cache entry will be deleted ## - ## @doc zone..acl.cache.ttl + ## @doc zones..acl.cache.ttl ## ValueType: Duration ## Default: 1m cache.ttl: 1m @@ -1138,28 +1138,28 @@ zone.default { ## After the limit is reached, successive CONNECT requests are forbidden ## (banned) until the end of the time period defined by `ban_time`. ## - ## @doc zone..flapping_detect.enable + ## @doc zones..flapping_detect.enable ## ValueType: Boolean ## Default: true enable: true ## The max disconnect allowed of a MQTT Client in `window_time` ## - ## @doc zone..flapping_detect.max_count + ## @doc zones..flapping_detect.max_count ## ValueType: Integer ## Default: 15 max_count: 15 ## The time window for flapping detect ## - ## @doc zone..flapping_detect.window_time + ## @doc zones..flapping_detect.window_time ## ValueType: Duration ## Default: 1m window_time: 1m ## How long the clientid will be banned ## - ## @doc zone..flapping_detect.ban_time + ## @doc zones..flapping_detect.ban_time ## ValueType: Duration ## Default: 5m ban_time: 5m @@ -1169,13 +1169,13 @@ zone.default { force_shutdown: { ## Enable force_shutdown ## - ## @doc zone..force_shutdown.enable + ## @doc zones..force_shutdown.enable ## ValueType: Boolean ## Default: true enable: true ## Max message queue length - ## @doc zone..force_shutdown.max_message_queue_len + ## @doc zones..force_shutdown.max_message_queue_len ## ValueType: Integer ## Range: (0, ) ## Default: 1000 @@ -1183,7 +1183,7 @@ zone.default { ## Total heap size ## - ## @doc zone..force_shutdown.max_heap_size + ## @doc zones..force_shutdown.max_heap_size ## ValueType: Size ## Default: 32MB max_heap_size: 32MB @@ -1193,13 +1193,13 @@ zone.default { ## Force the MQTT connection process GC after this number of ## messages or bytes passed through. ## - ## @doc zone..force_gc.enable + ## @doc zones..force_gc.enable ## ValueType: Boolean ## Default: true enable: true ## GC the process after how many messages received - ## @doc zone..force_gc.max_message_queue_len + ## @doc zones..force_gc.max_message_queue_len ## ValueType: Integer ## Range: (0, ) ## Default: 16000 @@ -1207,7 +1207,7 @@ zone.default { ## GC the process after how much bytes passed through ## - ## @doc zone..force_gc.bytes + ## @doc zones..force_gc.bytes ## ValueType: Size ## Default: 16MB bytes: 16MB @@ -1231,7 +1231,7 @@ zone.default { ## Where the is the client-id of the congested MQTT connection. ## And the is the username or "unknown_user" of not provided by the client. ## - ## @doc zone..conn_congestion.enable_alarm + ## @doc zones..conn_congestion.enable_alarm ## ValueType: Boolean ## Default: true enable_alarm: true @@ -1243,7 +1243,7 @@ zone.default { ## ## This is to avoid clearing and sending the alarm again too often. ## - ## @doc zone..conn_congestion.min_alarm_sustain_duration + ## @doc zones..conn_congestion.min_alarm_sustain_duration ## ValueType: Duration ## Default: 1m min_alarm_sustain_duration: 1m @@ -1256,7 +1256,7 @@ zone.default { ## The type of the listener. ## - ## @doc zone..listeners..type + ## @doc zones..listeners..type ## ValueType: tcp | ws ## - tcp: MQTT over TCP ## - ws: MQTT over Websocket @@ -1265,7 +1265,7 @@ zone.default { ## The IP address and port that the listener will bind. ## - ## @doc zone..listeners..bind + ## @doc zones..listeners..bind ## ValueType: IPAddress | Port | IPAddrPort ## Required: true ## Examples: 1883, 127.0.0.1:1883, ::1:1883 @@ -1273,14 +1273,14 @@ zone.default { ## The size of the acceptor pool for this listener. ## - ## @doc zone..listeners..acceptors + ## @doc zones..listeners..acceptors ## ValueType: Number ## Default: 16 acceptors: 16 ## Maximum number of concurrent connections. ## - ## @doc zone..listeners..max_connections + ## @doc zones..listeners..max_connections ## ValueType: Number | infinity ## Default: infinity max_connections: 1024000 @@ -1289,7 +1289,7 @@ zone.default { ## ## See: https://github.com/emqtt/esockd#allowdeny ## - ## @doc zone..listeners..access_rules + ## @doc zones..listeners..access_rules ## ValueType: Array ## Default: [] ## Examples: @@ -1306,7 +1306,7 @@ zone.default { ## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ ## - ## @doc zone..listeners..proxy_protocol + ## @doc zones..listeners..proxy_protocol ## ValueType: Boolean ## Default: false proxy_protocol: false @@ -1314,7 +1314,7 @@ zone.default { ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## if no proxy protocol packet recevied within the timeout. ## - ## @doc zone..listeners..proxy_protocol_timeout + ## @doc zones..listeners..proxy_protocol_timeout ## ValueType: Duration ## Default: 3s proxy_protocol_timeout: 3s @@ -1322,7 +1322,7 @@ zone.default { rate_limit { ## Maximum connections per second. ## - ## @doc zone..max_conn_rate + ## @doc zones..max_conn_rate ## ValueType: Number | infinity ## Default: 1000 ## Examples: @@ -1331,7 +1331,7 @@ zone.default { ## Message limit for the a external MQTT connection. ## - ## @doc zone..rate_limit.conn_messages_in + ## @doc zones..rate_limit.conn_messages_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messages per 10 seconds. @@ -1344,7 +1344,7 @@ zone.default { ## The connection won't accept more messages if the messages come ## faster than the limit. ## - ## @doc zone..rate_limit.conn_bytes_in + ## @doc zones..rate_limit.conn_bytes_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100KB incoming per 10 seconds. @@ -1355,7 +1355,7 @@ zone.default { ## Messages quota for the each of external MQTT connection. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.conn_messages_routing + ## @doc zones..rate_limit.quota.conn_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messaegs per 1s: @@ -1365,7 +1365,7 @@ zone.default { ## Messages quota for the all of external MQTT connections. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.overall_messages_routing + ## @doc zones..rate_limit.quota.overall_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 200000 messages per 1s: @@ -1387,7 +1387,7 @@ zone.default { ## The type of the listener. ## - ## @doc zone..listeners..type + ## @doc zones..listeners..type ## ValueType: tcp | ws ## - tcp: MQTT over TCP ## - ws: MQTT over Websocket @@ -1396,7 +1396,7 @@ zone.default { ## The IP address and port that the listener will bind. ## - ## @doc zone..listeners..bind + ## @doc zones..listeners..bind ## ValueType: IPAddress | Port | IPAddrPort ## Required: true ## Examples: 8883, 127.0.0.1:8883, ::1:8883 @@ -1404,14 +1404,14 @@ zone.default { ## The size of the acceptor pool for this listener. ## - ## @doc zone..listeners..acceptors + ## @doc zones..listeners..acceptors ## ValueType: Number ## Default: 16 acceptors: 16 ## Maximum number of concurrent connections. ## - ## @doc zone..listeners..max_connections + ## @doc zones..listeners..max_connections ## ValueType: Number | infinity ## Default: infinity max_connections: 512000 @@ -1420,7 +1420,7 @@ zone.default { ## ## See: https://github.com/emqtt/esockd#allowdeny ## - ## @doc zone..listeners..access_rules + ## @doc zones..listeners..access_rules ## ValueType: Array ## Default: [] ## Examples: @@ -1437,15 +1437,15 @@ zone.default { ## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ ## - ## @doc zone..listeners..proxy_protocol + ## @doc zones..listeners..proxy_protocol ## ValueType: Boolean ## Default: true - proxy_protocol: true + proxy_protocol: false ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## if no proxy protocol packet recevied within the timeout. ## - ## @doc zone..listeners..proxy_protocol_timeout + ## @doc zones..listeners..proxy_protocol_timeout ## ValueType: Duration ## Default: 3s proxy_protocol_timeout: 3s @@ -1453,7 +1453,7 @@ zone.default { rate_limit { ## Maximum connections per second. ## - ## @doc zone..max_conn_rate + ## @doc zones..max_conn_rate ## ValueType: Number | infinity ## Default: 1000 ## Examples: @@ -1462,7 +1462,7 @@ zone.default { ## Message limit for the a external MQTT connection. ## - ## @doc zone..rate_limit.conn_messages_in + ## @doc zones..rate_limit.conn_messages_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messages per 10 seconds. @@ -1475,7 +1475,7 @@ zone.default { ## The connection won't accept more messages if the messages come ## faster than the limit. ## - ## @doc zone..rate_limit.conn_bytes_in + ## @doc zones..rate_limit.conn_bytes_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100KB incoming per 10 seconds. @@ -1486,7 +1486,7 @@ zone.default { ## Messages quota for the each of external MQTT connection. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.conn_messages_routing + ## @doc zones..rate_limit.quota.conn_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messaegs per 1s: @@ -1496,7 +1496,7 @@ zone.default { ## Messages quota for the all of external MQTT connections. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.overall_messages_routing + ## @doc zones..rate_limit.quota.overall_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 200000 messages per 1s: @@ -1508,6 +1508,7 @@ zone.default { ## SSL options ## See ${example_common_ssl_options} for more information ssl.enable: true + ssl.versions: ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] ssl.keyfile: "{{ platform_etc_dir }}/certs/key.pem" ssl.certfile: "{{ platform_etc_dir }}/certs/cert.pem" ssl.cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" @@ -1524,7 +1525,7 @@ zone.default { ## The type of the listener. ## - ## @doc zone..listeners..type + ## @doc zones..listeners..type ## ValueType: tcp | ws ## - tcp: MQTT over TCP ## - ws: MQTT over Websocket @@ -1533,7 +1534,7 @@ zone.default { ## The IP address and port that the listener will bind. ## - ## @doc zone..listeners..bind + ## @doc zones..listeners..bind ## ValueType: IPAddress | Port | IPAddrPort ## Required: true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 @@ -1541,14 +1542,14 @@ zone.default { ## The size of the acceptor pool for this listener. ## - ## @doc zone..listeners..acceptors + ## @doc zones..listeners..acceptors ## ValueType: Number ## Default: 16 acceptors: 16 ## Maximum number of concurrent connections. ## - ## @doc zone..listeners..max_connections + ## @doc zones..listeners..max_connections ## ValueType: Number | infinity ## Default: infinity max_connections: 1024000 @@ -1557,7 +1558,7 @@ zone.default { ## ## See: https://github.com/emqtt/esockd#allowdeny ## - ## @doc zone..listeners..access_rules + ## @doc zones..listeners..access_rules ## ValueType: Array ## Default: [] ## Examples: @@ -1574,15 +1575,15 @@ zone.default { ## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ ## - ## @doc zone..listeners..proxy_protocol + ## @doc zones..listeners..proxy_protocol ## ValueType: Boolean ## Default: true - proxy_protocol: true + proxy_protocol: false ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## if no proxy protocol packet recevied within the timeout. ## - ## @doc zone..listeners..proxy_protocol_timeout + ## @doc zones..listeners..proxy_protocol_timeout ## ValueType: Duration ## Default: 3s proxy_protocol_timeout: 3s @@ -1590,7 +1591,7 @@ zone.default { rate_limit { ## Maximum connections per second. ## - ## @doc zone..max_conn_rate + ## @doc zones..max_conn_rate ## ValueType: Number | infinity ## Default: 1000 ## Examples: @@ -1599,7 +1600,7 @@ zone.default { ## Message limit for the a external MQTT connection. ## - ## @doc zone..rate_limit.conn_messages_in + ## @doc zones..rate_limit.conn_messages_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messages per 10 seconds. @@ -1612,7 +1613,7 @@ zone.default { ## The connection won't accept more messages if the messages come ## faster than the limit. ## - ## @doc zone..rate_limit.conn_bytes_in + ## @doc zones..rate_limit.conn_bytes_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100KB incoming per 10 seconds. @@ -1623,7 +1624,7 @@ zone.default { ## Messages quota for the each of external MQTT connection. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.conn_messages_routing + ## @doc zones..rate_limit.quota.conn_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messaegs per 1s: @@ -1633,7 +1634,7 @@ zone.default { ## Messages quota for the all of external MQTT connections. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.overall_messages_routing + ## @doc zones..rate_limit.quota.overall_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 200000 messages per 1s: @@ -1658,7 +1659,7 @@ zone.default { ## The type of the listener. ## - ## @doc zone..listeners..type + ## @doc zones..listeners..type ## ValueType: tcp | ws ## - tcp: MQTT over TCP ## - ws: MQTT over Websocket @@ -1667,7 +1668,7 @@ zone.default { ## The IP address and port that the listener will bind. ## - ## @doc zone..listeners..bind + ## @doc zones..listeners..bind ## ValueType: IPAddress | Port | IPAddrPort ## Required: true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 @@ -1675,14 +1676,14 @@ zone.default { ## The size of the acceptor pool for this listener. ## - ## @doc zone..listeners..acceptors + ## @doc zones..listeners..acceptors ## ValueType: Number ## Default: 16 acceptors: 16 ## Maximum number of concurrent connections. ## - ## @doc zone..listeners..max_connections + ## @doc zones..listeners..max_connections ## ValueType: Number | infinity ## Default: infinity max_connections: 512000 @@ -1691,7 +1692,7 @@ zone.default { ## ## See: https://github.com/emqtt/esockd#allowdeny ## - ## @doc zone..listeners..access_rules + ## @doc zones..listeners..access_rules ## ValueType: Array ## Default: [] ## Examples: @@ -1708,15 +1709,15 @@ zone.default { ## ## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/ ## - ## @doc zone..listeners..proxy_protocol + ## @doc zones..listeners..proxy_protocol ## ValueType: Boolean ## Default: true - proxy_protocol: true + proxy_protocol: false ## Sets the timeout for proxy protocol. EMQ X will close the TCP connection ## if no proxy protocol packet recevied within the timeout. ## - ## @doc zone..listeners..proxy_protocol_timeout + ## @doc zones..listeners..proxy_protocol_timeout ## ValueType: Duration ## Default: 3s proxy_protocol_timeout: 3s @@ -1724,7 +1725,7 @@ zone.default { rate_limit { ## Maximum connections per second. ## - ## @doc zone..max_conn_rate + ## @doc zones..max_conn_rate ## ValueType: Number | infinity ## Default: 1000 ## Examples: @@ -1733,7 +1734,7 @@ zone.default { ## Message limit for the a external MQTT connection. ## - ## @doc zone..rate_limit.conn_messages_in + ## @doc zones..rate_limit.conn_messages_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messages per 10 seconds. @@ -1746,7 +1747,7 @@ zone.default { ## The connection won't accept more messages if the messages come ## faster than the limit. ## - ## @doc zone..rate_limit.conn_bytes_in + ## @doc zones..rate_limit.conn_bytes_in ## ValueType: String | infinity ## Default: infinity ## Examples: 100KB incoming per 10 seconds. @@ -1757,7 +1758,7 @@ zone.default { ## Messages quota for the each of external MQTT connection. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.conn_messages_routing + ## @doc zones..rate_limit.quota.conn_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 100 messaegs per 1s: @@ -1767,7 +1768,7 @@ zone.default { ## Messages quota for the all of external MQTT connections. ## This value consumed by the number of recipient on a message. ## - ## @doc zone..rate_limit.quota.overall_messages_routing + ## @doc zones..rate_limit.quota.overall_messages_routing ## ValueType: String | infinity ## Default: infinity ## Examples: 200000 messages per 1s: @@ -1797,7 +1798,7 @@ zone.default { #This is an example zone which has less "strict" settings. #It's useful to clients connecting the broker from trusted networks. -zone.internal { +zones.internal { acl.enable: false auth.enable: false listeners.mqtt_internal: { @@ -1805,7 +1806,7 @@ zone.internal { bind: "127.0.0.1:11883" acceptors: 4 max_connections: 1024000 - tcp.active_n: 1000 + tcp.active: 1000 tcp.backlog: 512 } } @@ -1958,10 +1959,10 @@ example_common_tcp_options { ## ## See: https://erlang.org/doc/man/inet.html#setopts-2 ## - ## @doc listeners..tcp.active_n + ## @doc listeners..tcp.active ## ValueType: Number ## Default: 100 - tcp.active_n: 100 + tcp.active: 100 ## TCP backlog defines the maximum length that the queue of ## pending connections can grow to. @@ -2072,10 +2073,10 @@ example_common_ssl_options { ## TLS versions only to protect from POODLE attack. ## - ## @doc listeners..ssl.tls_versions + ## @doc listeners..ssl.versions ## ValueType: Array - ## Default: [tlsv1.2,tlsv1.1,tlsv1] - ssl.tls_versions: [tlsv1.2,tlsv1.1,tlsv1] + ## Default: ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] + ssl.versions: ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"] ## TLS Handshake timeout. ## @@ -2189,17 +2190,9 @@ example_common_ssl_options { ## ## @doc listeners..ssl.ciphers ## ValueType: Array - ## Default: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA] - ssl.ciphers: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA] + ## Default: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA,PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA] + ssl.ciphers: [ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA,PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA] - ## Ciphers for TLS PSK. See 'https://tools.ietf.org/html/rfc4279#section-2'. - ## - ## Note that 'ciphers' and 'psk_ciphers' cannot be configured at the same time. - ## - ## @doc listeners..ssl.psk_ciphers - ## ValueType: Array - ## Default: [PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA] - ssl.psk_ciphers: [PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA] } ## Socket options for websocket connections diff --git a/apps/emqx/etc/emqx.conf.old b/apps/emqx/etc/emqx.conf.old index d2b5fd11d..862f9d78f 100644 --- a/apps/emqx/etc/emqx.conf.old +++ b/apps/emqx/etc/emqx.conf.old @@ -1107,7 +1107,7 @@ listener.tcp.external.max_conn_rate = 1000 ## Specify the {active, N} option for the external MQTT/TCP Socket. ## ## Value: Number -listener.tcp.external.active_n = 100 +listener.tcp.external.active = 100 ## Zone of the external MQTT/TCP listener belonged to. ## @@ -1247,7 +1247,7 @@ listener.tcp.internal.max_conn_rate = 1000 ## Specify the {active, N} option for the internal MQTT/TCP Socket. ## ## Value: Number -listener.tcp.internal.active_n = 1000 +listener.tcp.internal.active = 1000 ## Zone of the internal MQTT/TCP listener belonged to. ## @@ -1344,7 +1344,7 @@ listener.ssl.external.max_conn_rate = 500 ## Specify the {active, N} option for the internal MQTT/SSL Socket. ## ## Value: Number -listener.ssl.external.active_n = 100 +listener.ssl.external.active = 100 ## Zone of the external MQTT/SSL listener belonged to. ## @@ -1610,7 +1610,7 @@ listener.ws.external.max_conn_rate = 1000 ## Simulate the {active, N} option for the MQTT/WebSocket connections. ## ## Value: Number -listener.ws.external.active_n = 100 +listener.ws.external.active = 100 ## Zone of the external MQTT/WebSocket listener belonged to. ## @@ -1879,7 +1879,7 @@ listener.wss.external.max_conn_rate = 1000 ## Simulate the {active, N} option for the MQTT/WebSocket/SSL connections. ## ## Value: Number -listener.wss.external.active_n = 100 +listener.wss.external.active = 100 ## Zone of the external MQTT/WebSocket/SSL listener belonged to. ## diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ab91c02b4..3363b013e 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -84,7 +84,7 @@ %% Sock State sockstate :: emqx_types:sockstate(), %% The {active, N} option - active_n :: pos_integer(), + active :: pos_integer(), %% Limiter limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer @@ -108,7 +108,7 @@ -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -165,7 +165,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active_n, #state{active_n = ActiveN}) -> +info(active, #state{active = ActiveN}) -> ActiveN; info(stats_timer, #state{stats_timer = StatsTimer}) -> StatsTimer; @@ -254,7 +254,7 @@ init_state(Transport, Socket, Options) -> conn_mod => ?MODULE }, Zone = proplists:get_value(zone, Options), - ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), + ActiveN = proplists:get_value(active, Options, ?ACTIVE_N), PubLimit = emqx_zone:publish_limit(Zone), BytesIn = proplists:get_value(rate_limit, Options), RateLimit = emqx_zone:ratelimit(Zone), @@ -272,7 +272,7 @@ init_state(Transport, Socket, Options) -> peername = Peername, sockname = Sockname, sockstate = idle, - active_n = ActiveN, + active = ActiveN, limiter = Limiter, parse_state = ParseState, serialize = Serialize, @@ -452,12 +452,12 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - #state{active_n = ActiveN} = State) -> + #state{active = ActiveN} = State) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent -handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> +handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) -> case emqx_pd:get_counter(outgoing_pubs) > ActiveN of true -> Pubs = emqx_pd:reset_counter(outgoing_pubs), @@ -800,7 +800,7 @@ activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{transport = Transport, socket = Socket, - active_n = N}) -> + active = N}) -> case Transport:setopts(Socket, [{active, N}]) of ok -> {ok, State#state{sockstate = running}}; Error -> Error diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 1f3d1776b..e3c6a5e22 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -21,7 +21,6 @@ %% APIs -export([ start/0 - , ensure_all_started/0 , restart/0 , stop/0 ]). @@ -29,88 +28,35 @@ -export([ start_listener/1 , start_listener/3 , stop_listener/1 + , stop_listener/3 , restart_listener/1 , restart_listener/3 ]). --export([ find_id_by_listen_on/1 - , find_by_listen_on/1 - , find_by_id/1 - , identifier/1 - , format_listen_on/1 - ]). - --type(listener() :: #{ name := binary() - , proto := esockd:proto() - , listen_on := esockd:listen_on() - , opts := [esockd:option()] - }). - -%% @doc Find listener identifier by listen-on. -%% Return empty string (binary) if listener is not found in config. --spec(find_id_by_listen_on(esockd:listen_on()) -> binary() | false). -find_id_by_listen_on(ListenOn) -> - case find_by_listen_on(ListenOn) of - false -> false; - L -> identifier(L) - end. - -%% @doc Find listener by listen-on. -%% Return 'false' if not found. --spec(find_by_listen_on(esockd:listen_on()) -> listener() | false). -find_by_listen_on(ListenOn) -> - find_by_listen_on(ListenOn, emqx:get_env(listeners, [])). - -%% @doc Find listener by identifier. -%% Return 'false' if not found. --spec(find_by_id(string() | binary()) -> listener() | false). -find_by_id(Id) -> - find_by_id(iolist_to_binary(Id), emqx:get_env(listeners, [])). - -%% @doc Return the ID of the given listener. --spec identifier(listener()) -> binary(). -identifier(#{proto := Proto, name := Name}) -> - identifier(Proto, Name). - %% @doc Start all listeners. -spec(start() -> ok). start() -> - lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])). + lists:foreach(fun({ZoneName, ZoneConf}) -> + lists:foreach(fun({LName, LConf}) -> + start_listener(ZoneName, LName, LConf) + end, maps:to_list(maps:get(listeners, ZoneConf, #{}))) + end, maps:to_list(emqx_config:get([zones], #{}))). -%% @doc Ensure all configured listeners are started. -%% Raise exception if any of them failed to start. --spec(ensure_all_started() -> ok). -ensure_all_started() -> - ensure_all_started(emqx:get_env(listeners, []), []). +-spec(start_listener(atom()) -> ok). +start_listener(Id) -> + {ZoneName, ListenerName} = decode_listener_id(Id), + start_listener(ZoneName, ListenerName, + emqx_config:get([zones, ZoneName, listeners, ListenerName])). -ensure_all_started([], []) -> ok; -ensure_all_started([], Failed) -> error(Failed); -ensure_all_started([L | Rest], Results) -> - #{proto := Proto, listen_on := ListenOn, opts := Options} = L, - NewResults = - case start_listener(Proto, ListenOn, Options) of - {ok, _Pid} -> - Results; - {error, {already_started, _Pid}} -> - Results; - {error, Reason} -> - [{identifier(L), Reason} | Results] - end, - ensure_all_started(Rest, NewResults). - -%% @doc Format address:port for logging. --spec(format_listen_on(esockd:listen_on()) -> [char()]). -format_listen_on(ListenOn) -> format(ListenOn). - --spec(start_listener(listener()) -> ok). -start_listener(#{proto := Proto, name := Name, listen_on := ListenOn, opts := Options}) -> - ID = identifier(Proto, Name), - case start_listener(Proto, ListenOn, Options) of +-spec(start_listener(atom(), atom(), map()) -> ok). +start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) -> + case do_start_listener(ZoneName, ListenerName, Conf) of {ok, _} -> - console_print("Start ~s listener on ~s successfully.~n", [ID, format(ListenOn)]); + console_print("Start ~s listener ~s on ~s successfully.~n", + [Type, listener_id(ZoneName, ListenerName), format(Bind)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt listener ~s on ~s: ~0p~n", - [ID, format(ListenOn), Reason]), + io:format(standard_error, "Failed to start ~s listener ~s on ~s: ~0p~n", + [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]), error(Reason) end. @@ -122,124 +68,105 @@ console_print(_Fmt, _Args) -> ok. -endif. %% Start MQTT/TCP listener --spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) +-spec(do_start_listener(atom(), atom(), map()) -> {ok, pid()} | {error, term()}). -start_listener(tcp, ListenOn, Options) -> - start_mqtt_listener('mqtt:tcp', ListenOn, Options); - -%% Start MQTT/TLS listener -start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> - start_mqtt_listener('mqtt:ssl', ListenOn, Options); +do_start_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn} = Opts) -> + esockd:open(listener_id(ZoneName, ListenerName), ListenOn, merge_default(esockd_opts(Opts)), + {emqx_connection, start_link, [ZoneName, ListenerName]}); %% Start MQTT/WS listener -start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, - ranch_opts(Options), ws_opts(Options)); - -%% Start MQTT/WSS listener -start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, - ranch_opts(Options), ws_opts(Options)). - -replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)]. - -drop_tls13_for_old_otp(Options) -> - case proplists:get_value(ssl_options, Options) of - undefined -> Options; - SslOpts -> - SslOpts1 = emqx_tls_lib:drop_tls13_for_old_otp(SslOpts), - replace(Options, ssl_options, SslOpts1) +do_start_listener(ZoneName, ListenerName, #{type := ws, bind := ListenOn} = Opts) -> + Id = listener_id(ZoneName, ListenerName), + RanchOpts = ranch_opts(Opts), + WsOpts = ws_opts(ZoneName, ListenerName, Opts), + case is_ssl(Opts) of + false -> + cowboy:start_clear(Id, with_port(ListenOn, RanchOpts), WsOpts); + true -> + cowboy:start_tls(Id, with_port(ListenOn, RanchOpts), WsOpts) end. -start_mqtt_listener(Name, ListenOn, Options0) -> - Options = drop_tls13_for_old_otp(Options0), - SockOpts = esockd:parse_opt(Options), - esockd:open(Name, ListenOn, merge_default(SockOpts), - {emqx_connection, start_link, [Options -- SockOpts]}). +esockd_opts(Opts0) -> + Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), + Opts2 = case emqx_config:deep_get([rate_limit, max_conn_rate], Opts0) of + infinity -> Opts1; + Rate -> Opts1#{max_conn_rate => Rate} + end, + Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))}, + maps:to_list(Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)}). -start_http_listener(Start, Name, ListenOn, RanchOpts, ProtoOpts) -> - Start(ws_name(Name, ListenOn), with_port(ListenOn, RanchOpts), ProtoOpts). - -mqtt_path(Options) -> - proplists:get_value(mqtt_path, Options, "/mqtt"). - -ws_opts(Options) -> - WsPaths = [{mqtt_path(Options), emqx_ws_connection, Options}], +ws_opts(ZoneName, ListenerName, Opts) -> + WsPaths = [{maps:get(mqtt_path, Opts, "/mqtt"), emqx_ws_connection, + #{zone => ZoneName, listener => ListenerName}}], Dispatch = cowboy_router:compile([{'_', WsPaths}]), - ProxyProto = proplists:get_value(proxy_protocol, Options, false), + ProxyProto = maps:get(proxy_protocol, Opts, false), #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. -ranch_opts(Options0) -> - Options = drop_tls13_for_old_otp(Options0), - NumAcceptors = proplists:get_value(acceptors, Options, 4), - MaxConnections = proplists:get_value(max_connections, Options, 1024), - TcpOptions = proplists:get_value(tcp_options, Options, []), - RanchOpts = #{num_acceptors => NumAcceptors, - max_connections => MaxConnections, - socket_opts => TcpOptions}, - case proplists:get_value(ssl_options, Options) of - undefined -> RanchOpts; - SslOptions -> RanchOpts#{socket_opts => TcpOptions ++ SslOptions} - end. +ranch_opts(Opts) -> + NumAcceptors = maps:get(acceptors, Opts, 4), + MaxConnections = maps:get(max_connections, Opts, 1024), + #{num_acceptors => NumAcceptors, + max_connections => MaxConnections, + handshake_timeout => maps:get(handshake_timeout, Opts, 15000), + socket_opts => case is_ssl(Opts) of + true -> tcp_opts(Opts) ++ proplists:delete(handshake_timeout, ssl_opts(Opts)); + false -> tcp_opts(Opts) + end}. with_port(Port, Opts = #{socket_opts := SocketOption}) when is_integer(Port) -> Opts#{socket_opts => [{port, Port}| SocketOption]}; with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) -> Opts#{socket_opts => [{ip, Addr}, {port, Port}| SocketOption]}. +esockd_access_rules(StrRules) -> + Access = fun(S) -> + [A, CIDR] = string:tokens(S, " "), + {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} + end, + [Access(R) || R <- StrRules]. + %% @doc Restart all listeners -spec(restart() -> ok). restart() -> - lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). + lists:foreach(fun({ZoneName, ZoneConf}) -> + lists:foreach(fun({LName, LConf}) -> + restart_listener(ZoneName, LName, LConf) + end, maps:to_list(maps:get(listeners, ZoneConf, #{}))) + end, maps:to_list(emqx_config:get([zones], #{}))). --spec(restart_listener(listener() | string() | binary()) -> ok | {error, any()}). -restart_listener(#{proto := Proto, listen_on := ListenOn, opts := Options}) -> - restart_listener(Proto, ListenOn, Options); -restart_listener(Identifier) -> - case emqx_listeners:find_by_id(Identifier) of - false -> {error, {no_such_listener, Identifier}}; - Listener -> restart_listener(Listener) +-spec(restart_listener(atom()) -> ok | {error, any()}). +restart_listener(ListenerID) -> + {ZoneName, ListenerName} = decode_listener_id(ListenerID), + restart_listener(ZoneName, ListenerName, + emqx_config:get([zones, ZoneName, listeners, ListenerName])). + +-spec(restart_listener(atom(), atom(), map()) -> ok | {error, any()}). +restart_listener(ZoneName, ListenerName, Conf) -> + case stop_listener(ZoneName, ListenerName, Conf) of + ok -> start_listener(ZoneName, ListenerName, Conf); + Error -> Error end. --spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> - ok | {error, any()}). -restart_listener(tcp, ListenOn, _Options) -> - esockd:reopen('mqtt:tcp', ListenOn); -restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls -> - esockd:reopen('mqtt:ssl', ListenOn); -restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - _ = cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)), - ok(start_listener(Proto, ListenOn, Options)); -restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - _ = cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)), - ok(start_listener(Proto, ListenOn, Options)); -restart_listener(Proto, ListenOn, _Opts) -> - esockd:reopen(Proto, ListenOn). - -ok({ok, _}) -> ok; -ok(Other) -> Other. - %% @doc Stop all listeners. -spec(stop() -> ok). stop() -> - lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])). + lists:foreach(fun({ZoneName, ZoneConf}) -> + lists:foreach(fun({LName, LConf}) -> + stop_listener(ZoneName, LName, LConf) + end, maps:to_list(maps:get(listeners, ZoneConf, #{}))) + end, maps:to_list(emqx_config:get([zones], #{}))). --spec(stop_listener(listener()) -> ok | {error, term()}). -stop_listener(#{proto := Proto, listen_on := ListenOn, opts := Opts}) -> - stop_listener(Proto, ListenOn, Opts). +-spec(stop_listener(atom()) -> ok | {error, term()}). +stop_listener(ListenerID) -> + {ZoneName, ListenerName} = decode_listener_id(ListenerID), + stop_listener(ZoneName, ListenerName, + emqx_config:get([zones, ZoneName, listeners, ListenerName])). --spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) - -> ok | {error, term()}). -stop_listener(tcp, ListenOn, _Opts) -> - esockd:close('mqtt:tcp', ListenOn); -stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls -> - esockd:close('mqtt:ssl', ListenOn); -stop_listener(Proto, ListenOn, _Opts) when Proto == http; Proto == ws -> - cowboy:stop_listener(ws_name('mqtt:ws', ListenOn)); -stop_listener(Proto, ListenOn, _Opts) when Proto == https; Proto == wss -> - cowboy:stop_listener(ws_name('mqtt:wss', ListenOn)); -stop_listener(Proto, ListenOn, _Opts) -> - esockd:close(Proto, ListenOn). +-spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}). +stop_listener(ZoneName, ListenerName, #{type := tcp, bind := ListenOn}) -> + esockd:close(listener_id(ZoneName, ListenerName), ListenOn); +stop_listener(ZoneName, ListenerName, #{type := ws}) -> + cowboy:stop_listener(listener_id(ZoneName, ListenerName)). merge_default(Options) -> case lists:keytake(tcp_options, 1, Options) of @@ -256,23 +183,23 @@ format({Addr, Port}) when is_list(Addr) -> format({Addr, Port}) when is_tuple(Addr) -> io_lib:format("~s:~w", [inet:ntoa(Addr), Port]). -ws_name(Name, {_Addr, Port}) -> - ws_name(Name, Port); -ws_name(Name, Port) -> - list_to_atom(lists:concat([Name, ":", Port])). +listener_id(ZoneName, ListenerName) -> + list_to_atom(lists:append([atom_to_list(ZoneName), ":", atom_to_list(ListenerName)])). -identifier(Proto, Name) when is_atom(Proto) -> - identifier(atom_to_list(Proto), Name); -identifier(Proto, Name) -> - iolist_to_binary(["mqtt", ":", Proto, ":", Name]). - -find_by_listen_on(_ListenOn, []) -> false; -find_by_listen_on(ListenOn, [#{listen_on := ListenOn} = L | _]) -> L; -find_by_listen_on(ListenOn, [_ | Rest]) -> find_by_listen_on(ListenOn, Rest). - -find_by_id(_Id, []) -> false; -find_by_id(Id, [L | Rest]) -> - case identifier(L) =:= Id of - true -> L; - false -> find_by_id(Id, Rest) +decode_listener_id(Id) -> + case string:split(atom_to_list(Id), ":", leading) of + [Zone, Listen] -> {list_to_atom(Zone), list_to_atom(Listen)}; + _ -> error({invalid_listener_id, Id}) end. + +ssl_opts(Opts) -> + maps:to_list( + emqx_tls_lib:drop_tls13_for_old_otp( + maps:without([enable], + maps:get(ssl, Opts, #{})))). + +tcp_opts(Opts) -> + maps:to_list(maps:get(tcp, Opts, #{})). + +is_ssl(Opts) -> + emqx_config:deep_get([ssl, enable], Opts, false). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index f4900d36b..5a6044307 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -20,6 +20,7 @@ -type comma_separated_atoms() :: [atom()]. -type bar_separated_list() :: list(). -type ip_port() :: tuple(). +-type cipher() :: map(). -typerefl_from_string({duration/0, emqx_schema, to_duration}). -typerefl_from_string({duration_s/0, emqx_schema, to_duration_s}). @@ -30,6 +31,7 @@ -typerefl_from_string({comma_separated_list/0, emqx_schema, to_comma_separated_list}). -typerefl_from_string({bar_separated_list/0, emqx_schema, to_bar_separated_list}). -typerefl_from_string({ip_port/0, emqx_schema, to_ip_port}). +-typerefl_from_string({cipher/0, emqx_schema, to_erl_cipher_suite}). -typerefl_from_string({comma_separated_atoms/0, emqx_schema, to_comma_separated_atoms}). % workaround: prevent being recognized as unused functions @@ -37,6 +39,7 @@ to_bytesize/1, to_wordsize/1, to_percent/1, to_comma_separated_list/1, to_bar_separated_list/1, to_ip_port/1, + to_erl_cipher_suite/1, to_comma_separated_atoms/1]). -behaviour(hocon_schema). @@ -44,18 +47,19 @@ -reflect_type([ log_level/0, duration/0, duration_s/0, duration_ms/0, bytesize/0, wordsize/0, percent/0, file/0, comma_separated_list/0, bar_separated_list/0, ip_port/0, + cipher/0, comma_separated_atoms/0]). -export([structs/0, fields/1, translations/0, translation/1]). -export([t/1, t/3, t/4, ref/1]). -export([conf_get/2, conf_get/3, keys/2, filter/1]). --export([ssl/1, tr_ssl/2, tr_password_hash/2]). +-export([ssl/1]). %% will be used by emqx_ct_helper to find the dependent apps -export([includes/0]). structs() -> ["cluster", "node", "rpc", "log", "lager", - "acl", "mqtt", "zone", "listeners", "module", "broker", + "acl", "mqtt", "zones", "listeners", "module", "broker", "plugins", "sysmon", "alarm", "telemetry"] ++ includes(). @@ -271,7 +275,7 @@ fields("mqtt") -> , {"peer_cert_as_clientid", maybe_disabled(union([cn, dn, crt, pem, md5]))} ]; -fields("zone") -> +fields("zones") -> [ {"$name", ref("zone_settings")}]; fields("zone_settings") -> @@ -368,14 +372,14 @@ fields("ws_opts") -> "mqtt, mqtt-v3, mqtt-v3.1.1, mqtt-v5")} , {"check_origin_enable", t(boolean(), undefined, false)} , {"allow_origin_absence", t(boolean(), undefined, true)} - , {"check_origins", t(comma_separated_list())} + , {"check_origins", t(hoconsc:array(binary()), undefined, [])} , {"proxy_address_header", t(string(), undefined, "x-forwarded-for")} , {"proxy_port_header", t(string(), undefined, "x-forwarded-port")} , {"deflate_opts", ref("deflate_opts")} ]; fields("tcp_opts") -> - [ {"active_n", t(integer(), undefined, 100)} + [ {"active", t(integer(), undefined, 100)} , {"backlog", t(integer(), undefined, 1024)} , {"send_timeout", t(duration(), undefined, "15s")} , {"send_timeout_close", t(boolean(), undefined, true)} @@ -391,7 +395,9 @@ fields("tcp_opts") -> fields("ssl_opts") -> ssl(#{handshake_timeout => "15s" , depth => 10 - , reuse_sessions => true}); + , reuse_sessions => true + , versions => default_tls_vsns() + }); fields("deflate_opts") -> [ {"level", t(union([none, default, best_compression, best_speed]))} @@ -643,72 +649,19 @@ ssl(Defaults) -> , {"dhfile", t(string(), undefined, D("dhfile"))} , {"server_name_indication", t(union(disable, string()), undefined, D("server_name_indication"))} - , {"tls_versions", t(comma_separated_list(), undefined, D("tls_versions"))} - , {"ciphers", t(comma_separated_list(), undefined, D("ciphers"))} - , {"psk_ciphers", t(comma_separated_list(), undefined, D("ciphers"))}]. - -tr_ssl(Field, Conf) -> - Versions = case conf_get([Field, "tls_versions"], Conf) of - undefined -> undefined; - Vs -> [list_to_existing_atom(V) || V <- Vs] - end, - TLSCiphers = conf_get([Field, "ciphers"], Conf), - PSKCiphers = conf_get([Field, "psk_ciphers"], Conf), - Ciphers = ciphers(TLSCiphers, PSKCiphers, Field), - case emqx_schema:conf_get([Field, "enable"], Conf) of - X when X =:= true orelse X =:= undefined -> - filter([{versions, Versions}, - {ciphers, Ciphers}, - {user_lookup_fun, user_lookup_fun(PSKCiphers)}, - {handshake_timeout, conf_get([Field, "handshake_timeout"], Conf)}, - {depth, conf_get([Field, "depth"], Conf)}, - {password, conf_get([Field, "key_password"], Conf)}, - {dhfile, conf_get([Field, "dhfile"], Conf)}, - {keyfile, emqx_schema:conf_get([Field, "keyfile"], Conf)}, - {certfile, emqx_schema:conf_get([Field, "certfile"], Conf)}, - {cacertfile, emqx_schema:conf_get([Field, "cacertfile"], Conf)}, - {verify, emqx_schema:conf_get([Field, "verify"], Conf)}, - {fail_if_no_peer_cert, conf_get([Field, "fail_if_no_peer_cert"], Conf)}, - {secure_renegotiate, conf_get([Field, "secure_renegotiate"], Conf)}, - {reuse_sessions, conf_get([Field, "reuse_sessions"], Conf)}, - {honor_cipher_order, conf_get([Field, "honor_cipher_order"], Conf)}, - {server_name_indication, emqx_schema:conf_get([Field, "server_name_indication"], Conf)} - ]); - _ -> - [] - end. - -map_psk_ciphers(PSKCiphers) -> - lists:map( - fun("PSK-AES128-CBC-SHA") -> {psk, aes_128_cbc, sha}; - ("PSK-AES256-CBC-SHA") -> {psk, aes_256_cbc, sha}; - ("PSK-3DES-EDE-CBC-SHA") -> {psk, '3des_ede_cbc', sha}; - ("PSK-RC4-SHA") -> {psk, rc4_128, sha} - end, PSKCiphers). - -ciphers(undefined, undefined, _) -> - undefined; -ciphers(TLSCiphers, undefined, _) -> - TLSCiphers; -ciphers(undefined, PSKCiphers, _) -> - map_psk_ciphers(PSKCiphers); -ciphers(_, _, Field) -> - error(Field ++ ".ciphers and " ++ Field ++ ".psk_ciphers cannot be configured at the same time"). - -user_lookup_fun(undefined) -> - undefined; -user_lookup_fun(_PSKCiphers) -> - {fun emqx_psk:lookup/3, <<>>}. - -tr_password_hash(Field, Conf) -> - case emqx_schema:conf_get([Field, "password_hash"], Conf) of - [Hash] -> list_to_atom(Hash); - [Prefix, Suffix] -> {list_to_atom(Prefix), list_to_atom(Suffix)}; - [Hash, MacFun, Iterations, Dklen] -> {list_to_atom(Hash), list_to_atom(MacFun), - list_to_integer(Iterations), list_to_integer(Dklen)}; - _ -> plain - end. + , {"versions", #{ type => list(atom()) + , default => maps:get(versions, Defaults, default_tls_vsns()) + , converter => fun (Vsns) -> [tls_vsn(V) || V <- Vsns] end + }} + , {"ciphers", t(hoconsc:array(string()), undefined, D("ciphers"))} + , {"user_lookup_fun", t(any(), undefined, {fun emqx_psk:lookup/3, <<>>})} + ]. +default_tls_vsns() -> [<<"tlsv1.3">>, <<"tlsv1.2">>, <<"tlsv1.1">>, <<"tlsv1">>]. +tls_vsn(<<"tlsv1.3">>) -> 'tlsv1.3'; +tls_vsn(<<"tlsv1.2">>) -> 'tlsv1.2'; +tls_vsn(<<"tlsv1.1">>) -> 'tlsv1.1'; +tls_vsn(<<"tlsv1">>) -> 'tlsv1'. %% @private return a list of keys in a parent field -spec(keys(string(), hocon:config()) -> [string()]). @@ -814,3 +767,9 @@ to_ip_port(Str) -> end; _ -> {error, Str} end. + +to_erl_cipher_suite(Str) -> + case ssl:str_to_suite(Str) of + {error, Reason} -> error({invalid_cipher, Reason}); + Cipher -> Cipher + end. diff --git a/apps/emqx/src/emqx_tls_lib.erl b/apps/emqx/src/emqx_tls_lib.erl index 72a955962..24a9a15cf 100644 --- a/apps/emqx/src/emqx_tls_lib.erl +++ b/apps/emqx/src/emqx_tls_lib.erl @@ -161,17 +161,16 @@ drop_tls13_for_old_otp(SslOpts) -> , "TLS_AES_128_CCM_8_SHA256" ]). drop_tls13(SslOpts0) -> - SslOpts1 = case proplists:get_value(versions, SslOpts0) of - undefined -> SslOpts0; - Vsns -> replace(SslOpts0, versions, Vsns -- ['tlsv1.3']) + SslOpts1 = case maps:find(versions, SslOpts0) of + error -> SslOpts0; + {ok, Vsns} -> SslOpts0#{versions => (Vsns -- ['tlsv1.3'])} end, - case proplists:get_value(ciphers, SslOpts1) of - undefined -> SslOpts1; - Ciphers -> replace(SslOpts1, ciphers, Ciphers -- ?TLSV13_EXCLUSIVE_CIPHERS) + case maps:find(ciphers, SslOpts1) of + error -> SslOpts1; + {ok, Ciphers} -> + SslOpts1#{ciphers => Ciphers -- ?TLSV13_EXCLUSIVE_CIPHERS} end. -replace(Opts, Key, Value) -> [{Key, Value} | proplists:delete(Key, Opts)]. - -if(?OTP_RELEASE > 22). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -181,13 +180,13 @@ drop_tls13_test() -> ?assert(lists:member('tlsv1.3', Versions)), Ciphers = default_ciphers(), ?assert(has_tlsv13_cipher(Ciphers)), - Opts0 = [{versions, Versions}, {ciphers, Ciphers}, other, {bool, true}], + Opts0 = #{versions => Versions, ciphers => Ciphers, other => true}, Opts = drop_tls13(Opts0), - ?assertNot(lists:member('tlsv1.3', proplists:get_value(versions, Opts))), - ?assertNot(has_tlsv13_cipher(proplists:get_value(ciphers, Opts))). + ?assertNot(lists:member('tlsv1.3', maps:get(versions, Opts, undefined))), + ?assertNot(has_tlsv13_cipher(maps:get(ciphers, Opts, undefined))). drop_tls13_no_versions_cipers_test() -> - Opts0 = [other, {bool, true}], + Opts0 = #{other => 0, bool => true}, Opts = drop_tls13(Opts0), ?_assertEqual(Opts0, Opts). diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 7bc68c271..8d7816acf 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -62,8 +62,8 @@ sockname :: emqx_types:peername(), %% Sock state sockstate :: emqx_types:sockstate(), - %% Simulate the active_n opt - active_n :: pos_integer(), + %% Simulate the active opt + active :: pos_integer(), %% MQTT Piggyback mqtt_piggyback :: single | multiple, %% Limiter @@ -93,7 +93,7 @@ -type(ws_cmd() :: {active, boolean()}|close). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -124,7 +124,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active_n, #state{active_n = ActiveN}) -> +info(active, #state{active = ActiveN}) -> ActiveN; info(limiter, #state{limiter = Limiter}) -> maybe_apply(fun emqx_limiter:info/1, Limiter); @@ -293,7 +293,7 @@ websocket_init([Req, Opts]) -> BytesIn = proplists:get_value(rate_limit, Opts), RateLimit = emqx_zone:ratelimit(Zone), Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), - ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N), + ActiveN = proplists:get_value(active, Opts, ?ACTIVE_N), MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), @@ -309,7 +309,7 @@ websocket_init([Req, Opts]) -> {ok, #state{peername = Peername, sockname = Sockname, sockstate = running, - active_n = ActiveN, + active = ActiveN, mqtt_piggyback = MQTTPiggyback, limiter = Limiter, parse_state = ParseState, @@ -372,7 +372,7 @@ websocket_info({check_gc, Stats}, State) -> return(check_oom(run_gc(Stats, State))); websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{active_n = ActiveN}) -> + State = #state{active = ActiveN}) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); @@ -551,7 +551,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> %% Handle incoming packet %%-------------------------------------------------------------------- -handle_incoming(Packet, State = #state{active_n = ActiveN}) +handle_incoming(Packet, State = #state{active = ActiveN}) when is_record(Packet, mqtt_packet) -> ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), ok = inc_incoming_stats(Packet), @@ -586,7 +586,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) -> %% Handle outgoing packets %%-------------------------------------------------------------------- -handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> +handle_outgoing(Packets, State = #state{active = ActiveN, mqtt_piggyback = MQTTPiggyback}) -> IoData = lists:map(serialize_and_inc_stats_fun(State), Packets), Oct = iolist_size(IoData), ok = inc_sent_stats(length(Packets), Oct), diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index a6b2b614a..6bace4ccc 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -120,7 +120,7 @@ t_info(_) -> end end), #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertMatch(#{active_n := 100, + ?assertMatch(#{active := 100, peername := {{127,0,0,1},3456}, sockname := {{127,0,0,1},1883}, sockstate := idle, @@ -219,8 +219,8 @@ t_handle_msg_deliver(_) -> t_handle_msg_inet_reply(_) -> ok = meck:expect(emqx_pd, get_counter, fun(_) -> 10 end), - ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 0}))), - ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active_n => 100}))), + ?assertMatch({ok, _St}, handle_msg({inet_reply, for_testing, ok}, st(#{active => 0}))), + ?assertEqual(ok, handle_msg({inet_reply, for_testing, ok}, st(#{active => 100}))), ?assertMatch({stop, {shutdown, for_testing}, _St}, handle_msg({inet_reply, for_testing, {error, for_testing}}, st())). @@ -386,7 +386,7 @@ t_start_link_exit_on_activate(_) -> t_get_conn_info(_) -> with_conn(fun(CPid) -> #{sockinfo := SockInfo} = emqx_connection:info(CPid), - ?assertEqual(#{active_n => 100, + ?assertEqual(#{active => 100, peername => {{127,0,0,1},3456}, sockname => {{127,0,0,1},1883}, sockstate => running, diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index 8ce35b50c..e80643e96 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -265,7 +265,7 @@ t_connect_idle_timeout(_) -> t_connect_limit_timeout(_) -> ok = meck:new(proplists, [non_strict, passthrough, no_history, no_link, unstick]), - meck:expect(proplists, get_value, fun(active_n, _Options, _Default) -> 1; + meck:expect(proplists, get_value, fun(active, _Options, _Default) -> 1; (Arg1, ARg2, Arg3) -> meck:passthrough([Arg1, ARg2, Arg3]) end), diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 6db831972..a9a6b7792 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -118,7 +118,7 @@ t_info(_) -> end), #{sockinfo := SockInfo} = ?ws_conn:call(WsPid, info), #{socktype := ws, - active_n := 100, + active := 100, peername := {{127,0,0,1}, 3456}, sockname := {{127,0,0,1}, 18083}, sockstate := running diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index d465f9ca3..fa127cc8b 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -303,7 +303,7 @@ sockinfo(#state{peername = Peername}) -> peername => Peername, sockname => {{127, 0, 0, 1}, 5683}, %% FIXME: Sock? sockstate => running, - active_n => 1 + active => 1 }. %% copies from emqx_channel:info/1 diff --git a/apps/emqx_exproto/etc/emqx_exproto.conf b/apps/emqx_exproto/etc/emqx_exproto.conf index 7a7667271..687e97748 100644 --- a/apps/emqx_exproto/etc/emqx_exproto.conf +++ b/apps/emqx_exproto/etc/emqx_exproto.conf @@ -49,7 +49,7 @@ exproto.listener.protoname.max_conn_rate = 1000 ## Specify the {active, N} option for the external MQTT/TCP Socket. ## ## Value: Number -exproto.listener.protoname.active_n = 100 +exproto.listener.protoname.active = 100 ## Idle timeout ## diff --git a/apps/emqx_exproto/priv/emqx_exproto.schema b/apps/emqx_exproto/priv/emqx_exproto.schema index 4bd215847..6d0fb0fa8 100644 --- a/apps/emqx_exproto/priv/emqx_exproto.schema +++ b/apps/emqx_exproto/priv/emqx_exproto.schema @@ -78,7 +78,7 @@ end}. {datatype, integer} ]}. -{mapping, "exproto.listener.$proto.active_n", "emqx_exproto.listeners", [ +{mapping, "exproto.listener.$proto.active", "emqx_exproto.listeners", [ {default, 100}, {datatype, integer} ]}. @@ -250,7 +250,7 @@ end}. end, ConnOpts = fun(Prefix) -> - Filter([{active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)}, + Filter([{active, cuttlefish:conf_get(Prefix ++ ".active", Conf, undefined)}, {idle_timeout, cuttlefish:conf_get(Prefix ++ ".idle_timeout", Conf, undefined)}]) end, diff --git a/apps/emqx_exproto/src/emqx_exproto_conn.erl b/apps/emqx_exproto/src/emqx_exproto_conn.erl index da655bcb4..fb30e1e88 100644 --- a/apps/emqx_exproto/src/emqx_exproto_conn.erl +++ b/apps/emqx_exproto/src/emqx_exproto_conn.erl @@ -61,7 +61,7 @@ %% Sock State sockstate :: emqx_types:sockstate(), %% The {active, N} option - active_n :: pos_integer(), + active :: pos_integer(), %% BACKW: e4.2.0-e4.2.1 %% We should remove it sendfun :: function() | undefined, @@ -84,7 +84,7 @@ -type(state() :: #state{}). -define(ACTIVE_N, 100). --define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). @@ -137,7 +137,7 @@ info(sockname, #state{sockname = Sockname}) -> Sockname; info(sockstate, #state{sockstate = SockSt}) -> SockSt; -info(active_n, #state{active_n = ActiveN}) -> +info(active, #state{active = ActiveN}) -> ActiveN. -spec(stats(pid()|state()) -> emqx_types:stats()). @@ -240,7 +240,7 @@ init_state(WrappedSock, Peername, Options) -> conn_mod => ?MODULE }, - ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), + ActiveN = proplists:get_value(active, Options, ?ACTIVE_N), %% FIXME: %%Limiter = emqx_limiter:init(Options), @@ -255,7 +255,7 @@ init_state(WrappedSock, Peername, Options) -> peername = Peername, sockname = Sockname, sockstate = idle, - active_n = ActiveN, + active = ActiveN, sendfun = undefined, limiter = undefined, channel = Channel, @@ -403,13 +403,13 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - State = #state{active_n = ActiveN}) -> + State = #state{active = ActiveN}) -> Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent %% TODO: Who will deliver this message? -handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) -> +handle_msg({inet_reply, _Sock, ok}, State = #state{active = ActiveN}) -> case emqx_pd:get_counter(outgoing_pkt) > ActiveN of true -> Pubs = emqx_pd:reset_counter(outgoing_pkt), @@ -652,7 +652,7 @@ activate_socket(State = #state{sockstate = closed}) -> activate_socket(State = #state{sockstate = blocked}) -> {ok, State}; activate_socket(State = #state{socket = Socket, - active_n = N}) -> + active = N}) -> %% FIXME: Works on dtls/udp ??? %% How to hanlde buffer? case esockd_setopts(Socket, [{active, N}]) of diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 55f992da6..9a8c0229a 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -459,7 +459,7 @@ sockinfo(#lwm2m_state{peername = Peername}) -> peername => Peername, sockname => {{127,0,0,1}, 5683}, %% FIXME: Sock? sockstate => running, - active_n => 1 + active => 1 }. %% copies from emqx_channel:info/1 diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 2339961cf..bc1c11075 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -97,7 +97,7 @@ pending_topic_ids = #{} :: pending_msgs() }). --define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). +-define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).