This commit is contained in:
Feng Lee 2016-10-16 17:31:59 +08:00
commit 1038f854e3
37 changed files with 2242 additions and 906 deletions

1
.gitignore vendored
View File

@ -27,3 +27,4 @@ ct.coverdata
.idea/
emqttd.iml
_rel/
data/

View File

@ -2,14 +2,22 @@ PROJECT = emqttd
PROJECT_DESCRIPTION = Erlang MQTT Broker
PROJECT_VERSION = 2.0
DEPS = gproc lager gen_logger gen_conf esockd mochiweb
DEPS = gproc lager gen_logger esockd mochiweb getopt pbkdf2 \
clique time_compat rand_compat
dep_gproc = git https://github.com/uwiger/gproc
dep_lager = git https://github.com/basho/lager
dep_gen_conf = git https://github.com/emqtt/gen_conf
dep_gen_logger = git https://github.com/emqtt/gen_logger
dep_esockd = git https://github.com/emqtt/esockd emq20
dep_mochiweb = git https://github.com/emqtt/mochiweb
dep_gproc = git https://github.com/uwiger/gproc
dep_getopt = git https://github.com/jcomellas/getopt v0.8.2
dep_lager = git https://github.com/basho/lager master
dep_gen_logger = git https://github.com/emqtt/gen_logger
dep_esockd = git https://github.com/emqtt/esockd emq20
dep_mochiweb = git https://github.com/emqtt/mochiweb
dep_clique = git https://github.com/basho/clique
dep_pbkdf2 = git https://github.com/basho/erlang-pbkdf2 2.0.0
dep_time_compat = git https://github.com/lasp-lang/time_compat
dep_rand_compat = git https://github.com/lasp-lang/rand_compat
TEST_DEPS = cuttlefish
dep_cuttlefish = git https://github.com/emqtt/cuttlefish
ERLC_OPTS += +'{parse_transform, lager_transform}'
@ -31,3 +39,5 @@ include erlang.mk
app:: rebar.config
app.config::
cuttlefish -l info -e etc/ -c etc/emq.conf -i priv/emq.schema -d data/

29
etc/acl.conf Normal file
View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%%
%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL)
%%
%% -type who() :: all | binary() |
%% {ipaddr, esockd_access:cidr()} |
%% {client, binary()} |
%% {user, binary()}.
%%
%% -type access() :: subscribe | publish | pubsub.
%%
%% -type topic() :: binary().
%%
%% -type rule() :: {allow, all} |
%% {allow, who(), access(), list(topic())} |
%% {deny, all} |
%% {deny, who(), access(), list(topic())}.
%%
%%--------------------------------------------------------------------
{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, all}.

1
etc/certs/README Normal file
View File

@ -0,0 +1 @@
Place your SSL/TLS Certificates here.

15
etc/certs/cacert.pem Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICZTCCAc4CCQCPzzI1ezeZPTANBgkqhkiG9w0BAQUFADB3MQswCQYDVQQGEwJD
TjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MREwDwYDVQQK
EwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG9w0BCQEWDWhv
bmdAZW1xdHQuaW8wHhcNMTYxMDEzMDkwNzQ5WhcNMTYxMTEyMDkwNzQ5WjB3MQsw
CQYDVQQGEwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91
MREwDwYDVQQKEwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG
9w0BCQEWDWhvbmdAZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGB
AJ/6ACaLPXP6wpGOqc9+jFRFN6ufODqGB5SamCNmOKXpSm/U+KT87NPg4i4wn31s
167nb65lk3IbdvzPzTSCAP6DG5s0+qDgpEMHeKKEC4zaAwoIxCgVUjab51RbVFBs
AhzowxdRl6jQrGVgvXiLzz1+3b+1Xydu5J5Z2IeLm8NPAgMBAAEwDQYJKoZIhvcN
AQEFBQADgYEAkt/VWi4tUUEdOnDwnCZ4IheV9Sp+6T3XsRxje7PKDsvZQlmpvMP6
StfM+wkxty2dxVOU5Sx8CwXk5roKvULQY5rAyn9log6vEAI4Oyr4vnRN24JF7/Tr
xeP1cOv2LJlEuQm1JWe+VtNqfJ+f81CnfaJMAo17W5T/5UxI5n8ziKc=
-----END CERTIFICATE-----

15
etc/certs/cert.pem Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN CERTIFICATE-----
MIICZjCCAc+gAwIBAgIJAO89PfgaeHB2MA0GCSqGSIb3DQEBBQUAMHcxCzAJBgNV
BAYTAkNOMREwDwYDVQQIEwhaaGVKaWFuZzERMA8GA1UEBxMISGFuZ1pob3UxETAP
BgNVBAoTCGVtcXR0LmlvMREwDwYDVQQDEwhlbXF0dC5pbzEcMBoGCSqGSIb3DQEJ
ARYNaG9uZ0BlbXF0dC5pbzAeFw0xNjEwMTMwOTEzMTFaFw0xNjExMTIwOTEzMTFa
MEYxCzAJBgNVBAYTAkNOMREwDwYDVQQIDAhaaGVKaWFuZzERMA8GA1UEBwwISGFu
Z1pob3UxETAPBgNVBAsMCGVtcXR0LmlvMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB
iQKBgQDaOI1oasKjo0JGk5bMIxGInlxbvTuJZ8436u8HY4q8jZ+a4G12+UdTVHRF
d94/ClHWn8WvOvzbxvmSkhninlzdWm1rBLWis3Z2kStmhL77kITEfIImus9pjm5l
OgBxY4+Q7LSEsKYYH+ClYVaLlzO8PILEkBk6xxxq0X7AnCfDfQIDAQABoyswKTAJ
BgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAPBgNVHREECDAGhwR/AAABMA0GCSqGSIb3
DQEBBQUAA4GBAGS1yw1w9H7F4uOaK02mUCHZiV+EBB3gkBBqtAx7TXsmoGgT6ySA
7DwrbX6IH82bhZT4TjouhaPlUPE9pin88d/2kNbRrbZoZDMYGq02mVVRxfLzJqM2
GVlsxebsFFPbYhOaf9TuRR3v13ebga0FrXNke+IGLsYZSM2PZ+F4EvIA
-----END CERTIFICATE-----

14
etc/certs/client-cert.pem Normal file
View File

@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICITCCAYoCCQDvPT34GnhwdzANBgkqhkiG9w0BAQUFADB3MQswCQYDVQQGEwJD
TjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MREwDwYDVQQK
EwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG9w0BCQEWDWhv
bmdAZW1xdHQuaW8wHhcNMTYxMDEzMDkxNTMxWhcNMTYxMTEyMDkxNTMxWjB3MQsw
CQYDVQQGEwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91
MREwDwYDVQQKEwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG
9w0BCQEWDWhvbmdAZW1xdHQuaW8wXDANBgkqhkiG9w0BAQEFAANLADBIAkEAx1yF
I3YnvDPtHpGzJ+9ZGnnKkvMdaoyawT9rPvLsteeDkfknJcGCV5mKmjvH1xeeMIN1
Kql9nVPoe7BtzJ0XwQIDAQABMA0GCSqGSIb3DQEBBQUAA4GBAALulKuZuE6RhwIT
JBrUN7j4dbJe7Ttz+Q3qSQq6hNJoDf8hNrAHUDQzov9yU/KMMi9xE6+hu+ieuTo6
hKLBDAD4hDzb6+EU5HAcASDkAXWnQq/Keo73+VrmUwMQs93tTC/jGXpsj/gLMEWB
xcxXpgBPDGIR9L8Y2YMhEBLjm7Zv
-----END CERTIFICATE-----

9
etc/certs/client-key.pem Normal file
View File

@ -0,0 +1,9 @@
-----BEGIN RSA PRIVATE KEY-----
MIIBOwIBAAJBAMdchSN2J7wz7R6RsyfvWRp5ypLzHWqMmsE/az7y7LXng5H5JyXB
gleZipo7x9cXnjCDdSqpfZ1T6HuwbcydF8ECAwEAAQJAOGtblmyS1DVRzsvnCs82
xUJgbPP2iDfgd/4tqLPw/41T9d4RimhfNMUF9n+9IZPCGPXGGc4OYQttNq+w/BG/
vQIhAPlZCz+fI1OEcqNB5BjYRrZU+6KtkSaKXRL/2e2yAmbTAiEAzK4WOv+Zs8x7
aG9pO2SOy38qCBOq1xfohFJnPaWaEpsCIQCE1zaR75NfdEmqxnjh759ElmP1WCjj
coWBkMMmylZTNwIgAeFPfvc+GDK2p3zugIcp8KCYaD6WASfNEPoYzK4qviUCIQDt
sTM3JZeInrLoDJwhgrmMFDjlf7XZ+LF7uYDRuE+7jw==
-----END RSA PRIVATE KEY-----

15
etc/certs/key.pem Normal file
View File

@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDaOI1oasKjo0JGk5bMIxGInlxbvTuJZ8436u8HY4q8jZ+a4G12
+UdTVHRFd94/ClHWn8WvOvzbxvmSkhninlzdWm1rBLWis3Z2kStmhL77kITEfIIm
us9pjm5lOgBxY4+Q7LSEsKYYH+ClYVaLlzO8PILEkBk6xxxq0X7AnCfDfQIDAQAB
AoGBAKyew61vllxfjtPJeCYvL3WE38ZqIKiHBufQ3hhYM60H0tNu6OiONE/EpN02
/wWbIjXG2VfOL6ui8FVzYSqU3xt8kD+zs3+Q4Qz+UEe9bwLEysswWyQA/YJtOS60
FPxedT/gs0SAQP4MAc/FeFXOXVSzX4nVMoVeLpjvakh3hQNBAkEA8yObp8U6WCdE
p5TBQl5CyYJq4lyZ+d8DLp/v6gK7P4Nm8tTeSNmOtoG2nA6/VtSI75jm9yl5owzF
CZn3654GSQJBAOXDhcQhEjR+NMUR62a1YRaBK36ZtiJXFdIV4g16BUBp6q4UDAiV
3GBVuCYsykXT6V+3cR0vvUQUbpfAUl1BQ5UCQQCDytVgx2OszPxF6jgnhXimSe8t
7Av6iYvsBf3B1uEwuEVhc0laK7NT8lPNm6DTrDjdxv/LEcxBOXbEkZT1Pp8hAkB3
tvdkqKKWrUeLgvm3azwqAKWL8kUfAWcCLpq40OIZnNZFW3alpofLvf4UDfRai76m
O6t5PJ2N8mNpODDyHAY9AkAqbnmnmbswN+ayB80357N8298GLBqlG+A6YHps7SnR
K+4poUZgdhs0e5zh7jAR7cyQuQnKC7LMR0ZRH1WTs97V
-----END RSA PRIVATE KEY-----

283
etc/emq.conf Normal file
View File

