Merge branch 'master' into update_config

This commit is contained in:
turtleDeng 2021-07-23 18:09:01 +08:00 committed by GitHub
commit 29826000f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 1444 additions and 694 deletions

View File

@ -9,7 +9,7 @@
[![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow)](https://askemq.com)
[![YouTube](https://img.shields.io/badge/Subscribe-EMQ%20中文-FF0000?logo=youtube)](https://www.youtube.com/channel/UCir_r04HIsLjf2qqyZ4A8Cg)
[![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://careers.emqx.cn/)
[![最棒的物联网 MQTT 开源团队期待您的加入](https://static.emqx.net/images/github_readme_cn_bg.png)](https://careers.emqx.cn/)
[English](./README.md) | 简体中文 | [日本語](./README-JP.md) | [русский](./README-RU.md)
@ -18,7 +18,7 @@
从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。
- 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。
- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.cn/)。
- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.io/zh)。
## 安装
@ -34,7 +34,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
#### 二进制软件包安装
需从 [EMQ X 下载](https://www.emqx.cn/downloads) 页面获取相应操作系统的二进制软件包。
需从 [EMQ X 下载](https://www.emqx.com/zh/downloads) 页面获取相应操作系统的二进制软件包。
- [单节点安装文档](https://docs.emqx.cn/broker/latest/getting-started/install.html)
- [集群配置文档](https://docs.emqx.cn/broker/latest/advanced/cluster.html)
@ -133,7 +133,7 @@ DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_authz make dialyzer
- [Facebook](https://www.facebook.com/emqxmqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
- [Weibo](https://weibo.com/emqtt)
- [Blog](https://www.emqx.cn/blog)
- [Blog](https://www.emqx.com/zh/blog)
欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。

View File

@ -8,7 +8,7 @@
[![Twitter](https://img.shields.io/badge/Twitter-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech)
[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
[![The best IoT MQTT open source team looks forward to your joining](https://static.emqx.net/images/github_readme_en_bg.png)](https://www.emqx.com/en/careers)
[English](./README.md) | [简体中文](./README-CN.md) | 日本語 | [русский](./README-RU.md)
@ -35,7 +35,7 @@ docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p
#### バイナリパッケージによるインストール
それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.io/downloads)ページから取得できます。
それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.com/en/downloads)ページから取得できます。
- [シングルノードインストール](https://docs.emqx.io/broker/latest/en/getting-started/installation.html)
- [マルチノードインストール](https://docs.emqx.io/broker/latest/en/advanced/cluster.html)

View File

@ -9,7 +9,7 @@
[![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions)
[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
[![The best IoT MQTT open source team looks forward to your joining](https://static.emqx.net/images/github_readme_en_bg.png)](https://www.emqx.com/en/careers)
[English](./README.md) | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | русский
@ -18,7 +18,7 @@
Начиная с релиза 3.0, брокер *EMQ X* полностью поддерживает протокол MQTT версии 5.0, и обратно совместим с версиями 3.1 и 3.1.1, а также протоколами MQTT-SN, CoAP, LwM2M, WebSocket и STOMP. Начиная с релиза 3.0, брокер *EMQ X* может масштабироваться до более чем 10 миллионов одновременных MQTT соединений на один кластер.
- Полный список возможностей доступен по ссылке: [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
- Более подробная информация доступна на нашем сайте: [EMQ X homepage](https://www.emqx.io).
- Более подробная информация доступна на нашем сайте: [EMQ X homepage](https://www.emqx.io/).
## Установка
@ -34,7 +34,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
#### Установка бинарного пакета
Сборки для различных операционных систем: [Загрузить EMQ X](https://www.emqx.io/downloads).
Сборки для различных операционных систем: [Загрузить EMQ X](https://www.emqx.com/en/downloads).
- [Установка на одном сервере](https://docs.emqx.io/en/broker/latest/getting-started/install.html)
- [Установка на кластере](https://docs.emqx.io/en/broker/latest/advanced/cluster.html)

View File

@ -8,7 +8,7 @@
[![Twitter](https://img.shields.io/badge/Follow-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech)
[![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q)
[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
[![The best IoT MQTT open source team looks forward to your joining](https://static.emqx.net/images/github_readme_en_bg.png)](https://www.emqx.com/en/careers)
English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [русский](./README-RU.md)
@ -17,7 +17,7 @@ English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [рус
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
- For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
- For more information, please visit [EMQ X homepage](https://www.emqx.io).
- For more information, please visit [EMQ X homepage](https://www.emqx.io/).
## Installation
@ -33,7 +33,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p
#### Installing via Binary Package
Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.io/downloads) page.
Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.com/en/downloads) page.
- [Single Node Install](https://docs.emqx.io/en/broker/latest/getting-started/install.html)
- [Multi Node Install](https://docs.emqx.io/en/broker/latest/advanced/cluster.html)

View File

@ -813,7 +813,7 @@ broker {
## - `auth.*`
## - `stats.*`
## - `mqtt.*`
## - `acl.*`
## - `authorization.*`
## - `flapping_detect.*`
## - `force_shutdown.*`
## - `conn_congestion.*`
@ -1095,18 +1095,18 @@ zones.default {
}
acl {
authorization {
## Enable ACL check.
##
## @doc zones.<name>.acl.enable
## @doc zones.<name>.authorization.enable
## ValueType: Boolean
## Default: false
enable: false
## Default: true
enable: true
## The action when acl check reject current operation
## The action when authorization check reject current operation
##
## @doc zones.<name>.acl.deny_action
## @doc zones.<name>.authorization.deny_action
## ValueType: ignore | disconnect
## Default: ignore
deny_action: ignore
@ -1115,14 +1115,14 @@ zones.default {
##
## If enabled, ACLs roles for each client will be cached in the memory
##
## @doc zones.<name>.acl.cache.enable
## @doc zones.<name>.authorization.cache.enable
## ValueType: Boolean
## Default: true
cache.enable: true
## The maximum count of ACL entries can be cached for a client.
##
## @doc zones.<name>.acl.cache.max_size
## @doc zones.<name>.authorization.cache.max_size
## ValueType: Integer
## Range: [0, 1048576]
## Default: 32
@ -1130,7 +1130,7 @@ zones.default {
## The time after which an ACL cache entry will be deleted
##
## @doc zones.<name>.acl.cache.ttl
## @doc zones.<name>.authorization.cache.ttl
## ValueType: Duration
## Default: 1m
cache.ttl: 1m
@ -1857,7 +1857,7 @@ zones.default {
#This is an example zone which has less "strict" settings.
#It's useful to clients connecting the broker from trusted networks.
zones.internal {
acl.enable: false
authorization.enable: true
auth.enable: false
listeners.mqtt_internal: {
type: tcp

View File

@ -250,5 +250,4 @@ emqx_feature() ->
, emqx_bridge_mqtt
, emqx_modules
, emqx_management
, emqx_retainer
, emqx_statsd].
, emqx_retainer].

View File

@ -52,15 +52,15 @@ drain_k() -> {?MODULE, drain_timestamp}.
-spec(is_enabled(atom()) -> boolean()).
is_enabled(Zone) ->
emqx_config:get_zone_conf(Zone, [acl, cache, enable]).
emqx_config:get_zone_conf(Zone, [authorization, cache, enable]).
-spec(get_cache_max_size(atom()) -> integer()).
get_cache_max_size(Zone) ->
emqx_config:get_zone_conf(Zone, [acl, cache, max_size]).
emqx_config:get_zone_conf(Zone, [authorization, cache, max_size]).
-spec(get_cache_ttl(atom()) -> integer()).
get_cache_ttl(Zone) ->
emqx_config:get_zone_conf(Zone, [acl, cache, ttl]).
emqx_config:get_zone_conf(Zone, [authorization, cache, ttl]).
-spec(list_acl_cache(atom()) -> [acl_cache_entry()]).
list_acl_cache(Zone) ->

View File

@ -36,6 +36,8 @@
, stop/0
]).
-export([format/1]).
%% API
-export([ activate/1
, activate/2
@ -157,6 +159,27 @@ handle_update_config(#{<<"validity_period">> := Period0} = NewConf, OldConf) ->
handle_update_config(NewConf, OldConf) ->
maps:merge(OldConf, NewConf).
format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) ->
Now = erlang:system_time(microsecond),
#{
node => node(),
name => Name,
message => Message,
duration => Now - At,
details => Details
};
format(#deactivated_alarm{name = Name, message = Message, activate_at = At, details = Details,
deactivate_at = DAt}) ->
#{
node => node(),
name => Name,
message => Message,
duration => DAt - At,
details => Details
};
format(_) ->
{error, unknow_alarm}.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -249,7 +272,7 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
get_validity_period() ->
timer:seconds(emqx_config:get([alarm, validity_period])).
emqx_config:get([alarm, validity_period]).
deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name,
details = Details0, message = Msg0}) ->

View File

@ -435,7 +435,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) ->
ReasonCode =:= ?RC_NOT_AUTHORIZED
end, TupleTopicFilters0),
DenyAction = emqx_config:get_zone_conf(Zone, [acl, deny_action]),
DenyAction = emqx_config:get_zone_conf(Zone, [authorization, deny_action]),
case DenyAction =:= disconnect andalso HasAclDeny of
true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel);
false ->
@ -551,7 +551,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId),
{error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
?LOG(warning, "Cannot publish message to ~s due to ~s.",
[Topic, emqx_reason_codes:text(Rc)]),
case emqx_config:get_zone_conf(Zone, [acl_deny_action]) of
case emqx_config:get_zone_conf(Zone, [authorization, deny_action]) of
ignore ->
case QoS of
?QOS_0 -> {ok, NChannel};
@ -737,7 +737,7 @@ process_disconnect(ReasonCode, Properties, Channel) ->
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
Channel = #channel{conninfo = ConnInfo}) ->
Channel#channel{conninfo = ConnInfo#{expiry_interval => Interval}};
Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}};
maybe_update_expiry_interval(_Properties, Channel) -> Channel.
%%--------------------------------------------------------------------
@ -1114,11 +1114,11 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) ->
timer:seconds(emqx_session:info(retry_interval, Session));
emqx_session:info(retry_interval, Session);
interval(await_timer, #channel{session = Session}) ->
timer:seconds(emqx_session:info(await_rel_timeout, Session));
emqx_session:info(await_rel_timeout, Session);
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
timer:seconds(maps:get(expiry_interval, ConnInfo));
maps:get(expiry_interval, ConnInfo);
interval(will_timer, #channel{will_msg = WillMsg}) ->
timer:seconds(will_delay_interval(WillMsg)).
@ -1176,7 +1176,7 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{
%% If the Session Expiry Interval is absent the value 0 is used.
expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
properties = ConnProps}) ->
emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0);
timer:seconds(emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0));
expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) ->
get_mqtt_conf(Zone, session_expiry_interval);
expiry_interval(_, #mqtt_packet_connect{clean_start = true}) ->
@ -1615,14 +1615,14 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
case maps:get(expiry_interval, ConnInfo) of
?UINT_MAX -> {ok, Channel};
I when I > 0 ->
{ok, ensure_timer(expire_timer, timer:seconds(I), Channel)};
{ok, ensure_timer(expire_timer, I, Channel)};
_ -> shutdown(Reason, Channel)
end.
%%--------------------------------------------------------------------
%% Is ACL enabled?
is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) ->
(not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]).
(not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [authorization, enable]).
%%--------------------------------------------------------------------
%% Parse Topic Filters

View File

@ -248,22 +248,22 @@ create_session(ClientInfo, ConnInfo) ->
Session.
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
#{max_subscriptions => get_conf(Zone, max_subscriptions),
upgrade_qos => get_conf(Zone, upgrade_qos),
#{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
max_inflight => MaxInflight,
retry_interval => get_conf(Zone, retry_interval),
await_rel_timeout => get_conf(Zone, await_rel_timeout),
retry_interval => get_mqtt_conf(Zone, retry_interval),
await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
mqueue => mqueue_confs(Zone)
}.
mqueue_confs(Zone) ->
#{max_len => get_conf(Zone, max_mqueue_len),
store_qos0 => get_conf(Zone, mqueue_store_qos0),
priorities => get_conf(Zone, mqueue_priorities),
default_priority => get_conf(Zone, mqueue_default_priority)
#{max_len => get_mqtt_conf(Zone, max_mqueue_len),
store_qos0 => get_mqtt_conf(Zone, mqueue_store_qos0),
priorities => get_mqtt_conf(Zone, mqueue_priorities),
default_priority => get_mqtt_conf(Zone, mqueue_default_priority)
}.
get_conf(Zone, Key) ->
get_mqtt_conf(Zone, Key) ->
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
%% @doc Try to takeover a session.

View File

@ -87,7 +87,7 @@ code_change(_OldVsn, State, _Extra) ->
ensure_timer(State) ->
case emqx_config:get([node, global_gc_interval]) of
undefined -> State;
Interval -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run),
Interval -> TRef = emqx_misc:start_timer(Interval, run),
State#{timer := TRef}
end.

View File

@ -20,9 +20,11 @@
-include("emqx_mqtt.hrl").
%% APIs
-export([ start/0
-export([ list/0
, start/0
, restart/0
, stop/0
, is_running/1
]).
-export([ start_listener/1
@ -33,6 +35,57 @@
, restart_listener/3
]).
-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
list() ->
Zones = maps:to_list(emqx_config:get([zones], #{})),
lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]).
list(ZoneName, ZoneConf) ->
Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})),
[
begin
ListenerId = listener_id(ZoneName, LName),
Running = is_running(ListenerId),
Conf = merge_zone_and_listener_confs(ZoneConf, LConf),
{ListenerId, maps:put(running, Running, Conf)}
end
|| {LName, LConf} <- Listeners].
-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}.
is_running(ListenerId) ->
Zones = maps:to_list(emqx_config:get([zones], #{})),
Listeners = lists:append(
[
[{listener_id(ZoneName, LName),merge_zone_and_listener_confs(ZoneConf, LConf)}
|| {LName, LConf} <- maps:to_list(maps:get(listeners, ZoneConf, #{}))]
|| {ZoneName, ZoneConf} <- Zones]),
case proplists:get_value(ListenerId, Listeners, undefined) of
undefined ->
{error, no_found};
Conf ->
is_running(ListenerId, Conf)
end.
is_running(ListenerId, #{type := tcp, bind := ListenOn})->
try esockd:listener({ListenerId, ListenOn}) of
Pid when is_pid(Pid)->
true
catch _:_ ->
false
end;
is_running(ListenerId, #{type := ws})->
try
Info = ranch:info(ListenerId),
proplists:get_value(status, Info) =:= running
catch _:_ ->
false
end;
is_running(_ListenerId, #{type := quic})->
%% TODO: quic support
{error, no_found}.
%% @doc Start all listeners.
-spec(start() -> ok).
start() ->
@ -52,6 +105,8 @@ start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) ->
{ok, _} ->
console_print("Start ~s listener ~s on ~s successfully.~n",
[Type, listener_id(ZoneName, ListenerName), format(Bind)]);
{error, {already_started, Pid}} ->
{error, {already_started, Pid}};
{error, Reason} ->
io:format(standard_error, "Failed to start ~s listener ~s on ~s: ~0p~n",
[Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]),

View File

@ -53,12 +53,12 @@ start_link() ->
%%--------------------------------------------------------------------
get_mem_check_interval() ->
memsup:get_check_interval() div 1000.
memsup:get_check_interval().
set_mem_check_interval(Seconds) when Seconds < 60 ->
set_mem_check_interval(Seconds) when Seconds < 60000 ->
memsup:set_check_interval(1);
set_mem_check_interval(Seconds) ->
memsup:set_check_interval(Seconds div 60).
memsup:set_check_interval(Seconds div 60000).
get_sysmem_high_watermark() ->
memsup:get_sysmem_high_watermark().
@ -128,5 +128,5 @@ start_check_timer() ->
Interval = emqx_config:get([sysmon, os, cpu_check_interval]),
case erlang:system_info(system_architecture) of
"x86_64-pc-linux-musl" -> ok;
_ -> emqx_misc:start_timer(timer:seconds(Interval), check)
_ -> emqx_misc:start_timer(Interval, check)
end.

View File

@ -164,7 +164,7 @@ fields("node") ->
, {"config_files", t(list(string()), "emqx.config_files",
[ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
])}
, {"global_gc_interval", t(duration_s(), undefined, "15m")}
, {"global_gc_interval", t(duration(), undefined, "15m")}
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
@ -257,13 +257,13 @@ fields("auth") ->
[ {"enable", t(boolean(), undefined, false)}
];
fields("acl") ->
[ {"enable", t(boolean(), undefined, false)}
, {"cache", ref("acl_cache")}
fields("authorization") ->
[ {"enable", t(boolean(), undefined, true)}
, {"cache", ref("authorization_cache")}
, {"deny_action", t(union(ignore, disconnect), undefined, ignore)}
];
fields("acl_cache") ->
fields("authorization_cache") ->
[ {"enable", t(boolean(), undefined, true)}
, {"max_size", t(range(1, 1048576), undefined, 32)}
, {"ttl", t(duration(), undefined, "1m")}
@ -288,10 +288,10 @@ fields("mqtt") ->
, {"max_subscriptions", maybe_infinity(range(1, inf))}
, {"upgrade_qos", t(boolean(), undefined, false)}
, {"max_inflight", t(range(1, 65535), undefined, 32)}
, {"retry_interval", t(duration_s(), undefined, "30s")}
, {"retry_interval", t(duration(), undefined, "30s")}
, {"max_awaiting_rel", maybe_infinity(integer(), 100)}
, {"await_rel_timeout", t(duration_s(), undefined, "300s")}
, {"session_expiry_interval", t(duration_s(), undefined, "2h")}
, {"await_rel_timeout", t(duration(), undefined, "300s")}
, {"session_expiry_interval", t(duration(), undefined, "2h")}
, {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)}
, {"mqueue_priorities", maybe_disabled(map())}
, {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)}
@ -306,7 +306,7 @@ fields("zones") ->
fields("zone_settings") ->
[ {"mqtt", ref("mqtt")}
, {"acl", ref("acl")}
, {"authorization", ref("authorization")}
, {"auth", ref("auth")}
, {"stats", ref("stats")}
, {"flapping_detect", ref("flapping_detect")}
@ -507,10 +507,10 @@ fields("sysmon_vm") ->
];
fields("sysmon_os") ->
[ {"cpu_check_interval", t(duration_s(), undefined, 60)}
[ {"cpu_check_interval", t(duration(), undefined, "60s")}
, {"cpu_high_watermark", t(percent(), undefined, "80%")}
, {"cpu_low_watermark", t(percent(), undefined, "60%")}
, {"mem_check_interval", maybe_disabled(duration_s(), 60)}
, {"mem_check_interval", maybe_disabled(duration(), "60s")}
, {"sysmem_high_watermark", t(percent(), undefined, "70%")}
, {"procmem_high_watermark", t(percent(), undefined, "5%")}
];
@ -518,7 +518,7 @@ fields("sysmon_os") ->
fields("alarm") ->
[ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])}
, {"size_limit", t(integer(), undefined, 1000)}
, {"validity_period", t(duration_s(), undefined, "24h")}
, {"validity_period", t(duration(), undefined, "24h")}
];
fields(ExtraField) ->

View File

@ -178,10 +178,10 @@ init(Opts) ->
inflight = emqx_inflight:new(MaxInflight),
mqueue = emqx_mqueue:init(QueueOpts),
next_pkt_id = 1,
retry_interval = timer:seconds(maps:get(retry_interval, Opts, 30)),
retry_interval = maps:get(retry_interval, Opts, 30000),
awaiting_rel = #{},
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)),
await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000),
created_at = erlang:system_time(millisecond)
}.
@ -211,7 +211,7 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info(retry_interval, #session{retry_interval = Interval}) ->
Interval div 1000;
Interval;
info(mqueue, #session{mqueue = MQueue}) ->
MQueue;
info(mqueue_len, #session{mqueue = MQueue}) ->
@ -229,7 +229,7 @@ info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
Max;
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout div 1000;
Timeout;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.

View File

@ -80,4 +80,4 @@ t_drain_acl_cache(_) ->
emqtt:stop(Client).
toggle_acl(Bool) when is_boolean(Bool) ->
emqx_config:put_zone_conf(default, [acl, enable], Bool).
emqx_config:put_zone_conf(default, [authorization, enable], Bool).

View File

@ -28,7 +28,7 @@ all() ->
emqx_ct:all(?MODULE).
mqtt_conf() ->
#{await_rel_timeout => 300,
#{await_rel_timeout => 300000,
idle_timeout => 15000,
ignore_loop_deliver => false,
keepalive_backoff => 0.75,
@ -49,9 +49,9 @@ mqtt_conf() ->
peer_cert_as_username => disabled,
response_information => [],
retain_available => true,
retry_interval => 30,
retry_interval => 30000,
server_keepalive => disabled,
session_expiry_interval => 7200,
session_expiry_interval => 7200000,
shared_subscription => true,
strict_mode => false,
upgrade_qos => false,
@ -140,7 +140,7 @@ listener_mqtt_ws_conf() ->
default_zone_conf() ->
#{zones =>
#{default =>
#{ acl => #{
#{ authorization => #{
cache => #{enable => true,max_size => 32, ttl => 60000},
deny_action => ignore,
enable => false
@ -863,7 +863,7 @@ t_packing_alias(_) ->
channel())).
t_check_pub_acl(_) ->
emqx_config:put_zone_conf(default, [acl, enable], true),
emqx_config:put_zone_conf(default, [authorization, enable], true),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
ok = emqx_channel:check_pub_acl(Publish, channel()).
@ -873,7 +873,7 @@ t_check_pub_alias(_) ->
ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel).
t_check_sub_acls(_) ->
emqx_config:put_zone_conf(default, [acl, enable], true),
emqx_config:put_zone_conf(default, [authorization, enable], true),
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
[{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()).

View File

@ -24,7 +24,7 @@
all() -> emqx_ct:all(?MODULE).
t_run_gc(_) ->
ok = emqx_config:put([node, global_gc_interval], 1),
ok = emqx_config:put([node, global_gc_interval], 1000),
{ok, _} = emqx_global_gc:start_link(),
ok = timer:sleep(1500),
{ok, MilliSecs} = emqx_global_gc:run(),

View File

@ -217,14 +217,12 @@ t_connect_will_message(Config) ->
ok = emqtt:disconnect(Client4).
t_batch_subscribe(init, Config) ->
emqx_config:put_zone_conf(default, [acl, enable], true),
emqx_config:put_zone_conf(default, [acl, enable], true),
emqx_config:put_zone_conf(default, [authorization, enable], true),
ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end),
Config;
t_batch_subscribe('end', _Config) ->
emqx_config:put_zone_conf(default, [acl, enable], false),
emqx_config:put_zone_conf(default, [acl, enable], false),
emqx_config:put_zone_conf(default, [authorization, enable], false),
meck:unload(emqx_access_control).
t_batch_subscribe(Config) ->

View File

@ -25,8 +25,8 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_config:put([sysmon, os], #{
cpu_check_interval => 60,cpu_high_watermark => 0.8,
cpu_low_watermark => 0.6,mem_check_interval => 60,
cpu_check_interval => 60000,cpu_high_watermark => 0.8,
cpu_low_watermark => 0.6,mem_check_interval => 60000,
procmem_high_watermark => 0.05,sysmem_high_watermark => 0.7}),
application:ensure_all_started(os_mon),
Config.
@ -38,11 +38,11 @@ t_api(_) ->
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
{ok, _} = emqx_os_mon:start_link(),
?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30)),
?assertEqual(60, emqx_os_mon:get_mem_check_interval()),
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122)),
?assertEqual(120, emqx_os_mon:get_mem_check_interval()),
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30000)),
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122000)),
?assertEqual(120000, emqx_os_mon:get_mem_check_interval()),
?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()),
?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)),

View File

@ -59,11 +59,11 @@ t_session_init(_) ->
?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
?assertEqual(64, emqx_session:info(inflight_max, Session)),
?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
?assertEqual(30, emqx_session:info(retry_interval, Session)),
?assertEqual(30000, emqx_session:info(retry_interval, Session)),
?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))),
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
?assertEqual(300, emqx_session:info(await_rel_timeout, Session)),
?assertEqual(300000, emqx_session:info(await_rel_timeout, Session)),
?assert(is_integer(emqx_session:info(created_at, Session))).
%%--------------------------------------------------------------------
@ -73,8 +73,8 @@ t_session_init(_) ->
t_session_info(_) ->
?assertMatch(#{subscriptions := #{},
upgrade_qos := false,
retry_interval := 30,
await_rel_timeout := 300
retry_interval := 30000,
await_rel_timeout := 300000
}, emqx_session:info(session())).
t_session_stats(_) ->
@ -309,9 +309,11 @@ t_enqueue(_) ->
t_retry(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
Session = session(#{retry_interval => 100}),
RetryIntervalMs = 100, %% 0.1s
Session = session(#{retry_interval => RetryIntervalMs}),
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
ok = timer:sleep(200),
ElapseMs = 200, %% 0.2s
ok = timer:sleep(ElapseMs),
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
{ok, Msgs1, 100, Session2} = emqx_session:retry(Session1),
?assertEqual(2, emqx_session:info(inflight_cnt, Session2)).
@ -342,7 +344,7 @@ t_replay(_) ->
t_expire_awaiting_rel(_) ->
{ok, Session} = emqx_session:expire(awaiting_rel, session()),
Timeout = emqx_session:info(await_rel_timeout, Session) * 1000,
Timeout = emqx_session:info(await_rel_timeout, Session),
Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session),
{ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1),
?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)).

View File

@ -456,7 +456,7 @@ parse_topic_filters(TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) ->
(not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]).
(not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [authorization, enable]).
%%--------------------------------------------------------------------
%% Ensure & Hooks

View File

@ -1391,9 +1391,9 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) ->
timer:seconds(emqx_session:info(retry_interval, Session));
emqx_session:info(retry_interval, Session);
interval(await_timer, #channel{session = Session}) ->
timer:seconds(emqx_session:info(await_rel_timeout, Session)).
emqx_session:info(await_rel_timeout, Session).
%%--------------------------------------------------------------------
%% Helper functions

View File

@ -85,7 +85,10 @@
%% Listeners
-export([ list_listeners/0
, list_listeners/1
, restart_listener/2
, list_listeners/2
, list_listeners_by_id/1
, get_listener/2
, manage_listener/2
]).
%% Alarms
@ -451,37 +454,39 @@ reload_plugin(Node, Plugin) ->
%%--------------------------------------------------------------------
list_listeners() ->
[{Node, list_listeners(Node)} || Node <- ekka_mnesia:running_nodes()].
lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]).
list_listeners(Node, Identifier) ->
listener_id_filter(Identifier, list_listeners(Node)).
list_listeners(Node) when Node =:= node() ->
Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
#{protocol => Protocol,
listen_on => ListenOn,
identifier => Protocol,
acceptors => esockd:get_acceptors({Protocol, ListenOn}),
max_conns => esockd:get_max_connections({Protocol, ListenOn}),
current_conns => esockd:get_current_connections({Protocol, ListenOn}),
shutdown_count => esockd:get_shutdown_count({Protocol, ListenOn})}
end, esockd:listeners()),
Http = lists:map(fun({Protocol, Opts}) ->
#{protocol => Protocol,
listen_on => proplists:get_value(port, Opts),
acceptors => maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0),
max_conns => proplists:get_value(max_connections, Opts),
current_conns => proplists:get_value(all_connections, Opts),
shutdown_count => []}
end, ranch:info()),
Tcp ++ Http;
[{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()];
list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]).
-spec restart_listener(node(), atom()) -> ok | {error, term()}.
restart_listener(Node, Identifier) when Node =:= node() ->
emqx_listeners:restart_listener(Identifier);
list_listeners_by_id(Identifier) ->
listener_id_filter(Identifier, list_listeners()).
restart_listener(Node, Identifier) ->
rpc_call(Node, restart_listener, [Node, Identifier]).
get_listener(Node, Identifier) ->
case listener_id_filter(Identifier, list_listeners(Node)) of
[] ->
{error, not_found};
[Listener] ->
Listener
end.
listener_id_filter(Identifier, Listeners) ->
Filter =
fun({Id, _}) -> Id =:= Identifier end,
lists:filter(Filter, Listeners).
-spec manage_listener(Operation :: start_listener|stop_listener|restart_listener, Param :: map()) ->
ok | {error, Reason :: term()}.
manage_listener(Operation, #{identifier := Identifier, node := Node}) when Node =:= node()->
erlang:apply(emqx_listeners, Operation, [Identifier]);
manage_listener(Operation, Param = #{node := Node}) ->
rpc_call(Node, restart_listener, [Operation, Param]).
%%--------------------------------------------------------------------
%% Get Alarms
@ -542,7 +547,7 @@ item(route, {Topic, Node}) ->
#{topic => Topic, node => Node}.
%%--------------------------------------------------------------------
%% Internel Functions.
%% Internal Functions.
%%--------------------------------------------------------------------
rpc_call(Node, Fun, Args) ->

View File

@ -65,10 +65,10 @@ count(Table, Nodes) ->
lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]).
page(Params) ->
binary_to_integer(proplists:get_value(<<"_page">>, Params, <<"1">>)).
binary_to_integer(proplists:get_value(<<"page">>, Params, <<"1">>)).
limit(Params) ->
case proplists:get_value(<<"_limit">>, Params) of
case proplists:get_value(<<"limit">>, Params) of
undefined -> emqx_mgmt:max_row_limit();
Size -> binary_to_integer(Size)
end.
@ -204,7 +204,7 @@ params2qs(Params, QsSchema) ->
{length(Qs) + length(Fuzzy), {Qs, Fuzzy}}.
%%--------------------------------------------------------------------
%% Intenal funcs
%% Internal funcs
pick_params_to_qs([], _, Acc1, Acc2) ->
NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)],
@ -215,12 +215,12 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) ->
undefined -> pick_params_to_qs(Params, QsKits, Acc1, Acc2);
Type ->
case Key of
<<Prefix:5/binary, NKey/binary>>
when Prefix =:= <<"_gte_">>;
Prefix =:= <<"_lte_">> ->
<<Prefix:4/binary, NKey/binary>>
when Prefix =:= <<"gte_">>;
Prefix =:= <<"lte_">> ->
OpposeKey = case Prefix of
<<"_gte_">> -> <<"_lte_", NKey/binary>>;
<<"_lte_">> -> <<"_gte_", NKey/binary>>
<<"gte_">> -> <<"lte_", NKey/binary>>;
<<"lte_">> -> <<"gte_", NKey/binary>>
end,
case lists:keytake(OpposeKey, 1, Params) of
false ->
@ -252,20 +252,20 @@ qs(K, Value0, Type) ->
throw({bad_value_type, {K, Type, Value0}})
end.
qs(<<"_gte_", Key/binary>>, Value) ->
qs(<<"gte_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), '>=', Value};
qs(<<"_lte_", Key/binary>>, Value) ->
qs(<<"lte_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), '=<', Value};
qs(<<"_like_", Key/binary>>, Value) ->
qs(<<"like_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), like, Value};
qs(<<"_match_", Key/binary>>, Value) ->
qs(<<"match_", Key/binary>>, Value) ->
{binary_to_existing_atom(Key, utf8), match, Value};
qs(Key, Value) ->
{binary_to_existing_atom(Key, utf8), '=:=', Value}.
is_fuzzy_key(<<"_like_", _/binary>>) ->
is_fuzzy_key(<<"like_", _/binary>>) ->
true;
is_fuzzy_key(<<"_match_", _/binary>>) ->
is_fuzzy_key(<<"match_", _/binary>>) ->
true;
is_fuzzy_key(_) ->
false.
@ -317,18 +317,18 @@ params2qs_test() ->
{<<"int">>, integer},
{<<"atom">>, atom},
{<<"ts">>, timestamp},
{<<"_gte_range">>, integer},
{<<"_lte_range">>, integer},
{<<"_like_fuzzy">>, binary},
{<<"_match_topic">>, binary}],
{<<"gte_range">>, integer},
{<<"lte_range">>, integer},
{<<"like_fuzzy">>, binary},
{<<"match_topic">>, binary}],
Params = [{<<"str">>, <<"abc">>},
{<<"int">>, <<"123">>},
{<<"atom">>, <<"connected">>},
{<<"ts">>, <<"156000">>},
{<<"_gte_range">>, <<"1">>},
{<<"_lte_range">>, <<"5">>},
{<<"_like_fuzzy">>, <<"user">>},
{<<"_match_topic">>, <<"t/#">>}],
{<<"gte_range">>, <<"1">>},
{<<"lte_range">>, <<"5">>},
{<<"like_fuzzy">>, <<"user">>},
{<<"match_topic">>, <<"t/#">>}],
ExpectedQs = [{str, '=:=', <<"abc">>},
{int, '=:=', 123},
{atom, '=:=', connected},

View File

@ -16,122 +16,119 @@
-module(emqx_mgmt_api_alarms).
-include("emqx_mgmt.hrl").
-behaviour(minirest_api).
-include_lib("emqx/include/emqx.hrl").
-export([api_spec/0]).
-rest_api(#{name => list_all_alarms,
method => 'GET',
path => "/alarms",
func => list,
descr => "List all alarms in the cluster"}).
-export([alarms/2]).
-rest_api(#{name => list_node_alarms,
method => 'GET',
path => "nodes/:atom:node/alarms",
func => list,
descr => "List all alarms on a node"}).
-export([ query_activated/3
, query_deactivated/3]).
%% notice: from emqx_alarms
-define(ACTIVATED_ALARM, emqx_activated_alarm).
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
-rest_api(#{name => list_all_activated_alarms,
method => 'GET',
path => "/alarms/activated",
func => list_activated,
descr => "List all activated alarm in the cluster"}).
api_spec() ->
{[alarms_api()], [alarm_schema()]}.
-rest_api(#{name => list_node_activated_alarms,
method => 'GET',
path => "nodes/:atom:node/alarms/activated",
func => list_activated,
descr => "List all activated alarm on a node"}).
alarm_schema() ->
#{
alarm => #{
type => object,
properties => #{
node => #{
type => string,
description => <<"Alarm in node">>},
name => #{
type => string,
description => <<"Alarm name">>},
message => #{
type => string,
description => <<"Alarm readable information">>},
details => #{
type => object,
description => <<"Alarm detail">>},
duration => #{
type => integer,
description => <<"Alarms duration time; UNIX time stamp">>}
}
}
}.
-rest_api(#{name => list_all_deactivated_alarms,
method => 'GET',
path => "/alarms/deactivated",
func => list_deactivated,
descr => "List all deactivated alarm in the cluster"}).
alarms_api() ->
Metadata = #{
get => #{
description => <<"EMQ X alarms">>,
parameters => [#{
name => activated,
in => query,
description => <<"All alarms, if not specified">>,
required => false,
schema => #{type => boolean, default => true}
}],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List all alarms">>, alarm)}},
delete => #{
description => <<"Remove all deactivated alarms">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Remove all deactivated alarms ok">>)}}},
{"/alarms", Metadata, alarms}.
-rest_api(#{name => list_node_deactivated_alarms,
method => 'GET',
path => "nodes/:atom:node/alarms/deactivated",
func => list_deactivated,
descr => "List all deactivated alarm on a node"}).
%%%==============================================================================================
%% parameters trans
alarms(get, Request) ->
case proplists:get_value(<<"activated">>, cowboy_req:parse_qs(Request), undefined) of
undefined ->
list(#{activated => undefined});
<<"true">> ->
list(#{activated => true});
<<"false">> ->
list(#{activated => false})
end;
-rest_api(#{name => deactivate_alarm,
method => 'POST',
path => "/alarms/deactivated",
func => deactivate,
descr => "Delete the special alarm on a node"}).
alarms(delete, _Request) ->
delete().
-rest_api(#{name => delete_all_deactivated_alarms,
method => 'DELETE',
path => "/alarms/deactivated",
func => delete_deactivated,
descr => "Delete all deactivated alarm in the cluster"}).
%%%==============================================================================================
%% api apply
list(#{activated := true}) ->
do_list(activated);
list(#{activated := false}) ->
do_list(deactivated);
list(#{activated := undefined}) ->
do_list(activated).
-rest_api(#{name => delete_node_deactivated_alarms,
method => 'DELETE',
path => "nodes/:atom:node/alarms/deactivated",
func => delete_deactivated,
descr => "Delete all deactivated alarm on a node"}).
-export([ list/2
, deactivate/2
, list_activated/2
, list_deactivated/2
, delete_deactivated/2
]).
list(Bindings, _Params) when map_size(Bindings) == 0 ->
{ok, #{code => ?SUCCESS,
data => [#{node => Node, alarms => Alarms} || {Node, Alarms} <- emqx_mgmt:get_alarms(all)]}};
list(#{node := Node}, _Params) ->
{ok, #{code => ?SUCCESS,
data => emqx_mgmt:get_alarms(Node, all)}}.
list_activated(Bindings, _Params) when map_size(Bindings) == 0 ->
{ok, #{code => ?SUCCESS,
data => [#{node => Node, alarms => Alarms} || {Node, Alarms} <- emqx_mgmt:get_alarms(activated)]}};
list_activated(#{node := Node}, _Params) ->
{ok, #{code => ?SUCCESS,
data => emqx_mgmt:get_alarms(Node, activated)}}.
list_deactivated(Bindings, _Params) when map_size(Bindings) == 0 ->
{ok, #{code => ?SUCCESS,
data => [#{node => Node, alarms => Alarms} || {Node, Alarms} <- emqx_mgmt:get_alarms(deactivated)]}};
list_deactivated(#{node := Node}, _Params) ->
{ok, #{code => ?SUCCESS,
data => emqx_mgmt:get_alarms(Node, deactivated)}}.
deactivate(_Bindings, Params) ->
Node = get_node(Params),
Name = get_name(Params),
do_deactivate(Node, Name).
delete_deactivated(Bindings, _Params) when map_size(Bindings) == 0 ->
delete() ->
_ = emqx_mgmt:delete_all_deactivated_alarms(),
{ok, #{code => ?SUCCESS}};
{200}.
delete_deactivated(#{node := Node}, _Params) ->
emqx_mgmt:delete_all_deactivated_alarms(Node),
{ok, #{code => ?SUCCESS}}.
%%%==============================================================================================
%% internal
do_list(Type) ->
{Table, Function} =
case Type of
activated ->
{?ACTIVATED_ALARM, query_activated};
deactivated ->
{?DEACTIVATED_ALARM, query_deactivated}
end,
Response = emqx_mgmt_api:cluster_query([], {Table, []}, {?MODULE, Function}),
{200, Response}.
get_node(Params) ->
binary_to_atom(proplists:get_value(<<"node">>, Params, undefined), utf8).
query_activated(_, Start, Limit) ->
query(?ACTIVATED_ALARM, Start, Limit).
get_name(Params) ->
binary_to_atom(proplists:get_value(<<"name">>, Params, undefined), utf8).
query_deactivated(_, Start, Limit) ->
query(?DEACTIVATED_ALARM, Start, Limit).
do_deactivate(undefined, _) ->
emqx_mgmt:return({error, missing_param});
do_deactivate(_, undefined) ->
emqx_mgmt:return({error, missing_param});
do_deactivate(Node, Name) ->
case emqx_mgmt:deactivate(Node, Name) of
ok ->
emqx_mgmt:return();
{error, Reason} ->
emqx_mgmt:return({error, Reason})
end.
query(Table, Start, Limit) ->
Ms = [{'$1',[],['$1']}],
emqx_mgmt_api:select_table(Table, Ms, Start, Limit, fun format_alarm/1).
format_alarm(Alarms) when is_list(Alarms) ->
[emqx_alarm:format(Alarm) || Alarm <- Alarms];
format_alarm(Alarm) ->
emqx_alarm:format(Alarm).

View File

@ -16,7 +16,7 @@
-module(emqx_mgmt_api_apps).
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).
@ -76,17 +76,17 @@ app_without_secret_schema() ->
apps_api() ->
Metadata = #{
get => #{
description => "List EMQ X apps",
description => <<"List EMQ X apps">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"All apps">>,
app_without_secret_schema())}},
post => #{
description => "EMQ X create apps",
description => <<"EMQ X create apps">>,
'requestBody' => emqx_mgmt_util:request_body_schema(<<"app">>),
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Create apps">>, <<"app_secret">>),
emqx_mgmt_util:response_schema(<<"Create apps">>, app_secret),
<<"400">> =>
emqx_mgmt_util:response_error_schema(<<"App ID already exist">>, [?BAD_APP_ID])}}},
{"/apps", Metadata, apps}.
@ -94,36 +94,34 @@ apps_api() ->
app_api() ->
Metadata = #{
get => #{
description => "EMQ X apps",
description => <<"EMQ X apps">>,
parameters => [#{
name => app_id,
in => path,
required => true,
schema => #{type => string},
example => <<"admin">>}],
schema => #{type => string}}],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"App id not found">>),
<<"200">> =>
emqx_mgmt_util:response_schema("Get App", app_without_secret_schema())}},
emqx_mgmt_util:response_schema(<<"Get App">>, app_without_secret_schema())}},
delete => #{
description => "EMQ X apps",
description => <<"EMQ X apps">>,
parameters => [#{
name => app_id,
in => path,
required => true,
schema => #{type => string},
example => <<"admin">>}],
schema => #{type => string}
}],
responses => #{
<<"200">> => emqx_mgmt_util:response_schema("Remove app ok")}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Remove app ok">>)}},
put => #{
description => "EMQ X update apps",
description => <<"EMQ X update apps">>,
parameters => [#{
name => app_id,
in => path,
required => true,
schema => #{type => string},
default => <<"admin">>
schema => #{type => string}
}],
'requestBody' => emqx_mgmt_util:request_body_schema(app_without_secret_schema()),
responses => #{
@ -176,23 +174,19 @@ app(put, Request) ->
%%%==============================================================================================
%% api apply
list(_) ->
Data = [format_without_app_secret(Apps) || Apps <- emqx_mgmt_auth:list_apps()],
Response = emqx_json:encode(Data),
{200, Response}.
{200, [format_without_app_secret(Apps) || Apps <- emqx_mgmt_auth:list_apps()]}.
create(#{app_id := AppID, name := Name, secret := Secret,
desc := Desc, status := Status, expired := Expired}) ->
case emqx_mgmt_auth:add_app(AppID, Name, Secret, Desc, Status, Expired) of
{ok, AppSecret} ->
Response = emqx_json:encode(#{secret => AppSecret}),
{200, Response};
{200, #{secret => AppSecret}};
{error, alread_existed} ->
Message = list_to_binary(io_lib:format("appid ~p already existed", [AppID])),
{400, #{code => 'BAD_APP_ID', reason => Message}};
{400, #{code => 'BAD_APP_ID', message => Message}};
{error, Reason} ->
Data = #{code => 'UNKNOW_ERROR',
reason => list_to_binary(io_lib:format("~p", [Reason]))},
Response = emqx_json:encode(Data),
Response = #{code => 'UNKNOW_ERROR',
message => list_to_binary(io_lib:format("~p", [Reason]))},
{500, Response}
end.
@ -201,8 +195,7 @@ lookup(#{app_id := AppID}) ->
undefined ->
{404, ?APP_ID_NOT_FOUND};
App ->
Data = format_with_app_secret(App),
Response = emqx_json:encode(Data),
Response = format_with_app_secret(App),
{200, Response}
end.
@ -217,8 +210,7 @@ update(App = #{app_id := AppID, name := Name, desc := Desc, status := Status, ex
{error, not_found} ->
{404, ?APP_ID_NOT_FOUND};
{error, Reason} ->
Data = #{code => 'UNKNOW_ERROR', reason => list_to_binary(io_lib:format("~p", [Reason]))},
Response = emqx_json:encode(Data),
Response = #{code => 'UNKNOW_ERROR', message => list_to_binary(io_lib:format("~p", [Reason]))},
{500, Response}
end.

View File

@ -16,7 +16,7 @@
-module(emqx_mgmt_api_clients).
-behavior(minirest_api).
-behaviour(minirest_api).
-include_lib("emqx/include/emqx.hrl").
@ -48,12 +48,12 @@
, {<<"clean_start">>, atom}
, {<<"proto_name">>, binary}
, {<<"proto_ver">>, integer}
, {<<"_like_clientid">>, binary}
, {<<"_like_username">>, binary}
, {<<"_gte_created_at">>, timestamp}
, {<<"_lte_created_at">>, timestamp}
, {<<"_gte_connected_at">>, timestamp}
, {<<"_lte_connected_at">>, timestamp}]}).
, {<<"like_clientid">>, binary}
, {<<"like_username">>, binary}
, {<<"gte_created_at">>, timestamp}
, {<<"lte_created_at">>, timestamp}
, {<<"gte_connected_at">>, timestamp}
, {<<"lte_connected_at">>, timestamp}]}).
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format_channel_info}).
@ -214,112 +214,106 @@ schemas() ->
clients_api() ->
Metadata = #{
get => #{
description => "List clients",
description => <<"List clients">>,
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, <<"client">>)}}},
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, client)}}},
{"/clients", Metadata, clients}.
client_api() ->
Metadata = #{
get => #{
description => "Get clients info by client ID",
description => <<"Get clients info by client ID">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true,
example => 123456}],
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, <<"client">>)}},
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}},
delete => #{
description => "Kick out client by client ID",
description => <<"Kick out client by client ID">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true,
example => 123456}],
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, <<"client">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}}},
{"/clients/:clientid", Metadata, client}.
clients_acl_cache_api() ->
Metadata = #{
get => #{
description => "Get client acl cache",
description => <<"Get client acl cache">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true,
example => 123456}],
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, <<"acl_cache">>)}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Get client acl cache">>, acl_cache)}},
delete => #{
description => "Clean client acl cache",
description => <<"Clean client acl cache">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true,
example => 123456}],
required => true
}],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients acl cache OK">>)}}},
{"/clients/:clientid/acl_cache", Metadata, acl_cache}.
subscribe_api() ->
Metadata = #{
post => #{
description => "subscribe",
parameters => [
#{
name => clientid,
in => path,
schema => #{type => string},
required => true,
example => 123456
}
],
description => <<"Subscribe">>,
parameters => [#{
name => clientid,
in => path,
schema => #{type => string},
required => true
}],
'requestBody' => emqx_mgmt_util:request_body_schema(#{
type => object,
properties => #{
<<"topic">> => #{
topic => #{
type => string,
example => <<"topic_1">>,
description => <<"Topic">>},
<<"qos">> => #{
qos => #{
type => integer,
enum => [0, 1, 2],
example => 0,
description => <<"QoS">>}}}),
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"subscribe ok">>)}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}},
delete => #{
description => "unsubscribe",
description => <<"Unsubscribe">>,
parameters => [
#{
name => clientid,
in => path,
schema => #{type => string},
required => true,
example => 123456
required => true
},
#{
name => topic,
in => query,
schema => #{type => string},
required => true,
example => <<"topic_1">>
required => true
}
],
responses => #{
<<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>),
<<"200">> => emqx_mgmt_util:response_schema(<<"unsubscribe ok">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}},
{"/clients/:clientid/subscribe", Metadata, subscribe}.
%%%==============================================================================================
@ -373,17 +367,15 @@ subscribe_batch(post, Request) ->
%% api apply
list(Params) ->
Data = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun),
Body = emqx_json:encode(Data),
{200, Body}.
Response = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun),
{200, Response}.
lookup(#{clientid := ClientID}) ->
case emqx_mgmt:lookup_client({clientid, ClientID}, ?format_fun) of
[] ->
{404, ?CLIENT_ID_NOT_FOUND};
ClientInfo ->
Response = emqx_json:encode(hd(ClientInfo)),
{200, Response}
{200, hd(ClientInfo)}
end.
kickout(#{clientid := ClientID}) ->
@ -395,9 +387,10 @@ get_acl_cache(#{clientid := ClientID})->
{error, not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
{500, #{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}};
Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
Caches ->
Response = emqx_json:encode([format_acl_cache(Cache) || Cache <- Caches]),
Response = [format_acl_cache(Cache) || Cache <- Caches],
{200, Response}
end.
@ -408,7 +401,8 @@ clean_acl_cache(#{clientid := ClientID}) ->
{error, not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
{500, #{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}}
Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}}
end.
subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) ->
@ -416,8 +410,8 @@ subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) ->
{error, channel_not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}),
{500, Body};
Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
ok ->
{200}
end.
@ -427,8 +421,8 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
{error, channel_not_found} ->
{404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} ->
Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}),
{500, Body};
Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
{unsubscribe, [{Topic, #{}}]} ->
{200}
end.

View File

@ -16,58 +16,319 @@
-module(emqx_mgmt_api_listeners).
-rest_api(#{name => list_listeners,
method => 'GET',
path => "/listeners/",
func => list,
descr => "A list of listeners in the cluster"}).
-behaviour(minirest_api).
-rest_api(#{name => list_node_listeners,
method => 'GET',
path => "/nodes/:atom:node/listeners",
func => list,
descr => "A list of listeners on the node"}).
-export([api_spec/0]).
-rest_api(#{name => restart_listener,
method => 'PUT',
path => "/listeners/:atom:identifier/restart",
func => restart,
descr => "Restart a listener in the cluster"}).
-export([ listeners/2
, listener/2
, node_listener/2
, node_listeners/2
, manage_listeners/2
, manage_nodes_listeners/2]).
-rest_api(#{name => restart_node_listener,
method => 'PUT',
path => "/nodes/:atom:node/listeners/:atom:identifier/restart",
func => restart,
descr => "Restart a listener on a node"}).
-export([format/1]).
-export([list/2, restart/2]).
-include_lib("emqx/include/emqx.hrl").
%% List listeners on a node.
list(#{node := Node}, _Params) ->
emqx_mgmt:return({ok, format(emqx_mgmt:list_listeners(Node))});
api_spec() ->
{
[
listeners_api(),
restart_listeners_api(),
nodes_listeners_api(),
nodes_listener_api(),
manage_listeners_api(),
manage_nodes_listeners_api()
],
[listener_schema()]
}.
listener_schema() ->
#{
listener => #{
type => object,
properties => #{
node => #{
type => string,
description => <<"Node">>,
example => node()},
identifier => #{
type => string,
description => <<"Identifier">>},
acceptors => #{
type => integer,
description => <<"Number of Acceptor proce">>},
max_conn => #{
type => integer,
description => <<"Maximum number of allowed connection">>},
type => #{
type => string,
description => <<"Plugin decription">>},
listen_on => #{
type => string,
description => <<"Litening port">>},
running => #{
type => boolean,
description => <<"Open or close">>},
auth => #{
type => boolean,
description => <<"Has auth">>}}}}.
listeners_api() ->
Metadata = #{
get => #{
description => <<"List listeners in cluster">>,
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List all listeners">>, listener)}}},
{"/listeners", Metadata, listeners}.
restart_listeners_api() ->
Metadata = #{
get => #{
description => <<"List listeners by listener ID">>,
parameters => [param_path_identifier()],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:response_array_schema(<<"List listener info ok">>, listener)}}},
{"/listeners/:identifier", Metadata, listener}.
manage_listeners_api() ->
Metadata = #{
get => #{
description => <<"Restart listeners in cluster">>,
parameters => [
param_path_identifier(),
param_path_operation()],
responses => #{
<<"500">> =>
emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
['BAD_LISTENER_ID']),
<<"400">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
['BAD_REQUEST']),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Operation success">>)}}},
{"/listeners/:identifier/:operation", Metadata, manage_listeners}.
manage_nodes_listeners_api() ->
Metadata = #{
get => #{
description => <<"Restart listeners in cluster">>,
parameters => [
param_path_node(),
param_path_identifier(),
param_path_operation()],
responses => #{
<<"500">> =>
emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Bad node or Listener id not found">>,
['BAD_NODE_NAME','BAD_LISTENER_ID']),
<<"400">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>,
['BAD_REQUEST']),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Operation success">>)}}},
{"/node/:node/listeners/:identifier/:operation", Metadata, manage_nodes_listeners}.
nodes_listeners_api() ->
Metadata = #{
get => #{
description => <<"Get listener info in one node">>,
parameters => [param_path_node(), param_path_identifier()],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Node name or listener id not found">>,
['BAD_NODE_NAME', 'BAD_LISTENER_ID']),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}},
{"/nodes/:node/listeners/:identifier", Metadata, node_listener}.
nodes_listener_api() ->
Metadata = #{
get => #{
description => <<"List listeners in one node">>,
parameters => [param_path_node()],
responses => #{
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Listener id not found">>),
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}},
{"/nodes/:node/listeners", Metadata, node_listeners}.
%%%==============================================================================================
%% parameters
param_path_node() ->
#{
name => node,
in => path,
schema => #{type => string},
required => true,
example => node()
}.
param_path_identifier() ->
{Example,_} = hd(emqx_mgmt:list_listeners(node())),
#{
name => identifier,
in => path,
schema => #{type => string},
required => true,
example => Example
}.
param_path_operation()->
#{
name => operation,
in => path,
required => true,
schema => #{
type => string,
enum => [start, stop, restart]},
example => restart
}.
%%%==============================================================================================
%% api
listeners(get, _Request) ->
list().
listener(get, Request) ->
ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)),
get_listeners(#{identifier => ListenerID}).
node_listeners(get, Request) ->
Node = binary_to_atom(cowboy_req:binding(node, Request)),
get_listeners(#{node => Node}).
node_listener(get, Request) ->
Node = binary_to_atom(cowboy_req:binding(node, Request)),
ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)),
get_listeners(#{node => Node, identifier => ListenerID}).
manage_listeners(_, Request) ->
Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)),
Operation = binary_to_atom(cowboy_req:binding(operation, Request)),
manage(Operation, #{identifier => Identifier}).
manage_nodes_listeners(_, Request) ->
Node = binary_to_atom(cowboy_req:binding(node, Request)),
Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)),
Operation = binary_to_atom(cowboy_req:binding(operation, Request)),
manage(Operation, #{identifier => Identifier, node => Node}).
%%%==============================================================================================
%% List listeners in the cluster.
list(_Binding, _Params) ->
emqx_mgmt:return({ok, [#{node => Node, listeners => format(Listeners)}
|| {Node, Listeners} <- emqx_mgmt:list_listeners()]}).
list() ->
{200, format(emqx_mgmt:list_listeners())}.
%% Restart listeners on a node.
restart(#{node := Node, identifier := Identifier}, _Params) ->
case emqx_mgmt:restart_listener(Node, Identifier) of
ok -> emqx_mgmt:return({ok, "Listener restarted."});
{error, Error} -> emqx_mgmt:return({error, Error})
end;
%% Restart listeners on all nodes in the cluster.
restart(#{identifier := Identifier}, _Params) ->
Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()],
case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of
[] -> emqx_mgmt:return(ok);
Errors -> emqx_mgmt:return({error, {restart, Errors}})
get_listeners(Param) ->
case list_listener(Param) of
{error, not_found} ->
Identifier = maps:get(identifier, Param),
Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = maps:get(node, Param),
Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
Identifier = maps:get(identifier, Param),
Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
Data ->
{200, Data}
end.
manage(Operation0, Param) ->
OperationMap = #{start => start_listener, stop => stop_listener, restart => restart_listener},
Operation = maps:get(Operation0, OperationMap),
case list_listener(Param) of
{error, not_found} ->
Identifier = maps:get(identifier, Param),
Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
{404, #{code => 'BAD_LISTENER_ID', message => Reason}};
{error, nodedown} ->
Node = maps:get(node, Param),
Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])),
Response = #{code => 'BAD_NODE_NAME', message => Reason},
{404, Response};
[] ->
Identifier = maps:get(identifier, Param),
Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])),
{404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}};
ListenersOrSingleListener ->
manage_(Operation, ListenersOrSingleListener)
end.
manage_(Operation, Listener) when is_map(Listener) ->
manage_(Operation, [Listener]);
manage_(Operation, Listeners) when is_list(Listeners) ->
Results = [emqx_mgmt:manage_listener(Operation, Listener) || Listener <- Listeners],
case lists:filter(fun(Result) -> Result =/= ok end, Results) of
[] ->
{200};
Errors ->
case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of
[] ->
Identifier = maps:get(identifier, hd(Listeners)),
Message = list_to_binary(io_lib:format("Already Started: ~s", [Identifier])),
{400, #{code => 'BAD_REQUEST', message => Message}};
_ ->
case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of
[] ->
Identifier = maps:get(identifier, hd(Listeners)),
Message = list_to_binary(io_lib:format("Already Stoped: ~s", [Identifier])),
{400, #{code => 'BAD_REQUEST', message => Message}};
_ ->
Reason = list_to_binary(io_lib:format("~p", [Errors])),
{500, #{code => 'UNKNOW_ERROR', message => Reason}}
end
end
end.
%%%==============================================================================================
%% util function
list_listener(Params) ->
format(list_listener_(Params)).
list_listener_(#{node := Node, identifier := Identifier}) ->
emqx_mgmt:get_listener(Node, Identifier);
list_listener_(#{identifier := Identifier}) ->
emqx_mgmt:list_listeners_by_id(Identifier);
list_listener_(#{node := Node}) ->
emqx_mgmt:list_listeners(Node);
list_listener_(#{}) ->
emqx_mgmt:list_listeners().
format(Listeners) when is_list(Listeners) ->
[ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))}
|| Info = #{listen_on := ListenOn} <- Listeners ];
[format(Listener) || Listener <- Listeners];
format({error, Reason}) -> [{error, Reason}].
format({error, Reason}) ->
{error, Reason};
format({Identifier, Conf}) ->
#{
identifier => Identifier,
node => maps:get(node, Conf),
acceptors => maps:get(acceptors, Conf),
max_conn => maps:get(max_connections, Conf),
type => maps:get(type, Conf),
listen_on => list_to_binary(esockd:to_string(maps:get(bind, Conf))),
running => trans_running(Conf),
auth => maps:get(enable, maps:get(auth, Conf))
}.
trans_running(Conf) ->
case maps:get(running, Conf) of
{error, _} ->
false;
Running ->
Running
end.

View File

@ -16,7 +16,7 @@
-module(emqx_mgmt_api_metrics).
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).
@ -283,13 +283,12 @@ metrics_schema() ->
metrics_api() ->
Metadata = #{
get => #{
description => "EMQ X metrics",
description => <<"EMQ X metrics">>,
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"List all metrics">>, <<"metrics">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"List all metrics">>, metrics)}}},
{"/metrics", Metadata, list}.
%%%==============================================================================================
%% api apply
list(get, _) ->
Response = emqx_json:encode(emqx_mgmt:get_metrics()),
{200, Response}.
{200, emqx_mgmt:get_metrics()}.

