From c4232bfae4290b3d35730d608b62918c8f03fd85 Mon Sep 17 00:00:00 2001 From: CrazyWisdom Date: Tue, 26 May 2020 15:06:57 +0800 Subject: [PATCH 1/8] Update README --- README-CN.md | 27 +++++++++++++++------------ README.md | 24 +++++++++++++----------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/README-CN.md b/README-CN.md index fb406e708..03a1d1a58 100644 --- a/README-CN.md +++ b/README-CN.md @@ -7,8 +7,7 @@ [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt) - -[![We are hiring](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://www.emqx.io/cn/careers) +[![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://www.emqx.io/cn/careers) [English](./README.md) | 简体中文 @@ -17,22 +16,24 @@ 从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。 - 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。 -- 获取更多信息,请访问 [EMQ X](https://emqx.io)。 +- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.io/cn/)。 ## 安装 -*EMQ X* 是跨平台的,支持 Linux、Unix、Mac OS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。 +*EMQ X* 是跨平台的,支持 Linux、Unix、macOS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。 -**使用 EMQ X Docker 镜像安装** +#### EMQ X Docker 镜像安装 ``` docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx ``` -**或者 [点此下载](https://emqx.io/downloads) 适合你的二进制软件包** +#### 二进制软件包安装 -- [单节点安装](https://docs.emqx.io/broker/v3/cn/install.html) -- [集群安装](https://docs.emqx.io/broker/v3/cn/cluster.html) +需从 [EMQ X 下载](https://www.emqx.io/cn/downloads) 页面获取相应操作系统的二进制软件包。 + +- [单节点安装文档](https://docs.emqx.io/broker/latest/cn/getting-started/install.html) +- [集群配置文档](https://docs.emqx.io/broker/latest/cn/advanced/cluster.html) ## 从源码构建 @@ -63,7 +64,7 @@ cd _rel/emqx && ./bin/emqx console ## FAQ -访问 [FAQ](https://docs.emqx.io/tutorial/v3/cn/faq/faq.html) 以获取常见问题的帮助。 +访问 [EMQ X FAQ](https://docs.emqx.io/broker/latest/cn/faq/faq.html) 以获取常见问题的帮助。 ## 产品路线 @@ -73,11 +74,13 @@ cd _rel/emqx && ./bin/emqx console 你可通过以下途径与 EMQ 社区及开发者联系: -- [EMQX Slack](http://emqx.slack.com) +- [Slack](https://slack-invite.emqx.io) - [Twitter](https://twitter.com/emqtt) -- [Forum](https://groups.google.com/d/forum/emqtt) -- [Blog](https://medium.com/@emqtt) +- [Facebook](https://www.facebook.com/emqxmqtt) - [Reddit](https://www.reddit.com/r/emqx/) +- [Forum](https://groups.google.com/d/forum/emqtt) +- [Weibo](https://weibo.com/emqtt) +- [Blog](https://www.emqx.io/cn/blog) 欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。 diff --git a/README.md b/README.md index f352842e1..6d681fac6 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,7 @@ [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt) - -[![We are hiring](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) +[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) English | [简体中文](./README-CN.md) @@ -17,22 +16,24 @@ English | [简体中文](./README-CN.md) Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. - For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases). -- For more information, please visit [EMQ X homepage](https://emqx.io). +- For more information, please visit [EMQ X homepage](https://www.emqx.io). ## Installation -The *EMQ X* broker is cross-platform, which supports Linux, Unix, Mac OS and Windows. It means *EMQ X* can be deployed on x86_64 architecture servers and ARM devices like Raspberry Pi. +The *EMQ X* broker is cross-platform, which supports Linux, Unix, macOS and Windows. It means *EMQ X* can be deployed on x86_64 architecture servers and ARM devices like Raspberry Pi. -**Installing via EMQ X Docker Image** +#### Installing via EMQ X Docker Image ``` docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx ``` -**Or download the binary package for your platform from [here](https://emqx.io/downloads).** +#### Installing via Binary Package -- [Single Node Install](https://docs.emqx.io/broker/v3/en/install.html) -- [Multi Node Install](https://docs.emqx.io/broker/v3/en/cluster.html) +Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.io/downloads) page. + +- [Single Node Install](https://docs.emqx.io/broker/latest/en/getting-started/installation.html) +- [Multi Node Install](https://docs.emqx.io/broker/latest/en/advanced/cluster.html) ## Build From Source @@ -65,7 +66,7 @@ To view the dashboard after running, use your browser to open: http://localhost: ## FAQ -Visiting [FAQ](https://docs.emqx.io/tutorial/v3/en/faq/faq.html) to get help of common problems. +Visiting [EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html) to get help of common problems. ## Roadmap @@ -74,11 +75,12 @@ The [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/mileston ## Community, discussion, contribution, and support You can reach the EMQ community and developers via the following channels: -- [EMQX Slack](https://slack-invite.emqx.io/) +- [Slack](https://slack-invite.emqx.io/) - [Twitter](https://twitter.com/emqtt) +- [Facebook](https://www.facebook.com/emqxmqtt) +- [Reddit](https://www.reddit.com/r/emqx/) - [Forum](https://groups.google.com/d/forum/emqtt) - [Blog](https://medium.com/@emqtt) -- [Reddit](https://www.reddit.com/r/emqx/) Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues). From aaa2c48758ca2b7d1c0963629e0bc8fce720949e Mon Sep 17 00:00:00 2001 From: CrazyWisdom Date: Tue, 26 May 2020 15:06:57 +0800 Subject: [PATCH 2/8] Update README --- README-CN.md | 27 +++++++++++++++------------ README.md | 24 +++++++++++++----------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/README-CN.md b/README-CN.md index fb406e708..03a1d1a58 100644 --- a/README-CN.md +++ b/README-CN.md @@ -7,8 +7,7 @@ [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt) - -[![We are hiring](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://www.emqx.io/cn/careers) +[![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://www.emqx.io/cn/careers) [English](./README.md) | 简体中文 @@ -17,22 +16,24 @@ 从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。 - 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。 -- 获取更多信息,请访问 [EMQ X](https://emqx.io)。 +- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.io/cn/)。 ## 安装 -*EMQ X* 是跨平台的,支持 Linux、Unix、Mac OS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。 +*EMQ X* 是跨平台的,支持 Linux、Unix、macOS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。 -**使用 EMQ X Docker 镜像安装** +#### EMQ X Docker 镜像安装 ``` docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx ``` -**或者 [点此下载](https://emqx.io/downloads) 适合你的二进制软件包** +#### 二进制软件包安装 -- [单节点安装](https://docs.emqx.io/broker/v3/cn/install.html) -- [集群安装](https://docs.emqx.io/broker/v3/cn/cluster.html) +需从 [EMQ X 下载](https://www.emqx.io/cn/downloads) 页面获取相应操作系统的二进制软件包。 + +- [单节点安装文档](https://docs.emqx.io/broker/latest/cn/getting-started/install.html) +- [集群配置文档](https://docs.emqx.io/broker/latest/cn/advanced/cluster.html) ## 从源码构建 @@ -63,7 +64,7 @@ cd _rel/emqx && ./bin/emqx console ## FAQ -访问 [FAQ](https://docs.emqx.io/tutorial/v3/cn/faq/faq.html) 以获取常见问题的帮助。 +访问 [EMQ X FAQ](https://docs.emqx.io/broker/latest/cn/faq/faq.html) 以获取常见问题的帮助。 ## 产品路线 @@ -73,11 +74,13 @@ cd _rel/emqx && ./bin/emqx console 你可通过以下途径与 EMQ 社区及开发者联系: -- [EMQX Slack](http://emqx.slack.com) +- [Slack](https://slack-invite.emqx.io) - [Twitter](https://twitter.com/emqtt) -- [Forum](https://groups.google.com/d/forum/emqtt) -- [Blog](https://medium.com/@emqtt) +- [Facebook](https://www.facebook.com/emqxmqtt) - [Reddit](https://www.reddit.com/r/emqx/) +- [Forum](https://groups.google.com/d/forum/emqtt) +- [Weibo](https://weibo.com/emqtt) +- [Blog](https://www.emqx.io/cn/blog) 欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。 diff --git a/README.md b/README.md index 9b6168434..93813e1a8 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,7 @@ [![Slack Invite]()](https://slack-invite.emqx.io) [![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt) - -[![We are hiring](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) +[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) English | [简体中文](./README-CN.md) @@ -17,22 +16,24 @@ English | [简体中文](./README-CN.md) Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. - For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases). -- For more information, please visit [EMQ X homepage](https://emqx.io). +- For more information, please visit [EMQ X homepage](https://www.emqx.io). ## Installation -The *EMQ X* broker is cross-platform, which supports Linux, Unix, Mac OS and Windows. It means *EMQ X* can be deployed on x86_64 architecture servers and ARM devices like Raspberry Pi. +The *EMQ X* broker is cross-platform, which supports Linux, Unix, macOS and Windows. It means *EMQ X* can be deployed on x86_64 architecture servers and ARM devices like Raspberry Pi. -**Installing via EMQ X Docker Image** +#### Installing via EMQ X Docker Image ``` docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx ``` -**Or download the binary package for your platform from [here](https://emqx.io/downloads).** +#### Installing via Binary Package -- [Single Node Install](https://docs.emqx.io/broker/v3/en/install.html) -- [Multi Node Install](https://docs.emqx.io/broker/v3/en/cluster.html) +Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.io/downloads) page. + +- [Single Node Install](https://docs.emqx.io/broker/latest/en/getting-started/installation.html) +- [Multi Node Install](https://docs.emqx.io/broker/latest/en/advanced/cluster.html) ## Build From Source @@ -65,7 +66,7 @@ To view the dashboard after running, use your browser to open: http://localhost: ## FAQ -Visiting [FAQ](https://docs.emqx.io/tutorial/v3/en/faq/faq.html) to get help of common problems. +Visiting [EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html) to get help of common problems. ## Roadmap @@ -74,11 +75,12 @@ The [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/mileston ## Community, discussion, contribution, and support You can reach the EMQ community and developers via the following channels: -- [EMQX Slack](https://slack-invite.emqx.io/) +- [Slack](https://slack-invite.emqx.io/) - [Twitter](https://twitter.com/emqtt) +- [Facebook](https://www.facebook.com/emqxmqtt) +- [Reddit](https://www.reddit.com/r/emqx/) - [Forum](https://groups.google.com/d/forum/emqtt) - [Blog](https://medium.com/@emqtt) -- [Reddit](https://www.reddit.com/r/emqx/) Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues). From ac82ad12e30456cca4421a8a50abe524560b5e0d Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 28 May 2020 21:18:32 +0800 Subject: [PATCH 3/8] Add log overload protection parameters --- etc/emqx.conf | 107 ++++++++++++++++++++++++++++++++++++++++++++++- priv/emqx.schema | 84 ++++++++++++++++++++++++++++++++++++- 2 files changed, 188 insertions(+), 3 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index e5b52617b..915ca597a 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -414,6 +414,9 @@ log.dir = {{ platform_log_dir }} ## The log filename for logs of level specified in "log.level". ## +## If `log.rotation` is enabled, this is the base name of the +## files. Each file in a rotated log is named .N, where N is an integer. +## ## Value: String ## Default: emqx.log log.file = emqx.log @@ -424,6 +427,14 @@ log.file = emqx.log ## Default: No Limit #log.chars_limit = 8192 +## Enables the log rotation. +## With this enabled, new log files will be created when the current +## log file is full, max to `log.rotation.size` files will be created. +## +## Value: on | off +## Default: on +log.rotation = on + ## Maximum size of each log file. ## ## Value: Number @@ -446,9 +457,103 @@ log.rotation.count = 5 ## Note: Log files for a specific log level will only contain all the logs ## that higher than or equal to that level ## -#log.info.file = info.log +#log.info.file = info.log #log.error.file = error.log +## The max allowed queue length before switching to sync mode. +## +## Log overload protection parameter. If the message queue grows +## larger than this value the handler switches from anync to sync mode. +## +## Default: 100 +## +#log.sync_mode_qlen = 100 + +## The max allowed queue length before switching to drop mode. +## +## Log overload protection parameter. When the message queue grows +## larger than this threshold, the handler switches to a mode in which +## it drops all new events that senders want to log. +## +## Default: 3000 +## +#log.drop_mode_qlen = 3000 + +## The max allowed queue length before switching to flush mode. +## +## Log overload protection parameter. If the length of the message queue +## grows larger than this threshold, a flush (delete) operation takes place. +## To flush events, the handler discards the messages in the message queue +## by receiving them in a loop without logging. +## +## Default: 8000 +## +#log.flush_qlen = 8000 + +## Kill the log handler when it gets overloaded. +## +## Log overload protection parameter. It is possible that a handler, +## even if it can successfully manage peaks of high load without crashing, +## can build up a large message queue, or use a large amount of memory. +## We could kill the log handler in these cases and restart it after a +## few seconds. +## +## Default: on +## +#log.overload_kill = on + +## The max allowed queue length before killing the log hanlder. +## +## Log overload protection parameter. This is the maximum allowed queue +## length. If the message queue grows larger than this, the handler +## process is terminated. +## +## Default: 20000 +## +#log.overload_kill_qlen = 20000 + +## The max allowed memory size before killing the log hanlder. +## +## Log overload protection parameter. This is the maximum memory size +## that the handler process is allowed to use. If the handler grows +## larger than this, the process is terminated. +## +## Default: 30MB +## +#log.overload_kill_mem_size = 30MB + +## Restart the log hanlder after some seconds. +## +## Log overload protection parameter. If the handler is terminated, +## it restarts automatically after a delay specified in seconds. +## The value "infinity" prevents restarts. +## +## Default: 5s +## +#log.overload_kill_restart_after = 5s + +## Max burst count and time window for burst control. +## +## Log overload protection parameter. Large bursts of log events - many +## events received by the handler under a short period of time - can +## potentially cause problems. By specifying the maximum number of events +## to be handled within a certain time frame, the handler can avoid +## choking the log with massive amounts of printouts. +## +## This config controls the maximum number of events to handle within +## a time frame. After the limit is reached, successive events are +## dropped until the end of the time frame. +## +## Note that there would be no warning if any messages were +## dropped because of burst control. +## +## Comment this config out to disable the burst control feature. +## +## Value: MaxBurstCount,TimeWindow +## Default: disabled +## +#log.burst_limit = 20000, 1s + ##-------------------------------------------------------------------- ## Authentication/Access Control ##-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index f3bf3fb3f..1149e7c79 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -477,11 +477,21 @@ end}. {datatype, integer} ]}. +{mapping, "log.rotation", "kernel.logger", [ + {default, on}, + {datatype, flag} +]}. + {mapping, "log.rotation.size", "kernel.logger", [ {default, "10MB"}, {datatype, bytesize} ]}. +{mapping, "log.size", "kernel.logger", [ + {default, infinity}, + {datatype, [bytesize, atom]} +]}. + {mapping, "log.rotation.count", "kernel.logger", [ {default, 5}, {datatype, integer} @@ -491,6 +501,46 @@ end}. {datatype, file} ]}. +{mapping, "log.sync_mode_qlen", "kernel.logger", [ + {default, 100}, + {datatype, integer} +]}. + +{mapping, "log.drop_mode_qlen", "kernel.logger", [ + {default, 3000}, + {datatype, integer} +]}. + +{mapping, "log.flush_qlen", "kernel.logger", [ + {default, 8000}, + {datatype, integer} +]}. + +{mapping, "log.overload_kill", "kernel.logger", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "log.overload_kill_mem_size", "kernel.logger", [ + {default, "30MB"}, + {datatype, bytesize} +]}. + +{mapping, "log.overload_kill_qlen", "kernel.logger", [ + {default, 20000}, + {datatype, integer} +]}. + +{mapping, "log.overload_kill_restart_after", "kernel.logger", [ + {default, "5s"}, + {datatype, [{duration, ms}, atom]} +]}. + +{mapping, "log.burst_limit", "kernel.logger", [ + {default, "disabled"}, + {datatype, string} +]}. + {mapping, "log.sasl", "sasl.sasl_error_logger", [ {default, off}, {datatype, flag}, @@ -521,6 +571,10 @@ end}. {translation, "kernel.logger", fun(Conf) -> LogTo = cuttlefish:conf_get("log.to", Conf), LogLevel = cuttlefish:conf_get("log.level", Conf), + LogType = case cuttlefish:conf_get("log.rotation", Conf) of + true -> wrap; + false -> halt + end, CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of -1 -> unlimited; V -> V @@ -537,11 +591,37 @@ end}. []}]}, msg,"\n"], chars_limit => CharsLimit}}, + {BustLimitOn, {MaxBurstCount, TimeWindow}} = + case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of + ["disabled"] -> {false, {20000, 1000}}; + [Count, Window] -> + {true, {list_to_integer(Count), + case cuttlefish_duration:parse(Window, ms) of + Secs when is_integer(Secs) -> Secs; + {error, Reason1} -> error(Reason1) + end}} + end, FileConf = fun(Filename) -> - #{type => wrap, + BasicConf = + #{type => LogType, file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename), max_no_files => cuttlefish:conf_get("log.rotation.count", Conf), - max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)} + sync_mode_qlen => cuttlefish:conf_get("log.sync_mode_qlen", Conf), + drop_mode_qlen => cuttlefish:conf_get("log.drop_mode_qlen", Conf), + flush_qlen => cuttlefish:conf_get("log.flush_qlen", Conf), + overload_kill_enable => cuttlefish:conf_get("log.overload_kill", Conf), + overload_kill_qlen => cuttlefish:conf_get("log.overload_kill_qlen", Conf), + overload_kill_mem_size => cuttlefish:conf_get("log.overload_kill_mem_size", Conf), + overload_kill_restart_after => cuttlefish:conf_get("log.overload_kill_restart_after", Conf), + burst_limit_enable => BustLimitOn, + burst_limit_max_count => MaxBurstCount, + burst_limit_window_time => TimeWindow + }, + MaxNoBytes = case LogType of + wrap -> cuttlefish:conf_get("log.rotation.size", Conf); + halt -> cuttlefish:conf_get("log.size", Conf) + end, + BasicConf#{max_no_bytes => MaxNoBytes} end, %% For the default logger that outputs to console From 3fb82f72341c793426e7dda9c38cf55925bbab7d Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 1 Jun 2020 11:31:15 +0800 Subject: [PATCH 4/8] Fix(connect): fix the race condition for openning session - Remove the register_channel/1,2 functions --- src/emqx_cm.erl | 63 ++++++++++++++--------- test/emqx_cm_SUITE.erl | 112 +++++++++++++++++++++++++++++++---------- 2 files changed, 125 insertions(+), 50 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 6a1b3cfc1..92d54c056 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -27,9 +27,7 @@ -export([start_link/0]). --export([ register_channel/1 - , register_channel/2 - , register_channel/3 +-export([ register_channel/3 , unregister_channel/1 ]). @@ -45,6 +43,8 @@ , set_chan_stats/2 ]). +-export([get_chann_conn_mod/2]). + -export([ open_session/3 , discard_session/1 , discard_session/2 @@ -100,28 +100,29 @@ start_link() -> %% API %%-------------------------------------------------------------------- -%% @doc Register a channel. --spec(register_channel(emqx_types:clientid()) -> ok). -register_channel(ClientId) -> - register_channel(ClientId, self()). - -%% @doc Register a channel with pid. --spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok). -register_channel(ClientId, ChanPid) when is_pid(ChanPid) -> - Chan = {ClientId, ChanPid}, - true = ets:insert(?CHAN_TAB, Chan), - true = ets:insert(?CHAN_CONN_TAB, Chan), - ok = emqx_cm_registry:register_channel(Chan), - cast({registered, Chan}). - %% @doc Register a channel with info and stats. -spec(register_channel(emqx_types:clientid(), emqx_types:infos(), emqx_types:stats()) -> ok). -register_channel(ClientId, Info, Stats) -> +register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) -> Chan = {ClientId, ChanPid = self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), - register_channel(ClientId, ChanPid). + register_channel(ClientId, ChanPid, ConnInfo); + +%% @private +%% @doc Register a channel with pid and conn_mod. +%% +%% There is a Race-Condition on one node or cluster when many connections +%% login to Broker with the same clientid. We should register it and save +%% the conn_mod first for taking up the clientid access right. +%% +%% Note that: It should be called on a lock transaction +register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> + Chan = {ClientId, ChanPid}, + true = ets:insert(?CHAN_TAB, Chan), + true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), + ok = emqx_cm_registry:register_channel(Chan), + cast({registered, Chan}). %% @doc Unregister a channel. -spec(unregister_channel(emqx_types:clientid()) -> ok). @@ -132,7 +133,7 @@ unregister_channel(ClientId) when is_binary(ClientId) -> %% @private do_unregister_channel(Chan) -> ok = emqx_cm_registry:unregister_channel(Chan), - true = ets:delete_object(?CHAN_CONN_TAB, Chan), + true = ets:delete(?CHAN_CONN_TAB, Chan), true = ets:delete(?CHAN_INFO_TAB, Chan), ets:delete_object(?CHAN_TAB, Chan). @@ -206,24 +207,29 @@ set_chan_stats(ClientId, ChanPid, Stats) -> pendings => list()}} | {error, Reason :: term()}). open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> + Self = self(), CleanStart = fun(_) -> ok = discard_session(ClientId), Session = create_session(ClientInfo, ConnInfo), + register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> + Self = self(), ResumeStart = fun(_) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), + register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, pendings => Pendings}}; {error, not_found} -> Session = create_session(ClientInfo, ConnInfo), + register_channel(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end end, @@ -253,8 +259,8 @@ takeover_session(ClientId) -> end. takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> + case get_chann_conn_mod(ClientId, ChanPid) of + ConnMod when is_atom(ConnMod) -> Session = ConnMod:call(ChanPid, {takeover, 'begin'}), {ok, ConnMod, ChanPid, Session}; undefined -> @@ -284,8 +290,8 @@ discard_session(ClientId) when is_binary(ClientId) -> end. discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> - case get_chan_info(ClientId, ChanPid) of - #{conninfo := #{conn_mod := ConnMod}} -> + case get_chann_conn_mod(ClientId, ChanPid) of + ConnMod when is_atom(ConnMod) -> ConnMod:call(ChanPid, discard); undefined -> ok end; @@ -418,3 +424,12 @@ update_stats({Tab, Stat, MaxStat}) -> Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. +get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() -> + Chan = {ClientId, ChanPid}, + try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod + catch + error:badarg -> undefined + end; +get_chann_conn_mod(ClientId, ChanPid) -> + rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]). + diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 1c201d0aa..6a4b84d01 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -23,6 +23,13 @@ -include_lib("eunit/include/eunit.hrl"). -define(CM, emqx_cm). +-define(ChanInfo,#{conninfo => + #{socktype => tcp, + peername => {{127,0,0,1}, 5000}, + sockname => {{127,0,0,1}, 1883}, + peercert => nossl, + conn_mod => emqx_connection, + receive_maximum => 100}}). %%-------------------------------------------------------------------- %% CT callbacks @@ -43,13 +50,13 @@ end_per_suite(_Config) -> %%-------------------------------------------------------------------- t_reg_unreg_channel(_) -> - ok = emqx_cm:register_channel(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ?assertEqual([self()], emqx_cm:lookup_channels(<<"clientid">>)), ok = emqx_cm:unregister_channel(<<"clientid">>), ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)). t_get_set_chan_info(_) -> - Info = #{proto_ver => 4, proto_name => <<"MQTT">>}, + Info = ?ChanInfo, ok = emqx_cm:register_channel(<<"clientid">>, Info, []), ?assertEqual(Info, emqx_cm:get_chan_info(<<"clientid">>)), Info1 = Info#{proto_ver => 5}, @@ -60,7 +67,7 @@ t_get_set_chan_info(_) -> t_get_set_chan_stats(_) -> Stats = [{recv_oct, 10}, {send_oct, 8}], - ok = emqx_cm:register_channel(<<"clientid">>, #{}, Stats), + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, Stats), ?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)), Stats1 = [{recv_oct, 10}|Stats], true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1), @@ -69,27 +76,89 @@ t_get_set_chan_stats(_) -> ?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)). t_open_session(_) -> + ok = meck:new(emqx_connection, [passthrough, no_history]), + ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), + ClientInfo = #{zone => external, clientid => <<"clientid">>, username => <<"username">>, peerhost => {127,0,0,1}}, - ConnInfo = #{peername => {{127,0,0,1}, 5000}, + ConnInfo = #{socktype => tcp, + peername => {{127,0,0,1}, 5000}, + sockname => {{127,0,0,1}, 1883}, + peercert => nossl, + conn_mod => emqx_connection, receive_maximum => 100}, {ok, #{session := Session1, present := false}} = emqx_cm:open_session(true, ClientInfo, ConnInfo), ?assertEqual(100, emqx_session:info(inflight_max, Session1)), {ok, #{session := Session2, present := false}} - = emqx_cm:open_session(false, ClientInfo, ConnInfo), - ?assertEqual(100, emqx_session:info(inflight_max, Session2)). + = emqx_cm:open_session(true, ClientInfo, ConnInfo), + ?assertEqual(100, emqx_session:info(inflight_max, Session2)), + + emqx_cm:unregister_channel(<<"clientid">>), + ok = meck:unload(emqx_connection). + +t_open_session_race_condition(_) -> + ClientInfo = #{zone => external, + clientid => <<"clientid">>, + username => <<"username">>, + peerhost => {127,0,0,1}}, + ConnInfo = #{socktype => tcp, + peername => {{127,0,0,1}, 5000}, + sockname => {{127,0,0,1}, 1883}, + peercert => nossl, + conn_mod => emqx_connection, + receive_maximum => 100}, + + Parent = self(), + OpenASession = fun() -> + timer:sleep(rand:uniform(100)), + OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)), + Parent ! OpenR, + case OpenR of + {ok, _} -> + receive + {'$gen_call', From, discard} -> + gen_server:reply(From, ok), ok + end; + {error, Reason} -> + exit(Reason) + end + end, + [spawn( + fun() -> + spawn(OpenASession), + spawn(OpenASession) + end) || _ <- lists:seq(1, 1000)], + + WaitingRecv = fun _Wr(N1, N2, 0) -> + {N1, N2}; + _Wr(N1, N2, Rest) -> + receive + {ok, _} -> _Wr(N1+1, N2, Rest-1); + {error, _} -> _Wr(N1, N2+1, Rest-1) + end + end, + + ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]), + + ?assertEqual(1, ets:info(emqx_channel, size)), + ?assertEqual(1, ets:info(emqx_channel_conn, size)), + ?assertEqual(1, ets:info(emqx_channel_registry, size)), + + [Pid] = emqx_cm:lookup_channels(<<"clientid">>), + exit(Pid, kill), timer:sleep(100), + ?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)). t_discard_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), ok = emqx_cm:discard_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>), + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:discard_session(<<"clientid">>), ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), ok = emqx_cm:discard_session(<<"clientid">>), @@ -97,35 +166,26 @@ t_discard_session(_) -> ok = meck:unload(emqx_connection). t_takeover_session(_) -> - ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>), - {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), - Pid = self(), - {ok, emqx_connection, Pid, test} = emqx_cm:takeover_session(<<"clientid">>), erlang:spawn(fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), - timer:sleep(1000) + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), + receive + {'$gen_call', From, {takeover, 'begin'}} -> + gen_server:reply(From, test), ok + end end), - ct:sleep(100), + timer:sleep(100), {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = meck:unload(emqx_connection). + emqx_cm:unregister_channel(<<"clientid">>). t_kick_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), {error, not_found} = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>), - {error, not_found} = emqx_cm:kick_session(<<"clientid">>), - ok = emqx_cm:unregister_channel(<<"clientid">>), - ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), test = emqx_cm:kick_session(<<"clientid">>), erlang:spawn(fun() -> - ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []), + ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), timer:sleep(1000) end), ct:sleep(100), From ee757402adad3cfd0784bd81daace635f681a0b3 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 3 Jun 2020 00:12:27 +0800 Subject: [PATCH 5/8] chore(deps): upgrade esockd to 5.6.2 esockd: avoid the `ssl_close_timeout` send to channel --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 50d430c44..4f73fb23e 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,7 @@ [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.4"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.2"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} From 42fc81338cce7a724d74c587147a51b379169ef9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 1 Jun 2020 10:48:06 +0800 Subject: [PATCH 6/8] Fix(modules): correct the bad return value for emqx_modules:load/0 --- src/emqx_modules.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 75ac35ce5..55fb44001 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -39,7 +39,7 @@ list() -> -spec(load() -> ok). load() -> case emqx:get_env(modules_loaded_file) of - undefined -> ignore; + undefined -> ok; File -> load_modules(File) end. @@ -166,4 +166,4 @@ write_loaded(true) -> ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]), {error, Error} end; -write_loaded(false) -> ok. \ No newline at end of file +write_loaded(false) -> ok. From f456f40c59c29491793802adbfa5b6c5f3cadbe8 Mon Sep 17 00:00:00 2001 From: zhouzb Date: Tue, 26 May 2020 17:24:10 +0800 Subject: [PATCH 7/8] Subscribe or unsubscribe via HTTP API skip ACL checking --- src/emqx_channel.erl | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index be8a1f572..ff964eff1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -538,6 +538,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = {error, RC} -> {RC, Channel} end. +-compile({inline, [process_force_subscribe/2]}). +process_force_subscribe(Subscriptions, Channel = + #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, + session = Session}) -> + lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) -> + NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), + NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc), + case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of + {ok, NSession} -> + {ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}}; + {error, ReasonCode} -> + {ReasonCodes ++ [ReasonCode], ChannelAcc} + end + end, {[], Channel}, Subscriptions). + %%-------------------------------------------------------------------- %% Process Unsubscribe %%-------------------------------------------------------------------- @@ -563,6 +578,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = {error, RC} -> {RC, Channel} end. +-compile({inline, [process_force_unsubscribe/2]}). +process_force_unsubscribe(Subscriptions, Channel = + #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, + session = Session}) -> + lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) -> + NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), + case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of + {ok, NSession} -> + {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}}; + {error, ReasonCode} -> + {ReasonCodes ++ [ReasonCode], ChannelAcc} + end + end, {[], Channel}, Subscriptions). + %%-------------------------------------------------------------------- %% Process Disconnect %%-------------------------------------------------------------------- @@ -818,6 +847,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; +handle_info({force_subscribe, TopicFilters}, Channel) -> + {_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel), + {ok, NChannel}; + handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) -> TopicFilters1 = run_hooks('client.unsubscribe', [ClientInfo, #{'Internal' => true}], @@ -826,6 +859,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; +handle_info({force_unsubscribe, TopicFilters}, Channel) -> + {_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel), + {ok, NChannel}; + handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel); From ea2a424ce253ec1bea80814685338c8b07adeed0 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 4 Jun 2020 10:46:53 +0800 Subject: [PATCH 8/8] Add ensure/2 funcs to avoid needless logs (#3483) --- src/emqx_metrics.erl | 13 +++++++++++++ src/emqx_reason_codes.erl | 2 +- src/emqx_types.erl | 8 +++++--- test/emqx_metrics_SUITE.erl | 18 ++++++++++++++++++ 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 4009ca67c..2a5a82749 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -30,6 +30,8 @@ -export([ new/1 , new/2 + , ensure/1 + , ensure/2 , all/0 ]). @@ -204,6 +206,17 @@ new(gauge, Name) -> new(counter, Name) -> create(counter, Name). +-spec(ensure(metric_name()) -> ok). +ensure(Name) -> + ensure(counter, Name). + +-spec(ensure(gauge|counter, metric_name()) -> ok). +ensure(Type, Name) when Type =:= gauge; Type =:= counter -> + case ets:lookup(?TAB, Name) of + [] -> create(Type, Name); + _ -> ok + end. + %% @private create(Type, Name) -> case gen_server:call(?SERVER, {create, Type, Name}) of diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index ece285767..8e96df165 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -50,7 +50,7 @@ name(16#11) -> no_subscription_existed; name(16#18) -> continue_authentication; name(16#19) -> re_authenticate; name(16#80) -> unspecified_error; -name(16#81) -> malformed_Packet; +name(16#81) -> malformed_packet; name(16#82) -> protocol_error; name(16#83) -> implementation_specific_error; name(16#84) -> unsupported_protocol_version; diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 15bfb10b5..31f9ad65c 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -91,7 +91,9 @@ -type(ver() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 - | ?MQTT_PROTO_V5). + | ?MQTT_PROTO_V5 + | non_neg_integer()). + -type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). -type(qos_name() :: qos0 | at_most_once | qos1 | at_least_once | @@ -107,7 +109,7 @@ -type(conninfo() :: #{socktype := socktype(), sockname := peername(), peername := peername(), - peercert := esockd_peercert:peercert(), + peercert := nossl | undefined | esockd_peercert:peercert(), conn_mod := module(), proto_name := binary(), proto_ver := ver(), @@ -116,7 +118,7 @@ username := username(), conn_props := properties(), connected := boolean(), - connected_at := erlang:timestamp(), + connected_at := non_neg_integer(), keepalive := 0..16#FFFF, receive_maximum := non_neg_integer(), expiry_interval := non_neg_integer(), diff --git a/test/emqx_metrics_SUITE.erl b/test/emqx_metrics_SUITE.erl index d6ef6b451..33f297e7d 100644 --- a/test/emqx_metrics_SUITE.erl +++ b/test/emqx_metrics_SUITE.erl @@ -42,6 +42,24 @@ t_new(_) -> 1 = emqx_metrics:val('metrics.test.total') end). +t_ensure(_) -> + with_metrics_server( + fun() -> + ok = emqx_metrics:ensure('metrics.test'), + ok = emqx_metrics:ensure('metrics.test'), + 0 = emqx_metrics:val('metrics.test'), + ok = emqx_metrics:inc('metrics.test'), + 1 = emqx_metrics:val('metrics.test'), + ok = emqx_metrics:ensure(counter, 'metrics.test.cnt'), + 0 = emqx_metrics:val('metrics.test.cnt'), + ok = emqx_metrics:inc('metrics.test.cnt'), + 1 = emqx_metrics:val('metrics.test.cnt'), + ok = emqx_metrics:ensure(gauge, 'metrics.test.total'), + 0 = emqx_metrics:val('metrics.test.total'), + ok = emqx_metrics:inc('metrics.test.total'), + 1 = emqx_metrics:val('metrics.test.total') + end). + t_all(_) -> with_metrics_server( fun() ->