diff --git a/apps/emqx/i18n/emqx_limiter_i18n.conf b/apps/emqx/i18n/emqx_limiter_i18n.conf index 6fbc923b7..006a0662e 100644 --- a/apps/emqx/i18n/emqx_limiter_i18n.conf +++ b/apps/emqx/i18n/emqx_limiter_i18n.conf @@ -1,5 +1,16 @@ emqx_limiter_schema { + enable { + desc { + en: """Enable""" + zh: """是否开启""" + } + label: { + en: """Enable""" + zh: """是否开启""" + } + } + failure_strategy { desc { en: """The strategy when all the retries failed.""" diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index a29903205..b97f874b2 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -3,43 +3,4 @@ ##-------------------------------------------------------------------- limiter { - ## rate limiter for message publish - bytes_in { - bucket.default { - rate = infinity - capacity = infinity - } - } - - ## rate limiter for message publish - message_in { - bucket.default { - rate = infinity - capacity = infinity - } - } - - ## connection rate limiter - connection { - bucket.default { - rate = infinity - capacity = infinity - } - } - - ## rate limiter for message deliver - message_routing { - bucket.default { - rate = infinity - capacity = infinity - } - } - - ## rate limiter for internal batch operation - batch { - bucket.retainer { - rate = infinity - capacity = infinity - } - } } diff --git a/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl index b465fc287..c39cf2728 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_esockd_htb_limiter.erl @@ -40,7 +40,7 @@ new_create_options(Type, BucketName) -> -spec create(create_options()) -> esockd_generic_limiter:limiter(). create(#{module := ?MODULE, type := Type, bucket := BucketName}) -> - Limiter = emqx_limiter_server:connect(Type, BucketName), + {ok, Limiter} = emqx_limiter_server:connect(Type, BucketName), #{module => ?MODULE, name => Type, limiter => Limiter}. delete(_GLimiter) -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl index 84f32c2cf..f82a97a5a 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_container.erl @@ -89,7 +89,7 @@ new(Types, Names) -> ) -> container(). get_limiter_by_names(Types, BucketNames) -> Init = fun(Type, Acc) -> - Limiter = emqx_limiter_server:connect(Type, BucketNames), + {ok, Limiter} = emqx_limiter_server:connect(Type, BucketNames), add_new(Type, Limiter, Acc) end, lists:foldl(Init, #{retry_ctx => undefined}, Types). @@ -101,7 +101,7 @@ get_limiter_by_names(Types, BucketNames) -> container() ) -> container(). update_by_name(Type, Buckets, Container) -> - Limiter = emqx_limiter_server:connect(Type, Buckets), + {ok, Limiter} = emqx_limiter_server:connect(Type, Buckets), add_new(Type, Limiter, Container). -spec add_new(limiter_type(), limiter(), container()) -> container(). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index efd89236b..5b9413cb9 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -24,15 +24,21 @@ %% API -export([ start_link/0, - start_server/1, find_bucket/1, find_bucket/2, - insert_bucket/2, insert_bucket/3, + insert_bucket/2, + insert_bucket/3, make_path/2, - restart_server/1, post_config_update/5 ]). +-export([ + start_server/1, + start_server/2, + restart_server/1, + stop_server/1 +]). + %% gen_server callbacks -export([ init/1, @@ -67,10 +73,18 @@ start_server(Type) -> emqx_limiter_server_sup:start(Type). +-spec start_server(limiter_type(), hocons:config()) -> _. +start_server(Type, Cfg) -> + emqx_limiter_server_sup:start(Type, Cfg). + -spec restart_server(limiter_type()) -> _. restart_server(Type) -> emqx_limiter_server:restart(Type). +-spec stop_server(limiter_type()) -> _. +stop_server(Type) -> + emqx_limiter_server_sup:stop(Type). + -spec find_bucket(limiter_type(), bucket_name()) -> {ok, bucket_ref()} | undefined. find_bucket(Type, BucketName) -> @@ -103,7 +117,22 @@ make_path(Type, BucketName) -> post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) -> Config = maps:get(Type, NewConf), - emqx_limiter_server:update_config(Type, Config). + case emqx_limiter_server:whereis(Type) of + undefined -> + case Config of + #{enable := false} -> + ok; + _ -> + start_server(Type) + end; + _ -> + case Config of + #{enable := false} -> + stop_server(Type); + _ -> + emqx_limiter_server:update_config(Type, Config) + end + end. %%-------------------------------------------------------------------- %% @doc diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index da16c9f50..dd3b81f6e 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -30,7 +30,8 @@ namespace/0, get_bucket_cfg_path/2, desc/1, - types/0 + types/0, + is_enable/1 ]). -define(KILOBYTE, 1024). @@ -86,29 +87,31 @@ roots() -> [limiter]. fields(limiter) -> [ - {bytes_in, sc(ref(limiter_opts), #{desc => ?DESC(bytes_in)})}, - {message_in, sc(ref(limiter_opts), #{desc => ?DESC(message_in)})}, - {connection, sc(ref(limiter_opts), #{desc => ?DESC(connection)})}, - {message_routing, sc(ref(limiter_opts), #{desc => ?DESC(message_routing)})}, - {batch, sc(ref(limiter_opts), #{desc => ?DESC(batch)})} + {Type, sc(ref(limiter_opts), #{desc => ?DESC(Type), default => #{<<"enable">> => false}})} + || Type <- types() ]; fields(limiter_opts) -> [ - {rate, sc(rate(), #{default => "infinity", desc => ?DESC(rate)})}, + {enable, sc(boolean(), #{desc => ?DESC(enable), default => true})}, + {rate, sc(rate(), #{desc => ?DESC(rate), default => "infinity"})}, {burst, sc( burst_rate(), #{ - default => "0/0s", - desc => ?DESC(burst) + desc => ?DESC(burst), + default => 0 } )}, - {bucket, sc(map("bucket_name", ref(bucket_opts)), #{desc => ?DESC(bucket_cfg)})} + {bucket, + sc( + map("bucket_name", ref(bucket_opts)), + #{desc => ?DESC(bucket_cfg), default => #{<<"default">> => #{}}} + )} ]; fields(bucket_opts) -> [ - {rate, sc(rate(), #{desc => ?DESC(rate)})}, - {capacity, sc(capacity(), #{desc => ?DESC(capacity)})}, + {rate, sc(rate(), #{desc => ?DESC(rate), default => "infinity"})}, + {capacity, sc(capacity(), #{desc => ?DESC(capacity), default => "infinity"})}, {initial, sc(initial(), #{default => "0", desc => ?DESC(initial)})}, {per_client, sc( @@ -188,6 +191,10 @@ to_rate(Str) -> get_bucket_cfg_path(Type, BucketName) -> [limiter, Type, bucket, BucketName]. +-spec is_enable(limiter_type()) -> boolean(). +is_enable(Type) -> + emqx:get_config([limiter, Type, enable], false). + types() -> [bytes_in, message_in, connection, message_routing, batch]. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 52dab82dd..8cb763a41 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -41,8 +41,9 @@ ]). -export([ - start_link/1, + start_link/2, connect/2, + whereis/1, info/1, name/1, get_initial_val/1, @@ -88,7 +89,7 @@ -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). --define(CALL(Type, Msg), gen_server:call(name(Type), Msg)). +-define(CALL(Type, Msg), call(Type, Msg)). -define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)). %% minimum coefficient for overloaded limiter @@ -107,16 +108,18 @@ limiter_type(), bucket_name() | #{limiter_type() => bucket_name() | undefined} ) -> - emqx_htb_limiter:limiter(). + {ok, emqx_htb_limiter:limiter()} | {error, _}. %% If no bucket path is set in config, there will be no limit connect(_Type, undefined) -> - emqx_htb_limiter:make_infinity_limiter(); + {ok, emqx_htb_limiter:make_infinity_limiter()}; connect(Type, BucketName) when is_atom(BucketName) -> - CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), - case emqx:get_config(CfgPath, undefined) of + case check_enable_and_get_bucket_cfg(Type, BucketName) of undefined -> - ?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}), - throw("bucket's config not found"); + ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}), + {error, config_not_found}; + limiter_not_started -> + ?SLOG(error, #{msg => "limiter_not_started", type => Type, bucket => BucketName}), + {error, limiter_not_started}; #{ rate := AggrRate, capacity := AggrSize, @@ -124,23 +127,24 @@ connect(Type, BucketName) when is_atom(BucketName) -> } -> case emqx_limiter_manager:find_bucket(Type, BucketName) of {ok, Bucket} -> - if - CliRate < AggrRate orelse CliSize < AggrSize -> - emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); - Bucket =:= infinity -> - emqx_htb_limiter:make_infinity_limiter(); - true -> - emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) - end; + {ok, + if + CliRate < AggrRate orelse CliSize < AggrSize -> + emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); + Bucket =:= infinity -> + emqx_htb_limiter:make_infinity_limiter(); + true -> + emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) + end}; undefined -> - ?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}), - throw("invalid bucket") + ?SLOG(error, #{msg => "bucket_not_found", type => Type, bucket => BucketName}), + {error, invalid_bucket} end end; connect(Type, Paths) -> connect(Type, maps:get(Type, Paths, undefined)). --spec info(limiter_type()) -> state(). +-spec info(limiter_type()) -> state() | {error, _}. info(Type) -> ?CALL(Type). @@ -148,22 +152,26 @@ info(Type) -> name(Type) -> erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). --spec restart(limiter_type()) -> ok. +-spec restart(limiter_type()) -> ok | {error, _}. restart(Type) -> ?CALL(Type). --spec update_config(limiter_type(), hocons:config()) -> ok. +-spec update_config(limiter_type(), hocons:config()) -> ok | {error, _}. update_config(Type, Config) -> ?CALL(Type, {update_config, Type, Config}). +-spec whereis(limiter_type()) -> pid() | undefined. +whereis(Type) -> + erlang:whereis(name(Type)). + %%-------------------------------------------------------------------- %% @doc %% Starts the server %% @end %%-------------------------------------------------------------------- --spec start_link(limiter_type()) -> _. -start_link(Type) -> - gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). +-spec start_link(limiter_type(), hocons:config()) -> _. +start_link(Type, Cfg) -> + gen_server:start_link({local, name(Type)}, ?MODULE, [Type, Cfg], []). %%-------------------------------------------------------------------- %%% gen_server callbacks @@ -181,8 +189,8 @@ start_link(Type) -> | {ok, State :: term(), hibernate} | {stop, Reason :: term()} | ignore. -init([Type]) -> - State = init_tree(Type), +init([Type, Cfg]) -> + State = init_tree(Type, Cfg), #{root := #{period := Perido}} = State, oscillate(Perido), {ok, State}. @@ -597,3 +605,23 @@ get_initial_val(#{ true -> 0 end. + +-spec call(limiter_type(), any()) -> {error, _} | _. +call(Type, Msg) -> + case ?MODULE:whereis(Type) of + undefined -> + {error, limiter_not_started}; + Pid -> + gen_server:call(Pid, Msg) + end. + +-spec check_enable_and_get_bucket_cfg(limiter_type(), bucket_name()) -> + undefined | limiter_not_started | hocons:config(). +check_enable_and_get_bucket_cfg(Type, Bucket) -> + case emqx_limiter_schema:is_enable(Type) of + false -> + limiter_not_started; + _ -> + Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket), + emqx:get_config(Path, undefined) + end. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 71bc26eb2..4a30401f1 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -19,7 +19,7 @@ -behaviour(supervisor). %% API --export([start_link/0, start/1]). +-export([start_link/0, start/1, start/2, stop/1]). %% Supervisor callbacks -export([init/1]). @@ -47,6 +47,15 @@ start(Type) -> Spec = make_child(Type), supervisor:start_child(?MODULE, Spec). +-spec start(emqx_limiter_schema:limiter_type(), hocons:config()) -> _. +start(Type, Cfg) -> + Spec = make_child(Type, Cfg), + supervisor:start_child(?MODULE, Spec). + +stop(Type) -> + Id = emqx_limiter_server:name(Type), + supervisor:terminate_child(?MODULE, Id). + %%-------------------------------------------------------------------- %% Supervisor callbacks %%-------------------------------------------------------------------- @@ -76,10 +85,14 @@ init([]) -> %% Internal functions %%--================================================================== make_child(Type) -> + Cfg = emqx:get_config([limiter, Type]), + make_child(Type, Cfg). + +make_child(Type, Cfg) -> Id = emqx_limiter_server:name(Type), #{ id => Id, - start => {emqx_limiter_server, start_link, [Type]}, + start => {emqx_limiter_server, start_link, [Type, Cfg]}, restart => transient, shutdown => 5000, type => worker, @@ -88,5 +101,13 @@ make_child(Type) -> childs() -> Conf = emqx:get_config([limiter]), - Types = maps:keys(Conf), - [make_child(Type) || Type <- Types]. + lists:foldl( + fun + ({Type, #{enable := true}}, Acc) -> + [make_child(Type) | Acc]; + (_, Acc) -> + Acc + end, + [], + maps:to_list(Conf) + ). diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 9ceaa7c36..9c1372c42 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -749,7 +749,8 @@ delay_return(Case) -> end. connect(Name) -> - emqx_limiter_server:connect(message_routing, Name). + {ok, Limiter} = emqx_limiter_server:connect(message_routing, Name), + Limiter. check_average_rate(Counter, Second, Rate) -> Cost = counters:get(Counter, 1), diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 051f44940..d82dc10ad 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -56,16 +56,6 @@ retainer { ## ## Default: 0 batch_deliver_number = 0 - - ## The rate limiter name for retained messages delivery. - ## In order to avoid delivering too many messages to the client at once, which may cause the client - ## to block or crash, or message dropped due to exceeding the size of the message queue. We need - ## to specify a rate limiter for the retained messages delivery to the client. - ## - ## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`. - ## You can remove this field if you don't want any limit - ## Default: retainer - batch_deliver_limiter = retainer } ## Maximum retained message size. diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 1c39958b2..36672bcc1 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -111,8 +111,8 @@ start_link(Pool, Id) -> init([Pool, Id]) -> erlang:process_flag(trap_exit, true), true = gproc_pool:connect_worker(Pool, {Pool, Id}), - BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), - Limiter = emqx_limiter_server:connect(batch, BucketName), + BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined), + {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), {ok, #{pool => Pool, id => Id, limiter => Limiter}}. %%-------------------------------------------------------------------- @@ -152,7 +152,7 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) -> {noreply, State#{limiter := Limiter2}}; handle_cast(refresh_limiter, State) -> BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), - Limiter = emqx_limiter_server:connect(batch, BucketName), + {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName), {noreply, State#{limiter := Limiter}}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),