View File

@ -15,7 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_nodes).
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).
@ -103,15 +103,15 @@ node_schema() ->
nodes_api() ->
Metadata = #{
get => #{
description => "List EMQ X nodes",
description => <<"List EMQ X nodes">>,
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List EMQ X Nodes">>, <<"node">>)}}},
<<"200">> => emqx_mgmt_util:response_array_schema(<<"List EMQ X Nodes">>, node)}}},
{"/nodes", Metadata, nodes}.
node_api() ->
Metadata = #{
get => #{
description => "Get node info",
description => <<"Get node info">>,
parameters => [#{
name => node_name,
in => path,
@ -121,13 +121,13 @@ node_api() ->
example => node()}],
responses => #{
<<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Nodes info by name">>, <<"node">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Nodes info by name">>, node)}}},
{"/nodes/:node_name", Metadata, node}.
node_metrics_api() ->
Metadata = #{
get => #{
description => "Get node metrics",
description => <<"Get node metrics">>,
parameters => [#{
name => node_name,
in => path,
@ -137,13 +137,13 @@ node_metrics_api() ->
example => node()}],
responses => #{
<<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Metrics">>, <<"metrics">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Metrics">>, metrics)}}},
{"/nodes/:node_name/metrics", Metadata, node_metrics}.
node_stats_api() ->
Metadata = #{
get => #{
description => "Get node stats",
description => <<"Get node stats">>,
parameters => [#{
name => node_name,
in => path,
@ -153,7 +153,7 @@ node_stats_api() ->
example => node()}],
responses => #{
<<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']),
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Stats">>, <<"stats">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Stats">>, stats)}}},
{"/nodes/:node_name/stats", Metadata, node_metrics}.
%%%==============================================================================================
@ -177,32 +177,30 @@ node_stats(get, Request) ->
%% api apply
list(#{}) ->
NodesInfo = [format(Node, NodeInfo) || {Node, NodeInfo} <- emqx_mgmt:list_nodes()],
Response = emqx_json:encode(NodesInfo),
{200, Response}.
{200, NodesInfo}.
get_node(#{node := Node}) ->
case emqx_mgmt:lookup_node(Node) of
#{node_status := 'ERROR'} ->
{400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})};
{400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}};
NodeInfo ->
Response = emqx_json:encode(format(Node, NodeInfo)),
{200, Response}
{200, format(Node, NodeInfo)}
end.
get_metrics(#{node := Node}) ->
case emqx_mgmt:get_metrics(Node) of
{error, _} ->
{400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})};
{400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}};
Metrics ->
{200, emqx_json:encode(Metrics)}
{200, Metrics}
end.
get_stats(#{node := Node}) ->
case emqx_mgmt:get_stats(Node) of
{error, _} ->
{400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})};
{400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}};
Stats ->
{200, emqx_json:encode(Stats)}
{200, Stats}
end.
%%============================================================================================================

