fix(modules): fix conflicts

This commit is contained in:
turtled 2020-10-11 11:12:39 +08:00
parent fa31062a5e
commit eb4b9936dc
42 changed files with 1933 additions and 4283 deletions

View File

@ -1,26 +0,0 @@
%%--------------------------------------------------------------------
%% [ACL](https://docs.emqx.io/broker/v3/en/config.html)
%%
%% -type(who() :: all | binary() |
%% {ipaddr, esockd_access:cidr()} |
%% {client, binary()} |
%% {user, binary()}).
%%
%% -type(access() :: subscribe | publish | pubsub).
%%
%% -type(topic() :: binary()).
%%
%% -type(rule() :: {allow, all} |
%% {allow, who(), access(), list(topic())} |
%% {deny, all} |
%% {deny, who(), access(), list(topic())}).
%%--------------------------------------------------------------------
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.

View File

@ -1,14 +0,0 @@
%%--------------------------------------------------------------------
%% For paho interoperability test cases
%%--------------------------------------------------------------------
{deny, {client, "myclientid"}, subscribe, ["test/nosubscribe"]}.
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.

170
etc/cluster.conf Normal file
View File

@ -0,0 +1,170 @@
##--------------------------------------------------------------------
## Cluster
##--------------------------------------------------------------------
## Cluster name.
##
## Value: String
cluster.name = emqxcl
## Specify the erlang distributed protocol.
##
## Value: Enum
## - inet_tcp: the default; handles TCP streams with IPv4 addressing.
## - inet6_tcp: handles TCP with IPv6 addressing.
## - inet_tls: using TLS for Erlang Distribution.
##
## vm.args: -proto_dist inet_tcp
cluster.proto_dist = inet_tcp
## Cluster auto-discovery strategy.
##
## Value: Enum
## - manual: Manual join command
## - static: Static node list
## - mcast: IP Multicast
## - dns: DNS A Record
## - etcd: etcd
## - k8s: Kubernetes
##
## Default: manual
cluster.discovery = manual
## Enable cluster autoheal from network partition.
##
## Value: on | off
##
## Default: on
cluster.autoheal = on
## Autoclean down node. A down node will be removed from the cluster
## if this value > 0.
##
## Value: Duration
## -h: hour, e.g. '2h' for 2 hours
## -m: minute, e.g. '5m' for 5 minutes
## -s: second, e.g. '30s' for 30 seconds
##
## Default: 5m
cluster.autoclean = 5m
##--------------------------------------------------------------------
## Cluster using static node list
## Node list of the cluster.
##
## Value: String
## cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1
##--------------------------------------------------------------------
## Cluster using IP Multicast.
## IP Multicast Address.
##
## Value: IP Address
## cluster.mcast.addr = 239.192.0.1
## Multicast Ports.
##
## Value: Port List
## cluster.mcast.ports = 4369,4370
## Multicast Iface.
##
## Value: Iface Address
##
## Default: 0.0.0.0
## cluster.mcast.iface = 0.0.0.0
## Multicast Ttl.
##
## Value: 0-255
## cluster.mcast.ttl = 255
## Multicast loop.
##
## Value: on | off
## cluster.mcast.loop = on
##--------------------------------------------------------------------
## Cluster using DNS A records.
## DNS name.
##
## Value: String
## cluster.dns.name = localhost
## The App name is used to build 'node.name' with IP address.
##
## Value: String
## cluster.dns.app = emqx
##--------------------------------------------------------------------
## Cluster using etcd
## Etcd server list, seperated by ','.
##
## Value: String
## cluster.etcd.server = http://127.0.0.1:2379
## The prefix helps build nodes path in etcd. Each node in the cluster
## will create a path in etcd: v2/keys/<prefix>/<cluster.name>/<node.name>
##
## Value: String
## cluster.etcd.prefix = emqxcl
## The TTL for node's path in etcd.
##
## Value: Duration
##
## Default: 1m, 1 minute
## cluster.etcd.node_ttl = 1m
## Path to a file containing the client's private PEM-encoded key.
##
## Value: File
## cluster.etcd.ssl.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
## The path to a file containing the client's certificate.
##
## Value: File
## cluster.etcd.ssl.certfile = {{ platform_etc_dir }}/certs/client.pem
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## are used during server authentication and when building the client certificate chain.
##
## Value: File
## cluster.etcd.ssl.cacertfile = {{ platform_etc_dir }}/certs/ca.pem
##--------------------------------------------------------------------
## Cluster using Kubernetes
## Kubernetes API server list, seperated by ','.
##
## Value: String
## cluster.k8s.apiserver = http://10.110.111.204:8080
## The service name helps lookup EMQ nodes in the cluster.
##
## Value: String
## cluster.k8s.service_name = emqx
## The address type is used to extract host from k8s service.
##
## Value: ip | dns | hostname
## cluster.k8s.address_type = ip
## The app name helps build 'node.name'.
##
## Value: String
## cluster.k8s.app_name = emqx
## The suffix added to dns and hostname get from k8s service
##
## Value: String
## cluster.k8s.suffix = pod.cluster.local
## Kubernetes Namespace
##
## Value: String
## cluster.k8s.namespace = default

File diff suppressed because it is too large Load Diff

928
etc/listeners.conf Normal file
View File

@ -0,0 +1,928 @@
##--------------------------------------------------------------------
## Listeners
##--------------------------------------------------------------------
##--------------------------------------------------------------------
## MQTT/TCP - External TCP Listener for MQTT Protocol
## listener.tcp.$name is the IP address and port that the MQTT/TCP
## listener will bind.
##
## Value: IP:Port | Port
##
## Examples: 1883, 127.0.0.1:1883, ::1:1883
listener.tcp.external = 0.0.0.0:1883
## The acceptor pool for external MQTT/TCP listener.
##
## Value: Number
listener.tcp.external.acceptors = 8
## Maximum number of concurrent MQTT/TCP connections.
##
## Value: Number
listener.tcp.external.max_connections = 1024000
## Maximum external connections per second.
##
## Value: Number
listener.tcp.external.max_conn_rate = 1000
## Specify the {active, N} option for the external MQTT/TCP Socket.
##
## Value: Number
listener.tcp.external.active_n = 100
## Zone of the external MQTT/TCP listener belonged to.
##
## See: zone.$name.*
##
## Value: String
listener.tcp.external.zone = external
## The access control rules for the MQTT/TCP listener.
##
## See: https://github.com/emqtt/esockd#allowdeny
##
## Value: ACL Rule
##
## Example: allow 192.168.0.0/24
listener.tcp.external.access.1 = allow all
## Enable the Proxy Protocol V1/2 if the EMQ X cluster is deployed
## behind HAProxy or Nginx.
##
## See: https://www.haproxy.com/blog/haproxy/proxy-protocol/
##
## Value: on | off
## listener.tcp.external.proxy_protocol = on
## Sets the timeout for proxy protocol. EMQ X will close the TCP connection
## if no proxy protocol packet recevied within the timeout.
##
## Value: Duration
## listener.tcp.external.proxy_protocol_timeout = 3s
## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT username.
##
## Value: cn | dn | crt
## listener.tcp.external.peer_cert_as_username = cn
## The TCP backlog defines the maximum length that the queue of pending
## connections can grow to.
##
## Value: Number >= 0
listener.tcp.external.backlog = 1024
## The TCP send timeout for external MQTT connections.
##
## Value: Duration
listener.tcp.external.send_timeout = 15s
## Close the TCP connection if send timeout.
##
## Value: on | off
listener.tcp.external.send_timeout_close = on
## The TCP receive buffer(os kernel) for MQTT connections.
##
## See: http://erlang.org/doc/man/inet.html
##
## Value: Bytes
## listener.tcp.external.recbuf = 2KB
## The TCP send buffer(os kernel) for MQTT connections.
##
## See: http://erlang.org/doc/man/inet.html
##
## Value: Bytes
## listener.tcp.external.sndbuf = 2KB
## The size of the user-level software buffer used by the driver.
## Not to be confused with options sndbuf and recbuf, which correspond
## to the Kernel socket buffers. It is recommended to have val(buffer)
## >= max(val(sndbuf),val(recbuf)) to avoid performance issues because
## of unnecessary copying. val(buffer) is automatically set to the above
## maximum when values sndbuf or recbuf are set.
##
## See: http://erlang.org/doc/man/inet.html
##
## Value: Bytes
## listener.tcp.external.buffer = 2KB
## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
##
## Value: on | off
## listener.tcp.external.tune_buffer = off
## The TCP_NODELAY flag for MQTT connections. Small amounts of data are
## sent immediately if the option is enabled.
##
## Value: true | false
listener.tcp.external.nodelay = true
## The SO_REUSEADDR flag for TCP listener.
##
## Value: true | false
listener.tcp.external.reuseaddr = true
##--------------------------------------------------------------------
## Internal TCP Listener for MQTT Protocol
## The IP address and port that the internal MQTT/TCP protocol listener
## will bind.
##
## Value: IP:Port, Port
##
## Examples: 11883, 127.0.0.1:11883, ::1:11883
listener.tcp.internal = 127.0.0.1:11883
## The acceptor pool for internal MQTT/TCP listener.
##
## Value: Number
listener.tcp.internal.acceptors = 4
## Maximum number of concurrent MQTT/TCP connections.
##
## Value: Number
listener.tcp.internal.max_connections = 1024000
## Maximum internal connections per second.
##
## Value: Number
listener.tcp.internal.max_conn_rate = 1000
## Specify the {active, N} option for the internal MQTT/TCP Socket.
##
## Value: Number
listener.tcp.internal.active_n = 1000
## Zone of the internal MQTT/TCP listener belonged to.
##
## Value: String
listener.tcp.internal.zone = internal
## The TCP backlog of internal MQTT/TCP Listener.
##
## See: listener.tcp.$name.backlog
##
## Value: Number >= 0
listener.tcp.internal.backlog = 512
## The TCP send timeout for internal MQTT connections.
##
## See: listener.tcp.$name.send_timeout
##
## Value: Duration
listener.tcp.internal.send_timeout = 5s
## Close the MQTT/TCP connection if send timeout.
##
## See: listener.tcp.$name.send_timeout_close
##
## Value: on | off
listener.tcp.internal.send_timeout_close = on
## The TCP receive buffer(os kernel) for internal MQTT connections.
##
## See: listener.tcp.$name.recbuf
##
## Value: Bytes
listener.tcp.internal.recbuf = 64KB
## The TCP send buffer(os kernel) for internal MQTT connections.
##
## See: http://erlang.org/doc/man/inet.html
##
## Value: Bytes
listener.tcp.internal.sndbuf = 64KB
## The size of the user-level software buffer used by the driver.
##
## See: listener.tcp.$name.buffer
##
## Value: Bytes
## listener.tcp.internal.buffer = 16KB
## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
##
## See: listener.tcp.$name.tune_buffer
##
## Value: on | off
## listener.tcp.internal.tune_buffer = off
## The TCP_NODELAY flag for internal MQTT connections.
##
## See: listener.tcp.$name.nodelay
##
## Value: true | false
listener.tcp.internal.nodelay = false
## The SO_REUSEADDR flag for MQTT/TCP Listener.
##
## Value: true | false
listener.tcp.internal.reuseaddr = true
##--------------------------------------------------------------------
## MQTT/SSL - External SSL Listener for MQTT Protocol
## listener.ssl.$name is the IP address and port that the MQTT/SSL
## listener will bind.
##
## Value: IP:Port | Port
##
## Examples: 8883, 127.0.0.1:8883, ::1:8883
listener.ssl.external = 8883
## The acceptor pool for external MQTT/SSL listener.
##
## Value: Number
listener.ssl.external.acceptors = 16
## Maximum number of concurrent MQTT/SSL connections.
##
## Value: Number
listener.ssl.external.max_connections = 102400
## Maximum MQTT/SSL connections per second.
##
## Value: Number
listener.ssl.external.max_conn_rate = 500
## Specify the {active, N} option for the internal MQTT/SSL Socket.
##
## Value: Number
listener.ssl.external.active_n = 100
## Zone of the external MQTT/SSL listener belonged to.
##
## Value: String
listener.ssl.external.zone = external
## The access control rules for the MQTT/SSL listener.
##
## See: listener.tcp.$name.access
##
## Value: ACL Rule
listener.ssl.external.access.1 = allow all
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
##
## See: listener.tcp.$name.proxy_protocol
##
## Value: on | off
## listener.ssl.external.proxy_protocol = on
## Sets the timeout for proxy protocol.
##
## See: listener.tcp.$name.proxy_protocol_timeout
##
## Value: Duration
## listener.ssl.external.proxy_protocol_timeout = 3s
## TLS versions only to protect from POODLE attack.
##
## See: http://erlang.org/doc/man/ssl.html
##
## Value: String, seperated by ','
## listener.ssl.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## TLS Handshake timeout.
##
## Value: Duration
listener.ssl.external.handshake_timeout = 15s
## Path to the file containing the user's private PEM-encoded key.
##
## See: http://erlang.org/doc/man/ssl.html
##
## Value: File
listener.ssl.external.keyfile = {{ platform_etc_dir }}/certs/key.pem
## Path to a file containing the user certificate.
##
## See: http://erlang.org/doc/man/ssl.html
##
## Value: File
listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## are used during server authentication and when building the client certificate chain.
##
## Value: File
## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## The Ephemeral Diffie-Helman key exchange is a very effective way of
## ensuring Forward Secrecy by exchanging a set of keys that never hit
## the wire. Since the DH key is effectively signed by the private key,
## it needs to be at least as strong as the private key. In addition,
## the default DH groups that most of the OpenSSL installations have
## are only a handful (since they are distributed with the OpenSSL
## package that has been built for the operating system its running on)
## and hence predictable (not to mention, 1024 bits only).
## In order to escape this situation, first we need to generate a fresh,
## strong DH group, store it in a file and then use the option above,
## to force our SSL application to use the new DH group. Fortunately,
## OpenSSL provides us with a tool to do that. Simply run:
## openssl dhparam -out dh-params.pem 2048
##
## Value: File
## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem
## A server only does x509-path validation in mode verify_peer,
## as it then sends a certificate request to the client (this
## message is not sent if the verify option is verify_none).
## You can then also want to specify option fail_if_no_peer_cert.
## More information at: http://erlang.org/doc/man/ssl.html
##
## Value: verify_peer | verify_none
## listener.ssl.external.verify = verify_peer
## Used together with {verify, verify_peer} by an SSL server. If set to true,
## the server fails if the client does not have a certificate to send, that is,
## sends an empty certificate.
##
## Value: true | false
## listener.ssl.external.fail_if_no_peer_cert = true
## This is the single most important configuration option of an Erlang SSL
## application. Ciphers (and their ordering) define the way the client and
## server encrypt information over the wire, from the initial Diffie-Helman
## key exchange, the session key encryption ## algorithm and the message
## digest algorithm. Selecting a good cipher suite is critical for the
## applications data security, confidentiality and performance.
##
## The cipher list above offers:
##
## A good balance between compatibility with older browsers.
## It can get stricter for Machine-To-Machine scenarios.
## Perfect Forward Secrecy.
## No old/insecure encryption and HMAC algorithms
##
## Most of it was copied from Mozillas Server Side TLS article
##
## Value: Ciphers
listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
## Ciphers for TLS PSK.
## Note that 'listener.ssl.external.ciphers' and 'listener.ssl.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
#listener.ssl.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## SSL parameter renegotiation is a feature that allows a client and a server
## to renegotiate the parameters of the SSL connection on the fly.
## RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation,
## you drop support for the insecure renegotiation, prone to MitM attacks.
##
## Value: on | off
## listener.ssl.external.secure_renegotiate = off
## A performance optimization setting, it allows clients to reuse
## pre-existing sessions, instead of initializing new ones.
## Read more about it here.
##
## See: http://erlang.org/doc/man/ssl.html
##
## Value: on | off
## listener.ssl.external.reuse_sessions = on
## An important security setting, it forces the cipher to be set based
## on the server-specified order instead of the client-specified order,
## hence enforcing the (usually more properly configured) security
## ordering of the server administrator.
##
## Value: on | off
## listener.ssl.external.honor_cipher_order = on
## Use the CN, DN or CRT field from the client certificate as a username.
## Notice that 'verify' should be set as 'verify_peer'.
##
## Value: cn | dn | crt
## listener.ssl.external.peer_cert_as_username = cn
## TCP backlog for the SSL connection.
##
## See listener.tcp.$name.backlog
##
## Value: Number >= 0
## listener.ssl.external.backlog = 1024
## The TCP send timeout for the SSL connection.
##
## See listener.tcp.$name.send_timeout
##
## Value: Duration
## listener.ssl.external.send_timeout = 15s
## Close the SSL connection if send timeout.
##
## See: listener.tcp.$name.send_timeout_close
##
## Value: on | off
## listener.ssl.external.send_timeout_close = on
## The TCP receive buffer(os kernel) for the SSL connections.
##
## See: listener.tcp.$name.recbuf
##
## Value: Bytes
## listener.ssl.external.recbuf = 4KB
## The TCP send buffer(os kernel) for internal MQTT connections.
##
## See: listener.tcp.$name.sndbuf
##
## Value: Bytes
## listener.ssl.external.sndbuf = 4KB
## The size of the user-level software buffer used by the driver.
##
## See: listener.tcp.$name.buffer
##
## Value: Bytes
## listener.ssl.external.buffer = 4KB
## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
##
## See: listener.tcp.$name.tune_buffer
##
## Value: on | off
## listener.ssl.external.tune_buffer = off
## The TCP_NODELAY flag for SSL connections.
##
## See: listener.tcp.$name.nodelay
##
## Value: true | false
## listener.ssl.external.nodelay = true
## The SO_REUSEADDR flag for MQTT/SSL Listener.
##
## Value: true | false
listener.ssl.external.reuseaddr = true
##--------------------------------------------------------------------
## External WebSocket listener for MQTT protocol
## listener.ws.$name is the IP address and port that the MQTT/WebSocket
## listener will bind.
##
## Value: IP:Port | Port
##
## Examples: 8083, 127.0.0.1:8083, ::1:8083
listener.ws.external = 8083
## The path of WebSocket MQTT endpoint
##
## Value: URL Path
listener.ws.external.mqtt_path = /mqtt
## The acceptor pool for external MQTT/WebSocket listener.
##
## Value: Number
listener.ws.external.acceptors = 4
## Maximum number of concurrent MQTT/WebSocket connections.
##
## Value: Number
listener.ws.external.max_connections = 102400
## Maximum MQTT/WebSocket connections per second.
##
## Value: Number
listener.ws.external.max_conn_rate = 1000
## Simulate the {active, N} option for the MQTT/WebSocket connections.
##
## Value: Number
listener.ws.external.active_n = 100
## Zone of the external MQTT/WebSocket listener belonged to.
##
## Value: String
listener.ws.external.zone = external
## The access control for the MQTT/WebSocket listener.
##
## See: listener.ws.$name.access
##
## Value: ACL Rule
listener.ws.external.access.1 = allow all
## Verify if the protocol header is valid. Turn off for WeChat MiniApp.
##
## Value: on | off
listener.ws.external.verify_protocol_header = on
## Enable the Proxy Protocol V1/2 if the EMQ cluster is deployed behind
## HAProxy or Nginx.
##
## See: listener.ws.$name.proxy_protocol
##
## Value: on | off
## listener.ws.external.proxy_protocol = on
## Sets the timeout for proxy protocol.
##
## See: listener.ws.$name.proxy_protocol_timeout
##
## Value: Duration
## listener.ws.external.proxy_protocol_timeout = 3s
## The TCP backlog of external MQTT/WebSocket Listener.
##
## See: listener.ws.$name.backlog
##
## Value: Number >= 0
listener.ws.external.backlog = 1024
## The TCP send timeout for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.send_timeout
##
## Value: Duration
listener.ws.external.send_timeout = 15s
## Close the MQTT/WebSocket connection if send timeout.
##
## See: listener.ws.$name.send_timeout_close
##
## Value: on | off
listener.ws.external.send_timeout_close = on
## The TCP receive buffer(os kernel) for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.recbuf
##
## Value: Bytes
## listener.ws.external.recbuf = 2KB
## The TCP send buffer(os kernel) for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.sndbuf
##
## Value: Bytes
## listener.ws.external.sndbuf = 2KB
## The size of the user-level software buffer used by the driver.
##
## See: listener.ws.$name.buffer
##
## Value: Bytes
## listener.ws.external.buffer = 2KB
## Sets the 'buffer = max(sndbuf, recbuf)' if this option is enabled.
##
## See: listener.ws.$name.tune_buffer
##
## Value: on | off
## listener.ws.external.tune_buffer = off
## The TCP_NODELAY flag for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.nodelay
##
## Value: true | false
listener.ws.external.nodelay = true
## The compress flag for external MQTT/WebSocket connections.
##
## If this Value is set true,the websocket message would be compressed
##
## Value: true | false
## listener.ws.external.compress = true
## The level of deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.level
##
## Value: none | default | best_compression | best_speed
## listener.ws.external.deflate_opts.level = default
## The mem_level of deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.mem_level
##
## Valid range is 1-9
## listener.ws.external.deflate_opts.mem_level = 8
## The strategy of deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.strategy
##
## Value: default | filtered | huffman_only | rle
## listener.ws.external.deflate_opts.strategy = default
## The deflate option for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.server_context_takeover
##
## Value: takeover | no_takeover
## listener.ws.external.deflate_opts.server_context_takeover = takeover
## The deflate option for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.client_context_takeover
##
## Value: takeover | no_takeover
## listener.ws.external.deflate_opts.client_context_takeover = takeover
## The deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.server_max_window_bits
##
## Valid range is 8-15
## listener.ws.external.deflate_opts.server_max_window_bits = 15
## The deflate options for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.deflate_opts.client_max_window_bits
##
## Valid range is 8-15
## listener.ws.external.deflate_opts.client_max_window_bits = 15
## The idle timeout for external MQTT/WebSocket connections.
##
## See: listener.ws.$name.idle_timeout
##
## Value: Duration
## listener.ws.external.idle_timeout = 60s
## The max frame size for external MQTT/WebSocket connections.
##
##
## Value: Number
## listener.ws.external.max_frame_size = 0
## Whether a WebSocket message is allowed to contain multiple MQTT packets
##
## Value: single | multiple
listener.ws.external.mqtt_piggyback = multiple
##--------------------------------------------------------------------
## External WebSocket/SSL listener for MQTT Protocol
## listener.wss.$name is the IP address and port that the MQTT/WebSocket/SSL
## listener will bind.
##
## Value: IP:Port | Port
##
## Examples: 8084, 127.0.0.1:8084, ::1:8084
listener.wss.external = 8084
## The path of WebSocket MQTT endpoint
##
## Value: URL Path
listener.wss.external.mqtt_path = /mqtt
## The acceptor pool for external MQTT/WebSocket/SSL listener.
##
## Value: Number
listener.wss.external.acceptors = 4
## Maximum number of concurrent MQTT/Webwocket/SSL connections.
##
## Value: Number
listener.wss.external.max_connections = 16
## Maximum MQTT/WebSocket/SSL connections per second.
##
## See: listener.tcp.$name.max_conn_rate
##
## Value: Number
listener.wss.external.max_conn_rate = 1000
## Simulate the {active, N} option for the MQTT/WebSocket/SSL connections.
##
## Value: Number
listener.wss.external.active_n = 100
## Zone of the external MQTT/WebSocket/SSL listener belonged to.
##
## Value: String
listener.wss.external.zone = external
## The access control rules for the MQTT/WebSocket/SSL listener.
##
## See: listener.tcp.$name.access.<no>
##
## Value: ACL Rule
listener.wss.external.access.1 = allow all
## See: listener.ws.external.verify_protocol_header
##
## Value: on | off
listener.wss.external.verify_protocol_header = on
## Enable the Proxy Protocol V1/2 support.
##
## See: listener.tcp.$name.proxy_protocol
##
## Value: on | off
## listener.wss.external.proxy_protocol = on
## Sets the timeout for proxy protocol.
##
## See: listener.tcp.$name.proxy_protocol_timeout
##
## Value: Duration
## listener.wss.external.proxy_protocol_timeout = 3s
## TLS versions only to protect from POODLE attack.
##
## See: listener.ssl.$name.tls_versions
##
## Value: String, seperated by ','
## listener.wss.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## Path to the file containing the user's private PEM-encoded key.
##
## See: listener.ssl.$name.keyfile
##
## Value: File
listener.wss.external.keyfile = {{ platform_etc_dir }}/certs/key.pem
## Path to a file containing the user certificate.
##
## See: listener.ssl.$name.certfile
##
## Value: File
listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
## Path to the file containing PEM-encoded CA certificates.
##
## See: listener.ssl.$name.cacert
##
## Value: File
## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## See: listener.ssl.$name.dhfile
##
## Value: File
## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem
## See: listener.ssl.$name.vefify
##
## Value: vefify_peer | verify_none
## listener.wss.external.verify = verify_peer
## See: listener.ssl.$name.fail_if_no_peer_cert
##
## Value: false | true
## listener.wss.external.fail_if_no_peer_cert = true
## See: listener.ssl.$name.ciphers
##
## Value: Ciphers
listener.wss.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA
## Ciphers for TLS PSK.
## Note that 'listener.wss.external.ciphers' and 'listener.wss.external.psk_ciphers' cannot
## be configured at the same time.
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
## listener.wss.external.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## See: listener.ssl.$name.secure_renegotiate
##
## Value: on | off
## listener.wss.external.secure_renegotiate = off
## See: listener.ssl.$name.reuse_sessions
##
## Value: on | off
## listener.wss.external.reuse_sessions = on
## See: listener.ssl.$name.honor_cipher_order
##
## Value: on | off
## listener.wss.external.honor_cipher_order = on
## See: listener.ssl.$name.peer_cert_as_username
##
## Value: cn | dn | crt
## listener.wss.external.peer_cert_as_username = cn
## TCP backlog for the WebSocket/SSL connection.
##
## See: listener.tcp.$name.backlog
##
## Value: Number >= 0
listener.wss.external.backlog = 1024
## The TCP send timeout for the WebSocket/SSL connection.
##
## See: listener.tcp.$name.send_timeout
##
## Value: Duration
listener.wss.external.send_timeout = 15s
## Close the WebSocket/SSL connection if send timeout.
##
## See: listener.tcp.$name.send_timeout_close
##
## Value: on | off
listener.wss.external.send_timeout_close = on
## The TCP receive buffer(os kernel) for the WebSocket/SSL connections.
##
## See: listener.tcp.$name.recbuf
##
## Value: Bytes
## listener.wss.external.recbuf = 4KB
## The TCP send buffer(os kernel) for the WebSocket/SSL connections.
##
## See: listener.tcp.$name.sndbuf
##
## Value: Bytes
## listener.wss.external.sndbuf = 4KB
## The size of the user-level software buffer used by the driver.
##
## See: listener.tcp.$name.buffer
##
## Value: Bytes
## listener.wss.external.buffer = 4KB
## The TCP_NODELAY flag for WebSocket/SSL connections.
##
## See: listener.tcp.$name.nodelay
##
## Value: true | false
## listener.wss.external.nodelay = true
## The compress flag for external WebSocket/SSL connections.
##
## If this Value is set true,the websocket message would be compressed
##
## Value: true | false
## listener.wss.external.compress = true
## The level of deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.level
##
## Value: none | default | best_compression | best_speed
## listener.wss.external.deflate_opts.level = default
## The mem_level of deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.mem_level
##
## Valid range is 1-9
## listener.wss.external.deflate_opts.mem_level = 8
## The strategy of deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.strategy
##
## Value: default | filtered | huffman_only | rle
## listener.wss.external.deflate_opts.strategy = default
## The deflate option for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.server_context_takeover
##
## Value: takeover | no_takeover
## listener.wss.external.deflate_opts.server_context_takeover = takeover
## The deflate option for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.client_context_takeover
##
## Value: takeover | no_takeover
## listener.wss.external.deflate_opts.client_context_takeover = takeover
## The deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.server_max_window_bits
##
## Valid range is 8-15
## listener.wss.external.deflate_opts.server_max_window_bits = 15
## The deflate options for external WebSocket/SSL connections.
##
## See: listener.wss.$name.deflate_opts.client_max_window_bits
##
## Valid range is 8-15
## listener.wss.external.deflate_opts.client_max_window_bits = 15
## The idle timeout for external WebSocket/SSL connections.
##
## See: listener.wss.$name.idle_timeout
##
## Value: Duration
## listener.wss.external.idle_timeout = 60s
## The max frame size for external WebSocket/SSL connections.
##
## Value: Number
## listener.wss.external.max_frame_size = 0
## Whether a WebSocket message is allowed to contain multiple MQTT packets
##
## Value: single | multiple
listener.wss.external.mqtt_piggyback = multiple

170
etc/logger.conf Normal file
View File

@ -0,0 +1,170 @@
##--------------------------------------------------------------------
## Log
##--------------------------------------------------------------------
## Where to emit the logs.
## Enable the console (standard output) logs.
##
## Value: off | file | console | both
## - off: disable logs entirely
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = both
## The log severity level.
##
## Value: debug | info | notice | warning | error | critical | alert | emergency
##
## Note: Only the messages with severity level higher than or equal to
## this level will be logged.
##
## Default: warning
log.level = warning
## The dir for log files.
##
## Value: Folder
log.dir = {{ platform_log_dir }}
## The log filename for logs of level specified in "log.level".
##
## If `log.rotation` is enabled, this is the base name of the
## files. Each file in a rotated log is named <base_name>.N, where N is an integer.
##
## Value: String
## Default: emqx.log
log.file = emqx.log
## Limits the total number of characters printed for each log event.
##
## Value: Integer
## Default: No Limit
#log.chars_limit = 8192
## Enables the log rotation.
## With this enabled, new log files will be created when the current
## log file is full, max to `log.rotation.size` files will be created.
##
## Value: on | off
## Default: on
log.rotation = on
## Maximum size of each log file.
##
## Value: Number
## Default: 10M
## Supported Unit: KB | MB | GB
log.rotation.size = 10MB
## Maximum rotation count of log files.
##
## Value: Number
## Default: 5
log.rotation.count = 5
## To create additional log files for specific log levels.
##
## Value: File Name
## Format: log.$level.file = $filename,
## where "$level" can be one of: debug, info, notice, warning,
## error, critical, alert, emergency
## Note: Log files for a specific log level will only contain all the logs
## that higher than or equal to that level
##
#log.info.file = info.log
#log.error.file = error.log
## The max allowed queue length before switching to sync mode.
##
## Log overload protection parameter. If the message queue grows
## larger than this value the handler switches from anync to sync mode.
##
## Default: 100
##
#log.sync_mode_qlen = 100
## The max allowed queue length before switching to drop mode.
##
## Log overload protection parameter. When the message queue grows
## larger than this threshold, the handler switches to a mode in which
## it drops all new events that senders want to log.
##
## Default: 3000
##
#log.drop_mode_qlen = 3000
## The max allowed queue length before switching to flush mode.
##
## Log overload protection parameter. If the length of the message queue
## grows larger than this threshold, a flush (delete) operation takes place.
## To flush events, the handler discards the messages in the message queue
## by receiving them in a loop without logging.
##
## Default: 8000
##
#log.flush_qlen = 8000
## Kill the log handler when it gets overloaded.
##
## Log overload protection parameter. It is possible that a handler,
## even if it can successfully manage peaks of high load without crashing,
## can build up a large message queue, or use a large amount of memory.
## We could kill the log handler in these cases and restart it after a
## few seconds.
##
## Default: on
##
#log.overload_kill = on
## The max allowed queue length before killing the log hanlder.
##
## Log overload protection parameter. This is the maximum allowed queue
## length. If the message queue grows larger than this, the handler
## process is terminated.
##
## Default: 20000
##
#log.overload_kill_qlen = 20000
## The max allowed memory size before killing the log hanlder.
##
## Log overload protection parameter. This is the maximum memory size
## that the handler process is allowed to use. If the handler grows
## larger than this, the process is terminated.
##
## Default: 30MB
##
#log.overload_kill_mem_size = 30MB
## Restart the log hanlder after some seconds.
##
## Log overload protection parameter. If the handler is terminated,
## it restarts automatically after a delay specified in seconds.
## The value "infinity" prevents restarts.
##
## Default: 5s
##
#log.overload_kill_restart_after = 5s
## Max burst count and time window for burst control.
##
## Log overload protection parameter. Large bursts of log events - many
## events received by the handler under a short period of time - can
## potentially cause problems. By specifying the maximum number of events
## to be handled within a certain time frame, the handler can avoid
## choking the log with massive amounts of printouts.
##
## This config controls the maximum number of events to handle within
## a time frame. After the limit is reached, successive events are
## dropped until the end of the time frame.
##
## Note that there would be no warning if any messages were
## dropped because of burst control.
##
## Comment this config out to disable the burst control feature.
##
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s

98
etc/rpc.conf Normal file
View File

@ -0,0 +1,98 @@
##--------------------------------------------------------------------
## RPC
##--------------------------------------------------------------------
## RPC Mode.
##
## Value: sync | async
rpc.mode = async
## Max batch size of async RPC requests.
##
## Value: Integer
## Zero or negative value disables rpc batching.
##
## NOTE: RPC batch won't work when rpc.mode = sync
rpc.async_batch_size = 256
## RPC port discovery
##
## The strategy for discovering the RPC listening port of other nodes.
##
## Value: Enum
## - manual: discover ports by `tcp_server_port` and `tcp_client_port`.
## - stateless: discover ports in a stateless manner.
## If node name is `emqx<N>@127.0.0.1`, where the `<N>` is an integer,
## then the listening port will be `5370 + <N>`
##
## Defaults to `stateless`.
rpc.port_discovery = stateless
## TCP server port for RPC.
##
## Only takes effect when `rpc.port_discovery` = `manual`.
##
## Value: Port [1024-65535]
#rpc.tcp_server_port = 5369
## TCP port for outgoing RPC connections.
##
## Only takes effect when `rpc.port_discovery` = `manual`.
##
## Value: Port [1024-65535]
#rpc.tcp_client_port = 5369
## Number of outgoing RPC connections.
##
## Value: Interger [1-256]
## Defaults to NumberOfCPUSchedulers / 2
#rpc.tcp_client_num = 1
## RCP Client connect timeout.
##
## Value: Seconds
rpc.connect_timeout = 5s
## TCP send timeout of RPC client and server.
##
## Value: Seconds
rpc.send_timeout = 5s
## Authentication timeout
##
## Value: Seconds
rpc.authentication_timeout = 5s
## Default receive timeout for call() functions
##
## Value: Seconds
rpc.call_receive_timeout = 15s
## Socket idle keepalive.
##
## Value: Seconds
rpc.socket_keepalive_idle = 900s
## TCP Keepalive probes interval.
##
## Value: Seconds
rpc.socket_keepalive_interval = 75s
## Probes lost to close the connection
##
## Value: Integer
rpc.socket_keepalive_count = 9
## Size of TCP send buffer.
##
## Value: Bytes
rpc.socket_sndbuf = 1MB
## Size of TCP receive buffer.
##
## Value: Seconds
rpc.socket_recbuf = 1MB
## Size of user-level software socket buffer.
##
## Value: Seconds
rpc.socket_buffer = 1MB

148
etc/sys_mon.conf Normal file
View File

@ -0,0 +1,148 @@
##--------------------------------------------------------------------
## System Monitor
##--------------------------------------------------------------------
## Enable Long GC monitoring. Disable if the value is 0.
## Notice: don't enable the monitor in production for:
## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
##
## Value: Duration
## - h: hour
## - m: minute
## - s: second
## - ms: milliseconds
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 0.1s: 0.1 seconds
## - 100ms : 100 milliseconds
##
## Default: 0ms
sysmon.long_gc = 0
## Enable Long Schedule(ms) monitoring.
##
## See: http://erlang.org/doc/man/erlang.html#system_monitor-2
##
## Value: Duration
## - h: hour
## - m: minute
## - s: second
## - ms: milliseconds
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 0.1s: 0.1 seconds
## - 100ms: 100 milliseconds
##
## Default: 0ms
sysmon.long_schedule = 240ms
## Enable Large Heap monitoring.
##
## See: http://erlang.org/doc/man/erlang.html#system_monitor-2
##
## Value: bytes
##
## Default: 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
sysmon.large_heap = 8MB
## Enable Busy Port monitoring.
##
## See: http://erlang.org/doc/man/erlang.html#system_monitor-2
##
## Value: true | false
sysmon.busy_port = false
## Enable Busy Dist Port monitoring.
##
## See: http://erlang.org/doc/man/erlang.html#system_monitor-2
##
## Value: true | false
sysmon.busy_dist_port = true
## The time interval for the periodic cpu check
##
## Value: Duration
## -h: hour, e.g. '2h' for 2 hours
## -m: minute, e.g. '5m' for 5 minutes
## -s: second, e.g. '30s' for 30 seconds
##
## Default: 60s
os_mon.cpu_check_interval = 60s
## The threshold, as percentage of system cpu, for how much system cpu can be used before the corresponding alarm is set.
##
## Default: 80%
os_mon.cpu_high_watermark = 80%
## The threshold, as percentage of system cpu, for how much system cpu can be used before the corresponding alarm is clear.
##
## Default: 60%
os_mon.cpu_low_watermark = 60%
## The time interval for the periodic memory check
##
## Value: Duration
## -h: hour, e.g. '2h' for 2 hours
## -m: minute, e.g. '5m' for 5 minutes
## -s: second, e.g. '30s' for 30 seconds
##
## Default: 60s
os_mon.mem_check_interval = 60s
## The threshold, as percentage of system memory, for how much system memory can be allocated before the corresponding alarm is set.
##
## Default: 70%
os_mon.sysmem_high_watermark = 70%
## The threshold, as percentage of system memory, for how much system memory can be allocated by one Erlang process before the corresponding alarm is set.
##
## Default: 5%
os_mon.procmem_high_watermark = 5%
## The time interval for the periodic process limit check
##
## Value: Duration
##
## Default: 30s
vm_mon.check_interval = 30s
## The threshold, as percentage of processes, for how many processes can simultaneously exist at the local node before the corresponding alarm is set.
##
## Default: 80%
vm_mon.process_high_watermark = 80%
## The threshold, as percentage of processes, for how many processes can simultaneously exist at the local node before the corresponding alarm is clear.
##
## Default: 60%
vm_mon.process_low_watermark = 60%
## Specifies the actions to take when an alarm is activated
##
## Value: String
## - log
## - publish
##
## Default: log,publish
alarm.actions = log,publish
## The maximum number of deactivated alarms
##
## Value: Integer
##
## Default: 1000
alarm.size_limit = 1000
## Validity Period of deactivated alarms
##
## Value: Duration
## - h: hour
## - m: minute
## - s: second
## - ms: milliseconds
##
## Default: 24h
alarm.validity_period = 24h

327
etc/zones.conf Normal file
View File

@ -0,0 +1,327 @@
##--------------------------------------------------------------------
## Zones
##--------------------------------------------------------------------
##--------------------------------------------------------------------
## External Zone
## Idle timeout of the external MQTT connections.
##
## Value: duration
zone.external.idle_timeout = 15s
## Enable ACL check.
##
## Value: Flag
zone.external.enable_acl = on
## Enable ban check.
##
## Value: Flag
zone.external.enable_ban = on
## Enable per connection statistics.
##
## Value: on | off
zone.external.enable_stats = on
## The action when acl check reject current operation
##
## Value: ignore | disconnect
## Default: ignore
zone.external.acl_deny_action = ignore
## Force the MQTT connection process GC after this number of
## messages | bytes passed through.
##
## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_gc_policy = 16000|16MB
## Max message queue length and total heap size to force shutdown
## connection/session process.
## Message queue here is the Erlang process mailbox, but not the number
## of queued MQTT messages of QoS 1 and 2.
##
## Numbers delimited by `|'. Zero or negative is to disable.
zone.external.force_shutdown_policy = 10000|32MB
## Maximum MQTT packet size allowed.
##
## Value: Bytes
## Default: 1MB
## zone.external.max_packet_size = 64KB
## Maximum length of MQTT clientId allowed.
##
## Value: Number [23-65535]
## zone.external.max_clientid_len = 1024
## Maximum topic levels allowed. 0 means no limit.
##
## Value: Number
## zone.external.max_topic_levels = 7
## Maximum QoS allowed.
##
## Value: 0 | 1 | 2
## zone.external.max_qos_allowed = 2
## Maximum Topic Alias, 0 means no limit.
##
## Value: 0-65535
## zone.external.max_topic_alias = 65535
## Whether the Server supports retained messages.
##
## Value: boolean
## zone.external.retain_available = true
## Whether the Server supports Wildcard Subscriptions
##
## Value: boolean
## zone.external.wildcard_subscription = false
## Whether the Server supports Shared Subscriptions
##
## Value: boolean
## zone.external.shared_subscription = false
## Server Keep Alive
##
## Value: Number
## zone.external.server_keepalive = 0
## The backoff for MQTT keepalive timeout. The broker will kick a connection out
## until 'Keepalive * backoff * 2' timeout.
##
## Value: Float > 0.5
zone.external.keepalive_backoff = 0.75
## Maximum number of subscriptions allowed, 0 means no limit.
##
## Value: Number
zone.external.max_subscriptions = 0
## Force to upgrade QoS according to subscription.
##
## Value: on | off
zone.external.upgrade_qos = off
## Maximum size of the Inflight Window storing QoS1/2 messages delivered but unacked.
##
## Value: Number
zone.external.max_inflight = 32
## Retry interval for QoS1/2 message delivering.
##
## Value: Duration
zone.external.retry_interval = 30s
## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit.
##
## Value: Number
zone.external.max_awaiting_rel = 100
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
##
## Value: Duration
zone.external.await_rel_timeout = 300s
## Default session expiry interval for MQTT V3.1.1 connections.
##
## Value: Duration
## -d: day
## -h: hour
## -m: minute
## -s: second
##
## Default: 2h, 2 hours
zone.external.session_expiry_interval = 2h
## Maximum queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full. 0 means no limit.
##
## Value: Number >= 0
zone.external.max_mqueue_len = 1000
## Topic priorities.
## 'none' to indicate no priority table (by default), hence all messages
## are treated equal
##
## Priority number [1-255]
## Example: topic/1=10,topic/2=8
## NOTE: comma and equal signs are not allowed for priority topic names
## NOTE: messages for topics not in the priority table are treated as
## either highest or lowest priority depending on the configured
## value for mqueue_default_priority
##
zone.external.mqueue_priorities = none
## Default to highest priority for topics not matching priority table
##
## Value: highest | lowest
zone.external.mqueue_default_priority = highest
## Whether to enqueue QoS0 messages.
##
## Value: false | true
zone.external.mqueue_store_qos0 = true
## Whether to turn on flapping detect
##
## Value: on | off
zone.external.enable_flapping_detect = off
## Message limit for the a external MQTT connection.
##
## Value: Number,Duration
## Example: 100 messages per 10 seconds.
#zone.external.rate_limit.conn_messages_in = 100,10s
## Bytes limit for a external MQTT connections.
##
## Value: Number,Duration
## Example: 100KB incoming per 10 seconds.
#zone.external.rate_limit.conn_bytes_in = 100KB,10s
## Messages quota for the each of external MQTT connection.
## This value consumed by the number of recipient on a message.
##
## Value: Number, Duration
##
## Example: 100 messaegs per 1s
#zone.external.quota.conn_messages_routing = 100,1s
## Messages quota for the all of external MQTT connections.
## This value consumed by the number of recipient on a message.
##
## Value: Number, Duration
##
## Example: 200000 messaegs per 1s
#zone.external.quota.overall_messages_routing = 200000,1s
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
## - %c: clientid
## - %u: username
##
## Value: String
## zone.external.mountpoint = devicebound/
## Whether use username replace client id
##
## Value: boolean
## Default: false
zone.external.use_username_as_clientid = false
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## Value: true | false
zone.external.ignore_loop_deliver = false
## Whether to parse the MQTT frame in strict mode
##
## Value: true | false
zone.external.strict_mode = false
## Specify the response information returned to the client
##
## Value: String
## zone.external.response_information = example
##--------------------------------------------------------------------
## Internal Zone
zone.internal.allow_anonymous = true
## Enable per connection stats.
##
## Value: Flag
zone.internal.enable_stats = on
## Enable ACL check.
##
## Value: Flag
zone.internal.enable_acl = off
## The action when acl check reject current operation
##
## Value: ignore | disconnect
## Default: ignore
zone.internal.acl_deny_action = ignore
## See zone.$name.force_gc_policy
## zone.internal.force_gc_policy = 128000|128MB
## See zone.$name.wildcard_subscription.
##
## Value: boolean
## zone.internal.wildcard_subscription = true
## See zone.$name.shared_subscription.
##
## Value: boolean
## zone.internal.shared_subscription = true
## See zone.$name.max_subscriptions.
##
## Value: Integer
zone.internal.max_subscriptions = 0
## See zone.$name.max_inflight
##
## Value: Number
zone.internal.max_inflight = 128
## See zone.$name.max_awaiting_rel
##
## Value: Number
zone.internal.max_awaiting_rel = 1000
## See zone.$name.max_mqueue_len
##
## Value: Number >= 0
zone.internal.max_mqueue_len = 10000
## Whether to enqueue Qos0 messages.
##
## Value: false | true
zone.internal.mqueue_store_qos0 = true
## Whether to turn on flapping detect
##
## Value: on | off
zone.internal.enable_flapping_detect = off
## See zone.$name.force_shutdown_policy
zone.internal.force_shutdown_policy = 128000|128MB
## All the topics will be prefixed with the mountpoint path if this option is enabled.
##
## Variables in mountpoint path:
## - %c: clientid
## - %u: username
##
## Value: String
## zone.internal.mountpoint = cloudbound/
## Whether to ignore loop delivery of messages.(for mqtt v3.1.1)
##
## Value: true | false
zone.internal.ignore_loop_deliver = false
## Whether to parse the MQTT frame in strict mode
##
## Value: true | false
zone.internal.strict_mode = false
## Specify the response information returned to the client
##
## Value: String
## zone.internal.response_information = example
## Allow the zone's clients to bypass authentication step
##
## Value: true | false
zone.internal.bypass_auth_plugins = true

View File

@ -689,12 +689,6 @@ end}.
{datatype, {enum, [allow, deny]}}
]}.
%% @doc Default ACL file.
{mapping, "acl_file", "emqx.acl_file", [
{datatype, string},
hidden
]}.
%% @doc Enable ACL cache for publish.
{mapping, "enable_acl_cache", "emqx.enable_acl_cache", [
{default, on},
@ -1026,6 +1020,7 @@ end}.
%% of queued MQTT messages of QoS 1 and 2.
%% Zero or negative is to disable.
{mapping, "zone.$name.force_shutdown_policy", "emqx.zones", [
{default, "default"},
{datatype, string}
]}.
@ -1082,6 +1077,17 @@ end}.
count => list_to_integer(Count)}
end,
{force_gc_policy, GcPolicy};
(["force_shutdown_policy"], "default") ->
{DefaultLen, DefaultSize} =
case WordSize = erlang:system_info(wordsize) of
8 -> % arch_64
{10000, cuttlefish_bytesize:parse("32MB")};
4 -> % arch_32
{10000, cuttlefish_bytesize:parse("16MB")}
end,
{force_shutdown_policy, #{message_queue_len => DefaultLen,
max_heap_size => DefaultSize div WordSize
}};
(["force_shutdown_policy"], Val) ->
[Len, Siz] = string:tokens(Val, "| "),
MaxSiz = case WordSize = erlang:system_info(wordsize) of
@ -1551,6 +1557,10 @@ end}.
hidden
]}.
{mapping, "listener.ws.$name.peer_cert_as_username", "emqx.listeners", [
{datatype, {enum, [cn, dn, crt]}}
]}.
%%--------------------------------------------------------------------
%% MQTT/WebSocket/SSL Listeners
@ -1675,6 +1685,9 @@ end}.
{mapping, "listener.wss.$name.cacertfile", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.dhfile", "emqx.listeners", [
{datatype, string}
]}.
{mapping, "listener.wss.$name.verify", "emqx.listeners", [
{datatype, atom}
@ -1915,93 +1928,6 @@ end}.
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
end}.
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------
{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [
{datatype, string}
]}.
{mapping, "module.presence.qos", "emqx.modules", [
{default, 1},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.subscription.$id.topic", "emqx.modules", [
{datatype, string}
]}.
{mapping, "module.subscription.$id.qos", "emqx.modules", [
{default, 1},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.subscription.$id.nl", "emqx.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-1"]}
]}.
{mapping, "module.subscription.$id.rap", "emqx.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-1"]}
]}.
{mapping, "module.subscription.$id.rh", "emqx.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "module.rewrite.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{mapping, "module.rewrite.pub.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{mapping, "module.rewrite.sub.rule.$id", "emqx.modules", [
{datatype, string}
]}.
{translation, "emqx.modules", fun(Conf, _, Conf1) ->
Subscriptions = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List],
[{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0),
nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0),
rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0),
rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0)
}} || {N, T} <- TopicList]
end,
Rewrites = fun() ->
Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf),
PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf),
SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf),
TotalRules = lists:append(
[ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules,
[ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules
),
lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) ->
[Topic, Re, Dest] = string:tokens(Rule, " "),
{rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, TotalRules)
end,
lists:append([
[{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
[{emqx_mod_subscription, Subscriptions()}],
[{emqx_mod_rewrite, Rewrites()}],
[{emqx_mod_topic_metrics, []}],
[{emqx_mod_delayed, []}],
[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}]
])
end}.
%%-------------------------------------------------------------------
%% Plugins
%%-------------------------------------------------------------------

View File

@ -6,7 +6,7 @@
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.3"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}

View File

@ -1,155 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_access_rule).
-include("emqx.hrl").
%% APIs
-export([ match/3
, compile/1
]).
-export_type([rule/0]).
-type(acl_result() :: allow | deny).
-type(who() :: all | binary() |
{client, binary()} |
{user, binary()} |
{ipaddr, esockd_cidr:cidr_string()}).
-type(access() :: subscribe | publish | pubsub).
-type(rule() :: {acl_result(), all} |
{acl_result(), who(), access(), list(emqx_topic:topic())}).
-define(ALLOW_DENY(A), ((A =:= allow) orelse (A =:= deny))).
-define(PUBSUB(A), ((A =:= subscribe) orelse (A =:= publish) orelse (A =:= pubsub))).
%% @doc Compile Access Rule.
compile({A, all}) when ?ALLOW_DENY(A) ->
{A, all};
compile({A, Who, Access, Topic}) when ?ALLOW_DENY(A), ?PUBSUB(Access), is_binary(Topic) ->
{A, compile(who, Who), Access, [compile(topic, Topic)]};
compile({A, Who, Access, TopicFilters}) when ?ALLOW_DENY(A), ?PUBSUB(Access) ->
{A, compile(who, Who), Access, [compile(topic, Topic) || Topic <- TopicFilters]}.
compile(who, all) ->
all;
compile(who, {ipaddr, CIDR}) ->
{ipaddr, esockd_cidr:parse(CIDR, true)};
compile(who, {client, all}) ->
{client, all};
compile(who, {client, ClientId}) ->
{client, bin(ClientId)};
compile(who, {user, all}) ->
{user, all};
compile(who, {user, Username}) ->
{user, bin(Username)};
compile(who, {'and', Conds}) when is_list(Conds) ->
{'and', [compile(who, Cond) || Cond <- Conds]};
compile(who, {'or', Conds}) when is_list(Conds) ->
{'or', [compile(who, Cond) || Cond <- Conds]};
compile(topic, {eq, Topic}) ->
{eq, emqx_topic:words(bin(Topic))};
compile(topic, Topic) ->
Words = emqx_topic:words(bin(Topic)),
case 'pattern?'(Words) of
true -> {pattern, Words};
false -> Words
end.
'pattern?'(Words) ->
lists:member(<<"%u">>, Words)
orelse lists:member(<<"%c">>, Words).
bin(L) when is_list(L) ->
list_to_binary(L);
bin(B) when is_binary(B) ->
B.
%% @doc Match access rule
-spec(match(emqx_types:clientinfo(), emqx_types:topic(), rule())
-> {matched, allow} | {matched, deny} | nomatch).
match(_ClientInfo, _Topic, {AllowDeny, all}) when ?ALLOW_DENY(AllowDeny) ->
{matched, AllowDeny};
match(ClientInfo, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
when ?ALLOW_DENY(AllowDeny) ->
case match_who(ClientInfo, Who)
andalso match_topics(ClientInfo, Topic, TopicFilters) of
true -> {matched, AllowDeny};
false -> nomatch
end.
match_who(_ClientInfo, all) ->
true;
match_who(_ClientInfo, {user, all}) ->
true;
match_who(_ClientInfo, {client, all}) ->
true;
match_who(#{clientid := ClientId}, {client, ClientId}) ->
true;
match_who(#{username := Username}, {user, Username}) ->
true;
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
false;
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR);
match_who(ClientInfo, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(ClientInfo, Who) andalso Allow
end, true, Conds);
match_who(ClientInfo, {'or', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) ->
match_who(ClientInfo, Who) orelse Allow
end, false, Conds);
match_who(_ClientInfo, _Who) ->
false.
match_topics(_ClientInfo, _Topic, []) ->
false;
match_topics(ClientInfo, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(ClientInfo, PatternFilter),
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(ClientInfo, Topic, Filters);
match_topics(ClientInfo, Topic, [TopicFilter|Filters]) ->
match_topic(emqx_topic:words(Topic), TopicFilter)
orelse match_topics(ClientInfo, Topic, Filters).
match_topic(Topic, {eq, TopicFilter}) ->
Topic == TopicFilter;
match_topic(Topic, TopicFilter) ->
emqx_topic:match(Topic, TopicFilter).
feed_var(ClientInfo, Pattern) ->
feed_var(ClientInfo, Pattern, []).
feed_var(_ClientInfo, [], Acc) ->
lists:reverse(Acc);
feed_var(ClientInfo = #{clientid := undefined}, [<<"%c">>|Words], Acc) ->
feed_var(ClientInfo, Words, [<<"%c">>|Acc]);
feed_var(ClientInfo = #{clientid := ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(ClientInfo, Words, [ClientId |Acc]);
feed_var(ClientInfo = #{username := undefined}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [<<"%u">>|Acc]);
feed_var(ClientInfo = #{username := Username}, [<<"%u">>|Words], Acc) ->
feed_var(ClientInfo, Words, [Username|Acc]);
feed_var(ClientInfo, [W|Words], Acc) ->
feed_var(ClientInfo, Words, [W|Acc]).

View File

@ -200,10 +200,10 @@ handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions,
ok
end,
Alarm = #deactivated_alarm{activate_at = ActivateAt,
name = Name,
details = Details,
message = Message,
deactivate_at = erlang:system_time(microsecond)},
name = Name,
details = Details,
message = Message,
deactivate_at = erlang:system_time(microsecond)},
mnesia:dirty_delete(?ACTIVATED_ALARM, Name),
mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm),
do_actions(deactivate, Alarm, Actions),

View File

@ -32,12 +32,11 @@ start(_Type, _Args) ->
print_banner(),
ekka:start(),
{ok, Sup} = emqx_sup:start_link(),
ok = emqx_modules:load(),
ok = emqx_plugins:init(),
emqx_plugins:load(),
start_autocluster(),
emqx_boot:is_enabled(listeners)
andalso (ok = emqx_listeners:start()),
start_autocluster(),
ok = emqx_plugins:init(),
emqx_plugins:load(),
register(emqx, self()),
emqx_alarm_handler:load(),
print_vsn(),
@ -47,8 +46,7 @@ start(_Type, _Args) ->
stop(_State) ->
emqx_alarm_handler:unload(),
emqx_boot:is_enabled(listeners)
andalso emqx_listeners:stop(),
emqx_modules:unload().
andalso emqx_listeners:stop().
%%--------------------------------------------------------------------
%% Print Banner

View File

@ -648,17 +648,21 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
-spec(handle_deliver(list(emqx_types:deliver()), channel())
-> {ok, channel()} | {ok, replies(), channel()}).
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
session = Session}) ->
NSession = emqx_session:enqueue(maybe_nack(Delivers), Session),
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
{ok, Channel#channel{session = NSession}};
handle_deliver(Delivers, Channel = #channel{takeover = true,
pendings = Pendings}) ->
NPendings = lists:append(Pendings, maybe_nack(Delivers)),
pendings = Pendings,
session = Session,
clientinfo = #{clientid := ClientId}}) ->
NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)),
{ok, Channel#channel{pendings = NPendings}};
handle_deliver(Delivers, Channel = #channel{session = Session}) ->
case emqx_session:deliver(Delivers, Session) of
handle_deliver(Delivers, Channel = #channel{session = Session,
clientinfo = #{clientid := ClientId}}) ->
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
{ok, Publishes, NSession} ->
NChannel = Channel#channel{session = NSession},
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
@ -666,6 +670,19 @@ handle_deliver(Delivers, Channel = #channel{session = Session}) ->
{ok, Channel#channel{session = NSession}}
end.
ignore_local(Delivers, Subscriber, Session) ->
Subs = emqx_session:info(subscriptions, Session),
lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of
{ok, #{nl := 1}} when Subscriber =:= Publisher ->
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
true;
_ ->
false
end
end, Delivers).
%% Nack delivers from shared subscription
maybe_nack(Delivers) ->
lists:filter(fun not_nacked/1, Delivers).
@ -782,22 +799,15 @@ do_deliver({pubrel, PacketId}, Channel) ->
do_deliver({PacketId, Msg}, Channel = #channel{clientinfo = ClientInfo =
#{mountpoint := MountPoint}}) ->
case ignore_local(Msg, ClientInfo) of
true ->
ok = emqx_metrics:inc('delivery.dropped'),
ok = emqx_metrics:inc('delivery.dropped.no_local'),
{[], Channel};
false ->
ok = emqx_metrics:inc('messages.delivered'),
Msg1 = emqx_hooks:run_fold('message.delivered',
[ClientInfo],
emqx_message:update_expiry(Msg)
),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
Packet = emqx_message:to_packet(PacketId, Msg2),
{NPacket, NChannel} = packing_alias(Packet, Channel),
{[NPacket], NChannel}
end;
ok = emqx_metrics:inc('messages.delivered'),
Msg1 = emqx_hooks:run_fold('message.delivered',
[ClientInfo],
emqx_message:update_expiry(Msg)
),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
Packet = emqx_message:to_packet(PacketId, Msg2),
{NPacket, NChannel} = packing_alias(Packet, Channel),
{[NPacket], NChannel};
do_deliver([Publish], Channel) ->
do_deliver(Publish, Channel);
@ -810,11 +820,6 @@ do_deliver(Publishes, Channel) when is_list(Publishes) ->
end, {[], Channel}, Publishes),
{lists:reverse(Packets), NChannel}.
ignore_local(#message{flags = #{nl := true}, from = ClientId},
#{clientid := ClientId}) ->
true;
ignore_local(_Msg, _ClientInfo) -> false.
%%--------------------------------------------------------------------
%% Handle out suback
%%--------------------------------------------------------------------

View File

@ -60,7 +60,7 @@ lock(ClientId, Piggyback) ->
unlock(ClientId) ->
ekka_locker:release(?MODULE, ClientId, strategy()).
-spec(strategy() -> local | one | quorum | all).
-spec(strategy() -> local | leader | quorum | all).
strategy() ->
emqx:get_env(session_locking_strategy, quorum).

View File

@ -1,23 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gen_mod).
-callback(load(Opts :: any()) -> ok | {error, term()}).
-callback(unload(State :: term()) -> term()).
-callback(description() -> any()).

View File

@ -1,122 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_acl_internal).
-behaviour(emqx_gen_mod).
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[ACL_INTERNAL]").
%% APIs
-export([ check_acl/5
, rules_from_file/1
]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, reload/1
, description/0
]).
-type(acl_rules() :: #{publish => [emqx_access_rule:rule()],
subscribe => [emqx_access_rule:rule()]}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
load(Env) ->
Rules = rules_from_file(proplists:get_value(acl_file, Env)),
emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]}, -1).
unload(_Env) ->
emqx_hooks:del('client.check_acl', {?MODULE, check_acl}).
reload(Env) ->
emqx_acl_cache:is_enabled() andalso (
lists:foreach(
fun(Pid) -> erlang:send(Pid, clean_acl_cache) end,
emqx_cm:all_channels())),
unload(Env), load(Env).
description() ->
"EMQ X Internal ACL Module".
%%--------------------------------------------------------------------
%% ACL callbacks
%%--------------------------------------------------------------------
%% @doc Check ACL
-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_topic:topic(),
emqx_access_rule:acl_result(), acl_rules())
-> {ok, allow} | {ok, deny} | ok).
check_acl(Client, PubSub, Topic, _AclResult, Rules) ->
case match(Client, Topic, lookup(PubSub, Rules)) of
{matched, allow} -> {ok, allow};
{matched, deny} -> {ok, deny};
nomatch -> ok
end.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []).
match(_Client, _Topic, []) ->
nomatch;
match(Client, Topic, [Rule|Rules]) ->
case emqx_access_rule:match(Client, Topic, Rule) of
nomatch ->
match(Client, Topic, Rules);
{matched, AllowDeny} ->
{matched, AllowDeny}
end.
-spec(rules_from_file(file:filename()) -> map()).
rules_from_file(AclFile) ->
case file:consult(AclFile) of
{ok, Terms} ->
Rules = [emqx_access_rule:compile(Term) || Term <- Terms],
#{publish => [Rule || Rule <- Rules, filter(publish, Rule)],
subscribe => [Rule || Rule <- Rules, filter(subscribe, Rule)]};
{error, eacces} ->
?LOG(alert, "Insufficient permissions to read the ~s file", [AclFile]),
#{};
{error, enoent} ->
?LOG(alert, "The ~s file does not exist", [AclFile]),
#{};
{error, Reason} ->
?LOG(alert, "Failed to read ~s: ~p", [AclFile, Reason]),
#{}
end.
filter(_PubSub, {allow, all}) ->
true;
filter(_PubSub, {deny, all}) ->
true;
filter(publish, {_AllowDeny, _Who, publish, _Topics}) ->
true;
filter(_PubSub, {_AllowDeny, _Who, pubsub, _Topics}) ->
true;
filter(subscribe, {_AllowDeny, _Who, subscribe, _Topics}) ->
true;
filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.

