diff --git a/apps/emqx/i18n/emqx_limiter_i18n.conf b/apps/emqx/i18n/emqx_limiter_i18n.conf index b75435225..99ecc9e1e 100644 --- a/apps/emqx/i18n/emqx_limiter_i18n.conf +++ b/apps/emqx/i18n/emqx_limiter_i18n.conf @@ -89,10 +89,10 @@ the check/consume will succeed, but it will be forced to wait for a short period } } - per_client { + client { desc { - en: """The rate limit for each user of the bucket, this field is not required""" - zh: """对桶的每个使用者的速率控制设置,这个不是必须的""" + en: """The rate limit for each user of the bucket""" + zh: """对桶的每个使用者的速率控制设置""" } label: { en: """Per Client""" diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 61dc95bc7..f5e90a7e8 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -32,8 +32,7 @@ desc/1, types/0, infinity_value/0, - bucket_opts/0, - bucket_opts_meta/0 + bucket_fields/1 ]). -define(KILOBYTE, 1024). @@ -96,7 +95,17 @@ fields(limiter) -> default => make_limiter_default(Type) })} || Type <- types() - ]; + ] ++ + [ + {client, + ?HOCON( + ?R_REF(client_fields), + #{ + desc => ?DESC(client), + default => #{} + } + )} + ]; fields(node_opts) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, @@ -104,15 +113,22 @@ fields(node_opts) -> ?HOCON(burst_rate(), #{ desc => ?DESC(burst), default => 0 - })}, - {client, ?HOCON(?R_REF(client_opts), #{default => #{}})} + })} + ]; +fields(client_fields) -> + [ + {Type, + ?HOCON(?R_REF(client_opts), #{ + desc => ?DESC(Type), + default => #{} + })} + || Type <- types() ]; fields(bucket_opts) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})}, - {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})}, - {client, ?HOCON(?R_REF(client_opts), #{required => false})} + {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})} ]; fields(client_opts) -> [ @@ -159,14 +175,23 @@ fields(client_opts) -> default => force } )} - ]. + ]; +fields({client_fields, Types}) -> + [ + {Type, + ?HOCON(?R_REF(client_opts), #{ + desc => ?DESC(Type), + required => false + })} + || Type <- Types + ]; +fields({bucket_fields, Types}) -> + bucket_fields(Types). desc(limiter) -> "Settings for the rate limiter."; desc(node_opts) -> "Settings for the limiter of the node level."; -desc(node_client_opts) -> - "Settings for the client in the node level."; desc(bucket_opts) -> "Settings for the bucket."; desc(client_opts) -> @@ -174,23 +199,37 @@ desc(client_opts) -> desc(_) -> undefined. -bucket_opts() -> - ?HOCON( - ?MAP("bucket_name", ?R_REF(bucket_opts)), - bucket_opts_meta() - ). - -bucket_opts_meta() -> - #{ - default => #{}, - example => - #{ - <<"rate">> => <<"infinity">>, - <<"capcity">> => <<"infinity">>, - <<"initial">> => <<"100">>, - <<"client">> => #{<<"rate">> => <<"infinity">>} - } - }. +bucket_fields(Type) when is_atom(Type) -> + fields(bucket_opts) ++ + [ + {client, + ?HOCON( + ?R_REF(?MODULE, client_opts), + #{ + desc => ?DESC(client), + required => false + } + )} + ]; +bucket_fields(Types) -> + [ + {Type, + ?HOCON(?R_REF(?MODULE, bucket_opts), #{ + desc => ?DESC(?MODULE, Type), + required => false + })} + || Type <- Types + ] ++ + [ + {client, + ?HOCON( + ?R_REF(?MODULE, {client_fields, Types}), + #{ + desc => ?DESC(client), + required => false + } + )} + ]. %% default period is 100ms default_period() -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 997f6b788..66cafa7dc 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -116,35 +116,28 @@ %% If no bucket path is set in config, there will be no limit connect(_Id, _Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; -connect( - Id, - Type, - #{ - rate := BucketRate, - capacity := BucketSize - } = BucketCfg -) -> - case emqx_limiter_manager:find_bucket(Id, Type) of - {ok, Bucket} -> - case find_client_cfg(Type, BucketCfg) of - #{rate := CliRate, capacity := CliSize} = ClientCfg -> - {ok, - if - CliRate < BucketRate orelse CliSize < BucketSize -> - emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket); - true -> - emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket) - end}; - {error, invalid_node_cfg} = Error -> - ?SLOG(error, #{msg => "invalid_node_cfg", type => Type, id => Id}), - Error - end; +connect(Id, Type, Cfg) -> + case find_limiter_cfg(Type, Cfg) of + {undefined, _} -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; + { + #{ + rate := BucketRate, + capacity := BucketSize + } = BucketCfg, + #{rate := CliRate, capacity := CliSize} = ClientCfg + } -> + {ok, + if + CliRate < BucketRate orelse CliSize < BucketSize -> + emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, BucketCfg); + true -> + emqx_htb_limiter:make_ref_limiter(ClientCfg, BucketCfg) + end}; undefined -> ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}), {error, invalid_bucket} - end; -connect(Id, Type, Paths) -> - connect(Id, Type, maps:get(Type, Paths, undefined)). + end. -spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok. add_bucket(_Id, _Type, undefine) -> @@ -571,9 +564,16 @@ call(Type, Msg) -> gen_server:call(Pid, Msg) end. -find_client_cfg(Type, Cfg) -> - NodeCfg = emqx:get_config([limiter, Type, client], undefined), - BucketCfg = maps:get(client, Cfg, undefined), +find_limiter_cfg(Type, #{rate := _} = Cfg) -> + {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))}; +find_limiter_cfg(Type, Cfg) -> + { + maps:get(Type, Cfg), + find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined)) + }. + +find_client_cfg(Type, BucketCfg) -> + NodeCfg = emqx:get_config([limiter, client, Type], undefined), merge_client_cfg(NodeCfg, BucketCfg). merge_client_cfg(undefined, BucketCfg) -> diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 0c8a073a0..81f5c922a 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1619,10 +1619,13 @@ base_listener(Bind) -> )}, {"limiter", sc( - map("ratelimit_name", ?R_REF(emqx_limiter_schema, bucket_opts)), + ?R_REF( + emqx_limiter_schema, + {bucket_fields, [bytes_in, message_in, connection, message_routing]} + ), #{ - desc => ?DESC(base_listener_limiter) - %% TODO default => #{<<"connection">> => <<"default">>} + desc => ?DESC(base_listener_limiter), + default => #{} } )}, {"enable_authn", diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 80b4e9624..b9c1e5c61 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -755,6 +755,8 @@ to_bin(List) when is_list(List) -> end; to_bin(Boolean) when is_boolean(Boolean) -> Boolean; to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +to_bin({Type, Args}) -> + unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args])); to_bin(X) -> X. diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 22083ba2c..986eb4105 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -86,7 +86,7 @@ fields(flow_control) -> )}, {batch_deliver_limiter, sc( - ?R_REF(emqx_limiter_schema, bucket_opts), + ?R_REF(emqx_limiter_schema, {bucket_fields, internal}), batch_deliver_limiter, undefined )}