@ -0,0 +1,283 @@
##--------------------------------------------------------------------
## Node Args
##--------------------------------------------------------------------
## Node name
node.name = emqttd@127.0.0.1
## Cookie for distributed node
node.cookie = emq_dist_cookie
## SMP support: enable, auto, disable
node.smp = auto
## Enable kernel poll
node.kernel_poll = on
## async thread pool
node.async_threads = 32
## Erlang Process Limit
node.process_limit = 256000
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 65536
## Set the distribution buffer busy limit (dist_buf_busy_limit)
node.dist_buffer_size = 32MB
## Max ETS Tables.
## Note that mnesia and SSL will create temporary ets tables.
node.max_ets_tables = 256000
## Tweak GC to run more often
node.fullsweep_after = 1000
## Crash dump
node.crash_dump = log/crash.dump
## Distributed node ticktime
node.dist_net_ticktime = 60
## Distributed node port range
## node.dist_listen_min = 6000
## node.dist_listen_max = 6999
##--------------------------------------------------------------------
## Log
##--------------------------------------------------------------------
## Console log. Enum: off, file, console, both
log.console = console
## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
log.console.level = error
## Console log file
## log.console.file = log/console.log
## Error log file
log.error.file = log/error.log
## Enable the crash log. Enum: on, off
log.crash = on
log.crash.file = log/crash.log
##--------------------------------------------------------------------
## MQTT Protocol
##--------------------------------------------------------------------
## Max ClientId Length Allowed.
mqtt.max_clientid_len = 1024
## Max Packet Size Allowed, 64K by default.
mqtt.max_packet_size = 64KB
## Client Idle Timeout (Second)
mqtt.client_idle_timeout = 30
## Allow Anonymous authentication
mqtt.allow_anonymous = true
## Default ACL File
mqtt.acl_file = etc/acl.conf
##--------------------------------------------------------------------
## MQTT Session
##--------------------------------------------------------------------
## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
## 0 means no limit
mqtt.session.max_inflight = 100
## Retry interval for redelivering QoS1/2 messages.
mqtt.session.retry_interval = 60
## Awaiting PUBREL Timeout
mqtt.session.await_rel_timeout = 20
## Max Packets that Awaiting PUBREL, 0 means no limit
mqtt.session.max_awaiting_rel = 0
## Statistics Collection Interval(seconds)
mqtt.session.collect_interval = 0
## Expired after 1 day:
## w - week
## d - day
## h - hour
## m - minute
## s - second
mqtt.session.expired_after = 1d
##--------------------------------------------------------------------
## MQTT Queue
##--------------------------------------------------------------------
## Type: simple | priority
mqtt.queue.type = simple
## Topic Priority: 0~255, Default is 0
## mqtt.queue.priority = topic/1=10,topic/2=8
## Max queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full.
mqtt.queue.max_length = infinity
## Low-water mark of queued messages
mqtt.queue.low_watermark = 20%
## High-water mark of queued messages
mqtt.queue.high_watermark = 60%
## Queue Qos0 messages?
mqtt.queue.qos0 = true
##--------------------------------------------------------------------
## MQTT Broker and PubSub
##--------------------------------------------------------------------
## System Interval of publishing broker $SYS Messages
mqtt.broker.sys_interval = 60
## PubSub Pool Size. Default should be scheduler numbers.
mqtt.pubsub.pool_size = 8
mqtt.pubsub.by_clientid = true
## Subscribe Asynchronously
mqtt.pubsub.async = true
##--------------------------------------------------------------------
## MQTT Bridge
##--------------------------------------------------------------------
## Bridge Queue Size
mqtt.bridge.max_queue_len = 10000
## Ping Interval of bridge node. Unit: Second
mqtt.bridge.ping_down_interval = 1
##-------------------------------------------------------------------
## MQTT Plugins
##-------------------------------------------------------------------
## Dir of plugins' config
mqtt.plugins.etc_dir = etc/plugins/
## File to store loaded plugin names.
mqtt.plugins.loaded_file = data/loaded_plugins
##-------------------------------------------------------------------
## MQTT Modules
##-------------------------------------------------------------------
## Enable retainer module
mqtt.module.retainer = on
## disc: disc_copies, ram: ram_copies
mqtt.module.retainer.storage_type = ram
## Max number of retained messages
mqtt.module.retainer.max_message_num = 100000
## Max Payload Size of retained message
mqtt.module.retainer.max_payload_size = 64KB
## Expired after seconds, never expired if 0
mqtt.module.retainer.expired_after = 0
## Enable presence module
## Client presence management module. Publish presence messages when
## client connected or disconnected.
mqtt.module.presence = on
mqtt.module.presence.qos = 0
## Enable subscription module
## Subscribe topics automatically when client connected
mqtt.module.subscription = on
mqtt.module.subscription.topics = $client/%c=1,$user/%u=1
##--------------------------------------------------------------------
## MQTT Listeners
##--------------------------------------------------------------------
## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
mqtt.listener.tcp = 1883
## Size of acceptor pool
mqtt.listener.tcp.acceptors = 8
## Maximum number of concurrent clients
mqtt.listener.tcp.max_clients = 1024
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.tcp.rate_limit = 100,10
## TCP Socket Options
mqtt.listener.tcp.backlog = 1024
## mqtt.listener.tcp.recbuf = 4096
## mqtt.listener.tcp.sndbuf = 4096
## mqtt.listener.tcp.buffer = 4096
## mqtt.listener.tcp.nodelay = true
## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883
mqtt.listener.ssl = 8883
## Size of acceptor pool
mqtt.listener.ssl.acceptors = 4
## Maximum number of concurrent clients
mqtt.listener.ssl.max_clients = 512
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.ssl.rate_limit = 100,10
## Configuring SSL Options
## See http://erlang.org/doc/man/ssl.html
mqtt.listener.ssl.handshake_timeout = 15
mqtt.listener.ssl.keyfile = etc/certs/key.pem
mqtt.listener.ssl.certfile = etc/certs/cert.pem
mqtt.listener.ssl.cacertfile = etc/certs/cacert.pem
## mqtt.listener.ssl.verify = verify_peer
## mqtt.listener.ssl.failed_if_no_peer_cert = true
## HTTP Listener
mqtt.listener.http = 8083
mqtt.listener.http.acceptors = 4
mqtt.listener.http.max_clients = 64
## HTTP(SSL) Listener
## mqtt.listener.https = 8084
## mqtt.listener.https.acceptors = 4
## mqtt.listener.https.max_clients = 64
## mqtt.listener.https.handshake_timeout = 10
## mqtt.listener.https.certfile = etc/certs/cert.pem
## mqtt.listener.https.keyfile = etc/certs/key.pem
## mqtt.listener.https.cacertfile = etc/certs/cacert.pem
## mqtt.listener.https.verify = verify_peer
## mqtt.listener.https.failed_if_no_peer_cert = true
##-------------------------------------------------------------------
## System Monitor
##-------------------------------------------------------------------
## Long GC, don't monitor in production mode for:
## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
sysmon.long_gc = false
## Long Schedule(ms)
sysmon.long_schedule = 240
## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
sysmon.large_heap = 8MB
## Busy Port
sysmon.busy_port = false
## Busy Dist Port
sysmon.busy_dist_port = true

758
priv/emq.schema Normal file
View File

