From b2cb5f98b63ee5d0587d30b9537ab6c81d6d44e0 Mon Sep 17 00:00:00 2001 From: Feng Date: Sun, 27 Nov 2016 18:10:22 +0800 Subject: [PATCH 01/24] Proxy Protocol V1 --- Makefile | 2 +- etc/emq.conf | 10 ++++++++ priv/emq.schema | 63 +++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 70 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index eb0205a8d..98da008ac 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_gen_logger = git https://github.com/emqtt/gen_logger -dep_esockd = git https://github.com/emqtt/esockd emq20 +dep_esockd = git https://github.com/emqtt/esockd proxy-protocol dep_mochiweb = git https://github.com/emqtt/mochiweb #dep_clique = git https://github.com/basho/clique #dep_pbkdf2 = git https://github.com/basho/erlang-pbkdf2 2.0.0 diff --git a/etc/emq.conf b/etc/emq.conf index c6f11331b..be4c78c25 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -193,6 +193,10 @@ mqtt.listener.tcp.max_clients = 1024 ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec ## mqtt.listener.tcp.rate_limit = 100,10 +## Proxy Protocol V1 +mqtt.listener.tcp.proxy_protocol = 1 +mqtt.listener.tcp.proxy_protocol_timeout = 10 + ## TCP Socket Options mqtt.listener.tcp.backlog = 1024 ## mqtt.listener.tcp.recbuf = 4096 @@ -211,6 +215,8 @@ mqtt.listener.ssl.max_clients = 512 ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec ## mqtt.listener.ssl.rate_limit = 100,10 +mqtt.listener.ssl.proxy_protocol = 1 +mqtt.listener.ssl.proxy_protocol_timeout = 10 ## Configuring SSL Options ## See http://erlang.org/doc/man/ssl.html @@ -226,6 +232,10 @@ mqtt.listener.http = 8083 mqtt.listener.http.acceptors = 4 mqtt.listener.http.max_clients = 64 +## Proxy Protocol V1 +mqtt.listener.http.proxy_protocol = 1 +mqtt.listener.http.proxy_protocol_timeout = 10 + ## HTTP(SSL) Listener mqtt.listener.https = 8084 mqtt.listener.https.acceptors = 4 diff --git a/priv/emq.schema b/priv/emq.schema index 9e0846159..c8580287f 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -491,6 +491,19 @@ end}. hidden ]}. +{mapping, "mqtt.listener.tcp.proxy_protocol", "emqttd.listeners", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:1-2"]}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.proxy_protocol_timeout", "emqttd.listeners", [ + {default, 10}, + {datatype, integer}, + hidden +]}. + {mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ {default, 1024}, {datatype, integer} @@ -535,6 +548,19 @@ end}. {datatype, string} ]}. +{mapping, "mqtt.listener.ssl.proxy_protocol", "emqttd.listeners", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:1-2"]}, + hidden +]}. + +{mapping, "mqtt.listener.ssl.proxy_protocol_timeout", "emqttd.listeners", [ + {default, 10}, + {datatype, integer}, + hidden +]}. + {mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ {default, 15}, {datatype, integer} @@ -575,6 +601,19 @@ end}. {datatype, integer} ]}. +{mapping, "mqtt.listener.http.proxy_protocol", "emqttd.listeners", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:1-2"]}, + hidden +]}. + +{mapping, "mqtt.listener.http.proxy_protocol_timeout", "emqttd.listeners", [ + {default, 10}, + {datatype, integer}, + hidden +]}. + {mapping, "mqtt.listener.https", "emqttd.listeners", [ {default, undefined}, {datatype, [integer, ip]}, @@ -591,6 +630,18 @@ end}. {datatype, integer} ]}. +{mapping, "mqtt.listener.https.proxy_protocol", "emqttd.listeners", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:1-2"]}, + hidden +]}. + +{mapping, "mqtt.listener.https.proxy_protocol_timeout", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + {mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ {default, 15}, {datatype, integer} @@ -620,9 +671,8 @@ end}. Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, - {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}]) - end, + {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}]) + end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, @@ -645,7 +695,12 @@ end}. undefined -> []; Port -> - ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]), + ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}, + {proxy_protocol, cuttlefish:conf_get(Key ++ ".proxy_protocol", Conf, undefined)}, + {proxy_protocol_timeout, case cuttlefish:conf_get(Key ++ ".proxy_protocol_timeout", Conf, undefined) of + undefined -> undefined; + I -> I * 1000 + end}]), Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], [{Name, Port, case Name =:= ssl orelse Name =:= https of true -> [{ssl, SslOpts(Key)} | Opts]; From b3cb875eb2e6cf7d4a11a852a5695492afaaf05f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 22 Mar 2017 10:13:17 +0800 Subject: [PATCH 02/24] Depends on the 'emq22' branch of esockd, mochiweb --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 80000fcf1..5b46ef5ce 100644 --- a/Makefile +++ b/Makefile @@ -9,8 +9,8 @@ DEPS = gproc lager esockd mochiweb lager_syslog pbkdf2 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master -dep_esockd = git https://github.com/emqtt/esockd v4.2 -dep_mochiweb = git https://github.com/emqtt/mochiweb +dep_esockd = git https://github.com/emqtt/esockd emq22 +dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_lager_syslog = git https://github.com/basho/lager_syslog dep_pbkdf2 = git https://github.com/comtihon/erlang-pbkdf2.git 2.0.0 From f0e6fc16c7581a80b2775e994d233bad8cbcbd5d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 22 Mar 2017 10:48:12 +0800 Subject: [PATCH 03/24] Version 2.2 --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 5b46ef5ce..89289c9cd 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.1.0 +PROJECT_VERSION = 2.2 NO_AUTOPATCH = cuttlefish From 0b13cd78e68a3abc05d747820d018fdd8f564d30 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 22 Mar 2017 10:48:24 +0800 Subject: [PATCH 04/24] Version 2.2 --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index d03a4d952..0eef4e383 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,6 +1,6 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "2.1.0"}, + {vsn, "2.2"}, {modules, []}, {registered, [emqttd_sup]}, {applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2]}, From f321bcdaceb87227dd4c3391bb1be1c2ba2ac1b1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 22 Mar 2017 10:48:47 +0800 Subject: [PATCH 05/24] Support proxy protocol v1/2 config --- etc/emq.conf | 16 +++++++--------- priv/emq.schema | 40 ++++++---------------------------------- 2 files changed, 13 insertions(+), 43 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 49ec5ac10..94e7a4ecf 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -228,9 +228,9 @@ mqtt.listener.tcp.max_clients = 1024 ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec ## mqtt.listener.tcp.rate_limit = 100,10 -## Proxy Protocol V1 -mqtt.listener.tcp.proxy_protocol = 1 -mqtt.listener.tcp.proxy_protocol_timeout = 10 +## Proxy Protocol V1/2 +## mqtt.listener.tcp.proxy_protocol = on +## mqtt.listener.tcp.proxy_protocol_timeout = 3s ## TCP Socket Options mqtt.listener.tcp.backlog = 1024 @@ -250,8 +250,10 @@ mqtt.listener.ssl.max_clients = 512 ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec ## mqtt.listener.ssl.rate_limit = 100,10 -mqtt.listener.ssl.proxy_protocol = 1 -mqtt.listener.ssl.proxy_protocol_timeout = 10 + +## Proxy Protocol V1/2 +## mqtt.listener.ssl.proxy_protocol = on +## mqtt.listener.ssl.proxy_protocol_timeout = 3s ## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html ### TLS only for POODLE attack @@ -268,10 +270,6 @@ mqtt.listener.http = 8083 mqtt.listener.http.acceptors = 4 mqtt.listener.http.max_clients = 64 -## Proxy Protocol V1 -mqtt.listener.http.proxy_protocol = 1 -mqtt.listener.http.proxy_protocol_timeout = 10 - ## HTTP(SSL) Listener mqtt.listener.https = 8084 mqtt.listener.https.acceptors = 4 diff --git a/priv/emq.schema b/priv/emq.schema index 3ef4fe60d..af77a8998 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -553,16 +553,13 @@ end}. ]}. {mapping, "mqtt.listener.tcp.proxy_protocol", "emqttd.listeners", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:1-2"]}, - hidden + %%{default, off}, + {datatype, flag} ]}. {mapping, "mqtt.listener.tcp.proxy_protocol_timeout", "emqttd.listeners", [ - {default, 10}, - {datatype, integer}, - hidden + %%{default, "5s"}, + {datatype, {duration, ms}} ]}. {mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ @@ -615,12 +612,12 @@ end}. ]}. {mapping, "mqtt.listener.ssl.proxy_protocol", "emqttd.listeners", [ - {default, off}, + %%{default, off}, {datatype, flag} ]}. {mapping, "mqtt.listener.ssl.proxy_protocol_timeout", "emqttd.listeners", [ - {default, 5s}, + %%{default, "5s"}, {datatype, {duration, ms}} ]}. @@ -668,19 +665,6 @@ end}. {datatype, integer} ]}. -{mapping, "mqtt.listener.http.proxy_protocol", "emqttd.listeners", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:1-2"]}, - hidden -]}. - -{mapping, "mqtt.listener.http.proxy_protocol_timeout", "emqttd.listeners", [ - {default, 10}, - {datatype, integer}, - hidden -]}. - {mapping, "mqtt.listener.https", "emqttd.listeners", [ %%{default, 8084}, {datatype, [integer, ip]} @@ -696,18 +680,6 @@ end}. {datatype, integer} ]}. -{mapping, "mqtt.listener.https.proxy_protocol", "emqttd.listeners", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:1-2"]}, - hidden -]}. - -{mapping, "mqtt.listener.https.proxy_protocol_timeout", "emqttd.listeners", [ - {datatype, integer}, - hidden -]}. - {mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ {default, 15}, {datatype, integer} From a0c60f11cf7a18879e7043f5d58a31fe4cc6b80d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 22 Mar 2017 10:49:09 +0800 Subject: [PATCH 06/24] Depends on 'emq22' branch --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 4cc2a59d1..dc8500144 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","v4.2"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{pbkdf2,".*",{git,"https://github.com/comtihon/erlang-pbkdf2.git","2.0.0"}} +{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{pbkdf2,".*",{git,"https://github.com/comtihon/erlang-pbkdf2.git","2.0.0"}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. From e469ffe7c7d05ca6740bdb91089b3b81a613938d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:04:02 +0800 Subject: [PATCH 07/24] Support multiple TCP/SSL listeners --- etc/emq.conf | 267 +++++++++++++++++++++++------ priv/emq.schema | 447 ++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 563 insertions(+), 151 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 94e7a4ecf..6b8e784f4 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -1,17 +1,27 @@ ##=================================================================== -## EMQ Configuration R2.1 +## EMQ Configuration R2.2 ##=================================================================== +##-------------------------------------------------------------------- +## Cluster +##-------------------------------------------------------------------- + +## The cluster Id +cluster.id = emq + +## The multicast address and port. +cluster.multicast = 239.192.0.1:44369 + ##-------------------------------------------------------------------- ## Node Args ##-------------------------------------------------------------------- ## Node name -node.name = emqttd@127.0.0.1 +node.name = emq@127.0.0.1 ## Cookie for distributed node -node.cookie = emq_dist_cookie +node.cookie = emqsecretcookie ## SMP support: enable, auto, disable node.smp = auto @@ -50,8 +60,8 @@ node.crash_dump = {{ platform_log_dir }}/crash.dump node.dist_net_ticktime = 60 ## Distributed node port range -## node.dist_listen_min = 6369 -## node.dist_listen_max = 6369 +node.dist_listen_min = 6369 +node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## Log @@ -63,15 +73,15 @@ log.dir = {{ platform_log_dir }} ## Console log. Enum: off, file, console, both log.console = console +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency +log.console.level = error + ## Syslog. Enum: on, off log.syslog = on ## syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency log.syslog.level = error -## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.console.level = error - ## Console log file ## log.console.file = {{ platform_log_dir }}/console.log @@ -120,6 +130,9 @@ mqtt.conn.force_gc_count = 100 ## Client Idle Timeout (Second) mqtt.client.idle_timeout = 30s +## Max publish rate of Messages +## mqtt.client.max_publish_rate = 5 + ## Enable client Stats: on | off mqtt.client.enable_stats = off @@ -127,6 +140,9 @@ mqtt.client.enable_stats = off ## MQTT Session ##-------------------------------------------------------------------- +## Max Number of Subscriptions, 0 means no limit. +mqtt.session.max_subscriptions = 0 + ## Upgrade QoS? mqtt.session.upgrade_qos = off @@ -155,27 +171,27 @@ mqtt.session.enable_stats = off mqtt.session.expiry_interval = 2h ##-------------------------------------------------------------------- -## MQTT Queue +## MQTT Message Queue ##-------------------------------------------------------------------- ## Type: simple | priority -mqtt.queue.type = simple +mqtt.mqueue.type = simple ## Topic Priority: 0~255, Default is 0 -## mqtt.queue.priority = topic/1=10,topic/2=8 +## mqtt.mqueue.priority = topic/1=10,topic/2=8 ## Max queue length. Enqueued messages when persistent client disconnected, -## or inflight window is full. -mqtt.queue.max_length = infinity +## or inflight window is full. 0 means no limit. +mqtt.mqueue.max_length = 0 ## Low-water mark of queued messages -mqtt.queue.low_watermark = 20% +mqtt.mqueue.low_watermark = 20% ## High-water mark of queued messages -mqtt.queue.high_watermark = 60% +mqtt.mqueue.high_watermark = 60% ## Queue Qos0 messages? -mqtt.queue.qos0 = true +mqtt.mqueue.store_qos0 = true ##-------------------------------------------------------------------- ## MQTT Broker and PubSub @@ -216,71 +232,210 @@ mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins ## MQTT Listeners ##-------------------------------------------------------------------- -## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -mqtt.listener.tcp = 1883 +##-------------------------------------------------------------------- +## External TCP Listener + +## External TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +listener.tcp.external = 0.0.0.0:1883 ## Size of acceptor pool -mqtt.listener.tcp.acceptors = 8 +listener.tcp.external.acceptors = 16 ## Maximum number of concurrent clients -mqtt.listener.tcp.max_clients = 1024 +listener.tcp.external.max_clients = 102400 + +#listener.tcp.external.mountpoint = external/ ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.tcp.rate_limit = 100,10 +#listener.tcp.external.rate_limit = 100,10 + +#listener.tcp.external.access.1 = allow 192.168.0.0/24 + +listener.tcp.external.access.2 = allow all ## Proxy Protocol V1/2 -## mqtt.listener.tcp.proxy_protocol = on -## mqtt.listener.tcp.proxy_protocol_timeout = 3s +## listener.tcp.external.proxy_protocol = on +## listener.tcp.external.proxy_protocol_timeout = 3s ## TCP Socket Options -mqtt.listener.tcp.backlog = 1024 -## mqtt.listener.tcp.recbuf = 4096 -## mqtt.listener.tcp.sndbuf = 4096 -## mqtt.listener.tcp.buffer = 4096 -## mqtt.listener.tcp.nodelay = true +listener.tcp.external.backlog = 1024 -## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 -mqtt.listener.ssl = 8883 +#listener.tcp.external.recbuf = 4KB + +#listener.tcp.external.sndbuf = 4KB + +listener.tcp.external.buffer = 4KB + +listener.tcp.external.nodelay = true + +##-------------------------------------------------------------------- +## Internal TCP Listener + +## Internal TCP Listener: 11883, 127.0.0.1:11883, ::1:11883 +listener.tcp.internal = 127.0.0.1:11883 ## Size of acceptor pool -mqtt.listener.ssl.acceptors = 4 +listener.tcp.internal.acceptors = 16 ## Maximum number of concurrent clients -mqtt.listener.ssl.max_clients = 512 +listener.tcp.internal.max_clients = 102400 + +#listener.tcp.external.mountpoint = internal/ ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.ssl.rate_limit = 100,10 +## listener.tcp.internal.rate_limit = 1000,100 + +## TCP Socket Options +listener.tcp.internal.backlog = 512 + +listener.tcp.internal.tune_buffer = on + +listener.tcp.internal.buffer = 1MB + +listener.tcp.internal.recbuf = 4KB + +listener.tcp.internal.sndbuf = 1MB + +listener.tcp.internal.nodelay = true + +##-------------------------------------------------------------------- +## External SSL Listener + +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +listener.ssl.external = 8883 + +## Size of acceptor pool +listener.ssl.external.acceptors = 16 + +## Maximum number of concurrent clients +listener.ssl.external.max_clients = 1024 + +## listener.ssl.external.mountpoint = inbound/ + +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## listener.ssl.external.rate_limit = 100,10 ## Proxy Protocol V1/2 -## mqtt.listener.ssl.proxy_protocol = on -## mqtt.listener.ssl.proxy_protocol_timeout = 3s +## listener.ssl.external.proxy_protocol = on +## listener.ssl.external.proxy_protocol_timeout = 3s + +listener.ssl.external.access.1 = allow all + +### SSL Options. See http://erlang.org/doc/man/ssl.html ## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html ### TLS only for POODLE attack -mqtt.listener.ssl.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -mqtt.listener.ssl.handshake_timeout = 15s -mqtt.listener.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem -mqtt.listener.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem -## mqtt.listener.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## mqtt.listener.ssl.verify = verify_peer -## mqtt.listener.ssl.fail_if_no_peer_cert = true +## listener.ssl.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -## HTTP and WebSocket Listener -mqtt.listener.http = 8083 -mqtt.listener.http.acceptors = 4 -mqtt.listener.http.max_clients = 64 +### The Ephemeral Diffie-Helman key exchange is a very effective way of +### ensuring Forward Secrecy by exchanging a set of keys that never hit +### the wire. Since the DH key is effectively signed by the private key, +### it needs to be at least as strong as the private key. In addition, +### the default DH groups that most of the OpenSSL installations have +### are only a handful (since they are distributed with the OpenSSL +### package that has been built for the operating system it’s running on) +### and hence predictable (not to mention, 1024 bits only). -## HTTP(SSL) Listener -mqtt.listener.https = 8084 -mqtt.listener.https.acceptors = 4 -mqtt.listener.https.max_clients = 64 -mqtt.listener.https.handshake_timeout = 15 -mqtt.listener.https.keyfile = {{ platform_etc_dir }}/certs/key.pem -mqtt.listener.https.certfile = {{ platform_etc_dir }}/certs/cert.pem -## mqtt.listener.https.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem +### In order to escape this situation, first we need to generate a fresh, +### strong DH group, store it in a file and then use the option above, +### to force our SSL application to use the new DH group. Fortunately, +### OpenSSL provides us with a tool to do that. Simply run: +### openssl dhparam -out dh-params.pem 2048 -## mqtt.listener.https.verify = verify_peer -## mqtt.listener.https.fail_if_no_peer_cert = true +listener.ssl.external.handshake_timeout = 15s + +listener.ssl.external.keyfile = {{ platform_etc_dir }}/certs/key.pem + +listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem + +## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem + +## listener.ssl.external.verify = verify_peer + +## listener.ssl.external.fail_if_no_peer_cert = true + +### This is the single most important configuration option of an Erlang SSL application. +### Ciphers (and their ordering) define the way the client and server encrypt information +### over the wire, from the initial Diffie-Helman key exchange, the session key encryption +### algorithm and the message digest algorithm. Selecting a good cipher suite is critical +### for the application’s data security, confidentiality and performance. +### The cipher list above offers: +### +### A good balance between compatibility with older browsers. It can get stricter for Machine-To-Machine scenarios. +### Perfect Forward Secrecy. +### No old/insecure encryption and HMAC algorithms +### +### Most of it was copied from Mozilla’s Server Side TLS article +## listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA + +### SSL parameter renegotiation is a feature that allows a client and +### a server to renegotiate the parameters of the SSL connection on the fly. +### RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, +### you drop support for the insecure renegotiation, prone to MitM attacks. +## listener.ssl.external.secure_renegotiate = off + +### A performance optimization setting, it allows clients to reuse +### pre-existing sessions, instead of initializing new ones. +### Read more about it here. +## listener.ssl.external.reuse_sessions = on + +### An important security setting, it forces the cipher to be set based on +### the server-specified order instead of the client-specified order, +### hence enforcing the (usually more properly configured) security +### ordering of the server administrator. +## listener.ssl.external.honor_cipher_order = on + +### Use the CN or DN value from the client certificate as a username. +### Notice: 'verify' should be configured as 'verify_peer' +## listener.ssl.external.peer_cert_as_username = cn + +##-------------------------------------------------------------------- +## External MQTT/WebSocket Listener + +listener.ws.external = 8083 + +listener.ws.external.acceptors = 4 + +listener.ws.external.max_clients = 64 + +listener.ws.external.access.1 = allow all + +## TCP Options +listener.ws.external.backlog = 1024 + +listener.ws.external.recbuf = 4KB + +listener.ws.external.sndbuf = 4KB + +listener.ws.external.buffer = 4KB + +listener.ws.external.nodelay = true + +##-------------------------------------------------------------------- +## External MQTT/WebSocket/SSL Listener + +listener.wss.external = 8084 + +listener.wss.external.acceptors = 4 + +listener.wss.external.max_clients = 64 + +listener.wss.external.access.1 = allow all + +## SSL Options +listener.wss.external.handshake_timeout = 15s + +listener.wss.external.keyfile = {{ platform_etc_dir }}/certs/key.pem + +listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem + +## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## listener.wss.external.verify = verify_peer + +## listener.wss.external.fail_if_no_peer_cert = true ##------------------------------------------------------------------- ## System Monitor diff --git a/priv/emq.schema b/priv/emq.schema index af77a8998..2ed196e2c 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1,13 +1,37 @@ %%-*- mode: erlang -*- %% EMQ config mapping +%%-------------------------------------------------------------------- +%% Cluster +%%-------------------------------------------------------------------- + +%% Cluster ID +{mapping, "cluster.id", "emqttd.cluster", [ + {default, "emq"}, + {datatype, string} +]}. + +%% Cluster Multicast Addr +{mapping, "cluster.multicast", "emqttd.cluster", [ + {default, "239.192.0.1:44369"}, + {datatype, string} +]}. + +{translation, "emqttd.cluster", fun(Conf) -> + Multicast = cuttlefish:conf_get("cluster.multicast", Conf), + [Addr, Port] = string:tokens(Multicast, ":"), + {ok, Ip} = inet_parse:address(Addr), + [{id, cuttlefish:conf_get("cluster.id", Conf)}, + {multicast, {Ip, list_to_integer(Port)}}] +end}. + %%-------------------------------------------------------------------- %% Erlang Node %%-------------------------------------------------------------------- %% @doc Erlang node name {mapping, "node.name", "vm_args.-name", [ - {default, "emqttd@127.0.0.1"} + {default, "emq@127.0.0.1"} ]}. %% @doc Secret cookie for distributed erlang node @@ -329,6 +353,12 @@ end}. %% MQTT Client %%-------------------------------------------------------------------- +%% @doc Max Publish Rate of Message +{mapping, "mqtt.client.max_publish_rate", "emqttd.client", [ + {default, 0}, + {datatype, integer} +]}. + %% @doc Client Idle Timeout. {mapping, "mqtt.client.idle_timeout", "emqttd.client", [ {default, "30s"}, @@ -341,9 +371,9 @@ end}. {datatype, flag} ]}. -%% @doc Client {translation, "emqttd.client", fun(Conf) -> - [{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, + [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)}, + {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}] end}. @@ -351,6 +381,12 @@ end}. %% MQTT Session %%-------------------------------------------------------------------- +%% @doc Max Number of Subscriptions Allowed +{mapping, "mqtt.session.max_subscriptions", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + %% @doc Upgrade QoS? {mapping, "mqtt.session.upgrade_qos", "emqttd.session", [ {default, off}, @@ -395,7 +431,8 @@ end}. ]}. {translation, "emqttd.session", fun(Conf) -> - [{upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, + [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, + {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, {max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, @@ -405,61 +442,61 @@ end}. end}. %%-------------------------------------------------------------------- -%% MQTT Queue +%% MQTT MQueue %%-------------------------------------------------------------------- %% @doc Type: simple | priority -{mapping, "mqtt.queue.type", "emqttd.queue", [ +{mapping, "mqtt.mqueue.type", "emqttd.mqueue", [ {default, simple}, {datatype, atom} ]}. %% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.queue.priority", "emqttd.queue", [ +{mapping, "mqtt.mqueue.priority", "emqttd.mqueue", [ {default, ""}, {datatype, string} ]}. -%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. -{mapping, "mqtt.queue.max_length", "emqttd.queue", [ - {default, infinity}, - {datatype, [integer, {atom, infinity}]} +%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit. +{mapping, "mqtt.mqueue.max_length", "emqttd.mqueue", [ + {default, 0}, + {datatype, integer} ]}. %% @doc Low-water mark of queued messages -{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ +{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ {default, "20%"}, {datatype, string} ]}. %% @doc High-water mark of queued messages -{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ +{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ {default, "60%"}, {datatype, string} ]}. %% @doc Queue Qos0 messages? -{mapping, "mqtt.queue.qos0", "emqttd.queue", [ +{mapping, "mqtt.mqueue.store_qos0", "emqttd.mqueue", [ {default, true}, {datatype, {enum, [true, false]}} ]}. -{translation, "emqttd.queue", fun(Conf) -> +{translation, "emqttd.mqueue", fun(Conf) -> Parse = fun(S) -> {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), list_to_integer(N) / 100 end, - Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)}, - {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))}, - {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}], - case cuttlefish:conf_get("mqtt.queue.priority", Conf) of + Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, + {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, + {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, + {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, + {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], + case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of undefined -> Opts; - V -> [{priority, - [begin [T, P] = string:tokens(S, "="), - {T, list_to_integer(P)} - end || S <- string:tokens(V, ",")]}|Opts] + V -> [{priority, + [begin [T, P] = string:tokens(S, "="), + {T, list_to_integer(P)} + end || S <- string:tokens(V, ",")]} | Opts] end end}. @@ -532,185 +569,388 @@ end}. %% MQTT Listeners %%-------------------------------------------------------------------- -{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ - %% {default, 1883}, +%%-------------------------------------------------------------------- +%% TCP Listeners + +{mapping, "listener.tcp.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.max_clients", "emqttd.listeners", [ {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.rate_limit", "emqttd.listeners", [ {default, undefined}, {datatype, string} ]}. -{mapping, "mqtt.listener.tcp.proxy_protocol", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol", "emqttd.listeners", [ %%{default, off}, {datatype, flag} ]}. -{mapping, "mqtt.listener.tcp.proxy_protocol_timeout", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqttd.listeners", [ %%{default, "5s"}, {datatype, {duration, ms}} ]}. -{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.tune_buffer", "emqttd.listeners", [ - {default, off}, - {datatype, flag} +{mapping, "listener.tcp.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden ]}. -{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.nodelay", "emqttd.listeners", [ {datatype, {enum, [true, false]}}, hidden ]}. -{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ - %% {default, 8883}, +%%-------------------------------------------------------------------- +%% SSL Listeners + +{mapping, "listener.ssl.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ - {default, 512}, +{mapping, "listener.ssl.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.zone", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.proxy_protocol", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol", "emqttd.listeners", [ %%{default, off}, {datatype, flag} ]}. -{mapping, "mqtt.listener.ssl.proxy_protocol_timeout", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqttd.listeners", [ %%{default, "5s"}, {datatype, {duration, ms}} ]}. -{mapping, "mqtt.listener.ssl.tls_versions", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ssl.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ssl.$name.tls_versions", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.ciphers", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.handshake_timeout", "emqttd.listeners", [ {default, "15s"}, {datatype, {duration, ms}} ]}. -{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.dhfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.keyfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.certfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.verify", "emqttd.listeners", [ {datatype, atom} ]}. -{mapping, "mqtt.listener.ssl.fail_if_no_peer_cert", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqttd.listeners", [ {datatype, {enum, [true, false]}} ]}. -{mapping, "mqtt.listener.http", "emqttd.listeners", [ - %% {default, 8083}, +{mapping, "listener.ssl.$name.secure_renegotiate", "emqttd.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.reuse_sessions", "emqttd.listeners", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.honor_cipher_order", "emqttd.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.peer_cert_as_username", "emqttd.listeners", [ + {datatype, {enum, [cn, dn]}} +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket Listeners + +{mapping, "listener.ws.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ +{mapping, "listener.ws.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ - {default, 64}, +{mapping, "listener.ws.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https", "emqttd.listeners", [ - %%{default, 8084}, +{mapping, "listener.ws.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ws.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket/SSL Listeners + +{mapping, "listener.wss.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ +{mapping, "listener.wss.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ - {default, 64}, +{mapping, "listener.wss.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ - {default, 15}, +{mapping, "listener.wss.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.backlog", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.wss.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [ + {default, "15s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.wss.$name.keyfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.certfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.cacertfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [ +{mapping, "listener.wss.$name.verify", "emqttd.listeners", [ {datatype, atom} ]}. -{mapping, "mqtt.listener.https.fail_if_no_peer_cert", "emqttd.listeners", [ +{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqttd.listeners", [ {datatype, {enum, [true, false]}} ]}. {translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + + Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, + + Access = fun(S) -> + [A, CIDR] = string:tokens(S, " "), + {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} + end, + + AccOpts = fun(Prefix) -> + case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of + [] -> []; + Rules -> [{access, [Access(Rule) || {_, Rule} <- Rules]}] + end + end, + + MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, + + ConnOpts = fun(Prefix) -> + Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, + {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, + {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, + {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, + {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}]) + end, + LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}]) + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, @@ -728,31 +968,48 @@ end}. L -> [list_to_atom(V) || V <- L] end, Filter([{versions, Versions}, - {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf), undefined}, + {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, + {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, - {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}]) + {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, + {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, + {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, + {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) end, - Listeners = fun(Name) when is_atom(Name) -> - Key = "mqtt.listener." ++ atom_to_list(Name), - case cuttlefish:conf_get(Key, Conf, undefined) of - undefined -> - []; - Port -> - ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}, - {proxy_protocol, cuttlefish:conf_get(Key ++ ".proxy_protocol", Conf, undefined)}, - {proxy_protocol_timeout, cuttlefish:conf_get(Key ++ ".proxy_protocol_timeout", Conf, undefined)}]), - Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], - [{Name, Port, case Name =:= ssl orelse Name =:= https of - true -> [{sslopts, SslOpts(Key)} | Opts]; - false -> Opts - end}] - end - end, - lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)]) + TcpListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + end + end, + + SslListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}, + {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] + ++ + [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. %%-------------------------------------------------------------------- From 66c3c26d8b0741351bbfd969e8ff7c43e67c0e8a Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:04:37 +0800 Subject: [PATCH 08/24] Support 'mountpoint' --- include/emqttd.hrl | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index a02c86129..508712512 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -84,6 +84,7 @@ keepalive = 0, will_topic :: undefined | binary(), ws_initial_headers :: list({ws_header_key(), ws_header_val()}), + mountpoint :: undefined | binary(), connected_at :: erlang:timestamp() }). @@ -157,8 +158,8 @@ %%-------------------------------------------------------------------- -record(mqtt_route, - { topic :: binary(), - node :: node() + { topic :: binary(), + node :: node() }). -type(mqtt_route() :: #mqtt_route{}). @@ -168,11 +169,11 @@ %%-------------------------------------------------------------------- -record(mqtt_alarm, - { id :: binary(), - severity :: warning | error | critical, - title :: iolist() | binary(), - summary :: iolist() | binary(), - timestamp :: erlang:timestamp() + { id :: binary(), + severity :: warning | error | critical, + title :: iolist() | binary(), + summary :: iolist() | binary(), + timestamp :: erlang:timestamp() }). -type(mqtt_alarm() :: #mqtt_alarm{}). From 0d617c17e03e6d3e5234bd4aabf46b17e80da1d4 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:06:14 +0800 Subject: [PATCH 09/24] Use the emqttd_protocol:init/4 API --- src/emqttd_client.erl | 2 +- src/emqttd_ws_client.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 88f14c2d6..be08b6fed 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -112,7 +112,7 @@ do_init(Conn, Env, Peername) -> RateLimit = get_value(rate_limit, Conn:opts()), PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), Parser = emqttd_parser:initial_state(PacketSize), - ProtoState = emqttd_protocol:init(Peername, SendFun, Env), + ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env), EnableStats = get_value(client_enable_stats, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), State = run_socket(#client_state{connection = Conn, diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 4beb7bc40..68b00e501 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -93,7 +93,7 @@ init([Env, WsPid, Req, ReplyChannel]) -> Headers = mochiweb_headers:to_list( mochiweb_request:get(headers, Req)), Conn = Req:get(connection), - ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel), + ProtoState = emqttd_protocol:init(Conn, Peername, send_fun(ReplyChannel), [{ws_initial_headers, Headers} | Env]), IdleTimeout = get_value(client_idle_timeout, Env, 30000), EnableStats = get_value(client_enable_stats, Env, false), From 496d046d52ca12e204a2623468e33114a800fe6d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:07:06 +0800 Subject: [PATCH 10/24] Improve the mqueue design --- src/emqttd_mqueue.erl | 60 ++++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 4f825329a..08e620a37 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -58,25 +58,27 @@ -define(HIGH_WM, 0.6). +-define(PQUEUE, priority_queue). + -type(priority() :: {iolist(), pos_integer()}). -type(option() :: {type, simple | priority} - | {max_length, pos_integer() | infinity} + | {max_length, non_neg_integer()} %% Max queue length | {priority, list(priority())} | {low_watermark, float()} %% Low watermark | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}). %% Queue Qos0? + | {store_qos0, boolean()}). %% Queue Qos0? --type(stat() :: {max_len, infinity | pos_integer()} +-type(stat() :: {max_len, non_neg_integer()} | {len, non_neg_integer()} | {dropped, non_neg_integer()}). -record(mqueue, {type :: simple | priority, - name, q :: queue:queue() | priority_queue:q(), + name, q :: queue:queue() | ?PQUEUE:q(), %% priority table pseq = 0, priorities = [], %% len of simple queue - len = 0, max_len = infinity, + len = 0, max_len = 0, low_wm = ?LOW_WM, high_wm = ?HIGH_WM, qos0 = false, dropped = 0, alarm_fun}). @@ -89,19 +91,19 @@ -spec(new(iolist(), list(option()), fun()) -> mqueue()). new(Name, Opts, AlarmFun) -> Type = get_value(type, Opts, simple), - MaxLen = get_value(max_length, Opts, infinity), + MaxLen = get_value(max_length, Opts, 0), init_q(#mqueue{type = Type, name = iolist_to_binary(Name), len = 0, max_len = MaxLen, low_wm = low_wm(MaxLen, Opts), high_wm = high_wm(MaxLen, Opts), - qos0 = get_value(queue_qos0, Opts, false), + qos0 = get_value(store_qos0, Opts, false), alarm_fun = AlarmFun}, Opts). init_q(MQ = #mqueue{type = simple}, _Opts) -> MQ#mqueue{q = queue:new()}; init_q(MQ = #mqueue{type = priority}, Opts) -> Priorities = get_value(priority, Opts, []), - init_p(Priorities, MQ#mqueue{q = priority_queue:new()}). + init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). init_p([], MQ) -> MQ; @@ -113,13 +115,13 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> <> = <>, {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. -low_wm(infinity, _Opts) -> - infinity; +low_wm(0, _Opts) -> + undefined; low_wm(MaxLen, Opts) -> round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)). -high_wm(infinity, _Opts) -> - infinity; +high_wm(0, _Opts) -> + undefined; high_wm(MaxLen, Opts) -> round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)). @@ -132,12 +134,12 @@ type(#mqueue{type = Type}) -> Type. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; -is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q). +is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q). len(#mqueue{type = simple, len = Len}) -> Len; -len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q). +len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q). -max_len(#mqueue{max_len= MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen. %% @doc Dropped of the mqueue -spec(dropped(mqueue()) -> non_neg_integer()). @@ -148,14 +150,14 @@ dropped(#mqueue{dropped = Dropped}) -> Dropped. stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> [{len, case Type of simple -> Len; - priority -> priority_queue:len(Q) + priority -> ?PQUEUE:len(Q) end} | [{max_len, MaxLen}, {dropped, Dropped}]]. %% @doc Enqueue a message. -spec(in(mqtt_message(), mqueue()) -> mqueue()). in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) when Len >= MaxLen -> @@ -166,43 +168,45 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, - max_len = infinity}) -> + max_len = 0}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}; + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}; false -> {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, max_len = MaxLen}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> - case priority_queue:plen(Pri, Q) >= MaxLen of + case ?PQUEUE:plen(Pri, Q) >= MaxLen of true -> - {_, Q1} = priority_queue:out(Pri, Q), - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)}; + {_, Q1} = ?PQUEUE:out(Pri, Q), + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)}; false -> - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; false -> {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end. out(MQ = #mqueue{type = simple, len = 0}) -> {empty, MQ}; -out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> +out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> {R, Q2} = queue:out(Q), {R, MQ#mqueue{q = Q2, len = Len - 1}}; out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> {R, Q2} = queue:out(Q), {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}; out(MQ = #mqueue{type = priority, q = Q}) -> - {R, Q2} = priority_queue:out(Q), + {R, Q2} = ?PQUEUE:out(Q), {R, MQ#mqueue{q = Q2}}. +maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) -> + MQ; maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) when Len > HighWM -> Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), @@ -213,6 +217,8 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun maybe_set_alarm(MQ) -> MQ. +maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) -> + MQ; maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun}) when Len < LowWM -> MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))}; From 690f27a8b4393523fa8efeb9cdec70ef2004cb8d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:10:24 +0800 Subject: [PATCH 11/24] Support 'mount point' --- src/emqttd_protocol.erl | 104 ++++++++++++++++++++++++++++------------ 1 file changed, 73 insertions(+), 31 deletions(-) diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b87274455..433825253 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -27,7 +27,7 @@ -import(proplists, [get_value/2, get_value/3]). %% API --export([init/3, info/1, stats/1, clientid/1, client/1, session/1]). +-export([init/3, init/4, info/1, stats/1, clientid/1, client/1, session/1]). -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). @@ -43,12 +43,12 @@ -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, proto_ver, proto_name, username, is_superuser, will_msg, keepalive, max_clientid_len, session, stats_data, - ws_initial_headers, connected_at}). + mountpoint, ws_initial_headers, connected_at}). -type(proto_state() :: #proto_state{}). -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name, - keepalive, will_msg, ws_initial_headers, connected_at]). + keepalive, will_msg, ws_initial_headers, mountpoint, connected_at]). -define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -63,12 +63,22 @@ init(Peername, SendFun, Opts) -> WsInitialHeaders = get_value(ws_initial_headers, Opts), #proto_state{peername = Peername, sendfun = SendFun, - client_pid = self(), max_clientid_len = MaxLen, is_superuser = false, + client_pid = self(), ws_initial_headers = WsInitialHeaders, stats_data = #proto_stats{enable_stats = EnableStats}}. +init(Conn, Peername, SendFun, Opts) -> + enrich_opt(Conn:opts(), Conn, init(Peername, SendFun, Opts)). + +enrich_opt([], _Conn, State) -> + State; +enrich_opt([{mountpoint, MountPoint} | ConnOpts], Conn, State) -> + enrich_opt(ConnOpts, Conn, State#proto_state{mountpoint = MountPoint}); +enrich_opt([_ | ConnOpts], Conn, State) -> + enrich_opt(ConnOpts, Conn, State). + info(ProtoState) -> ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS). @@ -87,6 +97,7 @@ client(#proto_state{client_id = ClientId, keepalive = Keepalive, will_msg = WillMsg, ws_initial_headers = WsInitialHeaders, + mountpoint = MountPoint, connected_at = Time}) -> WillTopic = if WillMsg =:= undefined -> undefined; @@ -101,6 +112,7 @@ client(#proto_state{client_id = ClientId, keepalive = Keepalive, will_topic = WillTopic, ws_initial_headers = WsInitialHeaders, + mountpoint = MountPoint, connected_at = Time}. session(#proto_state{session = Session}) -> @@ -167,13 +179,13 @@ process(?CONNECT_PACKET(Var), State0) -> keep_alive = KeepAlive, client_id = ClientId} = Var, - State1 = State0#proto_state{proto_ver = ProtoVer, - proto_name = ProtoName, - username = Username, - client_id = ClientId, - clean_sess = CleanSess, - keepalive = KeepAlive, - will_msg = willmsg(Var), + State1 = State0#proto_state{proto_ver = ProtoVer, + proto_name = ProtoName, + username = Username, + client_id = ClientId, + clean_sess = CleanSess, + keepalive = KeepAlive, + will_msg = willmsg(Var, State0), connected_at = os:timestamp()}, {ReturnCode1, SessPresent, State3} = @@ -240,10 +252,11 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) -> %% TODO: refactor later... process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), - State = #proto_state{session = Session, - client_id = ClientId, + State = #proto_state{client_id = ClientId, username = Username, - is_superuser = IsSuperuser}) -> + is_superuser = IsSuperuser, + mountpoint = MountPoint, + session = Session}) -> Client = client(State), TopicTable = parse_topic_table(RawTopicTable), AllowDenies = if IsSuperuser -> []; @@ -256,7 +269,8 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), false -> case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of {ok, TopicTable1} -> - emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State}; + emqttd_session:subscribe(Session, PacketId, mount(MountPoint, TopicTable1)), + {ok, State}; {stop, _} -> {ok, State} end @@ -267,12 +281,13 @@ process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), - State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of {ok, TopicTable} -> - emqttd_session:unsubscribe(Session, TopicTable); + emqttd_session:unsubscribe(Session, mount(MountPoint, TopicTable)); {stop, _} -> ok end, @@ -286,11 +301,12 @@ process(?PACKET(?DISCONNECT), State) -> {stop, normal, State#proto_state{will_msg = undefined}}. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), - #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), - emqttd_session:publish(Session, Msg); + emqttd_session:publish(Session, mount(MountPoint, Msg)); publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> with_puback(?PUBACK, Packet, State); @@ -299,11 +315,12 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> with_puback(?PUBREC, Packet, State). with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), - State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), - case emqttd_session:publish(Session, Msg) of + case emqttd_session:publish(Session, mount(MountPoint, Msg)) of ok -> send(?PUBACK_PACKET(Type, PacketId), State); {error, Error} -> @@ -311,10 +328,12 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), end. -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). -send(Msg, State = #proto_state{client_id = ClientId, username = Username}) +send(Msg, State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint}) when is_record(Msg, mqtt_message) -> emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), - send(emqttd_message:to_packet(Msg), State); + send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> @@ -371,8 +390,11 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> %% emqttd_cm:unreg(ClientId). ok. -willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> - emqttd_message:from_packet(Packet). +willmsg(Packet, #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> + case emqttd_message:from_packet(Packet) of + undefined -> undefined; + Msg -> mount(MountPoint, Msg) + end. %% Generate a client if if nulll maybe_set_clientid(State = #proto_state{client_id = NullId}) @@ -513,3 +535,23 @@ check_acl(subscribe, Topic, Client) -> sp(true) -> 1; sp(false) -> 0. + +%%-------------------------------------------------------------------- +%% Mount Point +%%-------------------------------------------------------------------- + +mount(undefined, Any) -> + Any; +mount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> + Msg#mqtt_message{topic = <>}; +mount(MountPoint, TopicTable) when is_list(TopicTable) -> + [{<>, Opts} || {Topic, Opts} <- TopicTable]. + +unmount(undefined, Any) -> + Any; +unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> + case catch split_binary(Topic, byte_size(MountPoint)) of + {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; + _ -> Msg + end. + From 06100ae6d531a8e43f975011fb4cdfc259d7587f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:15:45 +0800 Subject: [PATCH 12/24] Define 'MQueue' macro --- src/emqttd_session.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 85b027781..41ee456f4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -77,6 +77,8 @@ -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3, handle_pre_hibernate/1]). +-define(MQueue, emqttd_mqueue). + -record(state, { %% Clean Session Flag @@ -124,7 +126,7 @@ %% QoS 1 and QoS 2 messages pending transmission to the Client. %% %% Optionally, QoS 0 messages pending transmission to the Client. - mqueue :: emqttd_mqueue:mqueue(), + mqueue :: ?MQueue:mqueue(), %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), @@ -257,12 +259,9 @@ stats(#state{max_subscriptions = MaxSubscriptions, {subscriptions, maps:size(Subscriptions)}, {max_inflight, MaxInflight}, {inflight_len, Inflight:size()}, - {max_mqueue, case emqttd_mqueue:max_len(MQueue) of - infinity -> 0; - Len -> Len - end}, - {mqueue_len, emqttd_mqueue:len(MQueue)}, - {mqueue_dropped, emqttd_mqueue:dropped(MQueue)}, + {max_mqueue, ?MQueue:max_len(MQueue)}, + {mqueue_len, ?MQueue:len(MQueue)}, + {mqueue_dropped, ?MQueue:dropped(MQueue)}, {max_awaiting_rel, MaxAwaitingRel}, {awaiting_rel_len, maps:size(AwaitingRel)}, {deliver_msg, get(deliver_msg)}, @@ -286,7 +285,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), - MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), + MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), State = #state{clean_sess = CleanSess, binding = binding(ClientPid), client_id = ClientId, @@ -698,7 +697,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, enqueue_msg(Msg, State = #state{mqueue = Q}) -> inc_stats(enqueue_msg), - State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. + State#state{mqueue = ?MQueue:in(Msg, Q)}. %%-------------------------------------------------------------------- %% Deliver @@ -755,7 +754,7 @@ dequeue(State = #state{inflight = Inflight}) -> end. dequeue2(State = #state{mqueue = Q}) -> - case emqttd_mqueue:out(Q) of + case ?MQueue:out(Q) of {empty, _Q} -> State; {{value, Msg}, Q1} -> From 3c8de09ba3c66c88d4abc8b60521c001ba7100e5 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 15:20:32 +0800 Subject: [PATCH 13/24] Rename the env 'queue' to 'mqueue' --- src/emqttd_session.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 41ee456f4..316a97741 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -281,7 +281,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> true = link(ClientPid), init_stats([deliver_msg, enqueue_msg]), {ok, Env} = emqttd:env(session), - {ok, QEnv} = emqttd:env(queue), + {ok, QEnv} = emqttd:env(mqueue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), From f194f924180ea43c0acfbd74d13e05e91fe40ce7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 16:30:23 +0800 Subject: [PATCH 14/24] Shutdown the connection if no more data received --- src/emqttd_client.erl | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index be08b6fed..98db870e7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -55,7 +55,7 @@ %% Unused fields: connname, peerhost, peerport -record(client_state, {connection, peername, conn_state, await_recv, rate_limit, packet_size, parser, proto_state, - keepalive, enable_stats, force_gc_count}). + keepalive, enable_stats, idle_timeout, force_gc_count}). -define(INFO_KEYS, [peername, conn_state, await_recv]). @@ -114,6 +114,7 @@ do_init(Conn, Env, Peername) -> Parser = emqttd_parser:initial_state(PacketSize), ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env), EnableStats = get_value(client_enable_stats, Env, false), + IdleTimout = get_value(client_idle_timeout, Env, 30000), ForceGcCount = emqttd_gc:conn_max_gc_count(), State = run_socket(#client_state{connection = Conn, peername = Peername, @@ -124,8 +125,8 @@ do_init(Conn, Env, Peername) -> parser = Parser, proto_state = ProtoState, enable_stats = EnableStats, + idle_timeout = IdleTimout, force_gc_count = ForceGcCount}), - IdleTimout = get_value(client_idle_timeout, Env, 30000), gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, {backoff, 2000, 2000, 20000}). @@ -275,9 +276,11 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(Reason, #client_state{connection = Conn, - keepalive = KeepAlive, - proto_state = ProtoState}) -> +terminate(Reason, State = #client_state{connection = Conn, + keepalive = KeepAlive, + proto_state = ProtoState}) -> + + ?LOG(debug, "Terminated for ~p", [Reason], State), Conn:fast_close(), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -300,12 +303,13 @@ code_change(_OldVsn, State, _Extra) -> received(<<>>, State) -> {noreply, gc(State), hibernate}; -received(Bytes, State = #client_state{parser = Parser, - packet_size = PacketSize, - proto_state = ProtoState}) -> +received(Bytes, State = #client_state{parser = Parser, + packet_size = PacketSize, + proto_state = ProtoState, + idle_timeout = IdleTimeout}) -> case catch emqttd_parser:parse(Bytes, Parser) of {more, NewParser} -> - {noreply, run_socket(State#client_state{parser = NewParser}), hibernate}; + {noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout}; {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of From 440307081bbff08cbd3c5dd4cc480b8c64582b40 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 23 Mar 2017 18:37:04 +0800 Subject: [PATCH 15/24] Check if the node is in cluster before remove --- src/emqttd_cluster.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl index 7990a5d51..ab40875aa 100644 --- a/src/emqttd_cluster.erl +++ b/src/emqttd_cluster.erl @@ -73,12 +73,14 @@ remove(Node) when Node =:= node() -> {error, {cannot_remove_self, Node}}; remove(Node) -> - case rpc:call(Node, ?MODULE, prepare, []) of + case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of ok -> case emqttd_mnesia:remove_from_cluster(Node) of ok -> rpc:call(Node, ?MODULE, reboot, []); Error -> Error end; + false -> + {error, node_not_in_cluster}; {badrpc, nodedown} -> emqttd_mnesia:remove_from_cluster(Node); {badrpc, Reason} -> From 4f0bd74f878e42e8a41a26a8355327ff7787efeb Mon Sep 17 00:00:00 2001 From: turtled Date: Wed, 29 Mar 2017 15:28:24 +0800 Subject: [PATCH 16/24] #777 --- etc/emq.conf | 3 +++ priv/emq.schema | 6 ++++++ src/emqttd_access_control.erl | 14 ++++---------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 6b8e784f4..286bd4163 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -100,6 +100,9 @@ log.crash.file = {{ platform_log_dir }}/crash.log ## Allow Anonymous authentication mqtt.allow_anonymous = true +## ACL nomatch +mqtt.acl_nomatch = allow + ## Default ACL File mqtt.acl_file = {{ platform_etc_dir }}/acl.conf diff --git a/priv/emq.schema b/priv/emq.schema index 2ed196e2c..2760438f9 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -307,6 +307,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc ACL nomatch +{mapping, "mqtt.acl_nomatch", "emqttd.acl_nomatch", [ + {default, allow}, + {datatype, {enum, [allow, deny]}} +]}. + %% @doc Default ACL File {mapping, "mqtt.acl_file", "emqttd.acl_file", [ {datatype, string}, diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 65d0c76f5..283d42a78 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -71,16 +71,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> PubSub :: pubsub(), Topic :: binary()). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - case lookup_mods(acl) of - [] -> case emqttd:env(allow_anonymous, false) of - true -> allow; - false -> deny - end; - AclMods -> check_acl(Client, PubSub, Topic, AclMods) - end. -check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> - lager:error("ACL: nomatch for ~s ~s ~s", [ClientId, PubSub, Topic]), - allow; + check_acl(Client, PubSub, Topic, lookup_mods(acl)). + +check_acl(_Client, _PubSub, _Topic, []) -> + emqttd:env(acl_nomatch, allow); check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> case Mod:check_acl({Client, PubSub, Topic}, State) of allow -> allow; From 5c866d91b4ef3a69d73f4e843069a6bf7d5cf4ef Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 30 Mar 2017 09:43:35 +0800 Subject: [PATCH 17/24] support cascading multiple acl modules --- etc/acl.conf | 3 --- src/emqttd_acl_internal.erl | 6 +++--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/etc/acl.conf b/etc/acl.conf index 3cb3b8c52..2560bf80d 100644 --- a/etc/acl.conf +++ b/etc/acl.conf @@ -24,6 +24,3 @@ {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. -{allow, all}. - - diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index 1cd32c0f4..5305985c4 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -30,7 +30,7 @@ -define(ACL_RULE_TAB, mqtt_acl_rule). --record(state, {config, nomatch = allow}). +-record(state, {config}). %%-------------------------------------------------------------------- %% API @@ -86,11 +86,11 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> State :: #state{}). check_acl(_Who, #state{config = undefined}) -> allow; -check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) -> +check_acl({Client, PubSub, Topic}, #state{}) -> case match(Client, Topic, lookup(PubSub)) of {matched, allow} -> allow; {matched, deny} -> deny; - nomatch -> Default + nomatch -> ignore end. lookup(PubSub) -> From 45ca461fd9957bc2b26e1fce82306a85ba660243 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 30 Mar 2017 14:41:29 +0800 Subject: [PATCH 18/24] Add ignore self publish message --- etc/emq.conf | 3 +++ priv/emq.schema | 9 ++++++++- src/emqttd_session.erl | 16 ++++++++++++++-- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/etc/emq.conf b/etc/emq.conf index 286bd4163..dcdc289f4 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -173,6 +173,9 @@ mqtt.session.enable_stats = off ## s - second mqtt.session.expiry_interval = 2h +## Ignore message from self publish +mqtt.session.ignore_loop_deliver = false + ##-------------------------------------------------------------------- ## MQTT Message Queue ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index 2760438f9..0f59cad69 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -436,6 +436,12 @@ end}. {datatype, {duration, ms}} ]}. +%% @doc Ignore message from self publish +{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqttd.session", fun(Conf) -> [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, @@ -444,7 +450,8 @@ end}. {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}] + {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, + {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] end}. %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 316a97741..ba3c42036 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -152,7 +152,9 @@ %% Force GC Count force_gc_count :: undefined | integer(), - created_at :: erlang:timestamp() + created_at :: erlang:timestamp(), + + ignore_loop_deliver = false :: boolean() }). -define(TIMEOUT, 60000). @@ -284,6 +286,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> {ok, QEnv} = emqttd:env(mqueue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), + IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), State = #state{clean_sess = CleanSess, @@ -304,7 +307,8 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> expiry_interval = get_value(expiry_interval, Env), enable_stats = EnableStats, force_gc_count = ForceGcCount, - created_at = os:timestamp()}, + created_at = os:timestamp(), + ignore_loop_deliver = IgnoreLoopDeliver}, emqttd_sm:register_session(ClientId, CleanSess, info(State)), emqttd_hooks:run('session.created', [ClientId, Username]), {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}. @@ -525,6 +529,14 @@ handle_cast({destroy, ClientId}, handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). +%% Dispatch message from self publish +handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, + State = #state{client_id = ClientId, + ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) -> + case IgnoreLoopDeliver of + true -> {noreply, State, hibernate}; + false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate} + end; %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}; From 7d618e78ff68744a602410dc21c4ab19593cc1af Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 11 Apr 2017 16:02:21 +0800 Subject: [PATCH 19/24] support bcrypt --- Makefile | 3 ++- src/emqttd_auth_mod.erl | 11 +++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 13d534dae..abd416178 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.2 -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog +DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc @@ -12,6 +12,7 @@ dep_esockd = git https://github.com/emqtt/esockd emq22 dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog +dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index ff7f79a20..455ff60e6 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -61,8 +61,15 @@ passwd_hash(sha, Password) -> passwd_hash(sha256, Password) -> hexstring(crypto:hash(sha256, Password)); passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) -> - {ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen), - pbkdf2:to_hex(Hexstring). + case pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen) of + {ok,Hexstring} -> pbkdf2:to_hex(Hexstring); + {error, Error} -> lager:error("PasswdHash with pbkdf2 error:~p", [Error]), error + end; +passwd_hash(bcrypt, {Salt, Password}) -> + case bcrypt:hashpw(Salt, Password) of + {ok, HashPassword} -> list_to_binary(HashPassword); + {error, Error}-> lager:error("PasswdHash with bcrypt error:~p", [Error]), error + end. hexstring(<>) -> iolist_to_binary(io_lib:format("~32.16.0b", [X])); From 8d6457996c7485bede5291ae42277e9a2c0b1ed6 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 11 Apr 2017 16:02:48 +0800 Subject: [PATCH 20/24] support bcrypt --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 36ae52ab0..28b47bf58 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}} +{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. From ca676880a81e20d098101d82c024d171586f1c01 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 11 Apr 2017 16:39:52 +0800 Subject: [PATCH 21/24] Support bcrypt --- src/emqttd_auth_mod.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index 455ff60e6..ee3b27ddf 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -66,7 +66,7 @@ passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) -> {error, Error} -> lager:error("PasswdHash with pbkdf2 error:~p", [Error]), error end; passwd_hash(bcrypt, {Salt, Password}) -> - case bcrypt:hashpw(Salt, Password) of + case bcrypt:hashpw(Password, Salt) of {ok, HashPassword} -> list_to_binary(HashPassword); {error, Error}-> lager:error("PasswdHash with bcrypt error:~p", [Error]), error end. From ba0c08517f1fd088c1bdb2f14cde0a2a73cdf07b Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 25 Apr 2017 10:41:43 +0800 Subject: [PATCH 22/24] v2.2 --- Makefile | 2 +- src/emqttd.app.src | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Makefile b/Makefile index a1c202a81..abd416178 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.1.2 +PROJECT_VERSION = 2.2 DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt diff --git a/src/emqttd.app.src b/src/emqttd.app.src index ecb210e53..4b3e2a1f6 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,12 +1,12 @@ -{application, emqttd, [ - {description, "Erlang MQTT Broker"}, - {vsn, "2.2"}, - {modules, []}, - {registered, [emqttd_sup]}, - {applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2]}, - {env, []}, - {mod, {emqttd_app, []}}, - {maintainers, ["Feng Lee "]}, - {licenses, ["Apache-2.0"]}, - {links, [{"Github", "https://github.com/emqtt/emqttd"}]} -]}. +{application,emqttd, + [{description,"Erlang MQTT Broker"}, + {vsn,"2.2"}, + {modules,[]}, + {registered,[emqttd_sup]}, + {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, + lager_syslog,pbkdf2]}, + {env,[]}, + {mod,{emqttd_app,[]}}, + {maintainers,["Feng Lee "]}, + {licenses,["Apache-2.0"]}, + {links,[{"Github","https://github.com/emqtt/emqttd"}]}]}. From cdbcc84d234a96da03810e4f83e4d939485054a1 Mon Sep 17 00:00:00 2001 From: turtled Date: Thu, 27 Apr 2017 09:22:02 +0800 Subject: [PATCH 23/24] emqttd.app.src add bcrypt --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index 4b3e2a1f6..3a7ed3482 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -4,7 +4,7 @@ {modules,[]}, {registered,[emqttd_sup]}, {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, - lager_syslog,pbkdf2]}, + lager_syslog,pbkdf2,bcrypt]}, {env,[]}, {mod,{emqttd_app,[]}}, {maintainers,["Feng Lee "]}, From 1691e9a9b628e3b61c5d9416823a50a2f8b1a916 Mon Sep 17 00:00:00 2001 From: turtled Date: Tue, 2 May 2017 14:33:23 +0800 Subject: [PATCH 24/24] Update node name --- etc/emq.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/emq.conf b/etc/emq.conf index dcdc289f4..40341b83f 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -18,7 +18,7 @@ cluster.multicast = 239.192.0.1:44369 ##-------------------------------------------------------------------- ## Node name -node.name = emq@127.0.0.1 +node.name = emqttd@127.0.0.1 ## Cookie for distributed node node.cookie = emqsecretcookie