View File

@ -1,204 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_delayed).
-behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
-export([ start_link/0
, on_message_publish/1
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-record(delayed_message,
{ key
, msg
}).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
-define(MAX_INTERVAL, 4294967).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, ordered_set},
{disc_copies, [node()]},
{local_content, true},
{record_name, delayed_message},
{attributes, record_info(fields, delayed_message)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-spec(load(list()) -> ok).
load(_Env) ->
emqx_mod_sup:start_child(?MODULE, worker),
emqx:hook('message.publish', {?MODULE, on_message_publish, []}).
-spec(unload(list()) -> ok).
unload(_Env) ->
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
emqx_mod_sup:stop_child(?MODULE).
description() ->
"EMQ X Delayed Publish Module".
%%--------------------------------------------------------------------
%% Hooks
%%--------------------------------------------------------------------
on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>>, timestamp = Ts}) ->
[Delay, Topic1] = binary:split(Topic, <<"/">>),
PubAt = case binary_to_integer(Delay) of
Interval when Interval < ?MAX_INTERVAL ->
Interval + erlang:round(Ts / 1000);
Timestamp ->
%% Check malicious timestamp?
case (Timestamp - erlang:round(Ts / 1000)) > ?MAX_INTERVAL of
true -> error(invalid_delayed_timestamp);
false -> Timestamp
end
end,
PubMsg = Msg#message{topic = Topic1},
Headers = PubMsg#message.headers,
ok = store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}),
{stop, PubMsg#message{headers = Headers#{allow_publish => false}}};
on_message_publish(Msg) ->
{ok, Msg}.
%%--------------------------------------------------------------------
%% Start delayed publish server
%%--------------------------------------------------------------------
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(store(#delayed_message{}) -> ok).
store(DelayedMsg) ->
gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
%%--------------------------------------------------------------------
%% gen_server callback
%%--------------------------------------------------------------------
init([]) ->
{ok, ensure_publish_timer(#{timer => undefined, publish_at => 0})}.
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
ok = mnesia:dirty_write(?TAB, DelayedMsg),
emqx_metrics:set('messages.delayed', delayed_count()),
{reply, ok, ensure_publish_timer(Key, State)};
handle_call(Req, _From, State) ->
?LOG(error, "[Delayed] Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "[Delayed] Unexpected cast: ~p", [Msg]),
{noreply, State}.
%% Do Publish...
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
lists:foreach(fun(Key) -> mnesia:dirty_delete(?TAB, Key) end, DeletedKeys),
emqx_metrics:set('messages.delayed', delayed_count()),
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
handle_info(Info, State) ->
?LOG(error, "[Delayed] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{timer := TRef}) ->
emqx_misc:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% Ensure publish timer
ensure_publish_timer(State) ->
ensure_publish_timer(mnesia:dirty_first(?TAB), State).
ensure_publish_timer('$end_of_table', State) ->
State#{timer := undefined, publish_at := 0};
ensure_publish_timer({Ts, _Id}, State = #{timer := undefined}) ->
ensure_publish_timer(Ts, os:system_time(seconds), State);
ensure_publish_timer({Ts, _Id}, State = #{timer := TRef, publish_at := PubAt})
when Ts < PubAt ->
ok = emqx_misc:cancel_timer(TRef),
ensure_publish_timer(Ts, os:system_time(seconds), State);
ensure_publish_timer(_Key, State) ->
State.
ensure_publish_timer(Ts, Now, State) ->
Interval = max(1, Ts - Now),
TRef = emqx_misc:start_timer(timer:seconds(Interval), do_publish),
State#{timer := TRef, publish_at := Now + Interval}.
do_publish(Key, Now) ->
do_publish(Key, Now, []).
%% Do publish
do_publish('$end_of_table', _Now, Acc) ->
Acc;
do_publish({Ts, _Id}, Now, Acc) when Ts > Now ->
Acc;
do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
case mnesia:dirty_read(?TAB, Key) of
[] -> ok;
[#delayed_message{msg = Msg}] ->
emqx_pool:async_submit(fun emqx:publish/1, [Msg])
end,
do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key|Acc]).
-spec(delayed_count() -> non_neg_integer()).
delayed_count() -> mnesia:table_info(?TAB, size).

View File

@ -1,130 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_presence).
-behaviour(emqx_gen_mod).
-include("emqx.hrl").
-include("logger.hrl").
-logger_header("[Presence]").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
-export([ on_client_connected/3
, on_client_disconnected/4
]).
-ifdef(TEST).
-export([reason/1]).
-endif.
load(Env) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Env]}),
emqx_hooks:add('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
unload(_Env) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
description() ->
"EMQ X Presence Module".
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
Presence = connected_presence(ClientInfo, ConnInfo),
case emqx_json:safe_encode(Presence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(connected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
end.
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
Presence = #{clientid => ClientId,
username => Username,
reason => reason(Reason),
disconnected_at => DisconnectedAt,
ts => erlang:system_time(millisecond)
},
case emqx_json:safe_encode(Presence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
end.
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
connected_presence(#{peerhost := PeerHost,
sockport := SockPort,
clientid := ClientId,
username := Username
},
#{clean_start := CleanStart,
proto_name := ProtoName,
proto_ver := ProtoVer,
keepalive := Keepalive,
connected_at := ConnectedAt,
expiry_interval := ExpiryInterval
}) ->
#{clientid => ClientId,
username => Username,
ipaddress => ntoa(PeerHost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => ProtoVer,
keepalive => Keepalive,
connack => 0, %% Deprecated?
clean_start => CleanStart,
expiry_interval => ExpiryInterval,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.
make_msg(QoS, Topic, Payload) ->
emqx_message:set_flag(
sys, emqx_message:make(
?MODULE, QoS, Topic, iolist_to_binary(Payload))).
topic(connected, ClientId) ->
emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"]));
topic(disconnected, ClientId) ->
emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])).
qos(Env) -> proplists:get_value(qos, Env, 0).
-compile({inline, [reason/1]}).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.
-compile({inline, [ntoa/1]}).
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).

View File

@ -1,103 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_rewrite).
-behaviour(emqx_gen_mod).
-include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl").
-ifdef(TEST).
-export([ compile/1
, match_and_rewrite/2
]).
-endif.
%% APIs
-export([ rewrite_subscribe/4
, rewrite_unsubscribe/4
, rewrite_publish/2
]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
load(RawRules) ->
{PubRules, SubRules} = compile(RawRules),
emqx_hooks:add('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:add('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:add('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
{ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
unload(_) ->
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
description() ->
"EMQ X Topic Rewrite Module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
compile(Rules) ->
PubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
SubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
{PubRules, SubRules}.
match_and_rewrite(Topic, []) ->
Topic;
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) ->
case emqx_topic:match(Topic, Filter) of
true -> rewrite(Topic, MP, Dest);
false -> match_and_rewrite(Topic, Rules)
end.
rewrite(Topic, MP, Dest) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} ->
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|| I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global])
end, Dest, Vars));
nomatch -> Topic
end.