View File

@ -17,7 +17,7 @@
%% API
-include_lib("emqx/include/emqx.hrl").
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).
@ -33,7 +33,7 @@ api_spec() ->
publish_api() ->
MeteData = #{
post => #{
description => "publish",
description => <<"Publish">>,
'requestBody' => #{
content => #{
'application/json' => #{
@ -41,13 +41,13 @@ publish_api() ->
type => object,
properties => maps:with([id], message_properties())}}}},
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"publish ok">>, <<"message">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"publish ok">>, message)}}},
{"/publish", MeteData, publish}.
publish_batch_api() ->
MeteData = #{
post => #{
description => "publish",
description => <<"publish">>,
'requestBody' => #{
content => #{
'application/json' => #{
@ -57,8 +57,8 @@ publish_batch_api() ->
type => object,
properties => maps:with([id], message_properties())}}}}},
responses => #{
<<"200">> => emqx_mgmt_util:response_array_schema(<<"publish ok">>, <<"message">>)}}},
{"/publish_batch", MeteData, publish_batch}.
<<"200">> => emqx_mgmt_util:response_array_schema(<<"publish ok">>, message)}}},
{"/publish/bulk", MeteData, publish_batch}.
message_schema() ->
#{
@ -110,14 +110,13 @@ publish(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Message = message(emqx_json:decode(Body, [return_maps])),
_ = emqx_mgmt:publish(Message),
{200, emqx_json:encode(format_message(Message))}.
{200, format_message(Message)}.
publish_batch(post, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Messages = messages(emqx_json:decode(Body, [return_maps])),
_ = [emqx_mgmt:publish(Message) || Message <- Messages],
ResponseBody = emqx_json:encode(format_message(Messages)),
{200, ResponseBody}.
{200, format_message(Messages)}.
message(Map) ->
From = maps:get(<<"from">>, Map, http_api),

View File

@ -19,7 +19,7 @@
-include_lib("emqx/include/emqx.hrl").
%% API
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).
@ -48,31 +48,29 @@ route_schema() ->
routes_api() ->
Metadata = #{
get => #{
description => "EMQ X routes",
description => <<"EMQ X routes">>,
parameters => [
#{
name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer},
default => 1
schema => #{type => integer, default => 1}
},
#{
name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer},
default => emqx_mgmt:max_row_limit()
schema => #{type => integer, default => emqx_mgmt:max_row_limit()}
}],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_array_schema("List route info", <<"route">>)}}},
emqx_mgmt_util:response_array_schema("List route info", route)}}},
{"/routes", Metadata, routes}.
route_api() ->
Metadata = #{
get => #{
description => "EMQ X routes",
description => <<"EMQ X routes">>,
parameters => [#{
name => topic,
in => path,
@ -82,7 +80,7 @@ route_api() ->
}],
responses => #{
<<"200">> =>
emqx_mgmt_util:response_schema(<<"Route info">>, <<"route">>),
emqx_mgmt_util:response_schema(<<"Route info">>, route),
<<"404">> =>
emqx_mgmt_util:response_error_schema(<<"Topic not found">>, [?TOPIC_NOT_FOUND])
}}},
@ -101,20 +99,15 @@ route(get, Request) ->
%%%==============================================================================================
%% api apply
list(Params) ->
Data = emqx_mgmt_api:paginate(emqx_route, Params, fun format/1),
Response = emqx_json:encode(Data),
Response = emqx_mgmt_api:paginate(emqx_route, Params, fun format/1),
{200, Response}.
lookup(#{topic := Topic}) ->
case emqx_mgmt:lookup_routes(Topic) of
[] ->
NotFound = #{code => ?TOPIC_NOT_FOUND, reason => <<"Topic not found">>},
Response = emqx_json:encode(NotFound),
{404, Response};
{404, #{code => ?TOPIC_NOT_FOUND, message => <<"Topic not found">>}};
[Route] ->
Data = format(Route),
Response = emqx_json:encode(Data),
{200, Response}
{200, format(Route)}
end.
%%%==============================================================================================

View File

@ -15,7 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_stats).
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).
@ -96,13 +96,12 @@ stats_schema() ->
stats_api() ->
Metadata = #{
get => #{
description => "EMQ X stats",
description => <<"EMQ X stats">>,
responses => #{
<<"200">> => emqx_mgmt_util:response_schema(<<"List stats ok">>, <<"stats">>)}}},
<<"200">> => emqx_mgmt_util:response_schema(<<"List stats ok">>, stats)}}},
{"/stats", Metadata, list}.
%%%==============================================================================================
%% api apply
list(get, _Request) ->
Response = emqx_json:encode(emqx_mgmt:get_stats()),
{200, Response}.
{200, emqx_mgmt:get_stats()}.

