mqtt listeners

This commit is contained in:
Feng Lee 2016-10-13 17:37:32 +08:00
parent 1f50175c6f
commit a01f642606
3 changed files with 272 additions and 176 deletions

View File

@ -1,16 +1,7 @@
##====================================================================
##
## Configration for EMQ 3.0
##
##====================================================================
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Node ## Node Args
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Node Id: 1~255
node.id = 1
## Node Name ## Node Name
node.name = emqttd@127.0.0.1 node.name = emqttd@127.0.0.1
@ -30,7 +21,7 @@ node.async_threads = 32
node.process_limit = 256000 node.process_limit = 256000
## Sets the maximum number of simultaneously existing ports for this system ## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 262144 node.max_ports = 65536
## Set the distribution buffer busy limit (dist_buf_busy_limit) ## Set the distribution buffer busy limit (dist_buf_busy_limit)
node.dist_buffer_size = 32MB node.dist_buffer_size = 32MB
@ -45,6 +36,13 @@ node.fullsweep_after = 1000
## Crash dump ## Crash dump
node.crash_dump = log/crash.dump node.crash_dump = log/crash.dump
## Distributed node ticktime
node.dist_net_ticktime = 60
## Distributed node port range
## node.dist_listen_min = 6000
## node.dist_listen_max = 6999
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Log ## Log
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
@ -52,7 +50,7 @@ node.crash_dump = log/crash.dump
## Console log. Enum: off, file, console, both ## Console log. Enum: off, file, console, both
log.console = console log.console = console
## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency, none ## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
log.console.level = error log.console.level = error
## Console log file ## Console log file
@ -145,50 +143,6 @@ mqtt.pubsub.by_clientid = true
## Subscribe Asynchronously ## Subscribe Asynchronously
mqtt.pubsub.async = true mqtt.pubsub.async = true
##--------------------------------------------------------------------
## MQTT Listeners
##--------------------------------------------------------------------
## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
mqtt.listener.tcp = 1883
mqtt.listener.tcp.acceptors = 8
mqtt.listener.tcp.max_clients = 1024
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.tcp.rate_limit = 100,10
## Mount Point
## mqtt.listener.tcp.mount_point = prefix/
## TCP Socket Options. Set buffer if hight thoughtput
## mqtt.listener.tcp.opts.recbuf = 4096
## mqtt.listener.tcp.opts.sndbuf = 4096
## mqtt.listener.tcp.opts.buffer = 4096
## mqtt.listener.tcp.opts.nodelay = true
## mqtt.listener.tcp.opts.backlog = 1024
## SSL Listener
mqtt.listener.ssl = 8883
mqtt.listener.ssl.acceptors = 4
mqtt.listener.ssl.max_clients = 512
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.ssl.rate_limit = 100,10
## Mount Point
## mqtt.listener.ssl.mount_point = prefix/
## mqtt.listener.ssl.opts.handshake_timeout = 10 ## Seconds
## mqtt.listener.ssl.opts.certfile = $(platform_etc_dir)/ssl/cert.pem
## mqtt.listener.ssl.opts.keyfile = $(platform_etc_dir)/ssl/key.pem
## mqtt.listener.ssl.opts.cacertfile = $(platform_etc_dir)/ssl/cacert.pem
## mqtt.listener.ssl.opts.verify = verify_peer
## mqtt.listener.ssl.opts.failed_if_no_peer_cert = true
## HTTP Listener
mqtt.listener.http = 127.0.0.1:8083
mqtt.listener.http.acceptors = 4
mqtt.listener.http.max_clients = 64
## HTTP(SSL) Listener
## mqtt.listener.http = 8083
## mqtt.listener.http.acceptors = 4
## mqtt.listener.http.max_clients = 64
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## MQTT Bridge ## MQTT Bridge
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
@ -196,8 +150,8 @@ mqtt.listener.http.max_clients = 64
## Bridge Queue Size ## Bridge Queue Size
mqtt.bridge.max_queue_len = 10000 mqtt.bridge.max_queue_len = 10000
## Ping Interval of bridge node ## Ping Interval of bridge node. Unit: Second
mqtt.bridge.ping_down_interval = 1 ## Second mqtt.bridge.ping_down_interval = 1
##------------------------------------------------------------------- ##-------------------------------------------------------------------
## MQTT Plugins ## MQTT Plugins
@ -209,35 +163,77 @@ mqtt.plugins.etc_dir = etc/plugins/
## File to store loaded plugin names. ## File to store loaded plugin names.
mqtt.plugins.loaded_file = data/loaded_plugins mqtt.plugins.loaded_file = data/loaded_plugins
##------------------------------------------------------------------- ##--------------------------------------------------------------------
## MQTT Modules ## TCP Listener
##------------------------------------------------------------------- ##--------------------------------------------------------------------
## Enable retainer module ## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
## mqtt.module.retainer = on mqtt.listener.tcp = 1883
## disc: disc_copies, ram: ram_copies
## mqtt.module.retainer.storage_type = disc
## Max number of retained messages
## mqtt.module.retainer.max_message_num = 100000
## Max Payload Size of retained message
## mqtt.module.retainer.max_playload_size = 65536
## Expired after seconds, never expired if 0
## mqtt.module.retainer.expired_after = 0
## Enable presence module ## Size of acceptor pool
## Client presence management module. Publish presence messages when mqtt.listener.tcp.acceptors = 8
## client connected or disconnected.
## mqtt.module.presence = on
## mqtt.module.presence.qos = 0
## Enable subscription module ## Maximum number of concurrent clients
## Subscribe topics automatically when client connected mqtt.listener.tcp.max_clients = 1024
## mqtt.module.subscription = on
## mqtt.module.subscription.topics = $client/%c,1;topic2,1
## [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.module.rewrite = off ## mqtt.listener.tcp.rate_limit = 100,10
## mqtt.module.rewrite.config = etc/modules/rewrite.conf
## TCP Socket Options
mqtt.listener.tcp.backlog = 1024
## mqtt.listener.tcp.recbuf = 4096
## mqtt.listener.tcp.sndbuf = 4096
## mqtt.listener.tcp.buffer = 4096
## mqtt.listener.tcp.nodelay = true
##--------------------------------------------------------------------
## SSL Listener
##--------------------------------------------------------------------
## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883
mqtt.listener.ssl = 8883
## Size of acceptor pool
mqtt.listener.ssl.acceptors = 4
## Maximum number of concurrent clients
mqtt.listener.ssl.max_clients = 512
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.ssl.rate_limit = 100,10
## Configuring SSL Options
## See http://erlang.org/doc/man/ssl.html
mqtt.listener.ssl.handshake_timeout = 10 #seconds
## mqtt.listener.ssl.keyfile = /path/to/key.pem
## mqtt.listener.ssl.certfile = /path/to/cert.pem
## mqtt.listener.ssl.cacertfile = /path/to/cacert.pem
## mqtt.listener.ssl.verify = verify_peer
## mqtt.listener.ssl.failed_if_no_peer_cert = true
##--------------------------------------------------------------------
## HTTP Listener
##--------------------------------------------------------------------
mqtt.listener.http = 8083
mqtt.listener.http.acceptors = 4
mqtt.listener.http.max_clients = 64
##--------------------------------------------------------------------
## HTTP(SSL) Listener
##--------------------------------------------------------------------
## mqtt.listener.https = 8083
## mqtt.listener.https.acceptors = 4
## mqtt.listener.https.max_clients = 64
## mqtt.listener.https.handshake_timeout = 10 #seconds
## mqtt.listener.https.certfile = etc/ssl/cert.pem
## mqtt.listener.https.keyfile = etc/ssl/key.pem
## mqtt.listener.https.cacertfile = etc/ssl/cacert.pem
## mqtt.listener.https.verify = verify_peer
## mqtt.listener.https.failed_if_no_peer_cert = true
##------------------------------------------------------------------- ##-------------------------------------------------------------------
## System Monitor ## System Monitor