View File

@ -1,65 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_subscription).
-behaviour(emqx_gen_mod).
-include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%% APIs
-export([on_client_connected/3]).
%%--------------------------------------------------------------------
%% Load/Unload Hook
%%--------------------------------------------------------------------
load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) ->
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,
TopicFilters = case ProtoVer of
?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics];
_ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics]
end,
self() ! {subscribe, TopicFilters}.
unload(_) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
description() ->
"EMQ X Subscription Module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
rep(<<"%c">>, ClientId, Topic) ->
emqx_topic:feed_var(<<"%c">>, ClientId, Topic);
rep(<<"%u">>, undefined, Topic) ->
Topic;
rep(<<"%u">>, Username, Topic) ->
emqx_topic:feed_var(<<"%u">>, Username, Topic).

View File

@ -1,63 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_sup).
-behaviour(supervisor).
-include("types.hrl").
-export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), #{id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5000,
type => Type,
modules => [Mod]}).
-spec(start_link() -> startlink_ret()).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(ChildSpec) when is_map(ChildSpec) ->
supervisor:start_child(?MODULE, ChildSpec).
start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) ->
supervisor:start_child(?MODULE, ?CHILD(Mod, Type)).
-spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId);
Error -> Error
end.
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
{ok, {{one_for_one, 10, 100}, []}}.

