diff --git a/apps/emqx/i18n/emqx_limiter_i18n.conf b/apps/emqx/i18n/emqx_limiter_i18n.conf index 6fbc923b7..99ecc9e1e 100644 --- a/apps/emqx/i18n/emqx_limiter_i18n.conf +++ b/apps/emqx/i18n/emqx_limiter_i18n.conf @@ -89,10 +89,10 @@ the check/consume will succeed, but it will be forced to wait for a short period } } - per_client { + client { desc { - en: """The rate limit for each user of the bucket, this field is not required""" - zh: """对桶的每个使用者的速率控制设置,这个不是必须的""" + en: """The rate limit for each user of the bucket""" + zh: """对桶的每个使用者的速率控制设置""" } label: { en: """Per Client""" @@ -124,20 +124,6 @@ the check/consume will succeed, but it will be forced to wait for a short period } } - batch { - desc { - en: """The batch limiter. -This is used for EMQX internal batch operation -e.g. limit the retainer's deliver rate""" - zh: """批量操作速率控制器。 -这是给 EMQX 内部的批量操作使用的,比如用来控制保留消息的派发速率""" - } - label: { - en: """Batch""" - zh: """批量操作""" - } - } - message_routing { desc { en: """The message routing limiter. @@ -193,4 +179,12 @@ Once the limit is reached, the restricted client will be slow down even be hung zh: """流入字节率""" } } + + internal { + desc { + en: """Limiter for EMQX internal app.""" + zh: """EMQX 内部功能所用限制器。""" + + } + } } diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d6f2b87ea..cdd4b1a9e 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -252,11 +252,12 @@ init( <<>> -> undefined; MP -> MP end, + ListenerId = emqx_listeners:listener_id(Type, Listener), ClientInfo = set_peercert_infos( Peercert, #{ zone => Zone, - listener => emqx_listeners:listener_id(Type, Listener), + listener => ListenerId, protocol => Protocol, peerhost => PeerHost, sockport => SockPort, @@ -278,7 +279,9 @@ init( outbound => #{} }, auth_cache = #{}, - quota = emqx_limiter_container:get_limiter_by_names([?LIMITER_ROUTING], LimiterCfg), + quota = emqx_limiter_container:get_limiter_by_types( + ListenerId, [?LIMITER_ROUTING], LimiterCfg + ), timers = #{}, conn_state = idle, takeover = false, @@ -1199,9 +1202,6 @@ handle_call( disconnect_and_shutdown(takenover, AllPendings, Channel); handle_call(list_authz_cache, Channel) -> {reply, emqx_authz_cache:list_authz_cache(), Channel}; -handle_call({quota, Bucket}, #channel{quota = Quota} = Channel) -> - Quota2 = emqx_limiter_container:update_by_name(message_routing, Bucket, Quota), - reply(ok, Channel#channel{quota = Quota2}); handle_call( {keepalive, Interval}, Channel = #channel{ diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 59248a0b8..1caf345e6 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -321,7 +321,7 @@ init_state( }, LimiterTypes = [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN], - Limiter = emqx_limiter_container:get_limiter_by_names(LimiterTypes, LimiterCfg), + Limiter = emqx_limiter_container:get_limiter_by_types(Listener, LimiterTypes, LimiterCfg), FrameOpts = #{ strict_mode => emqx_config:get_zone_conf(Zone, [mqtt, strict_mode]), @@ -672,12 +672,6 @@ handle_call(_From, info, State) -> {reply, info(State), State}; handle_call(_From, stats, State) -> {reply, stats(State), State}; -handle_call(_From, {ratelimit, Changes}, State = #state{limiter = Limiter}) -> - Fun = fun({Type, Bucket}, Acc) -> - emqx_limiter_container:update_by_name(Type, Bucket, Acc) - end, - Limiter2 = lists:foldl(Fun, Limiter, Changes), - {reply, ok, State#state{limiter = Limiter2}}; handle_call(_From, Req, State = #state{channel = Channel}) -> case emqx_channel:handle_call(Req, Channel) of {reply, Reply, NChannel} -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl index c39cf2728..16f7b03c8 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl @@ -19,12 +19,13 @@ -behaviour(esockd_generic_limiter). %% API --export([new_create_options/2, create/1, delete/1, consume/2]). +-export([new_create_options/3, create/1, delete/1, consume/2]). -type create_options() :: #{ module := ?MODULE, + id := emqx_limiter_schema:limiter_id(), type := emqx_limiter_schema:limiter_type(), - bucket := emqx_limiter_schema:bucket_name() + bucket := hocons:config() }. %%-------------------------------------------------------------------- @@ -32,15 +33,16 @@ %%-------------------------------------------------------------------- -spec new_create_options( + emqx_limiter_schema:limiter_id(), emqx_limiter_schema:limiter_type(), - emqx_limiter_schema:bucket_name() + hocons:config() ) -> create_options(). -new_create_options(Type, BucketName) -> - #{module => ?MODULE, type => Type, bucket => BucketName}. +new_create_options(Id, Type, BucketCfg) -> + #{module => ?MODULE, id => Id, type => Type, bucket => BucketCfg}. -spec create(create_options()) -> esockd_generic_limiter:limiter(). -create(#{module := ?MODULE, type := Type, bucket := BucketName}) -> - {ok, Limiter} = emqx_limiter_server:connect(Type, BucketName), +create(#{module := ?MODULE, id := Id, type := Type, bucket := BucketCfg}) -> + {ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfg), #{module => ?MODULE, name => Type, limiter => Limiter}. delete(_GLimiter) -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index f82a97a5a..74b6c7b87 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -22,10 +22,8 @@ %% API -export([ - new/0, new/1, new/2, - get_limiter_by_names/2, + get_limiter_by_types/3, add_new/3, - update_by_name/3, set_retry_context/2, check/3, retry/2, @@ -48,10 +46,10 @@ }. -type future() :: pos_integer(). +-type limiter_id() :: emqx_limiter_schema:limiter_id(). -type limiter_type() :: emqx_limiter_schema:limiter_type(). -type limiter() :: emqx_htb_limiter:limiter(). -type retry_context() :: emqx_htb_limiter:retry_context(). --type bucket_name() :: emqx_limiter_schema:bucket_name(). -type millisecond() :: non_neg_integer(). -type check_result() :: {ok, container()} @@ -64,46 +62,24 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- --spec new() -> container(). -new() -> - new([]). - -%% @doc generate default data according to the type of limiter --spec new(list(limiter_type())) -> container(). -new(Types) -> - new(Types, #{}). - --spec new( - list(limiter_type()), - #{limiter_type() => emqx_limiter_schema:bucket_name()} -) -> container(). -new(Types, Names) -> - get_limiter_by_names(Types, Names). - %% @doc generate a container %% according to the type of limiter and the bucket name configuration of the limiter %% @end --spec get_limiter_by_names( +-spec get_limiter_by_types( + limiter_id() | {atom(), atom()}, list(limiter_type()), - #{limiter_type() => emqx_limiter_schema:bucket_name()} + #{limiter_type() => hocons:config()} ) -> container(). -get_limiter_by_names(Types, BucketNames) -> +get_limiter_by_types({Type, Listener}, Types, BucketCfgs) -> + Id = emqx_listeners:listener_id(Type, Listener), + get_limiter_by_types(Id, Types, BucketCfgs); +get_limiter_by_types(Id, Types, BucketCfgs) -> Init = fun(Type, Acc) -> - {ok, Limiter} = emqx_limiter_server:connect(Type, BucketNames), + {ok, Limiter} = emqx_limiter_server:connect(Id, Type, BucketCfgs), add_new(Type, Limiter, Acc) end, lists:foldl(Init, #{retry_ctx => undefined}, Types). -%% @doc add the specified type of limiter to the container --spec update_by_name( - limiter_type(), - bucket_name() | #{limiter_type() => bucket_name()}, - container() -) -> container(). -update_by_name(Type, Buckets, Container) -> - {ok, Limiter} = emqx_limiter_server:connect(Type, Buckets), - add_new(Type, Limiter, Container). - -spec add_new(limiter_type(), limiter(), container()) -> container(). add_new(Type, Limiter, Container) -> Container#{ diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index 89148a12c..aca27a6ff 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -24,11 +24,9 @@ %% API -export([ start_link/0, - find_bucket/1, find_bucket/2, - insert_bucket/2, insert_bucket/3, - make_path/2, + delete_bucket/2, post_config_update/5 ]). @@ -50,20 +48,19 @@ format_status/2 ]). --export_type([path/0]). - --type path() :: list(atom()). +-type limiter_id() :: emqx_limiter_schema:limiter_id(). -type limiter_type() :: emqx_limiter_schema:limiter_type(). --type bucket_name() :: emqx_limiter_schema:bucket_name(). +-type uid() :: {limiter_id(), limiter_type()}. %% counter record in ets table -record(bucket, { - path :: path(), + uid :: uid(), bucket :: bucket_ref() }). -type bucket_ref() :: emqx_limiter_bucket_ref:bucket_ref(). +-define(UID(Id, Type), {Id, Type}). -define(TAB, emqx_limiter_counters). %%-------------------------------------------------------------------- @@ -85,14 +82,10 @@ restart_server(Type) -> stop_server(Type) -> emqx_limiter_server_sup:stop(Type). --spec find_bucket(limiter_type(), bucket_name()) -> +-spec find_bucket(limiter_id(), limiter_type()) -> {ok, bucket_ref()} | undefined. -find_bucket(Type, BucketName) -> - find_bucket(make_path(Type, BucketName)). - --spec find_bucket(path()) -> {ok, bucket_ref()} | undefined. -find_bucket(Path) -> - case ets:lookup(?TAB, Path) of +find_bucket(Id, Type) -> + case ets:lookup(?TAB, ?UID(Id, Type)) of [#bucket{bucket = Bucket}] -> {ok, Bucket}; _ -> @@ -100,20 +93,19 @@ find_bucket(Path) -> end. -spec insert_bucket( + limiter_id(), limiter_type(), - bucket_name(), bucket_ref() ) -> boolean(). -insert_bucket(Type, BucketName, Bucket) -> - inner_insert_bucket(make_path(Type, BucketName), Bucket). +insert_bucket(Id, Type, Bucket) -> + ets:insert( + ?TAB, + #bucket{uid = ?UID(Id, Type), bucket = Bucket} + ). --spec insert_bucket(path(), bucket_ref()) -> true. -insert_bucket(Path, Bucket) -> - inner_insert_bucket(Path, Bucket). - --spec make_path(limiter_type(), bucket_name()) -> path(). -make_path(Type, BucketName) -> - [Type | BucketName]. +-spec delete_bucket(limiter_id(), limiter_type()) -> true. +delete_bucket(Type, Id) -> + ets:delete(?TAB, ?UID(Id, Type)). post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) -> Config = maps:get(Type, NewConf), @@ -159,7 +151,7 @@ init([]) -> set, public, named_table, - {keypos, #bucket.path}, + {keypos, #bucket.uid}, {write_concurrency, true}, {read_concurrency, true}, {heir, erlang:whereis(emqx_limiter_sup), none} @@ -266,9 +258,3 @@ format_status(_Opt, Status) -> %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- --spec inner_insert_bucket(path(), bucket_ref()) -> true. -inner_insert_bucket(Path, Bucket) -> - ets:insert( - ?TAB, - #bucket{path = Path, bucket = Bucket} - ). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 1e4679ee3..bce87e2ba 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -41,8 +41,10 @@ | message_in | connection | message_routing - | batch. + %% internal limiter for unclassified resources + | internal. +-type limiter_id() :: atom(). -type bucket_name() :: atom(). -type rate() :: infinity | float(). -type burst_rate() :: 0 | float(). @@ -76,7 +78,7 @@ bucket_name/0 ]). --export_type([limiter_type/0, bucket_path/0]). +-export_type([limiter_id/0, limiter_type/0, bucket_path/0]). -define(UNIT_TIME_IN_MS, 1000). @@ -87,52 +89,50 @@ roots() -> [limiter]. fields(limiter) -> [ {Type, - ?HOCON(?R_REF(limiter_opts), #{ + ?HOCON(?R_REF(node_opts), #{ desc => ?DESC(Type), - default => make_limiter_default(Type) + default => #{} })} || Type <- types() - ]; -fields(limiter_opts) -> + ] ++ + [ + {client, + ?HOCON( + ?R_REF(client_fields), + #{ + desc => ?DESC(client), + default => maps:from_list([ + {erlang:atom_to_binary(Type), #{}} + || Type <- types() + ]) + } + )} + ]; +fields(node_opts) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {burst, ?HOCON(burst_rate(), #{ desc => ?DESC(burst), default => 0 - })}, - {bucket, - ?HOCON( - ?MAP("bucket_name", ?R_REF(bucket_opts)), - #{ - desc => ?DESC(bucket_cfg), - default => #{<<"default">> => #{}}, - example => #{ - <<"mybucket-name">> => #{ - <<"rate">> => <<"infinity">>, - <<"capcity">> => <<"infinity">>, - <<"initial">> => <<"100">>, - <<"per_client">> => #{<<"rate">> => <<"infinity">>} - } - } - } - )} + })} + ]; +fields(client_fields) -> + [ + {Type, + ?HOCON(?R_REF(client_opts), #{ + desc => ?DESC(Type), + default => #{} + })} + || Type <- types() ]; fields(bucket_opts) -> [ {rate, ?HOCON(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {capacity, ?HOCON(capacity(), #{desc => ?DESC(capacity), default => "infinity"})}, - {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})}, - {per_client, - ?HOCON( - ?R_REF(client_bucket), - #{ - default => #{}, - desc => ?DESC(per_client) - } - )} + {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})} ]; -fields(client_bucket) -> +fields(client_opts) -> [ {rate, ?HOCON(rate(), #{default => "infinity", desc => ?DESC(rate)})}, {initial, ?HOCON(initial(), #{default => "0", desc => ?DESC(initial)})}, @@ -177,16 +177,30 @@ fields(client_bucket) -> default => force } )} - ]. + ]; +fields(listener_fields) -> + bucket_fields([bytes_in, message_in, connection, message_routing], listener_client_fields); +fields(listener_client_fields) -> + client_fields([bytes_in, message_in, connection, message_routing]); +fields(Type) -> + bucket_field(Type). desc(limiter) -> "Settings for the rate limiter."; -desc(limiter_opts) -> - "Settings for the limiter."; +desc(node_opts) -> + "Settings for the limiter of the node level."; desc(bucket_opts) -> "Settings for the bucket."; -desc(client_bucket) -> - "Settings for the client bucket."; +desc(client_opts) -> + "Settings for the client in bucket level."; +desc(client_fields) -> + "Fields of the client level."; +desc(listener_fields) -> + "Fields of the listener."; +desc(listener_client_fields) -> + "Fields of the client level of the listener."; +desc(internal) -> + "Internal limiter."; desc(_) -> undefined. @@ -202,7 +216,7 @@ get_bucket_cfg_path(Type, BucketName) -> [limiter, Type, bucket, BucketName]. types() -> - [bytes_in, message_in, connection, message_routing, batch]. + [bytes_in, message_in, connection, message_routing, internal]. %%-------------------------------------------------------------------- %% Internal functions @@ -322,16 +336,44 @@ apply_unit("mb", Val) -> Val * ?KILOBYTE * ?KILOBYTE; apply_unit("gb", Val) -> Val * ?KILOBYTE * ?KILOBYTE * ?KILOBYTE; apply_unit(Unit, _) -> throw("invalid unit:" ++ Unit). -make_limiter_default(connection) -> - #{ - <<"rate">> => <<"1000/s">>, - <<"bucket">> => #{ - <<"default">> => - #{ - <<"rate">> => <<"1000/s">>, - <<"capacity">> => 1000 - } - } - }; -make_limiter_default(_) -> - #{}. +bucket_field(Type) when is_atom(Type) -> + fields(bucket_opts) ++ + [ + {client, + ?HOCON( + ?R_REF(?MODULE, client_opts), + #{ + desc => ?DESC(client), + required => false + } + )} + ]. +bucket_fields(Types, ClientRef) -> + [ + {Type, + ?HOCON(?R_REF(?MODULE, bucket_opts), #{ + desc => ?DESC(?MODULE, Type), + required => false + })} + || Type <- Types + ] ++ + [ + {client, + ?HOCON( + ?R_REF(?MODULE, ClientRef), + #{ + desc => ?DESC(client), + required => false + } + )} + ]. + +client_fields(Types) -> + [ + {Type, + ?HOCON(?R_REF(client_opts), #{ + desc => ?DESC(Type), + required => false + })} + || Type <- Types + ]. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 519b32eca..c5e919296 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -42,11 +42,13 @@ -export([ start_link/2, - connect/2, + connect/3, + add_bucket/3, + del_bucket/2, + get_initial_val/1, whereis/1, info/1, name/1, - get_initial_val/1, restart/1, update_config/2 ]). @@ -73,16 +75,17 @@ -type state() :: #{ type := limiter_type(), - root := undefined | root(), + root := root(), buckets := buckets(), %% current counter to alloc - counter := undefined | counters:counters_ref(), - index := index() + counter := counters:counters_ref(), + index := 0 | index() }. -type buckets() :: #{bucket_name() => bucket()}. -type limiter_type() :: emqx_limiter_schema:limiter_type(). -type bucket_name() :: emqx_limiter_schema:bucket_name(). +-type limiter_id() :: emqx_limiter_schema:limiter_id(). -type rate() :: decimal(). -type flow() :: decimal(). -type capacity() :: decimal(). @@ -94,7 +97,7 @@ %% minimum coefficient for overloaded limiter -define(OVERLOAD_MIN_ALLOC, 0.3). --define(CURRYING(X, F2), fun(Y) -> F2(X, Y) end). +-define(COUNTER_SIZE, 8). -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). @@ -105,39 +108,49 @@ %% API %%-------------------------------------------------------------------- -spec connect( + limiter_id(), limiter_type(), bucket_name() | #{limiter_type() => bucket_name() | undefined} ) -> {ok, emqx_htb_limiter:limiter()} | {error, _}. %% If no bucket path is set in config, there will be no limit -connect(_Type, undefined) -> +connect(_Id, _Type, undefined) -> {ok, emqx_htb_limiter:make_infinity_limiter()}; -connect(Type, BucketName) when is_atom(BucketName) -> - case get_bucket_cfg(Type, BucketName) of - undefined -> - ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}), - {error, config_not_found}; - #{ - rate := BucketRate, - capacity := BucketSize, - per_client := #{rate := CliRate, capacity := CliSize} = Cfg +connect(Id, Type, Cfg) -> + case find_limiter_cfg(Type, Cfg) of + {undefined, _} -> + {ok, emqx_htb_limiter:make_infinity_limiter()}; + { + #{ + rate := BucketRate, + capacity := BucketSize + }, + #{rate := CliRate, capacity := CliSize} = ClientCfg } -> - case emqx_limiter_manager:find_bucket(Type, BucketName) of + case emqx_limiter_manager:find_bucket(Id, Type) of {ok, Bucket} -> {ok, if CliRate < BucketRate orelse CliSize < BucketSize -> - emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); + emqx_htb_limiter:make_token_bucket_limiter(ClientCfg, Bucket); true -> - emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) + emqx_htb_limiter:make_ref_limiter(ClientCfg, Bucket) end}; undefined -> - ?SLOG(error, #{msg => "bucket_not_found", type => Type, bucket => BucketName}), + ?SLOG(error, #{msg => "bucket_not_found", type => Type, id => Id}), {error, invalid_bucket} end - end; -connect(Type, Paths) -> - connect(Type, maps:get(Type, Paths, undefined)). + end. + +-spec add_bucket(limiter_id(), limiter_type(), hocons:config() | undefined) -> ok. +add_bucket(_Id, _Type, undefine) -> + ok; +add_bucket(Id, Type, Cfg) -> + ?CALL(Type, {add_bucket, Id, Cfg}). + +-spec del_bucket(limiter_id(), limiter_type()) -> ok. +del_bucket(Id, Type) -> + ?CALL(Type, {del_bucket, Id}). -spec info(limiter_type()) -> state() | {error, _}. info(Type) -> @@ -213,6 +226,12 @@ handle_call(restart, _From, #{type := Type}) -> handle_call({update_config, Type, Config}, _From, #{type := Type}) -> NewState = init_tree(Type, Config), {reply, ok, NewState}; +handle_call({add_bucket, Id, Cfg}, _From, State) -> + NewState = do_add_bucket(Id, Cfg, State), + {reply, ok, NewState}; +handle_call({del_bucket, Id}, _From, State) -> + NewState = do_del_bucket(Id, State), + {reply, ok, NewState}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -456,24 +475,14 @@ init_tree(Type) when is_atom(Type) -> Cfg = emqx:get_config([limiter, Type]), init_tree(Type, Cfg). -init_tree(Type, #{bucket := Buckets} = Cfg) -> - State = #{ +init_tree(Type, Cfg) -> + #{ type => Type, - root => undefined, - counter => undefined, - index => 1, + root => make_root(Cfg), + counter => counters:new(?COUNTER_SIZE, [write_concurrency]), + index => 0, buckets => #{} - }, - - Root = make_root(Cfg), - {CounterNum, DelayBuckets} = make_bucket(maps:to_list(Buckets), Type, Cfg, 1, []), - - State2 = State#{ - root := Root, - counter := counters:new(CounterNum, [write_concurrency]) - }, - - lists:foldl(fun(F, Acc) -> F(Acc) end, State2, DelayBuckets). + }. -spec make_root(hocons:confg()) -> root(). make_root(#{rate := Rate, burst := Burst}) -> @@ -484,79 +493,50 @@ make_root(#{rate := Rate, burst := Burst}) -> produced => 0.0 }. -make_bucket([{Name, Conf} | T], Type, GlobalCfg, CounterNum, DelayBuckets) -> - Path = emqx_limiter_manager:make_path(Type, Name), - Rate = get_counter_rate(Conf, GlobalCfg), - #{capacity := Capacity} = Conf, - Initial = get_initial_val(Conf), - CounterNum2 = CounterNum + 1, - InitFun = fun(#{name := BucketName} = Bucket, #{buckets := Buckets} = State) -> - {Counter, Idx, State2} = alloc_counter(Path, Rate, Initial, State), - Bucket2 = Bucket#{counter := Counter, index := Idx}, - State2#{buckets := Buckets#{BucketName => Bucket2}} - end, +do_add_bucket(Id, #{rate := Rate, capacity := Capacity} = Cfg, #{buckets := Buckets} = State) -> + case maps:get(Id, Buckets, undefined) of + undefined -> + make_bucket(Id, Cfg, State); + Bucket -> + Bucket2 = Bucket#{rate := Rate, capacity := Capacity}, + State#{buckets := Buckets#{Id := Bucket2}} + end. +make_bucket(Id, Cfg, #{index := ?COUNTER_SIZE} = State) -> + make_bucket(Id, Cfg, State#{ + counter => counters:new(?COUNTER_SIZE, [write_concurrency]), + index => 0 + }); +make_bucket( + Id, + #{rate := Rate, capacity := Capacity} = Cfg, + #{type := Type, counter := Counter, index := Index, buckets := Buckets} = State +) -> + NewIndex = Index + 1, + Initial = get_initial_val(Cfg), Bucket = #{ - name => Name, + name => Id, rate => Rate, obtained => Initial, correction => 0, capacity => Capacity, - counter => undefined, - index => undefined + counter => Counter, + index => NewIndex }, + _ = put_to_counter(Counter, NewIndex, Initial), + Ref = emqx_limiter_bucket_ref:new(Counter, NewIndex, Rate), + emqx_limiter_manager:insert_bucket(Id, Type, Ref), + State#{buckets := Buckets#{Id => Bucket}, index := NewIndex}. - DelayInit = ?CURRYING(Bucket, InitFun), - - make_bucket( - T, - Type, - GlobalCfg, - CounterNum2, - [DelayInit | DelayBuckets] - ); -make_bucket([], _Type, _Global, CounterNum, DelayBuckets) -> - {CounterNum, DelayBuckets}. - --spec alloc_counter(emqx_limiter_manager:path(), rate(), capacity(), state()) -> - {counters:counters_ref(), pos_integer(), state()}. -alloc_counter( - Path, - Rate, - Initial, - #{counter := Counter, index := Index} = State -) -> - case emqx_limiter_manager:find_bucket(Path) of - {ok, #{ - counter := ECounter, - index := EIndex - }} when ECounter =/= undefined -> - init_counter(Path, ECounter, EIndex, Rate, Initial, State); +do_del_bucket(Id, #{type := Type, buckets := Buckets} = State) -> + case maps:get(Id, Buckets, undefined) of + undefined -> + State; _ -> - init_counter( - Path, - Counter, - Index, - Rate, - Initial, - State#{index := Index + 1} - ) + emqx_limiter_manager:delete_bucket(Id, Type), + State#{buckets := maps:remove(Id, Buckets)} end. -init_counter(Path, Counter, Index, Rate, Initial, State) -> - _ = put_to_counter(Counter, Index, Initial), - Ref = emqx_limiter_bucket_ref:new(Counter, Index, Rate), - emqx_limiter_manager:insert_bucket(Path, Ref), - {Counter, Index, State}. - -%% @doc find first limited node -get_counter_rate(#{rate := Rate}, _GlobalCfg) when Rate =/= infinity -> - Rate; -get_counter_rate(_Cfg, #{rate := Rate}) when Rate =/= infinity -> - Rate; -get_counter_rate(_Cfg, _GlobalCfg) -> - emqx_limiter_schema:infinity_value(). - -spec get_initial_val(hocons:config()) -> decimal(). get_initial_val( #{ @@ -587,8 +567,21 @@ call(Type, Msg) -> gen_server:call(Pid, Msg) end. --spec get_bucket_cfg(limiter_type(), bucket_name()) -> - undefined | limiter_not_started | hocons:config(). -get_bucket_cfg(Type, Bucket) -> - Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket), - emqx:get_config(Path, undefined). +find_limiter_cfg(Type, #{rate := _} = Cfg) -> + {Cfg, find_client_cfg(Type, maps:get(client, Cfg, undefined))}; +find_limiter_cfg(Type, Cfg) -> + { + maps:get(Type, Cfg, undefined), + find_client_cfg(Type, emqx_map_lib:deep_get([client, Type], Cfg, undefined)) + }. + +find_client_cfg(Type, BucketCfg) -> + NodeCfg = emqx:get_config([limiter, client, Type], undefined), + merge_client_cfg(NodeCfg, BucketCfg). + +merge_client_cfg(undefined, BucketCfg) -> + BucketCfg; +merge_client_cfg(NodeCfg, undefined) -> + NodeCfg; +merge_client_cfg(NodeCfg, BucketCfg) -> + maps:merge(NodeCfg, BucketCfg). diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 09b923d0c..9fc82d123 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -279,12 +279,19 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> end. -spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}. -do_stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl -> - esockd:close(listener_id(Type, ListenerName), ListenOn); -do_stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss -> - cowboy:stop_listener(listener_id(Type, ListenerName)); -do_stop_listener(quic, ListenerName, _Conf) -> - quicer:stop_listener(listener_id(quic, ListenerName)). + +do_stop_listener(Type, ListenerName, #{bind := ListenOn} = Conf) when Type == tcp; Type == ssl -> + Id = listener_id(Type, ListenerName), + del_limiter_bucket(Id, Conf), + esockd:close(Id, ListenOn); +do_stop_listener(Type, ListenerName, Conf) when Type == ws; Type == wss -> + Id = listener_id(Type, ListenerName), + del_limiter_bucket(Id, Conf), + cowboy:stop_listener(Id); +do_stop_listener(quic, ListenerName, Conf) -> + Id = listener_id(quic, ListenerName), + del_limiter_bucket(Id, Conf), + quicer:stop_listener(Id). -ifndef(TEST). console_print(Fmt, Args) -> ?ULOG(Fmt, Args). @@ -300,10 +307,12 @@ do_start_listener(_Type, _ListenerName, #{enabled := false}) -> do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when Type == tcp; Type == ssl -> + Id = listener_id(Type, ListenerName), + add_limiter_bucket(Id, Opts), esockd:open( - listener_id(Type, ListenerName), + Id, ListenOn, - merge_default(esockd_opts(Type, Opts)), + merge_default(esockd_opts(Id, Type, Opts)), {emqx_connection, start_link, [ #{ listener => {Type, ListenerName}, @@ -318,6 +327,7 @@ do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when Type == ws; Type == wss -> Id = listener_id(Type, ListenerName), + add_limiter_bucket(Id, Opts), RanchOpts = ranch_opts(Type, ListenOn, Opts), WsOpts = ws_opts(Type, ListenerName, Opts), case Type of @@ -352,8 +362,10 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> limiter => limiter(Opts) }, StreamOpts = [{stream_callback, emqx_quic_stream}], + Id = listener_id(quic, ListenerName), + add_limiter_bucket(Id, Opts), quicer:start_listener( - listener_id(quic, ListenerName), + Id, port(ListenOn), {ListenOpts, ConnectionOpts, StreamOpts} ); @@ -410,16 +422,18 @@ post_config_update([listeners, Type, Name], {action, _Action, _}, NewConf, OldCo post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. -esockd_opts(Type, Opts0) -> +esockd_opts(ListenerId, Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = case maps:get(connection, Limiter, undefined) of undefined -> Opts1; - BucketName -> + BucketCfg -> Opts1#{ - limiter => emqx_esockd_htb_limiter:new_create_options(connection, BucketName) + limiter => emqx_esockd_htb_limiter:new_create_options( + ListenerId, connection, BucketCfg + ) } end, Opts3 = Opts2#{ @@ -539,6 +553,27 @@ zone(Opts) -> limiter(Opts) -> maps:get(limiter, Opts, #{}). +add_limiter_bucket(Id, #{limiter := Limiter}) -> + maps:fold( + fun(Type, Cfg, _) -> + emqx_limiter_server:add_bucket(Id, Type, Cfg) + end, + ok, + maps:without([client], Limiter) + ); +add_limiter_bucket(_Id, _Cfg) -> + ok. + +del_limiter_bucket(Id, #{limiter := Limiters}) -> + lists:foreach( + fun(Type) -> + emqx_limiter_server:del_bucket(Id, Type) + end, + maps:keys(Limiters) + ); +del_limiter_bucket(_Id, _Cfg) -> + ok. + enable_authn(Opts) -> maps:get(enable_authn, Opts, true). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 54b57b21d..9f96595ed 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1619,10 +1619,15 @@ base_listener(Bind) -> )}, {"limiter", sc( - map("ratelimit_name", emqx_limiter_schema:bucket_name()), + ?R_REF( + emqx_limiter_schema, + listener_fields + ), #{ desc => ?DESC(base_listener_limiter), - default => #{<<"connection">> => <<"default">>} + default => #{ + <<"connection">> => #{<<"rate">> => <<"1000/s">>, <<"capacity">> => 1000} + } } )}, {"enable_authn", diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 0134810c1..c7c31a2d8 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -273,7 +273,7 @@ check_origin_header(Req, #{listener := {Type, Listener}} = Opts) -> end. websocket_init([Req, Opts]) -> - #{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener}} = Opts, + #{zone := Zone, limiter := LimiterCfg, listener := {Type, Listener} = ListenerCfg} = Opts, case check_max_connection(Type, Listener) of allow -> {Peername, PeerCert} = get_peer_info(Type, Listener, Req, Opts), @@ -287,8 +287,10 @@ websocket_init([Req, Opts]) -> ws_cookie => WsCookie, conn_mod => ?MODULE }, - Limiter = emqx_limiter_container:get_limiter_by_names( - [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN], LimiterCfg + Limiter = emqx_limiter_container:get_limiter_by_types( + ListenerCfg, + [?LIMITER_BYTES_IN, ?LIMITER_MESSAGE_IN], + LimiterCfg ), MQTTPiggyback = get_ws_opts(Type, Listener, mqtt_piggyback), FrameOpts = #{ @@ -487,9 +489,6 @@ handle_call(From, info, State) -> handle_call(From, stats, State) -> gen_server:reply(From, stats(State)), return(State); -handle_call(_From, {ratelimit, Type, Bucket}, State = #state{limiter = Limiter}) -> - Limiter2 = emqx_limiter_container:update_by_name(Type, Bucket, Limiter), - {reply, ok, State#state{limiter = Limiter2}}; handle_call(From, Req, State = #state{channel = Channel}) -> case emqx_channel:handle_call(Req, Channel) of {reply, Reply, NChannel} -> diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 40bf6ff45..df1720772 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -33,18 +33,6 @@ force_gc_conf() -> force_shutdown_conf() -> #{enable => true, max_heap_size => 4194304, max_message_queue_len => 1000}. -rate_limit_conf() -> - #{ - conn_bytes_in => ["100KB", "10s"], - conn_messages_in => ["100", "10s"], - max_conn_rate => 1000, - quota => - #{ - conn_messages_routing => infinity, - overall_messages_routing => infinity - } - }. - rpc_conf() -> #{ async_batch_size => 256, @@ -173,27 +161,9 @@ listeners_conf() -> limiter_conf() -> Make = fun() -> #{ - bucket => - #{ - default => - #{ - capacity => infinity, - initial => 0, - rate => infinity, - per_client => - #{ - capacity => infinity, - divisible => false, - failure_strategy => force, - initial => 0, - low_watermark => 0, - max_retry_time => 5000, - rate => infinity - } - } - }, burst => 0, - rate => infinity + rate => infinity, + capacity => infinity } end, @@ -202,7 +172,7 @@ limiter_conf() -> Acc#{Name => Make()} end, #{}, - [bytes_in, message_in, message_routing, connection, batch] + [bytes_in, message_in, message_routing, connection, internal] ). stats_conf() -> @@ -213,7 +183,6 @@ zone_conf() -> basic_conf() -> #{ - rate_limit => rate_limit_conf(), force_gc => force_gc_conf(), force_shutdown => force_shutdown_conf(), mqtt => mqtt_conf(), @@ -274,10 +243,9 @@ end_per_suite(_Config) -> emqx_banned ]). -init_per_testcase(TestCase, Config) -> +init_per_testcase(_TestCase, Config) -> OldConf = set_test_listener_confs(), emqx_common_test_helpers:start_apps([]), - check_modify_limiter(TestCase), [{config, OldConf} | Config]. end_per_testcase(_TestCase, Config) -> @@ -285,41 +253,6 @@ end_per_testcase(_TestCase, Config) -> emqx_common_test_helpers:stop_apps([]), Config. -check_modify_limiter(TestCase) -> - Checks = [t_quota_qos0, t_quota_qos1, t_quota_qos2], - case lists:member(TestCase, Checks) of - true -> - modify_limiter(); - _ -> - ok - end. - -%% per_client 5/1s,5 -%% aggregated 10/1s,10 -modify_limiter() -> - Limiter = emqx_config:get([limiter]), - #{message_routing := #{bucket := Bucket} = Routing} = Limiter, - #{default := #{per_client := Client} = Default} = Bucket, - Client2 = Client#{ - rate := 5, - initial := 0, - capacity := 5, - low_watermark := 1 - }, - Default2 = Default#{ - per_client := Client2, - rate => 10, - initial => 0, - capacity => 10 - }, - Bucket2 = Bucket#{default := Default2}, - Routing2 = Routing#{bucket := Bucket2}, - - emqx_config:put([limiter], Limiter#{message_routing := Routing2}), - emqx_limiter_manager:restart_server(message_routing), - timer:sleep(100), - ok. - %%-------------------------------------------------------------------- %% Test cases for channel info/stats/caps %%-------------------------------------------------------------------- @@ -729,6 +662,7 @@ t_process_unsubscribe(_) -> t_quota_qos0(_) -> esockd_limiter:start_link(), + add_bucket(), Cnter = counters:new(1, []), ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end), ok = meck:expect( @@ -755,10 +689,12 @@ t_quota_qos0(_) -> ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end), ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end), + del_bucket(), esockd_limiter:stop(). t_quota_qos1(_) -> esockd_limiter:start_link(), + add_bucket(), ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end), Chann = channel(#{conn_state => connected, quota => quota()}), Pub = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>), @@ -769,10 +705,12 @@ t_quota_qos1(_) -> {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub, Chann3), %% Quota in overall {ok, ?PUBACK_PACKET(1, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub, Chann4), + del_bucket(), esockd_limiter:stop(). t_quota_qos2(_) -> esockd_limiter:start_link(), + add_bucket(), ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end), Chann = channel(#{conn_state => connected, quota => quota()}), Pub1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>), @@ -786,6 +724,7 @@ t_quota_qos2(_) -> {ok, ?PUBREC_PACKET(3, ?RC_SUCCESS), Chann4} = emqx_channel:handle_in(Pub3, Chann3), %% Quota in overall {ok, ?PUBREC_PACKET(4, ?RC_QUOTA_EXCEEDED), _} = emqx_channel:handle_in(Pub4, Chann4), + del_bucket(), esockd_limiter:stop(). %%-------------------------------------------------------------------- @@ -952,12 +891,6 @@ t_handle_call_takeover_end(_) -> {shutdown, takenover, [], _, _Chan} = emqx_channel:handle_call({takeover, 'end'}, channel()). -t_handle_call_quota(_) -> - {reply, ok, _Chan} = emqx_channel:handle_call( - {quota, default}, - channel() - ). - t_handle_call_unexpected(_) -> {reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()). @@ -1176,7 +1109,7 @@ t_ws_cookie_init(_) -> ConnInfo, #{ zone => default, - limiter => limiter_cfg(), + limiter => undefined, listener => {tcp, default} } ), @@ -1210,7 +1143,7 @@ channel(InitFields) -> ConnInfo, #{ zone => default, - limiter => limiter_cfg(), + limiter => undefined, listener => {tcp, default} } ), @@ -1270,9 +1203,31 @@ session(InitFields) when is_map(InitFields) -> %% conn: 5/s; overall: 10/s quota() -> - emqx_limiter_container:get_limiter_by_names([message_routing], limiter_cfg()). + emqx_limiter_container:get_limiter_by_types(?MODULE, [message_routing], limiter_cfg()). -limiter_cfg() -> #{message_routing => default}. +limiter_cfg() -> + Client = #{ + rate => 5, + initial => 0, + capacity => 5, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + #{ + message_routing => bucket_cfg(), + client => #{message_routing => Client} + }. + +bucket_cfg() -> + #{rate => 10, initial => 0, capacity => 10}. + +add_bucket() -> + emqx_limiter_server:add_bucket(?MODULE, message_routing, bucket_cfg()). + +del_bucket() -> + emqx_limiter_server:del_bucket(?MODULE, message_routing). v4(Channel) -> ConnInfo = emqx_channel:info(conninfo, Channel), diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index b199565c2..c5dfdf34a 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -78,6 +78,7 @@ end_per_suite(_Config) -> init_per_testcase(TestCase, Config) when TestCase =/= t_ws_pingreq_before_connected -> + add_bucket(), ok = meck:expect(emqx_transport, wait, fun(Sock) -> {ok, Sock} end), ok = meck:expect(emqx_transport, type, fun(_Sock) -> tcp end), ok = meck:expect( @@ -104,9 +105,11 @@ init_per_testcase(TestCase, Config) when _ -> Config end; init_per_testcase(_, Config) -> + add_bucket(), Config. end_per_testcase(TestCase, Config) -> + del_bucket(), case erlang:function_exported(?MODULE, TestCase, 2) of true -> ?MODULE:TestCase('end', Config); false -> ok @@ -291,11 +294,6 @@ t_handle_call(_) -> ?assertMatch({ok, _St}, handle_msg({event, undefined}, St)), ?assertMatch({reply, _Info, _NSt}, handle_call(self(), info, St)), ?assertMatch({reply, _Stats, _NSt}, handle_call(self(), stats, St)), - ?assertMatch({reply, ok, _NSt}, handle_call(self(), {ratelimit, []}, St)), - ?assertMatch( - {reply, ok, _NSt}, - handle_call(self(), {ratelimit, [{bytes_in, default}]}, St) - ), ?assertEqual({reply, ignored, St}, handle_call(self(), for_testing, St)), ?assertMatch( {stop, {shutdown, kicked}, ok, _NSt}, @@ -704,7 +702,34 @@ handle_msg(Msg, St) -> emqx_connection:handle_msg(Msg, St). handle_call(Pid, Call, St) -> emqx_connection:handle_call(Pid, Call, St). -limiter_cfg() -> #{}. +-define(LIMITER_ID, 'tcp:default'). init_limiter() -> - emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()). + emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], limiter_cfg()). + +limiter_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + Cfg = bucket_cfg(), + Client = #{ + rate => Infinity, + initial => 0, + capacity => Infinity, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}. + +bucket_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + #{rate => Infinity, initial => 0, capacity => Infinity}. + +add_bucket() -> + Cfg = bucket_cfg(), + emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg), + emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg). + +del_bucket() -> + emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in), + emqx_limiter_server:del_bucket(?LIMITER_ID, message_in). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 1251278f2..7efcbaa18 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -24,48 +24,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(BASE_CONF, << - "" - "\n" - "limiter {\n" - " bytes_in {\n" - " bucket.default {\n" - " rate = infinity\n" - " capacity = infinity\n" - " }\n" - " }\n" - "\n" - " message_in {\n" - " bucket.default {\n" - " rate = infinity\n" - " capacity = infinity\n" - " }\n" - " }\n" - "\n" - " connection {\n" - " bucket.default {\n" - " rate = infinity\n" - " capacity = infinity\n" - " }\n" - " }\n" - "\n" - " message_routing {\n" - " bucket.default {\n" - " rate = infinity\n" - " capacity = infinity\n" - " }\n" - " }\n" - "\n" - " batch {\n" - " bucket.retainer {\n" - " rate = infinity\n" - " capacity = infinity\n" - " }\n" - " }\n" - "}\n" - "\n" - "" ->>). +-define(BASE_CONF, <<"">>). -record(client, { counter :: counters:counter_ref(), @@ -97,6 +56,9 @@ end_per_suite(_Config) -> init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(_TestCase, Config) -> + Config. + load_conf() -> emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF). @@ -116,12 +78,12 @@ t_consume(_) -> failure_strategy := force } end, - Case = fun() -> - Client = connect(default), + Case = fun(BucketCfg) -> + Client = connect(BucketCfg), {ok, L2} = emqx_htb_limiter:consume(50, Client), {ok, _L3} = emqx_htb_limiter:consume(150, L2) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). t_retry(_) -> Cfg = fun(Cfg) -> @@ -133,15 +95,15 @@ t_retry(_) -> failure_strategy := force } end, - Case = fun() -> - Client = connect(default), - {ok, Client} = emqx_htb_limiter:retry(Client), - {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client), + Case = fun(BucketCfg) -> + Client = connect(BucketCfg), + {ok, Client2} = emqx_htb_limiter:retry(Client), + {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client2), L3 = emqx_htb_limiter:set_retry(Retry, L2), timer:sleep(500), {ok, _L4} = emqx_htb_limiter:retry(L3) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). t_restore(_) -> Cfg = fun(Cfg) -> @@ -153,15 +115,15 @@ t_restore(_) -> failure_strategy := force } end, - Case = fun() -> - Client = connect(default), + Case = fun(BucketCfg) -> + Client = connect(BucketCfg), {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client), timer:sleep(200), {ok, L3} = emqx_htb_limiter:check(Retry, L2), Avaiable = emqx_htb_limiter:available(L3), ?assert(Avaiable >= 50) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). t_max_retry_time(_) -> Cfg = fun(Cfg) -> @@ -172,15 +134,15 @@ t_max_retry_time(_) -> failure_strategy := drop } end, - Case = fun() -> - Client = connect(default), + Case = fun(BucketCfg) -> + Client = connect(BucketCfg), Begin = ?NOW, Result = emqx_htb_limiter:consume(101, Client), ?assertMatch({drop, _}, Result), Time = ?NOW - Begin, ?assert(Time >= 500 andalso Time < 550) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). t_divisible(_) -> Cfg = fun(Cfg) -> @@ -191,8 +153,8 @@ t_divisible(_) -> capacity := 600 } end, - Case = fun() -> - Client = connect(default), + Case = fun(BucketCfg) -> + Client = connect(BucketCfg), Result = emqx_htb_limiter:check(1000, Client), ?assertMatch( {partial, 400, @@ -206,7 +168,7 @@ t_divisible(_) -> Result ) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). t_low_watermark(_) -> Cfg = fun(Cfg) -> @@ -217,8 +179,8 @@ t_low_watermark(_) -> capacity := 1000 } end, - Case = fun() -> - Client = connect(default), + Case = fun(BucketCfg) -> + Client = connect(BucketCfg), Result = emqx_htb_limiter:check(500, Client), ?assertMatch({ok, _}, Result), {_, Client2} = Result, @@ -233,28 +195,21 @@ t_low_watermark(_) -> Result2 ) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). t_infinity_client(_) -> - Fun = fun(#{per_client := Cli} = Bucket) -> - Bucket2 = Bucket#{ - rate := infinity, - capacity := infinity - }, - Cli2 = Cli#{rate := infinity, capacity := infinity}, - Bucket2#{per_client := Cli2} - end, - Case = fun() -> - Client = connect(default), + Fun = fun(Cfg) -> Cfg end, + Case = fun(Cfg) -> + Client = connect(Cfg), InfVal = emqx_limiter_schema:infinity_value(), ?assertMatch(#{bucket := #{rate := InfVal}}, Client), Result = emqx_htb_limiter:check(100000, Client), ?assertEqual({ok, Client}, Result) end, - with_bucket(default, Fun, Case). + with_per_client(Fun, Case). t_try_restore_agg(_) -> - Fun = fun(#{per_client := Cli} = Bucket) -> + Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := 1, capacity := 200, @@ -267,20 +222,20 @@ t_try_restore_agg(_) -> max_retry_time := 100, failure_strategy := force }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, - Case = fun() -> - Client = connect(default), + Case = fun(Cfg) -> + Client = connect(Cfg), {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client), timer:sleep(200), {ok, L3} = emqx_htb_limiter:check(Retry, L2), Avaiable = emqx_htb_limiter:available(L3), ?assert(Avaiable >= 50) end, - with_bucket(default, Fun, Case). + with_bucket(Fun, Case). t_short_board(_) -> - Fun = fun(#{per_client := Cli} = Bucket) -> + Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("100/1s"), initial := 0, @@ -291,18 +246,18 @@ t_short_board(_) -> capacity := 600, initial := 600 }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, - Case = fun() -> + Case = fun(Cfg) -> Counter = counters:new(1, []), - start_client(default, ?NOW + 2000, Counter, 20), + start_client(Cfg, ?NOW + 2000, Counter, 20), timer:sleep(2100), check_average_rate(Counter, 2, 100) end, - with_bucket(default, Fun, Case). + with_bucket(Fun, Case). t_rate(_) -> - Fun = fun(#{per_client := Cli} = Bucket) -> + Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("100/100ms"), initial := 0, @@ -313,10 +268,10 @@ t_rate(_) -> capacity := infinity, initial := 0 }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, - Case = fun() -> - Client = connect(default), + Case = fun(Cfg) -> + Client = connect(Cfg), Ts1 = erlang:system_time(millisecond), C1 = emqx_htb_limiter:available(Client), timer:sleep(1000), @@ -326,11 +281,11 @@ t_rate(_) -> Inc = C2 - C1, ?assert(in_range(Inc, ShouldInc - 100, ShouldInc + 100), "test bucket rate") end, - with_bucket(default, Fun, Case). + with_bucket(Fun, Case). t_capacity(_) -> Capacity = 600, - Fun = fun(#{per_client := Cli} = Bucket) -> + Fun = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("100/100ms"), initial := 0, @@ -341,25 +296,25 @@ t_capacity(_) -> capacity := infinity, initial := 0 }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, - Case = fun() -> - Client = connect(default), + Case = fun(Cfg) -> + Client = connect(Cfg), timer:sleep(1000), C1 = emqx_htb_limiter:available(Client), ?assertEqual(Capacity, C1, "test bucket capacity") end, - with_bucket(default, Fun, Case). + with_bucket(Fun, Case). %%-------------------------------------------------------------------- %% Test Cases Global Level %%-------------------------------------------------------------------- t_collaborative_alloc(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} + GlobalMod = fun(#{message_routing := MR} = Cfg) -> + Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}} end, - Bucket1 = fun(#{per_client := Cli} = Bucket) -> + Bucket1 = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("400/1s"), initial := 0, @@ -370,7 +325,7 @@ t_collaborative_alloc(_) -> capacity := 100, initial := 100 }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, Bucket2 = fun(Bucket) -> @@ -381,8 +336,8 @@ t_collaborative_alloc(_) -> Case = fun() -> C1 = counters:new(1, []), C2 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - start_client(b2, ?NOW + 2000, C2, 30), + start_client({b1, Bucket1}, ?NOW + 2000, C1, 20), + start_client({b2, Bucket2}, ?NOW + 2000, C2, 30), timer:sleep(2100), check_average_rate(C1, 2, 300), check_average_rate(C2, 2, 300) @@ -395,14 +350,16 @@ t_collaborative_alloc(_) -> ). t_burst(_) -> - GlobalMod = fun(Cfg) -> + GlobalMod = fun(#{message_routing := MR} = Cfg) -> Cfg#{ - rate := ?RATE("200/1s"), - burst := ?RATE("400/1s") + message_routing := MR#{ + rate := ?RATE("200/1s"), + burst := ?RATE("400/1s") + } } end, - Bucket = fun(#{per_client := Cli} = Bucket) -> + Bucket = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := ?RATE("200/1s"), initial := 0, @@ -413,16 +370,16 @@ t_burst(_) -> capacity := 200, divisible := true }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, Case = fun() -> C1 = counters:new(1, []), C2 = counters:new(1, []), C3 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), - start_client(b2, ?NOW + 2000, C2, 30), - start_client(b3, ?NOW + 2000, C3, 30), + start_client({b1, Bucket}, ?NOW + 2000, C1, 20), + start_client({b2, Bucket}, ?NOW + 2000, C2, 30), + start_client({b3, Bucket}, ?NOW + 2000, C3, 30), timer:sleep(2100), Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]), @@ -436,11 +393,11 @@ t_burst(_) -> ). t_limit_global_with_unlimit_other(_) -> - GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} + GlobalMod = fun(#{message_routing := MR} = Cfg) -> + Cfg#{message_routing := MR#{rate := ?RATE("600/1s")}} end, - Bucket = fun(#{per_client := Cli} = Bucket) -> + Bucket = fun(#{client := Cli} = Bucket) -> Bucket2 = Bucket#{ rate := infinity, initial := 0, @@ -451,12 +408,12 @@ t_limit_global_with_unlimit_other(_) -> capacity := infinity, initial := 0 }, - Bucket2#{per_client := Cli2} + Bucket2#{client := Cli2} end, Case = fun() -> C1 = counters:new(1, []), - start_client(b1, ?NOW + 2000, C1, 20), + start_client({b1, Bucket}, ?NOW + 2000, C1, 20), timer:sleep(2100), check_average_rate(C1, 2, 600) end, @@ -470,28 +427,6 @@ t_limit_global_with_unlimit_other(_) -> %%-------------------------------------------------------------------- %% Test Cases container %%-------------------------------------------------------------------- -t_new_container(_) -> - C1 = emqx_limiter_container:new(), - C2 = emqx_limiter_container:new([message_routing]), - C3 = emqx_limiter_container:update_by_name(message_routing, default, C1), - ?assertMatch( - #{ - message_routing := _, - retry_ctx := undefined, - {retry, message_routing} := _ - }, - C2 - ), - ?assertMatch( - #{ - message_routing := _, - retry_ctx := undefined, - {retry, message_routing} := _ - }, - C3 - ), - ok. - t_check_container(_) -> Cfg = fun(Cfg) -> Cfg#{ @@ -500,10 +435,11 @@ t_check_container(_) -> capacity := 1000 } end, - Case = fun() -> - C1 = emqx_limiter_container:new( + Case = fun(#{client := Client} = BucketCfg) -> + C1 = emqx_limiter_container:get_limiter_by_types( + ?MODULE, [message_routing], - #{message_routing => default} + #{message_routing => BucketCfg, client => #{message_routing => Client}} ), {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1), {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2), @@ -514,7 +450,39 @@ t_check_container(_) -> RetryData = emqx_limiter_container:get_retry_context(C5), ?assertEqual(Context, RetryData) end, - with_per_client(default, Cfg, Case). + with_per_client(Cfg, Case). + +%%-------------------------------------------------------------------- +%% Test Override +%%-------------------------------------------------------------------- +t_bucket_no_client(_) -> + Rate = ?RATE("1/s"), + GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) -> + Cfg#{client := Client#{message_routing := MR#{rate := Rate}}} + end, + BucketMod = fun(Bucket) -> + maps:remove(client, Bucket) + end, + Case = fun() -> + Limiter = connect(BucketMod(make_limiter_cfg())), + ?assertMatch(#{rate := Rate}, Limiter) + end, + with_global(GlobalMod, [BucketMod], Case). + +t_bucket_client(_) -> + GlobalRate = ?RATE("1/s"), + BucketRate = ?RATE("10/s"), + GlobalMod = fun(#{client := #{message_routing := MR} = Client} = Cfg) -> + Cfg#{client := Client#{message_routing := MR#{rate := GlobalRate}}} + end, + BucketMod = fun(#{client := Client} = Bucket) -> + Bucket#{client := Client#{rate := BucketRate}} + end, + Case = fun() -> + Limiter = connect(BucketMod(make_limiter_cfg())), + ?assertMatch(#{rate := BucketRate}, Limiter) + end, + with_global(GlobalMod, [BucketMod], Case). %%-------------------------------------------------------------------- %% Test Cases misc @@ -607,19 +575,23 @@ t_schema_unit(_) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -start_client(Name, EndTime, Counter, Number) -> +start_client(Cfg, EndTime, Counter, Number) -> lists:foreach( fun(_) -> spawn(fun() -> - start_client(Name, EndTime, Counter) + do_start_client(Cfg, EndTime, Counter) end) end, lists:seq(1, Number) ). -start_client(Name, EndTime, Counter) -> - #{per_client := PerClient} = - emqx_config:get([limiter, message_routing, bucket, Name]), +do_start_client({Name, CfgFun}, EndTime, Counter) -> + do_start_client(Name, CfgFun(make_limiter_cfg()), EndTime, Counter); +do_start_client(Cfg, EndTime, Counter) -> + do_start_client(?MODULE, Cfg, EndTime, Counter). + +do_start_client(Name, Cfg, EndTime, Counter) -> + #{client := PerClient} = Cfg, #{rate := Rate} = PerClient, Client = #client{ start = ?NOW, @@ -627,7 +599,7 @@ start_client(Name, EndTime, Counter) -> counter = Counter, obtained = 0, rate = Rate, - client = connect(Name) + client = connect(Name, Cfg) }, client_loop(Client). @@ -711,35 +683,50 @@ to_rate(Str) -> {ok, Rate} = emqx_limiter_schema:to_rate(Str), Rate. -with_global(Modifier, BuckeTemps, Case) -> - Fun = fun(Cfg) -> - #{bucket := #{default := BucketCfg}} = Cfg2 = Modifier(Cfg), - Fun = fun({Name, BMod}, Acc) -> - Acc#{Name => BMod(BucketCfg)} - end, - Buckets = lists:foldl(Fun, #{}, BuckeTemps), - Cfg2#{bucket := Buckets} - end, +with_global(Modifier, Buckets, Case) -> + with_config([limiter], Modifier, Buckets, Case). - with_config([limiter, message_routing], Fun, Case). +with_bucket(Modifier, Case) -> + Cfg = Modifier(make_limiter_cfg()), + add_bucket(Cfg), + Case(Cfg), + del_bucket(). -with_bucket(Bucket, Modifier, Case) -> - Path = [limiter, message_routing, bucket, Bucket], - with_config(Path, Modifier, Case). +with_per_client(Modifier, Case) -> + #{client := Client} = Cfg = make_limiter_cfg(), + Cfg2 = Cfg#{client := Modifier(Client)}, + add_bucket(Cfg2), + Case(Cfg2), + del_bucket(). -with_per_client(Bucket, Modifier, Case) -> - Path = [limiter, message_routing, bucket, Bucket, per_client], - with_config(Path, Modifier, Case). - -with_config(Path, Modifier, Case) -> +with_config(Path, Modifier, Buckets, Case) -> Cfg = emqx_config:get(Path), NewCfg = Modifier(Cfg), - ct:pal("test with config:~p~n", [NewCfg]), emqx_config:put(Path, NewCfg), emqx_limiter_server:restart(message_routing), timer:sleep(500), + BucketCfg = make_limiter_cfg(), + lists:foreach( + fun + ({Name, BucketFun}) -> + add_bucket(Name, BucketFun(BucketCfg)); + (BucketFun) -> + add_bucket(BucketFun(BucketCfg)) + end, + Buckets + ), DelayReturn = delay_return(Case), + lists:foreach( + fun + ({Name, _Cfg}) -> + del_bucket(Name); + (_Cfg) -> + del_bucket() + end, + Buckets + ), emqx_config:put(Path, Cfg), + emqx_limiter_server:restart(message_routing), DelayReturn(). delay_return(Case) -> @@ -751,10 +738,40 @@ delay_return(Case) -> fun() -> erlang:raise(Type, Reason, Trace) end end. -connect(Name) -> - {ok, Limiter} = emqx_limiter_server:connect(message_routing, Name), +connect({Name, CfgFun}) -> + connect(Name, CfgFun(make_limiter_cfg())); +connect(Cfg) -> + connect(?MODULE, Cfg). + +connect(Name, Cfg) -> + {ok, Limiter} = emqx_limiter_server:connect(Name, message_routing, Cfg), Limiter. +make_limiter_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + Client = #{ + rate => Infinity, + initial => 0, + capacity => Infinity, + low_watermark => 0, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + +add_bucket(Cfg) -> + add_bucket(?MODULE, Cfg). + +add_bucket(Name, Cfg) -> + emqx_limiter_server:add_bucket(Name, message_routing, Cfg). + +del_bucket() -> + del_bucket(?MODULE). + +del_bucket(Name) -> + emqx_limiter_server:del_bucket(Name, message_routing). + check_average_rate(Counter, Second, Rate) -> Cost = counters:get(Counter, 1), PerSec = Cost / Second, diff --git a/apps/emqx/test/emqx_ws_connection_SUITE.erl b/apps/emqx/test/emqx_ws_connection_SUITE.erl index 89d892c67..47efc1829 100644 --- a/apps/emqx/test/emqx_ws_connection_SUITE.erl +++ b/apps/emqx/test/emqx_ws_connection_SUITE.erl @@ -59,6 +59,7 @@ init_per_testcase(TestCase, Config) when TestCase =/= t_ws_pingreq_before_connected, TestCase =/= t_ws_non_check_origin -> + add_bucket(), %% Meck Cm ok = meck:new(emqx_cm, [passthrough, no_history, no_link]), ok = meck:expect(emqx_cm, mark_channel_connected, fun(_) -> ok end), @@ -96,6 +97,7 @@ init_per_testcase(TestCase, Config) when | Config ]; init_per_testcase(t_ws_non_check_origin, Config) -> + add_bucket(), ok = emqx_common_test_helpers:start_apps([]), PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]), emqx_config:put_listener_conf(ws, default, [websocket, check_origin_enable], false), @@ -105,6 +107,7 @@ init_per_testcase(t_ws_non_check_origin, Config) -> | Config ]; init_per_testcase(_, Config) -> + add_bucket(), PrevConfig = emqx_config:get_listener_conf(ws, default, [websocket]), ok = emqx_common_test_helpers:start_apps([]), [ @@ -119,6 +122,7 @@ end_per_testcase(TestCase, _Config) when TestCase =/= t_ws_non_check_origin, TestCase =/= t_ws_pingreq_before_connected -> + del_bucket(), lists:foreach( fun meck:unload/1, [ @@ -131,11 +135,13 @@ end_per_testcase(TestCase, _Config) when ] ); end_per_testcase(t_ws_non_check_origin, Config) -> + del_bucket(), PrevConfig = ?config(prev_config, Config), emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), emqx_common_test_helpers:stop_apps([]), ok; end_per_testcase(_, Config) -> + del_bucket(), PrevConfig = ?config(prev_config, Config), emqx_config:put_listener_conf(ws, default, [websocket], PrevConfig), emqx_common_test_helpers:stop_apps([]), @@ -501,15 +507,12 @@ t_handle_timeout_emit_stats(_) -> ?assertEqual(undefined, ?ws_conn:info(stats_timer, St)). t_ensure_rate_limit(_) -> - %% XXX In the future, limiter should provide API for config update - Path = [limiter, bytes_in, bucket, default, per_client], - PerClient = emqx_config:get(Path), {ok, Rate} = emqx_limiter_schema:to_rate("50MB"), - emqx_config:put(Path, PerClient#{rate := Rate}), - emqx_limiter_server:restart(bytes_in), - timer:sleep(100), - - Limiter = init_limiter(), + Limiter = init_limiter(#{ + bytes_in => bucket_cfg(), + message_in => bucket_cfg(), + client => #{bytes_in => client_cfg(Rate)} + }), St = st(#{limiter => Limiter}), %% must bigger than value in emqx_ratelimit_SUITE @@ -522,11 +525,7 @@ t_ensure_rate_limit(_) -> St ), ?assertEqual(blocked, ?ws_conn:info(sockstate, St1)), - ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)), - - emqx_config:put(Path, PerClient), - emqx_limiter_server:restart(bytes_in), - timer:sleep(100). + ?assertEqual([{active, false}], ?ws_conn:info(postponed, St1)). t_parse_incoming(_) -> {Packets, St} = ?ws_conn:parse_incoming(<<48, 3>>, [], st()), @@ -691,7 +690,44 @@ ws_client(State) -> ct:fail(ws_timeout) end. -limiter_cfg() -> #{bytes_in => default, message_in => default}. +-define(LIMITER_ID, 'ws:default'). init_limiter() -> - emqx_limiter_container:get_limiter_by_names([bytes_in, message_in], limiter_cfg()). + init_limiter(limiter_cfg()). + +init_limiter(LimiterCfg) -> + emqx_limiter_container:get_limiter_by_types(?LIMITER_ID, [bytes_in, message_in], LimiterCfg). + +limiter_cfg() -> + Cfg = bucket_cfg(), + Client = client_cfg(), + #{bytes_in => Cfg, message_in => Cfg, client => #{bytes_in => Client, message_in => Client}}. + +client_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + client_cfg(Infinity). + +client_cfg(Rate) -> + Infinity = emqx_limiter_schema:infinity_value(), + #{ + rate => Rate, + initial => 0, + capacity => Infinity, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }. + +bucket_cfg() -> + Infinity = emqx_limiter_schema:infinity_value(), + #{rate => Infinity, initial => 0, capacity => Infinity}. + +add_bucket() -> + Cfg = bucket_cfg(), + emqx_limiter_server:add_bucket(?LIMITER_ID, bytes_in, Cfg), + emqx_limiter_server:add_bucket(?LIMITER_ID, message_in, Cfg). + +del_bucket() -> + emqx_limiter_server:del_bucket(?LIMITER_ID, bytes_in), + emqx_limiter_server:del_bucket(?LIMITER_ID, message_in). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 59b320368..34f32d8be 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -777,6 +777,8 @@ to_bin(List) when is_list(List) -> end; to_bin(Boolean) when is_boolean(Boolean) -> Boolean; to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8); +to_bin({Type, Args}) -> + unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args])); to_bin(X) -> X. diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 5d911b5f4..f5a3ad403 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -348,12 +348,16 @@ enable_retainer( #{context_id := ContextId} = State, #{ msg_clear_interval := ClearInterval, - backend := BackendCfg + backend := BackendCfg, + flow_control := FlowControl } ) -> NewContextId = ContextId + 1, Context = create_resource(new_context(NewContextId), BackendCfg), load(Context), + emqx_limiter_server:add_bucket( + ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined) + ), State#{ enable := true, context_id := NewContextId, @@ -369,6 +373,7 @@ disable_retainer( } = State ) -> unload(), + emqx_limiter_server:del_bucket(?APP, internal), ok = close_resource(Context), State#{ enable := false, diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 7d085b422..2c0bd725c 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -151,13 +151,8 @@ config(get, _) -> {200, emqx:get_raw_config([retainer])}; config(put, #{body := Body}) -> try - check_bucket_exists( - Body, - fun(Conf) -> - {ok, _} = emqx_retainer:update_config(Conf), - {200, emqx:get_raw_config([retainer])} - end - ) + {ok, _} = emqx_retainer:update_config(Body), + {200, emqx:get_raw_config([retainer])} catch _:Reason:_ -> {400, #{ @@ -237,30 +232,3 @@ check_backend(Type, Params, Cont) -> _ -> {400, 'BAD_REQUEST', <<"This API only support built in database">>} end. - -check_bucket_exists( - #{ - <<"flow_control">> := - #{<<"batch_deliver_limiter">> := Name} = Flow - } = Conf, - Cont -) -> - case erlang:binary_to_atom(Name) of - '' -> - %% workaround, empty string means set the value to undefined, - %% but now, we can't store `undefined` in the config file correct, - %% but, we can delete this field - Cont(Conf#{ - <<"flow_control">> := maps:remove(<<"batch_deliver_limiter">>, Flow) - }); - Bucket -> - Path = emqx_limiter_schema:get_bucket_cfg_path(batch, Bucket), - case emqx:get_config(Path, undefined) of - undefined -> - {400, 'BAD_REQUEST', <<"The limiter bucket not exists">>}; - _ -> - Cont(Conf) - end - end; -check_bucket_exists(Conf, Cont) -> - Cont(Conf). diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 29818481d..f52fd982c 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -115,8 +115,8 @@ start_link(Pool, Id) -> init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), - BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined), - {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), + BucketCfg = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined), + {ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg), {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- @@ -155,8 +155,8 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter), {noreply, State#{limiter := Limiter2}}; handle_cast({refresh_limiter, Conf}, State) -> - BucketName = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), - {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), + BucketCfg = emqx_map_lib:deep_get([flow_control, batch_deliver_limiter], Conf, undefined), + {ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 526059c9e..51dbf496b 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -86,7 +86,7 @@ fields(flow_control) -> )}, {batch_deliver_limiter, sc( - emqx_limiter_schema:bucket_name(), + ?R_REF(emqx_limiter_schema, internal), batch_deliver_limiter, undefined )} diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index ed49f6f5c..d7ddc2424 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -368,27 +368,16 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - #{per_client := PerClient} = RetainerCfg = emqx_config:get([limiter, batch, bucket, retainer]), - RetainerCfg2 = RetainerCfg#{ - per_client := - PerClient#{ - rate := emqx_ratelimiter_SUITE:to_rate("1/1s"), - capacity := 1 - } - }, - emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg2), - emqx_limiter_manager:restart_server(batch), - timer:sleep(500), - - emqx_retainer_dispatcher:refresh_limiter(), - timer:sleep(500), - + Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"), + LimiterCfg = make_limiter_cfg(Rate), + JsonCfg = make_limiter_json(<<"1/1s">>), + emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), emqx_retainer:update_config(#{ <<"flow_control">> => #{ <<"batch_read_number">> => 1, <<"batch_deliver_number">> => 1, - <<"batch_deliver_limiter">> => retainer + <<"batch_deliver_limiter">> => JsonCfg } }), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), @@ -424,13 +413,14 @@ t_flow_control(_) -> ok = emqtt:disconnect(C1), - %% recover the limiter - emqx_config:put([limiter, batch, bucket, retainer], RetainerCfg), - emqx_limiter_manager:restart_server(batch), - timer:sleep(500), - - emqx_retainer_dispatcher:refresh_limiter(), - timer:sleep(500), + emqx_limiter_server:del_bucket(emqx_retainer, internal), + emqx_retainer:update_config(#{ + <<"flow_control">> => + #{ + <<"batch_read_number">> => 1, + <<"batch_deliver_number">> => 1 + } + }), ok. t_clear_expired(_) -> @@ -684,3 +674,33 @@ with_conf(ConfMod, Case) -> emqx_retainer:update_config(Conf), erlang:raise(Type, Error, Strace) end. + +make_limiter_cfg(Rate) -> + Infinity = emqx_limiter_schema:infinity_value(), + Client = #{ + rate => Rate, + initial => 0, + capacity => Infinity, + low_watermark => 1, + divisible => false, + max_retry_time => timer:seconds(5), + failure_strategy => force + }, + #{client => Client, rate => Infinity, initial => 0, capacity => Infinity}. + +make_limiter_json(Rate) -> + Client = #{ + <<"rate">> => Rate, + <<"initial">> => 0, + <<"capacity">> => <<"infinity">>, + <<"low_watermark">> => 0, + <<"divisible">> => <<"false">>, + <<"max_retry_time">> => <<"5s">>, + <<"failure_strategy">> => <<"force">> + }, + #{ + <<"client">> => Client, + <<"rate">> => <<"infinity">>, + <<"initial">> => 0, + <<"capacity">> => <<"infinity">> + }.