View File

@ -1,17 +1,10 @@
%%-*- mode: erlang -*- %%-*- mode: erlang -*-
%% EMQ 3.0 config mapping %% EMQ config mapping
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Erlang Node %% Erlang Node
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Node Id
{mapping, "node.id", "emqttd.node_id", [
{default, 1},
{datatype, integer},
{validators, ["range:0-255"]}
]}.
%% @doc Erlang node name %% @doc Erlang node name
{mapping, "node.name", "vm_args.-name", [ {mapping, "node.name", "vm_args.-name", [
{default, "emqttd@127.0.0.1"} {default, "emqttd@127.0.0.1"}
@ -117,6 +110,27 @@
hidden hidden
]}. ]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
{commented, 60},
{datatype, integer},
hidden
]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html
{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
{commented, 6000},
{datatype, integer},
hidden
]}.
%% @see node.dist_listen_min
{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
{commented, 6999},
{datatype, integer},
hidden
]}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Log %% Log
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -388,83 +402,6 @@ end}.
{async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
end}. end}.
%%--------------------------------------------------------------------
%% MQTT Listeners
%%--------------------------------------------------------------------
{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
{default, 1883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
{default, 512},
{datatype, integer}
]}.
{mapping, "mqtt.listener.http", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{translation, "emqttd.listeners", fun(Conf) ->
TcpListeners = case cuttlefish:conf_get("mqtt.listener.tcp", Conf) of
undefined ->
[];
TcpPort ->
TcpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.tcp.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.tcp.max_clients", Conf)}],
[{tcp, TcpPort, TcpOpts}]
end,
SslListeners = case cuttlefish:conf_get("mqtt.listener.ssl", Conf) of
undefined ->
[];
SslPort ->
SslOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.ssl.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.ssl.max_clients", Conf)}],
[{ssl, SslPort, SslOpts}]
end,
HttpListeners = case cuttlefish:conf_get("mqtt.listener.http", Conf) of
undefined ->
[];
HttPort ->
HttpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.http.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.http.max_clients", Conf)}],
[{http, HttPort, HttpOpts}]
end,
TcpListeners ++ SslListeners ++ HttpListeners
end}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT Bridge %% MQTT Bridge
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -496,6 +433,169 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
%%--------------------------------------------------------------------
%% MQTT Listeners
%%--------------------------------------------------------------------
{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
{default, 1883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
{default, undefined},
{datatype, string},
hidden
]}.
{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
{default, 512},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
{default, 10},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [
{datatype, {enum, [true, false]}}
]}.
{mapping, "mqtt.listener.http", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https", "emqttd.listeners", [
{default, undefined},
{datatype, [integer, ip]},
hidden
]}.
{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{translation, "emqttd.listeners", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
TcpListeners = case cuttlefish:conf_get("mqtt.listener.tcp", Conf) of
undefined ->
[];
TcpPort ->
TcpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.tcp.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.tcp.max_clients", Conf)},
{rate_limt, cuttlefish:conf_get("mqtt.listener.tcp.rate_limit", Conf, undefined)}],
[{tcp, TcpPort, Filter(TcpOpts)}]
end,
SslListeners = case cuttlefish:conf_get("mqtt.listener.ssl", Conf) of
undefined ->
[];
SslPort ->
SslOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.ssl.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.ssl.max_clients", Conf)}],
[{ssl, SslPort, Filter(SslOpts)}]
end,
HttpListeners = case cuttlefish:conf_get("mqtt.listener.http", Conf) of
undefined ->
[];
HttPort ->
HttpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.http.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.http.max_clients", Conf)}],
[{http, HttPort, Filter(HttpOpts)}]
end,
HttpsListeners = case cuttlefish:conf_get("mqtt.listener.https", Conf, undefined) of
undefined ->
[];
HttsPort ->
HttpsOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.https.acceptors", Conf)},
{max_clients, cuttlefish:conf_get("mqtt.listener.https.max_clients", Conf)}],
[{https, HttsPort, Filter(HttpsOpts)}]
end,
TcpListeners ++ SslListeners ++ HttpListeners ++ HttpsListeners
end}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% System Monitor %% System Monitor
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -170,20 +170,20 @@ start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners, [
%% Start mqtt listener %% Start mqtt listener
-spec(start_listener(listener()) -> any()). -spec(start_listener(listener()) -> any()).
start_listener({listener, tcp, ListenOn, Opts}) -> start_listener({tcp, ListenOn, Opts}) ->
start_listener('mqtt/tcp', ListenOn, Opts); start_listener('mqtt:tcp', ListenOn, Opts);
%% Start mqtt(SSL) listener %% Start mqtt(SSL) listener
start_listener({listener, ssl, ListenOn, Opts}) -> start_listener({ssl, ListenOn, Opts}) ->
start_listener('mqtt/ssl', ListenOn, Opts); start_listener('mqtt:ssl', ListenOn, Opts);
%% Start http listener %% Start http listener
start_listener({listener, ws, ListenOn, Opts}) -> start_listener({http, ListenOn, Opts}) ->
mochiweb:start_http('mqtt/ws', ListenOn, Opts, {emqttd_http, handle_request, []}); mochiweb:start_http('mqtt:http', ListenOn, Opts, {emqttd_http, handle_request, []});
%% Start https listener %% Start https listener
start_listener({listener, wss, ListenOn, Opts}) -> start_listener({listener, https, ListenOn, Opts}) ->
mochiweb:start_http('mqtt/wss', ListenOn, Opts, {emqttd_http, handle_request, []}). mochiweb:start_http('mqtt:https', ListenOn, Opts, {emqttd_http, handle_request, []}).
start_listener(Protocol, ListenOn, Opts) -> start_listener(Protocol, ListenOn, Opts) ->
{ok, Env} = emqttd:env(protocol), {ok, Env} = emqttd:env(protocol),