View File

@ -15,7 +15,7 @@
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_status).
%% API
-behavior(minirest_api).
-behaviour(minirest_api).
-export([api_spec/0]).

View File

@ -16,79 +16,107 @@
-module(emqx_mgmt_api_subscriptions).
-behaviour(minirest_api).
-include_lib("emqx/include/emqx.hrl").
-define(SUBS_QS_SCHEMA, {emqx_suboption,
[{<<"clientid">>, binary},
{<<"topic">>, binary},
{<<"share">>, binary},
{<<"qos">>, integer},
{<<"_match_topic">>, binary}]}).
-export([api_spec/0]).
-rest_api(#{name => list_subscriptions,
method => 'GET',
path => "/subscriptions/",
func => list,
descr => "A list of subscriptions in the cluster"}).
-rest_api(#{name => list_node_subscriptions,
method => 'GET',
path => "/nodes/:atom:node/subscriptions/",
func => list,
descr => "A list of subscriptions on a node"}).
-rest_api(#{name => lookup_client_subscriptions,
method => 'GET',
path => "/subscriptions/:bin:clientid",
func => lookup,
descr => "A list of subscriptions of a client"}).
-rest_api(#{name => lookup_client_subscriptions_with_node,
method => 'GET',
path => "/nodes/:atom:node/subscriptions/:bin:clientid",
func => lookup,
descr => "A list of subscriptions of a client on the node"}).
-export([ list/2
, lookup/2
]).
-export([subscriptions/2]).
-export([ query/3
, format/1
]).
-define(SUBS_QS_SCHEMA, {emqx_suboption,
[ {<<"clientid">>, binary}
, {<<"topic">>, binary}
, {<<"share">>, binary}
, {<<"qos">>, integer}
, {<<"match_topic">>, binary}]}).
-define(query_fun, {?MODULE, query}).
-define(format_fun, {?MODULE, format}).
list(Bindings, Params) when map_size(Bindings) == 0 ->
case proplists:get_value(<<"topic">>, Params) of
undefined ->
emqx_mgmt:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
Topic ->
emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
end;
api_spec() ->
{
[subscriptions_api()],
[subscription_schema()]
}.
list(#{node := Node} = Bindings, Params) ->
case proplists:get_value(<<"topic">>, Params) of
undefined ->
case Node =:= node() of
true ->
emqx_mgmt:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)});
false ->
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
{badrpc, Reason} -> emqx_mgmt:return({error, Reason});
Res -> Res
end
end;
Topic ->
emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
end.
subscriptions_api() ->
MetaData = #{
get => #{
description => <<"List subscriptions">>,
parameters => [
#{
name => page,
in => query,
description => <<"Page">>,
schema => #{type => integer}
},
#{
name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer}
},
#{
name => clientid,
in => query,
description => <<"Client ID">>,
schema => #{type => string}
},
#{
name => qos,
in => query,
description => <<"QoS">>,
schema => #{type => integer}
},
#{
name => share,
in => query,
description => <<"Shared subscription">>,
schema => #{type => boolean}
},
#{
name => topic,
in => query,
description => <<"Topic">>,
schema => #{type => string}
}
#{
name => match_topic,
in => query,
description => <<"Match topic string">>,
schema => #{type => string}
}
],
responses => #{
<<"200">> => emqx_mgmt_util:response_page_schema(subscription)}}},
{"/subscriptions", MetaData, subscriptions}.
lookup(#{node := Node, clientid := ClientId}, _Params) ->
emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
subscription_schema() ->
#{
subscription => #{
type => object,
properties => #{
topic => #{
type => string},
clientid => #{
type => string},
qos => #{
type => integer,
enum => [0,1,2]}}}
}.
subscriptions(get, Request) ->
Params = cowboy_req:parse_qs(Request),
list(Params).
list(Params) ->
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}.
lookup(#{clientid := ClientId}, _Params) ->
emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
format(Items) when is_list(Items) ->
[format(Item) || Item <- Items];
@ -98,10 +126,10 @@ format({{Subscriber, Topic}, Options}) ->
format({_Subscriber, Topic, Options = #{share := Group}}) ->
QoS = maps:get(qos, Options),
#{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS};
#{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS};
format({_Subscriber, Topic, Options}) ->
QoS = maps:get(qos, Options),
#{node => node(), topic => Topic, clientid => maps:get(subid, Options), qos => QoS}.
#{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}.
%%--------------------------------------------------------------------
%% Query Function

View File

@ -590,18 +590,21 @@ print({client, {ClientId, ChanPid}}) ->
InfoKeys = [clientid, username, peername,
clean_start, keepalive, expiry_interval,
subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped,
connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of
true -> [disconnected_at];
false -> []
end,
connected, created_at, connected_at] ++
case maps:is_key(disconnected_at, Info) of
true -> [disconnected_at];
false -> []
end,
Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000},
emqx_ctl:print("Client(~s, username=~s, peername=~s, "
"clean_start=~s, keepalive=~w, session_expiry_interval=~w, "
"subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, "
"connected=~s, created_at=~w, connected_at=~w" ++ case maps:is_key(disconnected_at, Info) of
true -> ", disconnected_at=~w)~n";
false -> ")~n"
end,
[format(K, maps:get(K, Info)) || K <- InfoKeys]);
"connected=~s, created_at=~w, connected_at=~w" ++
case maps:is_key(disconnected_at, Info1) of
true -> ", disconnected_at=~w)~n";
false -> ")~n"
end,
[format(K, maps:get(K, Info1)) || K <- InfoKeys]);
print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) ->
emqx_ctl:print("~s -> ~s~n", [Topic, Node]);