@ -0,0 +1,758 @@
%%-*- mode: erlang -*-
%% EMQ config mapping
%%--------------------------------------------------------------------
%% Erlang Node
%%--------------------------------------------------------------------
%% @doc Erlang node name
{mapping, "node.name", "vm_args.-name", [
{default, "emqttd@127.0.0.1"}
]}.
%% @doc Secret cookie for distributed erlang node
{mapping, "node.cookie", "vm_args.-setcookie", [
{default, "emqsecretcookie"}
]}.
%% @doc SMP Support
{mapping, "node.smp", "vm_args.-smp", [
{default, auto},
{datatype, {enum, [enable, auto, disable]}},
hidden
]}.
%% @doc Enable Kernel Poll
{mapping, "node.kernel_poll", "vm_args.+K", [
{default, on},
{datatype, flag},
hidden
]}.
%% @doc More information at: http://erlang.org/doc/man/erl.html
{mapping, "node.async_threads", "vm_args.+A", [
{default, 64},
{datatype, integer},
{validators, ["range:0-1024"]}
]}.
%% @doc Erlang Process Limit
{mapping, "node.process_limit", "vm_args.+P", [
{datatype, integer},
{default, 256000},
hidden
]}.
%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
%% @doc The number of concurrent ports/sockets
%% Valid range is 1024-134217727
{mapping, "node.max_ports",
cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
{default, 262144},
{datatype, integer},
{validators, ["range4ports"]}
]}.
{validator, "range4ports", "must be 1024 to 134217727",
fun(X) -> X >= 1024 andalso X =< 134217727 end}.
%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
{datatype, bytesize},
{commented, "32MB"},
hidden,
{validators, ["zdbbl_range"]}
]}.
{translation, "vm_args.+zdbbl",
fun(Conf) ->
ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined),
case ZDBBL of
undefined -> undefined;
X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes;
_ -> undefined
end
end
}.
{validator, "zdbbl_range", "must be between 1KB and 2097151KB",
fun(ZDBBL) ->
%% 2097151KB = 2147482624
ZDBBL >= 1024 andalso ZDBBL =< 2147482624
end
}.
%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
{default, 1000},
{datatype, integer},
hidden,
{validators, ["positive_integer"]}
]}.
{validator, "positive_integer", "must be a positive integer",
fun(X) -> X >= 0 end}.
%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
%% R16+ uses +e
%% @doc The ETS table limit
{mapping, "node.max_ets_tables",
cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [
{default, 256000},
{datatype, integer},
hidden
]}.
%% @doc Set the location of crash dumps
{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
{default, "{{crash_dump}}"},
{datatype, file},
hidden
]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
{commented, 60},
{datatype, integer},
hidden
]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html
{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
{commented, 6000},
{datatype, integer},
hidden
]}.
%% @see node.dist_listen_min
{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
{commented, 6999},
{datatype, integer},
hidden
]}.
%%--------------------------------------------------------------------
%% Log
%%--------------------------------------------------------------------
{mapping, "log.console", "lager.handlers", [
{default, file },
{datatype, {enum, [off, file, console, both]}}
]}.
{mapping, "log.console.level", "lager.handlers", [
{default, info},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
]}.
{mapping, "log.console.file", "lager.handlers", [
{default, "log/console.log"},
{datatype, file}
]}.
{mapping, "log.error.file", "lager.handlers", [
{default, "log/error.log"},
{datatype, file}
]}.
{mapping, "log.error.redirect", "lager.error_logger_redirect", [
{default, on},
{datatype, flag},
hidden
]}.
{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
{default, 1000},
{datatype, integer},
hidden
]}.
{translation,
"lager.handlers",
fun(Conf) ->
ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of
undefined -> [];
ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
{level, error},
{size, 10485760},
{date, "$D0"},
{count, 5}]}]
end,
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
{level, ConsoleLogLevel},
{size, 10485760},
{date, "$D0"},
{count, 5}]},
ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
off -> [];
file -> [ConsoleFileHandler];
console -> [ConsoleHandler];
both -> [ConsoleHandler, ConsoleFileHandler];
_ -> []
end,
ConsoleHandlers ++ ErrorHandler
end
}.
{mapping, "log.crash", "lager.crash_log", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.crash.file", "lager.crash_log", [
{default, "log/crash.log"},
{datatype, file}
]}.
{translation,
"lager.crash_log",
fun(Conf) ->
case cuttlefish:conf_get("log.crash", Conf) of
false -> undefined;
_ ->
cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
end
end}.
{mapping, "sasl", "sasl.sasl_error_logger", [
{default, off},
{datatype, flag},
hidden
]}.
%%--------------------------------------------------------------------
%% MQTT Protocol
%%--------------------------------------------------------------------
%% @doc Set the Max ClientId Length Allowed.
{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [
{default, 1024},
{datatype, integer}
]}.
%% @doc Max Packet Size Allowed, 64K by default.
{mapping, "mqtt.max_packet_size", "emqttd.protocol", [
{default, "64KB"},
{datatype, bytesize}
]}.
%% @doc Client Idle Timeout.
{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
{default, 30},
{datatype, integer}
]}.
{translation, "emqttd.protocol", fun(Conf) ->
[{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
{max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
{client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
end}.
%% @doc Allow Anonymous
{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
{default, false},
{datatype, {enum, [true, false]}},
hidden
]}.
%% @doc Default ACL File
{mapping, "mqtt.acl_file", "emqttd.acl_file", [
{datatype, string},
hidden
]}.
%%--------------------------------------------------------------------
%% MQTT Session
%%--------------------------------------------------------------------
%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
%% 0 means no limit
{mapping, "mqtt.session.max_inflight", "emqttd.session", [
{default, 100},
{datatype, integer}
]}.
%% @doc Retry interval for redelivering QoS1/2 messages.
{mapping, "mqtt.session.retry_interval", "emqttd.session", [
{default, 60},
{datatype, integer}
]}.
%% @doc Awaiting PUBREL Timeout
{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
{default, 30},
{datatype, integer}
]}.
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [
{default, 0},
{datatype, integer}
]}.
%% @doc Statistics Collection Interval(seconds)
{mapping, "mqtt.session.collect_interval", "emqttd.session", [
{default, 0},
{datatype, integer}
]}.
%% @doc Session expired after...
{mapping, "mqtt.session.expired_after", "emqttd.session", [
{default, "2d"},
{datatype, {duration, s}}
]}.
{translation, "emqttd.session", fun(Conf) ->
[{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
{retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
{await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
{max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
{collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
{expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT Queue
%%--------------------------------------------------------------------
%% @doc Type: simple | priority
{mapping, "mqtt.queue.type", "emqttd.queue", [
{default, simple},
{datatype, atom}
]}.
%% @doc Topic Priority: 0~255, Default is 0
{mapping, "mqtt.queue.priority", "emqttd.queue", [
{default, ""},
{datatype, string},
hidden
]}.
%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
{mapping, "mqtt.queue.max_length", "emqttd.queue", [
{default, infinity},
{datatype, [atom, integer]}
]}.
%% @doc Low-water mark of queued messages
{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
{default, "20%"},
{datatype, string},
hidden
]}.
%% @doc High-water mark of queued messages
{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
{default, "60%"},
{datatype, string},
hidden
]}.
%% @doc Queue Qos0 messages?
{mapping, "mqtt.queue.qos0", "emqttd.queue", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqttd.queue", fun(Conf) ->
Parse = fun(S) ->
{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
list_to_integer(N) / 100
end,
Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)},
{max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)},
{low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))},
{high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))},
{queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}],
case cuttlefish:conf_get("mqtt.queue.priority", Conf) of
undefined -> Opts;
V -> [{priority,
[begin [T, P] = string:tokens(S, "="),
{T, list_to_integer(P)}
end || S <- string:tokens(V, ",")]}|Opts]
end
end}.
%%--------------------------------------------------------------------
%% MQTT Broker
%%--------------------------------------------------------------------
{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [
{default, 60},
{datatype, integer}
]}.
%%--------------------------------------------------------------------
%% MQTT PubSub
%%--------------------------------------------------------------------
{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
{default, true},
{datatype, {enum, [true, false]}},
hidden
]}.
{translation, "emqttd.pubsub", fun(Conf) ->
[{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)},
{by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)},
{async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT Bridge
%%--------------------------------------------------------------------
{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [
{default, 10000},
{datatype, integer}
]}.
{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [
{default, 1},
{datatype, integer}
]}.
{translation, "emqttd.bridge", fun(Conf) ->
[{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)},
{ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}]
end}.
%%-------------------------------------------------------------------
%% MQTT Plugins
%%-------------------------------------------------------------------
{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [
{datatype, string}
]}.
{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [
{datatype, string}
]}.
%%--------------------------------------------------------------------
%% MQTT Listeners
%%--------------------------------------------------------------------
{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
{default, 1883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
{default, undefined},
{datatype, string},
hidden
]}.
{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
{default, 512},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
{default, 15},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [
{datatype, {enum, [true, false]}}
]}.
{mapping, "mqtt.listener.http", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https", "emqttd.listeners", [
{default, undefined},
{datatype, [integer, ip]},
hidden
]}.
{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [
{default, 15},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [
{datatype, {enum, [true, false]}}
]}.
{translation, "emqttd.listeners", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
{rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}])
end,
TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}])
end,
SslOpts = fun(Prefix) ->
Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
{certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
{cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
{verify, cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)},
{failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}])
end,
Listeners = fun(Name) when is_atom(Name) ->
Key = "mqtt.listener." ++ atom_to_list(Name),
case cuttlefish:conf_get(Key, Conf, undefined) of
undefined ->
[];
Port ->
ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
[{Name, Port, case Name =:= ssl orelse Name =:= https of
true -> [{ssl, SslOpts(Key)} | Opts];
false -> Opts
end}]
end
end,
lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)])
end}.
%%--------------------------------------------------------------------
%% MQTT Modules
%%--------------------------------------------------------------------
{mapping, "mqtt.module.retainer", "emqttd.modules", [
{default, on},
{datatype, flag}
]}.
{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [
{default, ram},
{datatype, {enum, [disc, ram]}}
]}.
{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [
{default, 100000},
{datatype, integer}
]}.
{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [
{default, "64KB"},
{datatype, bytesize}
]}.
{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [
{default, 0},
{datatype, integer}
]}.
{mapping, "mqtt.module.presence", "emqttd.modules", [
{default, on},
{datatype, flag}
]}.
{mapping, "mqtt.module.presence.qos", "emqttd.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "mqtt.module.subscription", "emqttd.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [
{default, undefined},
{datatype, string}
]}.
{translation, "emqttd.modules", fun(Conf) ->
WithMod = fun(Name, OptsF) ->
Key = "mqtt.module." ++ atom_to_list(Name),
case cuttlefish:conf_get(Key, Conf, false) of
true -> [{Name, OptsF(Key)}];
false -> []
end
end,
RetainOpts = fun(Prefix) ->
[{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)},
{max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)},
{max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)},
{expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}]
end,
PresOpts = fun(Prefix) ->
[{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}]
end,
ParseFun = fun(undefined) -> [];
(Topics) -> [begin
[Topic, Qos] = string:tokens(S, "="),
{list_to_binary(Topic), list_to_integer(Qos)}
end || S <- string:tokens(Topics, ",")]
end,
SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end,
lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)])
end}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------
%% @doc Long GC, don't monitor in production mode for:
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
{mapping, "sysmon.long_gc", "emqttd.sysmon", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%% @doc Long Schedule(ms)
{mapping, "sysmon.long_schedule", "emqttd.sysmon", [
{default, 1000},
{datatype, integer}
]}.
%% @doc Large Heap
{mapping, "sysmon.large_heap", "emqttd.sysmon", [
{default, "8MB"},
{datatype, bytesize}
]}.
%% @doc Monitor Busy Port
{mapping, "sysmon.busy_port", "emqttd.sysmon", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%% @doc Monitor Busy Dist Port
{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqttd.sysmon", fun(Conf) ->
[{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)},
{long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)},
{large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)},
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}.

View File

@ -1,4 +1,4 @@
{deps, [
{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}}
{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{getopt,".*",{git,"https://github.com/jcomellas/getopt","v0.8.2"}},{pbkdf2,".*",{git,"https://github.com/basho/erlang-pbkdf2","2.0.0"}},{clique,".*",{git,"https://github.com/basho/clique",""}},{time_compat,".*",{git,"https://github.com/lasp-lang/time_compat",""}},{rand_compat,".*",{git,"https://github.com/lasp-lang/rand_compat",""}}
]}.
{erl_opts, [{parse_transform,lager_transform}]}.

View File

@ -1,12 +0,0 @@
{application, emqttd,
[
{description, "Erlang MQTT Broker"},
{vsn, "2.0"},
{id, "emqttd"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, gproc, esockd, mochiweb,
gen_logger, gen_conf]},
{mod, {emqttd_app, []}},
{env, []}
]}.

View File

@ -22,7 +22,7 @@
-include("emqttd_protocol.hrl").
-export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]).
-export([start/0, env/1, env/2, is_running/1]).
%% PubSub API
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
@ -57,15 +57,8 @@
-spec(start() -> ok | {error, any()}).
start() -> application:start(?APP).
%% @doc Get Config
-spec(conf(Key :: atom()) -> any()).
conf(Key) -> emqttd_conf:value(Key).
-spec(conf(Key :: atom(), Default :: any()) -> any()).
conf(Key, Default) -> emqttd_conf:value(Key, Default).
%% @doc Environment
-spec(env(Key:: atom()) -> any()).
-spec(env(Key:: atom()) -> {ok, any()} | undefined).
env(Key) -> application:get_env(?APP, Key).
%% @doc Get environment

View File

@ -56,7 +56,10 @@ start_link() ->
auth(Client, Password) when is_record(Client, mqtt_client) ->
auth(Client, Password, lookup_mods(auth)).
auth(_Client, _Password, []) ->
{error, "No auth module to check!"};
case emqttd:env(allow_anonymous, false) of
true -> ok;
false -> {error, "No auth module to check!"}
end;
auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
case catch Mod:check(Client, Password, State) of
ok -> ok;
@ -73,7 +76,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
Topic :: binary()).
check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) ->
case lookup_mods(acl) of
[] -> allow;
[] -> case emqttd:env(allow_anonymous, false) of
true -> allow;
false -> deny
end;
AclMods -> check_acl(Client, PubSub, Topic, AclMods)
end.
check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
@ -120,21 +126,13 @@ tab_key(acl) -> acl_modules.
stop() -> gen_server:call(?MODULE, stop).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([]) ->
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]),
ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}),
ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(gen_conf:list(emqttd, acl))}),
{ok, #state{}}.
init_mods(Mods) ->
[init_mod(mod_name(Type, Name), Opts) || {Type, Name, Opts} <- Mods].
init_mod(Mod, Opts) ->
{ok, State} = Mod:init(Opts), {Mod, State, 0}.
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
Mods = lookup_mods(Type),
Existed = lists:keyfind(Mod, 1, Mods),
@ -186,13 +184,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
mod_name(auth, Name) -> mod(emqttd_auth_, Name);
mod_name(acl, Name) -> mod(emqttd_acl_, Name).
mod(Prefix, Name) ->
list_to_atom(lists:concat([Prefix, Name])).
if_existed(false, Fun) -> Fun();
if_existed(_Mod, _Fun) -> {error, already_existed}.

View File

@ -1,35 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_acl_anonymous).
-behaviour(emqttd_acl_mod).
%% ACL callbacks
-export([init/1, check_acl/2, reload_acl/1, description/0]).
init(Opts) ->
{ok, Opts}.
check_acl(_Who, _State) ->
allow.
reload_acl(_State) ->
ok.
description() ->
"Anonymous ACL".