View File

@ -1,382 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_topic_metrics).
-behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include("emqx.hrl").
-include("logger.hrl").
-include("emqx_mqtt.hrl").
-logger_header("[TOPIC_METRICS]").
-export([ load/1
, unload/1
, description/0
]).
-export([ on_message_publish/1
, on_message_delivered/2
, on_message_dropped/3
]).
%% API functions
-export([ start_link/0
, stop/0
]).
-export([ inc/2
, inc/3
, val/2
, rate/2
, metrics/1
, register/1
, unregister/1
, unregister_all/0
, is_registered/1
, all_registered_topics/0
]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_info/2
, handle_cast/2
, terminate/2
]).
-define(CRefID(Topic), {?MODULE, Topic}).
-define(MAX_TOPICS, 512).
-define(TAB, ?MODULE).
-define(TOPIC_METRICS,
['messages.in',
'messages.out',
'messages.qos0.in',
'messages.qos0.out',
'messages.qos1.in',
'messages.qos1.out',
'messages.qos2.in',
'messages.qos2.out',
'messages.dropped'
]).
-define(TICKING_INTERVAL, 1).
-record(speed, {
last = 0 :: number(),
tick = 1 :: number(),
last_v = 0 :: number(),
acc = 0 :: number(),
samples = [] :: list()
}).
-record(state, {
speeds :: #{{binary(), atom()} => #speed{}}
}).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
load(_Env) ->
emqx_mod_sup:start_child(?MODULE, worker),
emqx:hook('message.publish', {?MODULE, on_message_publish, []}),
emqx:hook('message.dropped', {?MODULE, on_message_dropped, []}),
emqx:hook('message.delivered', {?MODULE, on_message_delivered, []}).
unload(_Env) ->
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
emqx:unhook('message.dropped', {?MODULE, on_message_dropped}),
emqx:unhook('message.delivered', {?MODULE, on_message_delivered}),
emqx_mod_sup:stop_child(?MODULE).
description() ->
"EMQ X Topic Metrics Module".
on_message_publish(#message{topic = Topic, qos = QoS}) ->
case is_registered(Topic) of
true ->
inc(Topic, 'messages.in'),
case QoS of
?QOS_0 -> inc(Topic, 'messages.qos0.in');
?QOS_1 -> inc(Topic, 'messages.qos1.in');
?QOS_2 -> inc(Topic, 'messages.qos2.in')
end;
false ->
ok
end.
on_message_delivered(_, #message{topic = Topic, qos = QoS}) ->
case is_registered(Topic) of
true ->
inc(Topic, 'messages.out'),
case QoS of
?QOS_0 -> inc(Topic, 'messages.qos0.out');
?QOS_1 -> inc(Topic, 'messages.qos1.out');
?QOS_2 -> inc(Topic, 'messages.qos2.out')
end;
false ->
ok
end.
on_message_dropped(#message{topic = Topic}, _, _) ->
case is_registered(Topic) of
true ->
inc(Topic, 'messages.dropped');
false ->
ok
end.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() ->
gen_server:stop(?MODULE).
inc(Topic, Metric) ->
inc(Topic, Metric, 1).
inc(Topic, Metric, Val) ->
case get_counters(Topic) of
{error, topic_not_found} ->
{error, topic_not_found};
CRef ->
case metric_idx(Metric) of
{error, invalid_metric} ->
{error, invalid_metric};
Idx ->
counters:add(CRef, Idx, Val)
end
end.
val(Topic, Metric) ->
case ets:lookup(?TAB, Topic) of
[] ->
{error, topic_not_found};
[{Topic, CRef}] ->
case metric_idx(Metric) of
{error, invalid_metric} ->
{error, invalid_metric};
Idx ->
counters:get(CRef, Idx)
end
end.
rate(Topic, Metric) ->
gen_server:call(?MODULE, {get_rate, Topic, Metric}).
metrics(Topic) ->
case ets:lookup(?TAB, Topic) of
[] ->
{error, topic_not_found};
[{Topic, CRef}] ->
lists:foldl(fun(Metric, Acc) ->
[{to_count(Metric), counters:get(CRef, metric_idx(Metric))},
{to_rate(Metric), rate(Topic, Metric)} | Acc]
end, [], ?TOPIC_METRICS)
end.
register(Topic) when is_binary(Topic) ->
gen_server:call(?MODULE, {register, Topic}).
unregister(Topic) when is_binary(Topic) ->
gen_server:call(?MODULE, {unregister, Topic}).
unregister_all() ->
gen_server:call(?MODULE, {unregister, all}).
is_registered(Topic) ->
ets:member(?TAB, Topic).
all_registered_topics() ->
[Topic || {Topic, _CRef} <- ets:tab2list(?TAB)].
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
erlang:process_flag(trap_exit, true),
ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
{ok, #state{speeds = #{}}, hibernate}.
handle_call({register, Topic}, _From, State = #state{speeds = Speeds}) ->
case is_registered(Topic) of
true ->
{reply, {error, already_existed}, State};
false ->
case number_of_registered_topics() < ?MAX_TOPICS of
true ->
CRef = counters:new(counters_size(), [write_concurrency]),
true = ets:insert(?TAB, {Topic, CRef}),
[counters:put(CRef, Idx, 0) || Idx <- lists:seq(1, counters_size())],
NSpeeds = lists:foldl(fun(Metric, Acc) ->
maps:put({Topic, Metric}, #speed{}, Acc)
end, Speeds, ?TOPIC_METRICS),
{reply, ok, State#state{speeds = NSpeeds}};
false ->
{reply, {error, quota_exceeded}, State}
end
end;
handle_call({unregister, all}, _From, State) ->
[delete_counters(Topic) || {Topic, _CRef} <- ets:tab2list(?TAB)],
{reply, ok, State#state{speeds = #{}}};
handle_call({unregister, Topic}, _From, State = #state{speeds = Speeds}) ->
case is_registered(Topic) of
false ->
{reply, ok, State};
true ->
ok = delete_counters(Topic),
NSpeeds = lists:foldl(fun(Metric, Acc) ->
maps:remove({Topic, Metric}, Acc)
end, Speeds, ?TOPIC_METRICS),
{reply, ok, State#state{speeds = NSpeeds}}
end;
handle_call({get_rate, Topic, Metric}, _From, State = #state{speeds = Speeds}) ->
case is_registered(Topic) of
false ->
{reply, {error, topic_not_found}, State};
true ->
case maps:get({Topic, Metric}, Speeds, undefined) of
undefined ->
{reply, {error, invalid_metric}, State};
#speed{last = Last} ->
{reply, Last, State}
end
end.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(ticking, State = #state{speeds = Speeds}) ->
NSpeeds = maps:map(
fun({Topic, Metric}, Speed) ->
case val(Topic, Metric) of
{error, topic_not_found} -> maps:remove({Topic, Metric}, Speeds);
Val -> calculate_speed(Val, Speed)
end
end, Speeds),
erlang:send_after(timer:seconds(5), self(), ticking),
{noreply, State#state{speeds = NSpeeds}};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
metric_idx('messages.in') -> 01;
metric_idx('messages.out') -> 02;
metric_idx('messages.qos0.in') -> 03;
metric_idx('messages.qos0.out') -> 04;
metric_idx('messages.qos1.in') -> 05;
metric_idx('messages.qos1.out') -> 06;
metric_idx('messages.qos2.in') -> 07;
metric_idx('messages.qos2.out') -> 08;
metric_idx('messages.dropped') -> 09;
metric_idx(_) ->
{error, invalid_metric}.
to_count('messages.in') ->
'messages.in.count';
to_count('messages.out') ->
'messages.out.count';
to_count('messages.qos0.in') ->
'messages.qos0.in.count';
to_count('messages.qos0.out') ->
'messages.qos0.out.count';
to_count('messages.qos1.in') ->
'messages.qos1.in.count';
to_count('messages.qos1.out') ->
'messages.qos1.out.count';
to_count('messages.qos2.in') ->
'messages.qos2.in.count';
to_count('messages.qos2.out') ->
'messages.qos2.out.count';
to_count('messages.dropped') ->
'messages.dropped.count'.
to_rate('messages.in') ->
'messages.in.rate';
to_rate('messages.out') ->
'messages.out.rate';
to_rate('messages.qos0.in') ->
'messages.qos0.in.rate';
to_rate('messages.qos0.out') ->
'messages.qos0.out.rate';
to_rate('messages.qos1.in') ->
'messages.qos1.in.rate';
to_rate('messages.qos1.out') ->
'messages.qos1.out.rate';
to_rate('messages.qos2.in') ->
'messages.qos2.in.rate';
to_rate('messages.qos2.out') ->
'messages.qos2.out.rate';
to_rate('messages.dropped') ->
'messages.dropped.rate'.
delete_counters(Topic) ->
true = ets:delete(?TAB, Topic),
ok.
get_counters(Topic) ->
case ets:lookup(?TAB, Topic) of
[] -> {error, topic_not_found};
[{Topic, CRef}] -> CRef
end.
counters_size() ->
length(?TOPIC_METRICS).
number_of_registered_topics() ->
proplists:get_value(size, ets:info(?TAB)).
calculate_speed(CurVal, #speed{last_v = LastVal, tick = Tick, acc = Acc, samples = Samples}) ->
%% calculate the current speed based on the last value of the counter
CurSpeed = (CurVal - LastVal) / ?TICKING_INTERVAL,
%% calculate the average speed in last 5 seconds
case Tick =< 5 of
true ->
Acc1 = Acc + CurSpeed,
#speed{last = Acc1 / Tick,
last_v = CurVal,
acc = Acc1,
samples = Samples ++ [CurSpeed],
tick = Tick + 1};
false ->
[FirstSpeed | Speeds] = Samples,
Acc1 = Acc + CurSpeed - FirstSpeed,
#speed{last = Acc1 / Tick,
last_v = CurVal,
acc = Acc1,
samples = Speeds ++ [CurSpeed],
tick = Tick}
end.

View File

@ -1,169 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules).
-include("logger.hrl").
-logger_header("[Modules]").
-export([ list/0
, load/0
, load/1
, unload/0
, unload/1
, reload/1
, find_module/1
, load_module/2
]).
%% @doc List all available plugins
-spec(list() -> [{atom(), boolean()}]).
list() ->
ets:tab2list(?MODULE).
%% @doc Load all the extended modules.
-spec(load() -> ok).
load() ->
case emqx:get_env(modules_loaded_file) of
undefined -> ok;
File ->
load_modules(File)
end.
load(ModuleName) ->
case find_module(ModuleName) of
[] ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
{error, not_found};
[{ModuleName, true}] ->
?LOG(notice, "Module ~s is already started", [ModuleName]),
{error, already_started};
[{ModuleName, false}] ->
emqx_modules:load_module(ModuleName, true)
end.
%% @doc Unload all the extended modules.
-spec(unload() -> ok).
unload() ->
case emqx:get_env(modules_loaded_file) of
undefined -> ignore;
File ->
unload_modules(File)
end.
unload(ModuleName) ->
case find_module(ModuleName) of
[] ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
{error, not_found};
[{ModuleName, false}] ->
?LOG(error, "Module ~s is not started", [ModuleName]),
{error, not_started};
[{ModuleName, true}] ->
unload_module(ModuleName, true)
end.
reload(emqx_mod_acl_internal) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined),
case emqx_mod_acl_internal:reload(Env) of
ok ->
?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]);
{error, Error} ->
?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error])
end;
reload(_) ->
ignore.
find_module(ModuleName) ->
ets:lookup(?MODULE, ModuleName).
filter_module(ModuleNames) ->
filter_module(ModuleNames, emqx:get_env(modules, [])).
filter_module([], Acc) ->
Acc;
filter_module([{ModuleName, true} | ModuleNames], Acc) ->
filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc));
filter_module([{_, false} | ModuleNames], Acc) ->
filter_module(ModuleNames, Acc).
load_modules(File) ->
case file:consult(File) of
{ok, ModuleNames} ->
lists:foreach(fun({ModuleName, _}) ->
ets:insert(?MODULE, {ModuleName, false})
end, filter_module(ModuleNames)),
lists:foreach(fun load_module/1, ModuleNames);
{error, Error} ->
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
end.
load_module({ModuleName, true}) ->
emqx_modules:load_module(ModuleName, false);
load_module({ModuleName, false}) ->
ets:insert(?MODULE, {ModuleName, false});
load_module(ModuleName) ->
load_module({ModuleName, true}).
load_module(ModuleName, Persistent) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(ModuleName, Modules, undefined),
case ModuleName:load(Env) of
ok ->
ets:insert(?MODULE, {ModuleName, true}),
write_loaded(Persistent),
?LOG(info, "Load ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]),
{error, Error}
end.
unload_modules(File) ->
case file:consult(File) of
{ok, ModuleNames} ->
lists:foreach(fun unload_module/1, ModuleNames);
{error, Error} ->
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
end.
unload_module({ModuleName, true}) ->
unload_module(ModuleName, false);
unload_module({ModuleName, false}) ->
ets:insert(?MODULE, {ModuleName, false});
unload_module(ModuleName) ->
unload_module({ModuleName, true}).
unload_module(ModuleName, Persistent) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(ModuleName, Modules, undefined),
case ModuleName:unload(Env) of
ok ->
ets:insert(?MODULE, {ModuleName, false}),
write_loaded(Persistent),
?LOG(info, "Unload ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error])
end.
write_loaded(true) ->
FilePath = emqx:get_env(modules_loaded_file),
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of
ok -> ok;
{error, Error} ->
?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
{error, Error}
end;
write_loaded(false) -> ok.

View File

@ -429,7 +429,7 @@ deliver(Delivers, Session) ->
deliver([], Publishes, Session) ->
{ok, lists:reverse(Publishes), Session};
deliver([Msg|More], Acc, Session) ->
deliver([Msg | More], Acc, Session) ->
case deliver_msg(Msg, Session) of
{ok, Session1} ->
deliver(More, Acc, Session1);

View File

@ -67,12 +67,11 @@ init([]) ->
BrokerSup = child_spec(emqx_broker_sup, supervisor),
CMSup = child_spec(emqx_cm_sup, supervisor),
SysSup = child_spec(emqx_sys_sup, supervisor),
ModSup = child_spec(emqx_mod_sup, supervisor),
Childs = [KernelSup] ++
[RouterSup || emqx_boot:is_enabled(router)] ++
[BrokerSup || emqx_boot:is_enabled(broker)] ++
[CMSup || emqx_boot:is_enabled(broker)] ++
[SysSup] ++ [ModSup],
[SysSup],
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1

View File

@ -121,6 +121,7 @@
conn_props := properties(),
connected := boolean(),
connected_at := non_neg_integer(),
disconnected_at => non_neg_integer(),
keepalive := 0..16#FFFF,
receive_maximum := non_neg_integer(),
expiry_interval := non_neg_integer(),

View File

@ -0,0 +1 @@
{deny, {client, "batch_test"}, subscribe, ["t1", "t2", "t3"]}.

View File

@ -1,97 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_access_rule_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules([router, broker]),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_compile(_) ->
Rule1 = {allow, all, pubsub, <<"%u">>},
Compile1 = {allow, all, pubsub, [{pattern,[<<"%u">>]}]},
Rule2 = {allow, {ipaddr, "127.0.0.1"}, pubsub, <<"%c">>},
Compile2 = {allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, pubsub, [{pattern,[<<"%c">>]}]},
Rule3 = {allow, {'and', [{client, <<"testClient">>}, {user, <<"testUser">>}]}, pubsub, [<<"testTopics1">>, <<"testTopics2">>]},
Compile3 = {allow, {'and', [{client, <<"testClient">>}, {user, <<"testUser">>}]}, pubsub, [[<<"testTopics1">>], [<<"testTopics2">>]]},
Rule4 = {allow, {'or', [{client, all}, {user, all}]}, pubsub, [ <<"testTopics1">>, <<"testTopics2">>]},
Compile4 = {allow, {'or', [{client, all}, {user, all}]}, pubsub, [[<<"testTopics1">>], [<<"testTopics2">>]]},
?assertEqual(Compile1, emqx_access_rule:compile(Rule1)),
?assertEqual(Compile2, emqx_access_rule:compile(Rule2)),
?assertEqual(Compile3, emqx_access_rule:compile(Rule3)),
?assertEqual(Compile4, emqx_access_rule:compile(Rule4)).
t_match(_) ->
ClientInfo1 = #{zone => external,
clientid => <<"testClient">>,
username => <<"TestUser">>,
peerhost => {127,0,0,1}
},
ClientInfo2 = #{zone => external,
clientid => <<"testClient">>,
username => <<"TestUser">>,
peerhost => {192,168,0,10}
},
ClientInfo3 = #{zone => external,
clientid => <<"testClient">>,
username => <<"TestUser">>,
peerhost => undefined
},
?assertEqual({matched, deny}, emqx_access_rule:match([], [], {deny, all})),
?assertEqual({matched, allow}, emqx_access_rule:match([], [], {allow, all})),
?assertEqual(nomatch, emqx_access_rule:match(ClientInfo1, <<"Test/Topic">>,
emqx_access_rule:compile({allow, {user, all}, pubsub, []}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Test/Topic">>,
emqx_access_rule:compile({allow, {client, all}, pubsub, ["$SYS/#", "#"]}))),
?assertEqual(nomatch, emqx_access_rule:match(ClientInfo3, <<"Test/Topic">>,
emqx_access_rule:compile({allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Test/Topic">>,
emqx_access_rule:compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo2, <<"Test/Topic">>,
emqx_access_rule:compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"d/e/f/x">>,
emqx_access_rule:compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]}))),
?assertEqual(nomatch, emqx_access_rule:match(ClientInfo1, <<"d/e/f/x">>,
emqx_access_rule:compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"testTopics/testClient">>,
emqx_access_rule:compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"clients/testClient">>,
emqx_access_rule:compile({allow, all, pubsub, ["clients/%c"]}))),
?assertEqual({matched, allow}, emqx_access_rule:match(#{username => <<"user2">>}, <<"users/user2/abc/def">>,
emqx_access_rule:compile({allow, all, subscribe, ["users/%u/#"]}))),
?assertEqual({matched, deny}, emqx_access_rule:match(ClientInfo1, <<"d/e/f">>,
emqx_access_rule:compile({deny, all, subscribe, ["$SYS/#", "#"]}))),
?assertEqual(nomatch, emqx_access_rule:match(ClientInfo1, <<"Topic">>,
emqx_access_rule:compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Topic">>,
emqx_access_rule:compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}))),
?assertEqual({matched, allow}, emqx_access_rule:match(ClientInfo1, <<"Topic">>,
emqx_access_rule:compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}))).

