diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
index 40b23415c..a4f7d5b89 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl
@@ -32,9 +32,13 @@
get_bucket_cfg_path/2,
desc/1,
types/0,
+ short_paths/0,
calc_capacity/1,
extract_with_type/2,
- default_client_config/0
+ default_client_config/0,
+ short_paths_fields/1,
+ get_listener_opts/1,
+ get_node_opts/1
]).
-define(KILOBYTE, 1024).
@@ -104,15 +108,17 @@ roots() ->
].
fields(limiter) ->
- [
- {Type,
- ?HOCON(?R_REF(node_opts), #{
- desc => ?DESC(Type),
- importance => ?IMPORTANCE_HIDDEN,
- aliases => alias_of_type(Type)
- })}
- || Type <- types()
- ] ++
+ short_paths_fields(?MODULE) ++
+ [
+ {Type,
+ ?HOCON(?R_REF(node_opts), #{
+ desc => ?DESC(Type),
+ importance => ?IMPORTANCE_HIDDEN,
+ required => {false, recursively},
+ aliases => alias_of_type(Type)
+ })}
+ || Type <- types()
+ ] ++
[
%% This is an undocumented feature, and it won't be support anymore
{client,
@@ -203,6 +209,14 @@ fields(listener_client_fields) ->
fields(Type) ->
simple_bucket_field(Type).
+short_paths_fields(DesModule) ->
+ [
+ {Name,
+ ?HOCON(rate(), #{desc => ?DESC(DesModule, Name), required => false, example => Example})}
+ || {Name, Example} <-
+ lists:zip(short_paths(), [<<"1000/s">>, <<"1000/s">>, <<"100MB/s">>])
+ ].
+
desc(limiter) ->
"Settings for the rate limiter.";
desc(node_opts) ->
@@ -236,6 +250,9 @@ get_bucket_cfg_path(Type, BucketName) ->
types() ->
[bytes, messages, connection, message_routing, internal].
+short_paths() ->
+ [max_conn_rate, messages_rate, bytes_rate].
+
calc_capacity(#{rate := infinity}) ->
infinity;
calc_capacity(#{rate := Rate, burst := Burst}) ->
@@ -266,6 +283,31 @@ default_client_config() ->
failure_strategy => force
}.
+default_bucket_config() ->
+ #{
+ rate => infinity,
+ burst => 0
+ }.
+
+get_listener_opts(Conf) ->
+ Limiter = maps:get(limiter, Conf, undefined),
+ ShortPaths = maps:with(short_paths(), Conf),
+ get_listener_opts(Limiter, ShortPaths).
+
+get_node_opts(Type) ->
+ Opts = emqx:get_config([limiter, Type], default_bucket_config()),
+ case type_to_short_path_name(Type) of
+ undefined ->
+ Opts;
+ Name ->
+ case emqx:get_config([limiter, Name], undefined) of
+ undefined ->
+ Opts;
+ Rate ->
+ Opts#{rate := Rate}
+ end
+ end.
+
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
@@ -476,3 +518,42 @@ merge_client_bucket(Type, _, {ok, BucketVal}) ->
#{Type => BucketVal};
merge_client_bucket(_, _, _) ->
undefined.
+
+short_path_name_to_type(max_conn_rate) ->
+ connection;
+short_path_name_to_type(messages_rate) ->
+ messages;
+short_path_name_to_type(bytes_rate) ->
+ bytes.
+
+type_to_short_path_name(connection) ->
+ max_conn_rate;
+type_to_short_path_name(messages) ->
+ messages_rate;
+type_to_short_path_name(bytes) ->
+ bytes_rate;
+type_to_short_path_name(_) ->
+ undefined.
+
+get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 ->
+ Limiter;
+get_listener_opts(undefined, ShortPaths) ->
+ convert_listener_short_paths(ShortPaths);
+get_listener_opts(Limiter, ShortPaths) ->
+ Shorts = convert_listener_short_paths(ShortPaths),
+ emqx_utils_maps:deep_merge(Limiter, Shorts).
+
+convert_listener_short_paths(ShortPaths) ->
+ DefBucket = default_bucket_config(),
+ DefClient = default_client_config(),
+ Fun = fun(Name, Rate, Acc) ->
+ Type = short_path_name_to_type(Name),
+ case Name of
+ max_conn_rate ->
+ Acc#{Type => DefBucket#{rate => Rate}};
+ _ ->
+ Client = maps:get(client, Acc, #{}),
+ Acc#{client => Client#{Type => DefClient#{rate => Rate}}}
+ end
+ end,
+ maps:fold(Fun, #{}, ShortPaths).
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
index 2867283d6..488f47851 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl
@@ -481,7 +481,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) ->
-spec init_tree(emqx_limiter_schema:limiter_type()) -> state().
init_tree(Type) when is_atom(Type) ->
- Cfg = emqx:get_config([limiter, Type]),
+ Cfg = emqx_limiter_schema:get_node_opts(Type),
init_tree(Type, Cfg).
init_tree(Type, #{rate := Rate} = Cfg) ->
@@ -625,13 +625,10 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity -
{error, invalid_bucket}
end;
%% this is a node-level reference
-find_referenced_bucket(Id, Type, _) ->
- case emqx:get_config([limiter, Type], undefined) of
+find_referenced_bucket(_Id, Type, _) ->
+ case emqx_limiter_schema:get_node_opts(Type) of
#{rate := infinity} ->
false;
- undefined ->
- ?SLOG(error, #{msg => "invalid limiter type", type => Type, id => Id}),
- {error, invalid_bucket};
NodeCfg ->
{ok, Bucket} = emqx_limiter_manager:find_root(Type),
{ok, Bucket, NodeCfg}
diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
index cba11ede2..be9b62d01 100644
--- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
+++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl
@@ -86,7 +86,7 @@ init([]) ->
%% Internal functions
%%--==================================================================
make_child(Type) ->
- Cfg = emqx:get_config([limiter, Type]),
+ Cfg = emqx_limiter_schema:get_node_opts(Type),
make_child(Type, Cfg).
make_child(Type, Cfg) ->
diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl
index 99ab52f61..1d49a0c44 100644
--- a/apps/emqx/src/emqx_listeners.erl
+++ b/apps/emqx/src/emqx_listeners.erl
@@ -639,7 +639,7 @@ zone(Opts) ->
maps:get(zone, Opts, undefined).
limiter(Opts) ->
- maps:get(limiter, Opts, undefined).
+ emqx_limiter_schema:get_listener_opts(Opts).
add_limiter_bucket(Id, #{limiter := Limiter}) ->
maps:fold(
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl
index 790cb6107..c875b9632 100644
--- a/apps/emqx/src/emqx_schema.erl
+++ b/apps/emqx/src/emqx_schema.erl
@@ -2000,7 +2000,8 @@ base_listener(Bind) ->
listener_fields
),
#{
- desc => ?DESC(base_listener_limiter)
+ desc => ?DESC(base_listener_limiter),
+ importance => ?IMPORTANCE_HIDDEN
}
)},
{"enable_authn",
@@ -2011,7 +2012,7 @@ base_listener(Bind) ->
default => true
}
)}
- ].
+ ] ++ emqx_limiter_schema:short_paths_fields(?MODULE).
desc("persistent_session_store") ->
"Settings for message persistence.";
diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl
index 67ed8e6bc..6f488eaa9 100644
--- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl
+++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl
@@ -47,7 +47,7 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
- ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF),
+ load_conf(),
emqx_common_test_helpers:start_apps([?APP]),
Config.
@@ -55,13 +55,15 @@ end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([?APP]).
init_per_testcase(_TestCase, Config) ->
+ emqx_config:erase(limiter),
+ load_conf(),
Config.
end_per_testcase(_TestCase, Config) ->
Config.
load_conf() ->
- emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
+ ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
init_config() ->
emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF).
@@ -313,8 +315,8 @@ t_capacity(_) ->
%% Test Cases Global Level
%%--------------------------------------------------------------------
t_collaborative_alloc(_) ->
- GlobalMod = fun(#{message_routing := MR} = Cfg) ->
- Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}}
+ GlobalMod = fun(Cfg) ->
+ Cfg#{message_routing => #{rate => ?RATE("600/1s"), burst => 0}}
end,
Bucket1 = fun(#{client := Cli} = Bucket) ->
@@ -353,11 +355,11 @@ t_collaborative_alloc(_) ->
).
t_burst(_) ->
- GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+ GlobalMod = fun(Cfg) ->
Cfg#{
- message_routing := MR#{
- rate := ?RATE("200/1s"),
- burst := ?RATE("400/1s")
+ message_routing => #{
+ rate => ?RATE("200/1s"),
+ burst => ?RATE("400/1s")
}
}
end,
@@ -653,16 +655,16 @@ t_not_exists_instance(_) ->
),
?assertEqual(
- {error, invalid_bucket},
+ {ok, infinity},
emqx_limiter_server:connect(?FUNCTION_NAME, not_exists, Cfg)
),
ok.
t_create_instance_with_node(_) ->
- GlobalMod = fun(#{message_routing := MR} = Cfg) ->
+ GlobalMod = fun(Cfg) ->
Cfg#{
- message_routing := MR#{rate := ?RATE("200/1s")},
- messages := MR#{rate := ?RATE("200/1s")}
+ message_routing => #{rate => ?RATE("200/1s"), burst => 0},
+ messages => #{rate => ?RATE("200/1s"), burst => 0}
}
end,
@@ -739,6 +741,68 @@ t_esockd_htb_consume(_) ->
?assertMatch({ok, _}, C2R),
ok.
+%%--------------------------------------------------------------------
+%% Test Cases short paths
+%%--------------------------------------------------------------------
+t_node_short_paths(_) ->
+ CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>,
+ ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
+ Accessor = fun emqx_limiter_schema:get_node_opts/1,
+ ?assertMatch(#{rate := 100.0}, Accessor(connection)),
+ ?assertMatch(#{rate := 10.0}, Accessor(messages)),
+ ?assertMatch(#{rate := 1.0}, Accessor(bytes)),
+ ?assertMatch(#{rate := infinity}, Accessor(message_routing)),
+ ?assertEqual(undefined, emqx:get_config([limiter, connection], undefined)).
+
+t_compatibility_for_node_short_paths(_) ->
+ CfgStr =
+ <<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>,
+ ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr),
+ Accessor = fun emqx_limiter_schema:get_node_opts/1,
+ ?assertMatch(#{rate := 100.0}, Accessor(connection)),
+ ?assertMatch(#{rate := 20.0}, Accessor(bytes)).
+
+t_listener_short_paths(_) ->
+ CfgStr = <<
+ ""
+ "listeners.tcp.default {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}"
+ ""
+ >>,
+ ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
+ ListenerOpt = emqx:get_config([listeners, tcp, default]),
+ ?assertMatch(
+ #{
+ client := #{
+ messages := #{rate := 10.0},
+ bytes := #{rate := 1.0}
+ },
+ connection := #{rate := 100.0}
+ },
+ emqx_limiter_schema:get_listener_opts(ListenerOpt)
+ ).
+
+t_compatibility_for_listener_short_paths(_) ->
+ CfgStr = <<
+ "" "listeners.tcp.default {max_conn_rate = \"1000\", limiter.connection.rate = \"500\"}" ""
+ >>,
+ ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
+ ListenerOpt = emqx:get_config([listeners, tcp, default]),
+ ?assertMatch(
+ #{
+ connection := #{rate := 100.0}
+ },
+ emqx_limiter_schema:get_listener_opts(ListenerOpt)
+ ).
+
+t_no_limiter_for_listener(_) ->
+ CfgStr = <<>>,
+ ok = emqx_common_test_helpers:load_config(emqx_schema, CfgStr),
+ ListenerOpt = emqx:get_config([listeners, tcp, default]),
+ ?assertEqual(
+ undefined,
+ emqx_limiter_schema:get_listener_opts(ListenerOpt)
+ ).
+
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
@@ -1043,3 +1107,11 @@ make_create_test_data_with_infinity_node(FakeInstnace) ->
%% client = C bucket = B C > B
{MkA(1000, 100), IsRefLimiter(FakeInstnace)}
].
+
+parse_schema(ConfigString) ->
+ {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+ hocon_tconf:check_plain(
+ emqx_limiter_schema,
+ RawConf,
+ #{required => false, atom_key => false}
+ ).
diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
index bc9aaf768..1d691c536 100644
--- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl
+++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl
@@ -28,7 +28,8 @@
config_reset/3,
configs/3,
get_full_config/0,
- global_zone_configs/3
+ global_zone_configs/3,
+ limiter/3
]).
-define(PREFIX, "/configs/").
@@ -42,7 +43,6 @@
<<"alarm">>,
<<"sys_topics">>,
<<"sysmon">>,
- <<"limiter">>,
<<"log">>,
<<"persistent_session_store">>,
<<"zones">>
@@ -57,7 +57,8 @@ paths() ->
[
"/configs",
"/configs_reset/:rootname",
- "/configs/global_zone"
+ "/configs/global_zone",
+ "/configs/limiter"
] ++
lists:map(fun({Name, _Type}) -> ?PREFIX ++ binary_to_list(Name) end, config_list()).
@@ -147,6 +148,28 @@ schema("/configs/global_zone") ->
}
}
};
+schema("/configs/limiter") ->
+ #{
+ 'operationId' => limiter,
+ get => #{
+ tags => ?TAGS,
+ description => <<"Get the node-level limiter configs">>,
+ responses => #{
+ 200 => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
+ 404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"config not found">>)
+ }
+ },
+ put => #{
+ tags => ?TAGS,
+ description => <<"Update the node-level limiter configs">>,
+ 'requestBody' => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
+ responses => #{
+ 200 => hoconsc:mk(hoconsc:ref(emqx_limiter_schema, limiter)),
+ 400 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED']),
+ 403 => emqx_dashboard_swagger:error_codes(['UPDATE_FAILED'])
+ }
+ }
+ };
schema(Path) ->
{RootKey, {_Root, Schema}} = find_schema(Path),
#{
@@ -272,6 +295,22 @@ configs(get, Params, _Req) ->
{200, Res}
end.
+limiter(get, _Params, _Req) ->
+ {200, format_limiter_config(get_raw_config(limiter))};
+limiter(put, #{body := NewConf}, _Req) ->
+ case emqx_conf:update([limiter], NewConf, ?OPTS) of
+ {ok, #{raw_config := RawConf}} ->
+ {200, format_limiter_config(RawConf)};
+ {error, {permission_denied, Reason}} ->
+ {403, #{code => 'UPDATE_FAILED', message => Reason}};
+ {error, Reason} ->
+ {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
+ end.
+
+format_limiter_config(RawConf) ->
+ Shorts = lists:map(fun erlang:atom_to_binary/1, emqx_limiter_schema:short_paths()),
+ maps:with(Shorts, RawConf).
+
conf_path_reset(Req) ->
<<"/api/v5", ?PREFIX_RESET, Path/binary>> = cowboy_req:path(Req),
string:lexemes(Path, "/ ").
diff --git a/rel/i18n/emqx_limiter_schema.hocon b/rel/i18n/emqx_limiter_schema.hocon
index c99840375..b2958ce90 100644
--- a/rel/i18n/emqx_limiter_schema.hocon
+++ b/rel/i18n/emqx_limiter_schema.hocon
@@ -1,5 +1,26 @@
emqx_limiter_schema {
+max_conn_rate.desc:
+"""Maximum connection rate.
+This is used to limit the connection rate for this node,
+once the limit is reached, new connections will be deferred or refused"""
+max_conn_rate.label:
+"""Maximum Connection Rate"""
+
+messages_rate.desc:
+"""Messages publish rate.
+This is used to limit the inbound message numbers for this node,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+messages_rate.label:
+"""Messages Publish Rate"""
+
+bytes_rate.desc:
+"""Data publish rate.
+This is used to limit the inbound bytes rate for this node,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+bytes_rate.label:
+"""Data Publish Rate"""
+
bucket_cfg.desc:
"""Bucket Configs"""
diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon
index 76cce8e78..11da6caa3 100644
--- a/rel/i18n/emqx_schema.hocon
+++ b/rel/i18n/emqx_schema.hocon
@@ -1033,6 +1033,27 @@ base_listener_limiter.desc:
base_listener_limiter.label:
"""Type of the rate limit."""
+max_conn_rate.desc:
+"""Maximum connection rate.
+This is used to limit the connection rate for this listener,
+once the limit is reached, new connections will be deferred or refused"""
+max_conn_rate.label:
+"""Maximum Connection Rate"""
+
+messages_rate.desc:
+"""Messages publish rate.
+This is used to limit the inbound message numbers for each client connected to this listener,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+messages_rate.label:
+"""Messages Publish Rate"""
+
+bytes_rate.desc:
+"""Data publish rate.
+This is used to limit the inbound bytes rate for each client connected to this listener,
+once the limit is reached, the restricted client will slow down and even be hung for a while."""
+bytes_rate.label:
+"""Data Publish Rate"""
+
persistent_session_store_backend.desc:
"""Database management system used to store information about persistent sessions and messages.
- `builtin`: Use the embedded database (mria)"""