View File

@ -46,18 +46,12 @@ all_rules() ->
%%--------------------------------------------------------------------
%% @doc Init internal ACL
-spec(init(Opts :: list()) -> {ok, State :: any()}).
init(Opts) ->
-spec(init([File :: string()]) -> {ok, State :: any()}).
init([File]) ->
ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]),
case proplists:get_value(config, Opts) of
undefined ->
{ok, #state{}};
File ->
Default = proplists:get_value(nomatch, Opts, allow),
State = #state{config = File, nomatch = Default},
true = load_rules_from_file(State),
{ok, State}
end.
State = #state{config = File},
true = load_rules_from_file(State),
{ok, State}.
load_rules_from_file(#state{config = AclFile}) ->
{ok, Terms} = file:consult(AclFile),
@ -118,7 +112,7 @@ reload_acl(#state{config = undefined}) ->
reload_acl(State) ->
case catch load_rules_from_file(State) of
{'EXIT', Error} -> {error, Error};
_ -> ok
true -> ok
end.
%% @doc ACL Module Description

View File

@ -42,11 +42,11 @@
Reason :: term()).
start(_StartType, _StartArgs) ->
print_banner(),
emqttd_conf:init(),
emqttd_mnesia:start(),
{ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup),
emqttd_cli:load(),
register_acl_mod(),
load_all_mods(),
emqttd_plugins:init(),
emqttd_plugins:load(),
@ -141,15 +141,25 @@ worker_spec(Module, Opts) when is_atom(Module) ->
worker_spec(M, F, A) ->
{M, {M, F, A}, permanent, 10000, worker, [M]}.
%%--------------------------------------------------------------------
%% Register default ACL File
%%--------------------------------------------------------------------
register_acl_mod() ->
case emqttd:env(acl_file) of
{ok, File} -> emqttd_access_control:register_mod(acl, emqttd_acl_internal, [File]);
undefined -> ok
end.
%%--------------------------------------------------------------------
%% Load Modules
%%--------------------------------------------------------------------
%% @doc load all modules
%% @doc Load all modules
load_all_mods() ->
lists:foreach(fun load_mod/1, gen_conf:list(emqttd, module)).
lists:foreach(fun load_mod/1, emqttd:env(modules, [])).
load_mod({module, Name, Opts}) ->
load_mod({Name, Opts}) ->
Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
case catch Mod:load(Opts) of
ok -> lager:info("Load module ~s successfully", [Name]);
@ -159,7 +169,7 @@ load_mod({module, Name, Opts}) ->
%% @doc Is module enabled?
-spec(is_mod_enabled(Name :: atom()) -> boolean()).
is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
is_mod_enabled(Name) -> lists:keyfind(Name, 1, emqttd:env(modules, [])).
%%--------------------------------------------------------------------
%% Start Listeners
@ -167,27 +177,28 @@ is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
%% @doc Start Listeners of the broker.
-spec(start_listeners() -> any()).
start_listeners() -> lists:foreach(fun start_listener/1, gen_conf:list(emqttd, listener)).
start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners, [])).
%% Start mqtt listener
-spec(start_listener(listener()) -> any()).
start_listener({listener, mqtt, ListenOn, Opts}) ->
start_listener(mqtt, ListenOn, Opts);
start_listener({tcp, ListenOn, Opts}) ->
start_listener('mqtt:tcp', ListenOn, Opts);
%% Start mqtt(SSL) listener
start_listener({listener, mqtts, ListenOn, Opts}) ->
start_listener(mqtts, ListenOn, Opts);
start_listener({ssl, ListenOn, Opts}) ->
start_listener('mqtt:ssl', ListenOn, Opts);
%% Start http listener
start_listener({listener, http, ListenOn, Opts}) ->
mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []});
start_listener({http, ListenOn, Opts}) ->
mochiweb:start_http('mqtt:http', ListenOn, Opts, {emqttd_http, handle_request, []});
%% Start https listener
start_listener({listener, https, ListenOn, Opts}) ->
mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}).
start_listener({https, ListenOn, Opts}) ->
mochiweb:start_http('mqtt:https', ListenOn, Opts, {emqttd_http, handle_request, []}).
start_listener(Protocol, ListenOn, Opts) ->
MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]},
{ok, Env} = emqttd:env(protocol),
MFArgs = {emqttd_client, start_link, [Env]},
{ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs).
merge_sockopts(Options) ->
@ -200,9 +211,11 @@ merge_sockopts(Options) ->
%%--------------------------------------------------------------------
%% @doc Stop Listeners
stop_listeners() -> lists:foreach(fun stop_listener/1, gen_conf:list(listener)).
stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners, [])).
%% @private
stop_listener({listener, tcp, ListenOn, _Opts}) -> esockd:close('mqtt/tcp', ListenOn);
stop_listener({listener, ssl, ListenOn, _Opts}) -> esockd:close('mqtt/ssl', ListenOn);
stop_listener({listener, Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).
-ifdef(TEST).

View File

@ -1,29 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Anonymous Authentication Module
-module(emqttd_auth_anonymous).
-behaviour(emqttd_auth_mod).
-export([init/1, check/3, description/0]).
init(Opts) -> {ok, Opts}.
check(_Client, _Password, _Opts) -> ok.
description() -> "Anonymous Authentication Module".

View File

@ -1,123 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_auth_clientid).
-include("emqttd.hrl").
-export([add_clientid/1, add_clientid/2, lookup_clientid/1, remove_clientid/1,
all_clientids/0]).
-behaviour(emqttd_auth_mod).
%% emqttd_auth_mod callbacks
-export([init/1, check/3, description/0]).
-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
-record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
%% @doc Add clientid
-spec(add_clientid(binary()) -> {atomic, ok} | {aborted, any()}).
add_clientid(ClientId) when is_binary(ClientId) ->
R = #mqtt_auth_clientid{client_id = ClientId},
mnesia:transaction(fun mnesia:write/1, [R]).
%% @doc Add clientid with password
-spec(add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}).
add_clientid(ClientId, Password) ->
R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
mnesia:transaction(fun mnesia:write/1, [R]).
%% @doc Lookup clientid
-spec(lookup_clientid(binary()) -> list(#mqtt_auth_clientid{})).
lookup_clientid(ClientId) ->
mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
%% @doc Lookup all clientids
-spec(all_clientids() -> list(binary())).
all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
%% @doc Remove clientid
-spec(remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}).
remove_clientid(ClientId) ->
mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]).
%%--------------------------------------------------------------------
%% emqttd_auth_mod callbacks
%%--------------------------------------------------------------------
init(Opts) ->
mnesia:create_table(?AUTH_CLIENTID_TAB, [
{ram_copies, [node()]},
{attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]),
mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
Clients = load_client_from(proplists:get_value(config, Opts)),
mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end),
{ok, Opts}.
check(#mqtt_client{client_id = undefined}, _Password, _Opts) ->
{error, clientid_undefined};
check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, []) ->
check_clientid_only(ClientId, IpAddress);
check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId, IpAddress);
check(_Client, undefined, [{password, yes}|_]) ->
{error, password_undefined};
check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
[] -> {error, clientid_not_found};
[#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext??
_ -> {error, password_error}
end.
description() -> "ClientId authentication module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
load_client_from(undefined) ->
ok;
load_client_from(File) ->
{ok, Clients} = file:consult(File),
[client(Client) || Client <- Clients].
client(ClientId) when is_list(ClientId) ->
#mqtt_auth_clientid{client_id = list_to_binary(ClientId)};
client({ClientId, IpAddr}) when is_list(ClientId) ->
#mqtt_auth_clientid{client_id = iolist_to_binary(ClientId),
ipaddr = esockd_cidr:parse(IpAddr, true)}.
check_clientid_only(ClientId, IpAddr) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
[] ->
{error, clientid_not_found};
[#?AUTH_CLIENTID_TAB{ipaddr = undefined}] ->
ok;
[#?AUTH_CLIENTID_TAB{ipaddr = CIDR}] ->
case esockd_cidr:match(IpAddr, CIDR) of
true -> ok;
false -> {error, wrong_ipaddr}
end
end.

View File

@ -1,164 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Authentication with username and password
-module(emqttd_auth_username).
-include("emqttd.hrl").
-include("emqttd_cli.hrl").
%% CLI callbacks
-export([cli/1]).
-behaviour(emqttd_auth_mod).
-export([is_enabled/0]).
-export([add_user/2, remove_user/1, lookup_user/1, all_users/0]).
%% emqttd_auth callbacks
-export([init/1, check/3, description/0]).
-define(AUTH_USERNAME_TAB, mqtt_auth_username).
-record(?AUTH_USERNAME_TAB, {username, password}).
%%--------------------------------------------------------------------
%% CLI
%%--------------------------------------------------------------------
cli(["list"]) ->
if_enabled(fun() ->
Usernames = mnesia:dirty_all_keys(?AUTH_USERNAME_TAB),
[?PRINT("~s~n", [Username]) || Username <- Usernames]
end);
cli(["add", Username, Password]) ->
if_enabled(fun() ->
?PRINT("~p~n", [add_user(iolist_to_binary(Username), iolist_to_binary(Password))])
end);
cli(["del", Username]) ->
if_enabled(fun() ->
?PRINT("~p~n", [remove_user(iolist_to_binary(Username))])
end);
cli(_) ->
?USAGE([{"users list", "List users"},
{"users add <Username> <Password>", "Add User"},
{"users del <Username>", "Delete User"}]).
if_enabled(Fun) ->
case is_enabled() of
true -> Fun();
false -> hint()
end.
hint() ->
?PRINT_MSG("Please enable '{auth, username, []}' in etc/emqttd.conf first.~n").
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
is_enabled() ->
lists:member(?AUTH_USERNAME_TAB, mnesia:system_info(tables)).
%% @doc Add User
-spec(add_user(binary(), binary()) -> ok | {error, any()}).
add_user(Username, Password) ->
User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
ret(mnesia:transaction(fun insert_user/1, [User])).
insert_user(User = #?AUTH_USERNAME_TAB{username = Username}) ->
case mnesia:read(?AUTH_USERNAME_TAB, Username) of
[] -> mnesia:write(User);
[_|_] -> mnesia:abort(existed)
end.
add_default_user(Username, Password) when is_atom(Username) ->
add_default_user(atom_to_list(Username), Password);
add_default_user(Username, Password) ->
add_user(iolist_to_binary(Username), iolist_to_binary(Password)).
%% @doc Lookup user by username
-spec(lookup_user(binary()) -> list()).
lookup_user(Username) ->
mnesia:dirty_read(?AUTH_USERNAME_TAB, Username).
%% @doc Remove user
-spec(remove_user(binary()) -> ok | {error, any()}).
remove_user(Username) ->
ret(mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}])).
ret({atomic, ok}) -> ok;
ret({aborted, Error}) -> {error, Error}.
%% @doc All usernames
-spec(all_users() -> list()).
all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
%%--------------------------------------------------------------------
%% emqttd_auth_mod callbacks
%%--------------------------------------------------------------------
init(Opts) ->
mnesia:create_table(?AUTH_USERNAME_TAB, [
{disc_copies, [node()]},
{attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]),
mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies),
case proplists:get_value(passwd, Opts) of
undefined -> ok;
File -> {ok, DefaultUsers} = file:consult(File),
lists:foreach(fun({Username, Password}) ->
add_default_user(Username, Password)
end, DefaultUsers)
end,
emqttd_ctl:register_cmd(users, {?MODULE, cli}, []),
{ok, Opts}.
check(#mqtt_client{username = undefined}, _Password, _Opts) ->
{error, username_undefined};
check(_User, undefined, _Opts) ->
{error, password_undefined};
check(#mqtt_client{username = Username}, Password, _Opts) ->
case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of
[] ->
{error, username_not_found};
[#?AUTH_USERNAME_TAB{password = <<Salt:4/binary, Hash/binary>>}] ->
case Hash =:= md5_hash(Salt, Password) of
true -> ok;
false -> {error, password_error}
end
end.
description() ->
"Username password authentication module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
hash(Password) ->
SaltBin = salt(), <<SaltBin/binary, (md5_hash(SaltBin, Password))/binary>>.
md5_hash(SaltBin, Password) ->
erlang:md5(<<SaltBin/binary, Password/binary>>).
salt() ->
emqttd_time:seed(), Salt = rand:uniform(16#ffffffff), <<Salt:32>>.

View File

@ -46,7 +46,8 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) ->
start_bridge(Node, _Topic, _Options) when Node =:= node() ->
{error, bridge_to_self};
start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) ->
Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options),
{ok, BridgeEnv} = emqttd:env(bridge),
Options1 = emqttd_opts:merge(BridgeEnv, Options),
supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)).
%% @doc Stop a bridge

