diff --git a/Makefile b/Makefile index 4f1f637a2..abd416178 100644 --- a/Makefile +++ b/Makefile @@ -1,18 +1,18 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker -PROJECT_VERSION = 2.1.2 - -DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog +PROJECT_VERSION = 2.2 +DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt dep_goldrush = git https://github.com/basho/goldrush 0.1.9 dep_gproc = git https://github.com/uwiger/gproc dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master -dep_esockd = git https://github.com/emqtt/esockd v4.2 -dep_mochiweb = git https://github.com/emqtt/mochiweb +dep_esockd = git https://github.com/emqtt/esockd emq22 +dep_mochiweb = git https://github.com/emqtt/mochiweb emq22 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog +dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master ERLC_OPTS += +'{parse_transform, lager_transform}' diff --git a/etc/acl.conf b/etc/acl.conf index 3cb3b8c52..2560bf80d 100644 --- a/etc/acl.conf +++ b/etc/acl.conf @@ -24,6 +24,3 @@ {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. -{allow, all}. - - diff --git a/etc/emq.conf b/etc/emq.conf index 196ea99f3..40341b83f 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -1,8 +1,18 @@ ##=================================================================== -## EMQ Configuration R2.1 +## EMQ Configuration R2.2 ##=================================================================== +##-------------------------------------------------------------------- +## Cluster +##-------------------------------------------------------------------- + +## The cluster Id +cluster.id = emq + +## The multicast address and port. +cluster.multicast = 239.192.0.1:44369 + ##-------------------------------------------------------------------- ## Node Args ##-------------------------------------------------------------------- @@ -11,7 +21,7 @@ node.name = emqttd@127.0.0.1 ## Cookie for distributed node -node.cookie = emq_dist_cookie +node.cookie = emqsecretcookie ## SMP support: enable, auto, disable node.smp = auto @@ -50,8 +60,8 @@ node.crash_dump = {{ platform_log_dir }}/crash.dump node.dist_net_ticktime = 60 ## Distributed node port range -## node.dist_listen_min = 6369 -## node.dist_listen_max = 6369 +node.dist_listen_min = 6369 +node.dist_listen_max = 6369 ##-------------------------------------------------------------------- ## Log @@ -63,15 +73,15 @@ log.dir = {{ platform_log_dir }} ## Console log. Enum: off, file, console, both log.console = console +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency +log.console.level = error + ## Syslog. Enum: on, off log.syslog = on ## syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency log.syslog.level = error -## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency -log.console.level = error - ## Console log file ## log.console.file = {{ platform_log_dir }}/console.log @@ -90,6 +100,9 @@ log.crash.file = {{ platform_log_dir }}/crash.log ## Allow Anonymous authentication mqtt.allow_anonymous = true +## ACL nomatch +mqtt.acl_nomatch = allow + ## Default ACL File mqtt.acl_file = {{ platform_etc_dir }}/acl.conf @@ -120,6 +133,9 @@ mqtt.conn.force_gc_count = 100 ## Client Idle Timeout (Second) mqtt.client.idle_timeout = 30s +## Max publish rate of Messages +## mqtt.client.max_publish_rate = 5 + ## Enable client Stats: on | off mqtt.client.enable_stats = off @@ -127,6 +143,9 @@ mqtt.client.enable_stats = off ## MQTT Session ##-------------------------------------------------------------------- +## Max Number of Subscriptions, 0 means no limit. +mqtt.session.max_subscriptions = 0 + ## Upgrade QoS? mqtt.session.upgrade_qos = off @@ -154,28 +173,31 @@ mqtt.session.enable_stats = off ## s - second mqtt.session.expiry_interval = 2h +## Ignore message from self publish +mqtt.session.ignore_loop_deliver = false + ##-------------------------------------------------------------------- -## MQTT Queue +## MQTT Message Queue ##-------------------------------------------------------------------- ## Type: simple | priority -mqtt.queue.type = simple +mqtt.mqueue.type = simple ## Topic Priority: 0~255, Default is 0 -## mqtt.queue.priority = topic/1=10,topic/2=8 +## mqtt.mqueue.priority = topic/1=10,topic/2=8 ## Max queue length. Enqueued messages when persistent client disconnected, -## or inflight window is full. -mqtt.queue.max_length = infinity +## or inflight window is full. 0 means no limit. +mqtt.mqueue.max_length = 0 ## Low-water mark of queued messages -mqtt.queue.low_watermark = 20% +mqtt.mqueue.low_watermark = 20% ## High-water mark of queued messages -mqtt.queue.high_watermark = 60% +mqtt.mqueue.high_watermark = 60% ## Queue Qos0 messages? -mqtt.queue.qos0 = true +mqtt.mqueue.store_qos0 = true ##-------------------------------------------------------------------- ## MQTT Broker and PubSub @@ -216,63 +238,210 @@ mqtt.plugins.loaded_file = {{ platform_data_dir }}/loaded_plugins ## MQTT Listeners ##-------------------------------------------------------------------- -## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -mqtt.listener.tcp = 1883 +##-------------------------------------------------------------------- +## External TCP Listener + +## External TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +listener.tcp.external = 0.0.0.0:1883 ## Size of acceptor pool -mqtt.listener.tcp.acceptors = 8 +listener.tcp.external.acceptors = 16 ## Maximum number of concurrent clients -mqtt.listener.tcp.max_clients = 1024 +listener.tcp.external.max_clients = 102400 + +#listener.tcp.external.mountpoint = external/ ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.tcp.rate_limit = 100,10 +#listener.tcp.external.rate_limit = 100,10 + +#listener.tcp.external.access.1 = allow 192.168.0.0/24 + +listener.tcp.external.access.2 = allow all + +## Proxy Protocol V1/2 +## listener.tcp.external.proxy_protocol = on +## listener.tcp.external.proxy_protocol_timeout = 3s ## TCP Socket Options -mqtt.listener.tcp.backlog = 1024 -## mqtt.listener.tcp.recbuf = 4096 -## mqtt.listener.tcp.sndbuf = 4096 -## mqtt.listener.tcp.buffer = 4096 -## mqtt.listener.tcp.nodelay = true +listener.tcp.external.backlog = 1024 -## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 -mqtt.listener.ssl = 8883 +#listener.tcp.external.recbuf = 4KB + +#listener.tcp.external.sndbuf = 4KB + +listener.tcp.external.buffer = 4KB + +listener.tcp.external.nodelay = true + +##-------------------------------------------------------------------- +## Internal TCP Listener + +## Internal TCP Listener: 11883, 127.0.0.1:11883, ::1:11883 +listener.tcp.internal = 127.0.0.1:11883 ## Size of acceptor pool -mqtt.listener.ssl.acceptors = 4 +listener.tcp.internal.acceptors = 16 ## Maximum number of concurrent clients -mqtt.listener.ssl.max_clients = 512 +listener.tcp.internal.max_clients = 102400 + +#listener.tcp.external.mountpoint = internal/ ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.ssl.rate_limit = 100,10 +## listener.tcp.internal.rate_limit = 1000,100 + +## TCP Socket Options +listener.tcp.internal.backlog = 512 + +listener.tcp.internal.tune_buffer = on + +listener.tcp.internal.buffer = 1MB + +listener.tcp.internal.recbuf = 4KB + +listener.tcp.internal.sndbuf = 1MB + +listener.tcp.internal.nodelay = true + +##-------------------------------------------------------------------- +## External SSL Listener + +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +listener.ssl.external = 8883 + +## Size of acceptor pool +listener.ssl.external.acceptors = 16 + +## Maximum number of concurrent clients +listener.ssl.external.max_clients = 1024 + +## listener.ssl.external.mountpoint = inbound/ + +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## listener.ssl.external.rate_limit = 100,10 + +## Proxy Protocol V1/2 +## listener.ssl.external.proxy_protocol = on +## listener.ssl.external.proxy_protocol_timeout = 3s + +listener.ssl.external.access.1 = allow all + +### SSL Options. See http://erlang.org/doc/man/ssl.html ## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html ### TLS only for POODLE attack -mqtt.listener.ssl.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -mqtt.listener.ssl.handshake_timeout = 15s -mqtt.listener.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem -mqtt.listener.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem -## mqtt.listener.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## mqtt.listener.ssl.verify = verify_peer -## mqtt.listener.ssl.fail_if_no_peer_cert = true +## listener.ssl.external.tls_versions = tlsv1.2,tlsv1.1,tlsv1 -## HTTP and WebSocket Listener -mqtt.listener.http = 8083 -mqtt.listener.http.acceptors = 4 -mqtt.listener.http.max_clients = 64 +### The Ephemeral Diffie-Helman key exchange is a very effective way of +### ensuring Forward Secrecy by exchanging a set of keys that never hit +### the wire. Since the DH key is effectively signed by the private key, +### it needs to be at least as strong as the private key. In addition, +### the default DH groups that most of the OpenSSL installations have +### are only a handful (since they are distributed with the OpenSSL +### package that has been built for the operating system it’s running on) +### and hence predictable (not to mention, 1024 bits only). -## HTTP(SSL) Listener -mqtt.listener.https = 8084 -mqtt.listener.https.acceptors = 4 -mqtt.listener.https.max_clients = 64 -mqtt.listener.https.handshake_timeout = 15 -mqtt.listener.https.keyfile = {{ platform_etc_dir }}/certs/key.pem -mqtt.listener.https.certfile = {{ platform_etc_dir }}/certs/cert.pem -## mqtt.listener.https.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem +### In order to escape this situation, first we need to generate a fresh, +### strong DH group, store it in a file and then use the option above, +### to force our SSL application to use the new DH group. Fortunately, +### OpenSSL provides us with a tool to do that. Simply run: +### openssl dhparam -out dh-params.pem 2048 -## mqtt.listener.https.verify = verify_peer -## mqtt.listener.https.fail_if_no_peer_cert = true +listener.ssl.external.handshake_timeout = 15s + +listener.ssl.external.keyfile = {{ platform_etc_dir }}/certs/key.pem + +listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem + +## listener.ssl.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## listener.ssl.external.dhfile = {{ platform_etc_dir }}/certs/dh-params.pem + +## listener.ssl.external.verify = verify_peer + +## listener.ssl.external.fail_if_no_peer_cert = true + +### This is the single most important configuration option of an Erlang SSL application. +### Ciphers (and their ordering) define the way the client and server encrypt information +### over the wire, from the initial Diffie-Helman key exchange, the session key encryption +### algorithm and the message digest algorithm. Selecting a good cipher suite is critical +### for the application’s data security, confidentiality and performance. +### The cipher list above offers: +### +### A good balance between compatibility with older browsers. It can get stricter for Machine-To-Machine scenarios. +### Perfect Forward Secrecy. +### No old/insecure encryption and HMAC algorithms +### +### Most of it was copied from Mozilla’s Server Side TLS article +## listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA + +### SSL parameter renegotiation is a feature that allows a client and +### a server to renegotiate the parameters of the SSL connection on the fly. +### RFC 5746 defines a more secure way of doing this. By enabling secure renegotiation, +### you drop support for the insecure renegotiation, prone to MitM attacks. +## listener.ssl.external.secure_renegotiate = off + +### A performance optimization setting, it allows clients to reuse +### pre-existing sessions, instead of initializing new ones. +### Read more about it here. +## listener.ssl.external.reuse_sessions = on + +### An important security setting, it forces the cipher to be set based on +### the server-specified order instead of the client-specified order, +### hence enforcing the (usually more properly configured) security +### ordering of the server administrator. +## listener.ssl.external.honor_cipher_order = on + +### Use the CN or DN value from the client certificate as a username. +### Notice: 'verify' should be configured as 'verify_peer' +## listener.ssl.external.peer_cert_as_username = cn + +##-------------------------------------------------------------------- +## External MQTT/WebSocket Listener + +listener.ws.external = 8083 + +listener.ws.external.acceptors = 4 + +listener.ws.external.max_clients = 64 + +listener.ws.external.access.1 = allow all + +## TCP Options +listener.ws.external.backlog = 1024 + +listener.ws.external.recbuf = 4KB + +listener.ws.external.sndbuf = 4KB + +listener.ws.external.buffer = 4KB + +listener.ws.external.nodelay = true + +##-------------------------------------------------------------------- +## External MQTT/WebSocket/SSL Listener + +listener.wss.external = 8084 + +listener.wss.external.acceptors = 4 + +listener.wss.external.max_clients = 64 + +listener.wss.external.access.1 = allow all + +## SSL Options +listener.wss.external.handshake_timeout = 15s + +listener.wss.external.keyfile = {{ platform_etc_dir }}/certs/key.pem + +listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem + +## listener.wss.external.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem + +## listener.wss.external.verify = verify_peer + +## listener.wss.external.fail_if_no_peer_cert = true ##------------------------------------------------------------------- ## System Monitor diff --git a/include/emqttd.hrl b/include/emqttd.hrl index a02c86129..508712512 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -84,6 +84,7 @@ keepalive = 0, will_topic :: undefined | binary(), ws_initial_headers :: list({ws_header_key(), ws_header_val()}), + mountpoint :: undefined | binary(), connected_at :: erlang:timestamp() }). @@ -157,8 +158,8 @@ %%-------------------------------------------------------------------- -record(mqtt_route, - { topic :: binary(), - node :: node() + { topic :: binary(), + node :: node() }). -type(mqtt_route() :: #mqtt_route{}). @@ -168,11 +169,11 @@ %%-------------------------------------------------------------------- -record(mqtt_alarm, - { id :: binary(), - severity :: warning | error | critical, - title :: iolist() | binary(), - summary :: iolist() | binary(), - timestamp :: erlang:timestamp() + { id :: binary(), + severity :: warning | error | critical, + title :: iolist() | binary(), + summary :: iolist() | binary(), + timestamp :: erlang:timestamp() }). -type(mqtt_alarm() :: #mqtt_alarm{}). diff --git a/priv/emq.schema b/priv/emq.schema index 8eb9f8ef5..ddddfff24 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1,13 +1,37 @@ %%-*- mode: erlang -*- %% EMQ config mapping +%%-------------------------------------------------------------------- +%% Cluster +%%-------------------------------------------------------------------- + +%% Cluster ID +{mapping, "cluster.id", "emqttd.cluster", [ + {default, "emq"}, + {datatype, string} +]}. + +%% Cluster Multicast Addr +{mapping, "cluster.multicast", "emqttd.cluster", [ + {default, "239.192.0.1:44369"}, + {datatype, string} +]}. + +{translation, "emqttd.cluster", fun(Conf) -> + Multicast = cuttlefish:conf_get("cluster.multicast", Conf), + [Addr, Port] = string:tokens(Multicast, ":"), + {ok, Ip} = inet_parse:address(Addr), + [{id, cuttlefish:conf_get("cluster.id", Conf)}, + {multicast, {Ip, list_to_integer(Port)}}] +end}. + %%-------------------------------------------------------------------- %% Erlang Node %%-------------------------------------------------------------------- %% @doc Erlang node name {mapping, "node.name", "vm_args.-name", [ - {default, "emqttd@127.0.0.1"} + {default, "emq@127.0.0.1"} ]}. %% @doc Secret cookie for distributed erlang node @@ -282,6 +306,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc ACL nomatch +{mapping, "mqtt.acl_nomatch", "emqttd.acl_nomatch", [ + {default, allow}, + {datatype, {enum, [allow, deny]}} +]}. + %% @doc Default ACL File {mapping, "mqtt.acl_file", "emqttd.acl_file", [ {datatype, string}, @@ -328,6 +358,12 @@ end}. %% MQTT Client %%-------------------------------------------------------------------- +%% @doc Max Publish Rate of Message +{mapping, "mqtt.client.max_publish_rate", "emqttd.client", [ + {default, 0}, + {datatype, integer} +]}. + %% @doc Client Idle Timeout. {mapping, "mqtt.client.idle_timeout", "emqttd.client", [ {default, "30s"}, @@ -340,9 +376,9 @@ end}. {datatype, flag} ]}. -%% @doc Client {translation, "emqttd.client", fun(Conf) -> - [{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, + [{max_publish_rate, cuttlefish:conf_get("mqtt.client.max_publish_rate", Conf)}, + {client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)}, {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}] end}. @@ -350,6 +386,12 @@ end}. %% MQTT Session %%-------------------------------------------------------------------- +%% @doc Max Number of Subscriptions Allowed +{mapping, "mqtt.session.max_subscriptions", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + %% @doc Upgrade QoS? {mapping, "mqtt.session.upgrade_qos", "emqttd.session", [ {default, off}, @@ -393,72 +435,80 @@ end}. {datatype, {duration, ms}} ]}. +%% @doc Ignore message from self publish +{mapping, "mqtt.session.ignore_loop_deliver", "emqttd.session", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + {translation, "emqttd.session", fun(Conf) -> - [{upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, + [{max_subscriptions, cuttlefish:conf_get("mqtt.session.max_subscriptions", Conf)}, + {upgrade_qos, cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)}, {max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, {enable_stats, cuttlefish:conf_get("mqtt.session.enable_stats", Conf)}, - {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}] + {expiry_interval, cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}, + {ignore_loop_deliver, cuttlefish:conf_get("mqtt.session.ignore_loop_deliver", Conf)}] end}. %%-------------------------------------------------------------------- -%% MQTT Queue +%% MQTT MQueue %%-------------------------------------------------------------------- %% @doc Type: simple | priority -{mapping, "mqtt.queue.type", "emqttd.queue", [ +{mapping, "mqtt.mqueue.type", "emqttd.mqueue", [ {default, simple}, {datatype, atom} ]}. %% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.queue.priority", "emqttd.queue", [ +{mapping, "mqtt.mqueue.priority", "emqttd.mqueue", [ {default, ""}, {datatype, string} ]}. -%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. -{mapping, "mqtt.queue.max_length", "emqttd.queue", [ - {default, infinity}, - {datatype, [integer, {atom, infinity}]} +%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. 0 means no limit. +{mapping, "mqtt.mqueue.max_length", "emqttd.mqueue", [ + {default, 0}, + {datatype, integer} ]}. %% @doc Low-water mark of queued messages -{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ +{mapping, "mqtt.mqueue.low_watermark", "emqttd.mqueue", [ {default, "20%"}, {datatype, string} ]}. %% @doc High-water mark of queued messages -{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ +{mapping, "mqtt.mqueue.high_watermark", "emqttd.mqueue", [ {default, "60%"}, {datatype, string} ]}. %% @doc Queue Qos0 messages? -{mapping, "mqtt.queue.qos0", "emqttd.queue", [ +{mapping, "mqtt.mqueue.store_qos0", "emqttd.mqueue", [ {default, true}, {datatype, {enum, [true, false]}} ]}. -{translation, "emqttd.queue", fun(Conf) -> +{translation, "emqttd.mqueue", fun(Conf) -> Parse = fun(S) -> {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), list_to_integer(N) / 100 end, - Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)}, - {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)}, - {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))}, - {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))}, - {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}], - case cuttlefish:conf_get("mqtt.queue.priority", Conf) of + Opts = [{type, cuttlefish:conf_get("mqtt.mqueue.type", Conf, simple)}, + {max_length, cuttlefish:conf_get("mqtt.mqueue.max_length", Conf)}, + {low_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.low_watermark", Conf))}, + {high_watermark, Parse(cuttlefish:conf_get("mqtt.mqueue.high_watermark", Conf))}, + {store_qos0, cuttlefish:conf_get("mqtt.mqueue.store_qos0", Conf)}], + case cuttlefish:conf_get("mqtt.mqueue.priority", Conf) of undefined -> Opts; - V -> [{priority, - [begin [T, P] = string:tokens(S, "="), - {T, list_to_integer(P)} - end || S <- string:tokens(V, ",")]}|Opts] + V -> [{priority, + [begin [T, P] = string:tokens(S, "="), + {T, list_to_integer(P)} + end || S <- string:tokens(V, ",")]} | Opts] end end}. @@ -531,165 +581,388 @@ end}. %% MQTT Listeners %%-------------------------------------------------------------------- -{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ - %% {default, 1883}, +%%-------------------------------------------------------------------- +%% TCP Listeners + +{mapping, "listener.tcp.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.max_clients", "emqttd.listeners", [ {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.rate_limit", "emqttd.listeners", [ {default, undefined}, {datatype, string} ]}. -{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol", "emqttd.listeners", [ + %%{default, off}, + {datatype, flag} +]}. + +{mapping, "listener.tcp.$name.proxy_protocol_timeout", "emqttd.listeners", [ + %%{default, "5s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.tcp.$name.backlog", "emqttd.listeners", [ {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ - {datatype, integer}, +{mapping, "listener.tcp.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, hidden ]}. -{mapping, "mqtt.listener.tcp.tune_buffer", "emqttd.listeners", [ - {default, off}, - {datatype, flag} +{mapping, "listener.tcp.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden ]}. -{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ +{mapping, "listener.tcp.$name.nodelay", "emqttd.listeners", [ {datatype, {enum, [true, false]}}, hidden ]}. -{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ - %% {default, 8883}, +%%-------------------------------------------------------------------- +%% SSL Listeners + +{mapping, "listener.ssl.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ - {default, 512}, +{mapping, "listener.ssl.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.zone", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.tls_versions", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.mountpoint", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol", "emqttd.listeners", [ + %%{default, off}, + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.proxy_protocol_timeout", "emqttd.listeners", [ + %%{default, "5s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.ssl.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ssl.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ssl.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ssl.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.ssl.$name.tls_versions", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.ciphers", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.handshake_timeout", "emqttd.listeners", [ {default, "15s"}, {datatype, {duration, ms}} ]}. -{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.dhfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.keyfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.certfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ssl.$name.verify", "emqttd.listeners", [ {datatype, atom} ]}. -{mapping, "mqtt.listener.ssl.fail_if_no_peer_cert", "emqttd.listeners", [ +{mapping, "listener.ssl.$name.fail_if_no_peer_cert", "emqttd.listeners", [ {datatype, {enum, [true, false]}} ]}. -{mapping, "mqtt.listener.http", "emqttd.listeners", [ - %% {default, 8083}, +{mapping, "listener.ssl.$name.secure_renegotiate", "emqttd.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.reuse_sessions", "emqttd.listeners", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.honor_cipher_order", "emqttd.listeners", [ + {datatype, flag} +]}. + +{mapping, "listener.ssl.$name.peer_cert_as_username", "emqttd.listeners", [ + {datatype, {enum, [cn, dn]}} +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket Listeners + +{mapping, "listener.ws.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ +{mapping, "listener.ws.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ - {default, 64}, +{mapping, "listener.ws.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https", "emqttd.listeners", [ - %%{default, 8084}, +{mapping, "listener.ws.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.ws.$name.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "listener.ws.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.ws.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.ws.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT/WebSocket/SSL Listeners + +{mapping, "listener.wss.$name", "emqttd.listeners", [ {datatype, [integer, ip]} ]}. -{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ +{mapping, "listener.wss.$name.acceptors", "emqttd.listeners", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ - {default, 64}, +{mapping, "listener.wss.$name.max_clients", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ - {default, 15}, +{mapping, "listener.wss.$name.zone", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.mountpoint", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.access.$id", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "listener.wss.$name.backlog", "emqttd.listeners", [ + {default, 1024}, {datatype, integer} ]}. -{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.recbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.sndbuf", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.buffer", "emqttd.listeners", [ + {datatype, bytesize}, + hidden +]}. + +{mapping, "listener.wss.$name.tune_buffer", "emqttd.listeners", [ + {datatype, flag}, + hidden +]}. + +{mapping, "listener.wss.$name.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [ + {default, "15s"}, + {datatype, {duration, ms}} +]}. + +{mapping, "listener.wss.$name.keyfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.certfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [ +{mapping, "listener.wss.$name.cacertfile", "emqttd.listeners", [ {datatype, string} ]}. -{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [ +{mapping, "listener.wss.$name.verify", "emqttd.listeners", [ {datatype, atom} ]}. -{mapping, "mqtt.listener.https.fail_if_no_peer_cert", "emqttd.listeners", [ +{mapping, "listener.wss.$name.fail_if_no_peer_cert", "emqttd.listeners", [ {datatype, {enum, [true, false]}} ]}. {translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + + Atom = fun(undefined) -> undefined; (S) -> list_to_atom(S) end, + + Access = fun(S) -> + [A, CIDR] = string:tokens(S, " "), + {list_to_atom(A), case CIDR of "all" -> all; _ -> CIDR end} + end, + + AccOpts = fun(Prefix) -> + case cuttlefish_variable:filter_by_prefix(Prefix ++ ".access", Conf) of + [] -> []; + Rules -> [{access, [Access(Rule) || {_, Rule} <- Rules]}] + end + end, + + MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, + + ConnOpts = fun(Prefix) -> + Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, + {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, + {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, + {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, + {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}]) + end, + LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}]) + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, @@ -707,29 +980,48 @@ end}. L -> [list_to_atom(V) || V <- L] end, Filter([{versions, Versions}, - {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf), undefined}, + {ciphers, SplitFun(cuttlefish:conf_get(Prefix ++ ".ciphers", Conf, undefined))}, + {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf, undefined)}, + {dhfile, cuttlefish:conf_get(Prefix ++ ".dhfile", Conf, undefined)}, {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, {verify, cuttlefish:conf_get(Prefix ++ ".verify", Conf, undefined)}, - {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}]) + {fail_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ ".fail_if_no_peer_cert", Conf, undefined)}, + {secure_renegotiate, cuttlefish:conf_get(Prefix ++ ".secure_renegotiate", Conf, undefined)}, + {reuse_sessions, cuttlefish:conf_get(Prefix ++ ".reuse_sessions", Conf, undefined)}, + {honor_cipher_order, cuttlefish:conf_get(Prefix ++ ".honor_cipher_order", Conf, undefined)}]) end, - Listeners = fun(Name) when is_atom(Name) -> - Key = "mqtt.listener." ++ atom_to_list(Name), - case cuttlefish:conf_get(Key, Conf, undefined) of - undefined -> - []; - Port -> - ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]), - Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], - [{Name, Port, case Name =:= ssl orelse Name =:= https of - true -> [{sslopts, SslOpts(Key)} | Opts]; - false -> Opts - end}] - end - end, - lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)]) + TcpListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + end + end, + + SslListeners = fun(Type, Name) -> + Prefix = string:join(["listener", Type, Name], "."), + case cuttlefish:conf_get(Prefix, Conf, undefined) of + undefined -> + []; + ListenOn -> + [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, + {sockopts, TcpOpts(Prefix)}, + {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}] + end + end, + + lists:flatten([TcpListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.tcp", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.ws", Conf)] + ++ + [SslListeners(Type, Name) || {["listener", Type, Name], ListenOn} + <- cuttlefish_variable:filter_by_prefix("listener.ssl", Conf) + ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index 3e4afc5d7..28b47bf58 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}} +{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. diff --git a/src/emqttd.app.src b/src/emqttd.app.src index ba5e70f35..3a7ed3482 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,12 +1,12 @@ -{application, emqttd, [ - {description, "Erlang MQTT Broker"}, - {vsn, "2.1.2"}, - {modules, []}, - {registered, [emqttd_sup]}, - {applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2]}, - {env, []}, - {mod, {emqttd_app, []}}, - {maintainers, ["Feng Lee "]}, - {licenses, ["Apache-2.0"]}, - {links, [{"Github", "https://github.com/emqtt/emqttd"}]} -]}. +{application,emqttd, + [{description,"Erlang MQTT Broker"}, + {vsn,"2.2"}, + {modules,[]}, + {registered,[emqttd_sup]}, + {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb, + lager_syslog,pbkdf2,bcrypt]}, + {env,[]}, + {mod,{emqttd_app,[]}}, + {maintainers,["Feng Lee "]}, + {licenses,["Apache-2.0"]}, + {links,[{"Github","https://github.com/emqtt/emqttd"}]}]}. diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 65d0c76f5..283d42a78 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -71,16 +71,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> PubSub :: pubsub(), Topic :: binary()). check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> - case lookup_mods(acl) of - [] -> case emqttd:env(allow_anonymous, false) of - true -> allow; - false -> deny - end; - AclMods -> check_acl(Client, PubSub, Topic, AclMods) - end. -check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> - lager:error("ACL: nomatch for ~s ~s ~s", [ClientId, PubSub, Topic]), - allow; + check_acl(Client, PubSub, Topic, lookup_mods(acl)). + +check_acl(_Client, _PubSub, _Topic, []) -> + emqttd:env(acl_nomatch, allow); check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) -> case Mod:check_acl({Client, PubSub, Topic}, State) of allow -> allow; diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index 1cd32c0f4..5305985c4 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -30,7 +30,7 @@ -define(ACL_RULE_TAB, mqtt_acl_rule). --record(state, {config, nomatch = allow}). +-record(state, {config}). %%-------------------------------------------------------------------- %% API @@ -86,11 +86,11 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> State :: #state{}). check_acl(_Who, #state{config = undefined}) -> allow; -check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) -> +check_acl({Client, PubSub, Topic}, #state{}) -> case match(Client, Topic, lookup(PubSub)) of {matched, allow} -> allow; {matched, deny} -> deny; - nomatch -> Default + nomatch -> ignore end. lookup(PubSub) -> diff --git a/src/emqttd_auth_mod.erl b/src/emqttd_auth_mod.erl index ff7f79a20..ee3b27ddf 100644 --- a/src/emqttd_auth_mod.erl +++ b/src/emqttd_auth_mod.erl @@ -61,8 +61,15 @@ passwd_hash(sha, Password) -> passwd_hash(sha256, Password) -> hexstring(crypto:hash(sha256, Password)); passwd_hash(pbkdf2,{Salt, Password, Macfun, Iterations, Dklen}) -> - {ok,Hexstring} = pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen), - pbkdf2:to_hex(Hexstring). + case pbkdf2:pbkdf2(Macfun, Password, Salt, Iterations, Dklen) of + {ok,Hexstring} -> pbkdf2:to_hex(Hexstring); + {error, Error} -> lager:error("PasswdHash with pbkdf2 error:~p", [Error]), error + end; +passwd_hash(bcrypt, {Salt, Password}) -> + case bcrypt:hashpw(Password, Salt) of + {ok, HashPassword} -> list_to_binary(HashPassword); + {error, Error}-> lager:error("PasswdHash with bcrypt error:~p", [Error]), error + end. hexstring(<>) -> iolist_to_binary(io_lib:format("~32.16.0b", [X])); diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 88f14c2d6..98db870e7 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -55,7 +55,7 @@ %% Unused fields: connname, peerhost, peerport -record(client_state, {connection, peername, conn_state, await_recv, rate_limit, packet_size, parser, proto_state, - keepalive, enable_stats, force_gc_count}). + keepalive, enable_stats, idle_timeout, force_gc_count}). -define(INFO_KEYS, [peername, conn_state, await_recv]). @@ -112,8 +112,9 @@ do_init(Conn, Env, Peername) -> RateLimit = get_value(rate_limit, Conn:opts()), PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), Parser = emqttd_parser:initial_state(PacketSize), - ProtoState = emqttd_protocol:init(Peername, SendFun, Env), + ProtoState = emqttd_protocol:init(Conn, Peername, SendFun, Env), EnableStats = get_value(client_enable_stats, Env, false), + IdleTimout = get_value(client_idle_timeout, Env, 30000), ForceGcCount = emqttd_gc:conn_max_gc_count(), State = run_socket(#client_state{connection = Conn, peername = Peername, @@ -124,8 +125,8 @@ do_init(Conn, Env, Peername) -> parser = Parser, proto_state = ProtoState, enable_stats = EnableStats, + idle_timeout = IdleTimout, force_gc_count = ForceGcCount}), - IdleTimout = get_value(client_idle_timeout, Env, 30000), gen_server2:enter_loop(?MODULE, [], State, self(), IdleTimout, {backoff, 2000, 2000, 20000}). @@ -275,9 +276,11 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(Reason, #client_state{connection = Conn, - keepalive = KeepAlive, - proto_state = ProtoState}) -> +terminate(Reason, State = #client_state{connection = Conn, + keepalive = KeepAlive, + proto_state = ProtoState}) -> + + ?LOG(debug, "Terminated for ~p", [Reason], State), Conn:fast_close(), emqttd_keepalive:cancel(KeepAlive), case {ProtoState, Reason} of @@ -300,12 +303,13 @@ code_change(_OldVsn, State, _Extra) -> received(<<>>, State) -> {noreply, gc(State), hibernate}; -received(Bytes, State = #client_state{parser = Parser, - packet_size = PacketSize, - proto_state = ProtoState}) -> +received(Bytes, State = #client_state{parser = Parser, + packet_size = PacketSize, + proto_state = ProtoState, + idle_timeout = IdleTimeout}) -> case catch emqttd_parser:parse(Bytes, Parser) of {more, NewParser} -> - {noreply, run_socket(State#client_state{parser = NewParser}), hibernate}; + {noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout}; {ok, Packet, Rest} -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl index 7990a5d51..ab40875aa 100644 --- a/src/emqttd_cluster.erl +++ b/src/emqttd_cluster.erl @@ -73,12 +73,14 @@ remove(Node) when Node =:= node() -> {error, {cannot_remove_self, Node}}; remove(Node) -> - case rpc:call(Node, ?MODULE, prepare, []) of + case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of ok -> case emqttd_mnesia:remove_from_cluster(Node) of ok -> rpc:call(Node, ?MODULE, reboot, []); Error -> Error end; + false -> + {error, node_not_in_cluster}; {badrpc, nodedown} -> emqttd_mnesia:remove_from_cluster(Node); {badrpc, Reason} -> diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 4f825329a..08e620a37 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -58,25 +58,27 @@ -define(HIGH_WM, 0.6). +-define(PQUEUE, priority_queue). + -type(priority() :: {iolist(), pos_integer()}). -type(option() :: {type, simple | priority} - | {max_length, pos_integer() | infinity} + | {max_length, non_neg_integer()} %% Max queue length | {priority, list(priority())} | {low_watermark, float()} %% Low watermark | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}). %% Queue Qos0? + | {store_qos0, boolean()}). %% Queue Qos0? --type(stat() :: {max_len, infinity | pos_integer()} +-type(stat() :: {max_len, non_neg_integer()} | {len, non_neg_integer()} | {dropped, non_neg_integer()}). -record(mqueue, {type :: simple | priority, - name, q :: queue:queue() | priority_queue:q(), + name, q :: queue:queue() | ?PQUEUE:q(), %% priority table pseq = 0, priorities = [], %% len of simple queue - len = 0, max_len = infinity, + len = 0, max_len = 0, low_wm = ?LOW_WM, high_wm = ?HIGH_WM, qos0 = false, dropped = 0, alarm_fun}). @@ -89,19 +91,19 @@ -spec(new(iolist(), list(option()), fun()) -> mqueue()). new(Name, Opts, AlarmFun) -> Type = get_value(type, Opts, simple), - MaxLen = get_value(max_length, Opts, infinity), + MaxLen = get_value(max_length, Opts, 0), init_q(#mqueue{type = Type, name = iolist_to_binary(Name), len = 0, max_len = MaxLen, low_wm = low_wm(MaxLen, Opts), high_wm = high_wm(MaxLen, Opts), - qos0 = get_value(queue_qos0, Opts, false), + qos0 = get_value(store_qos0, Opts, false), alarm_fun = AlarmFun}, Opts). init_q(MQ = #mqueue{type = simple}, _Opts) -> MQ#mqueue{q = queue:new()}; init_q(MQ = #mqueue{type = priority}, Opts) -> Priorities = get_value(priority, Opts, []), - init_p(Priorities, MQ#mqueue{q = priority_queue:new()}). + init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). init_p([], MQ) -> MQ; @@ -113,13 +115,13 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> <> = <>, {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. -low_wm(infinity, _Opts) -> - infinity; +low_wm(0, _Opts) -> + undefined; low_wm(MaxLen, Opts) -> round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)). -high_wm(infinity, _Opts) -> - infinity; +high_wm(0, _Opts) -> + undefined; high_wm(MaxLen, Opts) -> round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)). @@ -132,12 +134,12 @@ type(#mqueue{type = Type}) -> Type. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; -is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q). +is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q). len(#mqueue{type = simple, len = Len}) -> Len; -len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q). +len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q). -max_len(#mqueue{max_len= MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen. %% @doc Dropped of the mqueue -spec(dropped(mqueue()) -> non_neg_integer()). @@ -148,14 +150,14 @@ dropped(#mqueue{dropped = Dropped}) -> Dropped. stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> [{len, case Type of simple -> Len; - priority -> priority_queue:len(Q) + priority -> ?PQUEUE:len(Q) end} | [{max_len, MaxLen}, {dropped, Dropped}]]. %% @doc Enqueue a message. -spec(in(mqtt_message(), mqueue()) -> mqueue()). in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) when Len >= MaxLen -> @@ -166,43 +168,45 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, - max_len = infinity}) -> + max_len = 0}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}; + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}; false -> {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, max_len = MaxLen}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> - case priority_queue:plen(Pri, Q) >= MaxLen of + case ?PQUEUE:plen(Pri, Q) >= MaxLen of true -> - {_, Q1} = priority_queue:out(Pri, Q), - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)}; + {_, Q1} = ?PQUEUE:out(Pri, Q), + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)}; false -> - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; false -> {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end. out(MQ = #mqueue{type = simple, len = 0}) -> {empty, MQ}; -out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> +out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> {R, Q2} = queue:out(Q), {R, MQ#mqueue{q = Q2, len = Len - 1}}; out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> {R, Q2} = queue:out(Q), {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}; out(MQ = #mqueue{type = priority, q = Q}) -> - {R, Q2} = priority_queue:out(Q), + {R, Q2} = ?PQUEUE:out(Q), {R, MQ#mqueue{q = Q2}}. +maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) -> + MQ; maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) when Len > HighWM -> Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), @@ -213,6 +217,8 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun maybe_set_alarm(MQ) -> MQ. +maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) -> + MQ; maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun}) when Len < LowWM -> MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))}; diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index b87274455..433825253 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -27,7 +27,7 @@ -import(proplists, [get_value/2, get_value/3]). %% API --export([init/3, info/1, stats/1, clientid/1, client/1, session/1]). +-export([init/3, init/4, info/1, stats/1, clientid/1, client/1, session/1]). -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). @@ -43,12 +43,12 @@ -record(proto_state, {peername, sendfun, connected = false, client_id, client_pid, clean_sess, proto_ver, proto_name, username, is_superuser, will_msg, keepalive, max_clientid_len, session, stats_data, - ws_initial_headers, connected_at}). + mountpoint, ws_initial_headers, connected_at}). -type(proto_state() :: #proto_state{}). -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name, - keepalive, will_msg, ws_initial_headers, connected_at]). + keepalive, will_msg, ws_initial_headers, mountpoint, connected_at]). -define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -63,12 +63,22 @@ init(Peername, SendFun, Opts) -> WsInitialHeaders = get_value(ws_initial_headers, Opts), #proto_state{peername = Peername, sendfun = SendFun, - client_pid = self(), max_clientid_len = MaxLen, is_superuser = false, + client_pid = self(), ws_initial_headers = WsInitialHeaders, stats_data = #proto_stats{enable_stats = EnableStats}}. +init(Conn, Peername, SendFun, Opts) -> + enrich_opt(Conn:opts(), Conn, init(Peername, SendFun, Opts)). + +enrich_opt([], _Conn, State) -> + State; +enrich_opt([{mountpoint, MountPoint} | ConnOpts], Conn, State) -> + enrich_opt(ConnOpts, Conn, State#proto_state{mountpoint = MountPoint}); +enrich_opt([_ | ConnOpts], Conn, State) -> + enrich_opt(ConnOpts, Conn, State). + info(ProtoState) -> ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS). @@ -87,6 +97,7 @@ client(#proto_state{client_id = ClientId, keepalive = Keepalive, will_msg = WillMsg, ws_initial_headers = WsInitialHeaders, + mountpoint = MountPoint, connected_at = Time}) -> WillTopic = if WillMsg =:= undefined -> undefined; @@ -101,6 +112,7 @@ client(#proto_state{client_id = ClientId, keepalive = Keepalive, will_topic = WillTopic, ws_initial_headers = WsInitialHeaders, + mountpoint = MountPoint, connected_at = Time}. session(#proto_state{session = Session}) -> @@ -167,13 +179,13 @@ process(?CONNECT_PACKET(Var), State0) -> keep_alive = KeepAlive, client_id = ClientId} = Var, - State1 = State0#proto_state{proto_ver = ProtoVer, - proto_name = ProtoName, - username = Username, - client_id = ClientId, - clean_sess = CleanSess, - keepalive = KeepAlive, - will_msg = willmsg(Var), + State1 = State0#proto_state{proto_ver = ProtoVer, + proto_name = ProtoName, + username = Username, + client_id = ClientId, + clean_sess = CleanSess, + keepalive = KeepAlive, + will_msg = willmsg(Var, State0), connected_at = os:timestamp()}, {ReturnCode1, SessPresent, State3} = @@ -240,10 +252,11 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) -> %% TODO: refactor later... process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), - State = #proto_state{session = Session, - client_id = ClientId, + State = #proto_state{client_id = ClientId, username = Username, - is_superuser = IsSuperuser}) -> + is_superuser = IsSuperuser, + mountpoint = MountPoint, + session = Session}) -> Client = client(State), TopicTable = parse_topic_table(RawTopicTable), AllowDenies = if IsSuperuser -> []; @@ -256,7 +269,8 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), false -> case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of {ok, TopicTable1} -> - emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State}; + emqttd_session:subscribe(Session, PacketId, mount(MountPoint, TopicTable1)), + {ok, State}; {stop, _} -> {ok, State} end @@ -267,12 +281,13 @@ process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), - State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of {ok, TopicTable} -> - emqttd_session:unsubscribe(Session, TopicTable); + emqttd_session:unsubscribe(Session, mount(MountPoint, TopicTable)); {stop, _} -> ok end, @@ -286,11 +301,12 @@ process(?PACKET(?DISCONNECT), State) -> {stop, normal, State#proto_state{will_msg = undefined}}. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), - #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), - emqttd_session:publish(Session, Msg); + emqttd_session:publish(Session, mount(MountPoint, Msg)); publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> with_puback(?PUBACK, Packet, State); @@ -299,11 +315,12 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> with_puback(?PUBREC, Packet, State). with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), - State = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> + State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), - case emqttd_session:publish(Session, Msg) of + case emqttd_session:publish(Session, mount(MountPoint, Msg)) of ok -> send(?PUBACK_PACKET(Type, PacketId), State); {error, Error} -> @@ -311,10 +328,12 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), end. -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). -send(Msg, State = #proto_state{client_id = ClientId, username = Username}) +send(Msg, State = #proto_state{client_id = ClientId, + username = Username, + mountpoint = MountPoint}) when is_record(Msg, mqtt_message) -> emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), - send(emqttd_message:to_packet(Msg), State); + send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> @@ -371,8 +390,11 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> %% emqttd_cm:unreg(ClientId). ok. -willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> - emqttd_message:from_packet(Packet). +willmsg(Packet, #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> + case emqttd_message:from_packet(Packet) of + undefined -> undefined; + Msg -> mount(MountPoint, Msg) + end. %% Generate a client if if nulll maybe_set_clientid(State = #proto_state{client_id = NullId}) @@ -513,3 +535,23 @@ check_acl(subscribe, Topic, Client) -> sp(true) -> 1; sp(false) -> 0. + +%%-------------------------------------------------------------------- +%% Mount Point +%%-------------------------------------------------------------------- + +mount(undefined, Any) -> + Any; +mount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> + Msg#mqtt_message{topic = <>}; +mount(MountPoint, TopicTable) when is_list(TopicTable) -> + [{<>, Opts} || {Topic, Opts} <- TopicTable]. + +unmount(undefined, Any) -> + Any; +unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> + case catch split_binary(Topic, byte_size(MountPoint)) of + {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; + _ -> Msg + end. + diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 65d773e0d..02d23567f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -77,6 +77,8 @@ -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3, handle_pre_hibernate/1]). +-define(MQueue, emqttd_mqueue). + -record(state, { %% Clean Session Flag @@ -124,7 +126,7 @@ %% QoS 1 and QoS 2 messages pending transmission to the Client. %% %% Optionally, QoS 0 messages pending transmission to the Client. - mqueue :: emqttd_mqueue:mqueue(), + mqueue :: ?MQueue:mqueue(), %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), @@ -150,7 +152,9 @@ %% Force GC Count force_gc_count :: undefined | integer(), - created_at :: erlang:timestamp() + created_at :: erlang:timestamp(), + + ignore_loop_deliver = false :: boolean() }). -define(TIMEOUT, 60000). @@ -257,12 +261,9 @@ stats(#state{max_subscriptions = MaxSubscriptions, {subscriptions, maps:size(Subscriptions)}, {max_inflight, MaxInflight}, {inflight_len, Inflight:size()}, - {max_mqueue, case emqttd_mqueue:max_len(MQueue) of - infinity -> 0; - Len -> Len - end}, - {mqueue_len, emqttd_mqueue:len(MQueue)}, - {mqueue_dropped, emqttd_mqueue:dropped(MQueue)}, + {max_mqueue, ?MQueue:max_len(MQueue)}, + {mqueue_len, ?MQueue:len(MQueue)}, + {mqueue_dropped, ?MQueue:dropped(MQueue)}, {max_awaiting_rel, MaxAwaitingRel}, {awaiting_rel_len, maps:size(AwaitingRel)}, {deliver_msg, get(deliver_msg)}, @@ -282,11 +283,12 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> true = link(ClientPid), init_stats([deliver_msg, enqueue_msg]), {ok, Env} = emqttd:env(session), - {ok, QEnv} = emqttd:env(queue), + {ok, QEnv} = emqttd:env(mqueue), MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), + IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), - MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), + MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), State = #state{clean_sess = CleanSess, binding = binding(ClientPid), client_id = ClientId, @@ -305,7 +307,8 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> expiry_interval = get_value(expiry_interval, Env), enable_stats = EnableStats, force_gc_count = ForceGcCount, - created_at = os:timestamp()}, + created_at = os:timestamp(), + ignore_loop_deliver = IgnoreLoopDeliver}, emqttd_sm:register_session(ClientId, CleanSess, info(State)), emqttd_hooks:run('session.created', [ClientId, Username]), {ok, emit_stats(State), hibernate, {backoff, 1000, 1000, 10000}}. @@ -526,6 +529,14 @@ handle_cast({destroy, ClientId}, handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). +%% Dispatch message from self publish +handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, + State = #state{client_id = ClientId, + ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) -> + case IgnoreLoopDeliver of + true -> {noreply, State, hibernate}; + false -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate} + end; %% Dispatch Message handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> {noreply, gc(dispatch(tune_qos(Topic, Msg, State), State)), hibernate}; @@ -698,7 +709,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, enqueue_msg(Msg, State = #state{mqueue = Q}) -> inc_stats(enqueue_msg), - State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. + State#state{mqueue = ?MQueue:in(Msg, Q)}. %%-------------------------------------------------------------------- %% Deliver @@ -765,7 +776,7 @@ dequeue(State = #state{inflight = Inflight}) -> end. dequeue2(State = #state{mqueue = Q}) -> - case emqttd_mqueue:out(Q) of + case ?MQueue:out(Q) of {empty, _Q} -> State; {{value, Msg}, Q1} -> diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index 0c97a5c49..d9795be37 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -93,7 +93,7 @@ init([Env, WsPid, Req, ReplyChannel]) -> Headers = mochiweb_headers:to_list( mochiweb_request:get(headers, Req)), Conn = Req:get(connection), - ProtoState = emqttd_protocol:init(Peername, send_fun(ReplyChannel), + ProtoState = emqttd_protocol:init(Conn, Peername, send_fun(ReplyChannel), [{ws_initial_headers, Headers} | Env]), IdleTimeout = get_value(client_idle_timeout, Env, 30000), EnableStats = get_value(client_enable_stats, Env, false),