diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 3bba67c52..45256c00b 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -334,7 +334,7 @@ make_retry_context(Fun, Diff) -> -spec try_restore(retry_context(Limiter), Limiter) -> Limiter when Limiter :: limiter(). try_restore(#{need := Need, diff := Diff}, - #{tokens := Tokens, capcacity := Capacity, bucket := Bucket} = Limiter) -> + #{tokens := Tokens, capacity := Capacity, bucket := Bucket} = Limiter) -> Back = Need - Diff, Tokens2 = erlang:min(Capacity, Back + Tokens), emqx_limiter_bucket_ref:try_restore(Back, Bucket), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl index 514145c88..4265e64ae 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_bucket_ref.erl @@ -83,7 +83,7 @@ try_restore(_, infinity) -> ok; try_restore(Inc, #{counter := Counter, index := Index}) -> case counters:get(Counter, Index) of - Tokens when Tokens < 0 -> + Tokens when Tokens =< 0 -> counters:add(Counter, Index, Inc); _ -> ok diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index b36bb958a..4f19e0a24 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -33,7 +33,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, format_status/2]). --export([ start_link/1, connect/2, info/2 +-export([ start_link/1, connect/2, info/1 , name/1, get_initial_val/1]). -type root() :: #{ rate := rate() %% number of tokens generated per period @@ -83,7 +83,7 @@ -type decimal() :: emqx_limiter_decimal:decimal(). -type index() :: pos_integer(). --define(CALL(Type, Msg), gen_server:call(name(Type), {?FUNCTION_NAME, Msg})). +-define(CALL(Type), gen_server:call(name(Type), ?FUNCTION_NAME)). -define(OVERLOAD_MIN_ALLOC, 0.3). %% minimum coefficient for overloaded limiter -export_type([index/0]). @@ -123,9 +123,9 @@ connect(Type, BucketName) when is_atom(BucketName) -> connect(Type, Names) -> connect(Type, maps:get(Type, Names, default)). --spec info(limiter_type(), atom()) -> term(). -info(Type, Info) -> - ?CALL(Type, Info). +-spec info(limiter_type()) -> state(). +info(Type) -> + ?CALL(Type). -spec name(limiter_type()) -> atom(). name(Type) -> @@ -183,6 +183,9 @@ init([Type]) -> {noreply, NewState :: term(), hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: term()} | {stop, Reason :: term(), NewState :: term()}. +handle_call(info, _From, State) -> + {reply, State, State}; + handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. @@ -375,11 +378,11 @@ maybe_burst(#{buckets := Buckets, index := Index, zone := Zone} = maps:get(Id, Nodes), case counters:get(Counter, Index) of - Any when Any =< 0 -> - Group = maps:get(Zone, Groups, []), - maps:put(Zone, [Id | Group], Groups); - _ -> - Groups + Any when Any =< 0 -> + Group = maps:get(Zone, Groups, []), + maps:put(Zone, [Id | Group], Groups); + _ -> + Groups end end, diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 630a1ffa5..60616c5d5 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -112,12 +112,63 @@ base_conf() -> %%-------------------------------------------------------------------- %% Test Cases Bucket Level %%-------------------------------------------------------------------- +t_consume(_) -> + Cfg = fun(Cfg) -> + Cfg#{rate := 100, + capacity := 100, + initial := 100, + max_retry_time := 1000, + failure_strategy := force} + end, + Case = fun() -> + Client = connect(default), + {ok, L2} = emqx_htb_limiter:consume(50, Client), + {ok, _L3} = emqx_htb_limiter:consume(150, L2) + end, + with_per_client(default, Cfg, Case). + +t_retry(_) -> + Cfg = fun(Cfg) -> + Cfg#{rate := 50, + capacity := 200, + initial := 0, + max_retry_time := 1000, + failure_strategy := force} + end, + Case = fun() -> + Client = connect(default), + {ok, Client} = emqx_htb_limiter:retry(Client), + {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client), + L3 = emqx_htb_limiter:set_retry(Retry, L2), + timer:sleep(500), + {ok, _L4} = emqx_htb_limiter:retry(L3) + end, + with_per_client(default, Cfg, Case). + +t_restore(_) -> + Cfg = fun(Cfg) -> + Cfg#{rate := 1, + capacity := 200, + initial := 50, + max_retry_time := 100, + failure_strategy := force} + end, + Case = fun() -> + Client = connect(default), + {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client), + timer:sleep(200), + {ok, L3} = emqx_htb_limiter:check(Retry, L2), + Avaiable = emqx_htb_limiter:available(L3), + ?assert(Avaiable >= 50) + end, + with_per_client(default, Cfg, Case). + t_max_retry_time(_) -> Cfg = fun(Cfg) -> - Cfg#{rate := 1, - capacity := 1, - max_retry_time := 500, - failure_strategy := drop} + Cfg#{rate := 1, + capacity := 1, + max_retry_time := 500, + failure_strategy := drop} end, Case = fun() -> Client = connect(default), @@ -186,6 +237,26 @@ t_infinity_client(_) -> end, with_bucket(default, Fun, Case). +t_try_restore_agg(_) -> + Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> + Aggr2 = Aggr#{rate := 1, + capacity := 200, + initial := 50}, + Cli2 = Cli#{rate := infinity, capacity := infinity, divisible := true, + max_retry_time := 100, failure_strategy := force}, + Bucket#{aggregated := Aggr2, + per_client := Cli2} + end, + Case = fun() -> + Client = connect(default), + {_, _, Retry, L2} = emqx_htb_limiter:check(150, Client), + timer:sleep(200), + {ok, L3} = emqx_htb_limiter:check(Retry, L2), + Avaiable = emqx_htb_limiter:available(L3), + ?assert(Avaiable >= 50) + end, + with_bucket(default, Fun, Case). + t_short_board(_) -> Fun = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> Aggr2 = Aggr#{rate := ?RATE("100/1s"), @@ -285,7 +356,7 @@ t_limit_zone_with_unlimit_bucket(_) -> t_burst_and_fairness(_) -> GlobalMod = fun(Cfg) -> Cfg#{burst := ?RATE("60/1s")} - end, + end, ZoneMod = fun(Cfg) -> Cfg#{rate := ?RATE("600/1s"), @@ -319,9 +390,50 @@ t_burst_and_fairness(_) -> [{b1, Bucket}, {b2, Bucket}], Case). +t_burst(_) -> + GlobalMod = fun(Cfg) -> + Cfg#{burst := ?RATE("60/1s")} + end, + + ZoneMod = fun(Cfg) -> + Cfg#{rate := ?RATE("60/1s"), + burst := ?RATE("60/1s")} + end, + + Bucket = fun(#{aggregated := Aggr, per_client := Cli} = Bucket) -> + Aggr2 = Aggr#{rate := ?RATE("500/1s"), + initial := 0, + capacity := 500}, + Cli2 = Cli#{rate := ?RATE("600/1s"), + capacity := 600, + divisible := true}, + Bucket#{aggregated := Aggr2, + per_client := Cli2} + end, + + Case = fun() -> + C1 = counters:new(1, []), + C2 = counters:new(1, []), + C3 = counters:new(1, []), + start_client(b1, ?NOW + 2000, C1, 20), + start_client(b2, ?NOW + 2000, C2, 30), + start_client(b3, ?NOW + 2000, C3, 30), + timer:sleep(2100), + + Total = lists:sum([counters:get(X, 1) || X <- [C1, C2, C3]]), + in_range(Total / 2, 30) + end, + + with_global(GlobalMod, + default, + ZoneMod, + [{b1, Bucket}, {b2, Bucket}, {b3, Bucket}], + Case). + + t_limit_global_with_unlimit_other(_) -> GlobalMod = fun(Cfg) -> - Cfg#{rate := ?RATE("600/1s")} + Cfg#{rate := ?RATE("600/1s")} end, ZoneMod = fun(Cfg) -> Cfg#{rate := infinity} end, @@ -483,14 +595,85 @@ t_zone_hunger_and_fair(_) -> [{b1, Bucket(z1, ?RATE("600/1s"))}, {b2, Bucket(z2, ?RATE("50/1s"))}], Case). +%%-------------------------------------------------------------------- +%% Test Cases container +%%-------------------------------------------------------------------- +t_new_container(_) -> + C1 = emqx_limiter_container:new(), + C2 = emqx_limiter_container:new([message_routing]), + C3 = emqx_limiter_container:update_by_name(message_routing, default, C1), + ?assertMatch(#{message_routing := _, + retry_ctx := undefined, + {retry, message_routing} := _}, C2), + ?assertMatch(#{message_routing := _, + retry_ctx := undefined, + {retry, message_routing} := _}, C3), + ok. + +t_check_container(_) -> + Cfg = fun(Cfg) -> + Cfg#{rate := ?RATE("1000/1s"), + initial := 1000, + capacity := 1000} + end, + Case = fun() -> + C1 = emqx_limiter_container:new([message_routing]), + {ok, C2} = emqx_limiter_container:check(1000, message_routing, C1), + {pause, Pause, C3} = emqx_limiter_container:check(1000, message_routing, C2), + timer:sleep(Pause), + {ok, C4} = emqx_limiter_container:retry(message_routing, C3), + Context = test, + C5 = emqx_limiter_container:set_retry_context(Context, C4), + RetryData = emqx_limiter_container:get_retry_context(C5), + ?assertEqual(Context, RetryData) + end, + with_per_client(default, Cfg, Case). + +%%-------------------------------------------------------------------- +%% Test Cases misc +%%-------------------------------------------------------------------- +t_limiter_manager(_) -> + {error, _} = emqx_limiter_manager:start_server(message_routing), + ignore = gen_server:call(emqx_limiter_manager, unexpected_call), + ok = gen_server:cast(emqx_limiter_manager, unexpected_cast), + erlang:send(erlang:whereis(emqx_limiter_manager), unexpected_info), + ok = emqx_limiter_manager:format_status(normal, ok), + ok. + +t_limiter_app(_) -> + try + _ = emqx_limiter_app:start(undefined, undefined) + catch _:_ -> + ok + end, + ok = emqx_limiter_app:stop(undefined), + ok. + +t_limiter_server(_) -> + State = emqx_limiter_server:info(message_routing), + ?assertMatch(#{root := _, + counter := _, + index := _, + zones := _, + buckets := _, + nodes := _, + type := message_routing}, State), + + Name = emqx_limiter_server:name(message_routing), + ignored = gen_server:call(Name, unexpected_call), + ok = gen_server:cast(Name, unexpected_cast), + erlang:send(erlang:whereis(Name), unexpected_info), + ok = emqx_limiter_server:format_status(normal, ok), + ok. + %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- start_client(Name, EndTime, Counter, Number) -> lists:foreach(fun(_) -> - spawn(fun() -> - start_client(Name, EndTime, Counter) - end) + spawn(fun() -> + start_client(Name, EndTime, Counter) + end) end, lists:seq(1, Number)). @@ -612,16 +795,18 @@ with_config(Path, Modifier, Case) -> emqx_config:put(Path, NewCfg), emqx_limiter_manager:restart_server(message_routing), timer:sleep(100), - DelayReturn - = try - Return = Case(), - fun() -> Return end - catch Type:Reason:Trace -> - fun() -> erlang:raise(Type, Reason, Trace) end - end, + DelayReturn = delay_return(Case), emqx_config:put(Path, Cfg), DelayReturn(). +delay_return(Case) -> + try + Return = Case(), + fun() -> Return end + catch Type:Reason:Trace -> + fun() -> erlang:raise(Type, Reason, Trace) end + end. + connect(Name) -> emqx_limiter_server:connect(message_routing, Name). @@ -636,7 +821,7 @@ print_average_rate(Counter, Second) -> PerSec = Cost / Second, ct:pal("Cost:~p PerSec:~p ~n", [Cost, PerSec]). -in_range(Val, Expected) when Val < Expected * 0.6 -> +in_range(Val, Expected) when Val < Expected * 0.5 -> ct:pal("Val:~p smaller than min bound", [Val]), false; in_range(Val, Expected) when Val > Expected * 1.8 -> @@ -663,6 +848,6 @@ apply_modifier(Name, Modifier, #{default := Template} = Cfg) -> apply_modifier(Pairs, #{default := Template}) -> Fun = fun({N, M}, Acc) -> - Acc#{N => M(Template)} + Acc#{N => M(Template)} end, lists:foldl(Fun, #{}, Pairs).