diff --git a/.gitignore b/.gitignore index 0982fe30f..8bbf0a28b 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ ct.coverdata .idea/ emqttd.iml _rel/ +data/ diff --git a/Makefile b/Makefile index 85c815362..a7dde4b75 100644 --- a/Makefile +++ b/Makefile @@ -2,14 +2,22 @@ PROJECT = emqttd PROJECT_DESCRIPTION = Erlang MQTT Broker PROJECT_VERSION = 2.0 -DEPS = gproc lager gen_logger gen_conf esockd mochiweb +DEPS = gproc lager gen_logger esockd mochiweb getopt pbkdf2 \ + clique time_compat rand_compat -dep_gproc = git https://github.com/uwiger/gproc -dep_lager = git https://github.com/basho/lager -dep_gen_conf = git https://github.com/emqtt/gen_conf -dep_gen_logger = git https://github.com/emqtt/gen_logger -dep_esockd = git https://github.com/emqtt/esockd emq20 -dep_mochiweb = git https://github.com/emqtt/mochiweb +dep_gproc = git https://github.com/uwiger/gproc +dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 +dep_lager = git https://github.com/basho/lager master +dep_gen_logger = git https://github.com/emqtt/gen_logger +dep_esockd = git https://github.com/emqtt/esockd emq20 +dep_mochiweb = git https://github.com/emqtt/mochiweb +dep_clique = git https://github.com/basho/clique +dep_pbkdf2 = git https://github.com/basho/erlang-pbkdf2 2.0.0 +dep_time_compat = git https://github.com/lasp-lang/time_compat +dep_rand_compat = git https://github.com/lasp-lang/rand_compat + +TEST_DEPS = cuttlefish +dep_cuttlefish = git https://github.com/emqtt/cuttlefish ERLC_OPTS += +'{parse_transform, lager_transform}' @@ -31,3 +39,5 @@ include erlang.mk app:: rebar.config +app.config:: + cuttlefish -l info -e etc/ -c etc/emq.conf -i priv/emq.schema -d data/ diff --git a/etc/acl.conf b/etc/acl.conf new file mode 100644 index 000000000..3cb3b8c52 --- /dev/null +++ b/etc/acl.conf @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% +%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL) +%% +%% -type who() :: all | binary() | +%% {ipaddr, esockd_access:cidr()} | +%% {client, binary()} | +%% {user, binary()}. +%% +%% -type access() :: subscribe | publish | pubsub. +%% +%% -type topic() :: binary(). +%% +%% -type rule() :: {allow, all} | +%% {allow, who(), access(), list(topic())} | +%% {deny, all} | +%% {deny, who(), access(), list(topic())}. +%% +%%-------------------------------------------------------------------- + +{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. + +{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}. + +{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. + +{allow, all}. + + diff --git a/etc/certs/README b/etc/certs/README new file mode 100644 index 000000000..114a360ac --- /dev/null +++ b/etc/certs/README @@ -0,0 +1 @@ +Place your SSL/TLS Certificates here. diff --git a/etc/certs/cacert.pem b/etc/certs/cacert.pem new file mode 100644 index 000000000..cb7afbcc7 --- /dev/null +++ b/etc/certs/cacert.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICZTCCAc4CCQCPzzI1ezeZPTANBgkqhkiG9w0BAQUFADB3MQswCQYDVQQGEwJD +TjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MREwDwYDVQQK +EwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG9w0BCQEWDWhv +bmdAZW1xdHQuaW8wHhcNMTYxMDEzMDkwNzQ5WhcNMTYxMTEyMDkwNzQ5WjB3MQsw +CQYDVQQGEwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91 +MREwDwYDVQQKEwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG +9w0BCQEWDWhvbmdAZW1xdHQuaW8wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGB +AJ/6ACaLPXP6wpGOqc9+jFRFN6ufODqGB5SamCNmOKXpSm/U+KT87NPg4i4wn31s +167nb65lk3IbdvzPzTSCAP6DG5s0+qDgpEMHeKKEC4zaAwoIxCgVUjab51RbVFBs +AhzowxdRl6jQrGVgvXiLzz1+3b+1Xydu5J5Z2IeLm8NPAgMBAAEwDQYJKoZIhvcN +AQEFBQADgYEAkt/VWi4tUUEdOnDwnCZ4IheV9Sp+6T3XsRxje7PKDsvZQlmpvMP6 +StfM+wkxty2dxVOU5Sx8CwXk5roKvULQY5rAyn9log6vEAI4Oyr4vnRN24JF7/Tr +xeP1cOv2LJlEuQm1JWe+VtNqfJ+f81CnfaJMAo17W5T/5UxI5n8ziKc= +-----END CERTIFICATE----- diff --git a/etc/certs/cert.pem b/etc/certs/cert.pem new file mode 100644 index 000000000..e583916c2 --- /dev/null +++ b/etc/certs/cert.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICZjCCAc+gAwIBAgIJAO89PfgaeHB2MA0GCSqGSIb3DQEBBQUAMHcxCzAJBgNV +BAYTAkNOMREwDwYDVQQIEwhaaGVKaWFuZzERMA8GA1UEBxMISGFuZ1pob3UxETAP +BgNVBAoTCGVtcXR0LmlvMREwDwYDVQQDEwhlbXF0dC5pbzEcMBoGCSqGSIb3DQEJ +ARYNaG9uZ0BlbXF0dC5pbzAeFw0xNjEwMTMwOTEzMTFaFw0xNjExMTIwOTEzMTFa +MEYxCzAJBgNVBAYTAkNOMREwDwYDVQQIDAhaaGVKaWFuZzERMA8GA1UEBwwISGFu +Z1pob3UxETAPBgNVBAsMCGVtcXR0LmlvMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCB +iQKBgQDaOI1oasKjo0JGk5bMIxGInlxbvTuJZ8436u8HY4q8jZ+a4G12+UdTVHRF +d94/ClHWn8WvOvzbxvmSkhninlzdWm1rBLWis3Z2kStmhL77kITEfIImus9pjm5l +OgBxY4+Q7LSEsKYYH+ClYVaLlzO8PILEkBk6xxxq0X7AnCfDfQIDAQABoyswKTAJ +BgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAPBgNVHREECDAGhwR/AAABMA0GCSqGSIb3 +DQEBBQUAA4GBAGS1yw1w9H7F4uOaK02mUCHZiV+EBB3gkBBqtAx7TXsmoGgT6ySA +7DwrbX6IH82bhZT4TjouhaPlUPE9pin88d/2kNbRrbZoZDMYGq02mVVRxfLzJqM2 +GVlsxebsFFPbYhOaf9TuRR3v13ebga0FrXNke+IGLsYZSM2PZ+F4EvIA +-----END CERTIFICATE----- diff --git a/etc/certs/client-cert.pem b/etc/certs/client-cert.pem new file mode 100644 index 000000000..614816a54 --- /dev/null +++ b/etc/certs/client-cert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICITCCAYoCCQDvPT34GnhwdzANBgkqhkiG9w0BAQUFADB3MQswCQYDVQQGEwJD +TjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91MREwDwYDVQQK +EwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG9w0BCQEWDWhv +bmdAZW1xdHQuaW8wHhcNMTYxMDEzMDkxNTMxWhcNMTYxMTEyMDkxNTMxWjB3MQsw +CQYDVQQGEwJDTjERMA8GA1UECBMIWmhlSmlhbmcxETAPBgNVBAcTCEhhbmdaaG91 +MREwDwYDVQQKEwhlbXF0dC5pbzERMA8GA1UEAxMIZW1xdHQuaW8xHDAaBgkqhkiG +9w0BCQEWDWhvbmdAZW1xdHQuaW8wXDANBgkqhkiG9w0BAQEFAANLADBIAkEAx1yF +I3YnvDPtHpGzJ+9ZGnnKkvMdaoyawT9rPvLsteeDkfknJcGCV5mKmjvH1xeeMIN1 +Kql9nVPoe7BtzJ0XwQIDAQABMA0GCSqGSIb3DQEBBQUAA4GBAALulKuZuE6RhwIT +JBrUN7j4dbJe7Ttz+Q3qSQq6hNJoDf8hNrAHUDQzov9yU/KMMi9xE6+hu+ieuTo6 +hKLBDAD4hDzb6+EU5HAcASDkAXWnQq/Keo73+VrmUwMQs93tTC/jGXpsj/gLMEWB +xcxXpgBPDGIR9L8Y2YMhEBLjm7Zv +-----END CERTIFICATE----- diff --git a/etc/certs/client-key.pem b/etc/certs/client-key.pem new file mode 100644 index 000000000..ac01b944a --- /dev/null +++ b/etc/certs/client-key.pem @@ -0,0 +1,9 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIBOwIBAAJBAMdchSN2J7wz7R6RsyfvWRp5ypLzHWqMmsE/az7y7LXng5H5JyXB +gleZipo7x9cXnjCDdSqpfZ1T6HuwbcydF8ECAwEAAQJAOGtblmyS1DVRzsvnCs82 +xUJgbPP2iDfgd/4tqLPw/41T9d4RimhfNMUF9n+9IZPCGPXGGc4OYQttNq+w/BG/ +vQIhAPlZCz+fI1OEcqNB5BjYRrZU+6KtkSaKXRL/2e2yAmbTAiEAzK4WOv+Zs8x7 +aG9pO2SOy38qCBOq1xfohFJnPaWaEpsCIQCE1zaR75NfdEmqxnjh759ElmP1WCjj +coWBkMMmylZTNwIgAeFPfvc+GDK2p3zugIcp8KCYaD6WASfNEPoYzK4qviUCIQDt +sTM3JZeInrLoDJwhgrmMFDjlf7XZ+LF7uYDRuE+7jw== +-----END RSA PRIVATE KEY----- diff --git a/etc/certs/key.pem b/etc/certs/key.pem new file mode 100644 index 000000000..6f5053722 --- /dev/null +++ b/etc/certs/key.pem @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXQIBAAKBgQDaOI1oasKjo0JGk5bMIxGInlxbvTuJZ8436u8HY4q8jZ+a4G12 ++UdTVHRFd94/ClHWn8WvOvzbxvmSkhninlzdWm1rBLWis3Z2kStmhL77kITEfIIm +us9pjm5lOgBxY4+Q7LSEsKYYH+ClYVaLlzO8PILEkBk6xxxq0X7AnCfDfQIDAQAB +AoGBAKyew61vllxfjtPJeCYvL3WE38ZqIKiHBufQ3hhYM60H0tNu6OiONE/EpN02 +/wWbIjXG2VfOL6ui8FVzYSqU3xt8kD+zs3+Q4Qz+UEe9bwLEysswWyQA/YJtOS60 +FPxedT/gs0SAQP4MAc/FeFXOXVSzX4nVMoVeLpjvakh3hQNBAkEA8yObp8U6WCdE +p5TBQl5CyYJq4lyZ+d8DLp/v6gK7P4Nm8tTeSNmOtoG2nA6/VtSI75jm9yl5owzF +CZn3654GSQJBAOXDhcQhEjR+NMUR62a1YRaBK36ZtiJXFdIV4g16BUBp6q4UDAiV +3GBVuCYsykXT6V+3cR0vvUQUbpfAUl1BQ5UCQQCDytVgx2OszPxF6jgnhXimSe8t +7Av6iYvsBf3B1uEwuEVhc0laK7NT8lPNm6DTrDjdxv/LEcxBOXbEkZT1Pp8hAkB3 +tvdkqKKWrUeLgvm3azwqAKWL8kUfAWcCLpq40OIZnNZFW3alpofLvf4UDfRai76m +O6t5PJ2N8mNpODDyHAY9AkAqbnmnmbswN+ayB80357N8298GLBqlG+A6YHps7SnR +K+4poUZgdhs0e5zh7jAR7cyQuQnKC7LMR0ZRH1WTs97V +-----END RSA PRIVATE KEY----- diff --git a/etc/emq.conf b/etc/emq.conf new file mode 100644 index 000000000..8bb69021d --- /dev/null +++ b/etc/emq.conf @@ -0,0 +1,283 @@ +##-------------------------------------------------------------------- +## Node Args +##-------------------------------------------------------------------- + +## Node name +node.name = emqttd@127.0.0.1 + +## Cookie for distributed node +node.cookie = emq_dist_cookie + +## SMP support: enable, auto, disable +node.smp = auto + +## Enable kernel poll +node.kernel_poll = on + +## async thread pool +node.async_threads = 32 + +## Erlang Process Limit +node.process_limit = 256000 + +## Sets the maximum number of simultaneously existing ports for this system +node.max_ports = 65536 + +## Set the distribution buffer busy limit (dist_buf_busy_limit) +node.dist_buffer_size = 32MB + +## Max ETS Tables. +## Note that mnesia and SSL will create temporary ets tables. +node.max_ets_tables = 256000 + +## Tweak GC to run more often +node.fullsweep_after = 1000 + +## Crash dump +node.crash_dump = log/crash.dump + +## Distributed node ticktime +node.dist_net_ticktime = 60 + +## Distributed node port range +## node.dist_listen_min = 6000 +## node.dist_listen_max = 6999 + +##-------------------------------------------------------------------- +## Log +##-------------------------------------------------------------------- + +## Console log. Enum: off, file, console, both +log.console = console + +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency +log.console.level = error + +## Console log file +## log.console.file = log/console.log + +## Error log file +log.error.file = log/error.log + +## Enable the crash log. Enum: on, off +log.crash = on + +log.crash.file = log/crash.log + +##-------------------------------------------------------------------- +## MQTT Protocol +##-------------------------------------------------------------------- + +## Max ClientId Length Allowed. +mqtt.max_clientid_len = 1024 + +## Max Packet Size Allowed, 64K by default. +mqtt.max_packet_size = 64KB + +## Client Idle Timeout (Second) +mqtt.client_idle_timeout = 30 + +## Allow Anonymous authentication +mqtt.allow_anonymous = true + +## Default ACL File +mqtt.acl_file = etc/acl.conf + +##-------------------------------------------------------------------- +## MQTT Session +##-------------------------------------------------------------------- + +## Max number of QoS 1 and 2 messages that can be “inflight” at one time. +## 0 means no limit +mqtt.session.max_inflight = 100 + +## Retry interval for redelivering QoS1/2 messages. +mqtt.session.retry_interval = 60 + +## Awaiting PUBREL Timeout +mqtt.session.await_rel_timeout = 20 + +## Max Packets that Awaiting PUBREL, 0 means no limit +mqtt.session.max_awaiting_rel = 0 + +## Statistics Collection Interval(seconds) +mqtt.session.collect_interval = 0 + +## Expired after 1 day: +## w - week +## d - day +## h - hour +## m - minute +## s - second +mqtt.session.expired_after = 1d + +##-------------------------------------------------------------------- +## MQTT Queue +##-------------------------------------------------------------------- + +## Type: simple | priority +mqtt.queue.type = simple + +## Topic Priority: 0~255, Default is 0 +## mqtt.queue.priority = topic/1=10,topic/2=8 + +## Max queue length. Enqueued messages when persistent client disconnected, +## or inflight window is full. +mqtt.queue.max_length = infinity + +## Low-water mark of queued messages +mqtt.queue.low_watermark = 20% + +## High-water mark of queued messages +mqtt.queue.high_watermark = 60% + +## Queue Qos0 messages? +mqtt.queue.qos0 = true + +##-------------------------------------------------------------------- +## MQTT Broker and PubSub +##-------------------------------------------------------------------- + +## System Interval of publishing broker $SYS Messages +mqtt.broker.sys_interval = 60 + +## PubSub Pool Size. Default should be scheduler numbers. +mqtt.pubsub.pool_size = 8 + +mqtt.pubsub.by_clientid = true + +## Subscribe Asynchronously +mqtt.pubsub.async = true + +##-------------------------------------------------------------------- +## MQTT Bridge +##-------------------------------------------------------------------- + +## Bridge Queue Size +mqtt.bridge.max_queue_len = 10000 + +## Ping Interval of bridge node. Unit: Second +mqtt.bridge.ping_down_interval = 1 + +##------------------------------------------------------------------- +## MQTT Plugins +##------------------------------------------------------------------- + +## Dir of plugins' config +mqtt.plugins.etc_dir = etc/plugins/ + +## File to store loaded plugin names. +mqtt.plugins.loaded_file = data/loaded_plugins + +##------------------------------------------------------------------- +## MQTT Modules +##------------------------------------------------------------------- + +## Enable retainer module +mqtt.module.retainer = on + +## disc: disc_copies, ram: ram_copies +mqtt.module.retainer.storage_type = ram + +## Max number of retained messages +mqtt.module.retainer.max_message_num = 100000 + +## Max Payload Size of retained message +mqtt.module.retainer.max_payload_size = 64KB + +## Expired after seconds, never expired if 0 +mqtt.module.retainer.expired_after = 0 + +## Enable presence module +## Client presence management module. Publish presence messages when +## client connected or disconnected. +mqtt.module.presence = on + +mqtt.module.presence.qos = 0 + +## Enable subscription module +## Subscribe topics automatically when client connected +mqtt.module.subscription = on + +mqtt.module.subscription.topics = $client/%c=1,$user/%u=1 + +##-------------------------------------------------------------------- +## MQTT Listeners +##-------------------------------------------------------------------- + +## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +mqtt.listener.tcp = 1883 + +## Size of acceptor pool +mqtt.listener.tcp.acceptors = 8 + +## Maximum number of concurrent clients +mqtt.listener.tcp.max_clients = 1024 + +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.tcp.rate_limit = 100,10 + +## TCP Socket Options +mqtt.listener.tcp.backlog = 1024 +## mqtt.listener.tcp.recbuf = 4096 +## mqtt.listener.tcp.sndbuf = 4096 +## mqtt.listener.tcp.buffer = 4096 +## mqtt.listener.tcp.nodelay = true + +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +mqtt.listener.ssl = 8883 + +## Size of acceptor pool +mqtt.listener.ssl.acceptors = 4 + +## Maximum number of concurrent clients +mqtt.listener.ssl.max_clients = 512 + +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.ssl.rate_limit = 100,10 + +## Configuring SSL Options +## See http://erlang.org/doc/man/ssl.html +mqtt.listener.ssl.handshake_timeout = 15 +mqtt.listener.ssl.keyfile = etc/certs/key.pem +mqtt.listener.ssl.certfile = etc/certs/cert.pem +mqtt.listener.ssl.cacertfile = etc/certs/cacert.pem +## mqtt.listener.ssl.verify = verify_peer +## mqtt.listener.ssl.failed_if_no_peer_cert = true + +## HTTP Listener +mqtt.listener.http = 8083 +mqtt.listener.http.acceptors = 4 +mqtt.listener.http.max_clients = 64 + +## HTTP(SSL) Listener +## mqtt.listener.https = 8084 +## mqtt.listener.https.acceptors = 4 +## mqtt.listener.https.max_clients = 64 +## mqtt.listener.https.handshake_timeout = 10 +## mqtt.listener.https.certfile = etc/certs/cert.pem +## mqtt.listener.https.keyfile = etc/certs/key.pem +## mqtt.listener.https.cacertfile = etc/certs/cacert.pem +## mqtt.listener.https.verify = verify_peer +## mqtt.listener.https.failed_if_no_peer_cert = true + +##------------------------------------------------------------------- +## System Monitor +##------------------------------------------------------------------- + +## Long GC, don't monitor in production mode for: +## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +sysmon.long_gc = false + +## Long Schedule(ms) +sysmon.long_schedule = 240 + +## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. +sysmon.large_heap = 8MB + +## Busy Port +sysmon.busy_port = false + +## Busy Dist Port +sysmon.busy_dist_port = true + diff --git a/priv/emq.schema b/priv/emq.schema new file mode 100644 index 000000000..9829218a4 --- /dev/null +++ b/priv/emq.schema @@ -0,0 +1,758 @@ +%%-*- mode: erlang -*- +%% EMQ config mapping + +%%-------------------------------------------------------------------- +%% Erlang Node +%%-------------------------------------------------------------------- + +%% @doc Erlang node name +{mapping, "node.name", "vm_args.-name", [ + {default, "emqttd@127.0.0.1"} +]}. + +%% @doc Secret cookie for distributed erlang node +{mapping, "node.cookie", "vm_args.-setcookie", [ + {default, "emqsecretcookie"} +]}. + +%% @doc SMP Support +{mapping, "node.smp", "vm_args.-smp", [ + {default, auto}, + {datatype, {enum, [enable, auto, disable]}}, + hidden +]}. + +%% @doc Enable Kernel Poll +{mapping, "node.kernel_poll", "vm_args.+K", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +%% @doc More information at: http://erlang.org/doc/man/erl.html +{mapping, "node.async_threads", "vm_args.+A", [ + {default, 64}, + {datatype, integer}, + {validators, ["range:0-1024"]} +]}. + +%% @doc Erlang Process Limit +{mapping, "node.process_limit", "vm_args.+P", [ + {datatype, integer}, + {default, 256000}, + hidden +]}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q +%% @doc The number of concurrent ports/sockets +%% Valid range is 1024-134217727 +{mapping, "node.max_ports", + cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ + {default, 262144}, + {datatype, integer}, + {validators, ["range4ports"]} +]}. + +{validator, "range4ports", "must be 1024 to 134217727", + fun(X) -> X >= 1024 andalso X =< 134217727 end}. + +%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl +{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ + {datatype, bytesize}, + {commented, "32MB"}, + hidden, + {validators, ["zdbbl_range"]} +]}. + +{translation, "vm_args.+zdbbl", + fun(Conf) -> + ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined), + case ZDBBL of + undefined -> undefined; + X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; + _ -> undefined + end + end +}. + +{validator, "zdbbl_range", "must be between 1KB and 2097151KB", + fun(ZDBBL) -> + %% 2097151KB = 2147482624 + ZDBBL >= 1024 andalso ZDBBL =< 2147482624 + end +}. + +%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 +{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ + {default, 1000}, + {datatype, integer}, + hidden, + {validators, ["positive_integer"]} +]}. + +{validator, "positive_integer", "must be a positive integer", + fun(X) -> X >= 0 end}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, +%% R16+ uses +e +%% @doc The ETS table limit +{mapping, "node.max_ets_tables", + cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ + {default, 256000}, + {datatype, integer}, + hidden +]}. + +%% @doc Set the location of crash dumps +{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ + {default, "{{crash_dump}}"}, + {datatype, file}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime +{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ + {commented, 60}, + {datatype, integer}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html +{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ + {commented, 6000}, + {datatype, integer}, + hidden +]}. + +%% @see node.dist_listen_min +{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ + {commented, 6999}, + {datatype, integer}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% Log +%%-------------------------------------------------------------------- + +{mapping, "log.console", "lager.handlers", [ + {default, file }, + {datatype, {enum, [off, file, console, both]}} +]}. + +{mapping, "log.console.level", "lager.handlers", [ + {default, info}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} +]}. + +{mapping, "log.console.file", "lager.handlers", [ + {default, "log/console.log"}, + {datatype, file} +]}. + +{mapping, "log.error.file", "lager.handlers", [ + {default, "log/error.log"}, + {datatype, file} +]}. + +{mapping, "log.error.redirect", "lager.error_logger_redirect", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ + {default, 1000}, + {datatype, integer}, + hidden +]}. + +{translation, + "lager.handlers", + fun(Conf) -> + ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of + undefined -> []; + ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, + {level, error}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}] + end, + + ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), + ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), + + ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, + ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, + {level, ConsoleLogLevel}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}, + + ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of + off -> []; + file -> [ConsoleFileHandler]; + console -> [ConsoleHandler]; + both -> [ConsoleHandler, ConsoleFileHandler]; + _ -> [] + end, + ConsoleHandlers ++ ErrorHandler + end +}. + +{mapping, "log.crash", "lager.crash_log", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.crash.file", "lager.crash_log", [ + {default, "log/crash.log"}, + {datatype, file} +]}. + +{translation, + "lager.crash_log", + fun(Conf) -> + case cuttlefish:conf_get("log.crash", Conf) of + false -> undefined; + _ -> + cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") + end + end}. + +{mapping, "sasl", "sasl.sasl_error_logger", [ + {default, off}, + {datatype, flag}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT Protocol +%%-------------------------------------------------------------------- + +%% @doc Set the Max ClientId Length Allowed. +{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [ + {default, 1024}, + {datatype, integer} +]}. + +%% @doc Max Packet Size Allowed, 64K by default. +{mapping, "mqtt.max_packet_size", "emqttd.protocol", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +%% @doc Client Idle Timeout. +{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [ + {default, 30}, + {datatype, integer} +]}. + +{translation, "emqttd.protocol", fun(Conf) -> + [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, + {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, + {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}] +end}. + +%% @doc Allow Anonymous +{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [ + {default, false}, + {datatype, {enum, [true, false]}}, + hidden +]}. + +%% @doc Default ACL File +{mapping, "mqtt.acl_file", "emqttd.acl_file", [ + {datatype, string}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT Session +%%-------------------------------------------------------------------- + +%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. +%% 0 means no limit +{mapping, "mqtt.session.max_inflight", "emqttd.session", [ + {default, 100}, + {datatype, integer} +]}. + + +%% @doc Retry interval for redelivering QoS1/2 messages. +{mapping, "mqtt.session.retry_interval", "emqttd.session", [ + {default, 60}, + {datatype, integer} +]}. + +%% @doc Awaiting PUBREL Timeout +{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [ + {default, 30}, + {datatype, integer} +]}. + +%% @doc Max Packets that Awaiting PUBREL, 0 means no limit +{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Statistics Collection Interval(seconds) +{mapping, "mqtt.session.collect_interval", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Session expired after... +{mapping, "mqtt.session.expired_after", "emqttd.session", [ + {default, "2d"}, + {datatype, {duration, s}} +]}. + +{translation, "emqttd.session", fun(Conf) -> + [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, + {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, + {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, + {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, + {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)}, + {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% MQTT Queue +%%-------------------------------------------------------------------- + +%% @doc Type: simple | priority +{mapping, "mqtt.queue.type", "emqttd.queue", [ + {default, simple}, + {datatype, atom} +]}. + +%% @doc Topic Priority: 0~255, Default is 0 +{mapping, "mqtt.queue.priority", "emqttd.queue", [ + {default, ""}, + {datatype, string}, + hidden +]}. + +%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. +{mapping, "mqtt.queue.max_length", "emqttd.queue", [ + {default, infinity}, + {datatype, [atom, integer]} +]}. + +%% @doc Low-water mark of queued messages +{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ + {default, "20%"}, + {datatype, string}, + hidden +]}. + +%% @doc High-water mark of queued messages +{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ + {default, "60%"}, + {datatype, string}, + hidden +]}. + +%% @doc Queue Qos0 messages? +{mapping, "mqtt.queue.qos0", "emqttd.queue", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.queue", fun(Conf) -> + Parse = fun(S) -> + {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), + list_to_integer(N) / 100 + end, + Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)}, + {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)}, + {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))}, + {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))}, + {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}], + case cuttlefish:conf_get("mqtt.queue.priority", Conf) of + undefined -> Opts; + V -> [{priority, + [begin [T, P] = string:tokens(S, "="), + {T, list_to_integer(P)} + end || S <- string:tokens(V, ",")]}|Opts] + end +end}. + +%%-------------------------------------------------------------------- +%% MQTT Broker +%%-------------------------------------------------------------------- + +{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [ + {default, 60}, + {datatype, integer} +]}. + +%%-------------------------------------------------------------------- +%% MQTT PubSub +%%-------------------------------------------------------------------- + +{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [ + {default, true}, + {datatype, {enum, [true, false]}}, + hidden +]}. + +{translation, "emqttd.pubsub", fun(Conf) -> + [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, + {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)}, + {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% MQTT Bridge +%%-------------------------------------------------------------------- + +{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [ + {default, 10000}, + {datatype, integer} +]}. + +{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [ + {default, 1}, + {datatype, integer} +]}. + +{translation, "emqttd.bridge", fun(Conf) -> + [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, + {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] +end}. + +%%------------------------------------------------------------------- +%% MQTT Plugins +%%------------------------------------------------------------------- + +{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [ + {datatype, string} +]}. + +{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% MQTT Listeners +%%-------------------------------------------------------------------- + +{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ + {default, 1883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ + {default, 512}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ + {default, 15}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.listener.http", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https", "emqttd.listeners", [ + {default, undefined}, + {datatype, [integer, ip]}, + hidden +]}. + +{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ + {default, 15}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + LisOpts = fun(Prefix) -> + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, + {rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}]) + end, + TcpOpts = fun(Prefix) -> + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}]) + end, + SslOpts = fun(Prefix) -> + Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, + {verify, cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)}, + {failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}]) + end, + + Listeners = fun(Name) when is_atom(Name) -> + Key = "mqtt.listener." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, undefined) of + undefined -> + []; + Port -> + ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]), + Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], + [{Name, Port, case Name =:= ssl orelse Name =:= https of + true -> [{ssl, SslOpts(Key)} | Opts]; + false -> Opts + end}] + end + end, + lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)]) +end}. + +%%-------------------------------------------------------------------- +%% MQTT Modules +%%-------------------------------------------------------------------- + +{mapping, "mqtt.module.retainer", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [ + {default, ram}, + {datatype, {enum, [disc, ram]}} +]}. + +{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [ + {default, 100000}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [ + {default, 0}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.presence", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.presence.qos", "emqttd.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "mqtt.module.subscription", "emqttd.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [ + {default, undefined}, + {datatype, string} +]}. + +{translation, "emqttd.modules", fun(Conf) -> + WithMod = fun(Name, OptsF) -> + Key = "mqtt.module." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, false) of + true -> [{Name, OptsF(Key)}]; + false -> [] + end + end, + RetainOpts = fun(Prefix) -> + [{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)}, + {max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)}, + {max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)}, + {expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}] + end, + PresOpts = fun(Prefix) -> + [{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}] + end, + ParseFun = fun(undefined) -> []; + (Topics) -> [begin + [Topic, Qos] = string:tokens(S, "="), + {list_to_binary(Topic), list_to_integer(Qos)} + end || S <- string:tokens(Topics, ",")] + end, + SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end, + lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)]) +end}. + +%%-------------------------------------------------------------------- +%% System Monitor +%%-------------------------------------------------------------------- + +%% @doc Long GC, don't monitor in production mode for: +%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +{mapping, "sysmon.long_gc", "emqttd.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Long Schedule(ms) +{mapping, "sysmon.long_schedule", "emqttd.sysmon", [ + {default, 1000}, + {datatype, integer} +]}. + +%% @doc Large Heap +{mapping, "sysmon.large_heap", "emqttd.sysmon", [ + {default, "8MB"}, + {datatype, bytesize} +]}. + +%% @doc Monitor Busy Port +{mapping, "sysmon.busy_port", "emqttd.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Monitor Busy Dist Port +{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.sysmon", fun(Conf) -> + [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, + {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, + {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, + {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, + {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] +end}. + diff --git a/rebar.config b/rebar.config index 74a744b0d..f30238fd7 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager",""}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{gen_conf,".*",{git,"https://github.com/emqtt/gen_conf",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}} +{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{gen_logger,".*",{git,"https://github.com/emqtt/gen_logger",""}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq20"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb",""}},{getopt,".*",{git,"https://github.com/jcomellas/getopt","v0.8.2"}},{pbkdf2,".*",{git,"https://github.com/basho/erlang-pbkdf2","2.0.0"}},{clique,".*",{git,"https://github.com/basho/clique",""}},{time_compat,".*",{git,"https://github.com/lasp-lang/time_compat",""}},{rand_compat,".*",{git,"https://github.com/lasp-lang/rand_compat",""}} ]}. {erl_opts, [{parse_transform,lager_transform}]}. diff --git a/src/emqttd.app.src b/src/emqttd.app.src deleted file mode 100644 index 1a8dc1d15..000000000 --- a/src/emqttd.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqttd, - [ - {description, "Erlang MQTT Broker"}, - {vsn, "2.0"}, - {id, "emqttd"}, - {modules, []}, - {registered, []}, - {applications, [kernel, stdlib, gproc, esockd, mochiweb, - gen_logger, gen_conf]}, - {mod, {emqttd_app, []}}, - {env, []} - ]}. diff --git a/src/emqttd.erl b/src/emqttd.erl index 07464ee56..c360a5870 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -22,7 +22,7 @@ -include("emqttd_protocol.hrl"). --export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]). +-export([start/0, env/1, env/2, is_running/1]). %% PubSub API -export([subscribe/1, subscribe/2, subscribe/3, publish/1, @@ -57,15 +57,8 @@ -spec(start() -> ok | {error, any()}). start() -> application:start(?APP). -%% @doc Get Config --spec(conf(Key :: atom()) -> any()). -conf(Key) -> emqttd_conf:value(Key). - --spec(conf(Key :: atom(), Default :: any()) -> any()). -conf(Key, Default) -> emqttd_conf:value(Key, Default). - %% @doc Environment --spec(env(Key:: atom()) -> any()). +-spec(env(Key:: atom()) -> {ok, any()} | undefined). env(Key) -> application:get_env(?APP, Key). %% @doc Get environment diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index fb36892c7..688e244a4 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -56,7 +56,10 @@ start_link() -> auth(Client, Password) when is_record(Client, mqtt_client) -> auth(Client, Password, lookup_mods(auth)). auth(_Client, _Password, []) -> - {error, "No auth module to check!"}; + case emqttd:env(allow_anonymous, false) of + true -> ok; + false -> {error, "No auth module to check!"} + end; auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> case catch Mod:check(Client, Password, State) of ok -> ok; @@ -73,7 +76,10 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> Topic :: binary()). check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) -> case lookup_mods(acl) of - [] -> allow; + [] -> case emqttd:env(allow_anonymous, false) of + true -> allow; + false -> deny + end; AclMods -> check_acl(Client, PubSub, Topic, AclMods) end. check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> @@ -120,21 +126,13 @@ tab_key(acl) -> acl_modules. stop() -> gen_server:call(?MODULE, stop). %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- init([]) -> ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), - ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}), - ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(gen_conf:list(emqttd, acl))}), {ok, #state{}}. -init_mods(Mods) -> - [init_mod(mod_name(Type, Name), Opts) || {Type, Name, Opts} <- Mods]. - -init_mod(Mod, Opts) -> - {ok, State} = Mod:init(Opts), {Mod, State, 0}. - handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> Mods = lookup_mods(Type), Existed = lists:keyfind(Mod, 1, Mods), @@ -186,13 +184,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -mod_name(auth, Name) -> mod(emqttd_auth_, Name); - -mod_name(acl, Name) -> mod(emqttd_acl_, Name). - -mod(Prefix, Name) -> - list_to_atom(lists:concat([Prefix, Name])). - if_existed(false, Fun) -> Fun(); if_existed(_Mod, _Fun) -> {error, already_existed}. diff --git a/src/emqttd_acl_anonymous.erl b/src/emqttd_acl_anonymous.erl deleted file mode 100644 index ef80457fd..000000000 --- a/src/emqttd_acl_anonymous.erl +++ /dev/null @@ -1,35 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_acl_anonymous). - --behaviour(emqttd_acl_mod). - -%% ACL callbacks --export([init/1, check_acl/2, reload_acl/1, description/0]). - -init(Opts) -> - {ok, Opts}. - -check_acl(_Who, _State) -> - allow. - -reload_acl(_State) -> - ok. - -description() -> - "Anonymous ACL". - diff --git a/src/emqttd_acl_internal.erl b/src/emqttd_acl_internal.erl index 282fb77a4..610cf91a3 100644 --- a/src/emqttd_acl_internal.erl +++ b/src/emqttd_acl_internal.erl @@ -46,18 +46,12 @@ all_rules() -> %%-------------------------------------------------------------------- %% @doc Init internal ACL --spec(init(Opts :: list()) -> {ok, State :: any()}). -init(Opts) -> +-spec(init([File :: string()]) -> {ok, State :: any()}). +init([File]) -> ets:new(?ACL_RULE_TAB, [set, public, named_table, {read_concurrency, true}]), - case proplists:get_value(config, Opts) of - undefined -> - {ok, #state{}}; - File -> - Default = proplists:get_value(nomatch, Opts, allow), - State = #state{config = File, nomatch = Default}, - true = load_rules_from_file(State), - {ok, State} - end. + State = #state{config = File}, + true = load_rules_from_file(State), + {ok, State}. load_rules_from_file(#state{config = AclFile}) -> {ok, Terms} = file:consult(AclFile), @@ -118,7 +112,7 @@ reload_acl(#state{config = undefined}) -> reload_acl(State) -> case catch load_rules_from_file(State) of {'EXIT', Error} -> {error, Error}; - _ -> ok + true -> ok end. %% @doc ACL Module Description diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 2299a7ec0..bec092b52 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -42,11 +42,11 @@ Reason :: term()). start(_StartType, _StartArgs) -> print_banner(), - emqttd_conf:init(), emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), emqttd_cli:load(), + register_acl_mod(), load_all_mods(), emqttd_plugins:init(), emqttd_plugins:load(), @@ -141,15 +141,25 @@ worker_spec(Module, Opts) when is_atom(Module) -> worker_spec(M, F, A) -> {M, {M, F, A}, permanent, 10000, worker, [M]}. +%%-------------------------------------------------------------------- +%% Register default ACL File +%%-------------------------------------------------------------------- + +register_acl_mod() -> + case emqttd:env(acl_file) of + {ok, File} -> emqttd_access_control:register_mod(acl, emqttd_acl_internal, [File]); + undefined -> ok + end. + %%-------------------------------------------------------------------- %% Load Modules %%-------------------------------------------------------------------- -%% @doc load all modules +%% @doc Load all modules load_all_mods() -> - lists:foreach(fun load_mod/1, gen_conf:list(emqttd, module)). + lists:foreach(fun load_mod/1, emqttd:env(modules, [])). -load_mod({module, Name, Opts}) -> +load_mod({Name, Opts}) -> Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), case catch Mod:load(Opts) of ok -> lager:info("Load module ~s successfully", [Name]); @@ -159,7 +169,7 @@ load_mod({module, Name, Opts}) -> %% @doc Is module enabled? -spec(is_mod_enabled(Name :: atom()) -> boolean()). -is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)). +is_mod_enabled(Name) -> lists:keyfind(Name, 1, emqttd:env(modules, [])). %%-------------------------------------------------------------------- %% Start Listeners @@ -167,27 +177,28 @@ is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)). %% @doc Start Listeners of the broker. -spec(start_listeners() -> any()). -start_listeners() -> lists:foreach(fun start_listener/1, gen_conf:list(emqttd, listener)). +start_listeners() -> lists:foreach(fun start_listener/1, emqttd:env(listeners, [])). %% Start mqtt listener -spec(start_listener(listener()) -> any()). -start_listener({listener, mqtt, ListenOn, Opts}) -> - start_listener(mqtt, ListenOn, Opts); +start_listener({tcp, ListenOn, Opts}) -> + start_listener('mqtt:tcp', ListenOn, Opts); %% Start mqtt(SSL) listener -start_listener({listener, mqtts, ListenOn, Opts}) -> - start_listener(mqtts, ListenOn, Opts); +start_listener({ssl, ListenOn, Opts}) -> + start_listener('mqtt:ssl', ListenOn, Opts); %% Start http listener -start_listener({listener, http, ListenOn, Opts}) -> - mochiweb:start_http(http, ListenOn, Opts, {emqttd_http, handle_request, []}); +start_listener({http, ListenOn, Opts}) -> + mochiweb:start_http('mqtt:http', ListenOn, Opts, {emqttd_http, handle_request, []}); %% Start https listener -start_listener({listener, https, ListenOn, Opts}) -> - mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}). +start_listener({https, ListenOn, Opts}) -> + mochiweb:start_http('mqtt:https', ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Protocol, ListenOn, Opts) -> - MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]}, + {ok, Env} = emqttd:env(protocol), + MFArgs = {emqttd_client, start_link, [Env]}, {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). merge_sockopts(Options) -> @@ -200,9 +211,11 @@ merge_sockopts(Options) -> %%-------------------------------------------------------------------- %% @doc Stop Listeners -stop_listeners() -> lists:foreach(fun stop_listener/1, gen_conf:list(listener)). +stop_listeners() -> lists:foreach(fun stop_listener/1, emqttd:env(listeners, [])). %% @private +stop_listener({listener, tcp, ListenOn, _Opts}) -> esockd:close('mqtt/tcp', ListenOn); +stop_listener({listener, ssl, ListenOn, _Opts}) -> esockd:close('mqtt/ssl', ListenOn); stop_listener({listener, Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn). -ifdef(TEST). diff --git a/src/emqttd_auth_anonymous.erl b/src/emqttd_auth_anonymous.erl deleted file mode 100644 index 8acdb7bf0..000000000 --- a/src/emqttd_auth_anonymous.erl +++ /dev/null @@ -1,29 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc Anonymous Authentication Module --module(emqttd_auth_anonymous). - --behaviour(emqttd_auth_mod). - --export([init/1, check/3, description/0]). - -init(Opts) -> {ok, Opts}. - -check(_Client, _Password, _Opts) -> ok. - -description() -> "Anonymous Authentication Module". - diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl deleted file mode 100644 index 15a751ea8..000000000 --- a/src/emqttd_auth_clientid.erl +++ /dev/null @@ -1,123 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_auth_clientid). - --include("emqttd.hrl"). - --export([add_clientid/1, add_clientid/2, lookup_clientid/1, remove_clientid/1, - all_clientids/0]). - --behaviour(emqttd_auth_mod). - -%% emqttd_auth_mod callbacks --export([init/1, check/3, description/0]). - --define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). - --record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Add clientid --spec(add_clientid(binary()) -> {atomic, ok} | {aborted, any()}). -add_clientid(ClientId) when is_binary(ClientId) -> - R = #mqtt_auth_clientid{client_id = ClientId}, - mnesia:transaction(fun mnesia:write/1, [R]). - -%% @doc Add clientid with password --spec(add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}). -add_clientid(ClientId, Password) -> - R = #mqtt_auth_clientid{client_id = ClientId, password = Password}, - mnesia:transaction(fun mnesia:write/1, [R]). - -%% @doc Lookup clientid --spec(lookup_clientid(binary()) -> list(#mqtt_auth_clientid{})). -lookup_clientid(ClientId) -> - mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId). - -%% @doc Lookup all clientids --spec(all_clientids() -> list(binary())). -all_clientids() -> mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB). - -%% @doc Remove clientid --spec(remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}). -remove_clientid(ClientId) -> - mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]). - -%%-------------------------------------------------------------------- -%% emqttd_auth_mod callbacks -%%-------------------------------------------------------------------- - -init(Opts) -> - mnesia:create_table(?AUTH_CLIENTID_TAB, [ - {ram_copies, [node()]}, - {attributes, record_info(fields, ?AUTH_CLIENTID_TAB)}]), - mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies), - Clients = load_client_from(proplists:get_value(config, Opts)), - mnesia:transaction(fun() -> [mnesia:write(C) || C<- Clients] end), - {ok, Opts}. - -check(#mqtt_client{client_id = undefined}, _Password, _Opts) -> - {error, clientid_undefined}; -check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, []) -> - check_clientid_only(ClientId, IpAddress); -check(#mqtt_client{client_id = ClientId, peername = {IpAddress, _}}, _Password, [{password, no}|_]) -> - check_clientid_only(ClientId, IpAddress); -check(_Client, undefined, [{password, yes}|_]) -> - {error, password_undefined}; -check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) -> - case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of - [] -> {error, clientid_not_found}; - [#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext?? - _ -> {error, password_error} - end. - -description() -> "ClientId authentication module". - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -load_client_from(undefined) -> - ok; - -load_client_from(File) -> - {ok, Clients} = file:consult(File), - [client(Client) || Client <- Clients]. - -client(ClientId) when is_list(ClientId) -> - #mqtt_auth_clientid{client_id = list_to_binary(ClientId)}; - -client({ClientId, IpAddr}) when is_list(ClientId) -> - #mqtt_auth_clientid{client_id = iolist_to_binary(ClientId), - ipaddr = esockd_cidr:parse(IpAddr, true)}. - -check_clientid_only(ClientId, IpAddr) -> - case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of - [] -> - {error, clientid_not_found}; - [#?AUTH_CLIENTID_TAB{ipaddr = undefined}] -> - ok; - [#?AUTH_CLIENTID_TAB{ipaddr = CIDR}] -> - case esockd_cidr:match(IpAddr, CIDR) of - true -> ok; - false -> {error, wrong_ipaddr} - end - end. - diff --git a/src/emqttd_auth_username.erl b/src/emqttd_auth_username.erl deleted file mode 100644 index 545bce7c3..000000000 --- a/src/emqttd_auth_username.erl +++ /dev/null @@ -1,164 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc Authentication with username and password --module(emqttd_auth_username). - --include("emqttd.hrl"). - --include("emqttd_cli.hrl"). - -%% CLI callbacks --export([cli/1]). - --behaviour(emqttd_auth_mod). - --export([is_enabled/0]). - --export([add_user/2, remove_user/1, lookup_user/1, all_users/0]). - -%% emqttd_auth callbacks --export([init/1, check/3, description/0]). - --define(AUTH_USERNAME_TAB, mqtt_auth_username). - --record(?AUTH_USERNAME_TAB, {username, password}). - -%%-------------------------------------------------------------------- -%% CLI -%%-------------------------------------------------------------------- -cli(["list"]) -> - if_enabled(fun() -> - Usernames = mnesia:dirty_all_keys(?AUTH_USERNAME_TAB), - [?PRINT("~s~n", [Username]) || Username <- Usernames] - end); - -cli(["add", Username, Password]) -> - if_enabled(fun() -> - ?PRINT("~p~n", [add_user(iolist_to_binary(Username), iolist_to_binary(Password))]) - end); - -cli(["del", Username]) -> - if_enabled(fun() -> - ?PRINT("~p~n", [remove_user(iolist_to_binary(Username))]) - end); - -cli(_) -> - ?USAGE([{"users list", "List users"}, - {"users add ", "Add User"}, - {"users del ", "Delete User"}]). - -if_enabled(Fun) -> - case is_enabled() of - true -> Fun(); - false -> hint() - end. - -hint() -> - ?PRINT_MSG("Please enable '{auth, username, []}' in etc/emqttd.conf first.~n"). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -is_enabled() -> - lists:member(?AUTH_USERNAME_TAB, mnesia:system_info(tables)). - -%% @doc Add User --spec(add_user(binary(), binary()) -> ok | {error, any()}). -add_user(Username, Password) -> - User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)}, - ret(mnesia:transaction(fun insert_user/1, [User])). - -insert_user(User = #?AUTH_USERNAME_TAB{username = Username}) -> - case mnesia:read(?AUTH_USERNAME_TAB, Username) of - [] -> mnesia:write(User); - [_|_] -> mnesia:abort(existed) - end. - -add_default_user(Username, Password) when is_atom(Username) -> - add_default_user(atom_to_list(Username), Password); - -add_default_user(Username, Password) -> - add_user(iolist_to_binary(Username), iolist_to_binary(Password)). - -%% @doc Lookup user by username --spec(lookup_user(binary()) -> list()). -lookup_user(Username) -> - mnesia:dirty_read(?AUTH_USERNAME_TAB, Username). - -%% @doc Remove user --spec(remove_user(binary()) -> ok | {error, any()}). -remove_user(Username) -> - ret(mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}])). - -ret({atomic, ok}) -> ok; -ret({aborted, Error}) -> {error, Error}. - -%% @doc All usernames --spec(all_users() -> list()). -all_users() -> mnesia:dirty_all_keys(?AUTH_USERNAME_TAB). - -%%-------------------------------------------------------------------- -%% emqttd_auth_mod callbacks -%%-------------------------------------------------------------------- - -init(Opts) -> - mnesia:create_table(?AUTH_USERNAME_TAB, [ - {disc_copies, [node()]}, - {attributes, record_info(fields, ?AUTH_USERNAME_TAB)}]), - mnesia:add_table_copy(?AUTH_USERNAME_TAB, node(), disc_copies), - case proplists:get_value(passwd, Opts) of - undefined -> ok; - File -> {ok, DefaultUsers} = file:consult(File), - lists:foreach(fun({Username, Password}) -> - add_default_user(Username, Password) - end, DefaultUsers) - end, - emqttd_ctl:register_cmd(users, {?MODULE, cli}, []), - {ok, Opts}. - -check(#mqtt_client{username = undefined}, _Password, _Opts) -> - {error, username_undefined}; -check(_User, undefined, _Opts) -> - {error, password_undefined}; -check(#mqtt_client{username = Username}, Password, _Opts) -> - case mnesia:dirty_read(?AUTH_USERNAME_TAB, Username) of - [] -> - {error, username_not_found}; - [#?AUTH_USERNAME_TAB{password = <>}] -> - case Hash =:= md5_hash(Salt, Password) of - true -> ok; - false -> {error, password_error} - end - end. - -description() -> - "Username password authentication module". - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -hash(Password) -> - SaltBin = salt(), <>. - -md5_hash(SaltBin, Password) -> - erlang:md5(<>). - -salt() -> - emqttd_time:seed(), Salt = rand:uniform(16#ffffffff), <>. - diff --git a/src/emqttd_bridge_sup_sup.erl b/src/emqttd_bridge_sup_sup.erl index 82f7af3b0..109f94764 100644 --- a/src/emqttd_bridge_sup_sup.erl +++ b/src/emqttd_bridge_sup_sup.erl @@ -46,7 +46,8 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> start_bridge(Node, _Topic, _Options) when Node =:= node() -> {error, bridge_to_self}; start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> - Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options), + {ok, BridgeEnv} = emqttd:env(bridge), + Options1 = emqttd_opts:merge(BridgeEnv, Options), supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). %% @doc Stop a bridge diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 2d53a1328..409fe5c8b 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -95,7 +95,7 @@ datetime() -> %% @doc Start a tick timer start_tick(Msg) -> - start_tick(timer:seconds(emqttd:conf(broker_sys_interval, 60)), Msg). + start_tick(timer:seconds(emqttd:env(broker_sys_interval, 60)), Msg). start_tick(0, _Msg) -> undefined; diff --git a/src/emqttd_conf.erl b/src/emqttd_conf.erl deleted file mode 100644 index b3677d6b4..000000000 --- a/src/emqttd_conf.erl +++ /dev/null @@ -1,112 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_conf). - --export([init/0]). - --export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]). - --export([value/1, value/2, list/1]). - --define(APP, emqttd). - -init() -> gen_conf:init(?APP). - -mqtt() -> - with_env(mqtt_protocol, [ - %% Max ClientId Length Allowed. - {max_clientid_len, value(mqtt_max_clientid_len, 512)}, - %% Max Packet Size Allowed, 64K by default. - {max_packet_size, value(mqtt_max_packet_size, 65536)}, - %% Client Idle Timeout. - {client_idle_timeout, value(mqtt_client_idle_timeout, 30)} - ]). - -session() -> - with_env(mqtt_session, [ - %% Max number of QoS 1 and 2 messages that can be “inflight” at one time. - %% 0 means no limit - {max_inflight, value(session_max_inflight, 100)}, - - %% Retry interval for redelivering QoS1/2 messages. - {unack_retry_interval, value(session_unack_retry_interval, 60)}, - - %% Awaiting PUBREL Timeout - {await_rel_timeout, value(session_await_rel_timeout, 20)}, - - %% Max Packets that Awaiting PUBREL, 0 means no limit - {max_awaiting_rel, value(session_max_awaiting_rel, 0)}, - - %% Statistics Collection Interval(seconds) - {collect_interval, value(session_collect_interval, 0)}, - - %% Expired after 2 day (unit: minute) - {expired_after, value(session_expired_after, 2880)} - ]). - -queue() -> - with_env(mqtt_queue, [ - %% Type: simple | priority - {type, value(queue_type, simple)}, - - %% Topic Priority: 0~255, Default is 0 - {priority, value(queue_priority, [])}, - - %% Max queue length. Enqueued messages when persistent client disconnected, - %% or inflight window is full. - {max_length, value(queue_max_length, infinity)}, - - %% Low-water mark of queued messages - {low_watermark, value(queue_low_watermark, 0.2)}, - - %% High-water mark of queued messages - {high_watermark, value(queue_high_watermark, 0.6)}, - - %% Queue Qos0 messages? - {queue_qos0, value(queue_qos0, true)} - ]). - -bridge() -> - with_env(mqtt_bridge, [ - {max_queue_len, value(bridge_max_queue_len, 10000)}, - - %% Ping Interval of bridge node - {ping_down_interval, value(bridge_ping_down_interval, 1)} - ]). - -pubsub() -> - with_env(mqtt_pubsub, [ - %% PubSub and Router. Default should be scheduler numbers. - {pool_size, value(pubsub_pool_size, 8)} - ]). - -value(Key) -> - with_env(Key, gen_conf:value(?APP, Key)). - -value(Key, Default) -> - with_env(Key, gen_conf:value(?APP, Key, Default)). - -with_env(Key, Conf) -> - case application:get_env(?APP, Key) of - undefined -> - application:set_env(?APP, Key, Conf), Conf; - {ok, Val} -> - Val - end. - -list(Key) -> gen_conf:list(?APP, Key). - diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl deleted file mode 100644 index df44ab96f..000000000 --- a/src/emqttd_mod_rewrite.erl +++ /dev/null @@ -1,115 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_mod_rewrite). - --behaviour(emqttd_gen_mod). - --include("emqttd.hrl"). - --export([load/1, reload/1, unload/1]). - --export([rewrite_subscribe/4, rewrite_unsubscribe/4, rewrite_publish/2]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -load(Opts) -> - case proplists:get_value(config, Opts) of - undefined -> - ok; - File -> - {ok, Terms} = file:consult(File), Sections = compile(Terms), - emqttd:hook('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Sections]), - emqttd:hook('client.unsubscribe', fun ?MODULE:rewrite_unsubscribe/4, [Sections]), - emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections]) - end. - -rewrite_subscribe(_ClientId, _Username, TopicTable, Sections) -> - lager:info("Rewrite subscribe: ~p", [TopicTable]), - {ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}. - -rewrite_unsubscribe(_ClientId, _Username, TopicTable, Sections) -> - lager:info("Rewrite unsubscribe: ~p", [TopicTable]), - {ok, [{match_topic(Topic, Sections), Opts} || {Topic, Opts} <- TopicTable]}. - -rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) -> - %%TODO: this will not work if the client is always online. - RewriteTopic = - case get({rewrite, Topic}) of - undefined -> - DestTopic = match_topic(Topic, Sections), - put({rewrite, Topic}, DestTopic), DestTopic; - DestTopic -> - DestTopic - end, - {ok, Message#mqtt_message{topic = RewriteTopic}}. - -reload(File) -> - %%TODO: The unload api is not right... - case emqttd_app:is_mod_enabled(rewrite) of - true -> - unload(state), - load([{file, File}]); - false -> - {error, module_unloaded} - end. - -unload(_) -> - emqttd:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4), - emqttd:unhook('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4), - emqttd:unhook('message.publish', fun ?MODULE:rewrite_publish/2). - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -compile(Sections) -> - C = fun({rewrite, Re, Dest}) -> - {ok, MP} = re:compile(Re), - {rewrite, MP, Dest} - end, - F = fun({topic, Topic, Rules}) -> - {topic, list_to_binary(Topic), [C(R) || R <- Rules]} - end, - [F(Section) || Section <- Sections]. - -match_topic(Topic, []) -> - Topic; -match_topic(Topic, [{topic, Filter, Rules} | Sections]) -> - case emqttd_topic:match(Topic, Filter) of - true -> - match_rule(Topic, Rules); - false -> - match_topic(Topic, Sections) - end. - -match_rule(Topic, []) -> - Topic; -match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> - case re:run(Topic, MP, [{capture, all_but_first, list}]) of - {match, Captured} -> - Vars = lists:zip(["\\$" ++ integer_to_list(I) - || I <- lists:seq(1, length(Captured))], Captured), - iolist_to_binary(lists:foldl( - fun({Var, Val}, Acc) -> - re:replace(Acc, Var, Val, [global]) - end, Dest, Vars)); - nomatch -> - match_rule(Topic, Rules) - end. - diff --git a/src/emqttd_plugins.erl b/src/emqttd_plugins.erl index 384a595de..0657b7799 100644 --- a/src/emqttd_plugins.erl +++ b/src/emqttd_plugins.erl @@ -26,23 +26,28 @@ -export([list/0]). +%% @doc Init plugins' config +-spec(init() -> ok). init() -> - case emqttd:conf(plugins_etc_dir) of + case emqttd:env(plugins_etc_dir) of {ok, PluginsEtc} -> - CfgFiles = filelib:wildcard("*.conf", PluginsEtc), - lists:foreach(fun(CfgFile) -> - App = app_name(CfgFile), - application:set_env(App, conf, filename:join(PluginsEtc, CfgFile)), - gen_conf:init(App) - end, CfgFiles); + CfgFiles = [filename:join(PluginsEtc, File) || + File <- filelib:wildcard("*.config", PluginsEtc)], + lists:foreach(fun init_config/1, CfgFiles); undefined -> ok end. +init_config(CfgFile) -> + {ok, [AppsEnv]} = file:consult(CfgFile), + lists:foreach(fun({AppName, Envs}) -> + [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] + end, AppsEnv). + %% @doc Load all plugins when the broker started. -spec(load() -> list() | {error, any()}). load() -> - case emqttd:conf(plugins_loaded_file) of + case emqttd:env(plugins_loaded_file) of {ok, File} -> ensure_file(File), with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end); @@ -75,7 +80,7 @@ load_plugins(Names, Persistent) -> %% @doc Unload all plugins before broker stopped. -spec(unload() -> list() | {error, any()}). unload() -> - case emqttd:conf(plugins_loaded_file) of + case emqttd:env(plugins_loaded_file) of {ok, File} -> with_loaded_file(File, fun stop_plugins/1); undefined -> @@ -89,9 +94,9 @@ stop_plugins(Names) -> %% @doc List all available plugins -spec(list() -> [mqtt_plugin()]). list() -> - case emqttd:conf(plugins_etc_dir) of - {ok, PluginsEtc} -> - CfgFiles = filelib:wildcard("*.conf", PluginsEtc), + case emqttd:env(plugins_etc_dir) of + {ok, PluginsEtc} -> + CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc), Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], StartedApps = names(started_app), lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> @@ -244,7 +249,7 @@ plugin_unloaded(Name, true) -> end. read_loaded() -> - case emqttd:conf(plugins_loaded_file) of + case emqttd:env(plugins_loaded_file) of {ok, File} -> read_loaded(File); undefined -> {error, not_found} end. @@ -252,7 +257,7 @@ read_loaded() -> read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> - {ok, File} = emqttd:conf(plugins_loaded_file), + {ok, File} = emqttd:env(plugins_loaded_file), case file:open(File, [binary, write]) of {ok, Fd} -> lists:foreach(fun(Name) -> @@ -262,3 +267,4 @@ write_loaded(AppNames) -> lager:error("Open File ~p Error: ~p", [File, Error]), {error, Error} end. + diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index f3579d46b..038d30b91 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -448,7 +448,7 @@ authenticate(Client, Password) -> %% PUBLISH ACL is cached in process dictionary. check_acl(publish, Topic, Client) -> - IfCache = emqttd:conf(cache_acl, true), + IfCache = emqttd:env(cache_acl, true), case {IfCache, get({acl, publish, Topic})} of {true, undefined} -> AllowDeny = emqttd_access_control:check_acl(Client, publish, Topic), diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index e57eee517..988dc6553 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -32,7 +32,7 @@ %%-------------------------------------------------------------------- start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). pubsub_pool() -> hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). @@ -41,10 +41,10 @@ pubsub_pool() -> %% Supervisor Callbacks %%-------------------------------------------------------------------- -init([Env]) -> +init([]) -> + {ok, Env} = emqttd:env(pubsub), %% Create ETS Tables [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]], - {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }. %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index b6276ab85..f53f363a4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -214,8 +214,9 @@ unsubscribe(SessPid, TopicTable) -> init([CleanSess, {ClientId, Username}, ClientPid]) -> process_flag(trap_exit, true), - true = link(ClientPid), - SessEnv = emqttd_conf:session(), + true = link(ClientPid), + {ok, QEnv} = emqttd:env(queue), + {ok, SessEnv} = emqttd:env(session), Session = #session{ clean_sess = CleanSess, client_id = ClientId, @@ -224,14 +225,14 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> subscriptions = #{}, inflight_queue = [], max_inflight = get_value(max_inflight, SessEnv, 0), - message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()), + message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{}, - retry_interval = get_value(unack_retry_interval, SessEnv), + retry_interval = get_value(retry_interval, SessEnv), await_rel_timeout = get_value(await_rel_timeout, SessEnv), max_awaiting_rel = get_value(max_awaiting_rel, SessEnv), - expired_after = get_value(expired_after, SessEnv) * 60, + expired_after = get_value(expired_after, SessEnv), collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), diff --git a/src/emqttd_sysmon_sup.erl b/src/emqttd_sysmon_sup.erl index 3ed8e36a5..9ba73b41e 100644 --- a/src/emqttd_sysmon_sup.erl +++ b/src/emqttd_sysmon_sup.erl @@ -28,15 +28,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Sysmon = {sysmon, {emqttd_sysmon, start_link, [opts()]}, + {ok, Env} = emqttd:env(sysmon), + Sysmon = {sysmon, {emqttd_sysmon, start_link, [Env]}, permanent, 5000, worker, [emqttd_sysmon]}, {ok, {{one_for_one, 10, 100}, [Sysmon]}}. -opts() -> - Opts = [{long_gc, emqttd:conf(sysmon_long_gc)}, - {long_schedule, emqttd:conf(sysmon_long_schedule)}, - {large_heap, emqttd:conf(sysmon_large_heap)}, - {busy_port, emqttd:conf(busy_port)}, - {busy_dist_port, emqttd:conf(sysmon_busy_dist_port)}], - [{Key, Val} || {Key, {ok, Val}} <- Opts]. - diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index 56a4d92d9..58cb6dc43 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -31,7 +31,7 @@ %% @doc Handle WebSocket Request. handle_request(Req) -> Peer = Req:get(peer), - PktOpts = emqttd_conf:mqtt(), + {ok, PktOpts} = emqttd:env(protocol), ParserFun = emqttd_parser:new(PktOpts), {ReentryWs, ReplyChannel} = upgrade(Req), {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), diff --git a/src/emqttd_ws_client_sup.erl b/src/emqttd_ws_client_sup.erl index 33983fd8c..3af715337 100644 --- a/src/emqttd_ws_client_sup.erl +++ b/src/emqttd_ws_client_sup.erl @@ -27,7 +27,7 @@ %% @doc Start websocket client supervisor -spec(start_link() -> {ok, pid()}). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc Start a WebSocket Client -spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). @@ -37,8 +37,8 @@ start_client(WsPid, Req, ReplyChannel) -> %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- - -init([Env]) -> +init([]) -> + {ok, Env} = emqttd:env(protocol), {ok, {{simple_one_for_one, 0, 1}, [{ws_client, {emqttd_ws_client, start_link, [Env]}, temporary, 5000, worker, [emqttd_ws_client]}]}}. diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index bd6a32edc..aeed5051f 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -101,8 +101,8 @@ groups() -> init_per_suite(Config) -> application:start(lager), DataDir = proplists:get_value(data_dir, Config), - application:set_env(emqttd, conf, filename:join([DataDir, "emqttd.conf"])), - application:ensure_all_started(emqttd), + peg_com(DataDir), + start_apps(emqttd, DataDir), Config. end_per_suite(_Config) -> @@ -403,7 +403,7 @@ auth_header_(User, Pass) -> websocket_test(_) -> Conn = esockd_connection:new(esockd_transport, nil, []), - Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, + Req = mochiweb_request:new(Conn, 'GET', "/mqtt", {1, 1}, mochiweb_headers:make([{"Sec-WebSocket-Key","Xn3fdKyc3qEXPuj2A3O+ZA=="}])), ct:log("Req:~p", [Req]), @@ -593,4 +593,31 @@ slave(node, Node) -> {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), N. +start_apps(App, DataDir) -> + Schema = cuttlefish_schema:files([filename:join([DataDir, atom_to_list(App) ++ ".schema"])]), + Conf = conf_parse:file(filename:join([DataDir, atom_to_list(App) ++ ".conf"])), + NewConfig = cuttlefish_generator:map(Schema, Conf), + Vals = proplists:get_value(App, NewConfig), + [application:set_env(App, Par, Value) || {Par, Value} <- Vals], + application:ensure_all_started(App). + +peg_com(DataDir) -> + ParsePeg = file2(3, DataDir, "conf_parse.peg"), + neotoma:file(ParsePeg), + ParseErl = file2(3, DataDir, "conf_parse.erl"), + compile:file(ParseErl, []), + + DurationPeg = file2(3, DataDir, "cuttlefish_duration_parse.peg"), + neotoma:file(DurationPeg), + DurationErl = file2(3, DataDir, "cuttlefish_duration_parse.erl"), + compile:file(DurationErl, []). + + +file2(Times, Dir, FileName) when Times < 1 -> + filename:join([Dir, "deps", "cuttlefish","src", FileName]); + +file2(Times, Dir, FileName) -> + Dir1 = filename:dirname(Dir), + file2(Times - 1, Dir1, FileName). + diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf index 68546ed89..4710ef909 100644 --- a/test/emqttd_SUITE_data/emqttd.conf +++ b/test/emqttd_SUITE_data/emqttd.conf @@ -1,270 +1,280 @@ -%%=================================================================== -%% -%% Config file for emqttd 2.0 -%% -%% Erlang Term Syntax: -%% -%% {}: Tuple, usually {Key, Value} -%% []: List, seperated by comma -%% %%: Comment -%% -%%=================================================================== +##-------------------------------------------------------------------- +## Node Args +##-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- +## Node name +node.name = emqttd@127.0.0.1 -%% Max ClientId Length Allowed. -{mqtt_max_clientid_len, 512}. +## Cookie for distributed node +node.cookie = emq_dist_cookie -%% Max Packet Size Allowed, 64K by default. -{mqtt_max_packet_size, 65536}. +## SMP support: enable, auto, disable +node.smp = auto -%% Client Idle Timeout. -{mqtt_client_idle_timeout, 30}. % Second +## Enable kernel poll +node.kernel_poll = on -%%-------------------------------------------------------------------- -%% Authentication -%%-------------------------------------------------------------------- +## async thread pool +node.async_threads = 32 -%% Anonymous: Allow all -{auth, anonymous, []}. +## Erlang Process Limit +node.process_limit = 256000 -%% Authentication with username, password -{auth, username, []}. +## Sets the maximum number of simultaneously existing ports for this system +node.max_ports = 65536 -%% Authentication with clientId -{auth, clientid, [{password, no}]}. +## Set the distribution buffer busy limit (dist_buf_busy_limit) +node.dist_buffer_size = 32MB -%%-------------------------------------------------------------------- -%% ACL -%%-------------------------------------------------------------------- +## Max ETS Tables. +## Note that mnesia and SSL will create temporary ets tables. +node.max_ets_tables = 256000 -{acl, anonymous, []}. +## Tweak GC to run more often +node.fullsweep_after = 1000 -{acl, internal, [{nomatch, allow}]}. +## Crash dump +node.crash_dump = log/crash.dump -%% Cache ACL result for PUBLISH -{cache_acl, true}. +## Distributed node ticktime +node.dist_net_ticktime = 60 -%%-------------------------------------------------------------------- -%% Broker -%%-------------------------------------------------------------------- +## Distributed node port range +## node.dist_listen_min = 6000 +## node.dist_listen_max = 6999 -%% System interval of publishing broker $SYS messages -{broker_sys_interval, 60}. +##-------------------------------------------------------------------- +## Log +##-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Session -%%-------------------------------------------------------------------- +## Console log. Enum: off, file, console, both +log.console = console -%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. -%% 0 means no limit -{session_max_inflight, 100}. +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency +log.console.level = error -%% Retry interval for redelivering QoS1/2 messages. -{session_unack_retry_interval, 60}. +## Console log file +## log.console.file = log/console.log -%% Awaiting PUBREL Timeout -{session_await_rel_timeout, 20}. +## Error log file +log.error.file = log/error.log -%% Max Packets that Awaiting PUBREL, 0 means no limit -{session_max_awaiting_rel, 0}. +## Enable the crash log. Enum: on, off +log.crash = on -%% Statistics Collection Interval(seconds) -{session_collect_interval, 0}. +log.crash.file = log/crash.log -%% Expired after 2 day (unit: minute) -{session_expired_after, 2880}. +##-------------------------------------------------------------------- +## MQTT Protocol +##-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Queue -%%-------------------------------------------------------------------- +## Max ClientId Length Allowed. +mqtt.max_clientid_len = 1024 -%% Type: simple | priority -{queue_type, simple}. +## Max Packet Size Allowed, 64K by default. +mqtt.max_packet_size = 64KB -%% Topic Priority: 0~255, Default is 0 -%% {queue_priority, [{"topic/1", 10}, {"topic/2", 8}]}. +## Client Idle Timeout (Second) +mqtt.client_idle_timeout = 30 -%% Max queue length. Enqueued messages when persistent client disconnected, -%% or inflight window is full. -{queue_max_length, infinity}. +## Allow Anonymous authentication +mqtt.allow_anonymous = true -%% Low-water mark of queued messages -{queue_low_watermark, 0.2}. +##-------------------------------------------------------------------- +## MQTT Session +##-------------------------------------------------------------------- -%% High-water mark of queued messages -{queue_high_watermark, 0.6}. +## Max number of QoS 1 and 2 messages that can be “inflight” at one time. +## 0 means no limit +mqtt.session.max_inflight = 100 -%% Queue Qos0 messages? -{queue_qos0, true}. +## Retry interval for redelivering QoS1/2 messages. +mqtt.session.retry_interval = 60 -%%-------------------------------------------------------------------- -%% Zone -%%-------------------------------------------------------------------- +## Awaiting PUBREL Timeout +mqtt.session.await_rel_timeout = 20 -{zone, admin, []}. +## Max Packets that Awaiting PUBREL, 0 means no limit +mqtt.session.max_awaiting_rel = 0 -%%-------------------------------------------------------------------- -%% Listener -%%-------------------------------------------------------------------- +## Statistics Collection Interval(seconds) +mqtt.session.collect_interval = 0 -%% Plain MQTT -{listener, mqtt, 1883, [ - %% Size of acceptor pool - {acceptors, 16}, +## Expired after 1 day: +## w - week +## d - day +## h - hour +## m - minute +## s - second +mqtt.session.expired_after = 1d - %% Maximum number of concurrent clients - {max_clients, 512}, +##-------------------------------------------------------------------- +## MQTT Queue +##-------------------------------------------------------------------- - %% Mount point prefix - %% {mount_point, "prefix/"}, +## Type: simple | priority +mqtt.queue.type = simple - %% Socket Access Control - {access, [{allow, all}]}, +## Topic Priority: 0~255, Default is 0 +## mqtt.queue.priority = topic/1=10,topic/2=8 - %% Connection Options - {connopts, [ - %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec - %% {rate_limit, "100,10"} %% 100K burst, 10K rate - ]}, +## Max queue length. Enqueued messages when persistent client disconnected, +## or inflight window is full. +mqtt.queue.max_length = infinity - %% Socket Options - {sockopts, [ - %Set buffer if hight thoughtput - %{recbuf, 4096}, - %{sndbuf, 4096}, - %{buffer, 4096}, - %{nodelay, true}, - {backlog, 1024} - ]} -]}. +## Low-water mark of queued messages +mqtt.queue.low_watermark = 20% -%% MQTT/SSL -{listener, mqtts, 8883, [ - %% Size of acceptor pool - {acceptors, 4}, +## High-water mark of queued messages +mqtt.queue.high_watermark = 60% - %% Maximum number of concurrent clients - {max_clients, 512}, +## Queue Qos0 messages? +mqtt.queue.qos0 = true - %% Socket Access Control - {access, [{allow, all}]}, +##-------------------------------------------------------------------- +## MQTT Broker and PubSub +##-------------------------------------------------------------------- - %% SSL certificate and key files - {ssl, [{certfile, "etc/ssl/ssl.crt"}, - {keyfile, "etc/ssl/ssl.key"}]}, +## System Interval of publishing broker $SYS Messages +mqtt.broker.sys_interval = 60 - %% Socket Options - {sockopts, [ - {backlog, 1024} - %{buffer, 4096}, - ]} -]}. +## PubSub Pool Size. Default should be scheduler numbers. +mqtt.pubsub.pool_size = 8 -%% HTTP and WebSocket Listener -{listener, http, 8083, [ - %% Size of acceptor pool - {acceptors, 4}, +mqtt.pubsub.by_clientid = true - %% Maximum number of concurrent clients - {max_clients, 64}, +## Subscribe Asynchronously +mqtt.pubsub.async = true - %% Socket Access Control - {access, [{allow, all}]}, +##-------------------------------------------------------------------- +## MQTT Bridge +##-------------------------------------------------------------------- - %% Socket Options - {sockopts, [ - {backlog, 1024} - %{buffer, 4096}, - ]} -]}. +## Bridge Queue Size +mqtt.bridge.max_queue_len = 10000 -%%-------------------------------------------------------------------- -%% PubSub -%%-------------------------------------------------------------------- +## Ping Interval of bridge node. Unit: Second +mqtt.bridge.ping_down_interval = 1 -%% PubSub and Router. Default should be scheduler numbers. -{pubsub_pool_size, 8}. +##------------------------------------------------------------------- +## MQTT Plugins +##------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Routing -%%-------------------------------------------------------------------- +## Dir of plugins' config +##mqtt.plugins.etc_dir = etc/plugins/ -%% Route aging time(seconds) -{routing_age, 5}. +## File to store loaded plugin names. +##mqtt.plugins.loaded_file = data/loaded_plugins -%%-------------------------------------------------------------------- -%% Bridge -%%-------------------------------------------------------------------- +##------------------------------------------------------------------- +## MQTT Modules +##------------------------------------------------------------------- -%% TODO: Bridge Queue Size -{bridge_max_queue_len, 10000}. +## Enable retainer module +mqtt.module.retainer = on -%% Ping Interval of bridge node -{bridge_ping_down_interval, 1}. % second +## disc: disc_copies, ram: ram_copies +mqtt.module.retainer.storage_type = ram -%%------------------------------------------------------------------- -%% Plugins -%%------------------------------------------------------------------- +## Max number of retained messages +mqtt.module.retainer.max_message_num = 100000 -%% Dir of plugins' config -{plugins_etc_dir, "etc/plugins/"}. +## Max Payload Size of retained message +mqtt.module.retainer.max_payload_size = 64KB -%% File to store loaded plugin names. -{plugins_loaded_file, "data/loaded_plugins"}. +## Expired after seconds, never expired if 0 +mqtt.module.retainer.expired_after = 0 -%%-------------------------------------------------------------------- -%% Modules -%%-------------------------------------------------------------------- +## Enable presence module +## Client presence management module. Publish presence messages when +## client connected or disconnected. +mqtt.module.presence = on -%% Retainer Module -{module, retainer, [ +mqtt.module.presence.qos = 0 - %% disc: disc_copies, ram: ram_copies - {storage, ram}, +## Enable subscription module +## Subscribe topics automatically when client connected +mqtt.module.subscription = on - %% Max number of retained messages - {max_message_num, 100000}, +mqtt.module.subscription.topics = $client/%c=1,$user/%u=1 - %% Max Payload Size of retained message - {max_playload_size, 65536}, +##-------------------------------------------------------------------- +## MQTT Listeners +##-------------------------------------------------------------------- - %% Expired after seconds, never expired if 0 - {expired_after, 0} +## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +mqtt.listener.tcp = 1883 -]}. +## Size of acceptor pool +mqtt.listener.tcp.acceptors = 8 -%% Client presence management module. Publish presence messages when -%% client connected or disconnected. -{module, presence, [{qos, 0}]}. +## Maximum number of concurrent clients +mqtt.listener.tcp.max_clients = 1024 -%% Subscribe topics automatically when client connected -{module, subscription, [{"$queue/clients/$c", 1}, backend]}. +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.tcp.rate_limit = 100,10 -%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) -{module, rewrite, []}. +## TCP Socket Options +mqtt.listener.tcp.backlog = 1024 +## mqtt.listener.tcp.recbuf = 4096 +## mqtt.listener.tcp.sndbuf = 4096 +## mqtt.listener.tcp.buffer = 4096 +## mqtt.listener.tcp.nodelay = true -%%------------------------------------------------------------------- -%% Erlang System Monitor -%%------------------------------------------------------------------- +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +mqtt.listener.ssl = 8883 -%% Long GC, don't monitor in production mode for: -%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +## Size of acceptor pool +mqtt.listener.ssl.acceptors = 4 -{sysmon_long_gc, false}. +## Maximum number of concurrent clients +mqtt.listener.ssl.max_clients = 512 -%% Long Schedule(ms) -{sysmon_long_schedule, 240}. +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.ssl.rate_limit = 100,10 -%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. -%% 8 * 1024 * 1024 -{sysmon_large_heap, 8388608}. +## Configuring SSL Options +## See http://erlang.org/doc/man/ssl.html +mqtt.listener.ssl.handshake_timeout = 15 #seconds +mqtt.listener.ssl.keyfile = etc/ssl/key.pem +mqtt.listener.ssl.certfile = etc/ssl/cert.pem +mqtt.listener.ssl.cacertfile = etc/ssl/cacert.pem +## mqtt.listener.ssl.verify = verify_peer +## mqtt.listener.ssl.failed_if_no_peer_cert = true -%% Busy Port -{sysmon_busy_port, false}. +## HTTP Listener +mqtt.listener.http = 8083 +mqtt.listener.http.acceptors = 4 +mqtt.listener.http.max_clients = 64 -%% Busy Dist Port -{sysmon_busy_dist_port, true}. +## HTTP(SSL) Listener +mqtt.listener.https = 8084 +mqtt.listener.https.acceptors = 4 +mqtt.listener.https.max_clients = 64 +mqtt.listener.https.handshake_timeout = 10 #seconds +mqtt.listener.https.certfile = etc/ssl/cert.pem +mqtt.listener.https.keyfile = etc/ssl/key.pem +mqtt.listener.https.cacertfile = etc/ssl/cacert.pem +## mqtt.listener.https.verify = verify_peer +## mqtt.listener.https.failed_if_no_peer_cert = true + +##------------------------------------------------------------------- +## System Monitor +##------------------------------------------------------------------- + +## Long GC, don't monitor in production mode for: +## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +sysmon.long_gc = false + +## Long Schedule(ms) +sysmon.long_schedule = 240 + +## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. +sysmon.large_heap = 8MB + +## Busy Port +sysmon.busy_port = false + +## Busy Dist Port +sysmon.busy_dist_port = true diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema new file mode 100644 index 000000000..8ad4eb187 --- /dev/null +++ b/test/emqttd_SUITE_data/emqttd.schema @@ -0,0 +1,752 @@ +%%-*- mode: erlang -*- +%% EMQ config mapping + +%%-------------------------------------------------------------------- +%% Erlang Node +%%-------------------------------------------------------------------- + +%% @doc Erlang node name +{mapping, "node.name", "vm_args.-name", [ + {default, "emqttd@127.0.0.1"} +]}. + +%% @doc Secret cookie for distributed erlang node +{mapping, "node.cookie", "vm_args.-setcookie", [ + {default, "emqsecretcookie"} +]}. + +%% @doc SMP Support +{mapping, "node.smp", "vm_args.-smp", [ + {default, auto}, + {datatype, {enum, [enable, auto, disable]}}, + hidden +]}. + +%% @doc Enable Kernel Poll +{mapping, "node.kernel_poll", "vm_args.+K", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +%% @doc More information at: http://erlang.org/doc/man/erl.html +{mapping, "node.async_threads", "vm_args.+A", [ + {default, 64}, + {datatype, integer}, + {validators, ["range:0-1024"]} +]}. + +%% @doc Erlang Process Limit +{mapping, "node.process_limit", "vm_args.+P", [ + {datatype, integer}, + {default, 256000}, + hidden +]}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q +%% @doc The number of concurrent ports/sockets +%% Valid range is 1024-134217727 +{mapping, "node.max_ports", + cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ + {default, 262144}, + {datatype, integer}, + {validators, ["range4ports"]} +]}. + +{validator, "range4ports", "must be 1024 to 134217727", + fun(X) -> X >= 1024 andalso X =< 134217727 end}. + +%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl +{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ + {datatype, bytesize}, + {commented, "32MB"}, + hidden, + {validators, ["zdbbl_range"]} +]}. + +{translation, "vm_args.+zdbbl", + fun(Conf) -> + ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined), + case ZDBBL of + undefined -> undefined; + X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; + _ -> undefined + end + end +}. + +{validator, "zdbbl_range", "must be between 1KB and 2097151KB", + fun(ZDBBL) -> + %% 2097151KB = 2147482624 + ZDBBL >= 1024 andalso ZDBBL =< 2147482624 + end +}. + +%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 +{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ + {default, 1000}, + {datatype, integer}, + hidden, + {validators, ["positive_integer"]} +]}. + +{validator, "positive_integer", "must be a positive integer", + fun(X) -> X >= 0 end}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, +%% R16+ uses +e +%% @doc The ETS table limit +{mapping, "node.max_ets_tables", + cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ + {default, 256000}, + {datatype, integer}, + hidden +]}. + +%% @doc Set the location of crash dumps +{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ + {default, "{{crash_dump}}"}, + {datatype, file}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime +{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ + {commented, 60}, + {datatype, integer}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html +{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ + {commented, 6000}, + {datatype, integer}, + hidden +]}. + +%% @see node.dist_listen_min +{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ + {commented, 6999}, + {datatype, integer}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% Log +%%-------------------------------------------------------------------- + +{mapping, "log.console", "lager.handlers", [ + {default, file }, + {datatype, {enum, [off, file, console, both]}} +]}. + +{mapping, "log.console.level", "lager.handlers", [ + {default, info}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} +]}. + +{mapping, "log.console.file", "lager.handlers", [ + {default, "log/console.log"}, + {datatype, file} +]}. + +{mapping, "log.error.file", "lager.handlers", [ + {default, "log/error.log"}, + {datatype, file} +]}. + +{mapping, "log.error.redirect", "lager.error_logger_redirect", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ + {default, 1000}, + {datatype, integer}, + hidden +]}. + +{translation, + "lager.handlers", + fun(Conf) -> + ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of + undefined -> []; + ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, + {level, error}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}] + end, + + ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), + ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), + + ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, + ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, + {level, ConsoleLogLevel}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}, + + ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of + off -> []; + file -> [ConsoleFileHandler]; + console -> [ConsoleHandler]; + both -> [ConsoleHandler, ConsoleFileHandler]; + _ -> [] + end, + ConsoleHandlers ++ ErrorHandler + end +}. + +{mapping, "log.crash", "lager.crash_log", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.crash.file", "lager.crash_log", [ + {default, "log/crash.log"}, + {datatype, file} +]}. + +{translation, + "lager.crash_log", + fun(Conf) -> + case cuttlefish:conf_get("log.crash", Conf) of + false -> undefined; + _ -> + cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") + end + end}. + +{mapping, "sasl", "sasl.sasl_error_logger", [ + {default, off}, + {datatype, flag}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT Protocol +%%-------------------------------------------------------------------- + +%% @doc Set the Max ClientId Length Allowed. +{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [ + {default, 1024}, + {datatype, integer} +]}. + +%% @doc Max Packet Size Allowed, 64K by default. +{mapping, "mqtt.max_packet_size", "emqttd.protocol", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +%% @doc Client Idle Timeout. +{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [ + {default, 30}, + {datatype, integer} +]}. + +{translation, "emqttd.protocol", fun(Conf) -> + [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, + {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, + {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}] +end}. + +%% @doc Allow Anonymous +{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [ + {default, false}, + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT Session +%%-------------------------------------------------------------------- + +%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. +%% 0 means no limit +{mapping, "mqtt.session.max_inflight", "emqttd.session", [ + {default, 100}, + {datatype, integer} +]}. + + +%% @doc Retry interval for redelivering QoS1/2 messages. +{mapping, "mqtt.session.retry_interval", "emqttd.session", [ + {default, 60}, + {datatype, integer} +]}. + +%% @doc Awaiting PUBREL Timeout +{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [ + {default, 30}, + {datatype, integer} +]}. + +%% @doc Max Packets that Awaiting PUBREL, 0 means no limit +{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Statistics Collection Interval(seconds) +{mapping, "mqtt.session.collect_interval", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Session expired after... +{mapping, "mqtt.session.expired_after", "emqttd.session", [ + {default, "2d"}, + {datatype, {duration, s}} +]}. + +{translation, "emqttd.session", fun(Conf) -> + [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, + {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, + {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, + {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, + {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)}, + {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% MQTT Queue +%%-------------------------------------------------------------------- + +%% @doc Type: simple | priority +{mapping, "mqtt.queue.type", "emqttd.queue", [ + {default, simple}, + {datatype, atom} +]}. + +%% @doc Topic Priority: 0~255, Default is 0 +{mapping, "mqtt.queue.priority", "emqttd.queue", [ + {default, ""}, + {datatype, string}, + hidden +]}. + +%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. +{mapping, "mqtt.queue.max_length", "emqttd.queue", [ + {default, infinity}, + {datatype, [atom, integer]} +]}. + +%% @doc Low-water mark of queued messages +{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ + {default, "20%"}, + {datatype, string}, + hidden +]}. + +%% @doc High-water mark of queued messages +{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ + {default, "60%"}, + {datatype, string}, + hidden +]}. + +%% @doc Queue Qos0 messages? +{mapping, "mqtt.queue.qos0", "emqttd.queue", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.queue", fun(Conf) -> + Parse = fun(S) -> + {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), + list_to_integer(N) / 100 + end, + Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)}, + {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)}, + {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))}, + {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))}, + {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}], + case cuttlefish:conf_get("mqtt.queue.priority", Conf) of + undefined -> Opts; + V -> [{priority, + [begin [T, P] = string:tokens(S, "="), + {T, list_to_integer(P)} + end || S <- string:tokens(V, ",")]}|Opts] + end +end}. + +%%-------------------------------------------------------------------- +%% MQTT Broker +%%-------------------------------------------------------------------- + +{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [ + {default, 60}, + {datatype, integer} +]}. + +%%-------------------------------------------------------------------- +%% MQTT PubSub +%%-------------------------------------------------------------------- + +{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [ + {default, true}, + {datatype, {enum, [true, false]}}, + hidden +]}. + +{translation, "emqttd.pubsub", fun(Conf) -> + [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, + {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)}, + {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% MQTT Bridge +%%-------------------------------------------------------------------- + +{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [ + {default, 10000}, + {datatype, integer} +]}. + +{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [ + {default, 1}, + {datatype, integer} +]}. + +{translation, "emqttd.bridge", fun(Conf) -> + [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, + {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] +end}. + +%%------------------------------------------------------------------- +%% MQTT Plugins +%%------------------------------------------------------------------- + +{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [ + {datatype, string} +]}. + +{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% MQTT Listeners +%%-------------------------------------------------------------------- + +{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ + {default, 1883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ + {default, 512}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ + {default, 15}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.listener.http", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https", "emqttd.listeners", [ + {default, undefined}, + {datatype, [integer, ip]}, + hidden +]}. + +{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ + {default, 15}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + LisOpts = fun(Prefix) -> + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, + {rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}]) + end, + TcpOpts = fun(Prefix) -> + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}]) + end, + SslOpts = fun(Prefix) -> + Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, + {verify, cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)}, + {failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}]) + end, + + Listeners = fun(Name) when is_atom(Name) -> + Key = "mqtt.listener." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, undefined) of + undefined -> + []; + Port -> + ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]), + Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], + [{Name, Port, case Name =:= ssl orelse Name =:= https of + true -> [{ssl, SslOpts(Key)} | Opts]; + false -> Opts + end}] + end + end, + lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)]) +end}. + +%%-------------------------------------------------------------------- +%% MQTT Modules +%%-------------------------------------------------------------------- + +{mapping, "mqtt.module.retainer", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [ + {default, ram}, + {datatype, {enum, [disc, ram]}} +]}. + +{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [ + {default, 100000}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [ + {default, 0}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.presence", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.presence.qos", "emqttd.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "mqtt.module.subscription", "emqttd.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [ + {default, undefined}, + {datatype, string} +]}. + +{translation, "emqttd.modules", fun(Conf) -> + WithMod = fun(Name, OptsF) -> + Key = "mqtt.module." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, false) of + true -> [{Name, OptsF(Key)}]; + false -> [] + end + end, + RetainOpts = fun(Prefix) -> + [{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)}, + {max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)}, + {max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)}, + {expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}] + end, + PresOpts = fun(Prefix) -> + [{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}] + end, + ParseFun = fun(undefined) -> []; + (Topics) -> [begin + [Topic, Qos] = string:tokens(S, "="), + {list_to_binary(Topic), list_to_integer(Qos)} + end || S <- string:tokens(Topics, ",")] + end, + SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end, + lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)]) +end}. + +%%-------------------------------------------------------------------- +%% System Monitor +%%-------------------------------------------------------------------- + +%% @doc Long GC, don't monitor in production mode for: +%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +{mapping, "sysmon.long_gc", "emqttd.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Long Schedule(ms) +{mapping, "sysmon.long_schedule", "emqttd.sysmon", [ + {default, 1000}, + {datatype, integer} +]}. + +%% @doc Large Heap +{mapping, "sysmon.large_heap", "emqttd.sysmon", [ + {default, "8MB"}, + {datatype, bytesize} +]}. + +%% @doc Monitor Busy Port +{mapping, "sysmon.busy_port", "emqttd.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Monitor Busy Dist Port +{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.sysmon", fun(Conf) -> + [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, + {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, + {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, + {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, + {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] +end}. + diff --git a/test/emqttd_access_SUITE.erl b/test/emqttd_access_SUITE.erl index bad624157..e127cf601 100644 --- a/test/emqttd_access_SUITE.erl +++ b/test/emqttd_access_SUITE.erl @@ -41,7 +41,6 @@ groups() -> init_per_group(access_control, Config) -> application:load(emqttd), prepare_config(), - gen_conf:init(emqttd), Config; init_per_group(_Group, Config) -> @@ -92,43 +91,39 @@ end_per_testcase(_TestCase, _Config) -> %%-------------------------------------------------------------------- reload_acl(_) -> - [ok] = ?AC:reload_acl(). + [] = ?AC:reload_acl(). register_mod(_) -> ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []), {error, already_existed} = ?AC:register_mod(acl, emqttd_acl_test_mod, []), - [{emqttd_acl_test_mod, _, 0}, - {emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl), + [{emqttd_acl_test_mod, _, 0}] = ?AC:lookup_mods(acl), ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), ok = ?AC:register_mod(auth, emqttd_auth_dashboard, [], 99), [{emqttd_auth_dashboard, _, 99}, - {emqttd_auth_anonymous_test_mod, _, 0}, - {emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth). + {emqttd_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth). unregister_mod(_) -> ok = ?AC:register_mod(acl, emqttd_acl_test_mod, []), - [{emqttd_acl_test_mod, _, 0}, - {emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl), + [{emqttd_acl_test_mod, _, 0}] = ?AC:lookup_mods(acl), ok = ?AC:unregister_mod(acl, emqttd_acl_test_mod), timer:sleep(5), - [{emqttd_acl_internal, _, 0}] = ?AC:lookup_mods(acl), + [] = ?AC:lookup_mods(acl), ok = ?AC:register_mod(auth, emqttd_auth_anonymous_test_mod,[]), - [{emqttd_auth_anonymous_test_mod, _, 0}, - {emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth), + [{emqttd_auth_anonymous_test_mod, _, 0}] = ?AC:lookup_mods(auth), ok = ?AC:unregister_mod(auth, emqttd_auth_anonymous_test_mod), timer:sleep(5), - [{emqttd_auth_anonymous, _, 0}] = ?AC:lookup_mods(auth). + [] = ?AC:lookup_mods(auth). check_acl(_) -> User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>}, User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>}, allow = ?AC:check_acl(User1, subscribe, <<"users/testuser/1">>), allow = ?AC:check_acl(User1, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), + allow = ?AC:check_acl(User1, subscribe, <<"clients/client1/x/y">>), allow = ?AC:check_acl(User1, publish, <<"users/testuser/1">>), allow = ?AC:check_acl(User1, subscribe, <<"a/b/c">>), - deny = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). + allow = ?AC:check_acl(User2, subscribe, <<"a/b/c">>). %%-------------------------------------------------------------------- %% emqttd_access_rule diff --git a/test/emqttd_vm_SUITE.erl b/test/emqttd_vm_SUITE.erl index 708172099..a26bde4f4 100644 --- a/test/emqttd_vm_SUITE.erl +++ b/test/emqttd_vm_SUITE.erl @@ -124,13 +124,13 @@ process_list(_Config) -> process_info(_Config) -> ProcessInfos = emqttd_vm:get_process_info(), ProcessInfo = lists:last(ProcessInfos), - Keys = [K || {K, V}<- ProcessInfo], + Keys = [K || {K, _V}<- ProcessInfo], ?PROCESS_INFO = Keys. process_gc(_Config) -> ProcessGcs = emqttd_vm:get_process_gc(), ProcessGc = lists:last(ProcessGcs), - Keys = [K || {K, V}<- ProcessGc], + Keys = [K || {K, _V}<- ProcessGc], ?PROCESS_GC = Keys. get_ets_list(_Config) ->