Merge branch 'master' into api_format
This commit is contained in:
commit
ffc7a070f4
|
@ -9,7 +9,7 @@
|
||||||
[](https://askemq.com)
|
[](https://askemq.com)
|
||||||
[](https://www.youtube.com/channel/UCir_r04HIsLjf2qqyZ4A8Cg)
|
[](https://www.youtube.com/channel/UCir_r04HIsLjf2qqyZ4A8Cg)
|
||||||
|
|
||||||
[](https://careers.emqx.cn/)
|
[](https://careers.emqx.cn/)
|
||||||
|
|
||||||
[English](./README.md) | 简体中文 | [日本語](./README-JP.md) | [русский](./README-RU.md)
|
[English](./README.md) | 简体中文 | [日本語](./README-JP.md) | [русский](./README-RU.md)
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@
|
||||||
从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。
|
从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。
|
||||||
|
|
||||||
- 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。
|
- 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。
|
||||||
- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.cn/)。
|
- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.io/zh)。
|
||||||
|
|
||||||
## 安装
|
## 安装
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
|
||||||
|
|
||||||
#### 二进制软件包安装
|
#### 二进制软件包安装
|
||||||
|
|
||||||
需从 [EMQ X 下载](https://www.emqx.cn/downloads) 页面获取相应操作系统的二进制软件包。
|
需从 [EMQ X 下载](https://www.emqx.com/zh/downloads) 页面获取相应操作系统的二进制软件包。
|
||||||
|
|
||||||
- [单节点安装文档](https://docs.emqx.cn/broker/latest/getting-started/install.html)
|
- [单节点安装文档](https://docs.emqx.cn/broker/latest/getting-started/install.html)
|
||||||
- [集群配置文档](https://docs.emqx.cn/broker/latest/advanced/cluster.html)
|
- [集群配置文档](https://docs.emqx.cn/broker/latest/advanced/cluster.html)
|
||||||
|
@ -133,7 +133,7 @@ DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_authz make dialyzer
|
||||||
- [Facebook](https://www.facebook.com/emqxmqtt)
|
- [Facebook](https://www.facebook.com/emqxmqtt)
|
||||||
- [Reddit](https://www.reddit.com/r/emqx/)
|
- [Reddit](https://www.reddit.com/r/emqx/)
|
||||||
- [Weibo](https://weibo.com/emqtt)
|
- [Weibo](https://weibo.com/emqtt)
|
||||||
- [Blog](https://www.emqx.cn/blog)
|
- [Blog](https://www.emqx.com/zh/blog)
|
||||||
|
|
||||||
欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。
|
欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
[](https://twitter.com/EMQTech)
|
[](https://twitter.com/EMQTech)
|
||||||
[](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
|
[](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
|
||||||
|
|
||||||
[](https://www.emqx.io/careers)
|
[](https://www.emqx.com/en/careers)
|
||||||
|
|
||||||
[English](./README.md) | [简体中文](./README-CN.md) | 日本語 | [русский](./README-RU.md)
|
[English](./README.md) | [简体中文](./README-CN.md) | 日本語 | [русский](./README-RU.md)
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p
|
||||||
|
|
||||||
#### バイナリパッケージによるインストール
|
#### バイナリパッケージによるインストール
|
||||||
|
|
||||||
それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.io/downloads)ページから取得できます。
|
それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.com/en/downloads)ページから取得できます。
|
||||||
|
|
||||||
- [シングルノードインストール](https://docs.emqx.io/broker/latest/en/getting-started/installation.html)
|
- [シングルノードインストール](https://docs.emqx.io/broker/latest/en/getting-started/installation.html)
|
||||||
- [マルチノードインストール](https://docs.emqx.io/broker/latest/en/advanced/cluster.html)
|
- [マルチノードインストール](https://docs.emqx.io/broker/latest/en/advanced/cluster.html)
|
||||||
|
|
|
@ -9,7 +9,7 @@
|
||||||
[](https://github.com/emqx/emqx/discussions)
|
[](https://github.com/emqx/emqx/discussions)
|
||||||
[](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
|
[](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
|
||||||
|
|
||||||
[](https://www.emqx.io/careers)
|
[](https://www.emqx.com/en/careers)
|
||||||
|
|
||||||
[English](./README.md) | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | русский
|
[English](./README.md) | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | русский
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@
|
||||||
Начиная с релиза 3.0, брокер *EMQ X* полностью поддерживает протокол MQTT версии 5.0, и обратно совместим с версиями 3.1 и 3.1.1, а также протоколами MQTT-SN, CoAP, LwM2M, WebSocket и STOMP. Начиная с релиза 3.0, брокер *EMQ X* может масштабироваться до более чем 10 миллионов одновременных MQTT соединений на один кластер.
|
Начиная с релиза 3.0, брокер *EMQ X* полностью поддерживает протокол MQTT версии 5.0, и обратно совместим с версиями 3.1 и 3.1.1, а также протоколами MQTT-SN, CoAP, LwM2M, WebSocket и STOMP. Начиная с релиза 3.0, брокер *EMQ X* может масштабироваться до более чем 10 миллионов одновременных MQTT соединений на один кластер.
|
||||||
|
|
||||||
- Полный список возможностей доступен по ссылке: [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
|
- Полный список возможностей доступен по ссылке: [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
|
||||||
- Более подробная информация доступна на нашем сайте: [EMQ X homepage](https://www.emqx.io).
|
- Более подробная информация доступна на нашем сайте: [EMQ X homepage](https://www.emqx.io/).
|
||||||
|
|
||||||
## Установка
|
## Установка
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
|
||||||
|
|
||||||
#### Установка бинарного пакета
|
#### Установка бинарного пакета
|
||||||
|
|
||||||
Сборки для различных операционных систем: [Загрузить EMQ X](https://www.emqx.io/downloads).
|
Сборки для различных операционных систем: [Загрузить EMQ X](https://www.emqx.com/en/downloads).
|
||||||
|
|
||||||
- [Установка на одном сервере](https://docs.emqx.io/en/broker/latest/getting-started/install.html)
|
- [Установка на одном сервере](https://docs.emqx.io/en/broker/latest/getting-started/install.html)
|
||||||
- [Установка на кластере](https://docs.emqx.io/en/broker/latest/advanced/cluster.html)
|
- [Установка на кластере](https://docs.emqx.io/en/broker/latest/advanced/cluster.html)
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
[](https://twitter.com/EMQTech)
|
[](https://twitter.com/EMQTech)
|
||||||
[](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
|
[](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
|
||||||
|
|
||||||
[](https://www.emqx.io/careers)
|
[](https://www.emqx.com/en/careers)
|
||||||
|
|
||||||
English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [русский](./README-RU.md)
|
English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [русский](./README-RU.md)
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@ English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [рус
|
||||||
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
|
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
|
||||||
|
|
||||||
- For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
|
- For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
|
||||||
- For more information, please visit [EMQ X homepage](https://www.emqx.io).
|
- For more information, please visit [EMQ X homepage](https://www.emqx.io/).
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
|
||||||
|
|
||||||
#### Installing via Binary Package
|
#### Installing via Binary Package
|
||||||
|
|
||||||
Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.io/downloads) page.
|
Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.com/en/downloads) page.
|
||||||
|
|
||||||
- [Single Node Install](https://docs.emqx.io/en/broker/latest/getting-started/install.html)
|
- [Single Node Install](https://docs.emqx.io/en/broker/latest/getting-started/install.html)
|
||||||
- [Multi Node Install](https://docs.emqx.io/en/broker/latest/advanced/cluster.html)
|
- [Multi Node Install](https://docs.emqx.io/en/broker/latest/advanced/cluster.html)
|
||||||
|
|
|
@ -272,7 +272,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
get_validity_period() ->
|
get_validity_period() ->
|
||||||
timer:seconds(emqx_config:get([alarm, validity_period])).
|
emqx_config:get([alarm, validity_period]).
|
||||||
|
|
||||||
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
|
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
|
||||||
details = Details0, message = Msg0}) ->
|
details = Details0, message = Msg0}) ->
|
||||||
|
|
|
@ -737,7 +737,7 @@ process_disconnect(ReasonCode, Properties, Channel) ->
|
||||||
|
|
||||||
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
|
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
|
||||||
Channel = #channel{conninfo = ConnInfo}) ->
|
Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
Channel#channel{conninfo = ConnInfo#{expiry_interval => Interval}};
|
Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}};
|
||||||
maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1114,11 +1114,11 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||||
emqx_keepalive:info(interval, KeepAlive);
|
emqx_keepalive:info(interval, KeepAlive);
|
||||||
interval(retry_timer, #channel{session = Session}) ->
|
interval(retry_timer, #channel{session = Session}) ->
|
||||||
timer:seconds(emqx_session:info(retry_interval, Session));
|
emqx_session:info(retry_interval, Session);
|
||||||
interval(await_timer, #channel{session = Session}) ->
|
interval(await_timer, #channel{session = Session}) ->
|
||||||
timer:seconds(emqx_session:info(await_rel_timeout, Session));
|
emqx_session:info(await_rel_timeout, Session);
|
||||||
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
|
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
|
||||||
timer:seconds(maps:get(expiry_interval, ConnInfo));
|
maps:get(expiry_interval, ConnInfo);
|
||||||
interval(will_timer, #channel{will_msg = WillMsg}) ->
|
interval(will_timer, #channel{will_msg = WillMsg}) ->
|
||||||
timer:seconds(will_delay_interval(WillMsg)).
|
timer:seconds(will_delay_interval(WillMsg)).
|
||||||
|
|
||||||
|
@ -1176,7 +1176,7 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
|
||||||
%% If the Session Expiry Interval is absent the value 0 is used.
|
%% If the Session Expiry Interval is absent the value 0 is used.
|
||||||
expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
||||||
properties = ConnProps}) ->
|
properties = ConnProps}) ->
|
||||||
emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
|
timer:seconds(emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0));
|
||||||
expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
|
expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
|
||||||
get_mqtt_conf(Zone, session_expiry_interval);
|
get_mqtt_conf(Zone, session_expiry_interval);
|
||||||
expiry_interval(_, #mqtt_packet_connect{clean_start = true}) ->
|
expiry_interval(_, #mqtt_packet_connect{clean_start = true}) ->
|
||||||
|
@ -1615,7 +1615,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
case maps:get(expiry_interval, ConnInfo) of
|
case maps:get(expiry_interval, ConnInfo) of
|
||||||
?UINT_MAX -> {ok, Channel};
|
?UINT_MAX -> {ok, Channel};
|
||||||
I when I > 0 ->
|
I when I > 0 ->
|
||||||
{ok, ensure_timer(expire_timer, timer:seconds(I), Channel)};
|
{ok, ensure_timer(expire_timer, I, Channel)};
|
||||||
_ -> shutdown(Reason, Channel)
|
_ -> shutdown(Reason, Channel)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -248,22 +248,22 @@ create_session(ClientInfo, ConnInfo) ->
|
||||||
Session.
|
Session.
|
||||||
|
|
||||||
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
||||||
#{max_subscriptions => get_conf(Zone, max_subscriptions),
|
#{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
|
||||||
upgrade_qos => get_conf(Zone, upgrade_qos),
|
upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
|
||||||
max_inflight => MaxInflight,
|
max_inflight => MaxInflight,
|
||||||
retry_interval => get_conf(Zone, retry_interval),
|
retry_interval => get_mqtt_conf(Zone, retry_interval),
|
||||||
await_rel_timeout => get_conf(Zone, await_rel_timeout),
|
await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
|
||||||
mqueue => mqueue_confs(Zone)
|
mqueue => mqueue_confs(Zone)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
mqueue_confs(Zone) ->
|
mqueue_confs(Zone) ->
|
||||||
#{max_len => get_conf(Zone, max_mqueue_len),
|
#{max_len => get_mqtt_conf(Zone, max_mqueue_len),
|
||||||
store_qos0 => get_conf(Zone, mqueue_store_qos0),
|
store_qos0 => get_mqtt_conf(Zone, mqueue_store_qos0),
|
||||||
priorities => get_conf(Zone, mqueue_priorities),
|
priorities => get_mqtt_conf(Zone, mqueue_priorities),
|
||||||
default_priority => get_conf(Zone, mqueue_default_priority)
|
default_priority => get_mqtt_conf(Zone, mqueue_default_priority)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
get_conf(Zone, Key) ->
|
get_mqtt_conf(Zone, Key) ->
|
||||||
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
|
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
|
||||||
|
|
||||||
%% @doc Try to takeover a session.
|
%% @doc Try to takeover a session.
|
||||||
|
|
|
@ -87,7 +87,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
ensure_timer(State) ->
|
ensure_timer(State) ->
|
||||||
case emqx_config:get([node, global_gc_interval]) of
|
case emqx_config:get([node, global_gc_interval]) of
|
||||||
undefined -> State;
|
undefined -> State;
|
||||||
Interval -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run),
|
Interval -> TRef = emqx_misc:start_timer(Interval, run),
|
||||||
State#{timer := TRef}
|
State#{timer := TRef}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -53,12 +53,12 @@ start_link() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
get_mem_check_interval() ->
|
get_mem_check_interval() ->
|
||||||
memsup:get_check_interval() div 1000.
|
memsup:get_check_interval().
|
||||||
|
|
||||||
set_mem_check_interval(Seconds) when Seconds < 60 ->
|
set_mem_check_interval(Seconds) when Seconds < 60000 ->
|
||||||
memsup:set_check_interval(1);
|
memsup:set_check_interval(1);
|
||||||
set_mem_check_interval(Seconds) ->
|
set_mem_check_interval(Seconds) ->
|
||||||
memsup:set_check_interval(Seconds div 60).
|
memsup:set_check_interval(Seconds div 60000).
|
||||||
|
|
||||||
get_sysmem_high_watermark() ->
|
get_sysmem_high_watermark() ->
|
||||||
memsup:get_sysmem_high_watermark().
|
memsup:get_sysmem_high_watermark().
|
||||||
|
@ -128,5 +128,5 @@ start_check_timer() ->
|
||||||
Interval = emqx_config:get([sysmon, os, cpu_check_interval]),
|
Interval = emqx_config:get([sysmon, os, cpu_check_interval]),
|
||||||
case erlang:system_info(system_architecture) of
|
case erlang:system_info(system_architecture) of
|
||||||
"x86_64-pc-linux-musl" -> ok;
|
"x86_64-pc-linux-musl" -> ok;
|
||||||
_ -> emqx_misc:start_timer(timer:seconds(Interval), check)
|
_ -> emqx_misc:start_timer(Interval, check)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -164,7 +164,7 @@ fields("node") ->
|
||||||
, {"config_files", t(list(string()), "emqx.config_files",
|
, {"config_files", t(list(string()), "emqx.config_files",
|
||||||
[ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
|
[ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
|
||||||
])}
|
])}
|
||||||
, {"global_gc_interval", t(duration_s(), undefined, "15m")}
|
, {"global_gc_interval", t(duration(), undefined, "15m")}
|
||||||
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
|
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
|
||||||
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
|
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
|
||||||
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
|
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
|
||||||
|
@ -288,10 +288,10 @@ fields("mqtt") ->
|
||||||
, {"max_subscriptions", maybe_infinity(range(1, inf))}
|
, {"max_subscriptions", maybe_infinity(range(1, inf))}
|
||||||
, {"upgrade_qos", t(boolean(), undefined, false)}
|
, {"upgrade_qos", t(boolean(), undefined, false)}
|
||||||
, {"max_inflight", t(range(1, 65535), undefined, 32)}
|
, {"max_inflight", t(range(1, 65535), undefined, 32)}
|
||||||
, {"retry_interval", t(duration_s(), undefined, "30s")}
|
, {"retry_interval", t(duration(), undefined, "30s")}
|
||||||
, {"max_awaiting_rel", maybe_infinity(integer(), 100)}
|
, {"max_awaiting_rel", maybe_infinity(integer(), 100)}
|
||||||
, {"await_rel_timeout", t(duration_s(), undefined, "300s")}
|
, {"await_rel_timeout", t(duration(), undefined, "300s")}
|
||||||
, {"session_expiry_interval", t(duration_s(), undefined, "2h")}
|
, {"session_expiry_interval", t(duration(), undefined, "2h")}
|
||||||
, {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)}
|
, {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)}
|
||||||
, {"mqueue_priorities", maybe_disabled(map())}
|
, {"mqueue_priorities", maybe_disabled(map())}
|
||||||
, {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}
|
, {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}
|
||||||
|
@ -507,10 +507,10 @@ fields("sysmon_vm") ->
|
||||||
];
|
];
|
||||||
|
|
||||||
fields("sysmon_os") ->
|
fields("sysmon_os") ->
|
||||||
[ {"cpu_check_interval", t(duration_s(), undefined, 60)}
|
[ {"cpu_check_interval", t(duration(), undefined, "60s")}
|
||||||
, {"cpu_high_watermark", t(percent(), undefined, "80%")}
|
, {"cpu_high_watermark", t(percent(), undefined, "80%")}
|
||||||
, {"cpu_low_watermark", t(percent(), undefined, "60%")}
|
, {"cpu_low_watermark", t(percent(), undefined, "60%")}
|
||||||
, {"mem_check_interval", maybe_disabled(duration_s(), 60)}
|
, {"mem_check_interval", maybe_disabled(duration(), "60s")}
|
||||||
, {"sysmem_high_watermark", t(percent(), undefined, "70%")}
|
, {"sysmem_high_watermark", t(percent(), undefined, "70%")}
|
||||||
, {"procmem_high_watermark", t(percent(), undefined, "5%")}
|
, {"procmem_high_watermark", t(percent(), undefined, "5%")}
|
||||||
];
|
];
|
||||||
|
@ -518,7 +518,7 @@ fields("sysmon_os") ->
|
||||||
fields("alarm") ->
|
fields("alarm") ->
|
||||||
[ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])}
|
[ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])}
|
||||||
, {"size_limit", t(integer(), undefined, 1000)}
|
, {"size_limit", t(integer(), undefined, 1000)}
|
||||||
, {"validity_period", t(duration_s(), undefined, "24h")}
|
, {"validity_period", t(duration(), undefined, "24h")}
|
||||||
];
|
];
|
||||||
|
|
||||||
fields(ExtraField) ->
|
fields(ExtraField) ->
|
||||||
|
|
|
@ -178,10 +178,10 @@ init(Opts) ->
|
||||||
inflight = emqx_inflight:new(MaxInflight),
|
inflight = emqx_inflight:new(MaxInflight),
|
||||||
mqueue = emqx_mqueue:init(QueueOpts),
|
mqueue = emqx_mqueue:init(QueueOpts),
|
||||||
next_pkt_id = 1,
|
next_pkt_id = 1,
|
||||||
retry_interval = timer:seconds(maps:get(retry_interval, Opts, 30)),
|
retry_interval = maps:get(retry_interval, Opts, 30000),
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
|
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
|
||||||
await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)),
|
await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
|
||||||
created_at = erlang:system_time(millisecond)
|
created_at = erlang:system_time(millisecond)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -211,7 +211,7 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
|
||||||
info(inflight_max, #session{inflight = Inflight}) ->
|
info(inflight_max, #session{inflight = Inflight}) ->
|
||||||
emqx_inflight:max_size(Inflight);
|
emqx_inflight:max_size(Inflight);
|
||||||
info(retry_interval, #session{retry_interval = Interval}) ->
|
info(retry_interval, #session{retry_interval = Interval}) ->
|
||||||
Interval div 1000;
|
Interval;
|
||||||
info(mqueue, #session{mqueue = MQueue}) ->
|
info(mqueue, #session{mqueue = MQueue}) ->
|
||||||
MQueue;
|
MQueue;
|
||||||
info(mqueue_len, #session{mqueue = MQueue}) ->
|
info(mqueue_len, #session{mqueue = MQueue}) ->
|
||||||
|
@ -229,7 +229,7 @@ info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
|
||||||
info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
|
info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
|
||||||
Max;
|
Max;
|
||||||
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
||||||
Timeout div 1000;
|
Timeout;
|
||||||
info(created_at, #session{created_at = CreatedAt}) ->
|
info(created_at, #session{created_at = CreatedAt}) ->
|
||||||
CreatedAt.
|
CreatedAt.
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ all() ->
|
||||||
emqx_ct:all(?MODULE).
|
emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
mqtt_conf() ->
|
mqtt_conf() ->
|
||||||
#{await_rel_timeout => 300,
|
#{await_rel_timeout => 300000,
|
||||||
idle_timeout => 15000,
|
idle_timeout => 15000,
|
||||||
ignore_loop_deliver => false,
|
ignore_loop_deliver => false,
|
||||||
keepalive_backoff => 0.75,
|
keepalive_backoff => 0.75,
|
||||||
|
@ -49,9 +49,9 @@ mqtt_conf() ->
|
||||||
peer_cert_as_username => disabled,
|
peer_cert_as_username => disabled,
|
||||||
response_information => [],
|
response_information => [],
|
||||||
retain_available => true,
|
retain_available => true,
|
||||||
retry_interval => 30,
|
retry_interval => 30000,
|
||||||
server_keepalive => disabled,
|
server_keepalive => disabled,
|
||||||
session_expiry_interval => 7200,
|
session_expiry_interval => 7200000,
|
||||||
shared_subscription => true,
|
shared_subscription => true,
|
||||||
strict_mode => false,
|
strict_mode => false,
|
||||||
upgrade_qos => false,
|
upgrade_qos => false,
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
t_run_gc(_) ->
|
t_run_gc(_) ->
|
||||||
ok = emqx_config:put([node, global_gc_interval], 1),
|
ok = emqx_config:put([node, global_gc_interval], 1000),
|
||||||
{ok, _} = emqx_global_gc:start_link(),
|
{ok, _} = emqx_global_gc:start_link(),
|
||||||
ok = timer:sleep(1500),
|
ok = timer:sleep(1500),
|
||||||
{ok, MilliSecs} = emqx_global_gc:run(),
|
{ok, MilliSecs} = emqx_global_gc:run(),
|
||||||
|
|
|
@ -25,8 +25,8 @@ all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_config:put([sysmon, os], #{
|
emqx_config:put([sysmon, os], #{
|
||||||
cpu_check_interval => 60,cpu_high_watermark => 0.8,
|
cpu_check_interval => 60000,cpu_high_watermark => 0.8,
|
||||||
cpu_low_watermark => 0.6,mem_check_interval => 60,
|
cpu_low_watermark => 0.6,mem_check_interval => 60000,
|
||||||
procmem_high_watermark => 0.05,sysmem_high_watermark => 0.7}),
|
procmem_high_watermark => 0.05,sysmem_high_watermark => 0.7}),
|
||||||
application:ensure_all_started(os_mon),
|
application:ensure_all_started(os_mon),
|
||||||
Config.
|
Config.
|
||||||
|
@ -38,11 +38,11 @@ t_api(_) ->
|
||||||
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
|
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
|
||||||
{ok, _} = emqx_os_mon:start_link(),
|
{ok, _} = emqx_os_mon:start_link(),
|
||||||
|
|
||||||
?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
|
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
|
||||||
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30)),
|
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30000)),
|
||||||
?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
|
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
|
||||||
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122)),
|
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122000)),
|
||||||
?assertEqual(120, emqx_os_mon:get_mem_check_interval()),
|
?assertEqual(120000, emqx_os_mon:get_mem_check_interval()),
|
||||||
|
|
||||||
?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()),
|
?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()),
|
||||||
?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)),
|
?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)),
|
||||||
|
|
|
@ -59,11 +59,11 @@ t_session_init(_) ->
|
||||||
?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
|
?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
|
||||||
?assertEqual(64, emqx_session:info(inflight_max, Session)),
|
?assertEqual(64, emqx_session:info(inflight_max, Session)),
|
||||||
?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
|
?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
|
||||||
?assertEqual(30, emqx_session:info(retry_interval, Session)),
|
?assertEqual(30000, emqx_session:info(retry_interval, Session)),
|
||||||
?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))),
|
?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))),
|
||||||
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
|
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
|
||||||
?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
|
?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
|
||||||
?assertEqual(300, emqx_session:info(await_rel_timeout, Session)),
|
?assertEqual(300000, emqx_session:info(await_rel_timeout, Session)),
|
||||||
?assert(is_integer(emqx_session:info(created_at, Session))).
|
?assert(is_integer(emqx_session:info(created_at, Session))).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -73,8 +73,8 @@ t_session_init(_) ->
|
||||||
t_session_info(_) ->
|
t_session_info(_) ->
|
||||||
?assertMatch(#{subscriptions := #{},
|
?assertMatch(#{subscriptions := #{},
|
||||||
upgrade_qos := false,
|
upgrade_qos := false,
|
||||||
retry_interval := 30,
|
retry_interval := 30000,
|
||||||
await_rel_timeout := 300
|
await_rel_timeout := 300000
|
||||||
}, emqx_session:info(session())).
|
}, emqx_session:info(session())).
|
||||||
|
|
||||||
t_session_stats(_) ->
|
t_session_stats(_) ->
|
||||||
|
@ -309,9 +309,11 @@ t_enqueue(_) ->
|
||||||
|
|
||||||
t_retry(_) ->
|
t_retry(_) ->
|
||||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||||
Session = session(#{retry_interval => 100}),
|
RetryIntervalMs = 100, %% 0.1s
|
||||||
|
Session = session(#{retry_interval => RetryIntervalMs}),
|
||||||
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
|
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
|
||||||
ok = timer:sleep(200),
|
ElapseMs = 200, %% 0.2s
|
||||||
|
ok = timer:sleep(ElapseMs),
|
||||||
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
|
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
|
||||||
{ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
|
{ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
|
||||||
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
|
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
|
||||||
|
@ -342,7 +344,7 @@ t_replay(_) ->
|
||||||
|
|
||||||
t_expire_awaiting_rel(_) ->
|
t_expire_awaiting_rel(_) ->
|
||||||
{ok, Session} = emqx_session:expire(awaiting_rel, session()),
|
{ok, Session} = emqx_session:expire(awaiting_rel, session()),
|
||||||
Timeout = emqx_session:info(await_rel_timeout, Session) * 1000,
|
Timeout = emqx_session:info(await_rel_timeout, Session),
|
||||||
Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
|
Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
|
||||||
{ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1),
|
{ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1),
|
||||||
?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)).
|
?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)).
|
||||||
|
|
|
@ -1391,9 +1391,9 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
||||||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||||
emqx_keepalive:info(interval, KeepAlive);
|
emqx_keepalive:info(interval, KeepAlive);
|
||||||
interval(retry_timer, #channel{session = Session}) ->
|
interval(retry_timer, #channel{session = Session}) ->
|
||||||
timer:seconds(emqx_session:info(retry_interval, Session));
|
emqx_session:info(retry_interval, Session);
|
||||||
interval(await_timer, #channel{session = Session}) ->
|
interval(await_timer, #channel{session = Session}) ->
|
||||||
timer:seconds(emqx_session:info(await_rel_timeout, Session)).
|
emqx_session:info(await_rel_timeout, Session).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
|
|
|
@ -590,18 +590,21 @@ print({client, {ClientId, ChanPid}}) ->
|
||||||
InfoKeys = [clientid, username, peername,
|
InfoKeys = [clientid, username, peername,
|
||||||
clean_start, keepalive, expiry_interval,
|
clean_start, keepalive, expiry_interval,
|
||||||
subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
|
subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
|
||||||
connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of
|
connected, created_at, connected_at] ++
|
||||||
true -> [disconnected_at];
|
case maps:is_key(disconnected_at, Info) of
|
||||||
false -> []
|
true -> [disconnected_at];
|
||||||
end,
|
false -> []
|
||||||
|
end,
|
||||||
|
Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
|
||||||
emqx_ctl:print("Client(~s, username=~s, peername=~s, "
|
emqx_ctl:print("Client(~s, username=~s, peername=~s, "
|
||||||
"clean_start=~s, keepalive=~w, session_expiry_interval=~w, "
|
"clean_start=~s, keepalive=~w, session_expiry_interval=~w, "
|
||||||
"subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
|
"subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
|
||||||
"connected=~s, created_at=~w, connected_at=~w" ++ case maps:is_key(disconnected_at, Info) of
|
"connected=~s, created_at=~w, connected_at=~w" ++
|
||||||
true -> ", disconnected_at=~w)~n";
|
case maps:is_key(disconnected_at, Info1) of
|
||||||
false -> ")~n"
|
true -> ", disconnected_at=~w)~n";
|
||||||
end,
|
false -> ")~n"
|
||||||
[format(K, maps:get(K, Info)) || K <- InfoKeys]);
|
end,
|
||||||
|
[format(K, maps:get(K, Info1)) || K <- InfoKeys]);
|
||||||
|
|
||||||
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
|
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
|
||||||
emqx_ctl:print("~s -> ~s~n", [Topic, Node]);
|
emqx_ctl:print("~s -> ~s~n", [Topic, Node]);
|
||||||
|
|
|
@ -96,6 +96,7 @@ t_clients(_) ->
|
||||||
SubscribeBody = #{topic => Topic, qos => Qos},
|
SubscribeBody = #{topic => Topic, qos => Qos},
|
||||||
SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]),
|
SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]),
|
||||||
{ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody),
|
{ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody),
|
||||||
|
timer:sleep(100),
|
||||||
[{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
|
[{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
|
||||||
?assertEqual(AfterSubTopic, Topic),
|
?assertEqual(AfterSubTopic, Topic),
|
||||||
?assertEqual(AfterSubQos, Qos),
|
?assertEqual(AfterSubQos, Qos),
|
||||||
|
@ -103,4 +104,5 @@ t_clients(_) ->
|
||||||
%% delete /clients/:clientid/subscribe
|
%% delete /clients/:clientid/subscribe
|
||||||
UnSubscribeQuery = "topic=" ++ binary_to_list(Topic),
|
UnSubscribeQuery = "topic=" ++ binary_to_list(Topic),
|
||||||
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, SubscribePath, UnSubscribeQuery, AuthHeader),
|
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, SubscribePath, UnSubscribeQuery, AuthHeader),
|
||||||
|
timer:sleep(100),
|
||||||
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)).
|
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)).
|
||||||
|
|
|
@ -31,7 +31,7 @@ emqx_modules: {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
type: telemetry
|
type: telemetry
|
||||||
enable: true
|
enable: false
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -102,7 +102,7 @@ connected_presence(#{peerhost := PeerHost,
|
||||||
keepalive => Keepalive,
|
keepalive => Keepalive,
|
||||||
connack => 0, %% Deprecated?
|
connack => 0, %% Deprecated?
|
||||||
clean_start => CleanStart,
|
clean_start => CleanStart,
|
||||||
expiry_interval => ExpiryInterval,
|
expiry_interval => ExpiryInterval div 1000,
|
||||||
connected_at => ConnectedAt,
|
connected_at => ConnectedAt,
|
||||||
ts => erlang:system_time(millisecond)
|
ts => erlang:system_time(millisecond)
|
||||||
}.
|
}.
|
||||||
|
|
|
@ -182,7 +182,7 @@ eventmsg_connected(_ClientInfo = #{
|
||||||
keepalive => Keepalive,
|
keepalive => Keepalive,
|
||||||
clean_start => CleanStart,
|
clean_start => CleanStart,
|
||||||
receive_maximum => RcvMax,
|
receive_maximum => RcvMax,
|
||||||
expiry_interval => ExpiryInterval,
|
expiry_interval => ExpiryInterval div 1000,
|
||||||
is_bridge => IsBridge,
|
is_bridge => IsBridge,
|
||||||
conn_props => printable_maps(ConnProps),
|
conn_props => printable_maps(ConnProps),
|
||||||
connected_at => ConnectedAt
|
connected_at => ConnectedAt
|
||||||
|
|
Loading…
Reference in New Issue