View File

@ -95,7 +95,7 @@ datetime() ->
%% @doc Start a tick timer
start_tick(Msg) ->
start_tick(timer:seconds(emqttd:conf(broker_sys_interval, 60)), Msg).
start_tick(timer:seconds(emqttd:env(broker_sys_interval, 60)), Msg).
start_tick(0, _Msg) ->
undefined;

View File

@ -1,112 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_conf).
-export([init/0]).
-export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]).
-export([value/1, value/2, list/1]).
-define(APP, emqttd).
init() -> gen_conf:init(?APP).
mqtt() ->
with_env(mqtt_protocol, [
%% Max ClientId Length Allowed.
{max_clientid_len, value(mqtt_max_clientid_len, 512)},
%% Max Packet Size Allowed, 64K by default.
{max_packet_size, value(mqtt_max_packet_size, 65536)},
%% Client Idle Timeout.
{client_idle_timeout, value(mqtt_client_idle_timeout, 30)}
]).
session() ->
with_env(mqtt_session, [
%% Max number of QoS 1 and 2 messages that can be inflight at one time.
%% 0 means no limit
{max_inflight, value(session_max_inflight, 100)},
%% Retry interval for redelivering QoS1/2 messages.
{unack_retry_interval, value(session_unack_retry_interval, 60)},
%% Awaiting PUBREL Timeout
{await_rel_timeout, value(session_await_rel_timeout, 20)},
%% Max Packets that Awaiting PUBREL, 0 means no limit
{max_awaiting_rel, value(session_max_awaiting_rel, 0)},
%% Statistics Collection Interval(seconds)
{collect_interval, value(session_collect_interval, 0)},
%% Expired after 2 day (unit: minute)
{expired_after, value(session_expired_after, 2880)}
]).
queue() ->
with_env(mqtt_queue, [
%% Type: simple | priority
{type, value(queue_type, simple)},
%% Topic Priority: 0~255, Default is 0
{priority, value(queue_priority, [])},
%% Max queue length. Enqueued messages when persistent client disconnected,
%% or inflight window is full.
{max_length, value(queue_max_length, infinity)},
%% Low-water mark of queued messages
{low_watermark, value(queue_low_watermark, 0.2)},
%% High-water mark of queued messages
{high_watermark, value(queue_high_watermark, 0.6)},
%% Queue Qos0 messages?
{queue_qos0, value(queue_qos0, true)}
]).
bridge() ->
with_env(mqtt_bridge, [
{max_queue_len, value(bridge_max_queue_len, 10000)},
%% Ping Interval of bridge node
{ping_down_interval, value(bridge_ping_down_interval, 1)}
]).
pubsub() ->
with_env(mqtt_pubsub, [
%% PubSub and Router. Default should be scheduler numbers.
{pool_size, value(pubsub_pool_size, 8)}
]).
value(Key) ->
with_env(Key, gen_conf:value(?APP, Key)).
value(Key, Default) ->
with_env(Key, gen_conf:value(?APP, Key, Default)).
with_env(Key, Conf) ->
case application:get_env(?APP, Key) of
undefined ->
application:set_env(?APP, Key, Conf), Conf;
{ok, Val} ->
Val
end.
list(Key) -> gen_conf:list(?APP, Key).

View File

@ -1,115 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_mod_rewrite).
-behaviour(emqttd_gen_mod).
-include("emqttd.hrl").
-export([load/1, reload/1, unload/1]).
-export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
load(Opts) ->
case proplists:get_value(config, Opts) of
undefined ->
ok;
File ->
{ok, Terms} = file:consult(File), Sections = compile(Terms),
emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Sections]),
emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/4, [Sections]),
emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections])
end.
rewrite_subscribe(_ClientId, _Username, TopicTable, Sections) ->
lager:info("Rewrite subscribe: ~p", [TopicTable]),
{ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_unsubscribe(_ClientId, _Username, TopicTable, Sections) ->
lager:info("Rewrite unsubscribe: ~p", [TopicTable]),
{ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
%%TODO: this will not work if the client is always online.
RewriteTopic =
case get({rewrite, Topic}) of
undefined ->
DestTopic = match_topic(Topic, Sections),
put({rewrite, Topic}, DestTopic), DestTopic;
DestTopic ->
DestTopic
end,
{ok, Message#mqtt_message{topic = RewriteTopic}}.
reload(File) ->
%%TODO: The unload api is not right...
case emqttd_app:is_mod_enabled(rewrite) of
true ->
unload(state),
load([{file, File}]);
false ->
{error, module_unloaded}
end.
unload(_) ->
emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4),
emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4),
emqttd:unhook('message.publish', fun ?MODULE:rewrite_publish/2).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
compile(Sections) ->
C = fun({rewrite, Re, Dest}) ->
{ok, MP} = re:compile(Re),
{rewrite, MP, Dest}
end,
F = fun({topic, Topic, Rules}) ->
{topic, list_to_binary(Topic), [C(R) || R <- Rules]}
end,
[F(Section) || Section <- Sections].
match_topic(Topic, []) ->
Topic;
match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
case emqttd_topic:match(Topic, Filter) of
true ->
match_rule(Topic, Rules);
false ->
match_topic(Topic, Sections)
end.
match_rule(Topic, []) ->
Topic;
match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} ->
Vars = lists:zip(["\\$" ++ integer_to_list(I)
|| I <- lists:seq(1, length(Captured))], Captured),
iolist_to_binary(lists:foldl(
fun({Var, Val}, Acc) ->
re:replace(Acc, Var, Val, [global])
end, Dest, Vars));
nomatch ->
match_rule(Topic, Rules)
end.

View File

@ -26,23 +26,28 @@
-export([list/0]).
%% @doc Init plugins' config
-spec(init() -> ok).
init() ->
case emqttd:conf(plugins_etc_dir) of
case emqttd:env(plugins_etc_dir) of
{ok, PluginsEtc} ->
CfgFiles = filelib:wildcard("*.conf", PluginsEtc),
lists:foreach(fun(CfgFile) ->
App = app_name(CfgFile),
application:set_env(App, conf, filename:join(PluginsEtc, CfgFile)),
gen_conf:init(App)
end, CfgFiles);
CfgFiles = [filename:join(PluginsEtc, File) ||
File <- filelib:wildcard("*.config", PluginsEtc)],
lists:foreach(fun init_config/1, CfgFiles);
undefined ->
ok
end.
init_config(CfgFile) ->
{ok, [AppsEnv]} = file:consult(CfgFile),
lists:foreach(fun({AppName, Envs}) ->
[application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
end, AppsEnv).
%% @doc Load all plugins when the broker started.
-spec(load() -> list() | {error, any()}).
load() ->
case emqttd:conf(plugins_loaded_file) of
case emqttd:env(plugins_loaded_file) of
{ok, File} ->
ensure_file(File),
with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end);
@ -75,7 +80,7 @@ load_plugins(Names, Persistent) ->
%% @doc Unload all plugins before broker stopped.
-spec(unload() -> list() | {error, any()}).
unload() ->
case emqttd:conf(plugins_loaded_file) of
case emqttd:env(plugins_loaded_file) of
{ok, File} ->
with_loaded_file(File, fun stop_plugins/1);
undefined ->
@ -89,9 +94,9 @@ stop_plugins(Names) ->
%% @doc List all available plugins
-spec(list() -> [mqtt_plugin()]).
list() ->
case emqttd:conf(plugins_etc_dir) of
{ok, PluginsEtc} ->
CfgFiles = filelib:wildcard("*.conf", PluginsEtc),
case emqttd:env(plugins_etc_dir) of
{ok, PluginsEtc} ->
CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc),
Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
StartedApps = names(started_app),
lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
@ -244,7 +249,7 @@ plugin_unloaded(Name, true) ->
end.
read_loaded() ->
case emqttd:conf(plugins_loaded_file) of
case emqttd:env(plugins_loaded_file) of
{ok, File} -> read_loaded(File);
undefined -> {error, not_found}
end.
@ -252,7 +257,7 @@ read_loaded() ->
read_loaded(File) -> file:consult(File).
write_loaded(AppNames) ->
{ok, File} = emqttd:conf(plugins_loaded_file),
{ok, File} = emqttd:env(plugins_loaded_file),
case file:open(File, [binary, write]) of
{ok, Fd} ->
lists:foreach(fun(Name) ->
@ -262,3 +267,4 @@ write_loaded(AppNames) ->
lager:error("Open File ~p Error: ~p", [File, Error]),
{error, Error}
end.

View File

@ -448,7 +448,7 @@ authenticate(Client, Password) ->
%% PUBLISH ACL is cached in process dictionary.
check_acl(publish, Topic, Client) ->
IfCache = emqttd:conf(cache_acl, true),
IfCache = emqttd:env(cache_acl, true),
case {IfCache, get({acl, publish, Topic})} of
{true, undefined} ->
AllowDeny = emqttd_access_control:check_acl(Client, publish, Topic),

View File

@ -32,7 +32,7 @@
%%--------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]).
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
pubsub_pool() ->
hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]).
@ -41,10 +41,10 @@ pubsub_pool() ->
%% Supervisor Callbacks
%%--------------------------------------------------------------------
init([Env]) ->
init([]) ->
{ok, Env} = emqttd:env(pubsub),
%% Create ETS Tables
[create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]],
{ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }.
%%--------------------------------------------------------------------

View File