View File

@ -31,7 +31,8 @@
, response_array_schema/2
, response_error_schema/1
, response_error_schema/2
, batch_response_schema/1]).
, response_page_schema/1
, response_batch_schema/1]).
-export([urldecode/1]).
@ -92,16 +93,22 @@ urldecode(S) ->
request_body_array_schema(Schema) when is_map(Schema) ->
json_content_schema("", #{type => array, items => Schema});
request_body_array_schema(Ref) when is_atom(Ref) ->
request_body_array_schema(atom_to_binary(Ref, utf8));
request_body_array_schema(Ref) when is_binary(Ref) ->
json_content_schema("", #{type => array, items => minirest:ref(Ref)}).
request_body_schema(Schema) when is_map(Schema) ->
json_content_schema("", Schema);
request_body_schema(Ref) when is_atom(Ref) ->
request_body_schema(atom_to_binary(Ref));
request_body_schema(Ref) when is_binary(Ref) ->
json_content_schema("", minirest:ref(Ref)).
response_array_schema(Description, Schema) when is_map(Schema) ->
json_content_schema(Description, #{type => array, items => Schema});
response_array_schema(Description, Ref) when is_atom(Ref) ->
response_array_schema(Description, atom_to_binary(Ref, utf8));
response_array_schema(Description, Ref) when is_binary(Ref) ->
json_content_schema(Description, #{type => array, items => minirest:ref(Ref)}).
@ -110,6 +117,8 @@ response_schema(Description) ->
response_schema(Description, Schema) when is_map(Schema) ->
json_content_schema(Description, Schema);
response_schema(Description, Ref) when is_atom(Ref) ->
response_schema(Description, atom_to_binary(Ref, utf8));
response_schema(Description, Ref) when is_binary(Ref) ->
json_content_schema(Description, minirest:ref(Ref)).
@ -124,11 +133,33 @@ response_error_schema(Description, Enum) ->
code => #{
type => string,
enum => Enum},
reason => #{
message => #{
type => string}}},
json_content_schema(Description, Schema).
batch_response_schema(DefName) when is_binary(DefName) ->
response_page_schema(Def) when is_atom(Def) ->
response_page_schema(atom_to_binary(Def, utf8));
response_page_schema(Def) when is_binary(Def) ->
Schema = #{
type => object,
properties => #{
meta => #{
type => object,
properties => #{
page => #{
type => integer},
limit => #{
type => integer},
count => #{
type => integer}}},
data => #{
type => array,
items => minirest:ref(Def)}}},
json_content_schema("", Schema).
response_batch_schema(DefName) when is_atom(DefName) ->
response_batch_schema(atom_to_binary(DefName, utf8));
response_batch_schema(DefName) when is_binary(DefName) ->
Schema = #{
type => object,
properties => #{

