Merge pull request #7608 from emqx/dev/e4.2.11
Auto-pull-request-on-2022-04-13
This commit is contained in:
commit
a44ed688ca
|
@ -42,3 +42,11 @@ erlang.mk
|
||||||
etc/emqx.conf.rendered
|
etc/emqx.conf.rendered
|
||||||
Mnesia.*/
|
Mnesia.*/
|
||||||
.stamp
|
.stamp
|
||||||
|
erlang_ls.config
|
||||||
|
# Emacs Backup files
|
||||||
|
*~
|
||||||
|
# Emacs temporary files
|
||||||
|
.#*
|
||||||
|
*#
|
||||||
|
# For direnv
|
||||||
|
.envrc
|
||||||
|
|
|
@ -255,6 +255,27 @@ end}.
|
||||||
{validator, "range4ports", "must be 1024 to 134217727",
|
{validator, "range4ports", "must be 1024 to 134217727",
|
||||||
fun(X) -> X >= 1024 andalso X =< 134217727 end}.
|
fun(X) -> X >= 1024 andalso X =< 134217727 end}.
|
||||||
|
|
||||||
|
{validator, "range:0-2", "must be 0 to 2",
|
||||||
|
fun(X) -> X >= 0 andalso X =< 2 end}.
|
||||||
|
|
||||||
|
{validator, "range:0-128", "must be 0 to 128",
|
||||||
|
fun(X) -> X >= 0 andalso X =< 128 end}.
|
||||||
|
|
||||||
|
{validator, "range:0-65535", "must be 0 to 65535",
|
||||||
|
fun(X) -> X >= 0 andalso X =< 65535 end}.
|
||||||
|
|
||||||
|
{validator, "range:1-65535", "must be 1 to 65535",
|
||||||
|
fun(X) -> X >= 1 andalso X =< 65535 end}.
|
||||||
|
|
||||||
|
{validator, "range:1-9", "must be 1 to 9",
|
||||||
|
fun(X) -> X >= 1 andalso X =< 9 end}.
|
||||||
|
|
||||||
|
{validator, "range:8-15", "must be 8 to 15",
|
||||||
|
fun(X) -> X >= 8 andalso X =< 15 end}.
|
||||||
|
|
||||||
|
{validator, "range:0-1024", "must be 0 to 1024",
|
||||||
|
fun(X) -> X >= 0 andalso X =< 1024 end}.
|
||||||
|
|
||||||
%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
|
%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
|
||||||
{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
|
{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
|
||||||
{datatype, bytesize},
|
{datatype, bytesize},
|
||||||
|
@ -290,10 +311,10 @@ end}.
|
||||||
{default, 1000},
|
{default, 1000},
|
||||||
{datatype, integer},
|
{datatype, integer},
|
||||||
hidden,
|
hidden,
|
||||||
{validators, ["positive_integer"]}
|
{validators, ["range:0-inf"]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
{validator, "positive_integer", "must be a positive integer",
|
{validator, "range:0-inf", "must be non neg_integer",
|
||||||
fun(X) -> X >= 0 end}.
|
fun(X) -> X >= 0 end}.
|
||||||
|
|
||||||
%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
|
%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
|
||||||
|
@ -758,7 +779,8 @@ end}.
|
||||||
%% @doc Set the Maximum topic levels.
|
%% @doc Set the Maximum topic levels.
|
||||||
{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [
|
{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [
|
||||||
{default, 128},
|
{default, 128},
|
||||||
{datatype, integer}
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-inf"]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Set the Maximum QoS allowed.
|
%% @doc Set the Maximum QoS allowed.
|
||||||
|
@ -771,7 +793,8 @@ end}.
|
||||||
%% @doc Set the Maximum Topic Alias.
|
%% @doc Set the Maximum Topic Alias.
|
||||||
{mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [
|
{mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [
|
||||||
{default, 65535},
|
{default, 65535},
|
||||||
{datatype, integer}
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-65535"]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Whether the server supports MQTT retained messages.
|
%% @doc Whether the server supports MQTT retained messages.
|
||||||
|
@ -868,7 +891,8 @@ end}.
|
||||||
|
|
||||||
%% @doc Set the Maximum topic levels.
|
%% @doc Set the Maximum topic levels.
|
||||||
{mapping, "zone.$name.max_topic_levels", "emqx.zones", [
|
{mapping, "zone.$name.max_topic_levels", "emqx.zones", [
|
||||||
{datatype, integer}
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-128"]}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Set the Maximum QoS allowed.
|
%% @doc Set the Maximum QoS allowed.
|
||||||
|
|
|
@ -3,12 +3,15 @@
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
{"4.2.0", [
|
{"4.2.0", [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor,emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
@ -22,17 +25,22 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
@ -46,12 +54,15 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[23]">>, [
|
{<<"4.2.[23]">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
@ -67,19 +78,25 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.4">>, [
|
{<<"4.2.4">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
@ -87,19 +104,25 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.5">>, [
|
{<<"4.2.5">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
|
@ -107,48 +130,88 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[6-7]">>, [
|
{<<"4.2.[6-7]">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.8">>, [
|
{<<"4.2.8">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.9">>, [
|
{<<"4.2.9">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.10">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
[
|
[
|
||||||
{"4.2.0", [
|
{"4.2.0", [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
{load_module, emqx_metrics, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
@ -163,15 +226,20 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
{suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
@ -187,12 +255,15 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[23]">>, [
|
{<<"4.2.[23]">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
{update, emqx_connection, {advanced, []}},
|
{update, emqx_connection, {advanced, []}},
|
||||||
|
@ -208,13 +279,18 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.4">>, [
|
{<<"4.2.4">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
@ -228,13 +304,19 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.5">>, [
|
{<<"4.2.5">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
|
@ -248,38 +330,77 @@
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[6-7]">>, [
|
{<<"4.2.[6-7]">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
{load_module, emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module, emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_misc,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.8">>, [
|
{<<"4.2.8">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.9">>, [
|
{<<"4.2.9">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_limiter, brutal_purge, soft_purge, []}
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
|
]},
|
||||||
|
{<<"4.2.10">>, [
|
||||||
|
{load_module, emqx_message, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_app, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_os_mon, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_limiter, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_listeners, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_sys, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_plugins, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -165,6 +165,8 @@ init([Opts]) ->
|
||||||
Actions = proplists:get_value(actions, Opts),
|
Actions = proplists:get_value(actions, Opts),
|
||||||
SizeLimit = proplists:get_value(size_limit, Opts),
|
SizeLimit = proplists:get_value(size_limit, Opts),
|
||||||
ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)),
|
ValidityPeriod = timer:seconds(proplists:get_value(validity_period, Opts)),
|
||||||
|
emqx_alarm_handler:load(),
|
||||||
|
process_flag(trap_exit, true),
|
||||||
{ok, ensure_delete_timer(#state{actions = Actions,
|
{ok, ensure_delete_timer(#state{actions = Actions,
|
||||||
size_limit = SizeLimit,
|
size_limit = SizeLimit,
|
||||||
validity_period = ValidityPeriod})}.
|
validity_period = ValidityPeriod})}.
|
||||||
|
@ -228,6 +230,7 @@ handle_info(Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, _State) ->
|
||||||
|
emqx_alarm_handler:unload(),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
|
@ -38,13 +38,11 @@ start(_Type, _Args) ->
|
||||||
ok = emqx_plugins:init(),
|
ok = emqx_plugins:init(),
|
||||||
emqx_plugins:load(),
|
emqx_plugins:load(),
|
||||||
register(emqx, self()),
|
register(emqx, self()),
|
||||||
emqx_alarm_handler:load(),
|
|
||||||
print_vsn(),
|
print_vsn(),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
-spec(stop(State :: term()) -> term()).
|
-spec(stop(State :: term()) -> term()).
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
emqx_alarm_handler:unload(),
|
|
||||||
emqx_boot:is_enabled(listeners)
|
emqx_boot:is_enabled(listeners)
|
||||||
andalso emqx_listeners:stop().
|
andalso emqx_listeners:stop().
|
||||||
|
|
||||||
|
@ -68,4 +66,3 @@ start_autocluster() ->
|
||||||
ekka:callback(prepare, fun emqx:shutdown/1),
|
ekka:callback(prepare, fun emqx:shutdown/1),
|
||||||
ekka:callback(reboot, fun emqx:reboot/0),
|
ekka:callback(reboot, fun emqx:reboot/0),
|
||||||
ekka:autocluster(?APP).
|
ekka:autocluster(?APP).
|
||||||
|
|
||||||
|
|
|
@ -1455,11 +1455,13 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enrich server keepalive
|
%% Enrich server keepalive
|
||||||
|
|
||||||
enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
|
enrich_server_keepalive(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) ->
|
||||||
case emqx_zone:server_keepalive(Zone) of
|
case emqx_zone:server_keepalive(Zone) of
|
||||||
undefined -> AckProps;
|
undefined -> AckProps;
|
||||||
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
|
||||||
end.
|
end;
|
||||||
|
|
||||||
|
enrich_server_keepalive(AckProps, _Channel) -> AckProps.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enrich response information
|
%% Enrich response information
|
||||||
|
@ -1505,7 +1507,7 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
||||||
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Enrich Keepalive
|
%% Ensure Keepalive
|
||||||
|
|
||||||
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
||||||
ensure_keepalive_timer(Interval, Channel);
|
ensure_keepalive_timer(Interval, Channel);
|
||||||
|
@ -1671,4 +1673,3 @@ flag(false) -> 0.
|
||||||
set_field(Name, Value, Channel) ->
|
set_field(Name, Value, Channel) ->
|
||||||
Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
|
Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
|
||||||
setelement(Pos+1, Channel, Value).
|
setelement(Pos+1, Channel, Value).
|
||||||
|
|
||||||
|
|
|
@ -78,6 +78,10 @@
|
||||||
|
|
||||||
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([parse_variable_byte_integer/1]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Init Parse State
|
%% Init Parse State
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -196,8 +200,9 @@ packet(Header, Variable) ->
|
||||||
packet(Header, Variable, Payload) ->
|
packet(Header, Variable, Payload) ->
|
||||||
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
|
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin,
|
||||||
{ProtoName, Rest} = parse_utf8_string(FrameBin),
|
#{strict_mode := StrictMode}) ->
|
||||||
|
{ProtoName, Rest} = parse_utf8_string(FrameBin, StrictMode),
|
||||||
<<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
|
<<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
|
||||||
% Note: Crash when reserved flag doesn't equal to 0, there is no strict
|
% Note: Crash when reserved flag doesn't equal to 0, there is no strict
|
||||||
% compliance with the MQTT5.0.
|
% compliance with the MQTT5.0.
|
||||||
|
@ -211,8 +216,8 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
||||||
KeepAlive : 16/big,
|
KeepAlive : 16/big,
|
||||||
Rest2/binary>> = Rest1,
|
Rest2/binary>> = Rest1,
|
||||||
|
|
||||||
{Properties, Rest3} = parse_properties(Rest2, ProtoVer),
|
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
|
||||||
{ClientId, Rest4} = parse_utf8_string(Rest3),
|
{ClientId, Rest4} = parse_utf8_string(Rest3, StrictMode),
|
||||||
ConnPacket = #mqtt_packet_connect{proto_name = ProtoName,
|
ConnPacket = #mqtt_packet_connect{proto_name = ProtoName,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
is_bridge = (BridgeTag =:= 8),
|
is_bridge = (BridgeTag =:= 8),
|
||||||
|
@ -224,14 +229,14 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
||||||
properties = Properties,
|
properties = Properties,
|
||||||
clientid = ClientId
|
clientid = ClientId
|
||||||
},
|
},
|
||||||
{ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4),
|
{ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4, StrictMode),
|
||||||
{Username, Rest6} = parse_utf8_string(Rest5, bool(UsernameFlag)),
|
{Username, Rest6} = parse_utf8_string(Rest5, StrictMode, bool(UsernameFlag)),
|
||||||
{Passsword, <<>>} = parse_utf8_string(Rest6, bool(PasswordFlag)),
|
{Passsword, <<>>} = parse_utf8_string(Rest6, StrictMode, bool(PasswordFlag)),
|
||||||
ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword};
|
ConnPacket1#mqtt_packet_connect{username = Username, password = Passsword};
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?CONNACK},
|
parse_packet(#mqtt_packet_header{type = ?CONNACK}, <<AckFlags:8, ReasonCode:8, Rest/binary>>,
|
||||||
<<AckFlags:8, ReasonCode:8, Rest/binary>>, #{version := Ver}) ->
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
{Properties, <<>>} = parse_properties(Rest, Ver),
|
{Properties, <<>>} = parse_properties(Rest, Ver, StrictMode),
|
||||||
#mqtt_packet_connack{ack_flags = AckFlags,
|
#mqtt_packet_connack{ack_flags = AckFlags,
|
||||||
reason_code = ReasonCode,
|
reason_code = ReasonCode,
|
||||||
properties = Properties
|
properties = Properties
|
||||||
|
@ -239,21 +244,23 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK},
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
||||||
#{strict_mode := StrictMode, version := Ver}) ->
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
{TopicName, Rest} = parse_utf8_string(Bin),
|
{TopicName, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
{PacketId, Rest1} = case QoS of
|
{PacketId, Rest1} = case QoS of
|
||||||
?QOS_0 -> {undefined, Rest};
|
?QOS_0 -> {undefined, Rest};
|
||||||
_ -> parse_packet_id(Rest)
|
_ -> parse_packet_id(Rest)
|
||||||
end,
|
end,
|
||||||
(PacketId =/= undefined) andalso
|
(PacketId =/= undefined) andalso
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
{Properties, Payload} = parse_properties(Rest1, Ver),
|
{Properties, Payload} = parse_properties(Rest1, Ver, StrictMode),
|
||||||
|
ok = ensure_topic_name_valid(StrictMode, TopicName, Properties),
|
||||||
Publish = #mqtt_packet_publish{topic_name = TopicName,
|
Publish = #mqtt_packet_publish{topic_name = TopicName,
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
properties = Properties
|
properties = Properties
|
||||||
},
|
},
|
||||||
{Publish, Payload};
|
{Publish, Payload};
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>, #{strict_mode := StrictMode})
|
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>,
|
||||||
|
#{strict_mode := StrictMode})
|
||||||
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
#mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
|
#mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
|
||||||
|
@ -262,7 +269,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode,
|
||||||
#{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5})
|
#{strict_mode := StrictMode, version := Ver = ?MQTT_PROTO_V5})
|
||||||
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
{Properties, <<>>} = parse_properties(Rest, Ver),
|
{Properties, <<>>} = parse_properties(Rest, Ver, StrictMode),
|
||||||
#mqtt_packet_puback{packet_id = PacketId,
|
#mqtt_packet_puback{packet_id = PacketId,
|
||||||
reason_code = ReasonCode,
|
reason_code = ReasonCode,
|
||||||
properties = Properties
|
properties = Properties
|
||||||
|
@ -271,7 +278,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode,
|
||||||
parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{strict_mode := StrictMode, version := Ver}) ->
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
|
||||||
TopicFilters = parse_topic_filters(subscribe, Rest1),
|
TopicFilters = parse_topic_filters(subscribe, Rest1),
|
||||||
ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]),
|
ok = validate_subqos([QoS || {_, #{qos := QoS}} <- TopicFilters]),
|
||||||
#mqtt_packet_subscribe{packet_id = PacketId,
|
#mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
@ -282,7 +289,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/bin
|
||||||
parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{strict_mode := StrictMode, version := Ver}) ->
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
|
||||||
ReasonCodes = parse_reason_codes(Rest1),
|
ReasonCodes = parse_reason_codes(Rest1),
|
||||||
#mqtt_packet_suback{packet_id = PacketId,
|
#mqtt_packet_suback{packet_id = PacketId,
|
||||||
properties = Properties,
|
properties = Properties,
|
||||||
|
@ -292,7 +299,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary
|
||||||
parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{strict_mode := StrictMode, version := Ver}) ->
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
|
||||||
TopicFilters = parse_topic_filters(unsubscribe, Rest1),
|
TopicFilters = parse_topic_filters(unsubscribe, Rest1),
|
||||||
#mqtt_packet_unsubscribe{packet_id = PacketId,
|
#mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
properties = Properties,
|
properties = Properties,
|
||||||
|
@ -307,7 +314,7 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big>>,
|
||||||
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{strict_mode := StrictMode, version := Ver}) ->
|
#{strict_mode := StrictMode, version := Ver}) ->
|
||||||
StrictMode andalso validate_packet_id(PacketId),
|
StrictMode andalso validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver, StrictMode),
|
||||||
ReasonCodes = parse_reason_codes(Rest1),
|
ReasonCodes = parse_reason_codes(Rest1),
|
||||||
#mqtt_packet_unsuback{packet_id = PacketId,
|
#mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
properties = Properties,
|
properties = Properties,
|
||||||
|
@ -315,115 +322,120 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/bina
|
||||||
};
|
};
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?DISCONNECT}, <<ReasonCode, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?DISCONNECT}, <<ReasonCode, Rest/binary>>,
|
||||||
#{version := ?MQTT_PROTO_V5}) ->
|
#{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) ->
|
||||||
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
|
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode),
|
||||||
#mqtt_packet_disconnect{reason_code = ReasonCode,
|
#mqtt_packet_disconnect{reason_code = ReasonCode,
|
||||||
properties = Properties
|
properties = Properties
|
||||||
};
|
};
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?AUTH}, <<ReasonCode, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?AUTH}, <<ReasonCode, Rest/binary>>,
|
||||||
#{version := ?MQTT_PROTO_V5}) ->
|
#{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}) ->
|
||||||
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
|
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode),
|
||||||
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
|
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
|
||||||
|
|
||||||
parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
||||||
proto_ver = Ver}, Bin) ->
|
proto_ver = Ver},
|
||||||
{Props, Rest} = parse_properties(Bin, Ver),
|
Bin, StrictMode) ->
|
||||||
{Topic, Rest1} = parse_utf8_string(Rest),
|
{Props, Rest} = parse_properties(Bin, Ver, StrictMode),
|
||||||
|
{Topic, Rest1} = parse_utf8_string(Rest, StrictMode),
|
||||||
{Payload, Rest2} = parse_binary_data(Rest1),
|
{Payload, Rest2} = parse_binary_data(Rest1),
|
||||||
|
ok = ensure_topic_name_valid(StrictMode, Topic, Props),
|
||||||
{Packet#mqtt_packet_connect{will_props = Props,
|
{Packet#mqtt_packet_connect{will_props = Props,
|
||||||
will_topic = Topic,
|
will_topic = Topic,
|
||||||
will_payload = Payload
|
will_payload = Payload
|
||||||
}, Rest2};
|
}, Rest2};
|
||||||
parse_will_message(Packet, Bin) -> {Packet, Bin}.
|
parse_will_message(Packet, Bin, _StrictMode) -> {Packet, Bin}.
|
||||||
|
|
||||||
-compile({inline, [parse_packet_id/1]}).
|
-compile({inline, [parse_packet_id/1]}).
|
||||||
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
||||||
{PacketId, Rest}.
|
{PacketId, Rest}.
|
||||||
|
|
||||||
parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
|
||||||
{#{}, Bin};
|
{#{}, Bin};
|
||||||
%% TODO: version mess?
|
%% TODO: version mess?
|
||||||
parse_properties(<<>>, ?MQTT_PROTO_V5) ->
|
parse_properties(<<>>, ?MQTT_PROTO_V5, _StrictMode) ->
|
||||||
{#{}, <<>>};
|
{#{}, <<>>};
|
||||||
parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5) ->
|
parse_properties(<<0, Rest/binary>>, ?MQTT_PROTO_V5, _StrictMode) ->
|
||||||
{#{}, Rest};
|
{#{}, Rest};
|
||||||
parse_properties(Bin, ?MQTT_PROTO_V5) ->
|
parse_properties(Bin, ?MQTT_PROTO_V5, StrictMode) ->
|
||||||
{Len, Rest} = parse_variable_byte_integer(Bin),
|
{Len, Rest} = parse_variable_byte_integer(Bin),
|
||||||
<<PropsBin:Len/binary, Rest1/binary>> = Rest,
|
<<PropsBin:Len/binary, Rest1/binary>> = Rest,
|
||||||
{parse_property(PropsBin, #{}), Rest1}.
|
{parse_property(PropsBin, #{}, StrictMode), Rest1}.
|
||||||
|
|
||||||
parse_property(<<>>, Props) ->
|
parse_property(<<>>, Props, _StrictMode) ->
|
||||||
Props;
|
Props;
|
||||||
parse_property(<<16#01, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Payload-Format-Indicator' => Val});
|
parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}, StrictMode);
|
||||||
parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) ->
|
parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Message-Expiry-Interval' => Val});
|
parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode);
|
||||||
parse_property(<<16#03, Bin/binary>>, Props) ->
|
parse_property(<<16#03, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Content-Type' => Val});
|
parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode);
|
||||||
parse_property(<<16#08, Bin/binary>>, Props) ->
|
parse_property(<<16#08, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Response-Topic' => Val});
|
parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode);
|
||||||
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
|
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Correlation-Data' => Val});
|
parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode);
|
||||||
parse_property(<<16#0B, Bin/binary>>, Props) ->
|
parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_variable_byte_integer(Bin),
|
{Val, Rest} = parse_variable_byte_integer(Bin),
|
||||||
parse_property(Rest, Props#{'Subscription-Identifier' => Val});
|
parse_property(Rest, Props#{'Subscription-Identifier' => Val}, StrictMode);
|
||||||
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) ->
|
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val});
|
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode);
|
||||||
parse_property(<<16#12, Bin/binary>>, Props) ->
|
parse_property(<<16#12, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val});
|
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode);
|
||||||
parse_property(<<16#13, Val:16, Bin/binary>>, Props) ->
|
parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Server-Keep-Alive' => Val});
|
parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode);
|
||||||
parse_property(<<16#15, Bin/binary>>, Props) ->
|
parse_property(<<16#15, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Authentication-Method' => Val});
|
parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode);
|
||||||
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props) ->
|
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Authentication-Data' => Val});
|
parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode);
|
||||||
parse_property(<<16#17, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#17, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Request-Problem-Information' => Val});
|
parse_property(Bin, Props#{'Request-Problem-Information' => Val}, StrictMode);
|
||||||
parse_property(<<16#18, Val:32, Bin/binary>>, Props) ->
|
parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Will-Delay-Interval' => Val});
|
parse_property(Bin, Props#{'Will-Delay-Interval' => Val}, StrictMode);
|
||||||
parse_property(<<16#19, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Request-Response-Information' => Val});
|
parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode);
|
||||||
parse_property(<<16#1A, Bin/binary>>, Props) ->
|
parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Response-Information' => Val});
|
parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode);
|
||||||
parse_property(<<16#1C, Bin/binary>>, Props) ->
|
parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Server-Reference' => Val});
|
parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode);
|
||||||
parse_property(<<16#1F, Bin/binary>>, Props) ->
|
parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Val, Rest} = parse_utf8_string(Bin),
|
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
|
||||||
parse_property(Rest, Props#{'Reason-String' => Val});
|
parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode);
|
||||||
parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) ->
|
parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Receive-Maximum' => Val});
|
parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode);
|
||||||
parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) ->
|
parse_property(<<16#22, Val:16/big, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val});
|
parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}, StrictMode);
|
||||||
parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) ->
|
parse_property(<<16#23, Val:16/big, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Topic-Alias' => Val});
|
parse_property(Bin, Props#{'Topic-Alias' => Val}, StrictMode);
|
||||||
parse_property(<<16#24, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#24, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Maximum-QoS' => Val});
|
parse_property(Bin, Props#{'Maximum-QoS' => Val}, StrictMode);
|
||||||
parse_property(<<16#25, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#25, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Retain-Available' => Val});
|
parse_property(Bin, Props#{'Retain-Available' => Val}, StrictMode);
|
||||||
parse_property(<<16#26, Bin/binary>>, Props) ->
|
parse_property(<<16#26, Bin/binary>>, Props, StrictMode) ->
|
||||||
{Pair, Rest} = parse_utf8_pair(Bin),
|
{Pair, Rest} = parse_utf8_pair(Bin, StrictMode),
|
||||||
case maps:find('User-Property', Props) of
|
case maps:find('User-Property', Props) of
|
||||||
{ok, UserProps} ->
|
{ok, UserProps} ->
|
||||||
UserProps1 = lists:append(UserProps, [Pair]),
|
UserProps1 = lists:append(UserProps, [Pair]),
|
||||||
parse_property(Rest, Props#{'User-Property' := UserProps1});
|
parse_property(Rest, Props#{'User-Property' := UserProps1}, StrictMode);
|
||||||
error ->
|
error ->
|
||||||
parse_property(Rest, Props#{'User-Property' => [Pair]})
|
parse_property(Rest, Props#{'User-Property' => [Pair]}, StrictMode)
|
||||||
end;
|
end;
|
||||||
parse_property(<<16#27, Val:32, Bin/binary>>, Props) ->
|
parse_property(<<16#27, Val:32, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Maximum-Packet-Size' => Val});
|
parse_property(Bin, Props#{'Maximum-Packet-Size' => Val}, StrictMode);
|
||||||
parse_property(<<16#28, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#28, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val});
|
parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}, StrictMode);
|
||||||
parse_property(<<16#29, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#29, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val});
|
parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}, StrictMode);
|
||||||
parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
|
parse_property(<<16#2A, Val, Bin/binary>>, Props, StrictMode) ->
|
||||||
parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}).
|
parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}, StrictMode);
|
||||||
|
parse_property(<<Property:8, _Rest/binary>>, _Props, _StrictMode) ->
|
||||||
|
error(#{invalid_property_code => Property}).
|
||||||
|
%% TODO: invalid property in specific packet.
|
||||||
|
|
||||||
parse_variable_byte_integer(Bin) ->
|
parse_variable_byte_integer(Bin) ->
|
||||||
parse_variable_byte_integer(Bin, 1, 0).
|
parse_variable_byte_integer(Bin, 1, 0).
|
||||||
|
@ -445,20 +457,62 @@ parse_topic_filters(unsubscribe, Bin) ->
|
||||||
parse_reason_codes(Bin) ->
|
parse_reason_codes(Bin) ->
|
||||||
[Code || <<Code>> <= Bin].
|
[Code || <<Code>> <= Bin].
|
||||||
|
|
||||||
parse_utf8_pair(<<Len1:16/big, Key:Len1/binary,
|
%%--------------------
|
||||||
Len2:16/big, Val:Len2/binary, Rest/binary>>) ->
|
%% parse utf8 pair
|
||||||
{{Key, Val}, Rest}.
|
parse_utf8_pair( <<Len1:16/big, Key:Len1/binary,
|
||||||
|
Len2:16/big, Val:Len2/binary, Rest/binary>>
|
||||||
|
, true) ->
|
||||||
|
{{validate_utf8(Key), validate_utf8(Val)}, Rest};
|
||||||
|
parse_utf8_pair( <<Len1:16/big, Key:Len1/binary,
|
||||||
|
Len2:16/big, Val:Len2/binary, Rest/binary>>
|
||||||
|
, false) ->
|
||||||
|
{{Key, Val}, Rest};
|
||||||
|
parse_utf8_pair(<<LenK:16/big, Rest/binary>>, _StrictMode)
|
||||||
|
when LenK > byte_size(Rest) ->
|
||||||
|
error(user_property_not_enough_bytes);
|
||||||
|
parse_utf8_pair(<<LenK:16/big, _Key:LenK/binary, %% key maybe malformed
|
||||||
|
LenV:16/big, Rest/binary>>, _StrictMode)
|
||||||
|
when LenV > byte_size(Rest) ->
|
||||||
|
error(malformed_user_property_value);
|
||||||
|
parse_utf8_pair(Bin, _StrictMode)
|
||||||
|
when 4 > byte_size(Bin) ->
|
||||||
|
error(user_property_not_enough_bytes).
|
||||||
|
|
||||||
parse_utf8_string(Bin, false) ->
|
%%--------------------
|
||||||
|
%% parse utf8 string
|
||||||
|
parse_utf8_string(Bin, _StrictMode, false) ->
|
||||||
{undefined, Bin};
|
{undefined, Bin};
|
||||||
parse_utf8_string(Bin, true) ->
|
parse_utf8_string(Bin, StrictMode, true) ->
|
||||||
parse_utf8_string(Bin).
|
parse_utf8_string(Bin, StrictMode).
|
||||||
|
|
||||||
parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>, true) ->
|
||||||
{Str, Rest}.
|
{validate_utf8(Str), Rest};
|
||||||
|
parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>, false) ->
|
||||||
|
{Str, Rest};
|
||||||
|
parse_utf8_string(<<Len:16/big, Rest/binary>>, _)
|
||||||
|
when Len > byte_size(Rest) ->
|
||||||
|
error(malformed_utf8_string);
|
||||||
|
parse_utf8_string(Bin, _)
|
||||||
|
when 2 > byte_size(Bin) ->
|
||||||
|
error(malformed_utf8_string_length).
|
||||||
|
|
||||||
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
|
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
|
||||||
{Data, Rest}.
|
{Data, Rest};
|
||||||
|
parse_binary_data(<<Len:16/big, Rest/binary>>)
|
||||||
|
when Len > byte_size(Rest) ->
|
||||||
|
error(malformed_binary_data);
|
||||||
|
parse_binary_data(Bin)
|
||||||
|
when 2 > byte_size(Bin) ->
|
||||||
|
error(malformed_binary_data_length).
|
||||||
|
|
||||||
|
ensure_topic_name_valid(false, _TopicName, _Properties) ->
|
||||||
|
ok;
|
||||||
|
ensure_topic_name_valid(true, TopicName, _Properties) when TopicName =/= <<>> ->
|
||||||
|
ok;
|
||||||
|
ensure_topic_name_valid(true, <<>>, #{'Topic-Alias' := _}) ->
|
||||||
|
ok;
|
||||||
|
ensure_topic_name_valid(true, <<>>, _) ->
|
||||||
|
error(empty_topic_name).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Serialize MQTT Packet
|
%% Serialize MQTT Packet
|
||||||
|
@ -796,3 +850,52 @@ fixqos(?PUBREL, 0) -> 1;
|
||||||
fixqos(?SUBSCRIBE, 0) -> 1;
|
fixqos(?SUBSCRIBE, 0) -> 1;
|
||||||
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
||||||
fixqos(_Type, QoS) -> QoS.
|
fixqos(_Type, QoS) -> QoS.
|
||||||
|
|
||||||
|
validate_utf8(Bin) ->
|
||||||
|
case unicode:characters_to_binary(Bin) of
|
||||||
|
{error, _, _} ->
|
||||||
|
error(utf8_string_invalid);
|
||||||
|
{incomplete, _, _} ->
|
||||||
|
error(utf8_string_invalid);
|
||||||
|
Bin when is_binary(Bin) ->
|
||||||
|
case validate_mqtt_utf8_char(Bin) of
|
||||||
|
true -> Bin;
|
||||||
|
false -> error(utf8_string_invalid)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Is the utf8 string respecting UTF-8 characters defined by MQTT Spec?
|
||||||
|
%% i.e. contains invalid UTF-8 char or control char
|
||||||
|
validate_mqtt_utf8_char(<<>>) ->
|
||||||
|
true;
|
||||||
|
%% ==== 1-Byte UTF-8 invalid: [[U+0000 .. U+001F] && [U+007F]]
|
||||||
|
validate_mqtt_utf8_char(<<B1, Bs/binary>>)
|
||||||
|
when B1 >= 16#20, B1 =< 16#7E ->
|
||||||
|
validate_mqtt_utf8_char(Bs);
|
||||||
|
validate_mqtt_utf8_char(<<B1, _Bs/binary>>)
|
||||||
|
when B1 >= 16#00, B1 =< 16#1F;
|
||||||
|
B1 =:= 16#7F ->
|
||||||
|
%% [U+0000 .. U+001F] && [U+007F]
|
||||||
|
false;
|
||||||
|
%% ==== 2-Bytes UTF-8 invalid: [U+0080 .. U+009F]
|
||||||
|
validate_mqtt_utf8_char(<<B1, B2, Bs/binary>>)
|
||||||
|
when B1 =:= 16#C2;
|
||||||
|
B2 >= 16#A0, B2 =< 16#BF;
|
||||||
|
B1 > 16#C3, B1 =< 16#DE;
|
||||||
|
B2 >= 16#80, B2 =< 16#BF ->
|
||||||
|
validate_mqtt_utf8_char(Bs);
|
||||||
|
validate_mqtt_utf8_char(<<16#C2, B2, _Bs/binary>>)
|
||||||
|
when B2 >= 16#80, B2 =< 16#9F ->
|
||||||
|
%% [U+0080 .. U+009F]
|
||||||
|
false;
|
||||||
|
%% ==== 3-Bytes UTF-8 invalid: [U+D800 .. U+DFFF]
|
||||||
|
validate_mqtt_utf8_char(<<B1, _B2, _B3, Bs/binary>>)
|
||||||
|
when B1 >= 16#E0, B1 =< 16#EE;
|
||||||
|
B1 =:= 16#EF ->
|
||||||
|
validate_mqtt_utf8_char(Bs);
|
||||||
|
validate_mqtt_utf8_char(<<16#ED, _B2, _B3, _Bs/binary>>) ->
|
||||||
|
false;
|
||||||
|
%% ==== 4-Bytes UTF-8
|
||||||
|
validate_mqtt_utf8_char(<<B1, _B2, _B3, _B4, Bs/binary>>)
|
||||||
|
when B1 =:= 16#0F ->
|
||||||
|
validate_mqtt_utf8_char(Bs).
|
||||||
|
|
|
@ -23,7 +23,8 @@
|
||||||
, init/4 %% XXX: Compatible with before 4.2 version
|
, init/4 %% XXX: Compatible with before 4.2 version
|
||||||
, info/1
|
, info/1
|
||||||
, check/2
|
, check/2
|
||||||
, update_overall_limiter/4
|
, update_overall_limiter/3
|
||||||
|
, delete_overall_limiter/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-record(limiter, {
|
-record(limiter, {
|
||||||
|
@ -154,14 +155,18 @@ is_message_limiter(conn_messages_routing) -> true;
|
||||||
is_message_limiter(overall_messages_routing) -> true;
|
is_message_limiter(overall_messages_routing) -> true;
|
||||||
is_message_limiter(_) -> false.
|
is_message_limiter(_) -> false.
|
||||||
|
|
||||||
update_overall_limiter(Zone, Name, Capacity, Interval) ->
|
update_overall_limiter(Zone, Capacity, Interval) ->
|
||||||
case is_overall_limiter(Name) of
|
|
||||||
false -> false;
|
|
||||||
_ ->
|
|
||||||
try
|
try
|
||||||
esockd_limiter:update({Zone, Name}, Capacity, Interval),
|
esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval),
|
||||||
|
true
|
||||||
|
catch _:_:_ ->
|
||||||
|
false
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete_overall_limiter(Zone) ->
|
||||||
|
try
|
||||||
|
esockd_limiter:delete({Zone, overall_messages_routing}),
|
||||||
true
|
true
|
||||||
catch _:_:_ ->
|
catch _:_:_ ->
|
||||||
false
|
false
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -29,6 +29,7 @@
|
||||||
, start_listener/3
|
, start_listener/3
|
||||||
, stop_listener/1
|
, stop_listener/1
|
||||||
, stop_listener/3
|
, stop_listener/3
|
||||||
|
, update_listeners_env/2
|
||||||
, restart_listener/1
|
, restart_listener/1
|
||||||
, restart_listener/3
|
, restart_listener/3
|
||||||
]).
|
]).
|
||||||
|
@ -114,6 +115,20 @@ with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) ->
|
||||||
restart() ->
|
restart() ->
|
||||||
lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
|
lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])).
|
||||||
|
|
||||||
|
update_listeners_env(Action, NewConf = #{name := NewName, proto := NewProto}) ->
|
||||||
|
Listener = emqx:get_env(listeners, []),
|
||||||
|
Listener1 = lists:filter(
|
||||||
|
fun(#{name := Name, proto := Proto}) ->
|
||||||
|
not (Name =:= NewName andalso Proto =:= NewProto)
|
||||||
|
end, Listener),
|
||||||
|
Listener2 =
|
||||||
|
case Action of
|
||||||
|
update -> [NewConf | Listener1];
|
||||||
|
delete -> Listener1
|
||||||
|
end,
|
||||||
|
application:set_env(emqx, listeners, Listener2),
|
||||||
|
ok.
|
||||||
|
|
||||||
-spec(restart_listener(listener()) -> any()).
|
-spec(restart_listener(listener()) -> any()).
|
||||||
restart_listener({Proto, ListenOn, Options}) ->
|
restart_listener({Proto, ListenOn, Options}) ->
|
||||||
restart_listener(Proto, ListenOn, Options).
|
restart_listener(Proto, ListenOn, Options).
|
||||||
|
|
|
@ -298,7 +298,8 @@ elapsed(Since) ->
|
||||||
|
|
||||||
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
||||||
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
|
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
|
||||||
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
|
[emqx_guid:to_hexstr(Id), QoS, Topic, From, format(flags, Flags),
|
||||||
|
format(headers, Headers)]).
|
||||||
|
|
||||||
format(flags, Flags) ->
|
format(flags, Flags) ->
|
||||||
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
|
||||||
|
|
|
@ -105,8 +105,10 @@ call(Req) ->
|
||||||
|
|
||||||
init([Opts]) ->
|
init([Opts]) ->
|
||||||
set_mem_check_interval(proplists:get_value(mem_check_interval, Opts)),
|
set_mem_check_interval(proplists:get_value(mem_check_interval, Opts)),
|
||||||
set_sysmem_high_watermark(proplists:get_value(sysmem_high_watermark, Opts)),
|
HW = proplists:get_value(sysmem_high_watermark, Opts),
|
||||||
|
set_sysmem_high_watermark(HW),
|
||||||
set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts)),
|
set_procmem_high_watermark(proplists:get_value(procmem_high_watermark, Opts)),
|
||||||
|
ensure_system_memory_alarm(HW),
|
||||||
{ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts),
|
{ok, ensure_check_timer(#{cpu_high_watermark => proplists:get_value(cpu_high_watermark, Opts),
|
||||||
cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts),
|
cpu_low_watermark => proplists:get_value(cpu_low_watermark, Opts),
|
||||||
cpu_check_interval => proplists:get_value(cpu_check_interval, Opts),
|
cpu_check_interval => proplists:get_value(cpu_check_interval, Opts),
|
||||||
|
@ -177,3 +179,20 @@ ensure_check_timer(State = #{cpu_check_interval := Interval}) ->
|
||||||
"x86_64-pc-linux-musl" -> State;
|
"x86_64-pc-linux-musl" -> State;
|
||||||
_ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}
|
_ -> State#{timer := emqx_misc:start_timer(timer:seconds(Interval), check)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% At startup, memsup starts first and checks for memory alarms,
|
||||||
|
%% but emqx_alarm_handler is not yet used instead of alarm_handler,
|
||||||
|
%% so alarm_handler is used directly for notification (normally emqx_alarm_handler should be used).
|
||||||
|
%%The internal memsup will no longer trigger events that have been alerted,
|
||||||
|
%% and there is no exported function to remove the alerted flag,
|
||||||
|
%% so it can only be checked again at startup.
|
||||||
|
ensure_system_memory_alarm(HW) ->
|
||||||
|
case erlang:whereis(memsup) of
|
||||||
|
undefined -> ok;
|
||||||
|
_Pid ->
|
||||||
|
{Total, Allocated, _Worst} = memsup:get_memory_data(),
|
||||||
|
case Total =/= 0 andalso Allocated/Total * 100 >= HW of
|
||||||
|
true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW});
|
||||||
|
false -> ok
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
|
@ -204,10 +204,21 @@ with_loaded_file(File, SuccFun) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
filter_plugins(Names) ->
|
filter_plugins(Names) ->
|
||||||
lists:filtermap(fun(Name1) when is_atom(Name1) -> {true, Name1};
|
filter_plugins(Names, []).
|
||||||
({Name1, true}) -> {true, Name1};
|
|
||||||
({_Name1, false}) -> false
|
filter_plugins([], Plugins) ->
|
||||||
end, Names).
|
lists:reverse(Plugins);
|
||||||
|
filter_plugins([{Name, Load} | Names], Plugins) ->
|
||||||
|
case {Load, lists:member(Name, Plugins)} of
|
||||||
|
{true, false} ->
|
||||||
|
filter_plugins(Names, [Name | Plugins]);
|
||||||
|
{false, true} ->
|
||||||
|
filter_plugins(Names, Plugins -- [Name]);
|
||||||
|
_ ->
|
||||||
|
filter_plugins(Names, Plugins)
|
||||||
|
end;
|
||||||
|
filter_plugins([Name | Names], Plugins) when is_atom(Name) ->
|
||||||
|
filter_plugins([{Name, true} | Names], Plugins).
|
||||||
|
|
||||||
load_plugins(Names, Persistent) ->
|
load_plugins(Names, Persistent) ->
|
||||||
Plugins = list(), NotFound = Names -- names(Plugins),
|
Plugins = list(), NotFound = Names -- names(Plugins),
|
||||||
|
|
|
@ -96,7 +96,15 @@ sysdescr() ->
|
||||||
%% @doc Get sys uptime
|
%% @doc Get sys uptime
|
||||||
-spec(uptime() -> string()).
|
-spec(uptime() -> string()).
|
||||||
uptime() ->
|
uptime() ->
|
||||||
gen_server:call(?SYS, uptime).
|
{TotalWallClock, _} = erlang:statistics(wall_clock),
|
||||||
|
uptime(TotalWallClock div 1000).
|
||||||
|
|
||||||
|
uptime(Seconds) ->
|
||||||
|
{D, {H, M, S}} = calendar:seconds_to_daystime(Seconds),
|
||||||
|
L0 = [{D, " days"}, {H, " hours"}, {M, " minutes"}, {S, " seconds"}],
|
||||||
|
L1 = lists:dropwhile(fun({K, _}) -> K =:= 0 end, L0),
|
||||||
|
L2 = lists:map(fun({Time, Unit}) -> [integer_to_list(Time), Unit] end, L1),
|
||||||
|
lists:flatten(lists:join(", ", L2)).
|
||||||
|
|
||||||
%% @doc Get sys datetime
|
%% @doc Get sys datetime
|
||||||
-spec(datetime() -> string()).
|
-spec(datetime() -> string()).
|
||||||
|
@ -139,9 +147,6 @@ heartbeat(State) ->
|
||||||
tick(State) ->
|
tick(State) ->
|
||||||
State#state{ticker = start_timer(sys_interval(), tick)}.
|
State#state{ticker = start_timer(sys_interval(), tick)}.
|
||||||
|
|
||||||
handle_call(uptime, _From, State) ->
|
|
||||||
{reply, uptime(State), State};
|
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
@ -151,7 +156,7 @@ handle_cast(Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
|
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
|
||||||
publish(uptime, iolist_to_binary(uptime(State))),
|
publish(uptime, iolist_to_binary(uptime())),
|
||||||
publish(datetime, iolist_to_binary(datetime())),
|
publish(datetime, iolist_to_binary(datetime())),
|
||||||
{noreply, heartbeat(State)};
|
{noreply, heartbeat(State)};
|
||||||
|
|
||||||
|
@ -174,24 +179,6 @@ terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
uptime(#state{start_time = Ts}) ->
|
|
||||||
Secs = timer:now_diff(erlang:timestamp(), Ts) div 1000000,
|
|
||||||
lists:flatten(uptime(seconds, Secs)).
|
|
||||||
uptime(seconds, Secs) when Secs < 60 ->
|
|
||||||
[integer_to_list(Secs), " seconds"];
|
|
||||||
uptime(seconds, Secs) ->
|
|
||||||
[uptime(minutes, Secs div 60), integer_to_list(Secs rem 60), " seconds"];
|
|
||||||
uptime(minutes, M) when M < 60 ->
|
|
||||||
[integer_to_list(M), " minutes, "];
|
|
||||||
uptime(minutes, M) ->
|
|
||||||
[uptime(hours, M div 60), integer_to_list(M rem 60), " minutes, "];
|
|
||||||
uptime(hours, H) when H < 24 ->
|
|
||||||
[integer_to_list(H), " hours, "];
|
|
||||||
uptime(hours, H) ->
|
|
||||||
[uptime(days, H div 24), integer_to_list(H rem 24), " hours, "];
|
|
||||||
uptime(days, D) ->
|
|
||||||
[integer_to_list(D), " days, "].
|
|
||||||
|
|
||||||
publish(uptime, Uptime) ->
|
publish(uptime, Uptime) ->
|
||||||
safe_publish(systop(uptime), Uptime);
|
safe_publish(systop(uptime), Uptime);
|
||||||
publish(datetime, Datetime) ->
|
publish(datetime, Datetime) ->
|
||||||
|
|
|
@ -43,7 +43,12 @@ groups() ->
|
||||||
[{parse, [parallel],
|
[{parse, [parallel],
|
||||||
[t_parse_cont,
|
[t_parse_cont,
|
||||||
t_parse_frame_too_large,
|
t_parse_frame_too_large,
|
||||||
t_parse_frame_malformed_variable_byte_integer
|
t_parse_frame_malformed_variable_byte_integer,
|
||||||
|
t_parse_frame_variable_byte_integer,
|
||||||
|
t_parse_malformed_utf8_string,
|
||||||
|
t_parse_empty_topic_name,
|
||||||
|
t_parse_empty_topic_name_with_alias,
|
||||||
|
t_parse_frame_proxy_protocol %% proxy_protocol_config_disabled packet.
|
||||||
]},
|
]},
|
||||||
{connect, [parallel],
|
{connect, [parallel],
|
||||||
[t_serialize_parse_v3_connect,
|
[t_serialize_parse_v3_connect,
|
||||||
|
@ -136,6 +141,50 @@ t_parse_frame_malformed_variable_byte_integer(_) ->
|
||||||
?catch_error(malformed_variable_byte_integer,
|
?catch_error(malformed_variable_byte_integer,
|
||||||
emqx_frame:parse(MalformedPayload, ParseState)).
|
emqx_frame:parse(MalformedPayload, ParseState)).
|
||||||
|
|
||||||
|
t_parse_frame_variable_byte_integer(_) ->
|
||||||
|
Bin = <<2#10010011, 2#10000000, 2#10001000, 2#10011001, 2#10101101, 2#00110010>>,
|
||||||
|
?catch_error(malformed_variable_byte_integer,
|
||||||
|
emqx_frame:parse_variable_byte_integer(Bin)).
|
||||||
|
|
||||||
|
t_parse_malformed_utf8_string(_) ->
|
||||||
|
MalformedPacket = <<16,31,0,4,
|
||||||
|
%% Specification name, should be "MQTT"
|
||||||
|
%% 77,81,84,84,
|
||||||
|
%% malformed 1-Byte UTF-8 in (U+0000 .. U+001F] && [U+007F])
|
||||||
|
16#00,16#01,16#1F,16#7F,
|
||||||
|
|
||||||
|
4,194,0,60,
|
||||||
|
0,4,101,109,
|
||||||
|
113,120,0,5,
|
||||||
|
97,100,109,105,
|
||||||
|
110,0,6,112,
|
||||||
|
117,98,108,105,
|
||||||
|
99>>,
|
||||||
|
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
|
||||||
|
?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
|
||||||
|
|
||||||
|
t_parse_empty_topic_name(_) ->
|
||||||
|
Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<>>),
|
||||||
|
?assertEqual(Packet, parse_serialize(Packet, #{strict_mode => false})),
|
||||||
|
?catch_error(empty_topic_name, parse_serialize(Packet, #{strict_mode => true})).
|
||||||
|
|
||||||
|
t_parse_empty_topic_name_with_alias(_) ->
|
||||||
|
Props = #{'Topic-Alias' => 16#AB},
|
||||||
|
Packet = ?PUBLISH_PACKET(?QOS_1, <<>>, 1, Props, <<>>),
|
||||||
|
?assertEqual(
|
||||||
|
Packet, parse_serialize(Packet, #{strict_mode => false, version => ?MQTT_PROTO_V5})
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
Packet, parse_serialize(Packet, #{strict_mode => true, version => ?MQTT_PROTO_V5})
|
||||||
|
).
|
||||||
|
|
||||||
|
t_parse_frame_proxy_protocol(_) ->
|
||||||
|
BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">>
|
||||||
|
, <<"\r\n\r\n\0\r\nQUIT\n">>],
|
||||||
|
[?assertError( proxy_protocol_config_disabled
|
||||||
|
, emqx_frame:parse(Bin))
|
||||||
|
|| Bin <- BinList].
|
||||||
|
|
||||||
t_serialize_parse_v3_connect(_) ->
|
t_serialize_parse_v3_connect(_) ->
|
||||||
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
||||||
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
||||||
|
@ -531,4 +580,3 @@ parse_to_packet(Bin, Opts) ->
|
||||||
Packet.
|
Packet.
|
||||||
|
|
||||||
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
|
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
|
||||||
|
|
||||||
|
|
|
@ -24,10 +24,23 @@
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
emqx_ct_helpers:boot_modules(all),
|
||||||
|
emqx_ct_helpers:start_apps([],
|
||||||
|
fun(emqx) ->
|
||||||
|
application:set_env(emqx, os_mon, [
|
||||||
|
{cpu_check_interval, 1},
|
||||||
|
{cpu_high_watermark, 5},
|
||||||
|
{cpu_low_watermark, 80},
|
||||||
|
{mem_check_interval, 60},
|
||||||
|
{sysmem_high_watermark, 70},
|
||||||
|
{procmem_high_watermark, 5}]);
|
||||||
|
(_) -> ok
|
||||||
|
end),
|
||||||
application:ensure_all_started(os_mon),
|
application:ensure_all_started(os_mon),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
|
emqx_ct_helpers:stop_apps([]),
|
||||||
application:stop(os_mon).
|
application:stop(os_mon).
|
||||||
|
|
||||||
% t_set_mem_check_interval(_) ->
|
% t_set_mem_check_interval(_) ->
|
||||||
|
@ -40,13 +53,6 @@ end_per_suite(_Config) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
t_api(_) ->
|
t_api(_) ->
|
||||||
gen_event:swap_handler(alarm_handler, {emqx_alarm_handler, swap}, {alarm_handler, []}),
|
|
||||||
{ok, _} = emqx_os_mon:start_link([{cpu_check_interval, 1},
|
|
||||||
{cpu_high_watermark, 5},
|
|
||||||
{cpu_low_watermark, 80},
|
|
||||||
{mem_check_interval, 60},
|
|
||||||
{sysmem_high_watermark, 70},
|
|
||||||
{procmem_high_watermark, 5}]),
|
|
||||||
?assertEqual(1, emqx_os_mon:get_cpu_check_interval()),
|
?assertEqual(1, emqx_os_mon:get_cpu_check_interval()),
|
||||||
?assertEqual(5, emqx_os_mon:get_cpu_high_watermark()),
|
?assertEqual(5, emqx_os_mon:get_cpu_high_watermark()),
|
||||||
?assertEqual(80, emqx_os_mon:get_cpu_low_watermark()),
|
?assertEqual(80, emqx_os_mon:get_cpu_low_watermark()),
|
||||||
|
@ -69,4 +75,3 @@ t_api(_) ->
|
||||||
emqx_os_mon ! ignored,
|
emqx_os_mon ! ignored,
|
||||||
gen_server:stop(emqx_os_mon),
|
gen_server:stop(emqx_os_mon),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -42,10 +42,24 @@ end_per_suite(_Config) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
||||||
t_uptime(_) ->
|
t_uptime(_) ->
|
||||||
?assertEqual(<<"1 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 1))),
|
?assert(is_list(emqx_sys:uptime())),
|
||||||
?assertEqual(<<"1 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 60))),
|
?assertEqual(<<"1 seconds">>, iolist_to_binary(emqx_sys:uptime(1))),
|
||||||
?assertEqual(<<"1 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 3600))),
|
?assertEqual(<<"1 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(60))),
|
||||||
?assertEqual(<<"1 days, 0 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(seconds, 86400))).
|
?assertEqual(<<"1 hours, 0 minutes, 0 seconds">>, iolist_to_binary(emqx_sys:uptime(3600))),
|
||||||
|
?assertEqual(<<"1 hours, 1 minutes, 1 seconds">>, iolist_to_binary(emqx_sys:uptime(3661))),
|
||||||
|
?assertEqual(<<"1 days, 0 hours, 0 minutes, 0 seconds">>,
|
||||||
|
iolist_to_binary(emqx_sys:uptime(86400))),
|
||||||
|
lists:map(fun({D, H, M, S}) ->
|
||||||
|
Expect = <<
|
||||||
|
(integer_to_binary(D))/binary, " days, ",
|
||||||
|
(integer_to_binary(H))/binary, " hours, ",
|
||||||
|
(integer_to_binary(M))/binary, " minutes, ",
|
||||||
|
(integer_to_binary(S))/binary, " seconds"
|
||||||
|
>>,
|
||||||
|
Actual = iolist_to_binary(emqx_sys:uptime(D * 86400 + H * 3600 + M * 60 + S)),
|
||||||
|
?assertEqual(Expect, Actual)
|
||||||
|
end,
|
||||||
|
[{1, 2, 3, 4}, {10, 20, 30, 40}, {2222, 3, 56, 59}, {59, 23, 59, 59}]).
|
||||||
|
|
||||||
% t_datetime(_) ->
|
% t_datetime(_) ->
|
||||||
% error('TODO').
|
% error('TODO').
|
||||||
|
|
Loading…
Reference in New Issue