@ -214,8 +214,9 @@ unsubscribe(SessPid, TopicTable) ->
init([CleanSess, {ClientId, Username}, ClientPid]) ->
process_flag(trap_exit, true),
true = link(ClientPid),
SessEnv = emqttd_conf:session(),
true = link(ClientPid),
{ok, QEnv} = emqttd:env(queue),
{ok, SessEnv} = emqttd:env(session),
Session = #session{
clean_sess = CleanSess,
client_id = ClientId,
@ -224,14 +225,14 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
subscriptions = #{},
inflight_queue = [],
max_inflight = get_value(max_inflight, SessEnv, 0),
message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()),
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
awaiting_rel = #{},
awaiting_ack = #{},
awaiting_comp = #{},
retry_interval = get_value(unack_retry_interval, SessEnv),
retry_interval = get_value(retry_interval, SessEnv),
await_rel_timeout = get_value(await_rel_timeout, SessEnv),
max_awaiting_rel = get_value(max_awaiting_rel, SessEnv),
expired_after = get_value(expired_after, SessEnv) * 60,
expired_after = get_value(expired_after, SessEnv),
collect_interval = get_value(collect_interval, SessEnv, 0),
timestamp = os:timestamp()},
emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)),

View File

@ -28,15 +28,8 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
Sysmon = {sysmon, {emqttd_sysmon, start_link, [opts()]},
{ok, Env} = emqttd:env(sysmon),
Sysmon = {sysmon, {emqttd_sysmon, start_link, [Env]},
permanent, 5000, worker, [emqttd_sysmon]},
{ok, {{one_for_one, 10, 100}, [Sysmon]}}.
opts() ->
Opts = [{long_gc, emqttd:conf(sysmon_long_gc)},
{long_schedule, emqttd:conf(sysmon_long_schedule)},
{large_heap, emqttd:conf(sysmon_large_heap)},
{busy_port, emqttd:conf(busy_port)},
{busy_dist_port, emqttd:conf(sysmon_busy_dist_port)}],
[{Key, Val} || {Key, {ok, Val}} <- Opts].

View File

@ -31,7 +31,7 @@
%% @doc Handle WebSocket Request.
handle_request(Req) ->
Peer = Req:get(peer),
PktOpts = emqttd_conf:mqtt(),
{ok, PktOpts} = emqttd:env(protocol),
ParserFun = emqttd_parser:new(PktOpts),
{ReentryWs, ReplyChannel} = upgrade(Req),
{ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),

View File

@ -27,7 +27,7 @@
%% @doc Start websocket client supervisor
-spec(start_link() -> {ok, pid()}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]).
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Start a WebSocket Client
-spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}).
@ -37,8 +37,8 @@ start_client(WsPid, Req, ReplyChannel) ->
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([Env]) ->
init([]) ->
{ok, Env} = emqttd:env(protocol),
{ok, {{simple_one_for_one, 0, 1},
[{ws_client, {emqttd_ws_client, start_link, [Env]},
temporary, 5000, worker, [emqttd_ws_client]}]}}.

View File

@ -101,8 +101,8 @@ groups() ->
init_per_suite(Config) ->
application:start(lager),
DataDir = proplists:get_value(data_dir, Config),
application:set_env(emqttd, conf, filename:join([DataDir, "emqttd.conf"])),
application:ensure_all_started(emqttd),
peg_com(DataDir),
start_apps(emqttd, DataDir),
Config.
end_per_suite(_Config) ->
@ -403,7 +403,7 @@ auth_header_(User, Pass) ->
websocket_test(_) ->
Conn = esockd_connection:new(esockd_transport, nil, []),
Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1},
Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1},
mochiweb_headers:make([{"Sec-WebSocket-Key","Xn3fdKyc3qEXPuj2A3O+ZA=="}])),
ct:log("Req:~p", [Req]),
@ -593,4 +593,31 @@ slave(node, Node) ->
{ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
N.
start_apps(App, DataDir) ->
Schema = cuttlefish_schema:files([filename:join([DataDir, atom_to_list(App) ++ ".schema"])]),
Conf = conf_parse:file(filename:join([DataDir, atom_to_list(App) ++ ".conf"])),
NewConfig = cuttlefish_generator:map(Schema, Conf),
Vals = proplists:get_value(App, NewConfig),
[application:set_env(App, Par, Value) || {Par, Value} <- Vals],
application:ensure_all_started(App).
peg_com(DataDir) ->
ParsePeg = file2(3, DataDir, "conf_parse.peg"),
neotoma:file(ParsePeg),
ParseErl = file2(3, DataDir, "conf_parse.erl"),
compile:file(ParseErl, []),
DurationPeg = file2(3, DataDir, "cuttlefish_duration_parse.peg"),
neotoma:file(DurationPeg),
DurationErl = file2(3, DataDir, "cuttlefish_duration_parse.erl"),
compile:file(DurationErl, []).
file2(Times, Dir, FileName) when Times < 1 ->
filename:join([Dir, "deps", "cuttlefish","src", FileName]);
file2(Times, Dir, FileName) ->
Dir1 = filename:dirname(Dir),
file2(Times - 1, Dir1, FileName).

View File

@ -1,270 +1,280 @@
%%===================================================================
%%
%% Config file for emqttd 2.0
%%
%% Erlang Term Syntax:
%%
%% {}: Tuple, usually {Key, Value}
%% []: List, seperated by comma
%% %%: Comment
%%
%%===================================================================
##--------------------------------------------------------------------
## Node Args
##--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% MQTT Protocol
%%--------------------------------------------------------------------
## Node name
node.name = emqttd@127.0.0.1
%% Max ClientId Length Allowed.
{mqtt_max_clientid_len, 512}.
## Cookie for distributed node
node.cookie = emq_dist_cookie
%% Max Packet Size Allowed, 64K by default.
{mqtt_max_packet_size, 65536}.
## SMP support: enable, auto, disable
node.smp = auto
%% Client Idle Timeout.
{mqtt_client_idle_timeout, 30}. % Second
## Enable kernel poll
node.kernel_poll = on
%%--------------------------------------------------------------------
%% Authentication
%%--------------------------------------------------------------------
## async thread pool
node.async_threads = 32
%% Anonymous: Allow all
{auth, anonymous, []}.
## Erlang Process Limit
node.process_limit = 256000
%% Authentication with username, password
{auth, username, []}.
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 65536
%% Authentication with clientId
{auth, clientid, [{password, no}]}.
## Set the distribution buffer busy limit (dist_buf_busy_limit)
node.dist_buffer_size = 32MB
%%--------------------------------------------------------------------
%% ACL
%%--------------------------------------------------------------------
## Max ETS Tables.
## Note that mnesia and SSL will create temporary ets tables.
node.max_ets_tables = 256000
{acl, anonymous, []}.
## Tweak GC to run more often
node.fullsweep_after = 1000
{acl, internal, [{nomatch, allow}]}.
## Crash dump
node.crash_dump = log/crash.dump
%% Cache ACL result for PUBLISH
{cache_acl, true}.
## Distributed node ticktime
node.dist_net_ticktime = 60
%%--------------------------------------------------------------------
%% Broker
%%--------------------------------------------------------------------
## Distributed node port range
## node.dist_listen_min = 6000
## node.dist_listen_max = 6999
%% System interval of publishing broker $SYS messages
{broker_sys_interval, 60}.
##--------------------------------------------------------------------
## Log
##--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Session
%%--------------------------------------------------------------------
## Console log. Enum: off, file, console, both
log.console = console
%% Max number of QoS 1 and 2 messages that can be “inflight” at one time.
%% 0 means no limit
{session_max_inflight, 100}.
## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
log.console.level = error
%% Retry interval for redelivering QoS1/2 messages.
{session_unack_retry_interval, 60}.
## Console log file
## log.console.file = log/console.log
%% Awaiting PUBREL Timeout
{session_await_rel_timeout, 20}.
## Error log file
log.error.file = log/error.log
%% Max Packets that Awaiting PUBREL, 0 means no limit
{session_max_awaiting_rel, 0}.
## Enable the crash log. Enum: on, off
log.crash = on
%% Statistics Collection Interval(seconds)
{session_collect_interval, 0}.
log.crash.file = log/crash.log
%% Expired after 2 day (unit: minute)
{session_expired_after, 2880}.
##--------------------------------------------------------------------
## MQTT Protocol
##--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Queue
%%--------------------------------------------------------------------
## Max ClientId Length Allowed.
mqtt.max_clientid_len = 1024
%% Type: simple | priority
{queue_type, simple}.
## Max Packet Size Allowed, 64K by default.
mqtt.max_packet_size = 64KB
%% Topic Priority: 0~255, Default is 0
%% {queue_priority, [{"topic/1", 10}, {"topic/2", 8}]}.
## Client Idle Timeout (Second)
mqtt.client_idle_timeout = 30
%% Max queue length. Enqueued messages when persistent client disconnected,
%% or inflight window is full.
{queue_max_length, infinity}.
## Allow Anonymous authentication
mqtt.allow_anonymous = true
%% Low-water mark of queued messages
{queue_low_watermark, 0.2}.
##--------------------------------------------------------------------
## MQTT Session
##--------------------------------------------------------------------
%% High-water mark of queued messages
{queue_high_watermark, 0.6}.
## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
## 0 means no limit
mqtt.session.max_inflight = 100
%% Queue Qos0 messages?
{queue_qos0, true}.
## Retry interval for redelivering QoS1/2 messages.
mqtt.session.retry_interval = 60
%%--------------------------------------------------------------------
%% Zone
%%--------------------------------------------------------------------
## Awaiting PUBREL Timeout
mqtt.session.await_rel_timeout = 20
{zone, admin, []}.
## Max Packets that Awaiting PUBREL, 0 means no limit
mqtt.session.max_awaiting_rel = 0
%%--------------------------------------------------------------------
%% Listener
%%--------------------------------------------------------------------
## Statistics Collection Interval(seconds)
mqtt.session.collect_interval = 0
%% Plain MQTT
{listener, mqtt, 1883, [
%% Size of acceptor pool
{acceptors, 16},
## Expired after 1 day:
## w - week
## d - day
## h - hour
## m - minute
## s - second
mqtt.session.expired_after = 1d
%% Maximum number of concurrent clients
{max_clients, 512},
##--------------------------------------------------------------------
## MQTT Queue
##--------------------------------------------------------------------
%% Mount point prefix
%% {mount_point, "prefix/"},
## Type: simple | priority
mqtt.queue.type = simple
%% Socket Access Control
{access, [{allow, all}]},
## Topic Priority: 0~255, Default is 0
## mqtt.queue.priority = topic/1=10,topic/2=8
%% Connection Options
{connopts, [
%% Rate Limit. Format is 'burst, rate', Unit is KB/Sec
%% {rate_limit, "100,10"} %% 100K burst, 10K rate
]},
## Max queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full.
mqtt.queue.max_length = infinity
%% Socket Options
{sockopts, [
%Set buffer if hight thoughtput
%{recbuf, 4096},
%{sndbuf, 4096},
%{buffer, 4096},
%{nodelay, true},
{backlog, 1024}
]}
]}.
## Low-water mark of queued messages
mqtt.queue.low_watermark = 20%
%% MQTT/SSL
{listener, mqtts, 8883, [
%% Size of acceptor pool
{acceptors, 4},
## High-water mark of queued messages
mqtt.queue.high_watermark = 60%
%% Maximum number of concurrent clients
{max_clients, 512},
## Queue Qos0 messages?
mqtt.queue.qos0 = true
%% Socket Access Control
{access, [{allow, all}]},
##--------------------------------------------------------------------
## MQTT Broker and PubSub
##--------------------------------------------------------------------
%% SSL certificate and key files
{ssl, [{certfile, "etc/ssl/ssl.crt"},
{keyfile, "etc/ssl/ssl.key"}]},
## System Interval of publishing broker $SYS Messages
mqtt.broker.sys_interval = 60
%% Socket Options
{sockopts, [
{backlog, 1024}
%{buffer, 4096},
]}
]}.
## PubSub Pool Size. Default should be scheduler numbers.
mqtt.pubsub.pool_size = 8
%% HTTP and WebSocket Listener
{listener, http, 8083, [
%% Size of acceptor pool
{acceptors, 4},
mqtt.pubsub.by_clientid = true
%% Maximum number of concurrent clients
{max_clients, 64},
## Subscribe Asynchronously
mqtt.pubsub.async = true
%% Socket Access Control
{access, [{allow, all}]},
##--------------------------------------------------------------------
## MQTT Bridge
##--------------------------------------------------------------------
%% Socket Options
{sockopts, [
{backlog, 1024}
%{buffer, 4096},
]}
]}.
## Bridge Queue Size
mqtt.bridge.max_queue_len = 10000
%%--------------------------------------------------------------------
%% PubSub
%%--------------------------------------------------------------------
## Ping Interval of bridge node. Unit: Second
mqtt.bridge.ping_down_interval = 1
%% PubSub and Router. Default should be scheduler numbers.
{pubsub_pool_size, 8}.
##-------------------------------------------------------------------
## MQTT Plugins
##-------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Routing
%%--------------------------------------------------------------------
## Dir of plugins' config
##mqtt.plugins.etc_dir = etc/plugins/
%% Route aging time(seconds)
{routing_age, 5}.
## File to store loaded plugin names.
##mqtt.plugins.loaded_file = data/loaded_plugins
%%--------------------------------------------------------------------
%% Bridge
%%--------------------------------------------------------------------
##-------------------------------------------------------------------
## MQTT Modules
##-------------------------------------------------------------------
%% TODO: Bridge Queue Size
{bridge_max_queue_len, 10000}.
## Enable retainer module
mqtt.module.retainer = on
%% Ping Interval of bridge node
{bridge_ping_down_interval, 1}. % second
## disc: disc_copies, ram: ram_copies
mqtt.module.retainer.storage_type = ram
%%-------------------------------------------------------------------
%% Plugins
%%-------------------------------------------------------------------
## Max number of retained messages
mqtt.module.retainer.max_message_num = 100000
%% Dir of plugins' config
{plugins_etc_dir, "etc/plugins/"}.
## Max Payload Size of retained message
mqtt.module.retainer.max_payload_size = 64KB
%% File to store loaded plugin names.
{plugins_loaded_file, "data/loaded_plugins"}.
## Expired after seconds, never expired if 0
mqtt.module.retainer.expired_after = 0
%%--------------------------------------------------------------------
%% Modules
%%--------------------------------------------------------------------
## Enable presence module
## Client presence management module. Publish presence messages when
## client connected or disconnected.
mqtt.module.presence = on
%% Retainer Module
{module, retainer, [
mqtt.module.presence.qos = 0
%% disc: disc_copies, ram: ram_copies
{storage, ram},
## Enable subscription module
## Subscribe topics automatically when client connected
mqtt.module.subscription = on
%% Max number of retained messages
{max_message_num, 100000},
mqtt.module.subscription.topics = $client/%c=1,$user/%u=1
%% Max Payload Size of retained message
{max_playload_size, 65536},
##--------------------------------------------------------------------
## MQTT Listeners
##--------------------------------------------------------------------
%% Expired after seconds, never expired if 0
{expired_after, 0}
## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883
mqtt.listener.tcp = 1883
]}.
## Size of acceptor pool
mqtt.listener.tcp.acceptors = 8
%% Client presence management module. Publish presence messages when
%% client connected or disconnected.
{module, presence, [{qos, 0}]}.
## Maximum number of concurrent clients
mqtt.listener.tcp.max_clients = 1024
%% Subscribe topics automatically when client connected
{module, subscription, [{"$queue/clients/$c", 1}, backend]}.
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.tcp.rate_limit = 100,10
%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite)
{module, rewrite, []}.
## TCP Socket Options
mqtt.listener.tcp.backlog = 1024
## mqtt.listener.tcp.recbuf = 4096
## mqtt.listener.tcp.sndbuf = 4096
## mqtt.listener.tcp.buffer = 4096
## mqtt.listener.tcp.nodelay = true
%%-------------------------------------------------------------------
%% Erlang System Monitor
%%-------------------------------------------------------------------
## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883
mqtt.listener.ssl = 8883
%% Long GC, don't monitor in production mode for:
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
## Size of acceptor pool
mqtt.listener.ssl.acceptors = 4
{sysmon_long_gc, false}.
## Maximum number of concurrent clients
mqtt.listener.ssl.max_clients = 512
%% Long Schedule(ms)
{sysmon_long_schedule, 240}.
## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
## mqtt.listener.ssl.rate_limit = 100,10
%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
%% 8 * 1024 * 1024
{sysmon_large_heap, 8388608}.
## Configuring SSL Options
## See http://erlang.org/doc/man/ssl.html
mqtt.listener.ssl.handshake_timeout = 15 #seconds
mqtt.listener.ssl.keyfile = etc/ssl/key.pem
mqtt.listener.ssl.certfile = etc/ssl/cert.pem
mqtt.listener.ssl.cacertfile = etc/ssl/cacert.pem
## mqtt.listener.ssl.verify = verify_peer
## mqtt.listener.ssl.failed_if_no_peer_cert = true
%% Busy Port
{sysmon_busy_port, false}.
## HTTP Listener
mqtt.listener.http = 8083
mqtt.listener.http.acceptors = 4
mqtt.listener.http.max_clients = 64
%% Busy Dist Port
{sysmon_busy_dist_port, true}.
## HTTP(SSL) Listener
mqtt.listener.https = 8084
mqtt.listener.https.acceptors = 4
mqtt.listener.https.max_clients = 64
mqtt.listener.https.handshake_timeout = 10 #seconds
mqtt.listener.https.certfile = etc/ssl/cert.pem
mqtt.listener.https.keyfile = etc/ssl/key.pem
mqtt.listener.https.cacertfile = etc/ssl/cacert.pem
## mqtt.listener.https.verify = verify_peer
## mqtt.listener.https.failed_if_no_peer_cert = true
##-------------------------------------------------------------------
## System Monitor
##-------------------------------------------------------------------
## Long GC, don't monitor in production mode for:
## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
sysmon.long_gc = false
## Long Schedule(ms)
sysmon.long_schedule = 240
## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM.
sysmon.large_heap = 8MB
## Busy Port
sysmon.busy_port = false
## Busy Dist Port
sysmon.busy_dist_port = true