View File

@ -0,0 +1,73 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_alarms_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(ACT_ALARM, test_act_alarm).
-define(DE_ACT_ALARM, test_de_act_alarm).
all() ->
[t_alarms_api, t_delete_alarms_api].
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_alarms_api(_) ->
ok = emqx_alarm:activate(?ACT_ALARM),
ok = emqx_alarm:activate(?DE_ACT_ALARM),
ok = emqx_alarm:deactivate(?DE_ACT_ALARM),
get_alarms(1, true),
get_alarms(1, false).
t_delete_alarms_api(_) ->
Path = emqx_mgmt_api_test_util:api_path(["alarms"]),
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Path),
get_alarms(1, true),
get_alarms(0, false).
get_alarms(AssertCount, Activated) when is_atom(Activated) ->
get_alarms(AssertCount, atom_to_list(Activated));
get_alarms(AssertCount, Activated) ->
Path = emqx_mgmt_api_test_util:api_path(["alarms"]),
Qs = "activated=" ++ Activated,
Headers = emqx_mgmt_api_test_util:auth_header_(),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, Headers),
Data = emqx_json:decode(Response, [return_maps]),
Meta = maps:get(<<"meta">>, Data),
Page = maps:get(<<"page">>, Meta),
Limit = maps:get(<<"limit">>, Meta),
Count = maps:get(<<"count">>, Meta),
?assertEqual(Page, 1),
?assertEqual(Limit, emqx_mgmt:max_row_limit()),
?assert(Count >= AssertCount).

