diff --git a/test/emqttd_SUITE.erl b/test/emqttd_SUITE.erl index 8a38cad90..e16172a16 100644 --- a/test/emqttd_SUITE.erl +++ b/test/emqttd_SUITE.erl @@ -101,8 +101,7 @@ groups() -> init_per_suite(Config) -> application:start(lager), DataDir = proplists:get_value(data_dir, Config), - application:set_env(emqttd, conf, filename:join([DataDir, "emqttd.conf"])), - application:ensure_all_started(emqttd), + start_apps(emqttd, DataDir), Config. end_per_suite(_Config) -> @@ -598,4 +597,12 @@ slave(node, Node) -> {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"), N. +start_apps(App, DataDir) -> + application:start(lager), + Schema = cuttlefish_schema:files([filename:join([DataDir, atom_to_list(App) ++ ".schema"])]), + Conf = conf_parse:file(filename:join([DataDir, atom_to_list(App) ++ ".conf"])), + NewConfig = cuttlefish_generator:map(Schema, Conf), + Vals = proplists:get_value(App, NewConfig), + [application:set_env(App, Par, Value) || {Par, Value} <- Vals], + application:ensure_all_started(App). diff --git a/test/emqttd_SUITE_data/emqttd.conf b/test/emqttd_SUITE_data/emqttd.conf index 68546ed89..4710ef909 100644 --- a/test/emqttd_SUITE_data/emqttd.conf +++ b/test/emqttd_SUITE_data/emqttd.conf @@ -1,270 +1,280 @@ -%%=================================================================== -%% -%% Config file for emqttd 2.0 -%% -%% Erlang Term Syntax: -%% -%% {}: Tuple, usually {Key, Value} -%% []: List, seperated by comma -%% %%: Comment -%% -%%=================================================================== +##-------------------------------------------------------------------- +## Node Args +##-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- +## Node name +node.name = emqttd@127.0.0.1 -%% Max ClientId Length Allowed. -{mqtt_max_clientid_len, 512}. +## Cookie for distributed node +node.cookie = emq_dist_cookie -%% Max Packet Size Allowed, 64K by default. -{mqtt_max_packet_size, 65536}. +## SMP support: enable, auto, disable +node.smp = auto -%% Client Idle Timeout. -{mqtt_client_idle_timeout, 30}. % Second +## Enable kernel poll +node.kernel_poll = on -%%-------------------------------------------------------------------- -%% Authentication -%%-------------------------------------------------------------------- +## async thread pool +node.async_threads = 32 -%% Anonymous: Allow all -{auth, anonymous, []}. +## Erlang Process Limit +node.process_limit = 256000 -%% Authentication with username, password -{auth, username, []}. +## Sets the maximum number of simultaneously existing ports for this system +node.max_ports = 65536 -%% Authentication with clientId -{auth, clientid, [{password, no}]}. +## Set the distribution buffer busy limit (dist_buf_busy_limit) +node.dist_buffer_size = 32MB -%%-------------------------------------------------------------------- -%% ACL -%%-------------------------------------------------------------------- +## Max ETS Tables. +## Note that mnesia and SSL will create temporary ets tables. +node.max_ets_tables = 256000 -{acl, anonymous, []}. +## Tweak GC to run more often +node.fullsweep_after = 1000 -{acl, internal, [{nomatch, allow}]}. +## Crash dump +node.crash_dump = log/crash.dump -%% Cache ACL result for PUBLISH -{cache_acl, true}. +## Distributed node ticktime +node.dist_net_ticktime = 60 -%%-------------------------------------------------------------------- -%% Broker -%%-------------------------------------------------------------------- +## Distributed node port range +## node.dist_listen_min = 6000 +## node.dist_listen_max = 6999 -%% System interval of publishing broker $SYS messages -{broker_sys_interval, 60}. +##-------------------------------------------------------------------- +## Log +##-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Session -%%-------------------------------------------------------------------- +## Console log. Enum: off, file, console, both +log.console = console -%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. -%% 0 means no limit -{session_max_inflight, 100}. +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency +log.console.level = error -%% Retry interval for redelivering QoS1/2 messages. -{session_unack_retry_interval, 60}. +## Console log file +## log.console.file = log/console.log -%% Awaiting PUBREL Timeout -{session_await_rel_timeout, 20}. +## Error log file +log.error.file = log/error.log -%% Max Packets that Awaiting PUBREL, 0 means no limit -{session_max_awaiting_rel, 0}. +## Enable the crash log. Enum: on, off +log.crash = on -%% Statistics Collection Interval(seconds) -{session_collect_interval, 0}. +log.crash.file = log/crash.log -%% Expired after 2 day (unit: minute) -{session_expired_after, 2880}. +##-------------------------------------------------------------------- +## MQTT Protocol +##-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Queue -%%-------------------------------------------------------------------- +## Max ClientId Length Allowed. +mqtt.max_clientid_len = 1024 -%% Type: simple | priority -{queue_type, simple}. +## Max Packet Size Allowed, 64K by default. +mqtt.max_packet_size = 64KB -%% Topic Priority: 0~255, Default is 0 -%% {queue_priority, [{"topic/1", 10}, {"topic/2", 8}]}. +## Client Idle Timeout (Second) +mqtt.client_idle_timeout = 30 -%% Max queue length. Enqueued messages when persistent client disconnected, -%% or inflight window is full. -{queue_max_length, infinity}. +## Allow Anonymous authentication +mqtt.allow_anonymous = true -%% Low-water mark of queued messages -{queue_low_watermark, 0.2}. +##-------------------------------------------------------------------- +## MQTT Session +##-------------------------------------------------------------------- -%% High-water mark of queued messages -{queue_high_watermark, 0.6}. +## Max number of QoS 1 and 2 messages that can be “inflight” at one time. +## 0 means no limit +mqtt.session.max_inflight = 100 -%% Queue Qos0 messages? -{queue_qos0, true}. +## Retry interval for redelivering QoS1/2 messages. +mqtt.session.retry_interval = 60 -%%-------------------------------------------------------------------- -%% Zone -%%-------------------------------------------------------------------- +## Awaiting PUBREL Timeout +mqtt.session.await_rel_timeout = 20 -{zone, admin, []}. +## Max Packets that Awaiting PUBREL, 0 means no limit +mqtt.session.max_awaiting_rel = 0 -%%-------------------------------------------------------------------- -%% Listener -%%-------------------------------------------------------------------- +## Statistics Collection Interval(seconds) +mqtt.session.collect_interval = 0 -%% Plain MQTT -{listener, mqtt, 1883, [ - %% Size of acceptor pool - {acceptors, 16}, +## Expired after 1 day: +## w - week +## d - day +## h - hour +## m - minute +## s - second +mqtt.session.expired_after = 1d - %% Maximum number of concurrent clients - {max_clients, 512}, +##-------------------------------------------------------------------- +## MQTT Queue +##-------------------------------------------------------------------- - %% Mount point prefix - %% {mount_point, "prefix/"}, +## Type: simple | priority +mqtt.queue.type = simple - %% Socket Access Control - {access, [{allow, all}]}, +## Topic Priority: 0~255, Default is 0 +## mqtt.queue.priority = topic/1=10,topic/2=8 - %% Connection Options - {connopts, [ - %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec - %% {rate_limit, "100,10"} %% 100K burst, 10K rate - ]}, +## Max queue length. Enqueued messages when persistent client disconnected, +## or inflight window is full. +mqtt.queue.max_length = infinity - %% Socket Options - {sockopts, [ - %Set buffer if hight thoughtput - %{recbuf, 4096}, - %{sndbuf, 4096}, - %{buffer, 4096}, - %{nodelay, true}, - {backlog, 1024} - ]} -]}. +## Low-water mark of queued messages +mqtt.queue.low_watermark = 20% -%% MQTT/SSL -{listener, mqtts, 8883, [ - %% Size of acceptor pool - {acceptors, 4}, +## High-water mark of queued messages +mqtt.queue.high_watermark = 60% - %% Maximum number of concurrent clients - {max_clients, 512}, +## Queue Qos0 messages? +mqtt.queue.qos0 = true - %% Socket Access Control - {access, [{allow, all}]}, +##-------------------------------------------------------------------- +## MQTT Broker and PubSub +##-------------------------------------------------------------------- - %% SSL certificate and key files - {ssl, [{certfile, "etc/ssl/ssl.crt"}, - {keyfile, "etc/ssl/ssl.key"}]}, +## System Interval of publishing broker $SYS Messages +mqtt.broker.sys_interval = 60 - %% Socket Options - {sockopts, [ - {backlog, 1024} - %{buffer, 4096}, - ]} -]}. +## PubSub Pool Size. Default should be scheduler numbers. +mqtt.pubsub.pool_size = 8 -%% HTTP and WebSocket Listener -{listener, http, 8083, [ - %% Size of acceptor pool - {acceptors, 4}, +mqtt.pubsub.by_clientid = true - %% Maximum number of concurrent clients - {max_clients, 64}, +## Subscribe Asynchronously +mqtt.pubsub.async = true - %% Socket Access Control - {access, [{allow, all}]}, +##-------------------------------------------------------------------- +## MQTT Bridge +##-------------------------------------------------------------------- - %% Socket Options - {sockopts, [ - {backlog, 1024} - %{buffer, 4096}, - ]} -]}. +## Bridge Queue Size +mqtt.bridge.max_queue_len = 10000 -%%-------------------------------------------------------------------- -%% PubSub -%%-------------------------------------------------------------------- +## Ping Interval of bridge node. Unit: Second +mqtt.bridge.ping_down_interval = 1 -%% PubSub and Router. Default should be scheduler numbers. -{pubsub_pool_size, 8}. +##------------------------------------------------------------------- +## MQTT Plugins +##------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Routing -%%-------------------------------------------------------------------- +## Dir of plugins' config +##mqtt.plugins.etc_dir = etc/plugins/ -%% Route aging time(seconds) -{routing_age, 5}. +## File to store loaded plugin names. +##mqtt.plugins.loaded_file = data/loaded_plugins -%%-------------------------------------------------------------------- -%% Bridge -%%-------------------------------------------------------------------- +##------------------------------------------------------------------- +## MQTT Modules +##------------------------------------------------------------------- -%% TODO: Bridge Queue Size -{bridge_max_queue_len, 10000}. +## Enable retainer module +mqtt.module.retainer = on -%% Ping Interval of bridge node -{bridge_ping_down_interval, 1}. % second +## disc: disc_copies, ram: ram_copies +mqtt.module.retainer.storage_type = ram -%%------------------------------------------------------------------- -%% Plugins -%%------------------------------------------------------------------- +## Max number of retained messages +mqtt.module.retainer.max_message_num = 100000 -%% Dir of plugins' config -{plugins_etc_dir, "etc/plugins/"}. +## Max Payload Size of retained message +mqtt.module.retainer.max_payload_size = 64KB -%% File to store loaded plugin names. -{plugins_loaded_file, "data/loaded_plugins"}. +## Expired after seconds, never expired if 0 +mqtt.module.retainer.expired_after = 0 -%%-------------------------------------------------------------------- -%% Modules -%%-------------------------------------------------------------------- +## Enable presence module +## Client presence management module. Publish presence messages when +## client connected or disconnected. +mqtt.module.presence = on -%% Retainer Module -{module, retainer, [ +mqtt.module.presence.qos = 0 - %% disc: disc_copies, ram: ram_copies - {storage, ram}, +## Enable subscription module +## Subscribe topics automatically when client connected +mqtt.module.subscription = on - %% Max number of retained messages - {max_message_num, 100000}, +mqtt.module.subscription.topics = $client/%c=1,$user/%u=1 - %% Max Payload Size of retained message - {max_playload_size, 65536}, +##-------------------------------------------------------------------- +## MQTT Listeners +##-------------------------------------------------------------------- - %% Expired after seconds, never expired if 0 - {expired_after, 0} +## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +mqtt.listener.tcp = 1883 -]}. +## Size of acceptor pool +mqtt.listener.tcp.acceptors = 8 -%% Client presence management module. Publish presence messages when -%% client connected or disconnected. -{module, presence, [{qos, 0}]}. +## Maximum number of concurrent clients +mqtt.listener.tcp.max_clients = 1024 -%% Subscribe topics automatically when client connected -{module, subscription, [{"$queue/clients/$c", 1}, backend]}. +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.tcp.rate_limit = 100,10 -%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) -{module, rewrite, []}. +## TCP Socket Options +mqtt.listener.tcp.backlog = 1024 +## mqtt.listener.tcp.recbuf = 4096 +## mqtt.listener.tcp.sndbuf = 4096 +## mqtt.listener.tcp.buffer = 4096 +## mqtt.listener.tcp.nodelay = true -%%------------------------------------------------------------------- -%% Erlang System Monitor -%%------------------------------------------------------------------- +## SSL Listener: 8883, 127.0.0.1:8883, ::1:8883 +mqtt.listener.ssl = 8883 -%% Long GC, don't monitor in production mode for: -%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +## Size of acceptor pool +mqtt.listener.ssl.acceptors = 4 -{sysmon_long_gc, false}. +## Maximum number of concurrent clients +mqtt.listener.ssl.max_clients = 512 -%% Long Schedule(ms) -{sysmon_long_schedule, 240}. +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.ssl.rate_limit = 100,10 -%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. -%% 8 * 1024 * 1024 -{sysmon_large_heap, 8388608}. +## Configuring SSL Options +## See http://erlang.org/doc/man/ssl.html +mqtt.listener.ssl.handshake_timeout = 15 #seconds +mqtt.listener.ssl.keyfile = etc/ssl/key.pem +mqtt.listener.ssl.certfile = etc/ssl/cert.pem +mqtt.listener.ssl.cacertfile = etc/ssl/cacert.pem +## mqtt.listener.ssl.verify = verify_peer +## mqtt.listener.ssl.failed_if_no_peer_cert = true -%% Busy Port -{sysmon_busy_port, false}. +## HTTP Listener +mqtt.listener.http = 8083 +mqtt.listener.http.acceptors = 4 +mqtt.listener.http.max_clients = 64 -%% Busy Dist Port -{sysmon_busy_dist_port, true}. +## HTTP(SSL) Listener +mqtt.listener.https = 8084 +mqtt.listener.https.acceptors = 4 +mqtt.listener.https.max_clients = 64 +mqtt.listener.https.handshake_timeout = 10 #seconds +mqtt.listener.https.certfile = etc/ssl/cert.pem +mqtt.listener.https.keyfile = etc/ssl/key.pem +mqtt.listener.https.cacertfile = etc/ssl/cacert.pem +## mqtt.listener.https.verify = verify_peer +## mqtt.listener.https.failed_if_no_peer_cert = true + +##------------------------------------------------------------------- +## System Monitor +##------------------------------------------------------------------- + +## Long GC, don't monitor in production mode for: +## https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +sysmon.long_gc = false + +## Long Schedule(ms) +sysmon.long_schedule = 240 + +## 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. +sysmon.large_heap = 8MB + +## Busy Port +sysmon.busy_port = false + +## Busy Dist Port +sysmon.busy_dist_port = true diff --git a/test/emqttd_SUITE_data/emqttd.schema b/test/emqttd_SUITE_data/emqttd.schema new file mode 100644 index 000000000..8ad4eb187 --- /dev/null +++ b/test/emqttd_SUITE_data/emqttd.schema @@ -0,0 +1,752 @@ +%%-*- mode: erlang -*- +%% EMQ config mapping + +%%-------------------------------------------------------------------- +%% Erlang Node +%%-------------------------------------------------------------------- + +%% @doc Erlang node name +{mapping, "node.name", "vm_args.-name", [ + {default, "emqttd@127.0.0.1"} +]}. + +%% @doc Secret cookie for distributed erlang node +{mapping, "node.cookie", "vm_args.-setcookie", [ + {default, "emqsecretcookie"} +]}. + +%% @doc SMP Support +{mapping, "node.smp", "vm_args.-smp", [ + {default, auto}, + {datatype, {enum, [enable, auto, disable]}}, + hidden +]}. + +%% @doc Enable Kernel Poll +{mapping, "node.kernel_poll", "vm_args.+K", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +%% @doc More information at: http://erlang.org/doc/man/erl.html +{mapping, "node.async_threads", "vm_args.+A", [ + {default, 64}, + {datatype, integer}, + {validators, ["range:0-1024"]} +]}. + +%% @doc Erlang Process Limit +{mapping, "node.process_limit", "vm_args.+P", [ + {datatype, integer}, + {default, 256000}, + hidden +]}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q +%% @doc The number of concurrent ports/sockets +%% Valid range is 1024-134217727 +{mapping, "node.max_ports", + cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ + {default, 262144}, + {datatype, integer}, + {validators, ["range4ports"]} +]}. + +{validator, "range4ports", "must be 1024 to 134217727", + fun(X) -> X >= 1024 andalso X =< 134217727 end}. + +%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl +{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ + {datatype, bytesize}, + {commented, "32MB"}, + hidden, + {validators, ["zdbbl_range"]} +]}. + +{translation, "vm_args.+zdbbl", + fun(Conf) -> + ZDBBL = cuttlefish:conf_get("node.dist_buffer_size", Conf, undefined), + case ZDBBL of + undefined -> undefined; + X when is_integer(X) -> cuttlefish_util:ceiling(X / 1024); %% Bytes to Kilobytes; + _ -> undefined + end + end +}. + +{validator, "zdbbl_range", "must be between 1KB and 2097151KB", + fun(ZDBBL) -> + %% 2097151KB = 2147482624 + ZDBBL >= 1024 andalso ZDBBL =< 2147482624 + end +}. + +%% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 +{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ + {default, 1000}, + {datatype, integer}, + hidden, + {validators, ["positive_integer"]} +]}. + +{validator, "positive_integer", "must be a positive integer", + fun(X) -> X >= 0 end}. + +%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, +%% R16+ uses +e +%% @doc The ETS table limit +{mapping, "node.max_ets_tables", + cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ + {default, 256000}, + {datatype, integer}, + hidden +]}. + +%% @doc Set the location of crash dumps +{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ + {default, "{{crash_dump}}"}, + {datatype, file}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime +{mapping, "node.dist_net_ticktime", "vm_args.-kernel net_ticktime", [ + {commented, 60}, + {datatype, integer}, + hidden +]}. + +%% @doc http://www.erlang.org/doc/man/kernel_app.html +{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [ + {commented, 6000}, + {datatype, integer}, + hidden +]}. + +%% @see node.dist_listen_min +{mapping, "node.dist_listen_max", "kernel.inet_dist_listen_max", [ + {commented, 6999}, + {datatype, integer}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% Log +%%-------------------------------------------------------------------- + +{mapping, "log.console", "lager.handlers", [ + {default, file }, + {datatype, {enum, [off, file, console, both]}} +]}. + +{mapping, "log.console.level", "lager.handlers", [ + {default, info}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} +]}. + +{mapping, "log.console.file", "lager.handlers", [ + {default, "log/console.log"}, + {datatype, file} +]}. + +{mapping, "log.error.file", "lager.handlers", [ + {default, "log/error.log"}, + {datatype, file} +]}. + +{mapping, "log.error.redirect", "lager.error_logger_redirect", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ + {default, 1000}, + {datatype, integer}, + hidden +]}. + +{translation, + "lager.handlers", + fun(Conf) -> + ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of + undefined -> []; + ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, + {level, error}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}] + end, + + ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), + ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), + + ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, + ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, + {level, ConsoleLogLevel}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}, + + ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of + off -> []; + file -> [ConsoleFileHandler]; + console -> [ConsoleHandler]; + both -> [ConsoleHandler, ConsoleFileHandler]; + _ -> [] + end, + ConsoleHandlers ++ ErrorHandler + end +}. + +{mapping, "log.crash", "lager.crash_log", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.crash.file", "lager.crash_log", [ + {default, "log/crash.log"}, + {datatype, file} +]}. + +{translation, + "lager.crash_log", + fun(Conf) -> + case cuttlefish:conf_get("log.crash", Conf) of + false -> undefined; + _ -> + cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") + end + end}. + +{mapping, "sasl", "sasl.sasl_error_logger", [ + {default, off}, + {datatype, flag}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT Protocol +%%-------------------------------------------------------------------- + +%% @doc Set the Max ClientId Length Allowed. +{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [ + {default, 1024}, + {datatype, integer} +]}. + +%% @doc Max Packet Size Allowed, 64K by default. +{mapping, "mqtt.max_packet_size", "emqttd.protocol", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +%% @doc Client Idle Timeout. +{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [ + {default, 30}, + {datatype, integer} +]}. + +{translation, "emqttd.protocol", fun(Conf) -> + [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, + {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, + {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}] +end}. + +%% @doc Allow Anonymous +{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [ + {default, false}, + {datatype, {enum, [true, false]}}, + hidden +]}. + +%%-------------------------------------------------------------------- +%% MQTT Session +%%-------------------------------------------------------------------- + +%% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. +%% 0 means no limit +{mapping, "mqtt.session.max_inflight", "emqttd.session", [ + {default, 100}, + {datatype, integer} +]}. + + +%% @doc Retry interval for redelivering QoS1/2 messages. +{mapping, "mqtt.session.retry_interval", "emqttd.session", [ + {default, 60}, + {datatype, integer} +]}. + +%% @doc Awaiting PUBREL Timeout +{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [ + {default, 30}, + {datatype, integer} +]}. + +%% @doc Max Packets that Awaiting PUBREL, 0 means no limit +{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Statistics Collection Interval(seconds) +{mapping, "mqtt.session.collect_interval", "emqttd.session", [ + {default, 0}, + {datatype, integer} +]}. + +%% @doc Session expired after... +{mapping, "mqtt.session.expired_after", "emqttd.session", [ + {default, "2d"}, + {datatype, {duration, s}} +]}. + +{translation, "emqttd.session", fun(Conf) -> + [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, + {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, + {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, + {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, + {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)}, + {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% MQTT Queue +%%-------------------------------------------------------------------- + +%% @doc Type: simple | priority +{mapping, "mqtt.queue.type", "emqttd.queue", [ + {default, simple}, + {datatype, atom} +]}. + +%% @doc Topic Priority: 0~255, Default is 0 +{mapping, "mqtt.queue.priority", "emqttd.queue", [ + {default, ""}, + {datatype, string}, + hidden +]}. + +%% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. +{mapping, "mqtt.queue.max_length", "emqttd.queue", [ + {default, infinity}, + {datatype, [atom, integer]} +]}. + +%% @doc Low-water mark of queued messages +{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ + {default, "20%"}, + {datatype, string}, + hidden +]}. + +%% @doc High-water mark of queued messages +{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ + {default, "60%"}, + {datatype, string}, + hidden +]}. + +%% @doc Queue Qos0 messages? +{mapping, "mqtt.queue.qos0", "emqttd.queue", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.queue", fun(Conf) -> + Parse = fun(S) -> + {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), + list_to_integer(N) / 100 + end, + Opts = [{type, cuttlefish:conf_get("mqtt.queue.type", Conf, simple)}, + {max_length, cuttlefish:conf_get("mqtt.queue.max_length", Conf)}, + {low_watermark, Parse(cuttlefish:conf_get("mqtt.queue.low_watermark", Conf))}, + {high_watermark, Parse(cuttlefish:conf_get("mqtt.queue.high_watermark", Conf))}, + {queue_qos0, cuttlefish:conf_get("mqtt.queue.qos0", Conf)}], + case cuttlefish:conf_get("mqtt.queue.priority", Conf) of + undefined -> Opts; + V -> [{priority, + [begin [T, P] = string:tokens(S, "="), + {T, list_to_integer(P)} + end || S <- string:tokens(V, ",")]}|Opts] + end +end}. + +%%-------------------------------------------------------------------- +%% MQTT Broker +%%-------------------------------------------------------------------- + +{mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [ + {default, 60}, + {datatype, integer} +]}. + +%%-------------------------------------------------------------------- +%% MQTT PubSub +%%-------------------------------------------------------------------- + +{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [ + {default, true}, + {datatype, {enum, [true, false]}}, + hidden +]}. + +{translation, "emqttd.pubsub", fun(Conf) -> + [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, + {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)}, + {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] +end}. + +%%-------------------------------------------------------------------- +%% MQTT Bridge +%%-------------------------------------------------------------------- + +{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [ + {default, 10000}, + {datatype, integer} +]}. + +{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [ + {default, 1}, + {datatype, integer} +]}. + +{translation, "emqttd.bridge", fun(Conf) -> + [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, + {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] +end}. + +%%------------------------------------------------------------------- +%% MQTT Plugins +%%------------------------------------------------------------------- + +{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [ + {datatype, string} +]}. + +{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [ + {datatype, string} +]}. + +%%-------------------------------------------------------------------- +%% MQTT Listeners +%%-------------------------------------------------------------------- + +{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ + {default, 1883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [ + {default, undefined}, + {datatype, string}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.recbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.sndbuf", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.buffer", "emqttd.listeners", [ + {datatype, integer}, + hidden +]}. + +{mapping, "mqtt.listener.tcp.nodelay", "emqttd.listeners", [ + {datatype, {enum, [true, false]}}, + hidden +]}. + +{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ + {default, 512}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.rate_limit", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [ + {default, 15}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.ssl.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "mqtt.listener.http", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https", "emqttd.listeners", [ + {default, undefined}, + {datatype, [integer, ip]}, + hidden +]}. + +{mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.handshake_timeout", "emqttd.listeners", [ + {default, 15}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.https.keyfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.certfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.cacertfile", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.verify", "emqttd.listeners", [ + {datatype, string} +]}. + +{mapping, "mqtt.listener.https.failed_if_no_peer_cert", "emqttd.listeners", [ + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.listeners", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + LisOpts = fun(Prefix) -> + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, + {rate_limt, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}]) + end, + TcpOpts = fun(Prefix) -> + Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, + {recbuf, cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)}, + {sndbuf, cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)}, + {buffer, cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)}, + {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}]) + end, + SslOpts = fun(Prefix) -> + Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)}, + {certfile, cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)}, + {verify, cuttlefish:conf_get(Prefix ++ ".verify_peer", Conf, undefined)}, + {failed_if_no_peer_cert, cuttlefish:conf_get(Prefix ++ "failed_if_no_peer_cert", Conf, undefined)}]) + end, + + Listeners = fun(Name) when is_atom(Name) -> + Key = "mqtt.listener." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, undefined) of + undefined -> + []; + Port -> + ConnOpts = Filter([{rate_limit, cuttlefish:conf_get(Key ++ ".rate_limit", Conf, undefined)}]), + Opts = [{connopts, ConnOpts}, {sockopts, TcpOpts(Key)} | LisOpts(Key)], + [{Name, Port, case Name =:= ssl orelse Name =:= https of + true -> [{ssl, SslOpts(Key)} | Opts]; + false -> Opts + end}] + end + end, + lists:append([Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)]) +end}. + +%%-------------------------------------------------------------------- +%% MQTT Modules +%%-------------------------------------------------------------------- + +{mapping, "mqtt.module.retainer", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [ + {default, ram}, + {datatype, {enum, [disc, ram]}} +]}. + +{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [ + {default, 100000}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [ + {default, 0}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.presence", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.presence.qos", "emqttd.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "mqtt.module.subscription", "emqttd.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [ + {default, undefined}, + {datatype, string} +]}. + +{translation, "emqttd.modules", fun(Conf) -> + WithMod = fun(Name, OptsF) -> + Key = "mqtt.module." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, false) of + true -> [{Name, OptsF(Key)}]; + false -> [] + end + end, + RetainOpts = fun(Prefix) -> + [{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)}, + {max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)}, + {max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)}, + {expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}] + end, + PresOpts = fun(Prefix) -> + [{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}] + end, + ParseFun = fun(undefined) -> []; + (Topics) -> [begin + [Topic, Qos] = string:tokens(S, "="), + {list_to_binary(Topic), list_to_integer(Qos)} + end || S <- string:tokens(Topics, ",")] + end, + SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end, + lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)]) +end}. + +%%-------------------------------------------------------------------- +%% System Monitor +%%-------------------------------------------------------------------- + +%% @doc Long GC, don't monitor in production mode for: +%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 +{mapping, "sysmon.long_gc", "emqttd.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Long Schedule(ms) +{mapping, "sysmon.long_schedule", "emqttd.sysmon", [ + {default, 1000}, + {datatype, integer} +]}. + +%% @doc Large Heap +{mapping, "sysmon.large_heap", "emqttd.sysmon", [ + {default, "8MB"}, + {datatype, bytesize} +]}. + +%% @doc Monitor Busy Port +{mapping, "sysmon.busy_port", "emqttd.sysmon", [ + {default, false}, + {datatype, {enum, [true, false]}} +]}. + +%% @doc Monitor Busy Dist Port +{mapping, "sysmon.busy_dist_port", "emqttd.sysmon", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{translation, "emqttd.sysmon", fun(Conf) -> + [{long_gc, cuttlefish:conf_get("sysmon.long_gc", Conf)}, + {long_schedule, cuttlefish:conf_get("sysmon.long_schedule", Conf)}, + {large_heap, cuttlefish:conf_get("sysmon.large_heap", Conf)}, + {busy_port, cuttlefish:conf_get("sysmon.busy_port", Conf)}, + {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] +end}. + diff --git a/test/emqttd_access_SUITE.erl b/test/emqttd_access_SUITE.erl index bad624157..8f1a38fba 100644 --- a/test/emqttd_access_SUITE.erl +++ b/test/emqttd_access_SUITE.erl @@ -41,7 +41,6 @@ groups() -> init_per_group(access_control, Config) -> application:load(emqttd), prepare_config(), - gen_conf:init(emqttd), Config; init_per_group(_Group, Config) ->