View File

@ -56,7 +56,7 @@ t_clean_acl_cache(_) ->
emqtt:stop(Client).
% optimize??
t_reload_aclfile_and_cleanall(Config) ->
t_reload_aclfile_and_cleanall(_Config) ->
RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end,
disconnected => fun(_) -> ok end,
@ -78,27 +78,6 @@ t_reload_aclfile_and_cleanall(Config) ->
%% Check acl cache list
[ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>),
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
%% Update acl file and reload mod_acl_internal
Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]),
ok = file:write_file(Path, <<"{deny, all}.">>),
OldPath = emqx:get_env(acl_file),
% application:set_env(emqx, acl_file, Path),
emqx_mod_acl_internal:reload([{acl_file, Path}]),
?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0),
{ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
receive
{puback, #{packet_id := PktId2, reason_code := Rc2}} ->
%% Not authorized
?assertEqual(16#87, Rc2);
_ ->
?assert(false)
end,
application:set_env(emqx, acl_file, OldPath),
file:delete(Path),
emqx_mod_acl_internal:reload([{acl_file, OldPath}]),
emqtt:stop(Client).
%% @private

View File

@ -1,33 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_acl_test_mod).
%% ACL callbacks
-export([ init/1
, check_acl/2
, description/0
]).
init(AclOpts) ->
{ok, AclOpts}.
check_acl({_User, _PubSub, _Topic}, _State) ->
allow.
description() ->
"Test ACL Mod".

View File

@ -417,6 +417,14 @@ t_handle_deliver(_) ->
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_deliver(Delivers, channel()),
?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]).
t_handle_deliver_nl(_) ->
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
Session = session(#{subscriptions => #{<<"t1">> => #{nl => 1}}}),
Channel = channel(#{clientinfo => ClientInfo, session => Session}),
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
NMsg = emqx_message:set_flag(nl, Msg),
{ok, Channel} = emqx_channel:handle_deliver([{deliver, <<"t1">>, NMsg}], Channel).
%%--------------------------------------------------------------------
%% Test cases for handle_out
%%--------------------------------------------------------------------
@ -434,13 +442,6 @@ t_handle_out_publish_1(_) ->
{ok, {outgoing, [?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>)]}, _Chan}
= emqx_channel:handle_out(publish, [{1, Msg}], channel()).
t_handle_out_publish_nl(_) ->
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
Channel = channel(#{clientinfo => ClientInfo}),
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
Pubs = [{1, emqx_message:set_flag(nl, Msg)}],
{ok, {outgoing,[]}, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
t_handle_out_connack_sucess(_) ->
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()),

View File

@ -1,64 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_acl_internal_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_load_unload(_) ->
?assertEqual(ok, emqx_mod_acl_internal:unload([])),
?assertEqual(ok, emqx_mod_acl_internal:load([])),
?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])).
t_check_acl(_) ->
Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]},
?assertEqual({ok, allow}, emqx_mod_acl_internal:check_acl(clientinfo(), publish, <<"t">>, [], Rules)),
?assertEqual({ok, deny}, emqx_mod_acl_internal:check_acl(clientinfo(), subscribe, <<"t">>, [], Rules)),
?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)).
t_reload_acl(_) ->
?assertEqual(ok, emqx_mod_acl_internal:reload([])).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
clientinfo() -> clientinfo(#{}).
clientinfo(InitProps) ->
maps:merge(#{zone => zone,
protocol => mqtt,
peerhost => {127,0,0,1},
clientid => <<"clientid">>,
username => <<"username">>,
password => <<"passwd">>,
is_superuser => false,
peercert => undefined,
mountpoint => undefined
}, InitProps).