View File

@ -0,0 +1,752 @@
%%-*- mode: erlang -*-
%% EMQ config mapping
%%--------------------------------------------------------------------
%% Erlang Node
%%--------------------------------------------------------------------
%% @doc Erlang node name
{mapping, "node.name", "vm_args.-name", [
{default, "emqttd@127.0.0.1"}
]}.
%% @doc Secret cookie for distributed erlang node
{mapping, "node.cookie", "vm_args.-setcookie", [
{default, "emqsecretcookie"}
]}.
%% @doc SMP Support
{mapping, "node.smp", "vm_args.-smp", [
{default, auto},
{datatype, {enum, [enable, auto, disable]}},
hidden
]}.
%% @doc Enable Kernel Poll
{mapping, "node.kernel_poll", "vm_args.+K", [
{default, on},
{datatype, flag},
hidden
]}.
%% @doc More information at: http://erlang.org/doc/man/erl.html
{mapping, "node.async_threads", "vm_args.+A", [
{default, 64},
{datatype, integer},
{validators, ["range:0-1024"]}
]}.
%% @doc Erlang Process Limit
{mapping, "node.process_limit", "vm_args.+P", [
{datatype, integer},
{default, 256000},
hidden
]}.
%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q
%% @doc The number of concurrent ports/sockets
%% Valid range is 1024-134217727
{mapping, "node.max_ports",
cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [
{default, 262144},
{datatype, integer},
{validators, ["range4ports"]}
]}.
{validator, "range4ports", "must be 1024 to 134217727",
fun(X) -> X >= 1024 andalso X =< 134217727 end}.
%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
{datatype, bytesize},
{commented, "32MB"},
hidden,
{validators, ["zdbbl_range"]}
]}.
{translation, "vm_args.+zdbbl",
fun(Conf) ->
ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined),
case ZDBBL of
undefined -> undefined;
X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes;
_ -> undefined
end
end
}.
{validator, "zdbbl_range", "must be between 1KB and 2097151KB",
fun(ZDBBL) ->
%% 2097151KB = 2147482624
ZDBBL >= 1024 andalso ZDBBL =< 2147482624
end
}.
%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2
{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [
{default, 1000},
{datatype, integer},
hidden,
{validators, ["positive_integer"]}
]}.
{validator, "positive_integer", "must be a positive integer",
fun(X) -> X >= 0 end}.
%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
%% R16+ uses +e
%% @doc The ETS table limit
{mapping, "node.max_ets_tables",
cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [
{default, 256000},
{datatype, integer},
hidden
]}.
%% @doc Set the location of crash dumps
{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [
{default, "{{crash_dump}}"},
{datatype, file},
hidden
]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [
{commented, 60},
{datatype, integer},
hidden
]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html
{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
{commented, 6000},
{datatype, integer},
hidden
]}.
%% @see node.dist_listen_min
{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [
{commented, 6999},
{datatype, integer},
hidden
]}.
%%--------------------------------------------------------------------
%% Log
%%--------------------------------------------------------------------
{mapping, "log.console", "lager.handlers", [
{default, file },
{datatype, {enum, [off, file, console, both]}}
]}.
{mapping, "log.console.level", "lager.handlers", [
{default, info},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
]}.
{mapping, "log.console.file", "lager.handlers", [
{default, "log/console.log"},
{datatype, file}
]}.
{mapping, "log.error.file", "lager.handlers", [
{default, "log/error.log"},
{datatype, file}
]}.
{mapping, "log.error.redirect", "lager.error_logger_redirect", [
{default, on},
{datatype, flag},
hidden
]}.
{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
{default, 1000},
{datatype, integer},
hidden
]}.
{translation,
"lager.handlers",
fun(Conf) ->
ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of
undefined -> [];
ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
{level, error},
{size, 10485760},
{date, "$D0"},
{count, 5}]}]
end,
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
ConsoleHandler = {lager_console_backend, ConsoleLogLevel},
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
{level, ConsoleLogLevel},
{size, 10485760},
{date, "$D0"},
{count, 5}]},
ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
off -> [];
file -> [ConsoleFileHandler];
console -> [ConsoleHandler];
both -> [ConsoleHandler, ConsoleFileHandler];
_ -> []
end,
ConsoleHandlers ++ ErrorHandler
end
}.
{mapping, "log.crash", "lager.crash_log", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.crash.file", "lager.crash_log", [
{default, "log/crash.log"},
{datatype, file}
]}.
{translation,
"lager.crash_log",
fun(Conf) ->
case cuttlefish:conf_get("log.crash", Conf) of
false -> undefined;
_ ->
cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
end
end}.
{mapping, "sasl", "sasl.sasl_error_logger", [
{default, off},
{datatype, flag},
hidden
]}.
%%--------------------------------------------------------------------
%% MQTT Protocol
%%--------------------------------------------------------------------
%% @doc Set the Max ClientId Length Allowed.
{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [
{default, 1024},
{datatype, integer}
]}.
%% @doc Max Packet Size Allowed, 64K by default.
{mapping, "mqtt.max_packet_size", "emqttd.protocol", [
{default, "64KB"},
{datatype, bytesize}
]}.
%% @doc Client Idle Timeout.
{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
{default, 30},
{datatype, integer}
]}.
{translation, "emqttd.protocol", fun(Conf) ->
[{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
{max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
{client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
end}.
%% @doc Allow Anonymous
{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
{default, false},
{datatype, {enum, [true, false]}},
hidden
]}.
%%--------------------------------------------------------------------
%% MQTT Session
%%--------------------------------------------------------------------
%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
%% 0 means no limit
{mapping, "mqtt.session.max_inflight", "emqttd.session", [
{default, 100},
{datatype, integer}
]}.
%% @doc Retry interval for redelivering QoS1/2 messages.
{mapping, "mqtt.session.retry_interval", "emqttd.session", [
{default, 60},
{datatype, integer}
]}.
%% @doc Awaiting PUBREL Timeout
{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
{default, 30},
{datatype, integer}
]}.
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [
{default, 0},
{datatype, integer}
]}.
%% @doc Statistics Collection Interval(seconds)
{mapping, "mqtt.session.collect_interval", "emqttd.session", [
{default, 0},
{datatype, integer}
]}.
%% @doc Session expired after...
{mapping, "mqtt.session.expired_after", "emqttd.session", [
{default, "2d"},
{datatype, {duration, s}}
]}.
{translation, "emqttd.session", fun(Conf) ->
[{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
{retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
{await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
{max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
{collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
{expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT Queue
%%--------------------------------------------------------------------
%% @doc Type: simple | priority
{mapping, "mqtt.queue.type", "emqttd.queue", [
{default, simple},
{datatype, atom}
]}.
%% @doc Topic Priority: 0~255, Default is 0
{mapping, "mqtt.queue.priority", "emqttd.queue", [
{default, ""},
{datatype, string},
hidden
]}.
%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
{mapping, "mqtt.queue.max_length", "emqttd.queue", [
{default, infinity},
{datatype, [atom, integer]}
]}.
%% @doc Low-water mark of queued messages
{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
{default, "20%"},
{datatype, string},
hidden
]}.
%% @doc High-water mark of queued messages
{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
{default, "60%"},
{datatype, string},
hidden
]}.
%% @doc Queue Qos0 messages?
{mapping, "mqtt.queue.qos0", "emqttd.queue", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqttd.queue", fun(Conf) ->
Parse = fun(S) ->
{match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]),
list_to_integer(N) / 100
end,
Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)},
{max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)},
{low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))},
{high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))},
{queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}],
case cuttlefish:conf_get("mqtt.queue.priority", Conf) of
undefined -> Opts;
V -> [{priority,
[begin [T, P] = string:tokens(S, "="),
{T, list_to_integer(P)}
end || S <- string:tokens(V, ",")]}|Opts]
end
end}.
%%--------------------------------------------------------------------
%% MQTT Broker
%%--------------------------------------------------------------------
{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [
{default, 60},
{datatype, integer}
]}.
%%--------------------------------------------------------------------
%% MQTT PubSub
%%--------------------------------------------------------------------
{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
{default, true},
{datatype, {enum, [true, false]}},
hidden
]}.
{translation, "emqttd.pubsub", fun(Conf) ->
[{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)},
{by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)},
{async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}]
end}.
%%--------------------------------------------------------------------
%% MQTT Bridge
%%--------------------------------------------------------------------
{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [
{default, 10000},
{datatype, integer}
]}.
{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [
{default, 1},
{datatype, integer}
]}.
{translation, "emqttd.bridge", fun(Conf) ->
[{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)},
{ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}]
end}.
%%-------------------------------------------------------------------
%% MQTT Plugins
%%-------------------------------------------------------------------
{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [
{datatype, string}
]}.
{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [
{datatype, string}
]}.
%%--------------------------------------------------------------------
%% MQTT Listeners
%%--------------------------------------------------------------------
{mapping, "mqtt.listener.tcp", "emqttd.listeners", [
{default, 1883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
{default, undefined},
{datatype, string},
hidden
]}.
{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
{default, 1024},
{datatype, integer}
]}.
{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [
{datatype, integer},
hidden
]}.
{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [
{datatype, {enum, [true, false]}},
hidden
]}.
{mapping, "mqtt.listener.ssl", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [
{default, 512},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
{default, 15},
{datatype, integer}
]}.
{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [
{datatype, {enum, [true, false]}}
]}.
{mapping, "mqtt.listener.http", "emqttd.listeners", [
{default, 8883},
{datatype, [integer, ip]}
]}.
{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https", "emqttd.listeners", [
{default, undefined},
{datatype, [integer, ip]},
hidden
]}.
{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
{default, 8},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [
{default, 64},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [
{default, 15},
{datatype, integer}
]}.
{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [
{datatype, string}
]}.
{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [
{datatype, {enum, [true, false]}}
]}.
{translation, "emqttd.listeners", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
{rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}])
end,
TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
{recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
{sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
{buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
{nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}])
end,
SslOpts = fun(Prefix) ->
Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
{certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
{cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},
{verify, cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)},
{failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}])
end,
Listeners = fun(Name) when is_atom(Name) ->
Key = "mqtt.listener." ++ atom_to_list(Name),
case cuttlefish:conf_get(Key, Conf, undefined) of
undefined ->
[];
Port ->
ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]),
Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)],
[{Name, Port, case Name =:= ssl orelse Name =:= https of
true -> [{ssl, SslOpts(Key)} | Opts];
false -> Opts
end}]
end
end,
lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)])
end}.
%%--------------------------------------------------------------------
%% MQTT Modules
%%--------------------------------------------------------------------
{mapping, "mqtt.module.retainer", "emqttd.modules", [
{default, on},
{datatype, flag}
]}.
{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [
{default, ram},
{datatype, {enum, [disc, ram]}}
]}.
{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [
{default, 100000},
{datatype, integer}
]}.
{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [
{default, "64KB"},
{datatype, bytesize}
]}.
{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [
{default, 0},
{datatype, integer}
]}.
{mapping, "mqtt.module.presence", "emqttd.modules", [
{default, on},
{datatype, flag}
]}.
{mapping, "mqtt.module.presence.qos", "emqttd.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-2"]}
]}.
{mapping, "mqtt.module.subscription", "emqttd.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [
{default, undefined},
{datatype, string}
]}.
{translation, "emqttd.modules", fun(Conf) ->
WithMod = fun(Name, OptsF) ->
Key = "mqtt.module." ++ atom_to_list(Name),
case cuttlefish:conf_get(Key, Conf, false) of
true -> [{Name, OptsF(Key)}];
false -> []
end
end,
RetainOpts = fun(Prefix) ->
[{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)},
{max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)},
{max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)},
{expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}]
end,
PresOpts = fun(Prefix) ->
[{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}]
end,
ParseFun = fun(undefined) -> [];
(Topics) -> [begin
[Topic, Qos] = string:tokens(S, "="),
{list_to_binary(Topic), list_to_integer(Qos)}
end || S <- string:tokens(Topics, ",")]
end,
SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end,
lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)])
end}.
%%--------------------------------------------------------------------
%% System Monitor
%%--------------------------------------------------------------------
%% @doc Long GC, don't monitor in production mode for:
%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421
{mapping, "sysmon.long_gc", "emqttd.sysmon", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%% @doc Long Schedule(ms)
{mapping, "sysmon.long_schedule", "emqttd.sysmon", [
{default, 1000},
{datatype, integer}
]}.
%% @doc Large Heap
{mapping, "sysmon.large_heap", "emqttd.sysmon", [
{default, "8MB"},
{datatype, bytesize}
]}.
%% @doc Monitor Busy Port
{mapping, "sysmon.busy_port", "emqttd.sysmon", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%% @doc Monitor Busy Dist Port
{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqttd.sysmon", fun(Conf) ->
[{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)},
{long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)},
{large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)},
{busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)},
{busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}]
end}.

