diff --git a/README-CN.md b/README-CN.md index 67f1b0ff5..b430d4b5f 100644 --- a/README-CN.md +++ b/README-CN.md @@ -9,7 +9,7 @@ [![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow)](https://askemq.com) [![YouTube](https://img.shields.io/badge/Subscribe-EMQ%20中文-FF0000?logo=youtube)](https://www.youtube.com/channel/UCir_r04HIsLjf2qqyZ4A8Cg) -[![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://careers.emqx.cn/) +[![最棒的物联网 MQTT 开源团队期待您的加入](https://static.emqx.net/images/github_readme_cn_bg.png)](https://careers.emqx.cn/) [English](./README.md) | 简体中文 | [日本語](./README-JP.md) | [русский](./README-RU.md) @@ -18,7 +18,7 @@ 从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。 - 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。 -- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.cn/)。 +- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.io/zh)。 ## 安装 @@ -34,7 +34,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p #### 二进制软件包安装 -需从 [EMQ X 下载](https://www.emqx.cn/downloads) 页面获取相应操作系统的二进制软件包。 +需从 [EMQ X 下载](https://www.emqx.com/zh/downloads) 页面获取相应操作系统的二进制软件包。 - [单节点安装文档](https://docs.emqx.cn/broker/latest/getting-started/install.html) - [集群配置文档](https://docs.emqx.cn/broker/latest/advanced/cluster.html) @@ -133,7 +133,7 @@ DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_authz make dialyzer - [Facebook](https://www.facebook.com/emqxmqtt) - [Reddit](https://www.reddit.com/r/emqx/) - [Weibo](https://weibo.com/emqtt) -- [Blog](https://www.emqx.cn/blog) +- [Blog](https://www.emqx.com/zh/blog) 欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。 diff --git a/README-JP.md b/README-JP.md index a3e1f5130..6e1c62f2f 100644 --- a/README-JP.md +++ b/README-JP.md @@ -8,7 +8,7 @@ [![Twitter](https://img.shields.io/badge/Twitter-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech) [![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q) -[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) +[![The best IoT MQTT open source team looks forward to your joining](https://static.emqx.net/images/github_readme_en_bg.png)](https://www.emqx.com/en/careers) [English](./README.md) | [简体中文](./README-CN.md) | 日本語 | [русский](./README-RU.md) @@ -35,7 +35,7 @@ docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p #### バイナリパッケージによるインストール -それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.io/downloads)ページから取得できます。 +それぞれのOSに対応したバイナリソフトウェアパッケージは、[EMQ Xのダウンロード](https://www.emqx.com/en/downloads)ページから取得できます。 - [シングルノードインストール](https://docs.emqx.io/broker/latest/en/getting-started/installation.html) - [マルチノードインストール](https://docs.emqx.io/broker/latest/en/advanced/cluster.html) diff --git a/README-RU.md b/README-RU.md index 45f253f5b..2a06dac71 100644 --- a/README-RU.md +++ b/README-RU.md @@ -9,7 +9,7 @@ [![Community](https://img.shields.io/badge/Community-EMQ%20X-yellow?logo=github)](https://github.com/emqx/emqx/discussions) [![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q) -[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) +[![The best IoT MQTT open source team looks forward to your joining](https://static.emqx.net/images/github_readme_en_bg.png)](https://www.emqx.com/en/careers) [English](./README.md) | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | русский @@ -18,7 +18,7 @@ Начиная с релиза 3.0, брокер *EMQ X* полностью поддерживает протокол MQTT версии 5.0, и обратно совместим с версиями 3.1 и 3.1.1, а также протоколами MQTT-SN, CoAP, LwM2M, WebSocket и STOMP. Начиная с релиза 3.0, брокер *EMQ X* может масштабироваться до более чем 10 миллионов одновременных MQTT соединений на один кластер. - Полный список возможностей доступен по ссылке: [EMQ X Release Notes](https://github.com/emqx/emqx/releases). -- Более подробная информация доступна на нашем сайте: [EMQ X homepage](https://www.emqx.io). +- Более подробная информация доступна на нашем сайте: [EMQ X homepage](https://www.emqx.io/). ## Установка @@ -34,7 +34,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p #### Установка бинарного пакета -Сборки для различных операционных систем: [Загрузить EMQ X](https://www.emqx.io/downloads). +Сборки для различных операционных систем: [Загрузить EMQ X](https://www.emqx.com/en/downloads). - [Установка на одном сервере](https://docs.emqx.io/en/broker/latest/getting-started/install.html) - [Установка на кластере](https://docs.emqx.io/en/broker/latest/advanced/cluster.html) diff --git a/README.md b/README.md index 0f86fd188..1726d426b 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ [![Twitter](https://img.shields.io/badge/Follow-EMQ-1DA1F2?logo=twitter)](https://twitter.com/EMQTech) [![YouTube](https://img.shields.io/badge/Subscribe-EMQ-FF0000?logo=youtube)](https://www.youtube.com/channel/UC5FjR77ErAxvZENEWzQaO5Q) -[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers) +[![The best IoT MQTT open source team looks forward to your joining](https://static.emqx.net/images/github_readme_en_bg.png)](https://www.emqx.com/en/careers) English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [русский](./README-RU.md) @@ -17,7 +17,7 @@ English | [简体中文](./README-CN.md) | [日本語](./README-JP.md) | [рус Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. - For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases). -- For more information, please visit [EMQ X homepage](https://www.emqx.io). +- For more information, please visit [EMQ X homepage](https://www.emqx.io/). ## Installation @@ -33,7 +33,7 @@ docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8883:8883 -p #### Installing via Binary Package -Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.io/downloads) page. +Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.com/en/downloads) page. - [Single Node Install](https://docs.emqx.io/en/broker/latest/getting-started/install.html) - [Multi Node Install](https://docs.emqx.io/en/broker/latest/advanced/cluster.html) diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index cbd41a2a8..4dfeabbe2 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -813,7 +813,7 @@ broker { ## - `auth.*` ## - `stats.*` ## - `mqtt.*` -## - `acl.*` +## - `authorization.*` ## - `flapping_detect.*` ## - `force_shutdown.*` ## - `conn_congestion.*` @@ -1095,18 +1095,18 @@ zones.default { } - acl { + authorization { ## Enable ACL check. ## - ## @doc zones..acl.enable + ## @doc zones..authorization.enable ## ValueType: Boolean - ## Default: false - enable: false + ## Default: true + enable: true - ## The action when acl check reject current operation + ## The action when authorization check reject current operation ## - ## @doc zones..acl.deny_action + ## @doc zones..authorization.deny_action ## ValueType: ignore | disconnect ## Default: ignore deny_action: ignore @@ -1115,14 +1115,14 @@ zones.default { ## ## If enabled, ACLs roles for each client will be cached in the memory ## - ## @doc zones..acl.cache.enable + ## @doc zones..authorization.cache.enable ## ValueType: Boolean ## Default: true cache.enable: true ## The maximum count of ACL entries can be cached for a client. ## - ## @doc zones..acl.cache.max_size + ## @doc zones..authorization.cache.max_size ## ValueType: Integer ## Range: [0, 1048576] ## Default: 32 @@ -1130,7 +1130,7 @@ zones.default { ## The time after which an ACL cache entry will be deleted ## - ## @doc zones..acl.cache.ttl + ## @doc zones..authorization.cache.ttl ## ValueType: Duration ## Default: 1m cache.ttl: 1m @@ -1857,7 +1857,7 @@ zones.default { #This is an example zone which has less "strict" settings. #It's useful to clients connecting the broker from trusted networks. zones.internal { - acl.enable: false + authorization.enable: true auth.enable: false listeners.mqtt_internal: { type: tcp diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index 2fa0e630c..d969012da 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -250,5 +250,4 @@ emqx_feature() -> , emqx_bridge_mqtt , emqx_modules , emqx_management - , emqx_retainer - , emqx_statsd]. + , emqx_retainer]. diff --git a/apps/emqx/src/emqx_acl_cache.erl b/apps/emqx/src/emqx_acl_cache.erl index 189faca2f..0232eadaa 100644 --- a/apps/emqx/src/emqx_acl_cache.erl +++ b/apps/emqx/src/emqx_acl_cache.erl @@ -52,15 +52,15 @@ drain_k() -> {?MODULE, drain_timestamp}. -spec(is_enabled(atom()) -> boolean()). is_enabled(Zone) -> - emqx_config:get_zone_conf(Zone, [acl, cache, enable]). + emqx_config:get_zone_conf(Zone, [authorization, cache, enable]). -spec(get_cache_max_size(atom()) -> integer()). get_cache_max_size(Zone) -> - emqx_config:get_zone_conf(Zone, [acl, cache, max_size]). + emqx_config:get_zone_conf(Zone, [authorization, cache, max_size]). -spec(get_cache_ttl(atom()) -> integer()). get_cache_ttl(Zone) -> - emqx_config:get_zone_conf(Zone, [acl, cache, ttl]). + emqx_config:get_zone_conf(Zone, [authorization, cache, ttl]). -spec(list_acl_cache(atom()) -> [acl_cache_entry()]). list_acl_cache(Zone) -> diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 21581e71a..ea96f14ff 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -36,6 +36,8 @@ , stop/0 ]). +-export([format/1]). + %% API -export([ activate/1 , activate/2 @@ -157,6 +159,27 @@ handle_update_config(#{<<"validity_period">> := Period0} = NewConf, OldConf) -> handle_update_config(NewConf, OldConf) -> maps:merge(OldConf, NewConf). +format(#activated_alarm{name = Name, message = Message, activate_at = At, details = Details}) -> + Now = erlang:system_time(microsecond), + #{ + node => node(), + name => Name, + message => Message, + duration => Now - At, + details => Details + }; +format(#deactivated_alarm{name = Name, message = Message, activate_at = At, details = Details, + deactivate_at = DAt}) -> + #{ + node => node(), + name => Name, + message => Message, + duration => DAt - At, + details => Details + }; +format(_) -> + {error, unknow_alarm}. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -249,7 +272,7 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ get_validity_period() -> - timer:seconds(emqx_config:get([alarm, validity_period])). + emqx_config:get([alarm, validity_period]). deactivate_alarm(Details, #activated_alarm{activate_at = ActivateAt, name = Name, details = Details0, message = Msg0}) -> diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index b706baf99..c023a166a 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -435,7 +435,7 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), HasAclDeny = lists:any(fun({_TopicFilter, ReasonCode}) -> ReasonCode =:= ?RC_NOT_AUTHORIZED end, TupleTopicFilters0), - DenyAction = emqx_config:get_zone_conf(Zone, [acl, deny_action]), + DenyAction = emqx_config:get_zone_conf(Zone, [authorization, deny_action]), case DenyAction =:= disconnect andalso HasAclDeny of true -> handle_out(disconnect, ?RC_NOT_AUTHORIZED, Channel); false -> @@ -551,7 +551,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} -> ?LOG(warning, "Cannot publish message to ~s due to ~s.", [Topic, emqx_reason_codes:text(Rc)]), - case emqx_config:get_zone_conf(Zone, [acl_deny_action]) of + case emqx_config:get_zone_conf(Zone, [authorization, deny_action]) of ignore -> case QoS of ?QOS_0 -> {ok, NChannel}; @@ -737,7 +737,7 @@ process_disconnect(ReasonCode, Properties, Channel) -> maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval}, Channel = #channel{conninfo = ConnInfo}) -> - Channel#channel{conninfo = ConnInfo#{expiry_interval => Interval}}; + Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}}; maybe_update_expiry_interval(_Properties, Channel) -> Channel. %%-------------------------------------------------------------------- @@ -1114,11 +1114,11 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(retry_interval, Session)); + emqx_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(await_rel_timeout, Session)); + emqx_session:info(await_rel_timeout, Session); interval(expire_timer, #channel{conninfo = ConnInfo}) -> - timer:seconds(maps:get(expiry_interval, ConnInfo)); + maps:get(expiry_interval, ConnInfo); interval(will_timer, #channel{will_msg = WillMsg}) -> timer:seconds(will_delay_interval(WillMsg)). @@ -1176,7 +1176,7 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{ %% If the Session Expiry Interval is absent the value 0 is used. expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, properties = ConnProps}) -> - emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); + timer:seconds(emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0)); expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) -> get_mqtt_conf(Zone, session_expiry_interval); expiry_interval(_, #mqtt_packet_connect{clean_start = true}) -> @@ -1615,14 +1615,14 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> case maps:get(expiry_interval, ConnInfo) of ?UINT_MAX -> {ok, Channel}; I when I > 0 -> - {ok, ensure_timer(expire_timer, timer:seconds(I), Channel)}; + {ok, ensure_timer(expire_timer, I, Channel)}; _ -> shutdown(Reason, Channel) end. %%-------------------------------------------------------------------- %% Is ACL enabled? is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [acl, enable]). + (not IsSuperuser) andalso emqx_config:get_zone_conf(Zone, [authorization, enable]). %%-------------------------------------------------------------------- %% Parse Topic Filters diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 1f878718c..e2c8438a2 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -248,22 +248,22 @@ create_session(ClientInfo, ConnInfo) -> Session. get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> - #{max_subscriptions => get_conf(Zone, max_subscriptions), - upgrade_qos => get_conf(Zone, upgrade_qos), + #{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), + upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), max_inflight => MaxInflight, - retry_interval => get_conf(Zone, retry_interval), - await_rel_timeout => get_conf(Zone, await_rel_timeout), + retry_interval => get_mqtt_conf(Zone, retry_interval), + await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout), mqueue => mqueue_confs(Zone) }. mqueue_confs(Zone) -> - #{max_len => get_conf(Zone, max_mqueue_len), - store_qos0 => get_conf(Zone, mqueue_store_qos0), - priorities => get_conf(Zone, mqueue_priorities), - default_priority => get_conf(Zone, mqueue_default_priority) + #{max_len => get_mqtt_conf(Zone, max_mqueue_len), + store_qos0 => get_mqtt_conf(Zone, mqueue_store_qos0), + priorities => get_mqtt_conf(Zone, mqueue_priorities), + default_priority => get_mqtt_conf(Zone, mqueue_default_priority) }. -get_conf(Zone, Key) -> +get_mqtt_conf(Zone, Key) -> emqx_config:get_zone_conf(Zone, [mqtt, Key]). %% @doc Try to takeover a session. diff --git a/apps/emqx/src/emqx_global_gc.erl b/apps/emqx/src/emqx_global_gc.erl index b52711964..9449efe9a 100644 --- a/apps/emqx/src/emqx_global_gc.erl +++ b/apps/emqx/src/emqx_global_gc.erl @@ -87,7 +87,7 @@ code_change(_OldVsn, State, _Extra) -> ensure_timer(State) -> case emqx_config:get([node, global_gc_interval]) of undefined -> State; - Interval -> TRef = emqx_misc:start_timer(timer:seconds(Interval), run), + Interval -> TRef = emqx_misc:start_timer(Interval, run), State#{timer := TRef} end. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index e2d9c698c..faad35c3d 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -20,9 +20,11 @@ -include("emqx_mqtt.hrl"). %% APIs --export([ start/0 +-export([ list/0 + , start/0 , restart/0 , stop/0 + , is_running/1 ]). -export([ start_listener/1 @@ -33,6 +35,57 @@ , restart_listener/3 ]). +-spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]). +list() -> + Zones = maps:to_list(emqx_config:get([zones], #{})), + lists:append([list(ZoneName, ZoneConf) || {ZoneName, ZoneConf} <- Zones]). + +list(ZoneName, ZoneConf) -> + Listeners = maps:to_list(maps:get(listeners, ZoneConf, #{})), + [ + begin + ListenerId = listener_id(ZoneName, LName), + Running = is_running(ListenerId), + Conf = merge_zone_and_listener_confs(ZoneConf, LConf), + {ListenerId, maps:put(running, Running, Conf)} + end + || {LName, LConf} <- Listeners]. + +-spec is_running(ListenerId :: atom()) -> boolean() | {error, no_found}. +is_running(ListenerId) -> + Zones = maps:to_list(emqx_config:get([zones], #{})), + Listeners = lists:append( + [ + [{listener_id(ZoneName, LName),merge_zone_and_listener_confs(ZoneConf, LConf)} + || {LName, LConf} <- maps:to_list(maps:get(listeners, ZoneConf, #{}))] + || {ZoneName, ZoneConf} <- Zones]), + case proplists:get_value(ListenerId, Listeners, undefined) of + undefined -> + {error, no_found}; + Conf -> + is_running(ListenerId, Conf) + end. + +is_running(ListenerId, #{type := tcp, bind := ListenOn})-> + try esockd:listener({ListenerId, ListenOn}) of + Pid when is_pid(Pid)-> + true + catch _:_ -> + false + end; + +is_running(ListenerId, #{type := ws})-> + try + Info = ranch:info(ListenerId), + proplists:get_value(status, Info) =:= running + catch _:_ -> + false + end; + +is_running(_ListenerId, #{type := quic})-> +%% TODO: quic support + {error, no_found}. + %% @doc Start all listeners. -spec(start() -> ok). start() -> @@ -52,6 +105,8 @@ start_listener(ZoneName, ListenerName, #{type := Type, bind := Bind} = Conf) -> {ok, _} -> console_print("Start ~s listener ~s on ~s successfully.~n", [Type, listener_id(ZoneName, ListenerName), format(Bind)]); + {error, {already_started, Pid}} -> + {error, {already_started, Pid}}; {error, Reason} -> io:format(standard_error, "Failed to start ~s listener ~s on ~s: ~0p~n", [Type, listener_id(ZoneName, ListenerName), format(Bind), Reason]), diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index b70c27e1b..dfce7a255 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -53,12 +53,12 @@ start_link() -> %%-------------------------------------------------------------------- get_mem_check_interval() -> - memsup:get_check_interval() div 1000. + memsup:get_check_interval(). -set_mem_check_interval(Seconds) when Seconds < 60 -> +set_mem_check_interval(Seconds) when Seconds < 60000 -> memsup:set_check_interval(1); set_mem_check_interval(Seconds) -> - memsup:set_check_interval(Seconds div 60). + memsup:set_check_interval(Seconds div 60000). get_sysmem_high_watermark() -> memsup:get_sysmem_high_watermark(). @@ -128,5 +128,5 @@ start_check_timer() -> Interval = emqx_config:get([sysmon, os, cpu_check_interval]), case erlang:system_info(system_architecture) of "x86_64-pc-linux-musl" -> ok; - _ -> emqx_misc:start_timer(timer:seconds(Interval), check) + _ -> emqx_misc:start_timer(Interval, check) end. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 0dc48e42f..cfaffc6c7 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -164,7 +164,7 @@ fields("node") -> , {"config_files", t(list(string()), "emqx.config_files", [ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"]) ])} - , {"global_gc_interval", t(duration_s(), undefined, "15m")} + , {"global_gc_interval", t(duration(), undefined, "15m")} , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)} , {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")} , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} @@ -257,13 +257,13 @@ fields("auth") -> [ {"enable", t(boolean(), undefined, false)} ]; -fields("acl") -> - [ {"enable", t(boolean(), undefined, false)} - , {"cache", ref("acl_cache")} +fields("authorization") -> + [ {"enable", t(boolean(), undefined, true)} + , {"cache", ref("authorization_cache")} , {"deny_action", t(union(ignore, disconnect), undefined, ignore)} ]; -fields("acl_cache") -> +fields("authorization_cache") -> [ {"enable", t(boolean(), undefined, true)} , {"max_size", t(range(1, 1048576), undefined, 32)} , {"ttl", t(duration(), undefined, "1m")} @@ -288,10 +288,10 @@ fields("mqtt") -> , {"max_subscriptions", maybe_infinity(range(1, inf))} , {"upgrade_qos", t(boolean(), undefined, false)} , {"max_inflight", t(range(1, 65535), undefined, 32)} - , {"retry_interval", t(duration_s(), undefined, "30s")} + , {"retry_interval", t(duration(), undefined, "30s")} , {"max_awaiting_rel", maybe_infinity(integer(), 100)} - , {"await_rel_timeout", t(duration_s(), undefined, "300s")} - , {"session_expiry_interval", t(duration_s(), undefined, "2h")} + , {"await_rel_timeout", t(duration(), undefined, "300s")} + , {"session_expiry_interval", t(duration(), undefined, "2h")} , {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)} , {"mqueue_priorities", maybe_disabled(map())} , {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)} @@ -306,7 +306,7 @@ fields("zones") -> fields("zone_settings") -> [ {"mqtt", ref("mqtt")} - , {"acl", ref("acl")} + , {"authorization", ref("authorization")} , {"auth", ref("auth")} , {"stats", ref("stats")} , {"flapping_detect", ref("flapping_detect")} @@ -507,10 +507,10 @@ fields("sysmon_vm") -> ]; fields("sysmon_os") -> - [ {"cpu_check_interval", t(duration_s(), undefined, 60)} + [ {"cpu_check_interval", t(duration(), undefined, "60s")} , {"cpu_high_watermark", t(percent(), undefined, "80%")} , {"cpu_low_watermark", t(percent(), undefined, "60%")} - , {"mem_check_interval", maybe_disabled(duration_s(), 60)} + , {"mem_check_interval", maybe_disabled(duration(), "60s")} , {"sysmem_high_watermark", t(percent(), undefined, "70%")} , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; @@ -518,7 +518,7 @@ fields("sysmon_os") -> fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} - , {"validity_period", t(duration_s(), undefined, "24h")} + , {"validity_period", t(duration(), undefined, "24h")} ]; fields(ExtraField) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index d2670e978..3e0f56610 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -178,10 +178,10 @@ init(Opts) -> inflight = emqx_inflight:new(MaxInflight), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - retry_interval = timer:seconds(maps:get(retry_interval, Opts, 30)), + retry_interval = maps:get(retry_interval, Opts, 30000), awaiting_rel = #{}, max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), - await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)), + await_rel_timeout = maps:get(await_rel_timeout, Opts, 300000), created_at = erlang:system_time(millisecond) }. @@ -211,7 +211,7 @@ info(inflight_cnt, #session{inflight = Inflight}) -> info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); info(retry_interval, #session{retry_interval = Interval}) -> - Interval div 1000; + Interval; info(mqueue, #session{mqueue = MQueue}) -> MQueue; info(mqueue_len, #session{mqueue = MQueue}) -> @@ -229,7 +229,7 @@ info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) -> info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> Max; info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> - Timeout div 1000; + Timeout; info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. diff --git a/apps/emqx/test/emqx_acl_cache_SUITE.erl b/apps/emqx/test/emqx_acl_cache_SUITE.erl index 3708d0524..ebbf974af 100644 --- a/apps/emqx/test/emqx_acl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_acl_cache_SUITE.erl @@ -80,4 +80,4 @@ t_drain_acl_cache(_) -> emqtt:stop(Client). toggle_acl(Bool) when is_boolean(Bool) -> - emqx_config:put_zone_conf(default, [acl, enable], Bool). + emqx_config:put_zone_conf(default, [authorization, enable], Bool). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index ec97daa10..0b2039f30 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -28,7 +28,7 @@ all() -> emqx_ct:all(?MODULE). mqtt_conf() -> - #{await_rel_timeout => 300, + #{await_rel_timeout => 300000, idle_timeout => 15000, ignore_loop_deliver => false, keepalive_backoff => 0.75, @@ -49,9 +49,9 @@ mqtt_conf() -> peer_cert_as_username => disabled, response_information => [], retain_available => true, - retry_interval => 30, + retry_interval => 30000, server_keepalive => disabled, - session_expiry_interval => 7200, + session_expiry_interval => 7200000, shared_subscription => true, strict_mode => false, upgrade_qos => false, @@ -140,7 +140,7 @@ listener_mqtt_ws_conf() -> default_zone_conf() -> #{zones => #{default => - #{ acl => #{ + #{ authorization => #{ cache => #{enable => true,max_size => 32, ttl => 60000}, deny_action => ignore, enable => false @@ -863,7 +863,7 @@ t_packing_alias(_) -> channel())). t_check_pub_acl(_) -> - emqx_config:put_zone_conf(default, [acl, enable], true), + emqx_config:put_zone_conf(default, [authorization, enable], true), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), ok = emqx_channel:check_pub_acl(Publish, channel()). @@ -873,7 +873,7 @@ t_check_pub_alias(_) -> ok = emqx_channel:check_pub_alias(#mqtt_packet{variable = Publish}, Channel). t_check_sub_acls(_) -> - emqx_config:put_zone_conf(default, [acl, enable], true), + emqx_config:put_zone_conf(default, [authorization, enable], true), TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS}, [{TopicFilter, 0}] = emqx_channel:check_sub_acls([TopicFilter], channel()). diff --git a/apps/emqx/test/emqx_global_gc_SUITE.erl b/apps/emqx/test/emqx_global_gc_SUITE.erl index 643e5dd0d..dcdeeae57 100644 --- a/apps/emqx/test/emqx_global_gc_SUITE.erl +++ b/apps/emqx/test/emqx_global_gc_SUITE.erl @@ -24,7 +24,7 @@ all() -> emqx_ct:all(?MODULE). t_run_gc(_) -> - ok = emqx_config:put([node, global_gc_interval], 1), + ok = emqx_config:put([node, global_gc_interval], 1000), {ok, _} = emqx_global_gc:start_link(), ok = timer:sleep(1500), {ok, MilliSecs} = emqx_global_gc:run(), diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index ff04055e6..8f82d83bb 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -217,14 +217,12 @@ t_connect_will_message(Config) -> ok = emqtt:disconnect(Client4). t_batch_subscribe(init, Config) -> - emqx_config:put_zone_conf(default, [acl, enable], true), - emqx_config:put_zone_conf(default, [acl, enable], true), + emqx_config:put_zone_conf(default, [authorization, enable], true), ok = meck:new(emqx_access_control, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_access_control, authorize, fun(_, _, _) -> deny end), Config; t_batch_subscribe('end', _Config) -> - emqx_config:put_zone_conf(default, [acl, enable], false), - emqx_config:put_zone_conf(default, [acl, enable], false), + emqx_config:put_zone_conf(default, [authorization, enable], false), meck:unload(emqx_access_control). t_batch_subscribe(Config) -> diff --git a/apps/emqx/test/emqx_os_mon_SUITE.erl b/apps/emqx/test/emqx_os_mon_SUITE.erl index 6c9ac51c2..56c81e9e6 100644 --- a/apps/emqx/test/emqx_os_mon_SUITE.erl +++ b/apps/emqx/test/emqx_os_mon_SUITE.erl @@ -25,8 +25,8 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> emqx_config:put([sysmon, os], #{ - cpu_check_interval => 60,cpu_high_watermark => 0.8, - cpu_low_watermark => 0.6,mem_check_interval => 60, + cpu_check_interval => 60000,cpu_high_watermark => 0.8, + cpu_low_watermark => 0.6,mem_check_interval => 60000, procmem_high_watermark => 0.05,sysmem_high_watermark => 0.7}), application:ensure_all_started(os_mon), Config. @@ -38,11 +38,11 @@ t_api(_) -> gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}), {ok, _} = emqx_os_mon:start_link(), - ?assertEqual(60, emqx_os_mon:get_mem_check_interval()), - ?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30)), - ?assertEqual(60, emqx_os_mon:get_mem_check_interval()), - ?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122)), - ?assertEqual(120, emqx_os_mon:get_mem_check_interval()), + ?assertEqual(60000, emqx_os_mon:get_mem_check_interval()), + ?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30000)), + ?assertEqual(60000, emqx_os_mon:get_mem_check_interval()), + ?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122000)), + ?assertEqual(120000, emqx_os_mon:get_mem_check_interval()), ?assertEqual(70, emqx_os_mon:get_sysmem_high_watermark()), ?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)), diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index be0e81de8..1aa5b0196 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -59,11 +59,11 @@ t_session_init(_) -> ?assertEqual(0, emqx_session:info(inflight_cnt, Session)), ?assertEqual(64, emqx_session:info(inflight_max, Session)), ?assertEqual(1, emqx_session:info(next_pkt_id, Session)), - ?assertEqual(30, emqx_session:info(retry_interval, Session)), + ?assertEqual(30000, emqx_session:info(retry_interval, Session)), ?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))), ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)), ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)), - ?assertEqual(300, emqx_session:info(await_rel_timeout, Session)), + ?assertEqual(300000, emqx_session:info(await_rel_timeout, Session)), ?assert(is_integer(emqx_session:info(created_at, Session))). %%-------------------------------------------------------------------- @@ -73,8 +73,8 @@ t_session_init(_) -> t_session_info(_) -> ?assertMatch(#{subscriptions := #{}, upgrade_qos := false, - retry_interval := 30, - await_rel_timeout := 300 + retry_interval := 30000, + await_rel_timeout := 300000 }, emqx_session:info(session())). t_session_stats(_) -> @@ -309,9 +309,11 @@ t_enqueue(_) -> t_retry(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - Session = session(#{retry_interval => 100}), + RetryIntervalMs = 100, %% 0.1s + Session = session(#{retry_interval => RetryIntervalMs}), {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), - ok = timer:sleep(200), + ElapseMs = 200, %% 0.2s + ok = timer:sleep(ElapseMs), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], {ok, Msgs1, 100, Session2} = emqx_session:retry(Session1), ?assertEqual(2, emqx_session:info(inflight_cnt, Session2)). @@ -342,7 +344,7 @@ t_replay(_) -> t_expire_awaiting_rel(_) -> {ok, Session} = emqx_session:expire(awaiting_rel, session()), - Timeout = emqx_session:info(await_rel_timeout, Session) * 1000, + Timeout = emqx_session:info(await_rel_timeout, Session), Session1 = emqx_session:set_field(awaiting_rel, #{1 => Ts = ts(millisecond)}, Session), {ok, Timeout, Session2} = emqx_session:expire(awaiting_rel, Session1), ?assertEqual(#{1 => Ts}, emqx_session:info(awaiting_rel, Session2)). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index c9c26a766..5978e83b7 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -456,7 +456,7 @@ parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). is_acl_enabled(#{zone := Zone, listener := Listener, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [acl, enable]). + (not IsSuperuser) andalso emqx_config:get_listener_conf(Zone, Listener, [authorization, enable]). %%-------------------------------------------------------------------- %% Ensure & Hooks diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 9d08fb8a1..0ebbc5195 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1391,9 +1391,9 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(retry_interval, Session)); + emqx_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(await_rel_timeout, Session)). + emqx_session:info(await_rel_timeout, Session). %%-------------------------------------------------------------------- %% Helper functions diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index b539b223b..eae5563ba 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -85,7 +85,10 @@ %% Listeners -export([ list_listeners/0 , list_listeners/1 - , restart_listener/2 + , list_listeners/2 + , list_listeners_by_id/1 + , get_listener/2 + , manage_listener/2 ]). %% Alarms @@ -451,37 +454,39 @@ reload_plugin(Node, Plugin) -> %%-------------------------------------------------------------------- list_listeners() -> - [{Node, list_listeners(Node)} || Node <- ekka_mnesia:running_nodes()]. + lists:append([list_listeners(Node) || Node <- ekka_mnesia:running_nodes()]). + +list_listeners(Node, Identifier) -> + listener_id_filter(Identifier, list_listeners(Node)). list_listeners(Node) when Node =:= node() -> - Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) -> - #{protocol => Protocol, - listen_on => ListenOn, - identifier => Protocol, - acceptors => esockd:get_acceptors({Protocol, ListenOn}), - max_conns => esockd:get_max_connections({Protocol, ListenOn}), - current_conns => esockd:get_current_connections({Protocol, ListenOn}), - shutdown_count => esockd:get_shutdown_count({Protocol, ListenOn})} - end, esockd:listeners()), - Http = lists:map(fun({Protocol, Opts}) -> - #{protocol => Protocol, - listen_on => proplists:get_value(port, Opts), - acceptors => maps:get(num_acceptors, proplists:get_value(transport_options, Opts, #{}), 0), - max_conns => proplists:get_value(max_connections, Opts), - current_conns => proplists:get_value(all_connections, Opts), - shutdown_count => []} - end, ranch:info()), - Tcp ++ Http; + [{Id, maps:put(node, Node, Conf)} || {Id, Conf} <- emqx_listeners:list()]; list_listeners(Node) -> rpc_call(Node, list_listeners, [Node]). --spec restart_listener(node(), atom()) -> ok | {error, term()}. -restart_listener(Node, Identifier) when Node =:= node() -> - emqx_listeners:restart_listener(Identifier); +list_listeners_by_id(Identifier) -> + listener_id_filter(Identifier, list_listeners()). -restart_listener(Node, Identifier) -> - rpc_call(Node, restart_listener, [Node, Identifier]). +get_listener(Node, Identifier) -> + case listener_id_filter(Identifier, list_listeners(Node)) of + [] -> + {error, not_found}; + [Listener] -> + Listener + end. + +listener_id_filter(Identifier, Listeners) -> + Filter = + fun({Id, _}) -> Id =:= Identifier end, + lists:filter(Filter, Listeners). + +-spec manage_listener(Operation :: start_listener|stop_listener|restart_listener, Param :: map()) -> + ok | {error, Reason :: term()}. +manage_listener(Operation, #{identifier := Identifier, node := Node}) when Node =:= node()-> + erlang:apply(emqx_listeners, Operation, [Identifier]); +manage_listener(Operation, Param = #{node := Node}) -> + rpc_call(Node, restart_listener, [Operation, Param]). %%-------------------------------------------------------------------- %% Get Alarms @@ -542,7 +547,7 @@ item(route, {Topic, Node}) -> #{topic => Topic, node => Node}. %%-------------------------------------------------------------------- -%% Internel Functions. +%% Internal Functions. %%-------------------------------------------------------------------- rpc_call(Node, Fun, Args) -> diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index e068c5384..fbf926540 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -65,10 +65,10 @@ count(Table, Nodes) -> lists:sum([rpc_call(Node, ets, info, [Table, size], 5000) || Node <- Nodes]). page(Params) -> - binary_to_integer(proplists:get_value(<<"_page">>, Params, <<"1">>)). + binary_to_integer(proplists:get_value(<<"page">>, Params, <<"1">>)). limit(Params) -> - case proplists:get_value(<<"_limit">>, Params) of + case proplists:get_value(<<"limit">>, Params) of undefined -> emqx_mgmt:max_row_limit(); Size -> binary_to_integer(Size) end. @@ -204,7 +204,7 @@ params2qs(Params, QsSchema) -> {length(Qs) + length(Fuzzy), {Qs, Fuzzy}}. %%-------------------------------------------------------------------- -%% Intenal funcs +%% Internal funcs pick_params_to_qs([], _, Acc1, Acc2) -> NAcc2 = [E || E <- Acc2, not lists:keymember(element(1, E), 1, Acc1)], @@ -215,12 +215,12 @@ pick_params_to_qs([{Key, Value}|Params], QsKits, Acc1, Acc2) -> undefined -> pick_params_to_qs(Params, QsKits, Acc1, Acc2); Type -> case Key of - <> - when Prefix =:= <<"_gte_">>; - Prefix =:= <<"_lte_">> -> + <> + when Prefix =:= <<"gte_">>; + Prefix =:= <<"lte_">> -> OpposeKey = case Prefix of - <<"_gte_">> -> <<"_lte_", NKey/binary>>; - <<"_lte_">> -> <<"_gte_", NKey/binary>> + <<"gte_">> -> <<"lte_", NKey/binary>>; + <<"lte_">> -> <<"gte_", NKey/binary>> end, case lists:keytake(OpposeKey, 1, Params) of false -> @@ -252,20 +252,20 @@ qs(K, Value0, Type) -> throw({bad_value_type, {K, Type, Value0}}) end. -qs(<<"_gte_", Key/binary>>, Value) -> +qs(<<"gte_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), '>=', Value}; -qs(<<"_lte_", Key/binary>>, Value) -> +qs(<<"lte_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), '=<', Value}; -qs(<<"_like_", Key/binary>>, Value) -> +qs(<<"like_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), like, Value}; -qs(<<"_match_", Key/binary>>, Value) -> +qs(<<"match_", Key/binary>>, Value) -> {binary_to_existing_atom(Key, utf8), match, Value}; qs(Key, Value) -> {binary_to_existing_atom(Key, utf8), '=:=', Value}. -is_fuzzy_key(<<"_like_", _/binary>>) -> +is_fuzzy_key(<<"like_", _/binary>>) -> true; -is_fuzzy_key(<<"_match_", _/binary>>) -> +is_fuzzy_key(<<"match_", _/binary>>) -> true; is_fuzzy_key(_) -> false. @@ -317,18 +317,18 @@ params2qs_test() -> {<<"int">>, integer}, {<<"atom">>, atom}, {<<"ts">>, timestamp}, - {<<"_gte_range">>, integer}, - {<<"_lte_range">>, integer}, - {<<"_like_fuzzy">>, binary}, - {<<"_match_topic">>, binary}], + {<<"gte_range">>, integer}, + {<<"lte_range">>, integer}, + {<<"like_fuzzy">>, binary}, + {<<"match_topic">>, binary}], Params = [{<<"str">>, <<"abc">>}, {<<"int">>, <<"123">>}, {<<"atom">>, <<"connected">>}, {<<"ts">>, <<"156000">>}, - {<<"_gte_range">>, <<"1">>}, - {<<"_lte_range">>, <<"5">>}, - {<<"_like_fuzzy">>, <<"user">>}, - {<<"_match_topic">>, <<"t/#">>}], + {<<"gte_range">>, <<"1">>}, + {<<"lte_range">>, <<"5">>}, + {<<"like_fuzzy">>, <<"user">>}, + {<<"match_topic">>, <<"t/#">>}], ExpectedQs = [{str, '=:=', <<"abc">>}, {int, '=:=', 123}, {atom, '=:=', connected}, diff --git a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl index 9b9e4a4a6..36a0f3a5b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_alarms.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_alarms.erl @@ -16,122 +16,119 @@ -module(emqx_mgmt_api_alarms). --include("emqx_mgmt.hrl"). +-behaviour(minirest_api). --include_lib("emqx/include/emqx.hrl"). +-export([api_spec/0]). --rest_api(#{name => list_all_alarms, - method => 'GET', - path => "/alarms", - func => list, - descr => "List all alarms in the cluster"}). +-export([alarms/2]). --rest_api(#{name => list_node_alarms, - method => 'GET', - path => "nodes/:atom:node/alarms", - func => list, - descr => "List all alarms on a node"}). +-export([ query_activated/3 + , query_deactivated/3]). +%% notice: from emqx_alarms +-define(ACTIVATED_ALARM, emqx_activated_alarm). +-define(DEACTIVATED_ALARM, emqx_deactivated_alarm). --rest_api(#{name => list_all_activated_alarms, - method => 'GET', - path => "/alarms/activated", - func => list_activated, - descr => "List all activated alarm in the cluster"}). +api_spec() -> + {[alarms_api()], [alarm_schema()]}. --rest_api(#{name => list_node_activated_alarms, - method => 'GET', - path => "nodes/:atom:node/alarms/activated", - func => list_activated, - descr => "List all activated alarm on a node"}). +alarm_schema() -> + #{ + alarm => #{ + type => object, + properties => #{ + node => #{ + type => string, + description => <<"Alarm in node">>}, + name => #{ + type => string, + description => <<"Alarm name">>}, + message => #{ + type => string, + description => <<"Alarm readable information">>}, + details => #{ + type => object, + description => <<"Alarm detail">>}, + duration => #{ + type => integer, + description => <<"Alarms duration time; UNIX time stamp">>} + } + } + }. --rest_api(#{name => list_all_deactivated_alarms, - method => 'GET', - path => "/alarms/deactivated", - func => list_deactivated, - descr => "List all deactivated alarm in the cluster"}). +alarms_api() -> + Metadata = #{ + get => #{ + description => <<"EMQ X alarms">>, + parameters => [#{ + name => activated, + in => query, + description => <<"All alarms, if not specified">>, + required => false, + schema => #{type => boolean, default => true} + }], + responses => #{ + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"List all alarms">>, alarm)}}, + delete => #{ + description => <<"Remove all deactivated alarms">>, + responses => #{ + <<"200">> => + emqx_mgmt_util:response_schema(<<"Remove all deactivated alarms ok">>)}}}, + {"/alarms", Metadata, alarms}. --rest_api(#{name => list_node_deactivated_alarms, - method => 'GET', - path => "nodes/:atom:node/alarms/deactivated", - func => list_deactivated, - descr => "List all deactivated alarm on a node"}). +%%%============================================================================================== +%% parameters trans +alarms(get, Request) -> + case proplists:get_value(<<"activated">>, cowboy_req:parse_qs(Request), undefined) of + undefined -> + list(#{activated => undefined}); + <<"true">> -> + list(#{activated => true}); + <<"false">> -> + list(#{activated => false}) + end; --rest_api(#{name => deactivate_alarm, - method => 'POST', - path => "/alarms/deactivated", - func => deactivate, - descr => "Delete the special alarm on a node"}). +alarms(delete, _Request) -> + delete(). --rest_api(#{name => delete_all_deactivated_alarms, - method => 'DELETE', - path => "/alarms/deactivated", - func => delete_deactivated, - descr => "Delete all deactivated alarm in the cluster"}). +%%%============================================================================================== +%% api apply +list(#{activated := true}) -> + do_list(activated); +list(#{activated := false}) -> + do_list(deactivated); +list(#{activated := undefined}) -> + do_list(activated). --rest_api(#{name => delete_node_deactivated_alarms, - method => 'DELETE', - path => "nodes/:atom:node/alarms/deactivated", - func => delete_deactivated, - descr => "Delete all deactivated alarm on a node"}). - --export([ list/2 - , deactivate/2 - , list_activated/2 - , list_deactivated/2 - , delete_deactivated/2 - ]). - -list(Bindings, _Params) when map_size(Bindings) == 0 -> - {ok, #{code => ?SUCCESS, - data => [#{node => Node, alarms => Alarms} || {Node, Alarms} <- emqx_mgmt:get_alarms(all)]}}; - -list(#{node := Node}, _Params) -> - {ok, #{code => ?SUCCESS, - data => emqx_mgmt:get_alarms(Node, all)}}. - -list_activated(Bindings, _Params) when map_size(Bindings) == 0 -> - {ok, #{code => ?SUCCESS, - data => [#{node => Node, alarms => Alarms} || {Node, Alarms} <- emqx_mgmt:get_alarms(activated)]}}; - -list_activated(#{node := Node}, _Params) -> - {ok, #{code => ?SUCCESS, - data => emqx_mgmt:get_alarms(Node, activated)}}. - -list_deactivated(Bindings, _Params) when map_size(Bindings) == 0 -> - {ok, #{code => ?SUCCESS, - data => [#{node => Node, alarms => Alarms} || {Node, Alarms} <- emqx_mgmt:get_alarms(deactivated)]}}; - -list_deactivated(#{node := Node}, _Params) -> - {ok, #{code => ?SUCCESS, - data => emqx_mgmt:get_alarms(Node, deactivated)}}. - -deactivate(_Bindings, Params) -> - Node = get_node(Params), - Name = get_name(Params), - do_deactivate(Node, Name). - -delete_deactivated(Bindings, _Params) when map_size(Bindings) == 0 -> +delete() -> _ = emqx_mgmt:delete_all_deactivated_alarms(), - {ok, #{code => ?SUCCESS}}; + {200}. -delete_deactivated(#{node := Node}, _Params) -> - emqx_mgmt:delete_all_deactivated_alarms(Node), - {ok, #{code => ?SUCCESS}}. +%%%============================================================================================== +%% internal +do_list(Type) -> + {Table, Function} = + case Type of + activated -> + {?ACTIVATED_ALARM, query_activated}; + deactivated -> + {?DEACTIVATED_ALARM, query_deactivated} + end, + Response = emqx_mgmt_api:cluster_query([], {Table, []}, {?MODULE, Function}), + {200, Response}. -get_node(Params) -> - binary_to_atom(proplists:get_value(<<"node">>, Params, undefined), utf8). +query_activated(_, Start, Limit) -> + query(?ACTIVATED_ALARM, Start, Limit). -get_name(Params) -> - binary_to_atom(proplists:get_value(<<"name">>, Params, undefined), utf8). +query_deactivated(_, Start, Limit) -> + query(?DEACTIVATED_ALARM, Start, Limit). -do_deactivate(undefined, _) -> - emqx_mgmt:return({error, missing_param}); -do_deactivate(_, undefined) -> - emqx_mgmt:return({error, missing_param}); -do_deactivate(Node, Name) -> - case emqx_mgmt:deactivate(Node, Name) of - ok -> - emqx_mgmt:return(); - {error, Reason} -> - emqx_mgmt:return({error, Reason}) - end. +query(Table, Start, Limit) -> + Ms = [{'$1',[],['$1']}], + emqx_mgmt_api:select_table(Table, Ms, Start, Limit, fun format_alarm/1). + +format_alarm(Alarms) when is_list(Alarms) -> + [emqx_alarm:format(Alarm) || Alarm <- Alarms]; + +format_alarm(Alarm) -> + emqx_alarm:format(Alarm). diff --git a/apps/emqx_management/src/emqx_mgmt_api_apps.erl b/apps/emqx_management/src/emqx_mgmt_api_apps.erl index 396c05696..2a5f330c4 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_apps.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_apps.erl @@ -16,7 +16,7 @@ -module(emqx_mgmt_api_apps). --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). @@ -76,17 +76,17 @@ app_without_secret_schema() -> apps_api() -> Metadata = #{ get => #{ - description => "List EMQ X apps", + description => <<"List EMQ X apps">>, responses => #{ <<"200">> => emqx_mgmt_util:response_array_schema(<<"All apps">>, app_without_secret_schema())}}, post => #{ - description => "EMQ X create apps", + description => <<"EMQ X create apps">>, 'requestBody' => emqx_mgmt_util:request_body_schema(<<"app">>), responses => #{ <<"200">> => - emqx_mgmt_util:response_schema(<<"Create apps">>, <<"app_secret">>), + emqx_mgmt_util:response_schema(<<"Create apps">>, app_secret), <<"400">> => emqx_mgmt_util:response_error_schema(<<"App ID already exist">>, [?BAD_APP_ID])}}}, {"/apps", Metadata, apps}. @@ -94,36 +94,34 @@ apps_api() -> app_api() -> Metadata = #{ get => #{ - description => "EMQ X apps", + description => <<"EMQ X apps">>, parameters => [#{ name => app_id, in => path, required => true, - schema => #{type => string}, - example => <<"admin">>}], + schema => #{type => string}}], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"App id not found">>), <<"200">> => - emqx_mgmt_util:response_schema("Get App", app_without_secret_schema())}}, + emqx_mgmt_util:response_schema(<<"Get App">>, app_without_secret_schema())}}, delete => #{ - description => "EMQ X apps", + description => <<"EMQ X apps">>, parameters => [#{ name => app_id, in => path, required => true, - schema => #{type => string}, - example => <<"admin">>}], + schema => #{type => string} + }], responses => #{ - <<"200">> => emqx_mgmt_util:response_schema("Remove app ok")}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Remove app ok">>)}}, put => #{ - description => "EMQ X update apps", + description => <<"EMQ X update apps">>, parameters => [#{ name => app_id, in => path, required => true, - schema => #{type => string}, - default => <<"admin">> + schema => #{type => string} }], 'requestBody' => emqx_mgmt_util:request_body_schema(app_without_secret_schema()), responses => #{ @@ -176,23 +174,19 @@ app(put, Request) -> %%%============================================================================================== %% api apply list(_) -> - Data = [format_without_app_secret(Apps) || Apps <- emqx_mgmt_auth:list_apps()], - Response = emqx_json:encode(Data), - {200, Response}. + {200, [format_without_app_secret(Apps) || Apps <- emqx_mgmt_auth:list_apps()]}. create(#{app_id := AppID, name := Name, secret := Secret, desc := Desc, status := Status, expired := Expired}) -> case emqx_mgmt_auth:add_app(AppID, Name, Secret, Desc, Status, Expired) of {ok, AppSecret} -> - Response = emqx_json:encode(#{secret => AppSecret}), - {200, Response}; + {200, #{secret => AppSecret}}; {error, alread_existed} -> Message = list_to_binary(io_lib:format("appid ~p already existed", [AppID])), - {400, #{code => 'BAD_APP_ID', reason => Message}}; + {400, #{code => 'BAD_APP_ID', message => Message}}; {error, Reason} -> - Data = #{code => 'UNKNOW_ERROR', - reason => list_to_binary(io_lib:format("~p", [Reason]))}, - Response = emqx_json:encode(Data), + Response = #{code => 'UNKNOW_ERROR', + message => list_to_binary(io_lib:format("~p", [Reason]))}, {500, Response} end. @@ -201,8 +195,7 @@ lookup(#{app_id := AppID}) -> undefined -> {404, ?APP_ID_NOT_FOUND}; App -> - Data = format_with_app_secret(App), - Response = emqx_json:encode(Data), + Response = format_with_app_secret(App), {200, Response} end. @@ -217,8 +210,7 @@ update(App = #{app_id := AppID, name := Name, desc := Desc, status := Status, ex {error, not_found} -> {404, ?APP_ID_NOT_FOUND}; {error, Reason} -> - Data = #{code => 'UNKNOW_ERROR', reason => list_to_binary(io_lib:format("~p", [Reason]))}, - Response = emqx_json:encode(Data), + Response = #{code => 'UNKNOW_ERROR', message => list_to_binary(io_lib:format("~p", [Reason]))}, {500, Response} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 0fcd7404a..6e7b4f687 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -16,7 +16,7 @@ -module(emqx_mgmt_api_clients). --behavior(minirest_api). +-behaviour(minirest_api). -include_lib("emqx/include/emqx.hrl"). @@ -48,12 +48,12 @@ , {<<"clean_start">>, atom} , {<<"proto_name">>, binary} , {<<"proto_ver">>, integer} - , {<<"_like_clientid">>, binary} - , {<<"_like_username">>, binary} - , {<<"_gte_created_at">>, timestamp} - , {<<"_lte_created_at">>, timestamp} - , {<<"_gte_connected_at">>, timestamp} - , {<<"_lte_connected_at">>, timestamp}]}). + , {<<"like_clientid">>, binary} + , {<<"like_username">>, binary} + , {<<"gte_created_at">>, timestamp} + , {<<"lte_created_at">>, timestamp} + , {<<"gte_connected_at">>, timestamp} + , {<<"lte_connected_at">>, timestamp}]}). -define(query_fun, {?MODULE, query}). -define(format_fun, {?MODULE, format_channel_info}). @@ -214,112 +214,106 @@ schemas() -> clients_api() -> Metadata = #{ get => #{ - description => "List clients", + description => <<"List clients">>, responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, <<"client">>)}}}, + <<"200">> => emqx_mgmt_util:response_array_schema(<<"List clients 200 OK">>, client)}}}, {"/clients", Metadata, clients}. client_api() -> Metadata = #{ get => #{ - description => "Get clients info by client ID", + description => <<"Get clients info by client ID">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, - required => true, - example => 123456}], + required => true + }], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, <<"client">>)}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}}, delete => #{ - description => "Kick out client by client ID", + description => <<"Kick out client by client ID">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, - required => true, - example => 123456}], + required => true + }], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, <<"client">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, client)}}}, {"/clients/:clientid", Metadata, client}. clients_acl_cache_api() -> Metadata = #{ get => #{ - description => "Get client acl cache", + description => <<"Get client acl cache">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, - required => true, - example => 123456}], + required => true + }], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"List clients 200 OK">>, <<"acl_cache">>)}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Get client acl cache">>, acl_cache)}}, delete => #{ - description => "Clean client acl cache", + description => <<"Clean client acl cache">>, parameters => [#{ name => clientid, in => path, schema => #{type => string}, - required => true, - example => 123456}], + required => true + }], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients 200 OK">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Delete clients acl cache OK">>)}}}, {"/clients/:clientid/acl_cache", Metadata, acl_cache}. subscribe_api() -> Metadata = #{ post => #{ - description => "subscribe", - parameters => [ - #{ - name => clientid, - in => path, - schema => #{type => string}, - required => true, - example => 123456 - } - ], + description => <<"Subscribe">>, + parameters => [#{ + name => clientid, + in => path, + schema => #{type => string}, + required => true + }], 'requestBody' => emqx_mgmt_util:request_body_schema(#{ type => object, properties => #{ - <<"topic">> => #{ + topic => #{ type => string, - example => <<"topic_1">>, description => <<"Topic">>}, - <<"qos">> => #{ + qos => #{ type => integer, enum => [0, 1, 2], example => 0, description => <<"QoS">>}}}), responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"subscribe ok">>)}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Subscribe ok">>)}}, delete => #{ - description => "unsubscribe", + description => <<"Unsubscribe">>, parameters => [ #{ name => clientid, in => path, schema => #{type => string}, - required => true, - example => 123456 + required => true }, #{ name => topic, in => query, schema => #{type => string}, - required => true, - example => <<"topic_1">> + required => true } ], responses => #{ <<"404">> => emqx_mgmt_util:response_error_schema(<<"Client id not found">>), - <<"200">> => emqx_mgmt_util:response_schema(<<"unsubscribe ok">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Unsubscribe ok">>)}}}, {"/clients/:clientid/subscribe", Metadata, subscribe}. %%%============================================================================================== @@ -373,17 +367,15 @@ subscribe_batch(post, Request) -> %% api apply list(Params) -> - Data = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun), - Body = emqx_json:encode(Data), - {200, Body}. + Response = emqx_mgmt_api:cluster_query(maps:to_list(Params), ?CLIENT_QS_SCHEMA, ?query_fun), + {200, Response}. lookup(#{clientid := ClientID}) -> case emqx_mgmt:lookup_client({clientid, ClientID}, ?format_fun) of [] -> {404, ?CLIENT_ID_NOT_FOUND}; ClientInfo -> - Response = emqx_json:encode(hd(ClientInfo)), - {200, Response} + {200, hd(ClientInfo)} end. kickout(#{clientid := ClientID}) -> @@ -395,9 +387,10 @@ get_acl_cache(#{clientid := ClientID})-> {error, not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> - {500, #{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}}; + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; Caches -> - Response = emqx_json:encode([format_acl_cache(Cache) || Cache <- Caches]), + Response = [format_acl_cache(Cache) || Cache <- Caches], {200, Response} end. @@ -408,7 +401,8 @@ clean_acl_cache(#{clientid := ClientID}) -> {error, not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> - {500, #{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}} + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} end. subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> @@ -416,8 +410,8 @@ subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> - Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}), - {500, Body}; + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; ok -> {200} end. @@ -427,8 +421,8 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) -> {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> - Body = emqx_json:encode(#{code => <<"UNKNOW_ERROR">>, reason => io_lib:format("~p", [Reason])}), - {500, Body}; + Message = list_to_binary(io_lib:format("~p", [Reason])), + {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {unsubscribe, [{Topic, #{}}]} -> {200} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index a2bc7309d..e845d2679 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -16,58 +16,319 @@ -module(emqx_mgmt_api_listeners). --rest_api(#{name => list_listeners, - method => 'GET', - path => "/listeners/", - func => list, - descr => "A list of listeners in the cluster"}). +-behaviour(minirest_api). --rest_api(#{name => list_node_listeners, - method => 'GET', - path => "/nodes/:atom:node/listeners", - func => list, - descr => "A list of listeners on the node"}). +-export([api_spec/0]). --rest_api(#{name => restart_listener, - method => 'PUT', - path => "/listeners/:atom:identifier/restart", - func => restart, - descr => "Restart a listener in the cluster"}). +-export([ listeners/2 + , listener/2 + , node_listener/2 + , node_listeners/2 + , manage_listeners/2 + , manage_nodes_listeners/2]). --rest_api(#{name => restart_node_listener, - method => 'PUT', - path => "/nodes/:atom:node/listeners/:atom:identifier/restart", - func => restart, - descr => "Restart a listener on a node"}). +-export([format/1]). --export([list/2, restart/2]). +-include_lib("emqx/include/emqx.hrl"). -%% List listeners on a node. -list(#{node := Node}, _Params) -> - emqx_mgmt:return({ok, format(emqx_mgmt:list_listeners(Node))}); +api_spec() -> + { + [ + listeners_api(), + restart_listeners_api(), + nodes_listeners_api(), + nodes_listener_api(), + manage_listeners_api(), + manage_nodes_listeners_api() + ], + [listener_schema()] + }. + +listener_schema() -> + #{ + listener => #{ + type => object, + properties => #{ + node => #{ + type => string, + description => <<"Node">>, + example => node()}, + identifier => #{ + type => string, + description => <<"Identifier">>}, + acceptors => #{ + type => integer, + description => <<"Number of Acceptor proce">>}, + max_conn => #{ + type => integer, + description => <<"Maximum number of allowed connection">>}, + type => #{ + type => string, + description => <<"Plugin decription">>}, + listen_on => #{ + type => string, + description => <<"Litening port">>}, + running => #{ + type => boolean, + description => <<"Open or close">>}, + auth => #{ + type => boolean, + description => <<"Has auth">>}}}}. + +listeners_api() -> + Metadata = #{ + get => #{ + description => <<"List listeners in cluster">>, + responses => #{ + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"List all listeners">>, listener)}}}, + {"/listeners", Metadata, listeners}. + +restart_listeners_api() -> + Metadata = #{ + get => #{ + description => <<"List listeners by listener ID">>, + parameters => [param_path_identifier()], + responses => #{ + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, ['BAD_LISTENER_ID']), + <<"200">> => + emqx_mgmt_util:response_array_schema(<<"List listener info ok">>, listener)}}}, + {"/listeners/:identifier", Metadata, listener}. + +manage_listeners_api() -> + Metadata = #{ + get => #{ + description => <<"Restart listeners in cluster">>, + parameters => [ + param_path_identifier(), + param_path_operation()], + responses => #{ + <<"500">> => + emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, + ['BAD_LISTENER_ID']), + <<"400">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, + ['BAD_REQUEST']), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Operation success">>)}}}, + {"/listeners/:identifier/:operation", Metadata, manage_listeners}. + +manage_nodes_listeners_api() -> + Metadata = #{ + get => #{ + description => <<"Restart listeners in cluster">>, + parameters => [ + param_path_node(), + param_path_identifier(), + param_path_operation()], + responses => #{ + <<"500">> => + emqx_mgmt_util:response_error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Bad node or Listener id not found">>, + ['BAD_NODE_NAME','BAD_LISTENER_ID']), + <<"400">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>, + ['BAD_REQUEST']), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Operation success">>)}}}, + {"/node/:node/listeners/:identifier/:operation", Metadata, manage_nodes_listeners}. + +nodes_listeners_api() -> + Metadata = #{ + get => #{ + description => <<"Get listener info in one node">>, + parameters => [param_path_node(), param_path_identifier()], + responses => #{ + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Node name or listener id not found">>, + ['BAD_NODE_NAME', 'BAD_LISTENER_ID']), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}}, + {"/nodes/:node/listeners/:identifier", Metadata, node_listener}. + +nodes_listener_api() -> + Metadata = #{ + get => #{ + description => <<"List listeners in one node">>, + parameters => [param_path_node()], + responses => #{ + <<"404">> => + emqx_mgmt_util:response_error_schema(<<"Listener id not found">>), + <<"200">> => + emqx_mgmt_util:response_schema(<<"Get listener info ok">>, listener)}}}, + {"/nodes/:node/listeners", Metadata, node_listeners}. +%%%============================================================================================== +%% parameters +param_path_node() -> + #{ + name => node, + in => path, + schema => #{type => string}, + required => true, + example => node() + }. + +param_path_identifier() -> + {Example,_} = hd(emqx_mgmt:list_listeners(node())), + #{ + name => identifier, + in => path, + schema => #{type => string}, + required => true, + example => Example + }. + +param_path_operation()-> + #{ + name => operation, + in => path, + required => true, + schema => #{ + type => string, + enum => [start, stop, restart]}, + example => restart + }. + +%%%============================================================================================== +%% api +listeners(get, _Request) -> + list(). + +listener(get, Request) -> + ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)), + get_listeners(#{identifier => ListenerID}). + +node_listeners(get, Request) -> + Node = binary_to_atom(cowboy_req:binding(node, Request)), + get_listeners(#{node => Node}). + +node_listener(get, Request) -> + Node = binary_to_atom(cowboy_req:binding(node, Request)), + ListenerID = binary_to_atom(cowboy_req:binding(identifier, Request)), + get_listeners(#{node => Node, identifier => ListenerID}). + +manage_listeners(_, Request) -> + Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)), + Operation = binary_to_atom(cowboy_req:binding(operation, Request)), + manage(Operation, #{identifier => Identifier}). + +manage_nodes_listeners(_, Request) -> + Node = binary_to_atom(cowboy_req:binding(node, Request)), + Identifier = binary_to_atom(cowboy_req:binding(identifier, Request)), + Operation = binary_to_atom(cowboy_req:binding(operation, Request)), + manage(Operation, #{identifier => Identifier, node => Node}). + +%%%============================================================================================== %% List listeners in the cluster. -list(_Binding, _Params) -> - emqx_mgmt:return({ok, [#{node => Node, listeners => format(Listeners)} - || {Node, Listeners} <- emqx_mgmt:list_listeners()]}). +list() -> + {200, format(emqx_mgmt:list_listeners())}. -%% Restart listeners on a node. -restart(#{node := Node, identifier := Identifier}, _Params) -> - case emqx_mgmt:restart_listener(Node, Identifier) of - ok -> emqx_mgmt:return({ok, "Listener restarted."}); - {error, Error} -> emqx_mgmt:return({error, Error}) - end; -%% Restart listeners on all nodes in the cluster. -restart(#{identifier := Identifier}, _Params) -> - Results = [{Node, emqx_mgmt:restart_listener(Node, Identifier)} || {Node, _Info} <- emqx_mgmt:list_nodes()], - case lists:filter(fun({_, Result}) -> Result =/= ok end, Results) of - [] -> emqx_mgmt:return(ok); - Errors -> emqx_mgmt:return({error, {restart, Errors}}) +get_listeners(Param) -> + case list_listener(Param) of + {error, not_found} -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; + {error, nodedown} -> + Node = maps:get(node, Param), + Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])), + Response = #{code => 'BAD_NODE_NAME', message => Reason}, + {404, Response}; + [] -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; + Data -> + {200, Data} end. +manage(Operation0, Param) -> + OperationMap = #{start => start_listener, stop => stop_listener, restart => restart_listener}, + Operation = maps:get(Operation0, OperationMap), + case list_listener(Param) of + {error, not_found} -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'BAD_LISTENER_ID', message => Reason}}; + {error, nodedown} -> + Node = maps:get(node, Param), + Reason = list_to_binary(io_lib:format("Node ~p rpc failed", [Node])), + Response = #{code => 'BAD_NODE_NAME', message => Reason}, + {404, Response}; + [] -> + Identifier = maps:get(identifier, Param), + Reason = list_to_binary(io_lib:format("Error listener identifier ~p", [Identifier])), + {404, #{code => 'RESOURCE_NOT_FOUND', message => Reason}}; + ListenersOrSingleListener -> + manage_(Operation, ListenersOrSingleListener) + end. + +manage_(Operation, Listener) when is_map(Listener) -> + manage_(Operation, [Listener]); +manage_(Operation, Listeners) when is_list(Listeners) -> + Results = [emqx_mgmt:manage_listener(Operation, Listener) || Listener <- Listeners], + case lists:filter(fun(Result) -> Result =/= ok end, Results) of + [] -> + {200}; + Errors -> + case lists:filter(fun({error, {already_started, _}}) -> false; (_) -> true end, Results) of + [] -> + Identifier = maps:get(identifier, hd(Listeners)), + Message = list_to_binary(io_lib:format("Already Started: ~s", [Identifier])), + {400, #{code => 'BAD_REQUEST', message => Message}}; + _ -> + case lists:filter(fun({error,not_found}) -> false; (_) -> true end, Results) of + [] -> + Identifier = maps:get(identifier, hd(Listeners)), + Message = list_to_binary(io_lib:format("Already Stoped: ~s", [Identifier])), + {400, #{code => 'BAD_REQUEST', message => Message}}; + _ -> + Reason = list_to_binary(io_lib:format("~p", [Errors])), + {500, #{code => 'UNKNOW_ERROR', message => Reason}} + end + end + end. + +%%%============================================================================================== +%% util function +list_listener(Params) -> + format(list_listener_(Params)). + +list_listener_(#{node := Node, identifier := Identifier}) -> + emqx_mgmt:get_listener(Node, Identifier); +list_listener_(#{identifier := Identifier}) -> + emqx_mgmt:list_listeners_by_id(Identifier); +list_listener_(#{node := Node}) -> + emqx_mgmt:list_listeners(Node); +list_listener_(#{}) -> + emqx_mgmt:list_listeners(). + format(Listeners) when is_list(Listeners) -> - [ Info#{listen_on => list_to_binary(esockd:to_string(ListenOn))} - || Info = #{listen_on := ListenOn} <- Listeners ]; + [format(Listener) || Listener <- Listeners]; -format({error, Reason}) -> [{error, Reason}]. +format({error, Reason}) -> + {error, Reason}; +format({Identifier, Conf}) -> + #{ + identifier => Identifier, + node => maps:get(node, Conf), + acceptors => maps:get(acceptors, Conf), + max_conn => maps:get(max_connections, Conf), + type => maps:get(type, Conf), + listen_on => list_to_binary(esockd:to_string(maps:get(bind, Conf))), + running => trans_running(Conf), + auth => maps:get(enable, maps:get(auth, Conf)) + }. +trans_running(Conf) -> + case maps:get(running, Conf) of + {error, _} -> + false; + Running -> + Running + end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl index ed9de428d..c4a34442d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_metrics.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_metrics.erl @@ -16,7 +16,7 @@ -module(emqx_mgmt_api_metrics). --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). @@ -283,13 +283,12 @@ metrics_schema() -> metrics_api() -> Metadata = #{ get => #{ - description => "EMQ X metrics", + description => <<"EMQ X metrics">>, responses => #{ - <<"200">> => emqx_mgmt_util:response_schema(<<"List all metrics">>, <<"metrics">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"List all metrics">>, metrics)}}}, {"/metrics", Metadata, list}. %%%============================================================================================== %% api apply list(get, _) -> - Response = emqx_json:encode(emqx_mgmt:get_metrics()), - {200, Response}. + {200, emqx_mgmt:get_metrics()}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index bd92abf98..bd00b173f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_mgmt_api_nodes). --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). @@ -103,15 +103,15 @@ node_schema() -> nodes_api() -> Metadata = #{ get => #{ - description => "List EMQ X nodes", + description => <<"List EMQ X nodes">>, responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<"List EMQ X Nodes">>, <<"node">>)}}}, + <<"200">> => emqx_mgmt_util:response_array_schema(<<"List EMQ X Nodes">>, node)}}}, {"/nodes", Metadata, nodes}. node_api() -> Metadata = #{ get => #{ - description => "Get node info", + description => <<"Get node info">>, parameters => [#{ name => node_name, in => path, @@ -121,13 +121,13 @@ node_api() -> example => node()}], responses => #{ <<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']), - <<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Nodes info by name">>, <<"node">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Nodes info by name">>, node)}}}, {"/nodes/:node_name", Metadata, node}. node_metrics_api() -> Metadata = #{ get => #{ - description => "Get node metrics", + description => <<"Get node metrics">>, parameters => [#{ name => node_name, in => path, @@ -137,13 +137,13 @@ node_metrics_api() -> example => node()}], responses => #{ <<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']), - <<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Metrics">>, <<"metrics">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Metrics">>, metrics)}}}, {"/nodes/:node_name/metrics", Metadata, node_metrics}. node_stats_api() -> Metadata = #{ get => #{ - description => "Get node stats", + description => <<"Get node stats">>, parameters => [#{ name => node_name, in => path, @@ -153,7 +153,7 @@ node_stats_api() -> example => node()}], responses => #{ <<"400">> => emqx_mgmt_util:response_error_schema(<<"Node error">>, ['SOURCE_ERROR']), - <<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Stats">>, <<"stats">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"Get EMQ X Node Stats">>, stats)}}}, {"/nodes/:node_name/stats", Metadata, node_metrics}. %%%============================================================================================== @@ -177,32 +177,30 @@ node_stats(get, Request) -> %% api apply list(#{}) -> NodesInfo = [format(Node, NodeInfo) || {Node, NodeInfo} <- emqx_mgmt:list_nodes()], - Response = emqx_json:encode(NodesInfo), - {200, Response}. + {200, NodesInfo}. get_node(#{node := Node}) -> case emqx_mgmt:lookup_node(Node) of #{node_status := 'ERROR'} -> - {400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})}; + {400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}}; NodeInfo -> - Response = emqx_json:encode(format(Node, NodeInfo)), - {200, Response} + {200, format(Node, NodeInfo)} end. get_metrics(#{node := Node}) -> case emqx_mgmt:get_metrics(Node) of {error, _} -> - {400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})}; + {400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}}; Metrics -> - {200, emqx_json:encode(Metrics)} + {200, Metrics} end. get_stats(#{node := Node}) -> case emqx_mgmt:get_stats(Node) of {error, _} -> - {400, emqx_json:encode(#{code => 'SOURCE_ERROR', reason => <<"rpc_failed">>})}; + {400, #{code => 'SOURCE_ERROR', message => <<"rpc_failed">>}}; Stats -> - {200, emqx_json:encode(Stats)} + {200, Stats} end. %%============================================================================================================ diff --git a/apps/emqx_management/src/emqx_mgmt_api_publish.erl b/apps/emqx_management/src/emqx_mgmt_api_publish.erl index 30fc9beab..29e162b11 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_publish.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_publish.erl @@ -17,7 +17,7 @@ %% API -include_lib("emqx/include/emqx.hrl"). --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). @@ -33,7 +33,7 @@ api_spec() -> publish_api() -> MeteData = #{ post => #{ - description => "publish", + description => <<"Publish">>, 'requestBody' => #{ content => #{ 'application/json' => #{ @@ -41,13 +41,13 @@ publish_api() -> type => object, properties => maps:with([id], message_properties())}}}}, responses => #{ - <<"200">> => emqx_mgmt_util:response_schema(<<"publish ok">>, <<"message">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"publish ok">>, message)}}}, {"/publish", MeteData, publish}. publish_batch_api() -> MeteData = #{ post => #{ - description => "publish", + description => <<"publish">>, 'requestBody' => #{ content => #{ 'application/json' => #{ @@ -57,8 +57,8 @@ publish_batch_api() -> type => object, properties => maps:with([id], message_properties())}}}}}, responses => #{ - <<"200">> => emqx_mgmt_util:response_array_schema(<<"publish ok">>, <<"message">>)}}}, - {"/publish_batch", MeteData, publish_batch}. + <<"200">> => emqx_mgmt_util:response_array_schema(<<"publish ok">>, message)}}}, + {"/publish/bulk", MeteData, publish_batch}. message_schema() -> #{ @@ -110,14 +110,13 @@ publish(post, Request) -> {ok, Body, _} = cowboy_req:read_body(Request), Message = message(emqx_json:decode(Body, [return_maps])), _ = emqx_mgmt:publish(Message), - {200, emqx_json:encode(format_message(Message))}. + {200, format_message(Message)}. publish_batch(post, Request) -> {ok, Body, _} = cowboy_req:read_body(Request), Messages = messages(emqx_json:decode(Body, [return_maps])), _ = [emqx_mgmt:publish(Message) || Message <- Messages], - ResponseBody = emqx_json:encode(format_message(Messages)), - {200, ResponseBody}. + {200, format_message(Messages)}. message(Map) -> From = maps:get(<<"from">>, Map, http_api), diff --git a/apps/emqx_management/src/emqx_mgmt_api_routes.erl b/apps/emqx_management/src/emqx_mgmt_api_routes.erl index 2b514eb51..258680546 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_routes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_routes.erl @@ -19,7 +19,7 @@ -include_lib("emqx/include/emqx.hrl"). %% API --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). @@ -48,31 +48,29 @@ route_schema() -> routes_api() -> Metadata = #{ get => #{ - description => "EMQ X routes", + description => <<"EMQ X routes">>, parameters => [ #{ name => page, in => query, description => <<"Page">>, - schema => #{type => integer}, - default => 1 + schema => #{type => integer, default => 1} }, #{ name => limit, in => query, description => <<"Page size">>, - schema => #{type => integer}, - default => emqx_mgmt:max_row_limit() + schema => #{type => integer, default => emqx_mgmt:max_row_limit()} }], responses => #{ <<"200">> => - emqx_mgmt_util:response_array_schema("List route info", <<"route">>)}}}, + emqx_mgmt_util:response_array_schema("List route info", route)}}}, {"/routes", Metadata, routes}. route_api() -> Metadata = #{ get => #{ - description => "EMQ X routes", + description => <<"EMQ X routes">>, parameters => [#{ name => topic, in => path, @@ -82,7 +80,7 @@ route_api() -> }], responses => #{ <<"200">> => - emqx_mgmt_util:response_schema(<<"Route info">>, <<"route">>), + emqx_mgmt_util:response_schema(<<"Route info">>, route), <<"404">> => emqx_mgmt_util:response_error_schema(<<"Topic not found">>, [?TOPIC_NOT_FOUND]) }}}, @@ -101,20 +99,15 @@ route(get, Request) -> %%%============================================================================================== %% api apply list(Params) -> - Data = emqx_mgmt_api:paginate(emqx_route, Params, fun format/1), - Response = emqx_json:encode(Data), + Response = emqx_mgmt_api:paginate(emqx_route, Params, fun format/1), {200, Response}. lookup(#{topic := Topic}) -> case emqx_mgmt:lookup_routes(Topic) of [] -> - NotFound = #{code => ?TOPIC_NOT_FOUND, reason => <<"Topic not found">>}, - Response = emqx_json:encode(NotFound), - {404, Response}; + {404, #{code => ?TOPIC_NOT_FOUND, message => <<"Topic not found">>}}; [Route] -> - Data = format(Route), - Response = emqx_json:encode(Data), - {200, Response} + {200, format(Route)} end. %%%============================================================================================== diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index f06968f60..fa93fabfe 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_mgmt_api_stats). --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). @@ -96,13 +96,12 @@ stats_schema() -> stats_api() -> Metadata = #{ get => #{ - description => "EMQ X stats", + description => <<"EMQ X stats">>, responses => #{ - <<"200">> => emqx_mgmt_util:response_schema(<<"List stats ok">>, <<"stats">>)}}}, + <<"200">> => emqx_mgmt_util:response_schema(<<"List stats ok">>, stats)}}}, {"/stats", Metadata, list}. %%%============================================================================================== %% api apply list(get, _Request) -> - Response = emqx_json:encode(emqx_mgmt:get_stats()), - {200, Response}. + {200, emqx_mgmt:get_stats()}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_status.erl b/apps/emqx_management/src/emqx_mgmt_api_status.erl index f7f013f20..fa46b1d25 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_status.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_status.erl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_mgmt_api_status). %% API --behavior(minirest_api). +-behaviour(minirest_api). -export([api_spec/0]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 3f563427b..059327f4c 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -16,79 +16,107 @@ -module(emqx_mgmt_api_subscriptions). +-behaviour(minirest_api). + -include_lib("emqx/include/emqx.hrl"). --define(SUBS_QS_SCHEMA, {emqx_suboption, - [{<<"clientid">>, binary}, - {<<"topic">>, binary}, - {<<"share">>, binary}, - {<<"qos">>, integer}, - {<<"_match_topic">>, binary}]}). +-export([api_spec/0]). --rest_api(#{name => list_subscriptions, - method => 'GET', - path => "/subscriptions/", - func => list, - descr => "A list of subscriptions in the cluster"}). - --rest_api(#{name => list_node_subscriptions, - method => 'GET', - path => "/nodes/:atom:node/subscriptions/", - func => list, - descr => "A list of subscriptions on a node"}). - --rest_api(#{name => lookup_client_subscriptions, - method => 'GET', - path => "/subscriptions/:bin:clientid", - func => lookup, - descr => "A list of subscriptions of a client"}). - --rest_api(#{name => lookup_client_subscriptions_with_node, - method => 'GET', - path => "/nodes/:atom:node/subscriptions/:bin:clientid", - func => lookup, - descr => "A list of subscriptions of a client on the node"}). - --export([ list/2 - , lookup/2 - ]). +-export([subscriptions/2]). -export([ query/3 , format/1 ]). +-define(SUBS_QS_SCHEMA, {emqx_suboption, + [ {<<"clientid">>, binary} + , {<<"topic">>, binary} + , {<<"share">>, binary} + , {<<"qos">>, integer} + , {<<"match_topic">>, binary}]}). + -define(query_fun, {?MODULE, query}). -define(format_fun, {?MODULE, format}). -list(Bindings, Params) when map_size(Bindings) == 0 -> - case proplists:get_value(<<"topic">>, Params) of - undefined -> - emqx_mgmt:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}); - Topic -> - emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)}) - end; +api_spec() -> + { + [subscriptions_api()], + [subscription_schema()] + }. -list(#{node := Node} = Bindings, Params) -> - case proplists:get_value(<<"topic">>, Params) of - undefined -> - case Node =:= node() of - true -> - emqx_mgmt:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)}); - false -> - case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of - {badrpc, Reason} -> emqx_mgmt:return({error, Reason}); - Res -> Res - end - end; - Topic -> - emqx_mgmt:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)}) - end. +subscriptions_api() -> + MetaData = #{ + get => #{ + description => <<"List subscriptions">>, + parameters => [ + #{ + name => page, + in => query, + description => <<"Page">>, + schema => #{type => integer} + }, + #{ + name => limit, + in => query, + description => <<"Page size">>, + schema => #{type => integer} + }, + #{ + name => clientid, + in => query, + description => <<"Client ID">>, + schema => #{type => string} + }, + #{ + name => qos, + in => query, + description => <<"QoS">>, + schema => #{type => integer} + }, + #{ + name => share, + in => query, + description => <<"Shared subscription">>, + schema => #{type => boolean} + }, + #{ + name => topic, + in => query, + description => <<"Topic">>, + schema => #{type => string} + } + #{ + name => match_topic, + in => query, + description => <<"Match topic string">>, + schema => #{type => string} + } + ], + responses => #{ + <<"200">> => emqx_mgmt_util:response_page_schema(subscription)}}}, + {"/subscriptions", MetaData, subscriptions}. -lookup(#{node := Node, clientid := ClientId}, _Params) -> - emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))}); +subscription_schema() -> + #{ + subscription => #{ + type => object, + properties => #{ + topic => #{ + type => string}, + clientid => #{ + type => string}, + qos => #{ + type => integer, + enum => [0,1,2]}}} + }. + +subscriptions(get, Request) -> + Params = cowboy_req:parse_qs(Request), + list(Params). + +list(Params) -> + {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}. -lookup(#{clientid := ClientId}, _Params) -> - emqx_mgmt:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}). format(Items) when is_list(Items) -> [format(Item) || Item <- Items]; @@ -98,10 +126,10 @@ format({{Subscriber, Topic}, Options}) -> format({_Subscriber, Topic, Options = #{share := Group}}) -> QoS = maps:get(qos, Options), - #{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; + #{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; format({_Subscriber, Topic, Options}) -> QoS = maps:get(qos, Options), - #{node => node(), topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. + #{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. %%-------------------------------------------------------------------- %% Query Function diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d759eb18..9e07dbe2f 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -590,18 +590,21 @@ print({client, {ClientId, ChanPid}}) -> InfoKeys = [clientid, username, peername, clean_start, keepalive, expiry_interval, subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, - connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of - true -> [disconnected_at]; - false -> [] - end, + connected, created_at, connected_at] ++ + case maps:is_key(disconnected_at, Info) of + true -> [disconnected_at]; + false -> [] + end, + Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000}, emqx_ctl:print("Client(~s, username=~s, peername=~s, " "clean_start=~s, keepalive=~w, session_expiry_interval=~w, " "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, " - "connected=~s, created_at=~w, connected_at=~w" ++ case maps:is_key(disconnected_at, Info) of - true -> ", disconnected_at=~w)~n"; - false -> ")~n" - end, - [format(K, maps:get(K, Info)) || K <- InfoKeys]); + "connected=~s, created_at=~w, connected_at=~w" ++ + case maps:is_key(disconnected_at, Info1) of + true -> ", disconnected_at=~w)~n"; + false -> ")~n" + end, + [format(K, maps:get(K, Info1)) || K <- InfoKeys]); print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~s -> ~s~n", [Topic, Node]); diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index ea6570d79..7d5f85cf2 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -31,7 +31,8 @@ , response_array_schema/2 , response_error_schema/1 , response_error_schema/2 - , batch_response_schema/1]). + , response_page_schema/1 + , response_batch_schema/1]). -export([urldecode/1]). @@ -92,16 +93,22 @@ urldecode(S) -> request_body_array_schema(Schema) when is_map(Schema) -> json_content_schema("", #{type => array, items => Schema}); +request_body_array_schema(Ref) when is_atom(Ref) -> + request_body_array_schema(atom_to_binary(Ref, utf8)); request_body_array_schema(Ref) when is_binary(Ref) -> json_content_schema("", #{type => array, items => minirest:ref(Ref)}). request_body_schema(Schema) when is_map(Schema) -> json_content_schema("", Schema); +request_body_schema(Ref) when is_atom(Ref) -> + request_body_schema(atom_to_binary(Ref)); request_body_schema(Ref) when is_binary(Ref) -> json_content_schema("", minirest:ref(Ref)). response_array_schema(Description, Schema) when is_map(Schema) -> json_content_schema(Description, #{type => array, items => Schema}); +response_array_schema(Description, Ref) when is_atom(Ref) -> + response_array_schema(Description, atom_to_binary(Ref, utf8)); response_array_schema(Description, Ref) when is_binary(Ref) -> json_content_schema(Description, #{type => array, items => minirest:ref(Ref)}). @@ -110,6 +117,8 @@ response_schema(Description) -> response_schema(Description, Schema) when is_map(Schema) -> json_content_schema(Description, Schema); +response_schema(Description, Ref) when is_atom(Ref) -> + response_schema(Description, atom_to_binary(Ref, utf8)); response_schema(Description, Ref) when is_binary(Ref) -> json_content_schema(Description, minirest:ref(Ref)). @@ -124,11 +133,33 @@ response_error_schema(Description, Enum) -> code => #{ type => string, enum => Enum}, - reason => #{ + message => #{ type => string}}}, json_content_schema(Description, Schema). -batch_response_schema(DefName) when is_binary(DefName) -> +response_page_schema(Def) when is_atom(Def) -> + response_page_schema(atom_to_binary(Def, utf8)); +response_page_schema(Def) when is_binary(Def) -> + Schema = #{ + type => object, + properties => #{ + meta => #{ + type => object, + properties => #{ + page => #{ + type => integer}, + limit => #{ + type => integer}, + count => #{ + type => integer}}}, + data => #{ + type => array, + items => minirest:ref(Def)}}}, + json_content_schema("", Schema). + +response_batch_schema(DefName) when is_atom(DefName) -> + response_batch_schema(atom_to_binary(DefName, utf8)); +response_batch_schema(DefName) when is_binary(DefName) -> Schema = #{ type => object, properties => #{ diff --git a/apps/emqx_management/test/emqx_mgmt_alarms_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_alarms_api_SUITE.erl new file mode 100644 index 000000000..2929e961d --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_alarms_api_SUITE.erl @@ -0,0 +1,73 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_alarms_api_SUITE). + + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(ACT_ALARM, test_act_alarm). +-define(DE_ACT_ALARM, test_de_act_alarm). + +all() -> + [t_alarms_api, t_delete_alarms_api]. + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_alarms_api(_) -> + ok = emqx_alarm:activate(?ACT_ALARM), + ok = emqx_alarm:activate(?DE_ACT_ALARM), + ok = emqx_alarm:deactivate(?DE_ACT_ALARM), + get_alarms(1, true), + get_alarms(1, false). + +t_delete_alarms_api(_) -> + Path = emqx_mgmt_api_test_util:api_path(["alarms"]), + {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Path), + get_alarms(1, true), + get_alarms(0, false). + +get_alarms(AssertCount, Activated) when is_atom(Activated) -> + get_alarms(AssertCount, atom_to_list(Activated)); +get_alarms(AssertCount, Activated) -> + Path = emqx_mgmt_api_test_util:api_path(["alarms"]), + Qs = "activated=" ++ Activated, + Headers = emqx_mgmt_api_test_util:auth_header_(), + {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path, Qs, Headers), + Data = emqx_json:decode(Response, [return_maps]), + Meta = maps:get(<<"meta">>, Data), + Page = maps:get(<<"page">>, Meta), + Limit = maps:get(<<"limit">>, Meta), + Count = maps:get(<<"count">>, Meta), + ?assertEqual(Page, 1), + ?assertEqual(Limit, emqx_mgmt:max_row_limit()), + ?assert(Count >= AssertCount). diff --git a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl index ac63db0fb..0ddc2c56c 100644 --- a/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_clients_api_SUITE.erl @@ -83,6 +83,7 @@ t_clients(_) -> %% delete /clients/:clientid kickout Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]), {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path), + timer:sleep(300), AfterKickoutResponse = emqx_mgmt_api_test_util:request_api(get, Client2Path), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse), @@ -95,6 +96,7 @@ t_clients(_) -> SubscribeBody = #{topic => Topic, qos => Qos}, SubscribePath = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId1), "subscribe"]), {ok, _} = emqx_mgmt_api_test_util:request_api(post, SubscribePath, "", AuthHeader, SubscribeBody), + timer:sleep(100), [{{_, AfterSubTopic}, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos), @@ -102,4 +104,5 @@ t_clients(_) -> %% delete /clients/:clientid/subscribe UnSubscribeQuery = "topic=" ++ binary_to_list(Topic), {ok, _} = emqx_mgmt_api_test_util:request_api(delete, SubscribePath, UnSubscribeQuery, AuthHeader), + timer:sleep(100), ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)). diff --git a/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl new file mode 100644 index 000000000..e3d50f57c --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_listeners_api_SUITE.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_listeners_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_list_listeners(_) -> + Path = emqx_mgmt_api_test_util:api_path(["listeners"]), + get_api(Path). + +t_list_node_listeners(_) -> + Path = emqx_mgmt_api_test_util:api_path(["nodes", atom_to_binary(node(), utf8), "listeners"]), + get_api(Path). + +t_get_listeners(_) -> + LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), + Identifier = maps:get(identifier, LocalListener), + Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]), + get_api(Path). + +t_get_node_listeners(_) -> + LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), + Identifier = maps:get(identifier, LocalListener), + Path = emqx_mgmt_api_test_util:api_path( + ["nodes", atom_to_binary(node(), utf8), "listeners", atom_to_list(Identifier)]), + get_api(Path). + +t_stop_listener(_) -> + LocalListener = emqx_mgmt_api_listeners:format(hd(emqx_mgmt:list_listeners())), + Identifier = maps:get(identifier, LocalListener), + Path = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier), "stop"]), + {ok, _} = emqx_mgmt_api_test_util:request_api(get, Path), + GetPath = emqx_mgmt_api_test_util:api_path(["listeners", atom_to_list(Identifier)]), + {ok, ListenersResponse} = emqx_mgmt_api_test_util:request_api(get, GetPath), + Listeners = emqx_json:decode(ListenersResponse, [return_maps]), + [listener_stats(Listener, false) || Listener <- Listeners]. + +get_api(Path) -> + {ok, ListenersData} = emqx_mgmt_api_test_util:request_api(get, Path), + LocalListeners = emqx_mgmt_api_listeners:format(emqx_mgmt:list_listeners()), + case emqx_json:decode(ListenersData, [return_maps]) of + [Listener] -> + Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8), + Filter = + fun(Local) -> + maps:get(identifier, Local) =:= Identifier + end, + LocalListener = hd(lists:filter(Filter, LocalListeners)), + comparison_listener(LocalListener, Listener); + Listeners when is_list(Listeners) -> + ?assertEqual(erlang:length(LocalListeners), erlang:length(Listeners)), + Fun = + fun(LocalListener) -> + Identifier = maps:get(identifier, LocalListener), + IdentifierBinary = atom_to_binary(Identifier, utf8), + Filter = + fun(Listener) -> + maps:get(<<"identifier">>, Listener) =:= IdentifierBinary + end, + Listener = hd(lists:filter(Filter, Listeners)), + comparison_listener(LocalListener, Listener) + end, + lists:foreach(Fun, LocalListeners); + Listener when is_map(Listener) -> + Identifier = binary_to_atom(maps:get(<<"identifier">>, Listener), utf8), + Filter = + fun(Local) -> + maps:get(identifier, Local) =:= Identifier + end, + LocalListener = hd(lists:filter(Filter, LocalListeners)), + comparison_listener(LocalListener, Listener) + end. + +comparison_listener(Local, Response) -> + ?assertEqual(maps:get(identifier, Local), binary_to_atom(maps:get(<<"identifier">>, Response))), + ?assertEqual(maps:get(node, Local), binary_to_atom(maps:get(<<"node">>, Response))), + ?assertEqual(maps:get(acceptors, Local), maps:get(<<"acceptors">>, Response)), + ?assertEqual(maps:get(max_conn, Local), maps:get(<<"max_conn">>, Response)), + ?assertEqual(maps:get(listen_on, Local), maps:get(<<"listen_on">>, Response)), + ?assertEqual(maps:get(running, Local), maps:get(<<"running">>, Response)), + ?assertEqual(maps:get(auth, Local), maps:get(<<"auth">>, Response)). + + +listener_stats(Listener, Stats) -> + ?assertEqual(maps:get(<<"running">>, Listener), Stats). diff --git a/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl index 9ecb1a11b..2f566e5f4 100644 --- a/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_publish_api_SUITE.erl @@ -60,13 +60,13 @@ t_publish_api(_) -> ?assertEqual(receive_assert(?TOPIC1, 0, Payload), ok), emqtt:disconnect(Client). -t_publish_batch_api(_) -> +t_publish_bulk_api(_) -> {ok, Client} = emqtt:start_link(#{username => <<"api_username">>, clientid => <<"api_clientid">>}), {ok, _} = emqtt:connect(Client), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC1), {ok, _, [0]} = emqtt:subscribe(Client, ?TOPIC2), Payload = <<"hello">>, - Path = emqx_mgmt_api_test_util:api_path(["publish_batch"]), + Path = emqx_mgmt_api_test_util:api_path(["publish", "bulk"]), Auth = emqx_mgmt_api_test_util:auth_header_(), Body =[#{topic => ?TOPIC1, payload => Payload}, #{topic => ?TOPIC2, payload => Payload}], {ok, Response} = emqx_mgmt_api_test_util:request_api(post, Path, "", Auth, Body), diff --git a/apps/emqx_management/test/emqx_mgmt_subscription_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_subscription_api_SUITE.erl new file mode 100644 index 000000000..6d56f21c4 --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_subscription_api_SUITE.erl @@ -0,0 +1,74 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_subscription_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(CLIENTID, <<"api_clientid">>). +-define(USERNAME, <<"api_username">>). + +%% notice: integer topic for sort response +-define(TOPIC1, <<"0000">>). +-define(TOPIC2, <<"0001">>). + + +all() -> + emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + ekka_mnesia:start(), + emqx_mgmt_auth:mnesia(boot), + emqx_ct_helpers:start_apps([emqx_management], fun set_special_configs/1), + Config. + + +end_per_suite(_) -> + emqx_ct_helpers:stop_apps([emqx_management]). + +set_special_configs(emqx_management) -> + emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], + applications =>[#{id => "admin", secret => "public"}]}), + ok; +set_special_configs(_App) -> + ok. + +t_subscription_api(_) -> + {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID}), + {ok, _} = emqtt:connect(Client), + {ok, _, _} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), + Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), + {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), + Data = emqx_json:decode(Response, [return_maps]), + Meta = maps:get(<<"meta">>, Data), + ?assertEqual(1, maps:get(<<"page">>, Meta)), + ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)), + ?assertEqual(2, maps:get(<<"count">>, Meta)), + Subscriptions = maps:get(<<"data">>, Data), + ?assertEqual(length(Subscriptions), 2), + Sort = + fun(#{<<"topic">> := T1}, #{<<"topic">> := T2}) -> + binary_to_integer(T1) =< binary_to_integer(T2) + end, + [Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions), + ?assertEqual(maps:get(<<"topic">>, Subscriptions1), ?TOPIC1), + ?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2), + ?assertEqual(maps:get(<<"clientid">>, Subscriptions1), ?CLIENTID), + ?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID), + emqtt:disconnect(Client). diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index ff36265aa..c90515b74 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -31,7 +31,7 @@ emqx_modules: { }, { type: telemetry - enable: true + enable: false } ] } diff --git a/apps/emqx_modules/src/emqx_mod_presence.erl b/apps/emqx_modules/src/emqx_mod_presence.erl index 738d1c892..729316663 100644 --- a/apps/emqx_modules/src/emqx_mod_presence.erl +++ b/apps/emqx_modules/src/emqx_mod_presence.erl @@ -102,7 +102,7 @@ connected_presence(#{peerhost := PeerHost, keepalive => Keepalive, connack => 0, %% Deprecated? clean_start => CleanStart, - expiry_interval => ExpiryInterval, + expiry_interval => ExpiryInterval div 1000, connected_at => ConnectedAt, ts => erlang:system_time(millisecond) }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 824cfdcb1..5865f224c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -182,7 +182,7 @@ eventmsg_connected(_ClientInfo = #{ keepalive => Keepalive, clean_start => CleanStart, receive_maximum => RcvMax, - expiry_interval => ExpiryInterval, + expiry_interval => ExpiryInterval div 1000, is_bridge => IsBridge, conn_props => printable_maps(ConnProps), connected_at => ConnectedAt diff --git a/apps/emqx_statsd/etc/emqx_statsd.conf b/apps/emqx_statsd/etc/emqx_statsd.conf index a2daa5521..994986898 100644 --- a/apps/emqx_statsd/etc/emqx_statsd.conf +++ b/apps/emqx_statsd/etc/emqx_statsd.conf @@ -3,11 +3,8 @@ ##-------------------------------------------------------------------- emqx_statsd:{ - host: "127.0.0.1" - port: 8125 - batch_size: 10 - prefix: "emqx" - tags: {"from": "emqx"} + enable: true + server: "127.0.0.1:8125" sample_time_interval: "10s" flush_time_interval: "10s" } diff --git a/apps/emqx_statsd/include/emqx_statsd.hrl b/apps/emqx_statsd/include/emqx_statsd.hrl index d88dbccbd..52f8774c0 100644 --- a/apps/emqx_statsd/include/emqx_statsd.hrl +++ b/apps/emqx_statsd/include/emqx_statsd.hrl @@ -1,9 +1,5 @@ -define(APP, emqx_statsd). - --define(DEFAULT_HOST, {127, 0, 0, 1}). +-define(DEFAULT_SAMPLE_TIME_INTERVAL, 10000). +-define(DEFAULT_FLUSH_TIME_INTERVAL, 10000). +-define(DEFAULT_HOST, "127.0.0.1"). -define(DEFAULT_PORT, 8125). --define(DEFAULT_PREFIX, undefined). --define(DEFAULT_TAGS, #{}). --define(DEFAULT_BATCH_SIZE, 10). --define(DEFAULT_SAMPLE_TIME_INTERVAL, 10). --define(DEFAULT_FLUSH_TIME_INTERVAL, 10). \ No newline at end of file diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 04338fd62..2885f4dc9 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,6 +1,6 @@ {application, emqx_statsd, [{description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "5.0.0"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index 3a73de664..892731a6c 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -1,100 +1,119 @@ %%-------------------------------------------------------------------- - %% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. - %% - %% Licensed under the Apache License, Version 2.0 (the "License"); - %% you may not use this file except in compliance with the License. - %% You may obtain a copy of the License at - %% - %% http://www.apache.org/licenses/LICENSE-2.0 - %% - %% Unless required by applicable law or agreed to in writing, software - %% distributed under the License is distributed on an "AS IS" BASIS, - %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - %% See the License for the specific language governing permissions and - %% limitations under the License. - %%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- - -module(emqx_statsd). +-module(emqx_statsd). - -behaviour(gen_server). +-behaviour(gen_server). - -ifdef(TEST). - -compile(export_all). - -compile(nowarn_export_all). - -endif. +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. - -include_lib("emqx/include/logger.hrl"). +-include("emqx_statsd.hrl"). - %% Interface - -export([start_link/1]). +%% Interface +-export([start_link/1]). - %% Internal Exports - -export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , code_change/3 - , terminate/2 - ]). +%% Internal Exports +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , code_change/3 + , terminate/2 + ]). - -record(state, { - timer :: reference(), - sample_time_interval :: pos_integer(), - flush_time_interval :: pos_integer(), - estatsd_pid :: pid() - }). +-record(state, { + timer :: reference() | undefined, + sample_time_interval :: pos_integer(), + flush_time_interval :: pos_integer(), + estatsd_pid :: pid() +}). - start_link(Opts) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link(Opts) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). - init([Opts]) -> - SampleTimeInterval = proplists:get_value(sample_time_interval, Opts), - FlushTimeInterval = proplists:get_value(flush_time_interval, Opts), - Ref = erlang:start_timer(SampleTimeInterval, self(), sample_timeout), - Pid = proplists:get_value(estatsd_pid, Opts), - {ok, #state{timer = Ref, - sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid}}. +init([Opts]) -> + process_flag(trap_exit, true), + Tags = tags(maps:get(tags, Opts, #{})), + {Host, Port} = maps:get(server, Opts, {?DEFAULT_HOST, ?DEFAULT_PORT}), + Opts1 = maps:without([sample_time_interval, + flush_time_interval], Opts#{tags => Tags, + host => Host, + port => Port, + prefix => <<"emqx">>}), + {ok, Pid} = estatsd:start_link(maps:to_list(Opts1)), + SampleTimeInterval = maps:get(sample_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), + FlushTimeInterval = maps:get(flush_time_interval, Opts, ?DEFAULT_FLUSH_TIME_INTERVAL), + {ok, ensure_timer(#state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid})}. - handle_call(_Req, _From, State) -> - {noreply, State}. +handle_call(_Req, _From, State) -> + {noreply, State}. - handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> + {noreply, State}. - handle_info({timeout, Ref, sample_timeout}, State = #state{sample_time_interval = SampleTimeInterval, - flush_time_interval = FlushTimeInterval, - estatsd_pid = Pid, - timer = Ref}) -> - Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), - SampleRate = SampleTimeInterval / FlushTimeInterval, - StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], - estatsd:submit(Pid, StatsdMetrics), - {noreply, State#state{timer = erlang:start_timer(SampleTimeInterval, self(), sample_timeout)}}; +handle_info({timeout, Ref, sample_timeout}, + State = #state{sample_time_interval = SampleTimeInterval, + flush_time_interval = FlushTimeInterval, + estatsd_pid = Pid, + timer = Ref}) -> + Metrics = emqx_metrics:all() ++ emqx_stats:getstats() ++ emqx_vm_data(), + SampleRate = SampleTimeInterval / FlushTimeInterval, + StatsdMetrics = [{gauge, trans_metrics_name(Name), Value, SampleRate, []} || {Name, Value} <- Metrics], + estatsd:submit(Pid, StatsdMetrics), + {noreply, ensure_timer(State)}; - handle_info(_Msg, State) -> - {noreply, State}. +handle_info({'EXIT', Pid, Error}, State = #state{estatsd_pid = Pid}) -> + {stop, {shutdown, Error}, State}; - code_change(_OldVsn, State, _Extra) -> - {ok, State}. +handle_info(_Msg, State) -> + {noreply, State}. - terminate(_Reason, _State) -> - ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. - %%------------------------------------------------------------------------------ - %% Internale function - %%------------------------------------------------------------------------------ - trans_metrics_name(Name) -> - Name0 = atom_to_binary(Name, utf8), - binary_to_atom(<<"emqx.", Name0/binary>>, utf8). +terminate(_Reason, #state{estatsd_pid = Pid}) -> + estatsd:stop(Pid), + ok. - emqx_vm_data() -> - Idle = case cpu_sup:util([detailed]) of - {_, 0, 0, _} -> 0; %% Not support for Windows - {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) - end, - RunQueue = erlang:statistics(run_queue), - [{run_queue, RunQueue}, - {cpu_idle, Idle}, - {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). +%%------------------------------------------------------------------------------ +%% Internale function +%%------------------------------------------------------------------------------ +trans_metrics_name(Name) -> + Name0 = atom_to_binary(Name, utf8), + binary_to_atom(<<"emqx.", Name0/binary>>, utf8). + +emqx_vm_data() -> + Idle = case cpu_sup:util([detailed]) of + {_, 0, 0, _} -> 0; %% Not support for Windows + {_Num, _Use, IdleList, _} -> proplists:get_value(idle, IdleList, 0) + end, + RunQueue = erlang:statistics(run_queue), + [{run_queue, RunQueue}, + {cpu_idle, Idle}, + {cpu_use, 100 - Idle}] ++ emqx_vm:mem_info(). + +tags(Map) -> + Tags = maps:to_list(Map), + [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. + + +ensure_timer(State =#state{sample_time_interval = SampleTimeInterval}) -> + State#state{timer = emqx_misc:start_timer(SampleTimeInterval, sample_timeout)}. diff --git a/apps/emqx_statsd/src/emqx_statsd_api.erl b/apps/emqx_statsd/src/emqx_statsd_api.erl new file mode 100644 index 000000000..13391646a --- /dev/null +++ b/apps/emqx_statsd/src/emqx_statsd_api.erl @@ -0,0 +1,108 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_statsd_api). + +-behaviour(minirest_api). + +-include("emqx_statsd.hrl"). + +-import(emqx_mgmt_util, [ response_schema/1 + , response_schema/2 + , request_body_schema/1 + , request_body_schema/2 + ]). + +-export([api_spec/0]). + +-export([ statsd/2 + ]). + +api_spec() -> + {statsd_api(), schemas()}. + +schemas() -> + [#{statsd => #{ + type => object, + properties => #{ + server => #{ + type => string, + description => <<"Statsd Server">>, + example => get_raw(<<"server">>, <<"127.0.0.1:8125">>)}, + enable => #{ + type => boolean, + description => <<"Statsd status">>, + example => get_raw(<<"enable">>, false)}, + sample_time_interval => #{ + type => string, + description => <<"Sample Time Interval">>, + example => get_raw(<<"sample_time_interval">>, <<"10s">>)}, + flush_time_interval => #{ + type => string, + description => <<"Flush Time Interval">>, + example => get_raw(<<"flush_time_interval">>, <<"10s">>)} + } + }}]. + +statsd_api() -> + Metadata = #{ + get => #{ + description => <<"Get statsd info">>, + responses => #{ + <<"200">> => response_schema(<<"statsd">>) + } + }, + put => #{ + description => <<"Update Statsd">>, + 'requestBody' => request_body_schema(<<"statsd">>), + responses => #{ + <<"200">> => + response_schema(<<"Update Statsd successfully">>), + <<"400">> => + response_schema(<<"Bad Request">>, #{ + type => object, + properties => #{ + message => #{type => string}, + code => #{type => string} + } + }) + } + } + }, + [{"/statsd", Metadata, statsd}]. + +statsd(get, _Request) -> + Response = emqx_config:get_raw([<<"emqx_statsd">>], #{}), + {200, Response}; + +statsd(put, Request) -> + {ok, Body, _} = cowboy_req:read_body(Request), + Params = emqx_json:decode(Body, [return_maps]), + Enable = maps:get(<<"enable">>, Params), + ok = emqx_config:update_config([emqx_statsd], Params), + enable_statsd(Enable). + +enable_statsd(true) -> + ok = emqx_statsd_sup:stop_child(?APP), + emqx_statsd_sup:start_child(?APP, emqx_config:get([?APP], #{})), + {200}; +enable_statsd(false) -> + ok = emqx_statsd_sup:stop_child(?APP), + {200}. + + +get_raw(Key, Def) -> + emqx_config:get_raw([<<"emqx_statsd">>]++ [Key], Def). diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index cd998b158..a5054a16c 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -18,9 +18,7 @@ -behaviour(application). --include_lib("emqx/include/logger.hrl"). - - -emqx_plugin(?MODULE). +-include("emqx_statsd.hrl"). -export([ start/2 , stop/1 @@ -28,11 +26,15 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_statsd_sup:start_link(), - {ok, _} = emqx_statsd_sup:start_statsd(), - ?LOG(info, "emqx statsd start: successfully"), + maybe_enable_statsd(), {ok, Sup}. - stop(_) -> - ok = emqx_statsd_sup:stop_statsd(), - ?LOG(info, "emqx statsd stop: successfully"), ok. + +maybe_enable_statsd() -> + case emqx_config:get([?APP, enable], false) of + true -> + emqx_statsd_sup:start_child(emqx_statsd, emqx_config:get([?APP], #{})); + false -> + ok + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_schema.erl b/apps/emqx_statsd/src/emqx_statsd_schema.erl index 5600739e7..bbc7eedc0 100644 --- a/apps/emqx_statsd/src/emqx_statsd_schema.erl +++ b/apps/emqx_statsd/src/emqx_statsd_schema.erl @@ -4,41 +4,38 @@ -behaviour(hocon_schema). +-export([to_ip_port/1]). + -export([ structs/0 , fields/1]). +-typerefl_from_string({ip_port/0, emqx_statsd_schema, to_ip_port}). + structs() -> ["emqx_statsd"]. fields("emqx_statsd") -> - [ {host, fun host/1} - , {port, fun port/1} - , {prefix, fun prefix/1} - , {tags, map()} - , {batch_size, fun batch_size/1} - , {sample_time_interval, fun duration_s/1} - , {flush_time_interval, fun duration_s/1}]. + [ {enable, emqx_schema:t(boolean(), undefined, false)} + , {server, fun server/1} + , {sample_time_interval, fun duration_ms/1} + , {flush_time_interval, fun duration_ms/1} + ]. -host(type) -> string(); -host(default) -> "127.0.0.1"; -host(nullable) -> false; -host(_) -> undefined. +server(type) -> emqx_schema:ip_port(); +server(default) -> "127.0.0.1:8125"; +server(nullable) -> false; +server(_) -> undefined. -port(type) -> integer(); -port(default) -> 8125; -port(nullable) -> true; -port(_) -> undefined. +duration_ms(type) -> emqx_schema:duration_ms(); +duration_ms(nullable) -> false; +duration_ms(default) -> "10s"; +duration_ms(_) -> undefined. -prefix(type) -> string(); -prefix(default) -> "emqx"; -prefix(nullable) -> true; -prefix(_) -> undefined. - -batch_size(type) -> integer(); -batch_size(nullable) -> false; -batch_size(default) -> 10; -batch_size(_) -> undefined. - -duration_s(type) -> emqx_schema:duration_s(); -duration_s(nullable) -> false; -duration_s(default) -> "10s"; -duration_s(_) -> undefined. +to_ip_port(Str) -> + case string:tokens(Str, ":") of + [Ip, Port] -> + case inet:parse_address(Ip) of + {ok, R} -> {ok, {R, list_to_integer(Port)}}; + _ -> {error, Str} + end; + _ -> {error, Str} + end. diff --git a/apps/emqx_statsd/src/emqx_statsd_sup.erl b/apps/emqx_statsd/src/emqx_statsd_sup.erl index e33ea5493..02b50e3ca 100644 --- a/apps/emqx_statsd/src/emqx_statsd_sup.erl +++ b/apps/emqx_statsd/src/emqx_statsd_sup.erl @@ -7,63 +7,48 @@ -behaviour(supervisor). --include("emqx_statsd.hrl"). - --export([start_link/0]). - --export([start_statsd/0, stop_statsd/0]). +-export([ start_link/0 + , start_child/1 + , start_child/2 + , stop_child/1 + ]). -export([init/1]). --export([estatsd_options/0]). +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod, Opts), #{id => Mod, + start => {Mod, start_link, [Opts]}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod]}). - start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). - init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. +-spec start_child(supervisor:child_spec()) -> ok. +start_child(ChildSpec) when is_map(ChildSpec) -> + assert_started(supervisor:start_child(?MODULE, ChildSpec)). -start_statsd() -> - {ok, Pid} = supervisor:start_child(?MODULE, estatsd_child_spec()), - {ok, _Pid1} = supervisor:start_child(?MODULE, emqx_statsd_child_spec(Pid)). +-spec start_child(atom(), map()) -> ok. +start_child(Mod, Opts) when is_atom(Mod) andalso is_map(Opts) -> + assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Opts))). -stop_statsd() -> - ok = supervisor:terminate_child(?MODULE, emqx_statsd), - ok = supervisor:terminate_child(?MODULE, estatsd). -%%============================================================================================== -%% internal -estatsd_child_spec() -> - #{id => estatsd - , start => {estatsd, start_link, [estatsd_options()]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [estatsd]}. +-spec(stop_child(any()) -> ok | {error, term()}). +stop_child(ChildId) -> + case supervisor:terminate_child(?MODULE, ChildId) of + ok -> supervisor:delete_child(?MODULE, ChildId); + Error -> Error + end. -estatsd_options() -> - Host = get_conf(host, ?DEFAULT_HOST), - Port = get_conf(port, ?DEFAULT_PORT), - Prefix = get_conf(prefix, ?DEFAULT_PREFIX), - Tags = tags(get_conf(tags, ?DEFAULT_TAGS)), - BatchSize = get_conf(batch_size, ?DEFAULT_BATCH_SIZE), - [{host, Host}, {port, Port}, {prefix, Prefix}, {tags, Tags}, {batch_size, BatchSize}]. +init([]) -> + {ok, {{one_for_one, 10, 3600}, []}}. -tags(Map) -> - Tags = maps:to_list(Map), - [{atom_to_binary(Key, utf8), Value} || {Key, Value} <- Tags]. +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- -emqx_statsd_child_spec(Pid) -> - #{id => emqx_statsd - , start => {emqx_statsd, start_link, [[{estatsd_pid, Pid} | emqx_statsd_options()]]} - , restart => permanent - , shutdown => 5000 - , type => worker - , modules => [emqx_statsd]}. - -emqx_statsd_options() -> - SampleTimeInterval = get_conf(sample_time_interval, ?DEFAULT_SAMPLE_TIME_INTERVAL) * 1000, - FlushTimeInterval = get_conf(flush_time_interval, ?DEFAULT_FLUSH_TIME_INTERVAL) * 1000, - [{sample_time_interval, SampleTimeInterval}, {flush_time_interval, FlushTimeInterval}]. - -get_conf(Key, Default) -> - emqx_config:get([?APP, Key], Default). +assert_started({ok, _Pid}) -> ok; +assert_started({ok, _Pid, _Info}) -> ok; +assert_started({error, {already_tarted, _Pid}}) -> ok; +assert_started({error, Reason}) -> erlang:error(Reason). diff --git a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl index 2855e8ee5..9d06ee351 100644 --- a/apps/emqx_statsd/test/emqx_statsd_SUITE.erl +++ b/apps/emqx_statsd/test/emqx_statsd_SUITE.erl @@ -13,7 +13,7 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_statsd]). -all() -> +all() -> emqx_ct:all(?MODULE). t_statsd(_) -> diff --git a/rebar.config b/rebar.config index 4be003e42..66d8b8f27 100644 --- a/rebar.config +++ b/rebar.config @@ -50,7 +50,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.4"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.2"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.4"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}