View File

@ -1,78 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_delayed_SUITE).
-import(emqx_mod_delayed, [on_message_publish/1]).
-compile(export_all).
-compile(nowarn_export_all).
-record(delayed_message, {key, msg}).
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([]).
set_special_configs(emqx) ->
application:set_env(emqx, modules, [{emqx_mod_delayed, []}]),
application:set_env(emqx, allow_anonymous, false),
application:set_env(emqx, enable_acl_cache, false);
set_special_configs(_App) ->
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_load_case(_) ->
UnHooks = emqx_hooks:lookup('message.publish'),
?assertEqual([], UnHooks),
ok = emqx_mod_delayed:load([]),
Hooks = emqx_hooks:lookup('message.publish'),
?assertEqual(1, length(Hooks)),
ok.
t_delayed_message(_) ->
ok = emqx_mod_delayed:load([]),
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)),
Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>),
?assertEqual({ok, Msg}, on_message_publish(Msg)),
[Key] = mnesia:dirty_all_keys(emqx_mod_delayed),
[#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_mod_delayed, Key}),
?assertEqual(<<"delayed_m">>, Payload),
timer:sleep(5000),
EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed),
?assertEqual([], EmptyKey),
ok = emqx_mod_delayed:unload([]).

View File

@ -1,88 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_presence_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
%% Test case for emqx_mod_presence
t_mod_presence(_) ->
ok = emqx_mod_presence:load([{qos, ?QOS_1}]),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
%% Connected Presence
{ok, C2} = emqtt:start_link([{clientid, <<"clientid">>},
{username, <<"username">>}]),
{ok, _} = emqtt:connect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"connected">>),
%% Disconnected Presence
ok = emqtt:disconnect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
ok = emqtt:disconnect(C1),
ok = emqx_mod_presence:unload([{qos, ?QOS_1}]).
t_mod_presence_reason(_) ->
?assertEqual(normal, emqx_mod_presence:reason(normal)),
?assertEqual(discarded, emqx_mod_presence:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_mod_presence:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_mod_presence:reason(<<"unknown error">>)).
recv_and_check_presence(ClientId, Presence) ->
{ok, #{qos := ?QOS_1, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence],
binary:split(Topic, <<"/">>, [global])),
case Presence of
<<"connected">> ->
?assertMatch(#{<<"clientid">> := <<"clientid">>,
<<"username">> := <<"username">>,
<<"ipaddress">> := <<"127.0.0.1">>,
<<"proto_name">> := <<"MQTT">>,
<<"proto_ver">> := ?MQTT_PROTO_V4,
<<"connack">> := ?RC_SUCCESS,
<<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps]));
<<"disconnected">> ->
?assertMatch(#{<<"clientid">> := <<"clientid">>,
<<"username">> := <<"username">>,
<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -1,93 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_rewrite_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
{rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
]).
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
%% Test case for emqx_mod_write
t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:load(?RULES),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>],
SubDestTopics = [<<"y/z/b">>, <<"y/def">>],
%% Sub Rules
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
timer:sleep(100),
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
RecvTopics1 = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- SubDestTopics],
?assertEqual(SubDestTopics, RecvTopics1),
{ok, _, _} = emqtt:unsubscribe(C, SubOrigTopics),
timer:sleep(100),
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
%% Pub Rules
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
RecvTopics2 = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- PubOrigTopics],
?assertEqual(PubDestTopics, RecvTopics2),
{ok, _, _} = emqtt:unsubscribe(C, PubDestTopics),
ok = emqtt:disconnect(C),
ok = emqx_mod_rewrite:unload(?RULES).
t_rewrite_rule(_Config) ->
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -1,92 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_subscription_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_on_client_connected(_) ->
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])),
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, "myclient"},
{username, "admin"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
?assertEqual(<<"connected/myclient/admin">>, Topic),
?assertEqual(<<"Hello world">>, Payload),
ok = emqtt:disconnect(C),
?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])).
t_on_undefined_client_connected(_) ->
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, #{qos => ?QOS_1}}])),
{ok, C} = emqtt:start_link([{host, "localhost"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_1),
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
?assertEqual(<<"connected/undefined">>, Topic),
?assertEqual(<<"Hello world">>, Payload),
ok = emqtt:disconnect(C),
?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, #{qos => ?QOS_1}}])).
t_suboption(_) ->
Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end,
Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2},
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])),
{ok, C1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
timer:sleep(200),
[CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)),
[ Sub1 | _ ] = ets:lookup(emqx_subscription,CPid1),
[ Suboption1 | _ ] = ets:lookup(emqx_suboption,Sub1),
?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1),
ok = emqtt:disconnect(C1),
%% The subscription option is not valid for MQTT V3.1.1
{ok, C2} = emqtt:start_link([{proto_ver, v4}]),
{ok, _} = emqtt:connect(C2),
timer:sleep(200),
[CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)),
[ Sub2 | _ ] = ets:lookup(emqx_subscription,CPid2),
[ Suboption2 | _ ] = ets:lookup(emqx_suboption,Sub2),
ok = emqtt:disconnect(C2),
?assertMatch({Sub2, #{qos := 2, nl := 0, rap := 0, rh := 0, subid := _}}, Suboption2),
?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, Suboption}])).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -1,51 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_sup_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_start(_) ->
{ok, _} = emqx_mod_sup:start_link(),
?assertEqual([], supervisor:which_children(emqx_mod_sup)).
t_start_child(_) ->
%% Set the emqx_mod_sup child with emqx_hooks for test
Mod = emqx_hooks,
Spec = #{id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]},
{ok, _} = emqx_mod_sup:start_link(),
{ok, _} = emqx_mod_sup:start_child(Mod, worker),
{error, {already_started, _}} = emqx_mod_sup:start_child(Spec),
ok = emqx_mod_sup:stop_child(Mod),
{error, not_found} = emqx_mod_sup:stop_child(Mod),
ok.