View File

@ -41,7 +41,6 @@ groups() ->
init_per_group(access_control, Config) ->
application:load(emqttd),
prepare_config(),
gen_conf:init(emqttd),
Config;
init_per_group(_Group, Config) ->
@ -92,43 +91,39 @@ end_per_testcase(_TestCase, _Config) ->
%%--------------------------------------------------------------------
reload_acl(_) ->
[ok] = ?AC:reload_acl().
[] = ?AC:reload_acl().
register_mod(_) ->
ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
{error, already_existed} = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
[{emqttd_acl_test_mod, _, 0},
{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
[{emqttd_acl_test_mod, _, 0}] = ?AC:lookup_mods(acl),
ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
ok = ?AC:register_mod(auth, emqttd_auth_dashboard, [], 99),
[{emqttd_auth_dashboard, _, 99},
{emqttd_auth_anonymous_test_mod, _, 0},
{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth).
{emqttd_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth).
unregister_mod(_) ->
ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []),
[{emqttd_acl_test_mod, _, 0},
{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
[{emqttd_acl_test_mod, _, 0}] = ?AC:lookup_mods(acl),
ok = ?AC:unregister_mod(acl, emqttd_acl_test_mod),
timer:sleep(5),
[{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl),
[] = ?AC:lookup_mods(acl),
ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]),
[{emqttd_auth_anonymous_test_mod, _, 0},
{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth),
[{emqttd_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth),
ok = ?AC:unregister_mod(auth, emqttd_auth_anonymous_test_mod),
timer:sleep(5),
[{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth).
[] = ?AC:lookup_mods(auth).
check_acl(_) ->
User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>},
User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>},
allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>),
deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
allow = ?AC:check_acl(User2, subscribe, <<"a/b/c">>).
%%--------------------------------------------------------------------
%% emqttd_access_rule

View File

@ -124,13 +124,13 @@ process_list(_Config) ->
process_info(_Config) ->
ProcessInfos = emqttd_vm:get_process_info(),
ProcessInfo = lists:last(ProcessInfos),
Keys = [K || {K, V}<- ProcessInfo],
Keys = [K || {K, _V}<- ProcessInfo],
?PROCESS_INFO = Keys.
process_gc(_Config) ->
ProcessGcs = emqttd_vm:get_process_gc(),
ProcessGc = lists:last(ProcessGcs),
Keys = [K || {K, V}<- ProcessGc],
Keys = [K || {K, _V}<- ProcessGc],
?PROCESS_GC = Keys.
get_ets_list(_Config) ->