diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index ee3b27ddf..14e4468e3 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -60,9 +60,9 @@ passwd_hash(sha, Password) -> hexstring(crypto:hash(sha, Password)); passwd_hash(sha256, Password) -> hexstring(crypto:hash(sha256, Password)); -passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) -> +passwd_hash(pbkdf2, {Salt, Password, Macfun, Iterations, Dklen}) -> case pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen) of - {ok,Hexstring} -> pbkdf2:to_hex(Hexstring); + {ok, Hexstring} -> pbkdf2:to_hex(Hexstring); {error, Error} -> lager:error("PasswdHash with pbkdf2 error:~p", [Error]), error end; passwd_hash(bcrypt, {Salt, Password}) -> diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf index a15aeada9..46306484a 100644 --- a/test/emqttd_SUITE_data/emqttd.conf +++ b/test/emqttd_SUITE_data/emqttd.conf @@ -1,8 +1,18 @@ ##=================================================================== -## EMQ Configuration R2.1 +## EMQ Configuration R2.2 ##=================================================================== +##-------------------------------------------------------------------- +## Cluster +##-------------------------------------------------------------------- + +## The cluster Id +cluster.id = emq + +## The multicast address and port. +cluster.multicast = 239.192.0.1:44369 + ##-------------------------------------------------------------------- ## Node Args ##-------------------------------------------------------------------- @@ -11,7 +21,7 @@ node.name = emqttd@127.0.0.1 ## Cookie for distributed node -node.cookie = emq_dist_cookie +node.cookie = emqsecretcookie ## SMP support: enable, auto, disable node.smp = auto @@ -50,30 +60,30 @@ node.crash_dump = log/crash.dump node.dist_net_ticktime = 60 ## Distributed node port range -## node.dist_listen_min = 6369 -## node.dist_listen_max = 6369 +node.dist_listen_min = 6369 +node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## Log ##-------------------------------------------------------------------- ## Set the log dir -log.dir = {{ platform_log_dir }} +log.dir = log ## Console log. Enum: off, file, console, both log.console = console +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency +log.console.level = error + ## Syslog. Enum: on, off log.syslog = on ## syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency log.syslog.level = error -## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.console.level = error - ## Console log file -## log.console.file = {{ platform_log_dir }}/console.log +## log.console.file = log/console.log ## Error log file log.error.file = log/error.log @@ -90,6 +100,9 @@ log.crash.file = log/crash.log ## Allow Anonymous authentication mqtt.allow_anonymous = true +## ACL nomatch +mqtt.acl_nomatch = allow + ## Default ACL File mqtt.acl_file = etc/acl.conf @@ -106,6 +119,13 @@ mqtt.max_clientid_len = 1024 ## Max Packet Size Allowed, 64K by default. mqtt.max_packet_size = 64KB +##-------------------------------------------------------------------- +## MQTT Connection +##-------------------------------------------------------------------- + +## Force GC: integer. Value 0 disabled the Force GC. +mqtt.conn.force_gc_count = 100 + ##-------------------------------------------------------------------- ## MQTT Client ##-------------------------------------------------------------------- @@ -113,30 +133,36 @@ mqtt.max_packet_size = 64KB ## Client Idle Timeout (Second) mqtt.client.idle_timeout = 30s -## Enable client Stats: seconds or off +## Max publish rate of Messages +## mqtt.client.max_publish_rate = 5 + +## Enable client Stats: on | off mqtt.client.enable_stats = off ##-------------------------------------------------------------------- ## MQTT Session ##-------------------------------------------------------------------- +## Max Number of Subscriptions, 0 means no limit. +mqtt.session.max_subscriptions = 0 + ## Upgrade QoS? mqtt.session.upgrade_qos = off -## Max number of QoS 1 and 2 messages that can be “inflight” at one time. +## Max Size of the Inflight Window for QoS1 and QoS2 messages ## 0 means no limit mqtt.session.max_inflight = 32 ## Retry Interval for redelivering QoS1/2 messages. mqtt.session.retry_interval = 20s -## Max Packets that Awaiting PUBREL, 0 means no limit +## Client -> Broker: Max Packets Awaiting PUBREL, 0 means no limit mqtt.session.max_awaiting_rel = 100 ## Awaiting PUBREL Timeout mqtt.session.await_rel_timeout = 20s -## Enable Statistics at the Interval(seconds) +## Enable Statistics: on | off mqtt.session.enable_stats = off ## Expired after 1 day: @@ -147,28 +173,31 @@ mqtt.session.enable_stats = off ## s - second mqtt.session.expiry_interval = 2h +## Ignore message from self publish +mqtt.session.ignore_loop_deliver = false + ##-------------------------------------------------------------------- -## MQTT Queue +## MQTT Message Queue ##-------------------------------------------------------------------- ## Type: simple | priority -mqtt.queue.type = simple +mqtt.mqueue.type = simple ## Topic Priority: 0~255, Default is 0 -## mqtt.queue.priority = topic/1=10,topic/2=8 +## mqtt.mqueue.priority = topic/1=10,topic/2=8 ## Max queue length. Enqueued messages when persistent client disconnected, -## or inflight window is full. -mqtt.queue.max_length = infinity +## or inflight window is full. 0 means no limit. +mqtt.mqueue.max_length = 0 ## Low-water mark of queued messages -mqtt.queue.low_watermark = 20% +mqtt.mqueue.low_watermark = 20% ## High-water mark of queued messages -mqtt.queue.high_watermark = 60% +mqtt.mqueue.high_watermark = 60% ## Queue Qos0 messages? -mqtt.queue.qos0 = true +mqtt.mqueue.store_qos0 = true ##-------------------------------------------------------------------- ## MQTT Broker and PubSub @@ -200,7 +229,7 @@ mqtt.bridge.ping_down_interval = 1 ##------------------------------------------------------------------- ## Dir of plugins' config -mqtt.plugins.etc_dir = etc/plugins/ +mqtt.plugins.etc_dir =etc/plugins/ ## File to store loaded plugin names. mqtt.plugins.loaded_file = data/loaded_plugins @@ -209,63 +238,210 @@ mqtt.plugins.loaded_file = data/loaded_plugins ## MQTT Listeners ##-------------------------------------------------------------------- -## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -mqtt.listener.tcp = 1883 +##-------------------------------------------------------------------- +## External TCP Listener + +## External TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +listener.tcp.external = 0.0.0.0:1883 ## Size of acceptor pool -mqtt.listener.tcp.acceptors = 8 +listener.tcp.external.acceptors = 16 ## Maximum number of concurrent clients -mqtt.listener.tcp.max_clients = 1024 +listener.tcp.external.max_clients = 102400 + +#listener.tcp.external.mountpoint = external/ ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.tcp.rate_limit = 100,10 +#listener.tcp.external.rate_limit = 100,10 + +#listener.tcp.external.access.1 = allow 192.168.0.0/24 + +listener.tcp.external.access.2 = allow all + +## Proxy Protocol V1/2 +## listener.tcp.external.proxy_protocol = on +## listener.tcp.external.proxy_protocol_timeout = 3s ## TCP Socket Options -mqtt.listener.tcp.backlog = 1024 -## mqtt.listener.tcp.recbuf = 4096 -## mqtt.listener.tcp.sndbuf = 4096 -## mqtt.listener.tcp.buffer = 4096 -## mqtt.listener.tcp.nodelay = true +listener.tcp.external.backlog = 1024 -## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 -mqtt.listener.ssl = 8883 +#listener.tcp.external.recbuf = 4KB + +#listener.tcp.external.sndbuf = 4KB + +listener.tcp.external.buffer = 4KB + +listener.tcp.external.nodelay = true + +##-------------------------------------------------------------------- +## Internal TCP Listener + +## Internal TCP Listener: 11883, 127.0.0.1:11883, ::1:11883 +listener.tcp.internal = 127.0.0.1:11883 ## Size of acceptor pool -mqtt.listener.ssl.acceptors = 4 +listener.tcp.internal.acceptors = 16 ## Maximum number of concurrent clients -mqtt.listener.ssl.max_clients = 512 +listener.tcp.internal.max_clients = 102400 + +#listener.tcp.external.mountpoint = internal/ ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.ssl.rate_limit = 100,10 +## listener.tcp.internal.rate_limit = 1000,100 + +## TCP Socket Options +listener.tcp.internal.backlog = 512 + +listener.tcp.internal.tune_buffer = on + +listener.tcp.internal.buffer = 1MB + +listener.tcp.internal.recbuf = 4KB + +listener.tcp.internal.sndbuf = 1MB + +listener.tcp.internal.nodelay = true + +##-------------------------------------------------------------------- +## External SSL Listener + +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +listener.ssl.external = 8883 + +## Size of acceptor pool +listener.ssl.external.acceptors = 16 + +## Maximum number of concurrent clients +listener.ssl.external.max_clients = 1024 + +## listener.ssl.external.mountpoint = inbound/ + +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## listener.ssl.external.rate_limit = 100,10 + +## Proxy Protocol V1/2 +## listener.ssl.external.proxy_protocol = on +## listener.ssl.external.proxy_protocol_timeout = 3s + +listener.ssl.external.access.1 = allow all + +### SSL Options. See http://erlang.org/doc/man/ssl.html ## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html ### TLS only for POODLE attack -mqtt.listener.ssl.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -mqtt.listener.ssl.handshake_timeout = 15s -mqtt.listener.ssl.keyfile = certs/key.pem -mqtt.listener.ssl.certfile = certs/cert.pem -## mqtt.listener.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## mqtt.listener.ssl.verify = verify_peer -## mqtt.listener.ssl.fail_if_no_peer_cert = true +## listener.ssl.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -## HTTP and WebSocket Listener -mqtt.listener.http = 8083 -mqtt.listener.http.acceptors = 4 -mqtt.listener.http.max_clients = 64 +### The Ephemeral Diffie-Helman key exchange is a very effective way of +### ensuring Forward Secrecy by exchanging a set of keys that never hit +### the wire. Since the DH key is effectively signed by the private key, +### it needs to be at least as strong as the private key. In addition, +### the default DH groups that most of the OpenSSL installations have +### are only a handful (since they are distributed with the OpenSSL +### package that has been built for the operating system it’s running on) +### and hence predictable (not to mention, 1024 bits only). -## HTTP(SSL) Listener -mqtt.listener.https = 8084 -mqtt.listener.https.acceptors = 4 -mqtt.listener.https.max_clients = 64 -mqtt.listener.https.handshake_timeout = 15 -mqtt.listener.https.keyfile = certs/key.pem -mqtt.listener.https.certfile = certs/cert.pem -## mqtt.listener.https.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem +### In order to escape this situation, first we need to generate a fresh, +### strong DH group, store it in a file and then use the option above, +### to force our SSL application to use the new DH group. Fortunately, +### OpenSSL provides us with a tool to do that. Simply run: +### openssl dhparam -out dh-params.pem 2048 -## mqtt.listener.https.verify = verify_peer -## mqtt.listener.https.fail_if_no_peer_cert = true +listener.ssl.external.handshake_timeout = 15s + +listener.ssl.external.keyfile = certs/key.pem + +listener.ssl.external.certfile = certs/cert.pem + +## listener.ssl.external.cacertfile = certs/cacert.pem + +## listener.ssl.external.dhfile = certs/dh-params.pem + +## listener.ssl.external.verify = verify_peer + +## listener.ssl.external.fail_if_no_peer_cert = true + +### This is the single most important configuration option of an Erlang SSL application. +### Ciphers (and their ordering) define the way the client and server encrypt information +### over the wire, from the initial Diffie-Helman key exchange, the session key encryption +### algorithm and the message digest algorithm. Selecting a good cipher suite is critical +### for the application’s data security, confidentiality and performance. +### The cipher list above offers: +### +### A good balance between compatibility with older browsers. It can get stricter for Machine-To-Machine scenarios. +### Perfect Forward Secrecy. +### No old/insecure encryption and HMAC algorithms +### +### Most of it was copied from Mozilla’s Server Side TLS article +## listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA + +### SSL parameter renegotiation is a feature that allows a client and +### a server to renegotiate the parameters of the SSL connection on the fly. +### RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, +### you drop support for the insecure renegotiation, prone to MitM attacks. +## listener.ssl.external.secure_renegotiate = off + +### A performance optimization setting, it allows clients to reuse +### pre-existing sessions, instead of initializing new ones. +### Read more about it here. +## listener.ssl.external.reuse_sessions = on + +### An important security setting, it forces the cipher to be set based on +### the server-specified order instead of the client-specified order, +### hence enforcing the (usually more properly configured) security +### ordering of the server administrator. +## listener.ssl.external.honor_cipher_order = on + +### Use the CN or DN value from the client certificate as a username. +### Notice: 'verify' should be configured as 'verify_peer' +## listener.ssl.external.peer_cert_as_username = cn + +##-------------------------------------------------------------------- +## External MQTT/WebSocket Listener + +listener.ws.external = 8083 + +listener.ws.external.acceptors = 4 + +listener.ws.external.max_clients = 64 + +listener.ws.external.access.1 = allow all + +## TCP Options +listener.ws.external.backlog = 1024 + +listener.ws.external.recbuf = 4KB + +listener.ws.external.sndbuf = 4KB + +listener.ws.external.buffer = 4KB + +listener.ws.external.nodelay = true + +##-------------------------------------------------------------------- +## External MQTT/WebSocket/SSL Listener + +listener.wss.external = 8084 + +listener.wss.external.acceptors = 4 + +listener.wss.external.max_clients = 64 + +listener.wss.external.access.1 = allow all + +## SSL Options +listener.wss.external.handshake_timeout = 15s + +listener.wss.external.keyfile = certs/key.pem + +listener.wss.external.certfile = certs/cert.pem + +## listener.wss.external.cacertfile = certs/cacert.pem + +## listener.wss.external.verify = verify_peer + +## listener.wss.external.fail_if_no_peer_cert = true ##------------------------------------------------------------------- ## System Monitor diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema index 530984383..9b20ea4c2 100644 --- a/test/emqttd_SUITE_data/emqttd.schema +++ b/test/emqttd_SUITE_data/emqttd.schema @@ -1,13 +1,37 @@ %%-*- mode: erlang -*- %% EMQ config mapping +%%-------------------------------------------------------------------- +%% Cluster +%%-------------------------------------------------------------------- + +%% Cluster ID +{mapping, "cluster.id", "emqttd.cluster", [ + {default, "emq"}, + {datatype, string} +]}. + +%% Cluster Multicast Addr +{mapping, "cluster.multicast", "emqttd.cluster", [ + {default, "239.192.0.1:44369"}, + {datatype, string} +]}. + +{translation, "emqttd.cluster", fun(Conf) -> + Multicast = cuttlefish:conf_get("cluster.multicast", Conf), + [Addr, Port] = string:tokens(Multicast, ":"), + {ok, Ip} = inet_parse:address(Addr), + [{id, cuttlefish:conf_get("cluster.id", Conf)}, + {multicast, {Ip, list_to_integer(Port)}}] +end}. + %%-------------------------------------------------------------------- %% Erlang Node %%-------------------------------------------------------------------- %% @doc Erlang node name {mapping, "node.name", "vm_args.-name", [ - {default, "emqttd@127.0.0.1"} + {default, "emq@127.0.0.1"} ]}. %% @doc Secret cookie for distributed erlang node @@ -132,14 +156,14 @@ end}. %% @doc http://www.erlang.org/doc/man/kernel_app.html {mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ - {commented, 6000}, + {commented, 6369}, {datatype, integer}, hidden ]}. %% @see node.dist_listen_min {mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ - {commented, 6999}, + {commented, 6369}, {datatype, integer}, hidden ]}. @@ -179,7 +203,7 @@ end}. ]}. {mapping, "log.syslog.identity", "lager.handlers", [ - {default, "emq"}, + {default, "emqttd"}, {datatype, string} ]}. @@ -189,7 +213,7 @@ end}. ]}. {mapping, "log.syslog.level", "lager.handlers", [ - {default, err}, + {default, error}, {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}} ]}. @@ -283,6 +307,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc ACL nomatch +{mapping, "mqtt.acl_nomatch", "emqttd.acl_nomatch", [ + {default, allow}, + {datatype, {enum, [allow, deny]}} +]}. + %% @doc Default ACL File {mapping, "mqtt.acl_file", "emqttd.acl_file", [ {datatype, string}, @@ -316,10 +346,25 @@ end}. {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] end}. +%%-------------------------------------------------------------------- +%% MQTT Connection +%%-------------------------------------------------------------------- + +%% @doc Force the client to GC: integer +{mapping, "mqtt.conn.force_gc_count", "emqttd.conn_force_gc_count", [ + {datatype, integer} +]}. + %%-------------------------------------------------------------------- %% MQTT Client %%-------------------------------------------------------------------- +%% @doc Max Publish Rate of Message +{mapping, "mqtt.client.max_publish_rate", "emqttd.client", [ + {default, 0}, + {datatype, integer} +]}. + %% @doc Client Idle Timeout. {mapping, "mqtt.client.idle_timeout", "emqttd.client", [ {default, "30s"}, @@ -329,12 +374,12 @@ end}. %% @doc Enable Stats of Client. {mapping, "mqtt.client.enable_stats", "emqttd.client", [ {default, off}, - {datatype, [{duration, ms}, flag]} + {datatype, flag} ]}. -%% @doc Client {translation, "emqttd.client", fun(Conf) -> - [{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, + [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)}, + {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}] end}. @@ -342,11 +387,18 @@ end}. %% MQTT Session %%-------------------------------------------------------------------- +%% @doc Max Number of Subscriptions Allowed +{mapping, "mqtt.session.max_subscriptions", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + %% @doc Upgrade QoS? {mapping, "mqtt.session.upgrade_qos", "emqttd.session", [ {default, off}, {datatype, flag} ]}. + %% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. %% 0 means no limit {mapping, "mqtt.session.max_inflight", "emqttd.session", [ @@ -375,7 +427,7 @@ end}. %% @doc Enable Stats {mapping, "mqtt.session.enable_stats", "emqttd.session", [ {default, off}, - {datatype, [{duration, ms}, flag]} + {datatype, flag} ]}. %% @doc Session Expiry Interval @@ -384,72 +436,80 @@ end}. {datatype, {duration, ms}} ]}. +%% @doc Ignore message from self publish +{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqttd.session", fun(Conf) -> - [{upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, + [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, + {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, {max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}] + {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, + {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] end}. %%-------------------------------------------------------------------- -%% MQTT Queue +%% MQTT MQueue %%-------------------------------------------------------------------- %% @doc Type: simple | priority -{mapping, "mqtt.queue.type", "emqttd.queue", [ +{mapping, "mqtt.mqueue.type", "emqttd.mqueue", [ {default, simple}, {datatype, atom} ]}. %% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.queue.priority", "emqttd.queue", [ +{mapping, "mqtt.mqueue.priority", "emqttd.mqueue", [ {default, ""}, {datatype, string} ]}. -%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. -{mapping, "mqtt.queue.max_length", "emqttd.queue", [ - {default, infinity}, - {datatype, [integer, {atom, infinity}]} +%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit. +{mapping, "mqtt.mqueue.max_length", "emqttd.mqueue", [ + {default, 0}, + {datatype, integer} ]}. %% @doc Low-water mark of queued messages -{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ +{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ {default, "20%"}, {datatype, string} ]}. %% @doc High-water mark of queued messages -{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ +{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ {default, "60%"}, {datatype, string} ]}. %% @doc Queue Qos0 messages? -{mapping, "mqtt.queue.qos0", "emqttd.queue", [ +{mapping, "mqtt.mqueue.store_qos0", "emqttd.mqueue", [ {default, true}, {datatype, {enum, [true, false]}} ]}. -{translation, "emqttd.queue", fun(Conf) -> +{translation, "emqttd.mqueue", fun(Conf) -> Parse = fun(S) -> {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), list_to_integer(N) / 100 end, - Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)}, - {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))}, - {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}], - case cuttlefish:conf_get("mqtt.queue.priority", Conf) of + Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, + {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, + {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, + {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, + {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], + case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of undefined -> Opts; - V -> [{priority, - [begin [T, P] = string:tokens(S, "="), - {T, list_to_integer(P)} - end || S <- string:tokens(V, ",")]}|Opts] + V -> [{priority, + [begin [T, P] = string:tokens(S, "="), + {T, list_to_integer(P)} + end || S <- string:tokens(V, ",")]} | Opts] end end}. @@ -522,161 +582,389 @@ end}. %% MQTT Listeners %%-------------------------------------------------------------------- -{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ - %% {default, 1883}, +%%-------------------------------------------------------------------- +%% TCP Listeners + +{mapping, "listener.tcp.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.max_clients", "emqttd.listeners", [ {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.rate_limit", "emqttd.listeners", [ {default, undefined}, {datatype, string} ]}. -{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol", "emqttd.listeners", [ + %%{default, off}, + {datatype, flag} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqttd.listeners", [ + %%{default, "5s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.tcp.$name.nodelay", "emqttd.listeners", [ {datatype, {enum, [true, false]}}, hidden ]}. -{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ - %% {default, 8883}, +%%-------------------------------------------------------------------- +%% SSL Listeners + +{mapping, "listener.ssl.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ - {default, 512}, +{mapping, "listener.ssl.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.zone", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.tls_versions", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.mountpoint", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol", "emqttd.listeners", [ + %%{default, off}, + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqttd.listeners", [ + %%{default, "5s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.ssl.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ssl.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ssl.$name.tls_versions", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.ciphers", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.handshake_timeout", "emqttd.listeners", [ {default, "15s"}, {datatype, {duration, ms}} ]}. -{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.dhfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.keyfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.certfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.verify", "emqttd.listeners", [ {datatype, atom} ]}. -{mapping, "mqtt.listener.ssl.fail_if_no_peer_cert", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqttd.listeners", [ {datatype, {enum, [true, false]}} ]}. -{mapping, "mqtt.listener.http", "emqttd.listeners", [ - %% {default, 8083}, +{mapping, "listener.ssl.$name.secure_renegotiate", "emqttd.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.reuse_sessions", "emqttd.listeners", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.honor_cipher_order", "emqttd.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.peer_cert_as_username", "emqttd.listeners", [ + {datatype, {enum, [cn, dn]}} +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket Listeners + +{mapping, "listener.ws.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ +{mapping, "listener.ws.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ - {default, 64}, +{mapping, "listener.ws.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https", "emqttd.listeners", [ - %%{default, 8084}, +{mapping, "listener.ws.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ws.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket/SSL Listeners + +{mapping, "listener.wss.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ +{mapping, "listener.wss.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ - {default, 64}, +{mapping, "listener.wss.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ - {default, 15}, +{mapping, "listener.wss.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.backlog", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.wss.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [ + {default, "15s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.wss.$name.keyfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.certfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.cacertfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [ +{mapping, "listener.wss.$name.verify", "emqttd.listeners", [ {datatype, atom} ]}. -{mapping, "mqtt.listener.https.fail_if_no_peer_cert", "emqttd.listeners", [ +{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqttd.listeners", [ {datatype, {enum, [true, false]}} ]}. {translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + + Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, + + Access = fun(S) -> + [A, CIDR] = string:tokens(S, " "), + {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} + end, + + AccOpts = fun(Prefix) -> + case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of + [] -> []; + Rules -> [{access, [Access(Rule) || {_, Rule} <- Rules]}] + end + end, + + MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, + + ConnOpts = fun(Prefix) -> + Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, + {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, + {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, + {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, + {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}]) + end, + LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}]) - end, + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) + end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, @@ -693,29 +981,48 @@ end}. L -> [list_to_atom(V) || V <- L] end, Filter([{versions, Versions}, - {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf), undefined}, + {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, + {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, - {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}]) + {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, + {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, + {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, + {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) end, - Listeners = fun(Name) when is_atom(Name) -> - Key = "mqtt.listener." ++ atom_to_list(Name), - case cuttlefish:conf_get(Key, Conf, undefined) of - undefined -> - []; - Port -> - ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]), - Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], - [{Name, Port, case Name =:= ssl orelse Name =:= https of - true -> [{sslopts, SslOpts(Key)} | Opts]; - false -> Opts - end}] - end - end, - lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)]) + TcpListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + end + end, + + SslListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}, + {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] + ++ + [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. %%--------------------------------------------------------------------