diff --git a/etc/emq.conf b/etc/emq.conf index a8531cd69..4303f97f1 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -1,16 +1,7 @@ -##==================================================================== -## -## Configration for EMQ 3.0 -## -##==================================================================== - ##-------------------------------------------------------------------- -## Node +## Node Args ##-------------------------------------------------------------------- -## Node Id: 1~255 -node.id = 1 - ## Node Name node.name = emqttd@127.0.0.1 @@ -30,7 +21,7 @@ node.async_threads = 32 node.process_limit = 256000 ## Sets the maximum number of simultaneously existing ports for this system -node.max_ports = 262144 +node.max_ports = 65536 ## Set the distribution buffer busy limit (dist_buf_busy_limit) node.dist_buffer_size = 32MB @@ -45,6 +36,13 @@ node.fullsweep_after = 1000 ## Crash dump node.crash_dump = log/crash.dump +## Distributed node ticktime +node.dist_net_ticktime = 60 + +## Distributed node port range +## node.dist_listen_min = 6000 +## node.dist_listen_max = 6999 + ##-------------------------------------------------------------------- ## Log ##-------------------------------------------------------------------- @@ -52,7 +50,7 @@ node.crash_dump = log/crash.dump ## Console log. Enum: off, file, console, both log.console = console -## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency, none +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency log.console.level = error ## Console log file @@ -145,50 +143,6 @@ mqtt.pubsub.by_clientid = true ## Subscribe Asynchronously mqtt.pubsub.async = true -##-------------------------------------------------------------------- -## MQTT Listeners -##-------------------------------------------------------------------- - -## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -mqtt.listener.tcp = 1883 -mqtt.listener.tcp.acceptors = 8 -mqtt.listener.tcp.max_clients = 1024 -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.tcp.rate_limit = 100,10 -## Mount Point -## mqtt.listener.tcp.mount_point = prefix/ -## TCP Socket Options. Set buffer if hight thoughtput -## mqtt.listener.tcp.opts.recbuf = 4096 -## mqtt.listener.tcp.opts.sndbuf = 4096 -## mqtt.listener.tcp.opts.buffer = 4096 -## mqtt.listener.tcp.opts.nodelay = true -## mqtt.listener.tcp.opts.backlog = 1024 - -## SSL Listener -mqtt.listener.ssl = 8883 -mqtt.listener.ssl.acceptors = 4 -mqtt.listener.ssl.max_clients = 512 -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.ssl.rate_limit = 100,10 -## Mount Point -## mqtt.listener.ssl.mount_point = prefix/ -## mqtt.listener.ssl.opts.handshake_timeout = 10 ## Seconds -## mqtt.listener.ssl.opts.certfile = $(platform_etc_dir)/ssl/cert.pem -## mqtt.listener.ssl.opts.keyfile = $(platform_etc_dir)/ssl/key.pem -## mqtt.listener.ssl.opts.cacertfile = $(platform_etc_dir)/ssl/cacert.pem -## mqtt.listener.ssl.opts.verify = verify_peer -## mqtt.listener.ssl.opts.failed_if_no_peer_cert = true - -## HTTP Listener -mqtt.listener.http = 127.0.0.1:8083 -mqtt.listener.http.acceptors = 4 -mqtt.listener.http.max_clients = 64 - -## HTTP(SSL) Listener -## mqtt.listener.http = 8083 -## mqtt.listener.http.acceptors = 4 -## mqtt.listener.http.max_clients = 64 - ##-------------------------------------------------------------------- ## MQTT Bridge ##-------------------------------------------------------------------- @@ -196,8 +150,8 @@ mqtt.listener.http.max_clients = 64 ## Bridge Queue Size mqtt.bridge.max_queue_len = 10000 -## Ping Interval of bridge node -mqtt.bridge.ping_down_interval = 1 ## Second +## Ping Interval of bridge node. Unit: Second +mqtt.bridge.ping_down_interval = 1 ##------------------------------------------------------------------- ## MQTT Plugins @@ -209,35 +163,77 @@ mqtt.plugins.etc_dir = etc/plugins/ ## File to store loaded plugin names. mqtt.plugins.loaded_file = data/loaded_plugins -##------------------------------------------------------------------- -## MQTT Modules -##------------------------------------------------------------------- +##-------------------------------------------------------------------- +## TCP Listener +##-------------------------------------------------------------------- -## Enable retainer module -## mqtt.module.retainer = on -## disc: disc_copies, ram: ram_copies -## mqtt.module.retainer.storage_type = disc -## Max number of retained messages -## mqtt.module.retainer.max_message_num = 100000 -## Max Payload Size of retained message -## mqtt.module.retainer.max_playload_size = 65536 -## Expired after seconds, never expired if 0 -## mqtt.module.retainer.expired_after = 0 +## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +mqtt.listener.tcp = 1883 -## Enable presence module -## Client presence management module. Publish presence messages when -## client connected or disconnected. -## mqtt.module.presence = on -## mqtt.module.presence.qos = 0 +## Size of acceptor pool +mqtt.listener.tcp.acceptors = 8 -## Enable subscription module -## Subscribe topics automatically when client connected -## mqtt.module.subscription = on -## mqtt.module.subscription.topics = $client/%c,1;topic2,1 +## Maximum number of concurrent clients +mqtt.listener.tcp.max_clients = 1024 -## [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) -## mqtt.module.rewrite = off -## mqtt.module.rewrite.config = etc/modules/rewrite.conf +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.tcp.rate_limit = 100,10 + +## TCP Socket Options +mqtt.listener.tcp.backlog = 1024 +## mqtt.listener.tcp.recbuf = 4096 +## mqtt.listener.tcp.sndbuf = 4096 +## mqtt.listener.tcp.buffer = 4096 +## mqtt.listener.tcp.nodelay = true + +##-------------------------------------------------------------------- +## SSL Listener +##-------------------------------------------------------------------- + +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +mqtt.listener.ssl = 8883 + +## Size of acceptor pool +mqtt.listener.ssl.acceptors = 4 + +## Maximum number of concurrent clients +mqtt.listener.ssl.max_clients = 512 + +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.ssl.rate_limit = 100,10 + +## Configuring SSL Options +## See http://erlang.org/doc/man/ssl.html +mqtt.listener.ssl.handshake_timeout = 10 #seconds +## mqtt.listener.ssl.keyfile = /path/to/key.pem +## mqtt.listener.ssl.certfile = /path/to/cert.pem +## mqtt.listener.ssl.cacertfile = /path/to/cacert.pem +## mqtt.listener.ssl.verify = verify_peer +## mqtt.listener.ssl.failed_if_no_peer_cert = true + +##-------------------------------------------------------------------- +## HTTP Listener +##-------------------------------------------------------------------- + +mqtt.listener.http = 8083 + +mqtt.listener.http.acceptors = 4 + +mqtt.listener.http.max_clients = 64 + +##-------------------------------------------------------------------- +## HTTP(SSL) Listener +##-------------------------------------------------------------------- + +## mqtt.listener.https = 8083 +## mqtt.listener.https.acceptors = 4 +## mqtt.listener.https.max_clients = 64 +## mqtt.listener.https.handshake_timeout = 10 #seconds +## mqtt.listener.https.certfile = etc/ssl/cert.pem +## mqtt.listener.https.keyfile = etc/ssl/key.pem +## mqtt.listener.https.cacertfile = etc/ssl/cacert.pem +## mqtt.listener.https.verify = verify_peer +## mqtt.listener.https.failed_if_no_peer_cert = true ##------------------------------------------------------------------- ## System Monitor diff --git a/priv/emq.schema b/priv/emq.schema index deb2a1916..c6c6c4f61 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -1,17 +1,10 @@ %%-*- mode: erlang -*- -%% EMQ 3.0 config mapping +%% EMQ config mapping %%-------------------------------------------------------------------- %% Erlang Node %%-------------------------------------------------------------------- -%% @doc Node Id -{mapping, "node.id", "emqttd.node_id", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:0-255"]} -]}. - %% @doc Erlang node name {mapping, "node.name", "vm_args.-name", [ {default, "emqttd@127.0.0.1"} @@ -117,6 +110,27 @@ hidden ]}. +%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime +{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ + {commented, 60}, + {datatype, integer}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html +{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ + {commented, 6000}, + {datatype, integer}, + hidden +]}. + +%% @see node.dist_listen_min +{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ + {commented, 6999}, + {datatype, integer}, + hidden +]}. + %%-------------------------------------------------------------------- %% Log %%-------------------------------------------------------------------- @@ -388,83 +402,6 @@ end}. {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] end}. -%%-------------------------------------------------------------------- -%% MQTT Listeners -%%-------------------------------------------------------------------- - -{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ - {default, 1883}, - {datatype, [integer, ip]} -]}. - -{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ - {default, 1024}, - {datatype, integer} -]}. - -{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ - {default, 8883}, - {datatype, [integer, ip]} -]}. - -{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ - {default, 512}, - {datatype, integer} -]}. - -{mapping, "mqtt.listener.http", "emqttd.listeners", [ - {default, 8883}, - {datatype, [integer, ip]} -]}. - -{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ - {default, 8}, - {datatype, integer} -]}. - -{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ - {default, 64}, - {datatype, integer} -]}. - -{translation, "emqttd.listeners", fun(Conf) -> - TcpListeners = case cuttlefish:conf_get("mqtt.listener.tcp", Conf) of - undefined -> - []; - TcpPort -> - TcpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.tcp.acceptors", Conf)}, - {max_clients, cuttlefish:conf_get("mqtt.listener.tcp.max_clients", Conf)}], - [{tcp, TcpPort, TcpOpts}] - end, - SslListeners = case cuttlefish:conf_get("mqtt.listener.ssl", Conf) of - undefined -> - []; - SslPort -> - SslOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.ssl.acceptors", Conf)}, - {max_clients, cuttlefish:conf_get("mqtt.listener.ssl.max_clients", Conf)}], - [{ssl, SslPort, SslOpts}] - end, - HttpListeners = case cuttlefish:conf_get("mqtt.listener.http", Conf) of - undefined -> - []; - HttPort -> - HttpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.http.acceptors", Conf)}, - {max_clients, cuttlefish:conf_get("mqtt.listener.http.max_clients", Conf)}], - [{http, HttPort, HttpOpts}] - end, - TcpListeners ++ SslListeners ++ HttpListeners -end}. - %%-------------------------------------------------------------------- %% MQTT Bridge %%-------------------------------------------------------------------- @@ -496,6 +433,169 @@ end}. {datatype, string} ]}. +%%-------------------------------------------------------------------- +%% MQTT Listeners +%%-------------------------------------------------------------------- + +{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ + {default, 1883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ + {default, 512}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ + {default, 10}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.listener.http", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https", "emqttd.listeners", [ + {default, undefined}, + {datatype, [integer, ip]}, + hidden +]}. + +{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + TcpListeners = case cuttlefish:conf_get("mqtt.listener.tcp", Conf) of + undefined -> + []; + TcpPort -> + TcpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.tcp.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.tcp.max_clients", Conf)}, + {rate_limt, cuttlefish:conf_get("mqtt.listener.tcp.rate_limit", Conf, undefined)}], + [{tcp, TcpPort, Filter(TcpOpts)}] + end, + SslListeners = case cuttlefish:conf_get("mqtt.listener.ssl", Conf) of + undefined -> + []; + SslPort -> + SslOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.ssl.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.ssl.max_clients", Conf)}], + [{ssl, SslPort, Filter(SslOpts)}] + end, + HttpListeners = case cuttlefish:conf_get("mqtt.listener.http", Conf) of + undefined -> + []; + HttPort -> + HttpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.http.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.http.max_clients", Conf)}], + [{http, HttPort, Filter(HttpOpts)}] + end, + HttpsListeners = case cuttlefish:conf_get("mqtt.listener.https", Conf, undefined) of + undefined -> + []; + HttsPort -> + HttpsOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.https.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.https.max_clients", Conf)}], + [{https, HttsPort, Filter(HttpsOpts)}] + end, + TcpListeners ++ SslListeners ++ HttpListeners ++ HttpsListeners +end}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 901a7d8b4..f303274f6 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -170,20 +170,20 @@ start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners, [ %% Start mqtt listener -spec(start_listener(listener()) -> any()). -start_listener({listener, tcp, ListenOn, Opts}) -> - start_listener('mqtt/tcp', ListenOn, Opts); +start_listener({tcp, ListenOn, Opts}) -> + start_listener('mqtt:tcp', ListenOn, Opts); %% Start mqtt(SSL) listener -start_listener({listener, ssl, ListenOn, Opts}) -> - start_listener('mqtt/ssl', ListenOn, Opts); +start_listener({ssl, ListenOn, Opts}) -> + start_listener('mqtt:ssl', ListenOn, Opts); %% Start http listener -start_listener({listener, ws, ListenOn, Opts}) -> - mochiweb:start_http('mqtt/ws', ListenOn, Opts, {emqttd_http, handle_request, []}); +start_listener({http, ListenOn, Opts}) -> + mochiweb:start_http('mqtt:http', ListenOn, Opts, {emqttd_http, handle_request, []}); %% Start https listener -start_listener({listener, wss, ListenOn, Opts}) -> - mochiweb:start_http('mqtt/wss', ListenOn, Opts, {emqttd_http, handle_request, []}). +start_listener({listener, https, ListenOn, Opts}) -> + mochiweb:start_http('mqtt:https', ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Protocol, ListenOn, Opts) -> {ok, Env} = emqttd:env(protocol),