Merge pull request #8016 from lafirest/fix/simplify_limiter_cfg

fix(limiter): Simplified limiter configuration
This commit is contained in:
zhongwencool 2022-05-24 14:01:08 +08:00 committed by GitHub
commit 875b40a4fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 174 additions and 116 deletions

View File

@ -1,5 +1,16 @@
emqx_limiter_schema { emqx_limiter_schema {
enable {
desc {
en: """Enable"""
zh: """是否开启"""
}
label: {
en: """Enable"""
zh: """是否开启"""
}
}
failure_strategy { failure_strategy {
desc { desc {
en: """The strategy when all the retries failed.""" en: """The strategy when all the retries failed."""

View File

@ -2,44 +2,4 @@
## EMQX Rate Limiter ## EMQX Rate Limiter
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
limiter { limiter {}
## rate limiter for message publish
bytes_in {
bucket.default {
rate = infinity
capacity = infinity
}
}
## rate limiter for message publish
message_in {
bucket.default {
rate = infinity
capacity = infinity
}
}
## connection rate limiter
connection {
bucket.default {
rate = infinity
capacity = infinity
}
}
## rate limiter for message deliver
message_routing {
bucket.default {
rate = infinity
capacity = infinity
}
}
## rate limiter for internal batch operation
batch {
bucket.retainer {
rate = infinity
capacity = infinity
}
}
}

View File

@ -40,7 +40,7 @@ new_create_options(Type, BucketName) ->
-spec create(create_options()) -> esockd_generic_limiter:limiter(). -spec create(create_options()) -> esockd_generic_limiter:limiter().
create(#{module := ?MODULE, type := Type, bucket := BucketName}) -> create(#{module := ?MODULE, type := Type, bucket := BucketName}) ->
Limiter = emqx_limiter_server:connect(Type, BucketName), {ok, Limiter} = emqx_limiter_server:connect(Type, BucketName),
#{module => ?MODULE, name => Type, limiter => Limiter}. #{module => ?MODULE, name => Type, limiter => Limiter}.
delete(_GLimiter) -> delete(_GLimiter) ->

View File

@ -89,7 +89,7 @@ new(Types, Names) ->
) -> container(). ) -> container().
get_limiter_by_names(Types, BucketNames) -> get_limiter_by_names(Types, BucketNames) ->
Init = fun(Type, Acc) -> Init = fun(Type, Acc) ->
Limiter = emqx_limiter_server:connect(Type, BucketNames), {ok, Limiter} = emqx_limiter_server:connect(Type, BucketNames),
add_new(Type, Limiter, Acc) add_new(Type, Limiter, Acc)
end, end,
lists:foldl(Init, #{retry_ctx => undefined}, Types). lists:foldl(Init, #{retry_ctx => undefined}, Types).
@ -101,7 +101,7 @@ get_limiter_by_names(Types, BucketNames) ->
container() container()
) -> container(). ) -> container().
update_by_name(Type, Buckets, Container) -> update_by_name(Type, Buckets, Container) ->
Limiter = emqx_limiter_server:connect(Type, Buckets), {ok, Limiter} = emqx_limiter_server:connect(Type, Buckets),
add_new(Type, Limiter, Container). add_new(Type, Limiter, Container).
-spec add_new(limiter_type(), limiter(), container()) -> container(). -spec add_new(limiter_type(), limiter(), container()) -> container().

View File

@ -24,15 +24,21 @@
%% API %% API
-export([ -export([
start_link/0, start_link/0,
start_server/1,
find_bucket/1, find_bucket/1,
find_bucket/2, find_bucket/2,
insert_bucket/2, insert_bucket/3, insert_bucket/2,
insert_bucket/3,
make_path/2, make_path/2,
restart_server/1,
post_config_update/5 post_config_update/5
]). ]).
-export([
start_server/1,
start_server/2,
restart_server/1,
stop_server/1
]).
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
init/1, init/1,
@ -67,10 +73,18 @@
start_server(Type) -> start_server(Type) ->
emqx_limiter_server_sup:start(Type). emqx_limiter_server_sup:start(Type).
-spec start_server(limiter_type(), hocons:config()) -> _.
start_server(Type, Cfg) ->
emqx_limiter_server_sup:start(Type, Cfg).
-spec restart_server(limiter_type()) -> _. -spec restart_server(limiter_type()) -> _.
restart_server(Type) -> restart_server(Type) ->
emqx_limiter_server:restart(Type). emqx_limiter_server:restart(Type).
-spec stop_server(limiter_type()) -> _.
stop_server(Type) ->
emqx_limiter_server_sup:stop(Type).
-spec find_bucket(limiter_type(), bucket_name()) -> -spec find_bucket(limiter_type(), bucket_name()) ->
{ok, bucket_ref()} | undefined. {ok, bucket_ref()} | undefined.
find_bucket(Type, BucketName) -> find_bucket(Type, BucketName) ->
@ -103,7 +117,22 @@ make_path(Type, BucketName) ->
post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) -> post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
Config = maps:get(Type, NewConf), Config = maps:get(Type, NewConf),
emqx_limiter_server:update_config(Type, Config). case emqx_limiter_server:whereis(Type) of
undefined ->
case Config of
#{enable := false} ->
ok;
_ ->
start_server(Type)
end;
_ ->
case Config of
#{enable := false} ->
stop_server(Type);
_ ->
emqx_limiter_server:update_config(Type, Config)
end
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc

View File

@ -30,7 +30,8 @@
namespace/0, namespace/0,
get_bucket_cfg_path/2, get_bucket_cfg_path/2,
desc/1, desc/1,
types/0 types/0,
is_enable/1
]). ]).
-define(KILOBYTE, 1024). -define(KILOBYTE, 1024).
@ -86,29 +87,31 @@ roots() -> [limiter].
fields(limiter) -> fields(limiter) ->
[ [
{bytes_in, sc(ref(limiter_opts), #{desc => ?DESC(bytes_in)})}, {Type, sc(ref(limiter_opts), #{desc => ?DESC(Type), default => #{<<"enable">> => false}})}
{message_in, sc(ref(limiter_opts), #{desc => ?DESC(message_in)})}, || Type <- types()
{connection, sc(ref(limiter_opts), #{desc => ?DESC(connection)})},
{message_routing, sc(ref(limiter_opts), #{desc => ?DESC(message_routing)})},
{batch, sc(ref(limiter_opts), #{desc => ?DESC(batch)})}
]; ];
fields(limiter_opts) -> fields(limiter_opts) ->
[ [
{rate, sc(rate(), #{default => "infinity", desc => ?DESC(rate)})}, {enable, sc(boolean(), #{desc => ?DESC(enable), default => true})},
{rate, sc(rate(), #{desc => ?DESC(rate), default => "infinity"})},
{burst, {burst,
sc( sc(
burst_rate(), burst_rate(),
#{ #{
default => "0/0s", desc => ?DESC(burst),
desc => ?DESC(burst) default => 0
} }
)}, )},
{bucket, sc(map("bucket_name", ref(bucket_opts)), #{desc => ?DESC(bucket_cfg)})} {bucket,
sc(
map("bucket_name", ref(bucket_opts)),
#{desc => ?DESC(bucket_cfg), default => #{<<"default">> => #{}}}
)}
]; ];
fields(bucket_opts) -> fields(bucket_opts) ->
[ [
{rate, sc(rate(), #{desc => ?DESC(rate)})}, {rate, sc(rate(), #{desc => ?DESC(rate), default => "infinity"})},
{capacity, sc(capacity(), #{desc => ?DESC(capacity)})}, {capacity, sc(capacity(), #{desc => ?DESC(capacity), default => "infinity"})},
{initial, sc(initial(), #{default => "0", desc => ?DESC(initial)})}, {initial, sc(initial(), #{default => "0", desc => ?DESC(initial)})},
{per_client, {per_client,
sc( sc(
@ -188,6 +191,10 @@ to_rate(Str) ->
get_bucket_cfg_path(Type, BucketName) -> get_bucket_cfg_path(Type, BucketName) ->
[limiter, Type, bucket, BucketName]. [limiter, Type, bucket, BucketName].
-spec is_enable(limiter_type()) -> boolean().
is_enable(Type) ->
emqx:get_config([limiter, Type, enable], false).
types() -> types() ->
[bytes_in, message_in, connection, message_routing, batch]. [bytes_in, message_in, connection, message_routing, batch].

View File

@ -41,8 +41,9 @@
]). ]).
-export([ -export([
start_link/1, start_link/2,
connect/2, connect/2,
whereis/1,
info/1, info/1,
name/1, name/1,
get_initial_val/1, get_initial_val/1,
@ -88,7 +89,7 @@
-type decimal() :: emqx_limiter_decimal:decimal(). -type decimal() :: emqx_limiter_decimal:decimal().
-type index() :: pos_integer(). -type index() :: pos_integer().
-define(CALL(Type, Msg), gen_server:call(name(Type), Msg)). -define(CALL(Type, Msg), call(Type, Msg)).
-define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)). -define(CALL(Type), ?CALL(Type, ?FUNCTION_NAME)).
%% minimum coefficient for overloaded limiter %% minimum coefficient for overloaded limiter
@ -107,16 +108,18 @@
limiter_type(), limiter_type(),
bucket_name() | #{limiter_type() => bucket_name() | undefined} bucket_name() | #{limiter_type() => bucket_name() | undefined}
) -> ) ->
emqx_htb_limiter:limiter(). {ok, emqx_htb_limiter:limiter()} | {error, _}.
%% If no bucket path is set in config, there will be no limit %% If no bucket path is set in config, there will be no limit
connect(_Type, undefined) -> connect(_Type, undefined) ->
emqx_htb_limiter:make_infinity_limiter(); {ok, emqx_htb_limiter:make_infinity_limiter()};
connect(Type, BucketName) when is_atom(BucketName) -> connect(Type, BucketName) when is_atom(BucketName) ->
CfgPath = emqx_limiter_schema:get_bucket_cfg_path(Type, BucketName), case check_enable_and_get_bucket_cfg(Type, BucketName) of
case emqx:get_config(CfgPath, undefined) of
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_config_not_found", path => CfgPath}), ?SLOG(error, #{msg => "bucket_config_not_found", type => Type, bucket => BucketName}),
throw("bucket's config not found"); {error, config_not_found};
limiter_not_started ->
?SLOG(error, #{msg => "limiter_not_started", type => Type, bucket => BucketName}),
{error, limiter_not_started};
#{ #{
rate := AggrRate, rate := AggrRate,
capacity := AggrSize, capacity := AggrSize,
@ -124,6 +127,7 @@ connect(Type, BucketName) when is_atom(BucketName) ->
} -> } ->
case emqx_limiter_manager:find_bucket(Type, BucketName) of case emqx_limiter_manager:find_bucket(Type, BucketName) of
{ok, Bucket} -> {ok, Bucket} ->
{ok,
if if
CliRate < AggrRate orelse CliSize < AggrSize -> CliRate < AggrRate orelse CliSize < AggrSize ->
emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket); emqx_htb_limiter:make_token_bucket_limiter(Cfg, Bucket);
@ -131,16 +135,16 @@ connect(Type, BucketName) when is_atom(BucketName) ->
emqx_htb_limiter:make_infinity_limiter(); emqx_htb_limiter:make_infinity_limiter();
true -> true ->
emqx_htb_limiter:make_ref_limiter(Cfg, Bucket) emqx_htb_limiter:make_ref_limiter(Cfg, Bucket)
end; end};
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_not_found", path => CfgPath}), ?SLOG(error, #{msg => "bucket_not_found", type => Type, bucket => BucketName}),
throw("invalid bucket") {error, invalid_bucket}
end end
end; end;
connect(Type, Paths) -> connect(Type, Paths) ->
connect(Type, maps:get(Type, Paths, undefined)). connect(Type, maps:get(Type, Paths, undefined)).
-spec info(limiter_type()) -> state(). -spec info(limiter_type()) -> state() | {error, _}.
info(Type) -> info(Type) ->
?CALL(Type). ?CALL(Type).
@ -148,22 +152,26 @@ info(Type) ->
name(Type) -> name(Type) ->
erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])). erlang:list_to_atom(io_lib:format("~s_~s", [?MODULE, Type])).
-spec restart(limiter_type()) -> ok. -spec restart(limiter_type()) -> ok | {error, _}.
restart(Type) -> restart(Type) ->
?CALL(Type). ?CALL(Type).
-spec update_config(limiter_type(), hocons:config()) -> ok. -spec update_config(limiter_type(), hocons:config()) -> ok | {error, _}.
update_config(Type, Config) -> update_config(Type, Config) ->
?CALL(Type, {update_config, Type, Config}). ?CALL(Type, {update_config, Type, Config}).
-spec whereis(limiter_type()) -> pid() | undefined.
whereis(Type) ->
erlang:whereis(name(Type)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc %% @doc
%% Starts the server %% Starts the server
%% @end %% @end
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start_link(limiter_type()) -> _. -spec start_link(limiter_type(), hocons:config()) -> _.
start_link(Type) -> start_link(Type, Cfg) ->
gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). gen_server:start_link({local, name(Type)}, ?MODULE, [Type, Cfg], []).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% gen_server callbacks %%% gen_server callbacks
@ -181,8 +189,8 @@ start_link(Type) ->
| {ok, State :: term(), hibernate} | {ok, State :: term(), hibernate}
| {stop, Reason :: term()} | {stop, Reason :: term()}
| ignore. | ignore.
init([Type]) -> init([Type, Cfg]) ->
State = init_tree(Type), State = init_tree(Type, Cfg),
#{root := #{period := Perido}} = State, #{root := #{period := Perido}} = State,
oscillate(Perido), oscillate(Perido),
{ok, State}. {ok, State}.
@ -597,3 +605,23 @@ get_initial_val(#{
true -> true ->
0 0
end. end.
-spec call(limiter_type(), any()) -> {error, _} | _.
call(Type, Msg) ->
case ?MODULE:whereis(Type) of
undefined ->
{error, limiter_not_started};
Pid ->
gen_server:call(Pid, Msg)
end.
-spec check_enable_and_get_bucket_cfg(limiter_type(), bucket_name()) ->
undefined | limiter_not_started | hocons:config().
check_enable_and_get_bucket_cfg(Type, Bucket) ->
case emqx_limiter_schema:is_enable(Type) of
false ->
limiter_not_started;
_ ->
Path = emqx_limiter_schema:get_bucket_cfg_path(Type, Bucket),
emqx:get_config(Path, undefined)
end.

View File

@ -19,7 +19,7 @@
-behaviour(supervisor). -behaviour(supervisor).
%% API %% API
-export([start_link/0, start/1]). -export([start_link/0, start/1, start/2, stop/1]).
%% Supervisor callbacks %% Supervisor callbacks
-export([init/1]). -export([init/1]).
@ -47,6 +47,16 @@ start(Type) ->
Spec = make_child(Type), Spec = make_child(Type),
supervisor:start_child(?MODULE, Spec). supervisor:start_child(?MODULE, Spec).
-spec start(emqx_limiter_schema:limiter_type(), hocons:config()) -> _.
start(Type, Cfg) ->
Spec = make_child(Type, Cfg),
supervisor:start_child(?MODULE, Spec).
stop(Type) ->
Id = emqx_limiter_server:name(Type),
_ = supervisor:terminate_child(?MODULE, Id),
supervisor:delete_child(?MODULE, Id).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Supervisor callbacks %% Supervisor callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -76,10 +86,14 @@ init([]) ->
%% Internal functions %% Internal functions
%%--================================================================== %%--==================================================================
make_child(Type) -> make_child(Type) ->
Cfg = emqx:get_config([limiter, Type]),
make_child(Type, Cfg).
make_child(Type, Cfg) ->
Id = emqx_limiter_server:name(Type), Id = emqx_limiter_server:name(Type),
#{ #{
id => Id, id => Id,
start => {emqx_limiter_server, start_link, [Type]}, start => {emqx_limiter_server, start_link, [Type, Cfg]},
restart => transient, restart => transient,
shutdown => 5000, shutdown => 5000,
type => worker, type => worker,
@ -88,5 +102,13 @@ make_child(Type) ->
childs() -> childs() ->
Conf = emqx:get_config([limiter]), Conf = emqx:get_config([limiter]),
Types = maps:keys(Conf), lists:foldl(
[make_child(Type) || Type <- Types]. fun
({Type, #{enable := true}}, Acc) ->
[make_child(Type) | Acc];
(_, Acc) ->
Acc
end,
[],
maps:to_list(Conf)
).

View File

@ -151,7 +151,7 @@ start_apps(Apps, Handler) when is_function(Handler) ->
%% Because, minirest, ekka etc.. application will scan these modules %% Because, minirest, ekka etc.. application will scan these modules
lists:foreach(fun load/1, [emqx | Apps]), lists:foreach(fun load/1, [emqx | Apps]),
ok = start_ekka(), ok = start_ekka(),
ok = emqx_ratelimiter_SUITE:base_conf(), ok = emqx_ratelimiter_SUITE:load_conf(),
lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]). lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
load(App) -> load(App) ->

View File

@ -29,6 +29,7 @@
"\n" "\n"
"limiter {\n" "limiter {\n"
" bytes_in {\n" " bytes_in {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -36,6 +37,7 @@
" }\n" " }\n"
"\n" "\n"
" message_in {\n" " message_in {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -43,6 +45,7 @@
" }\n" " }\n"
"\n" "\n"
" connection {\n" " connection {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -50,6 +53,7 @@
" }\n" " }\n"
"\n" "\n"
" message_routing {\n" " message_routing {\n"
" enable = true\n"
" bucket.default {\n" " bucket.default {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -57,6 +61,7 @@
" }\n" " }\n"
"\n" "\n"
" batch {\n" " batch {\n"
" enable = true\n"
" bucket.retainer {\n" " bucket.retainer {\n"
" rate = infinity\n" " rate = infinity\n"
" capacity = infinity\n" " capacity = infinity\n"
@ -79,7 +84,6 @@
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)). -define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
-define(RATE(Rate), to_rate(Rate)). -define(RATE(Rate), to_rate(Rate)).
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(CONST(X), fun(_) -> X end).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Setups %% Setups
@ -98,9 +102,12 @@ end_per_suite(_Config) ->
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
base_conf() -> load_conf() ->
emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF). emqx_common_test_helpers:load_config(emqx_limiter_schema, ?BASE_CONF).
init_config() ->
emqx_config:init_load(emqx_limiter_schema, ?BASE_CONF).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases Bucket Level %% Test Cases Bucket Level
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -749,7 +756,8 @@ delay_return(Case) ->
end. end.
connect(Name) -> connect(Name) ->
emqx_limiter_server:connect(message_routing, Name). {ok, Limiter} = emqx_limiter_server:connect(message_routing, Name),
Limiter.
check_average_rate(Counter, Second, Rate) -> check_average_rate(Counter, Second, Rate) ->
Cost = counters:get(Counter, 1), Cost = counters:get(Counter, 1),

View File

@ -56,16 +56,6 @@ retainer {
## ##
## Default: 0 ## Default: 0
batch_deliver_number = 0 batch_deliver_number = 0
## The rate limiter name for retained messages delivery.
## In order to avoid delivering too many messages to the client at once, which may cause the client
## to block or crash, or message dropped due to exceeding the size of the message queue. We need
## to specify a rate limiter for the retained messages delivery to the client.
##
## The names of the available rate limiters are taken from the existing rate limiters under `limiter.batch`.
## You can remove this field if you don't want any limit
## Default: retainer
batch_deliver_limiter = retainer
} }
## Maximum retained message size. ## Maximum retained message size.

View File

@ -111,8 +111,8 @@ start_link(Pool, Id) ->
init([Pool, Id]) -> init([Pool, Id]) ->
erlang:process_flag(trap_exit, true), erlang:process_flag(trap_exit, true),
true = gproc_pool:connect_worker(Pool, {Pool, Id}), true = gproc_pool:connect_worker(Pool, {Pool, Id}),
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
Limiter = emqx_limiter_server:connect(batch, BucketName), {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
{ok, #{pool => Pool, id => Id, limiter => Limiter}}. {ok, #{pool => Pool, id => Id, limiter => Limiter}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -152,7 +152,7 @@ handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
{noreply, State#{limiter := Limiter2}}; {noreply, State#{limiter := Limiter2}};
handle_cast(refresh_limiter, State) -> handle_cast(refresh_limiter, State) ->
BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]), BucketName = emqx:get_config([retainer, flow_control, batch_deliver_limiter]),
Limiter = emqx_limiter_server:connect(batch, BucketName), {ok, Limiter} = emqx_limiter_server:connect(batch, BucketName),
{noreply, State#{limiter := Limiter}}; {noreply, State#{limiter := Limiter}};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),

View File

@ -71,16 +71,17 @@ common_tests() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init_per_suite(Config) -> init_per_suite(Config) ->
load_base_conf(), emqx_common_test_helpers:start_apps([emqx_conf]),
emqx_ratelimiter_SUITE:base_conf(), load_conf(),
emqx_common_test_helpers:start_apps([emqx_conf, ?APP]), emqx_limiter_sup:start_link(),
timer:sleep(200),
ok = application:ensure_started(?APP),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ekka:stop(), ekka:stop(),
mria:stop(), mria:stop(),
mria_mnesia:delete_schema(), mria_mnesia:delete_schema(),
emqx_common_test_helpers:stop_apps([?APP, emqx_conf]). emqx_common_test_helpers:stop_apps([?APP, emqx_conf]).
init_per_group(mnesia_without_indices, Config) -> init_per_group(mnesia_without_indices, Config) ->
@ -111,8 +112,10 @@ init_per_testcase(t_get_basic_usage_info, Config) ->
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Config.
load_base_conf() -> load_conf() ->
ok = emqx_common_test_helpers:load_config(emqx_retainer_schema, ?BASE_CONF). ok = emqx_config:delete_override_conf_files(),
emqx_ratelimiter_SUITE:init_config(),
ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases %% Test Cases

View File

@ -34,7 +34,7 @@ init_per_suite(Config) ->
application:load(emqx_conf), application:load(emqx_conf),
ok = ekka:start(), ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
emqx_retainer_SUITE:load_base_conf(), emqx_retainer_SUITE:load_conf(),
emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]), emqx_mgmt_api_test_util:init_suite([emqx_retainer, emqx_conf]),
%% make sure no "$SYS/#" topics %% make sure no "$SYS/#" topics
emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}), emqx_conf:update([sys_topics], raw_systopic_conf(), #{override_to => cluster}),

View File

@ -27,7 +27,7 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_retainer_SUITE:load_base_conf(), emqx_retainer_SUITE:load_conf(),
%% Start Apps %% Start Apps
emqx_common_test_helpers:start_apps([emqx_retainer]), emqx_common_test_helpers:start_apps([emqx_retainer]),
Config. Config.

View File

@ -24,7 +24,7 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_retainer_SUITE:load_base_conf(), emqx_retainer_SUITE:load_conf(),
%% Start Apps %% Start Apps
emqx_common_test_helpers:start_apps([emqx_retainer]), emqx_common_test_helpers:start_apps([emqx_retainer]),
Config. Config.