From c0c79e8171f3e322b13326d6b8b2a4e95787c76d Mon Sep 17 00:00:00 2001 From: Feng Date: Tue, 11 Oct 2016 09:09:29 +0800 Subject: [PATCH] rm gen_conf and emqttd_conf --- etc/emqttd.2.0.conf | 252 ------------------ etc/emqttd.conf | 144 ++++++---- etc/emqttd.conf.3 | 31 --- priv/emqttd.schema | 480 +++++++++++++++++----------------- src/emqttd.erl | 11 +- src/emqttd_access_control.erl | 4 +- src/emqttd_app.erl | 4 +- src/emqttd_bridge_sup_sup.erl | 3 +- src/emqttd_conf.erl | 112 -------- src/emqttd_pubsub_sup.erl | 6 +- src/emqttd_session.erl | 11 +- src/emqttd_ws.erl | 2 +- src/emqttd_ws_client_sup.erl | 6 +- 13 files changed, 347 insertions(+), 719 deletions(-) delete mode 100644 etc/emqttd.2.0.conf delete mode 100644 src/emqttd_conf.erl diff --git a/etc/emqttd.2.0.conf b/etc/emqttd.2.0.conf deleted file mode 100644 index 98b460e86..000000000 --- a/etc/emqttd.2.0.conf +++ /dev/null @@ -1,252 +0,0 @@ - -%%-------------------------------------------------------------------- -%% MQTT Protocol -%%-------------------------------------------------------------------- - -%% Max ClientId Length Allowed. -{mqtt_max_clientid_len, 512}. - -%% Max Packet Size Allowed, 64K by default. -{mqtt_max_packet_size, 65536}. - -%% Client Idle Timeout. -{mqtt_client_idle_timeout, 30}. % Second - -%%-------------------------------------------------------------------- -%% Authentication -%%-------------------------------------------------------------------- - -%% Anonymous: Allow all -{auth, anonymous, []}. - -%% Authentication with username, password -{auth, username, [{passwd, "etc/modules/passwd.conf"}]}. - -%% Authentication with clientId -{auth, clientid, [{config, "etc/modules/client.conf"}, {password, no}]}. - -%%-------------------------------------------------------------------- -%% ACL -%%-------------------------------------------------------------------- - -{acl, anonymous, []}. - -{acl, internal, [{config, "etc/modules/acl.conf"}, {nomatch, allow}]}. - -%% Cache ACL result for PUBLISH -{cache_acl, true}. - - -%%-------------------------------------------------------------------- -%% Session -%%-------------------------------------------------------------------- - -%% Max number of QoS 1 and 2 messages that can be “inflight” at one time. -%% 0 means no limit -{session_max_inflight, 100}. - -%% Retry interval for redelivering QoS1/2 messages. -{session_unack_retry_interval, 60}. - -%% Awaiting PUBREL Timeout -{session_await_rel_timeout, 20}. - -%% Max Packets that Awaiting PUBREL, 0 means no limit -{session_max_awaiting_rel, 0}. - -%% Statistics Collection Interval(seconds) -{session_collect_interval, 0}. - -%% Expired after 2 day (unit: minute) -{session_expired_after, 2880}. - -%%-------------------------------------------------------------------- -%% Queue -%%-------------------------------------------------------------------- - -%% Type: simple | priority -{queue_type, simple}. - -%% Topic Priority: 0~255, Default is 0 -%% {queue_priority, [{"topic/1", 10}, {"topic/2", 8}]}. - -%% Max queue length. Enqueued messages when persistent client disconnected, -%% or inflight window is full. -{queue_max_length, infinity}. - -%% Low-water mark of queued messages -{queue_low_watermark, 0.2}. - -%% High-water mark of queued messages -{queue_high_watermark, 0.6}. - -%% Queue Qos0 messages? -{queue_qos0, true}. - -%%-------------------------------------------------------------------- -%% Listener -%%-------------------------------------------------------------------- - -%% Plain MQTT -{listener, mqtt, 1883, [ - %% Size of acceptor pool - {acceptors, 16}, - - %% Maximum number of concurrent clients - {max_clients, 512}, - - %% Mount point prefix - %% {mount_point, "prefix/"}, - - %% Socket Access Control - {access, [{allow, all}]}, - - %% Connection Options - {connopts, [ - %% Rate Limit. Format is 'burst, rate', Unit is KB/Sec - %% {rate_limit, "100,10"} %% 100K burst, 10K rate - ]}, - - %% Socket Options - {sockopts, [ - %Set buffer if hight thoughtput - %{recbuf, 4096}, - %{sndbuf, 4096}, - %{buffer, 4096}, - %{nodelay, true}, - {backlog, 1024} - ]} -]}. - -%% MQTT/SSL -{listener, mqtts, 8883, [ - %% Size of acceptor pool - {acceptors, 4}, - - %% Maximum number of concurrent clients - {max_clients, 512}, - - %% Mount point prefix - %% {mount_point, "secure/"}, - - %% Socket Access Control - {access, [{allow, all}]}, - - %% SSL certificate and key files - {ssl, [{handshake_timeout, 10000}, - %% Mutual SSL Authentication option - %% {verify, verify_peer}, - %% {cacertfile, "etc/ssl/ca.pem"}, - {certfile, "etc/ssl/ssl.crt"}, - {keyfile, "etc/ssl/ssl.key"}]}, - - %% Socket Options - {sockopts, [ - {backlog, 1024} - %{buffer, 4096}, - ]} -]}. - -%% HTTP and WebSocket Listener -{listener, http, 8083, [ - %% Size of acceptor pool - {acceptors, 4}, - - %% Maximum number of concurrent clients - {max_clients, 64}, - - %% Socket Access Control - {access, [{allow, all}]}, - - %% Socket Options - {sockopts, [ - {backlog, 1024} - %{buffer, 4096}, - ]} -]}. - -%%-------------------------------------------------------------------- -%% PubSub -%%-------------------------------------------------------------------- - -%% PubSub Pool Size. Default should be scheduler numbers. -{pubsub_pool_size, 8}. - -{pubsub_by_clientid, true}. - -%% Subscribe Asynchronously -{pubsub_async, true}. - -%%-------------------------------------------------------------------- -%% Bridge -%%-------------------------------------------------------------------- - -%% TODO: Bridge Queue Size -{bridge_max_queue_len, 10000}. - -%% Ping Interval of bridge node -{bridge_ping_down_interval, 1}. % second - -%%------------------------------------------------------------------- -%% Plugins -%%------------------------------------------------------------------- - -%% Dir of plugins' config -{plugins_etc_dir, "etc/plugins/"}. - -%% File to store loaded plugin names. -{plugins_loaded_file, "data/loaded_plugins"}. - -%%-------------------------------------------------------------------- -%% Modules -%%-------------------------------------------------------------------- - -%% Retainer Module -{module, retainer, [ - - %% disc: disc_copies, ram: ram_copies - {storage_type, disc}, - - %% Max number of retained messages - {max_message_num, 100000}, - - %% Max Payload Size of retained message - {max_playload_size, 65536}, - - %% Expired after seconds, never expired if 0 - {expired_after, 0} - -]}. - -%% Client presence management module. Publish presence messages when -%% client connected or disconnected. -{module, presence, [{qos, 0}]}. - -%% Subscribe topics automatically when client connected -{module, subscription, [{"$client/%c", 1}]}. - -%% [Rewrite](https://github.com/emqtt/emqttd/wiki/Rewrite) -%% {module, rewrite, [{config, "etc/modules/rewrite.conf"}]}. - -%%------------------------------------------------------------------- -%% Erlang System Monitor -%%------------------------------------------------------------------- - -%% Long GC, don't monitor in production mode for: -%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 - -{sysmon_long_gc, false}. - -%% Long Schedule(ms) -{sysmon_long_schedule, 240}. - -%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. -%% 8 * 1024 * 1024 -{sysmon_large_heap, 8388608}. - -%% Busy Port -{sysmon_busy_port, false}. - -%% Busy Dist Port -{sysmon_busy_dist_port, true}. - diff --git a/etc/emqttd.conf b/etc/emqttd.conf index f71829c79..28b16d72c 100644 --- a/etc/emqttd.conf +++ b/etc/emqttd.conf @@ -1,51 +1,70 @@ ##==================================================================== ## -## Config File for EMQ 3.0 +## Configration for EMQ 3.0 ## ##==================================================================== ##-------------------------------------------------------------------- -## Erlang VM +## Node ##-------------------------------------------------------------------- -## Erlang node name -vm.nodename = emqttd@127.0.0.1 +## Node Id: 1~255 +node.id = 1 -## Cookie for distributed erlang -vm.setcookie = emqsecretcookie +## Node Name +node.name = emqttd@127.0.0.1 + +## Cookie for distributed node +node.cookie = emq_dist_cookie ## SMP support: enable, auto, disable -vm.smp = auto +node.smp = auto ## Enable kernel poll -vm.kernel_poll = on +node.kernel_poll = on ## async thread pool -vm.async_threads = 32 +node.async_threads = 32 ## Erlang Process Limit -vm.process_limit = 256000 +node.process_limit = 256000 ## Sets the maximum number of simultaneously existing ports for this system -vm.max_ports = 262144 +node.max_ports = 262144 ## Set the distribution buffer busy limit (dist_buf_busy_limit) -vm.dist_buffer_size = 32MB +node.dist_buffer_size = 32MB ## Max ETS Tables. ## Note that mnesia and SSL will create temporary ets tables. -vm.max_ets_tables = 256000 +node.max_ets_tables = 256000 ## Tweak GC to run more often -vm.fullsweep_after = 1000 +node.fullsweep_after = 1000 ## Crash dump -vm.crash_dump = log/crash.dump +node.crash_dump = $(platform_log_dir)/crash.dump ##-------------------------------------------------------------------- ## Log ##-------------------------------------------------------------------- +## Console log. Enum: off, file, console, both +log.console = console + +## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency, none +log.console.level = error + +## Console log file +## log.console.file = $(platform_log_dir)/console.log + +## Error log file +log.error.file = $(platform_log_dir)/error.log + +## Enable the crash log. Enum: on, off +log.crash = on + +log.crash.file = $(platform_log_dir)/crash.log ##-------------------------------------------------------------------- ## MQTT Protocol @@ -60,45 +79,6 @@ mqtt.max_packet_size = 64KB ## Client Idle Timeout (Second) mqtt.client_idle_timeout = 30 -##-------------------------------------------------------------------- -## MQTT Listeners -##-------------------------------------------------------------------- - -## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 -## mqtt.listener.tcp = 1883 -## mqtt.listener.tcp.acceptors = 16 -## mqtt.listener.tcp.max_clients = 512 -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.tcp.rate_limit = 100,10 -## Mount Point -## mqtt.listener.tcp.mount_point = prefix/ -## TCP Socket Options. Set buffer if hight thoughtput -## mqtt.listener.tcp.opts.recbuf = 4096 -## mqtt.listener.tcp.opts.sndbuf = 4096 -## mqtt.listener.tcp.opts.buffer = 4096 -## mqtt.listener.tcp.opts.nodelay = true -## mqtt.listener.tcp.opts.backlog = 1024 - -## SSL Listener -## mqtt.listener.ssl = 883 -## mqtt.listener.ssl.acceptors = 4 -## mqtt.listener.ssl.max_clients = 512 -## Rate Limit. Format is 'burst,rate', Unit is KB/Sec -## mqtt.listener.ssl.rate_limit = 100,10 -## Mount Point -## mqtt.listener.ssl.mount_point = prefix/ -## mqtt.listener.ssl.opts.handshake_timeout = 10 ## Seconds -## mqtt.listener.ssl.opts.certfile = etc/ssl/cert.pem -## mqtt.listener.ssl.opts.keyfile = etc/ssl/key.pem -## mqtt.listener.ssl.opts.cacertfile = etc/ssl/cacert.pem -## mqtt.listener.ssl.opts.verify = verify_peer -## mqtt.listener.ssl.opts.failed_if_no_peer_cert = true - -## HTTP Listener -## mqtt.listener.http = 8083 -## mqtt.listener.http.acceptors = 4 -## mqtt.listener.http.max_clients = 64 - ##-------------------------------------------------------------------- ## MQTT Session ##-------------------------------------------------------------------- @@ -108,7 +88,7 @@ mqtt.client_idle_timeout = 30 mqtt.session.max_inflight = 100 ## Retry interval for redelivering QoS1/2 messages. -mqtt.session.unack_retry_interval = 60 +mqtt.session.retry_interval = 60 ## Awaiting PUBREL Timeout mqtt.session.await_rel_timeout = 20 @@ -165,11 +145,55 @@ mqtt.pubsub.by_clientid = true ## Subscribe Asynchronously mqtt.pubsub.async = true +##-------------------------------------------------------------------- +## MQTT Listeners +##-------------------------------------------------------------------- + +## TCP Listener: 1883, 127.0.0.1:1883, ::1:1883 +mqtt.listener.tcp = 1883 +mqtt.listener.tcp.acceptors = 8 +mqtt.listener.tcp.max_clients = 1024 +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.tcp.rate_limit = 100,10 +## Mount Point +## mqtt.listener.tcp.mount_point = prefix/ +## TCP Socket Options. Set buffer if hight thoughtput +## mqtt.listener.tcp.opts.recbuf = 4096 +## mqtt.listener.tcp.opts.sndbuf = 4096 +## mqtt.listener.tcp.opts.buffer = 4096 +## mqtt.listener.tcp.opts.nodelay = true +## mqtt.listener.tcp.opts.backlog = 1024 + +## SSL Listener +mqtt.listener.ssl = 8883 +mqtt.listener.ssl.acceptors = 4 +mqtt.listener.ssl.max_clients = 512 +## Rate Limit. Format is 'burst,rate', Unit is KB/Sec +## mqtt.listener.ssl.rate_limit = 100,10 +## Mount Point +## mqtt.listener.ssl.mount_point = prefix/ +## mqtt.listener.ssl.opts.handshake_timeout = 10 ## Seconds +## mqtt.listener.ssl.opts.certfile = $(platform_etc_dir)/ssl/cert.pem +## mqtt.listener.ssl.opts.keyfile = $(platform_etc_dir)/ssl/key.pem +## mqtt.listener.ssl.opts.cacertfile = $(platform_etc_dir)/ssl/cacert.pem +## mqtt.listener.ssl.opts.verify = verify_peer +## mqtt.listener.ssl.opts.failed_if_no_peer_cert = true + +## HTTP Listener +mqtt.listener.http = 127.0.0.1:8083 +mqtt.listener.http.acceptors = 4 +mqtt.listener.http.max_clients = 64 + +## HTTP(SSL) Listener +## mqtt.listener.http = 8083 +## mqtt.listener.http.acceptors = 4 +## mqtt.listener.http.max_clients = 64 + ##-------------------------------------------------------------------- ## MQTT Bridge ##-------------------------------------------------------------------- -## TODO: Bridge Queue Size +## Bridge Queue Size mqtt.bridge.max_queue_len = 10000 ## Ping Interval of bridge node @@ -180,10 +204,10 @@ mqtt.bridge.ping_down_interval = 1 ## Second ##------------------------------------------------------------------- ## Dir of plugins' config -## mqtt.plugins.etc_dir = etc/plugins/ +mqtt.plugins.etc_dir = $(platform_etc_dir)/plugins/ ## File to store loaded plugin names. -## mqtt.plugins.loaded_file = data/loaded_plugins +mqtt.plugins.loaded_file = $(platform_data_dir)/loaded_plugins ##------------------------------------------------------------------- ## MQTT Modules @@ -235,3 +259,7 @@ sysmon.busy_port = false ## Busy Dist Port sysmon.busy_dist_port = true +platform_etc_dir = ./etc + +platform_log_dir = ./log + diff --git a/etc/emqttd.conf.3 b/etc/emqttd.conf.3 index a353e48d0..95d0800ca 100644 --- a/etc/emqttd.conf.3 +++ b/etc/emqttd.conf.3 @@ -28,19 +28,6 @@ %% Cache ACL result for PUBLISH {cache_acl, true}. -##-------------------------------------------------------------------- -## Broker -##-------------------------------------------------------------------- - -## System interval of publishing broker $SYS messages -broker.sys_interval = 60. - -##-------------------------------------------------------------------- -## Session -##-------------------------------------------------------------------- - - - %%-------------------------------------------------------------------- %% Listener %%-------------------------------------------------------------------- @@ -190,21 +177,3 @@ broker.sys_interval = 60. %% Erlang System Monitor %%------------------------------------------------------------------- -%% Long GC, don't monitor in production mode for: -%% https://github.com/erlang/otp/blob/feb45017da36be78d4c5784d758ede619fa7bfd3/erts/emulator/beam/erl_gc.c#L421 - -sysmon.long_gc = false - -%% Long Schedule(ms) -sysmon.long_schedule = 240 - -%% 8M words. 32MB on 32-bit VM, 64MB on 64-bit VM. -%% 8 * 1024 * 1024 -sysmon.large_heap = 8388608 - -%% Busy Port -sysmon.busy_port = false - -%% Busy Dist Port -sysmon.busy_dist_port = true - diff --git a/priv/emqttd.schema b/priv/emqttd.schema index 2f56739a7..2a61cd4a0 100644 --- a/priv/emqttd.schema +++ b/priv/emqttd.schema @@ -2,42 +2,49 @@ %% EMQ 3.0 config mapping %%-------------------------------------------------------------------- -%% Erlang VM Args +%% Erlang Node %%-------------------------------------------------------------------- +%% @doc Node Id +{mapping, "node.id", "emqttd.node_id", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-255"]} +]}. + %% @doc Erlang node name -{mapping, "vm.nodename", "vm_args.-name", [ +{mapping, "node.name", "vm_args.-name", [ {default, "emqttd@127.0.0.1"} ]}. %% @doc Secret cookie for distributed erlang node -{mapping, "vm.setcookie", "vm_args.-setcookie", [ +{mapping, "node.cookie", "vm_args.-setcookie", [ {default, "emqsecretcookie"} ]}. %% @doc SMP Support -{mapping, "vm.smp", "vm_args.-smp", [ +{mapping, "node.smp", "vm_args.-smp", [ {default, auto}, {datatype, {enum, [enable, auto, disable]}}, hidden ]}. %% @doc Enable Kernel Poll -{mapping, "vm.kernel_poll", "vm_args.+K", [ +{mapping, "node.kernel_poll", "vm_args.+K", [ {default, on}, {datatype, flag}, hidden ]}. %% @doc More information at: http://erlang.org/doc/man/erl.html -{mapping, "vm.async_threads", "vm_args.+A", [ +{mapping, "node.async_threads", "vm_args.+A", [ {default, 64}, {datatype, integer}, {validators, ["range:0-1024"]} ]}. %% @doc Erlang Process Limit -{mapping, "vm.process_limit", "vm_args.+P", [ +{mapping, "node.process_limit", "vm_args.+P", [ {datatype, integer}, {default, 256000}, hidden @@ -46,7 +53,7 @@ %% Note: OTP R15 and earlier uses -env ERL_MAX_PORTS, R16+ uses +Q %% @doc The number of concurrent ports/sockets %% Valid range is 1024-134217727 -{mapping, "vm.max_ports", +{mapping, "node.max_ports", cuttlefish:otp("R16", "vm_args.+Q", "vm_args.-env ERL_MAX_PORTS"), [ {default, 262144}, {datatype, integer}, @@ -57,7 +64,7 @@ fun(X) -> X >= 1024 andalso X =< 134217727 end}. %% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl -{mapping, "vm.dist_buffer_size", "vm_args.+zdbbl", [ +{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ {datatype, bytesize}, {commented, "32MB"}, hidden, @@ -83,7 +90,7 @@ }. %% @doc http://www.erlang.org/doc/man/erlang.html#system_flag-2 -{mapping, "vm.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ +{mapping, "node.fullsweep_after", "vm_args.-env ERL_FULLSWEEP_AFTER", [ {default, 1000}, {datatype, integer}, hidden, @@ -96,7 +103,7 @@ %% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, %% R16+ uses +e %% @doc The ETS table limit -{mapping, "vm.max_ets_tables", +{mapping, "node.max_ets_tables", cuttlefish:otp("R16", "vm_args.+e", "vm_args.-env ERL_MAX_ETS_TABLES"), [ {default, 256000}, {datatype, integer}, @@ -104,35 +111,130 @@ ]}. %% @doc Set the location of crash dumps -{mapping, "vm.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ +{mapping, "node.crash_dump", "vm_args.-env ERL_CRASH_DUMP", [ {default, "{{crash_dump}}"}, {datatype, file}, hidden ]}. +%%-------------------------------------------------------------------- +%% Log +%%-------------------------------------------------------------------- + +{mapping, "log.console", "lager.handlers", [ + {default, file }, + {datatype, {enum, [off, file, console, both]}} +]}. + +{mapping, "log.console.level", "lager.handlers", [ + {default, info}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} +]}. + +{mapping, "log.console.file", "lager.handlers", [ + {default, "$(platform_log_dir)/console.log"}, + {datatype, file} +]}. + +{mapping, "log.error.file", "lager.handlers", [ + {default, "$(platform_log_dir)/error.log"}, + {datatype, file} +]}. + +{mapping, "log.error.redirect", "lager.error_logger_redirect", [ + {default, on}, + {datatype, flag}, + hidden +]}. + +{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ + {default, 1000}, + {datatype, integer}, + hidden +]}. + +{translation, + "lager.handlers", + fun(Conf) -> + ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of + undefined -> []; + ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, + {level, error}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}] + end, + + ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), + ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), + + ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, + ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, + {level, ConsoleLogLevel}, + {size, 10485760}, + {date, "$D0"}, + {count, 5}]}, + + ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of + off -> []; + file -> [ConsoleFileHandler]; + console -> [ConsoleHandler]; + both -> [ConsoleHandler, ConsoleFileHandler]; + _ -> [] + end, + ConsoleHandlers ++ ErrorHandler + end +}. + +{mapping, "log.crash", "lager.crash_log", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.crash.file", "lager.crash_log", [ + {default, "$(platform_log_dir)/crash.log"}, + {datatype, file} +]}. + +{translation, + "lager.crash_log", + fun(Conf) -> + case cuttlefish:conf_get("log.crash", Conf) of + false -> undefined; + _ -> + cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") + end + end}. + +{mapping, "sasl", "sasl.sasl_error_logger", [ + {default, off}, + {datatype, flag}, + hidden +]}. + %%-------------------------------------------------------------------- %% MQTT Protocol %%-------------------------------------------------------------------- %% @doc Set the Max ClientId Length Allowed. -{mapping, "mqtt.max_clientid_len", "emqttd.mqtt_protocol", [ +{mapping, "mqtt.max_clientid_len", "emqttd.protocol", [ {default, 1024}, {datatype, integer} ]}. %% @doc Max Packet Size Allowed, 64K by default. -{mapping, "mqtt.max_packet_size", "emqttd.mqtt_protocol", [ +{mapping, "mqtt.max_packet_size", "emqttd.protocol", [ {default, "64KB"}, {datatype, bytesize} ]}. %% @doc Client Idle Timeout. -{mapping, "mqtt.client_idle_timeout", "emqttd.mqtt_protocol", [ +{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [ {default, 30}, {datatype, integer} ]}. -{translation, "emqttd.mqtt_protocol", fun(Conf) -> +{translation, "emqttd.protocol", fun(Conf) -> [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)}, {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}, {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}] @@ -144,45 +246,45 @@ end}. %% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time. %% 0 means no limit -{mapping, "mqtt.session.max_inflight", "emqttd.mqtt_session", [ +{mapping, "mqtt.session.max_inflight", "emqttd.session", [ {default, 100}, {datatype, integer} ]}. %% @doc Retry interval for redelivering QoS1/2 messages. -{mapping, "mqtt.session.unack_retry_interval", "emqttd.mqtt_session", [ +{mapping, "mqtt.session.retry_interval", "emqttd.session", [ {default, 60}, {datatype, integer} ]}. %% @doc Awaiting PUBREL Timeout -{mapping, "mqtt.session.await_rel_timeout", "emqttd.mqtt_session", [ +{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [ {default, 30}, {datatype, integer} ]}. %% @doc Max Packets that Awaiting PUBREL, 0 means no limit -{mapping, "mqtt.session.max_awaiting_rel", "emqttd.mqtt_session", [ +{mapping, "mqtt.session.max_awaiting_rel", "emqttd.session", [ {default, 0}, {datatype, integer} ]}. %% @doc Statistics Collection Interval(seconds) -{mapping, "mqtt.session.collect_interval", "emqttd.mqtt_session", [ +{mapping, "mqtt.session.collect_interval", "emqttd.session", [ {default, 0}, {datatype, integer} ]}. %% @doc Session expired after... -{mapping, "mqtt.session.expired_after", "emqttd.mqtt_session", [ +{mapping, "mqtt.session.expired_after", "emqttd.session", [ {default, "2d"}, {datatype, {duration, s}} ]}. -{translation, "emqttd.mqtt_session", fun(Conf) -> +{translation, "emqttd.session", fun(Conf) -> [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)}, - {unack_retry_interval, cuttlefish:conf_get("mqtt.session.unack_retry_interval", Conf)}, + {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)}, {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)}, {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)}, {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)}, @@ -194,45 +296,45 @@ end}. %%-------------------------------------------------------------------- %% @doc Type: simple | priority -{mapping, "mqtt.queue.type", "emqttd.mqtt_queue", [ +{mapping, "mqtt.queue.type", "emqttd.queue", [ {default, simple}, {datatype, atom} ]}. %% @doc Topic Priority: 0~255, Default is 0 -{mapping, "mqtt.queue.priority", "emqttd.mqtt_queue", [ +{mapping, "mqtt.queue.priority", "emqttd.queue", [ {default, ""}, {datatype, string}, hidden ]}. %% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full. -{mapping, "mqtt.queue.max_length", "emqttd.mqtt_queue", [ +{mapping, "mqtt.queue.max_length", "emqttd.queue", [ {default, infinity}, {datatype, [atom, integer]} ]}. %% @doc Low-water mark of queued messages -{mapping, "mqtt.queue.low_watermark", "emqttd.mqtt_queue", [ +{mapping, "mqtt.queue.low_watermark", "emqttd.queue", [ {default, "20%"}, {datatype, string}, hidden ]}. %% @doc High-water mark of queued messages -{mapping, "mqtt.queue.high_watermark", "emqttd.mqtt_queue", [ +{mapping, "mqtt.queue.high_watermark", "emqttd.queue", [ {default, "60%"}, {datatype, string}, hidden ]}. %% @doc Queue Qos0 messages? -{mapping, "mqtt.queue.qos0", "emqttd.mqtt_queue", [ +{mapping, "mqtt.queue.qos0", "emqttd.queue", [ {default, true}, {datatype, {enum, [true, false]}} ]}. -{translation, "emqttd.mqtt_queue", fun(Conf) -> +{translation, "emqttd.queue", fun(Conf) -> Parse = fun(S) -> {match, [N]} = re:run(S, "^([0-9]+)%$", [{capture, all_but_first, list}]), list_to_integer(N) / 100 @@ -254,58 +356,150 @@ end}. %%-------------------------------------------------------------------- %% MQTT Broker %%-------------------------------------------------------------------- -{mapping, "mqtt.broker.sys_interval", "emqttd.mqtt_broker", [ + +{mapping, "mqtt.broker.sys_interval", "emqttd.broker", [ {default, 60}, {datatype, integer} ]}. -{translation, "emqttd.mqtt_broker", fun(Conf) -> +{translation, "emqttd.broker", fun(Conf) -> [{sys_interval, cuttlefish:conf_get("mqtt.broker.sys_interval", Conf)}] end}. %%-------------------------------------------------------------------- %% MQTT PubSub %%-------------------------------------------------------------------- -{mapping, "mqtt.pubsub.pool_size", "emqttd.mqtt_pubsub", [ + +{mapping, "mqtt.pubsub.pool_size", "emqttd.pubsub", [ {default, 8}, {datatype, integer} ]}. -{mapping, "mqtt.pubsub.by_clientid", "emqttd.mqtt_pubsub", [ +{mapping, "mqtt.pubsub.by_clientid", "emqttd.pubsub", [ {default, true}, {datatype, {enum, [true, false]}} ]}. -{mapping, "mqtt.pubsub.async", "emqttd.mqtt_pubsub", [ +{mapping, "mqtt.pubsub.async", "emqttd.pubsub", [ {default, true}, {datatype, {enum, [true, false]}}, hidden ]}. -{translation, "emqttd.mqtt_pubsub", fun(Conf) -> +{translation, "emqttd.pubsub", fun(Conf) -> [{pool_size, cuttlefish:conf_get("mqtt.pubsub.pool_size", Conf)}, {by_clientid, cuttlefish:conf_get("mqtt.pubsub.by_clientid", Conf)}, {async, cuttlefish:conf_get("mqtt.pubsub.async", Conf)}] end}. +%%-------------------------------------------------------------------- +%% MQTT Listeners +%%-------------------------------------------------------------------- + +{mapping, "mqtt.listener.tcp", "emqttd.listeners", [ + {default, 1883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.tcp.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.tcp.max_clients", "emqttd.listeners", [ + {default, 1024}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.ssl.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.ssl.max_clients", "emqttd.listeners", [ + {default, 512}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.http", "emqttd.listeners", [ + {default, 8883}, + {datatype, [integer, ip]} +]}. + +{mapping, "mqtt.listener.http.acceptors", "emqttd.listeners", [ + {default, 8}, + {datatype, integer} +]}. + +{mapping, "mqtt.listener.http.max_clients", "emqttd.listeners", [ + {default, 64}, + {datatype, integer} +]}. + +{translation, "emqttd.listeners", fun(Conf) -> + TcpListeners = case cuttlefish:conf_get("mqtt.listener.tcp", Conf) of + undefined -> + []; + TcpPort -> + TcpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.tcp.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.tcp.max_clients", Conf)}], + [{tcp, TcpPort, TcpOpts}] + end, + SslListeners = case cuttlefish:conf_get("mqtt.listener.ssl", Conf) of + undefined -> + []; + SslPort -> + SslOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.ssl.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.ssl.max_clients", Conf)}], + [{ssl, SslPort, SslOpts}] + end, + HttpListeners = case cuttlefish:conf_get("mqtt.listener.http", Conf) of + undefined -> + []; + HttPort -> + HttpOpts = [{acceptors, cuttlefish:conf_get("mqtt.listener.http.acceptors", Conf)}, + {max_clients, cuttlefish:conf_get("mqtt.listener.http.max_clients", Conf)}], + [{http, HttPort, HttpOpts}] + end, + TcpListeners ++ SslListeners ++ HttpListeners +end}. + %%-------------------------------------------------------------------- %% MQTT Bridge %%-------------------------------------------------------------------- -{mapping, "mqtt.bridge.max_queue_len", "emqttd.mqtt_bridge", [ + +{mapping, "mqtt.bridge.max_queue_len", "emqttd.bridge", [ {default, 10000}, {datatype, integer} ]}. -{mapping, "mqtt.bridge.ping_down_interval", "emqttd.mqtt_bridge", [ +{mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [ {default, 1}, {datatype, integer} ]}. -{translation, "emqttd.mqtt_bridge", fun(Conf) -> +{translation, "emqttd.bridge", fun(Conf) -> [{max_queue_len, cuttlefish:conf_get("mqtt.bridge.max_queue_len", Conf)}, {ping_down_interval, cuttlefish:conf_get("mqtt.bridge.ping_down_interval", Conf)}] end}. +%%------------------------------------------------------------------- +%% MQTT Plugins +%%------------------------------------------------------------------- + +{mapping, "mqtt.plugins.etc_dir", "emqttd.plugins_etc_dir", [ + {datatype, string} +]}. + +{mapping, "mqtt.plugins.loaded_file", "emqttd.plugins_loaded_file", [ + {datatype, string} +]}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- @@ -349,212 +543,18 @@ end}. {busy_dist_port, cuttlefish:conf_get("sysmon.busy_dist_port", Conf)}] end}. -%% @doc Where to emit the default log messages (typically at 'info' -%% severity): -%% off: disabled -%% file: the file specified by log.console.file -%% console: to standard output (seen when using `riak attach-direct`) -%% both: log.console.file and standard out. -{mapping, "log.console", "lager.handlers", [ - {default, file }, - {datatype, {enum, [off, file, console, both]}} -]}. - -%% @doc The severity level of the console log, default is 'info'. -{mapping, "log.console.level", "lager.handlers", [ - {default, info}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} -]}. - -%% @doc When 'log.console' is set to 'file' or 'both', the file where -%% console messages will be logged. -{mapping, "log.console.file", "lager.handlers", [ - {default, "$(platform_log_dir)/console.log"}, - {datatype, file} -]}. - -%% @doc The file where error messages will be logged. -{mapping, "log.error.file", "lager.handlers", [ - {default, "$(platform_log_dir)/error.log"}, - {datatype, file} -]}. - -%% @doc When set to 'on', enables log output to syslog. -{mapping, "log.syslog", "lager.handlers", [ - {default, off}, - {datatype, flag} -]}. - -%% @doc When set to 'on', enables log output to syslog. -{mapping, "log.syslog.ident", "lager.handlers", [ - {default, "riak"}, - hidden -]}. - -%% @doc Syslog facility to log entries from Riak. -{mapping, "log.syslog.facility", "lager.handlers", [ - {default, daemon}, - {datatype, {enum,[kern, user, mail, daemon, auth, syslog, - lpr, news, uucp, clock, authpriv, ftp, - cron, local0, local1, local2, local3, - local4, local5, local6, local7]}}, - hidden -]}. - -%% @doc The severity level at which to log entries to syslog, default is 'info'. -{mapping, "log.syslog.level", "lager.handlers", [ - {default, info}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}, - hidden -]}. - -{translation, - "lager.handlers", - fun(Conf) -> - SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of - true -> - Ident = cuttlefish:conf_get("log.syslog.ident", Conf), - Facility = cuttlefish:conf_get("log.syslog.facility", Conf), - LogLevel = cuttlefish:conf_get("log.syslog.level", Conf), - [{lager_syslog_backend, [Ident, Facility, LogLevel]}]; - _ -> [] - end, - ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf) of - undefined -> []; - ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, - {level, error}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}] - end, - - ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), - ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - - ConsoleHandler = {lager_console_backend, ConsoleLogLevel}, - ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, - {level, ConsoleLogLevel}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}, - - ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of - off -> []; - file -> [ConsoleFileHandler]; - console -> [ConsoleHandler]; - both -> [ConsoleHandler, ConsoleFileHandler]; - _ -> [] - end, - SyslogHandler ++ ConsoleHandlers ++ ErrorHandler - end -}. - - -%% @doc Whether to enable Erlang's built-in error logger. -{mapping, "sasl", "sasl.sasl_error_logger", [ - {default, off}, - {datatype, flag}, - hidden -]}. - -%% @doc Whether to enable the crash log. -{mapping, "log.crash", "lager.crash_log", [ - {default, on}, - {datatype, flag} -]}. - -%% @doc If the crash log is enabled, the file where its messages will -%% be written. -{mapping, "log.crash.file", "lager.crash_log", [ - {default, "$(platform_log_dir)/crash.log"}, - {datatype, file} -]}. - -{translation, - "lager.crash_log", - fun(Conf) -> - case cuttlefish:conf_get("log.crash", Conf) of - false -> undefined; - _ -> - cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") - end - end}. - -%% @doc Maximum size in bytes of individual messages in the crash log -{mapping, "log.crash.maximum_message_size", "lager.crash_log_msg_size", [ - {default, "64KB"}, - {datatype, bytesize} -]}. - -%% @doc Maximum size of the crash log in bytes, before it is rotated -{mapping, "log.crash.size", "lager.crash_log_size", [ - {default, "10MB"}, - {datatype, bytesize} -]}. - -%% @doc The schedule on which to rotate the crash log. For more -%% information see: -%% https://github.com/basho/lager/blob/master/README.md#internal-log-rotation -{mapping, "log.crash.rotation", "lager.crash_log_date", [ - {default, "$D0"} -]}. - -%% @doc The number of rotated crash logs to keep. When set to -%% 'current', only the current open log file is kept. -{mapping, "log.crash.rotation.keep", "lager.crash_log_count", [ - {default, 5}, - {datatype, [integer, {atom, current}]}, - {validators, ["rotation_count"]} -]}. - -{validator, - "rotation_count", - "must be 'current' or a positive integer", - fun(current) -> true; - (Int) when is_integer(Int) andalso Int >= 0 -> true; - (_) -> false - end}. - -{translation, - "lager.crash_log_count", - fun(Conf) -> - case cuttlefish:conf_get("log.crash.rotation.keep", Conf) of - current -> 0; - Int -> Int - end - end}. - -%% @doc Whether to redirect error_logger messages into lager - -%% defaults to true -{mapping, "log.error.redirect", "lager.error_logger_redirect", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -%% @doc Maximum number of error_logger messages to handle in a second -{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ - {default, 100}, - {datatype, integer}, - hidden -]}. - - -%% @doc Cookie for distributed node communication. All nodes in the -%% same cluster should use the same cookie or they will not be able to -%% communicate. -{mapping, "distributed_cookie", "vm_args.-setcookie", [ - {default, "riak"} -]}. - -%% @see platform_bin_dir {mapping, "platform_etc_dir", "emqttd.platform_etc_dir", [ {datatype, directory}, {default, "./etc"} ]}. -%% @see platform_bin_dir {mapping, "platform_log_dir", "emqttd.platform_log_dir", [ {datatype, directory}, {default, "./log"} ]}. + +{mapping, "platform_data_dir", "emqttd.platform_data_dir", [ + {datatype, directory}, + {default, "./data"} +]}. + diff --git a/src/emqttd.erl b/src/emqttd.erl index 07464ee56..c360a5870 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -22,7 +22,7 @@ -include("emqttd_protocol.hrl"). --export([start/0, conf/1, conf/2, env/1, env/2, is_running/1]). +-export([start/0, env/1, env/2, is_running/1]). %% PubSub API -export([subscribe/1, subscribe/2, subscribe/3, publish/1, @@ -57,15 +57,8 @@ -spec(start() -> ok | {error, any()}). start() -> application:start(?APP). -%% @doc Get Config --spec(conf(Key :: atom()) -> any()). -conf(Key) -> emqttd_conf:value(Key). - --spec(conf(Key :: atom(), Default :: any()) -> any()). -conf(Key, Default) -> emqttd_conf:value(Key, Default). - %% @doc Environment --spec(env(Key:: atom()) -> any()). +-spec(env(Key:: atom()) -> {ok, any()} | undefined). env(Key) -> application:get_env(?APP, Key). %% @doc Get environment diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index fb36892c7..c4dd4f037 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -125,8 +125,8 @@ stop() -> gen_server:call(?MODULE, stop). init([]) -> ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), - ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}), - ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(gen_conf:list(emqttd, acl))}), + %%ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(gen_conf:list(emqttd, auth))}), + %%ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(gen_conf:list(emqttd, acl))}), {ok, #state{}}. init_mods(Mods) -> diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 2299a7ec0..918452052 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -42,7 +42,6 @@ Reason :: term()). start(_StartType, _StartArgs) -> print_banner(), - emqttd_conf:init(), emqttd_mnesia:start(), {ok, Sup} = emqttd_sup:start_link(), start_servers(Sup), @@ -187,7 +186,8 @@ start_listener({listener, https, ListenOn, Opts}) -> mochiweb:start_http(https, ListenOn, Opts, {emqttd_http, handle_request, []}). start_listener(Protocol, ListenOn, Opts) -> - MFArgs = {emqttd_client, start_link, [emqttd_conf:mqtt()]}, + {ok, Env} = emqttd:env(protocol), + MFArgs = {emqttd_client, start_link, [Env]}, {ok, _} = esockd:open(Protocol, ListenOn, merge_sockopts(Opts), MFArgs). merge_sockopts(Options) -> diff --git a/src/emqttd_bridge_sup_sup.erl b/src/emqttd_bridge_sup_sup.erl index 82f7af3b0..109f94764 100644 --- a/src/emqttd_bridge_sup_sup.erl +++ b/src/emqttd_bridge_sup_sup.erl @@ -46,7 +46,8 @@ start_bridge(Node, Topic) when is_atom(Node) andalso is_binary(Topic) -> start_bridge(Node, _Topic, _Options) when Node =:= node() -> {error, bridge_to_self}; start_bridge(Node, Topic, Options) when is_atom(Node) andalso is_binary(Topic) -> - Options1 = emqttd_opts:merge(emqttd_conf:bridge(), Options), + {ok, BridgeEnv} = emqttd:env(bridge), + Options1 = emqttd_opts:merge(BridgeEnv, Options), supervisor:start_child(?MODULE, bridge_spec(Node, Topic, Options1)). %% @doc Stop a bridge diff --git a/src/emqttd_conf.erl b/src/emqttd_conf.erl deleted file mode 100644 index b3677d6b4..000000000 --- a/src/emqttd_conf.erl +++ /dev/null @@ -1,112 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_conf). - --export([init/0]). - --export([mqtt/0, session/0, queue/0, bridge/0, pubsub/0]). - --export([value/1, value/2, list/1]). - --define(APP, emqttd). - -init() -> gen_conf:init(?APP). - -mqtt() -> - with_env(mqtt_protocol, [ - %% Max ClientId Length Allowed. - {max_clientid_len, value(mqtt_max_clientid_len, 512)}, - %% Max Packet Size Allowed, 64K by default. - {max_packet_size, value(mqtt_max_packet_size, 65536)}, - %% Client Idle Timeout. - {client_idle_timeout, value(mqtt_client_idle_timeout, 30)} - ]). - -session() -> - with_env(mqtt_session, [ - %% Max number of QoS 1 and 2 messages that can be “inflight” at one time. - %% 0 means no limit - {max_inflight, value(session_max_inflight, 100)}, - - %% Retry interval for redelivering QoS1/2 messages. - {unack_retry_interval, value(session_unack_retry_interval, 60)}, - - %% Awaiting PUBREL Timeout - {await_rel_timeout, value(session_await_rel_timeout, 20)}, - - %% Max Packets that Awaiting PUBREL, 0 means no limit - {max_awaiting_rel, value(session_max_awaiting_rel, 0)}, - - %% Statistics Collection Interval(seconds) - {collect_interval, value(session_collect_interval, 0)}, - - %% Expired after 2 day (unit: minute) - {expired_after, value(session_expired_after, 2880)} - ]). - -queue() -> - with_env(mqtt_queue, [ - %% Type: simple | priority - {type, value(queue_type, simple)}, - - %% Topic Priority: 0~255, Default is 0 - {priority, value(queue_priority, [])}, - - %% Max queue length. Enqueued messages when persistent client disconnected, - %% or inflight window is full. - {max_length, value(queue_max_length, infinity)}, - - %% Low-water mark of queued messages - {low_watermark, value(queue_low_watermark, 0.2)}, - - %% High-water mark of queued messages - {high_watermark, value(queue_high_watermark, 0.6)}, - - %% Queue Qos0 messages? - {queue_qos0, value(queue_qos0, true)} - ]). - -bridge() -> - with_env(mqtt_bridge, [ - {max_queue_len, value(bridge_max_queue_len, 10000)}, - - %% Ping Interval of bridge node - {ping_down_interval, value(bridge_ping_down_interval, 1)} - ]). - -pubsub() -> - with_env(mqtt_pubsub, [ - %% PubSub and Router. Default should be scheduler numbers. - {pool_size, value(pubsub_pool_size, 8)} - ]). - -value(Key) -> - with_env(Key, gen_conf:value(?APP, Key)). - -value(Key, Default) -> - with_env(Key, gen_conf:value(?APP, Key, Default)). - -with_env(Key, Conf) -> - case application:get_env(?APP, Key) of - undefined -> - application:set_env(?APP, Key, Conf), Conf; - {ok, Val} -> - Val - end. - -list(Key) -> gen_conf:list(?APP, Key). - diff --git a/src/emqttd_pubsub_sup.erl b/src/emqttd_pubsub_sup.erl index e57eee517..988dc6553 100644 --- a/src/emqttd_pubsub_sup.erl +++ b/src/emqttd_pubsub_sup.erl @@ -32,7 +32,7 @@ %%-------------------------------------------------------------------- start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:pubsub()]). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). pubsub_pool() -> hd([Pid || {pubsub_pool, Pid, _, _} <- supervisor:which_children(?MODULE)]). @@ -41,10 +41,10 @@ pubsub_pool() -> %% Supervisor Callbacks %%-------------------------------------------------------------------- -init([Env]) -> +init([]) -> + {ok, Env} = emqttd:env(pubsub), %% Create ETS Tables [create_tab(Tab) || Tab <- [mqtt_subproperty, mqtt_subscriber, mqtt_subscription]], - {ok, { {one_for_all, 10, 3600}, [pool_sup(pubsub, Env), pool_sup(server, Env)]} }. %%-------------------------------------------------------------------- diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index b6276ab85..f53f363a4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -214,8 +214,9 @@ unsubscribe(SessPid, TopicTable) -> init([CleanSess, {ClientId, Username}, ClientPid]) -> process_flag(trap_exit, true), - true = link(ClientPid), - SessEnv = emqttd_conf:session(), + true = link(ClientPid), + {ok, QEnv} = emqttd:env(queue), + {ok, SessEnv} = emqttd:env(session), Session = #session{ clean_sess = CleanSess, client_id = ClientId, @@ -224,14 +225,14 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> subscriptions = #{}, inflight_queue = [], max_inflight = get_value(max_inflight, SessEnv, 0), - message_queue = emqttd_mqueue:new(ClientId, emqttd_conf:queue(), emqttd_alarm:alarm_fun()), + message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{}, - retry_interval = get_value(unack_retry_interval, SessEnv), + retry_interval = get_value(retry_interval, SessEnv), await_rel_timeout = get_value(await_rel_timeout, SessEnv), max_awaiting_rel = get_value(max_awaiting_rel, SessEnv), - expired_after = get_value(expired_after, SessEnv) * 60, + expired_after = get_value(expired_after, SessEnv), collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)), diff --git a/src/emqttd_ws.erl b/src/emqttd_ws.erl index 56a4d92d9..58cb6dc43 100644 --- a/src/emqttd_ws.erl +++ b/src/emqttd_ws.erl @@ -31,7 +31,7 @@ %% @doc Handle WebSocket Request. handle_request(Req) -> Peer = Req:get(peer), - PktOpts = emqttd_conf:mqtt(), + {ok, PktOpts} = emqttd:env(protocol), ParserFun = emqttd_parser:new(PktOpts), {ReentryWs, ReplyChannel} = upgrade(Req), {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel), diff --git a/src/emqttd_ws_client_sup.erl b/src/emqttd_ws_client_sup.erl index 33983fd8c..3af715337 100644 --- a/src/emqttd_ws_client_sup.erl +++ b/src/emqttd_ws_client_sup.erl @@ -27,7 +27,7 @@ %% @doc Start websocket client supervisor -spec(start_link() -> {ok, pid()}). start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [emqttd_conf:mqtt()]). + supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% @doc Start a WebSocket Client -spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). @@ -37,8 +37,8 @@ start_client(WsPid, Req, ReplyChannel) -> %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- - -init([Env]) -> +init([]) -> + {ok, Env} = emqttd:env(protocol), {ok, {{simple_one_for_one, 0, 1}, [{ws_client, {emqttd_ws_client, start_link, [Env]}, temporary, 5000, worker, [emqttd_ws_client]}]}}.