View File

@ -83,6 +83,7 @@ t_clients(_) ->
%% delete /clients/:clientid kickout
Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]),
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path),
timer:sleep(300),
AfterKickoutResponse = emqx_mgmt_api_test_util:request_api(get, Client2Path),
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse),
@ -95,6 +96,7 @@ t_clients(_) ->
SubscribeBody = #{topic => Topic, qos => Qos},
SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]),
{ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody),
timer:sleep(100),
[{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
?assertEqual(AfterSubTopic, Topic),
?assertEqual(AfterSubQos, Qos),
@ -102,4 +104,5 @@ t_clients(_) ->
%% delete /clients/:clientid/subscribe
UnSubscribeQuery = "topic=" ++ binary_to_list(Topic),
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, SubscribePath, UnSubscribeQuery, AuthHeader),
timer:sleep(100),
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)).

View File

@ -0,0 +1,120 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_listeners_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_list_listeners(_) ->
Path = emqx_mgmt_api_test_util:api_path(["listeners"]),
get_api(Path).
t_list_node_listeners(_) ->
Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]),
get_api(Path).
t_get_listeners(_) ->
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
Identifier = maps:get(identifier, LocalListener),
Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]),
get_api(Path).
t_get_node_listeners(_) ->
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
Identifier = maps:get(identifier, LocalListener),
Path = emqx_mgmt_api_test_util:api_path(
["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(Identifier)]),
get_api(Path).
t_stop_listener(_) ->
LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())),
Identifier = maps:get(identifier, LocalListener),
Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier), "stop"]),
{ok, _} = emqx_mgmt_api_test_util:request_api(get, Path),
GetPath = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]),
{ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath),
Listeners = emqx_json:decode(ListenersResponse, [return_maps]),
[listener_stats(Listener, false) || Listener <- Listeners].
get_api(Path) ->
{ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path),
LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()),
case emqx_json:decode(ListenersData, [return_maps]) of
[Listener] ->
Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8),
Filter =
fun(Local) ->
maps:get(identifier, Local) =:= Identifier
end,
LocalListener = hd(lists:filter(Filter, LocalListeners)),
comparison_listener(LocalListener, Listener);
Listeners when is_list(Listeners) ->
?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)),
Fun =
fun(LocalListener) ->
Identifier = maps:get(identifier, LocalListener),
IdentifierBinary = atom_to_binary(Identifier, utf8),
Filter =
fun(Listener) ->
maps:get(<<"identifier">>, Listener) =:= IdentifierBinary
end,
Listener = hd(lists:filter(Filter, Listeners)),
comparison_listener(LocalListener, Listener)
end,
lists:foreach(Fun, LocalListeners);
Listener when is_map(Listener) ->
Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8),
Filter =
fun(Local) ->
maps:get(identifier, Local) =:= Identifier
end,
LocalListener = hd(lists:filter(Filter, LocalListeners)),
comparison_listener(LocalListener, Listener)
end.
comparison_listener(Local, Response) ->
?assertEqual(maps:get(identifier, Local), binary_to_atom(maps:get(<<"identifier">>, Response))),
?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))),
?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)),
?assertEqual(maps:get(max_conn, Local), maps:get(<<"max_conn">>, Response)),
?assertEqual(maps:get(listen_on, Local), maps:get(<<"listen_on">>, Response)),
?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)),
?assertEqual(maps:get(auth, Local), maps:get(<<"auth">>, Response)).
listener_stats(Listener, Stats) ->
?assertEqual(maps:get(<<"running">>, Listener), Stats).

View File

@ -60,13 +60,13 @@ t_publish_api(_) ->
?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok),
emqtt:disconnect(Client).
t_publish_batch_api(_) ->
t_publish_bulk_api(_) ->
{ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}),
{ok, _} = emqtt:connect(Client),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2),
Payload = <<"hello">>,
Path = emqx_mgmt_api_test_util:api_path(["publish_batch"]),
Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]),
Auth = emqx_mgmt_api_test_util:auth_header_(),
Body =[#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}],
{ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body),

View File

@ -0,0 +1,74 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_subscription_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(CLIENTID, <<"api_clientid">>).
-define(USERNAME, <<"api_username">>).
%% notice: integer topic for sort response
-define(TOPIC1, <<"0000">>).
-define(TOPIC2, <<"0001">>).
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_management]).
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_App) ->
ok.
t_subscription_api(_) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID}),
{ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC1),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC2),
Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
Data = emqx_json:decode(Response, [return_maps]),
Meta = maps:get(<<"meta">>, Data),
?assertEqual(1, maps:get(<<"page">>, Meta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(2, maps:get(<<"count">>, Meta)),
Subscriptions = maps:get(<<"data">>, Data),
?assertEqual(length(Subscriptions), 2),
Sort =
fun(#{<<"topic">> := T1}, #{<<"topic">> := T2}) ->
binary_to_integer(T1) =< binary_to_integer(T2)
end,
[Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions),
?assertEqual(maps:get(<<"topic">>, Subscriptions1), ?TOPIC1),
?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2),
?assertEqual(maps:get(<<"clientid">>, Subscriptions1), ?CLIENTID),
?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID),
emqtt:disconnect(Client).

View File

@ -31,7 +31,7 @@ emqx_modules: {
},
{
type: telemetry
enable: true
enable: false
}
]
}

View File

@ -102,7 +102,7 @@ connected_presence(#{peerhost := PeerHost,
keepalive => Keepalive,
connack => 0, %% Deprecated?
clean_start => CleanStart,
expiry_interval => ExpiryInterval,
expiry_interval => ExpiryInterval div 1000,
connected_at => ConnectedAt,
ts => erlang:system_time(millisecond)
}.

View File

@ -182,7 +182,7 @@ eventmsg_connected(_ClientInfo = #{
keepalive => Keepalive,
clean_start => CleanStart,
receive_maximum => RcvMax,
expiry_interval => ExpiryInterval,
expiry_interval => ExpiryInterval div 1000,
is_bridge => IsBridge,
conn_props => printable_maps(ConnProps),
connected_at => ConnectedAt

View File

@ -3,11 +3,8 @@
##--------------------------------------------------------------------
emqx_statsd:{
host: "127.0.0.1"
port: 8125
batch_size: 10
prefix: "emqx"
tags: {"from": "emqx"}
enable: true
server: "127.0.0.1:8125"
sample_time_interval: "10s"
flush_time_interval: "10s"
}

View File

@ -1,9 +1,5 @@
-define(APP, emqx_statsd).
-define(DEFAULT_HOST, {127, 0, 0, 1}).
-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000).
-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000).
-define(DEFAULT_HOST, "127.0.0.1").
-define(DEFAULT_PORT, 8125).
-define(DEFAULT_PREFIX, undefined).
-define(DEFAULT_TAGS, #{}).
-define(DEFAULT_BATCH_SIZE, 10).
-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10).
-define(DEFAULT_FLUSH_TIME_INTERVAL, 10).