View File

@ -1,92 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_topic_metrics_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_nonexistent_topic_metrics(_) ->
emqx_mod_topic_metrics:load([]),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')),
emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]).
t_topic_metrics(_) ->
emqx_mod_topic_metrics:load([]),
?assertEqual(false, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)),
?assertEqual([], emqx_mod_topic_metrics:all_registered_topics()),
emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(true, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)),
?assertEqual([<<"a/b/c">>], emqx_mod_topic_metrics:all_registered_topics()),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]).
t_hook(_) ->
emqx_mod_topic_metrics:load([]),
emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, "myclient"},
{username, "myuser"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqtt:subscribe(C, <<"a/b/c">>),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100),
?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]).

View File

@ -1,47 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
Config.
set_sepecial_cfg(_) ->
application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
ok.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_load(_) ->
?assertEqual(ok, emqx_modules:unload()),
?assertEqual(ok, emqx_modules:load()),
?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)),
?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)),
?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)),
?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)).
t_list(_) ->
?assertMatch([{_, _} | _ ], emqx_modules:list()).

View File

@ -179,10 +179,7 @@ t_batch_subscribe(_) ->
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]),
{ok, _} = emqtt:connect(Client),
application:set_env(emqx, enable_acl_cache, false),
TempAcl = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_temp.conf"),
file:write_file(TempAcl, "{deny, {client, \"batch_test\"}, subscribe, [\"t1\", \"t2\", \"t3\"]}.\n"),
timer:sleep(10),
emqx_mod_acl_internal:reload([{acl_file, TempAcl}]),
application:set_env(emqx, acl_nomatch, deny),
{ok, _, [?RC_NOT_AUTHORIZED,
?RC_NOT_AUTHORIZED,
?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1},
@ -193,8 +190,9 @@ t_batch_subscribe(_) ->
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
<<"t2">>,
<<"t3">>]),
file:delete(TempAcl),
application:set_env(emqx, acl_nomatch, allow),
emqtt:disconnect(Client).
t_connect_will_retain(_) ->
Topic = nth(1, ?TOPICS),