From 6dbddfb089c8f2e0659cb0b0fdaae6770155d0d0 Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 10 Aug 2023 15:29:09 +0800 Subject: [PATCH] fix(limiter): simplify the code of the limiter move runtime code out from the schema --- .../src/emqx_limiter/src/emqx_htb_limiter.erl | 2 +- .../src/emqx_limiter_correction.erl | 36 ---- .../emqx_limiter/src/emqx_limiter_decimal.erl | 16 +- .../emqx_limiter/src/emqx_limiter_manager.erl | 2 +- .../emqx_limiter/src/emqx_limiter_schema.erl | 131 +-------------- .../emqx_limiter/src/emqx_limiter_server.erl | 16 +- .../src/emqx_limiter_server_sup.erl | 2 +- .../emqx_limiter/src/emqx_limiter_utils.erl | 158 ++++++++++++++++++ apps/emqx/src/emqx_listeners.erl | 4 +- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 22 +-- 10 files changed, 197 insertions(+), 192 deletions(-) delete mode 100644 apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl create mode 100644 apps/emqx/src/emqx_limiter/src/emqx_limiter_utils.erl diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index bcd4166af..7f50161a8 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -139,7 +139,7 @@ make_local_limiter(Cfg, Bucket) -> tokens => emqx_limiter_server:get_initial_val(Cfg), lasttime => ?NOW, bucket => Bucket, - capacity => emqx_limiter_schema:calc_capacity(Cfg) + capacity => emqx_limiter_utils:calc_capacity(Cfg) }. %%@doc create a limiter server's reference diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl deleted file mode 100644 index 013c23e61..000000000 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_correction.erl +++ /dev/null @@ -1,36 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_limiter_correction). - -%% API --export([add/2]). - --type correction_value() :: #{ - correction := emqx_limiter_decimal:zero_or_float(), - any() => any() -}. - --export_type([correction_value/0]). - -%%-------------------------------------------------------------------- -%%% API -%%-------------------------------------------------------------------- --spec add(number(), correction_value()) -> {integer(), correction_value()}. -add(Inc, #{correction := Correction} = Data) -> - FixedInc = Inc + Correction, - IntInc = erlang:floor(FixedInc), - {IntInc, Data#{correction := FixedInc - IntInc}}. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl index 33ba0e511..6bf4e9b20 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_decimal.erl @@ -24,13 +24,19 @@ sub/2, mul/2, put_to_counter/3, - floor_div/2 + floor_div/2, + precisely_add/2 ]). --export_type([decimal/0, zero_or_float/0]). +-export_type([decimal/0, zero_or_float/0, correction_value/0]). -type decimal() :: infinity | number(). -type zero_or_float() :: 0 | float(). +-type correction_value() :: #{ + correction := emqx_limiter_decimal:zero_or_float(), + any() => any() +}. + %%-------------------------------------------------------------------- %%% API %%-------------------------------------------------------------------- @@ -43,6 +49,12 @@ add(A, B) when add(A, B) -> A + B. +-spec precisely_add(number(), correction_value()) -> {integer(), correction_value()}. +precisely_add(Inc, #{correction := Correction} = Data) -> + FixedInc = Inc + Correction, + IntInc = erlang:floor(FixedInc), + {IntInc, Data#{correction := FixedInc - IntInc}}. + -spec sub(decimal(), decimal()) -> decimal(). sub(A, B) when A =:= infinity orelse diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl index afabc2580..91d59b3be 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_manager.erl @@ -131,7 +131,7 @@ delete_root(Type) -> delete_bucket(?ROOT_ID, Type). post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) -> - Conf = emqx_limiter_schema:convert_node_opts(NewConf), + Conf = emqx_limiter_utils:convert_node_opts(NewConf), _ = [on_post_config_update(Type, Cfg) || {Type, Cfg} <- maps:to_list(Conf)], ok. diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index e2951c302..802b29837 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -33,14 +33,7 @@ desc/1, types/0, short_paths/0, - calc_capacity/1, - extract_with_type/2, - default_client_config/0, - default_bucket_config/0, - short_paths_fields/1, - get_listener_opts/1, - get_node_opts/1, - convert_node_opts/1 + short_paths_fields/1 ]). -define(KILOBYTE, 1024). @@ -263,80 +256,6 @@ types() -> short_paths() -> [max_conn_rate, messages_rate, bytes_rate]. -calc_capacity(#{rate := infinity}) -> - infinity; -calc_capacity(#{rate := Rate, burst := Burst}) -> - erlang:floor(1000 * Rate / default_period()) + Burst. - -extract_with_type(_Type, undefined) -> - undefined; -extract_with_type(Type, #{client := ClientCfg} = BucketCfg) -> - BucketVal = maps:find(Type, BucketCfg), - ClientVal = maps:find(Type, ClientCfg), - merge_client_bucket(Type, ClientVal, BucketVal); -extract_with_type(Type, BucketCfg) -> - BucketVal = maps:find(Type, BucketCfg), - merge_client_bucket(Type, undefined, BucketVal). - -%% Since the client configuration can be absent and be a undefined value, -%% but we must need some basic settings to control the behaviour of the limiter, -%% so here add this helper function to generate a default setting. -%% This is a temporary workaround until we found a better way to simplify. -default_client_config() -> - #{ - rate => infinity, - initial => 0, - low_watermark => 0, - burst => 0, - divisible => true, - max_retry_time => timer:hours(1), - failure_strategy => force - }. - -default_bucket_config() -> - #{ - rate => infinity, - burst => 0, - initial => 0 - }. - -get_listener_opts(Conf) -> - Limiter = maps:get(limiter, Conf, undefined), - ShortPaths = maps:with(short_paths(), Conf), - get_listener_opts(Limiter, ShortPaths). - -get_node_opts(Type) -> - Opts = emqx:get_config([limiter, Type], default_bucket_config()), - case type_to_short_path_name(Type) of - undefined -> - Opts; - Name -> - case emqx:get_config([limiter, Name], undefined) of - undefined -> - Opts; - Rate -> - Opts#{rate := Rate} - end - end. - -convert_node_opts(Conf) -> - DefBucket = default_bucket_config(), - ShorPaths = short_paths(), - Fun = fun - %% The `client` in the node options was deprecated - (client, _Value, Acc) -> - Acc; - (Name, Value, Acc) -> - case lists:member(Name, ShorPaths) of - true -> - Type = short_path_name_to_type(Name), - Acc#{Type => DefBucket#{rate => Value}}; - _ -> - Acc#{Name => Value} - end - end, - maps:fold(Fun, #{}, Conf). - %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -538,51 +457,3 @@ alias_of_type(bytes) -> [bytes_in]; alias_of_type(_) -> []. - -merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) -> - #{Type => BucketVal, client => #{Type => ClientVal}}; -merge_client_bucket(Type, {ok, ClientVal}, _) -> - #{client => #{Type => ClientVal}}; -merge_client_bucket(Type, _, {ok, BucketVal}) -> - #{Type => BucketVal}; -merge_client_bucket(_, _, _) -> - undefined. - -short_path_name_to_type(max_conn_rate) -> - connection; -short_path_name_to_type(messages_rate) -> - messages; -short_path_name_to_type(bytes_rate) -> - bytes. - -type_to_short_path_name(connection) -> - max_conn_rate; -type_to_short_path_name(messages) -> - messages_rate; -type_to_short_path_name(bytes) -> - bytes_rate; -type_to_short_path_name(_) -> - undefined. - -get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 -> - Limiter; -get_listener_opts(undefined, ShortPaths) -> - convert_listener_short_paths(ShortPaths); -get_listener_opts(Limiter, ShortPaths) -> - Shorts = convert_listener_short_paths(ShortPaths), - emqx_utils_maps:deep_merge(Limiter, Shorts). - -convert_listener_short_paths(ShortPaths) -> - DefBucket = default_bucket_config(), - DefClient = default_client_config(), - Fun = fun(Name, Rate, Acc) -> - Type = short_path_name_to_type(Name), - case Name of - max_conn_rate -> - Acc#{Type => DefBucket#{rate => Rate}}; - _ -> - Client = maps:get(client, Acc, #{}), - Acc#{client => Client#{Type => DefClient#{rate => Rate}}} - end - end, - maps:fold(Fun, #{}, ShortPaths). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index fcb1fd66c..00d255c9c 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -383,7 +383,7 @@ longitudinal( case lists:min([ShouldAlloc, Flow, Capacity]) of Available when Available > 0 -> - {Inc, Bucket2} = emqx_limiter_correction:add(Available, Bucket), + {Inc, Bucket2} = emqx_limiter_decimal:precisely_add(Available, Bucket), counters:add(Counter, Index, Inc), {Available, Buckets#{Name := Bucket2#{obtained := Obtained + Available}}}; @@ -419,7 +419,7 @@ maybe_adjust_root_tokens(#{root := #{rate := Rate} = Root, counter := Counter} = State; _ -> Available = erlang:min(Rate - Token, InFlow), - {Inc, Root2} = emqx_limiter_correction:add(Available, Root), + {Inc, Root2} = emqx_limiter_decimal:precisely_add(Available, Root), counters:add(Counter, ?ROOT_COUNTER_IDX, Inc), State#{root := Root2} end. @@ -473,7 +473,7 @@ dispatch_burst_to_buckets([Bucket | T], InFlow, Alloced, Buckets) -> index := Index, obtained := Obtained } = Bucket, - {Inc, Bucket2} = emqx_limiter_correction:add(InFlow, Bucket), + {Inc, Bucket2} = emqx_limiter_decimal:precisely_add(InFlow, Bucket), counters:add(Counter, Index, Inc), @@ -484,7 +484,7 @@ dispatch_burst_to_buckets([], _, Alloced, Buckets) -> -spec init_tree(emqx_limiter_schema:limiter_type()) -> state(). init_tree(Type) when is_atom(Type) -> - Cfg = emqx_limiter_schema:get_node_opts(Type), + Cfg = emqx_limiter_utils:get_node_opts(Type), init_tree(Type, Cfg). init_tree(Type, #{rate := Rate} = Cfg) -> @@ -515,7 +515,7 @@ do_add_bucket(Id, #{rate := Rate} = Cfg, #{buckets := Buckets} = State) -> undefined -> make_bucket(Id, Cfg, State); Bucket -> - Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_schema:calc_capacity(Cfg)}, + Bucket2 = Bucket#{rate := Rate, capacity := emqx_limiter_utils:calc_capacity(Cfg)}, State#{buckets := Buckets#{Id := Bucket2}} end. @@ -536,7 +536,7 @@ make_bucket( rate => Rate, obtained => Initial, correction => 0, - capacity => emqx_limiter_schema:calc_capacity(Cfg), + capacity => emqx_limiter_utils:calc_capacity(Cfg), counter => Counter, index => NewIndex }, @@ -601,7 +601,7 @@ create_limiter_without_client(Id, Type, BucketCfg) -> false -> {ok, emqx_htb_limiter:make_infinity_limiter()}; {ok, Bucket, RefCfg} -> - ClientCfg = emqx_limiter_schema:default_client_config(), + ClientCfg = emqx_limiter_utils:default_client_config(), create_limiter_with_ref(Bucket, ClientCfg, RefCfg); Error -> Error @@ -627,7 +627,7 @@ find_referenced_bucket(Id, Type, #{rate := Rate} = Cfg) when Rate =/= infinity - end; %% this is a node-level reference find_referenced_bucket(_Id, Type, _) -> - case emqx_limiter_schema:get_node_opts(Type) of + case emqx_limiter_utils:get_node_opts(Type) of #{rate := infinity} -> false; NodeCfg -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index be9b62d01..8f45da561 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -86,7 +86,7 @@ init([]) -> %% Internal functions %%--================================================================== make_child(Type) -> - Cfg = emqx_limiter_schema:get_node_opts(Type), + Cfg = emqx_limiter_utils:get_node_opts(Type), make_child(Type, Cfg). make_child(Type, Cfg) -> diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_utils.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_utils.erl new file mode 100644 index 000000000..6e528188b --- /dev/null +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_utils.erl @@ -0,0 +1,158 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_limiter_utils). + +-export([ + calc_capacity/1, + extract_with_type/2, + get_listener_opts/1, + get_node_opts/1, + convert_node_opts/1, + default_client_config/0, + default_bucket_config/0 +]). + +-import(emqx_limiter_schema, [default_period/0, short_paths/0]). + +%%-------------------------------------------------------------------- +%% Configuration-related runtime code +%%-------------------------------------------------------------------- + +calc_capacity(#{rate := infinity}) -> + infinity; +calc_capacity(#{rate := Rate, burst := Burst}) -> + erlang:floor(1000 * Rate / default_period()) + Burst. + +%% @doc extract data of a type from the nested config +extract_with_type(_Type, undefined) -> + undefined; +extract_with_type(Type, #{client := ClientCfg} = BucketCfg) -> + BucketVal = maps:find(Type, BucketCfg), + ClientVal = maps:find(Type, ClientCfg), + merge_client_bucket(Type, ClientVal, BucketVal); +extract_with_type(Type, BucketCfg) -> + BucketVal = maps:find(Type, BucketCfg), + merge_client_bucket(Type, undefined, BucketVal). + +%% @doc get the limiter configuration from the listener setting +%% and compatible with the old version limiter schema +get_listener_opts(Conf) -> + Limiter = maps:get(limiter, Conf, undefined), + ShortPaths = maps:with(short_paths(), Conf), + get_listener_opts(Limiter, ShortPaths). + +get_listener_opts(Limiter, ShortPaths) when map_size(ShortPaths) =:= 0 -> + Limiter; +get_listener_opts(undefined, ShortPaths) -> + convert_listener_short_paths(ShortPaths); +get_listener_opts(Limiter, ShortPaths) -> + Shorts = convert_listener_short_paths(ShortPaths), + emqx_utils_maps:deep_merge(Limiter, Shorts). + +convert_listener_short_paths(ShortPaths) -> + DefBucket = default_bucket_config(), + DefClient = default_client_config(), + Fun = fun(Name, Rate, Acc) -> + Type = short_path_name_to_type(Name), + case Name of + max_conn_rate -> + Acc#{Type => DefBucket#{rate => Rate}}; + _ -> + Client = maps:get(client, Acc, #{}), + Acc#{client => Client#{Type => DefClient#{rate => Rate}}} + end + end, + maps:fold(Fun, #{}, ShortPaths). + +%% @doc get the node-level limiter configuration and compatible with the old version limiter schema +get_node_opts(Type) -> + Opts = emqx:get_config([limiter, Type], default_bucket_config()), + case type_to_short_path_name(Type) of + undefined -> + Opts; + Name -> + case emqx:get_config([limiter, Name], undefined) of + undefined -> + Opts; + Rate -> + Opts#{rate := Rate} + end + end. + +convert_node_opts(Conf) -> + DefBucket = default_bucket_config(), + ShorPaths = short_paths(), + Fun = fun + %% The `client` in the node options was deprecated + (client, _Value, Acc) -> + Acc; + (Name, Value, Acc) -> + case lists:member(Name, ShorPaths) of + true -> + Type = short_path_name_to_type(Name), + Acc#{Type => DefBucket#{rate => Value}}; + _ -> + Acc#{Name => Value} + end + end, + maps:fold(Fun, #{}, Conf). + +merge_client_bucket(Type, {ok, ClientVal}, {ok, BucketVal}) -> + #{Type => BucketVal, client => #{Type => ClientVal}}; +merge_client_bucket(Type, {ok, ClientVal}, _) -> + #{client => #{Type => ClientVal}}; +merge_client_bucket(Type, _, {ok, BucketVal}) -> + #{Type => BucketVal}; +merge_client_bucket(_, _, _) -> + undefined. + +short_path_name_to_type(max_conn_rate) -> + connection; +short_path_name_to_type(messages_rate) -> + messages; +short_path_name_to_type(bytes_rate) -> + bytes. + +type_to_short_path_name(connection) -> + max_conn_rate; +type_to_short_path_name(messages) -> + messages_rate; +type_to_short_path_name(bytes) -> + bytes_rate; +type_to_short_path_name(_) -> + undefined. + +%% Since the client configuration can be absent and be a undefined value, +%% but we must need some basic settings to control the behaviour of the limiter, +%% so here add this helper function to generate a default setting. +%% This is a temporary workaround until we found a better way to simplify. +default_client_config() -> + #{ + rate => infinity, + initial => 0, + low_watermark => 0, + burst => 0, + divisible => true, + max_retry_time => timer:hours(1), + failure_strategy => force + }. + +default_bucket_config() -> + #{ + rate => infinity, + burst => 0, + initial => 0 + }. diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index b1bb29159..964873e53 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -587,7 +587,7 @@ esockd_opts(ListenerId, Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = - case emqx_limiter_schema:extract_with_type(connection, Limiter) of + case emqx_limiter_utils:extract_with_type(connection, Limiter) of undefined -> Opts1; BucketCfg -> @@ -732,7 +732,7 @@ zone(Opts) -> maps:get(zone, Opts, undefined). limiter(Opts) -> - emqx_limiter_schema:get_listener_opts(Opts). + emqx_limiter_utils:get_listener_opts(Opts). add_limiter_bucket(_Id, undefined) -> ok; diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index fc9960c81..f414c3759 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -589,11 +589,11 @@ t_extract_with_type(_) -> (Type, Cfg) -> IsOnly(Type, Cfg) end, - ?assertEqual(undefined, emqx_limiter_schema:extract_with_type(messages, undefined)), + ?assertEqual(undefined, emqx_limiter_utils:extract_with_type(messages, undefined)), ?assert( Checker( messages, - emqx_limiter_schema:extract_with_type(messages, #{ + emqx_limiter_utils:extract_with_type(messages, #{ messages => #{rate => 1}, bytes => #{rate => 1} }) ) @@ -601,7 +601,7 @@ t_extract_with_type(_) -> ?assert( Checker( messages, - emqx_limiter_schema:extract_with_type(messages, #{ + emqx_limiter_utils:extract_with_type(messages, #{ messages => #{rate => 1}, bytes => #{rate => 1}, client => #{messages => #{rate => 2}} @@ -611,7 +611,7 @@ t_extract_with_type(_) -> ?assert( Checker( messages, - emqx_limiter_schema:extract_with_type(messages, #{ + emqx_limiter_utils:extract_with_type(messages, #{ client => #{messages => #{rate => 2}, bytes => #{rate => 1}} }) ) @@ -622,7 +622,7 @@ t_add_bucket(_) -> #{buckets := Buckets} = sys:get_state(emqx_limiter_server:whereis(bytes)), ?assertEqual(Size, maps:size(Buckets), Buckets) end, - DefBucket = emqx_limiter_schema:default_bucket_config(), + DefBucket = emqx_limiter_utils:default_bucket_config(), ?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, undefined)), Checker(0), ?assertEqual(ok, emqx_limiter_server:add_bucket(?FUNCTION_NAME, bytes, DefBucket)), @@ -765,7 +765,7 @@ t_esockd_htb_consume(_) -> t_node_short_paths(_) -> CfgStr = <<"limiter {max_conn_rate = \"1000\", messages_rate = \"100\", bytes_rate = \"10\"}">>, ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr), - Accessor = fun emqx_limiter_schema:get_node_opts/1, + Accessor = fun emqx_limiter_utils:get_node_opts/1, ?assertMatch(#{rate := 100.0}, Accessor(connection)), ?assertMatch(#{rate := 10.0}, Accessor(messages)), ?assertMatch(#{rate := 1.0}, Accessor(bytes)), @@ -776,7 +776,7 @@ t_compatibility_for_node_short_paths(_) -> CfgStr = <<"limiter {max_conn_rate = \"1000\", connection.rate = \"500\", bytes.rate = \"200\"}">>, ok = emqx_common_test_helpers:load_config(emqx_limiter_schema, CfgStr), - Accessor = fun emqx_limiter_schema:get_node_opts/1, + Accessor = fun emqx_limiter_utils:get_node_opts/1, ?assertMatch(#{rate := 100.0}, Accessor(connection)), ?assertMatch(#{rate := 20.0}, Accessor(bytes)). @@ -796,7 +796,7 @@ t_listener_short_paths(_) -> }, connection := #{rate := 100.0} }, - emqx_limiter_schema:get_listener_opts(ListenerOpt) + emqx_limiter_utils:get_listener_opts(ListenerOpt) ). t_compatibility_for_listener_short_paths(_) -> @@ -809,7 +809,7 @@ t_compatibility_for_listener_short_paths(_) -> #{ connection := #{rate := 100.0} }, - emqx_limiter_schema:get_listener_opts(ListenerOpt) + emqx_limiter_utils:get_listener_opts(ListenerOpt) ). t_no_limiter_for_listener(_) -> @@ -818,7 +818,7 @@ t_no_limiter_for_listener(_) -> ListenerOpt = emqx:get_config([listeners, tcp, default]), ?assertEqual( undefined, - emqx_limiter_schema:get_listener_opts(ListenerOpt) + emqx_limiter_utils:get_listener_opts(ListenerOpt) ). %%-------------------------------------------------------------------- @@ -1135,5 +1135,5 @@ parse_schema(ConfigString) -> ). default_client_config() -> - Conf = emqx_limiter_schema:default_client_config(), + Conf = emqx_limiter_utils:default_client_config(), Conf#{divisible := false, max_retry_time := timer:seconds(10)}.