View File

@ -1,6 +1,6 @@
{application, emqx_statsd,
[{description, "An OTP application"},
{vsn, "0.1.0"},
{vsn, "5.0.0"},
{registered, []},
{mod, {emqx_statsd_app, []}},
{applications,

View File

@ -1,100 +1,119 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd).
-module(emqx_statsd).
-behaviour(gen_server).
-behaviour(gen_server).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-include_lib("emqx/include/logger.hrl").
-include("emqx_statsd.hrl").
%% Interface
-export([start_link/1]).
%% Interface
-export([start_link/1]).
%% Internal Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, code_change/3
, terminate/2
]).
%% Internal Exports
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, code_change/3
, terminate/2
]).
-record(state, {
timer :: reference(),
sample_time_interval :: pos_integer(),
flush_time_interval :: pos_integer(),
estatsd_pid :: pid()
}).
-record(state, {
timer :: reference() | undefined,
sample_time_interval :: pos_integer(),
flush_time_interval :: pos_integer(),
estatsd_pid :: pid()
}).
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
init([Opts]) ->
SampleTimeInterval = proplists:get_value(sample_time_interval, Opts),
FlushTimeInterval = proplists:get_value(flush_time_interval, Opts),
Ref = erlang:start_timer(SampleTimeInterval, self(), sample_timeout),
Pid = proplists:get_value(estatsd_pid, Opts),
{ok, #state{timer = Ref,
sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid}}.
init([Opts]) ->
process_flag(trap_exit, true),
Tags = tags(maps:get(tags, Opts, #{})),
{Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}),
Opts1 = maps:without([sample_time_interval,
flush_time_interval], Opts#{tags => Tags,
host => Host,
port => Port,
prefix => <<"emqx">>}),
{ok, Pid} = estatsd:start_link(maps:to_list(Opts1)),
SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL),
{ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid})}.
handle_call(_Req, _From, State) ->
{noreply, State}.
handle_call(_Req, _From, State) ->
{noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, Ref, sample_timeout}, State = #state{sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid,
timer = Ref}) ->
Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(),
SampleRate = SampleTimeInterval / FlushTimeInterval,
StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics],
estatsd:submit(Pid, StatsdMetrics),
{noreply, State#state{timer = erlang:start_timer(SampleTimeInterval, self(), sample_timeout)}};
handle_info({timeout, Ref, sample_timeout},
State = #state{sample_time_interval = SampleTimeInterval,
flush_time_interval = FlushTimeInterval,
estatsd_pid = Pid,
timer = Ref}) ->
Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(),
SampleRate = SampleTimeInterval / FlushTimeInterval,
StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics],
estatsd:submit(Pid, StatsdMetrics),
{noreply, ensure_timer(State)};
handle_info(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) ->
{stop, {shutdown, Error}, State};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_info(_Msg, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Internale function
%%------------------------------------------------------------------------------
trans_metrics_name(Name) ->
Name0 = atom_to_binary(Name, utf8),
binary_to_atom(<<"emqx.", Name0/binary>>, utf8).
terminate(_Reason, #state{estatsd_pid = Pid}) ->
estatsd:stop(Pid),
ok.
emqx_vm_data() ->
Idle = case cpu_sup:util([detailed]) of
{_, 0, 0, _} -> 0; %% Not support for Windows
{_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0)
end,
RunQueue = erlang:statistics(run_queue),
[{run_queue, RunQueue},
{cpu_idle, Idle},
{cpu_use, 100 - Idle}] ++ emqx_vm:mem_info().
%%------------------------------------------------------------------------------
%% Internale function
%%------------------------------------------------------------------------------
trans_metrics_name(Name) ->
Name0 = atom_to_binary(Name, utf8),
binary_to_atom(<<"emqx.", Name0/binary>>, utf8).
emqx_vm_data() ->
Idle = case cpu_sup:util([detailed]) of
{_, 0, 0, _} -> 0; %% Not support for Windows
{_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0)
end,
RunQueue = erlang:statistics(run_queue),
[{run_queue, RunQueue},
{cpu_idle, Idle},
{cpu_use, 100 - Idle}] ++ emqx_vm:mem_info().
tags(Map) ->
Tags = maps:to_list(Map),
[{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags].
ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) ->
State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}.

View File

@ -0,0 +1,108 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_statsd_api).
-behaviour(minirest_api).
-include("emqx_statsd.hrl").
-import(emqx_mgmt_util, [ response_schema/1
, response_schema/2
, request_body_schema/1
, request_body_schema/2
]).
-export([api_spec/0]).
-export([ statsd/2
]).
api_spec() ->
{statsd_api(), schemas()}.
schemas() ->
[#{statsd => #{
type => object,
properties => #{
server => #{
type => string,
description => <<"Statsd Server">>,
example => get_raw(<<"server">>, <<"127.0.0.1:8125">>)},
enable => #{
type => boolean,
description => <<"Statsd status">>,
example => get_raw(<<"enable">>, false)},
sample_time_interval => #{
type => string,
description => <<"Sample Time Interval">>,
example => get_raw(<<"sample_time_interval">>, <<"10s">>)},
flush_time_interval => #{
type => string,
description => <<"Flush Time Interval">>,
example => get_raw(<<"flush_time_interval">>, <<"10s">>)}
}
}}].
statsd_api() ->
Metadata = #{
get => #{
description => <<"Get statsd info">>,
responses => #{
<<"200">> => response_schema(<<"statsd">>)
}
},
put => #{
description => <<"Update Statsd">>,
'requestBody' => request_body_schema(<<"statsd">>),
responses => #{
<<"200">> =>
response_schema(<<"Update Statsd successfully">>),
<<"400">> =>
response_schema(<<"Bad Request">>, #{
type => object,
properties => #{
message => #{type => string},
code => #{type => string}
}
})
}
}
},
[{"/statsd", Metadata, statsd}].
statsd(get, _Request) ->
Response = emqx_config:get_raw([<<"emqx_statsd">>], #{}),
{200, Response};
statsd(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Enable = maps:get(<<"enable">>, Params),
ok = emqx_config:update_config([emqx_statsd], Params),
enable_statsd(Enable).
enable_statsd(true) ->
ok = emqx_statsd_sup:stop_child(?APP),
emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})),
{200};
enable_statsd(false) ->
ok = emqx_statsd_sup:stop_child(?APP),
{200}.
get_raw(Key, Def) ->
emqx_config:get_raw([<<"emqx_statsd">>]++ [Key], Def).

View File

@ -18,9 +18,7 @@
-behaviour(application).
-include_lib("emqx/include/logger.hrl").
-emqx_plugin(?MODULE).
-include("emqx_statsd.hrl").
-export([ start/2
, stop/1
@ -28,11 +26,15 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_statsd_sup:start_link(),
{ok, _} = emqx_statsd_sup:start_statsd(),
?LOG(info, "emqx statsd start: successfully"),
maybe_enable_statsd(),
{ok, Sup}.
stop(_) ->
ok = emqx_statsd_sup:stop_statsd(),
?LOG(info, "emqx statsd stop: successfully"),
ok.
maybe_enable_statsd() ->
case emqx_config:get([?APP, enable], false) of
true ->
emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{}));
false ->
ok
end.

View File

@ -4,41 +4,38 @@
-behaviour(hocon_schema).
-export([to_ip_port/1]).
-export([ structs/0
, fields/1]).
-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}).
structs() -> ["emqx_statsd"].
fields("emqx_statsd") ->
[ {host, fun host/1}
, {port, fun port/1}
, {prefix, fun prefix/1}
, {tags, map()}
, {batch_size, fun batch_size/1}
, {sample_time_interval, fun duration_s/1}
, {flush_time_interval, fun duration_s/1}].
[ {enable, emqx_schema:t(boolean(), undefined, false)}
, {server, fun server/1}
, {sample_time_interval, fun duration_ms/1}
, {flush_time_interval, fun duration_ms/1}
].
host(type) -> string();
host(default) -> "127.0.0.1";
host(nullable) -> false;
host(_) -> undefined.
server(type) -> emqx_schema:ip_port();
server(default) -> "127.0.0.1:8125";
server(nullable) -> false;
server(_) -> undefined.
port(type) -> integer();
port(default) -> 8125;
port(nullable) -> true;
port(_) -> undefined.
duration_ms(type) -> emqx_schema:duration_ms();
duration_ms(nullable) -> false;
duration_ms(default) -> "10s";
duration_ms(_) -> undefined.
prefix(type) -> string();
prefix(default) -> "emqx";
prefix(nullable) -> true;
prefix(_) -> undefined.
batch_size(type) -> integer();
batch_size(nullable) -> false;
batch_size(default) -> 10;
batch_size(_) -> undefined.
duration_s(type) -> emqx_schema:duration_s();
duration_s(nullable) -> false;
duration_s(default) -> "10s";
duration_s(_) -> undefined.
to_ip_port(Str) ->
case string:tokens(Str, ":") of
[Ip, Port] ->
case inet:parse_address(Ip) of
{ok, R} -> {ok, {R, list_to_integer(Port)}};
_ -> {error, Str}
end;
_ -> {error, Str}
end.

View File

@ -7,63 +7,48 @@
-behaviour(supervisor).
-include("emqx_statsd.hrl").
-export([start_link/0]).
-export([start_statsd/0, stop_statsd/0]).
-export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]).
-export([estatsd_options/0]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Opts), #{id => Mod,
start => {Mod, start_link, [Opts]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {{one_for_one, 10, 100}, []}}.
-spec start_child(supervisor:child_spec()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)).
start_statsd() ->
{ok, Pid} = supervisor:start_child(?MODULE, estatsd_child_spec()),
{ok, _Pid1} = supervisor:start_child(?MODULE, emqx_statsd_child_spec(Pid)).
-spec start_child(atom(), map()) -> ok.
start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))).
stop_statsd() ->
ok = supervisor:terminate_child(?MODULE, emqx_statsd),
ok = supervisor:terminate_child(?MODULE, estatsd).
%%==============================================================================================
%% internal
estatsd_child_spec() ->
#{id => estatsd
, start => {estatsd, start_link, [estatsd_options()]}
, restart => permanent
, shutdown => 5000
, type => worker
, modules => [estatsd]}.
-spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId);
Error -> Error
end.
estatsd_options() ->
Host = get_conf(host, ?DEFAULT_HOST),
Port = get_conf(port, ?DEFAULT_PORT),
Prefix = get_conf(prefix, ?DEFAULT_PREFIX),
Tags = tags(get_conf(tags, ?DEFAULT_TAGS)),
BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE),
[{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}].
init([]) ->
{ok, {{one_for_one, 10, 3600}, []}}.
tags(Map) ->
Tags = maps:to_list(Map),
[{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags].
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
emqx_statsd_child_spec(Pid) ->
#{id => emqx_statsd
, start => {emqx_statsd, start_link, [[{estatsd_pid, Pid} | emqx_statsd_options()]]}
, restart => permanent
, shutdown => 5000
, type => worker
, modules => [emqx_statsd]}.
emqx_statsd_options() ->
SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL) * 1000,
FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL) * 1000,
[{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}].
get_conf(Key, Default) ->
emqx_config:get([?APP, Key], Default).
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -13,7 +13,7 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_statsd]).
all() ->
all() ->
emqx_ct:all(?MODULE).
t_statsd(_) ->

View File

@ -50,7 +50,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.4"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.2"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.4"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}