%% emqx/apps/emqx_limiter/test/emqx_limiter_SUITE.erl

%% 273 lines
%% 10 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_limiter_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-define(APP, emqx_limiter).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% Baseline limiter configuration text that init_per_suite/1 hands to
%% emqx_config:init_load/2 before the application is started. It declares
%% the bytes_in, message_in, connection and message_routing sections, each
%% with a global rate, zone rates and one or more buckets.
%%
%% Deliberately written as ONE ordinary string literal. The previous
%% `<<"""..."""...>>` spelling relied on adjacent-literal concatenation
%% ("" ++ "..." ++ ""), which OTP 27 reinterprets as a triple-quoted string
%% and rejects for this layout. The resulting binary is byte-for-byte the same.
-define(BASE_CONF, <<"
emqx_limiter {
bytes_in {global = \"100KB/10s\"
zone.default = \"100kB/10s\"
zone.external = \"20kB/10s\"
bucket.tcp {zone = default
aggregated = \"100kB/10s,1Mb\"
per_client = \"100KB/10s,10Kb\"}
bucket.ssl {zone = external
aggregated = \"100kB/10s,1Mb\"
per_client = \"100KB/10s,10Kb\"}
}
message_in {global = \"100/10s\"
zone.default = \"100/10s\"
bucket.bucket1 {zone = default
aggregated = \"100/10s,1000\"
per_client = \"100/10s,100\"}
}
connection {global = \"100/10s\"
zone.default = \"100/10s\"
bucket.bucket1 {zone = default
aggregated = \"100/10s,100\"
per_client = \"100/10s,10\"
}
}
message_routing {global = \"100/10s\"
zone.default = \"100/10s\"
bucket.bucket1 {zone = default
aggregated = \"100/10s,100\"
per_client = \"100/10s,10\"
}
}
}">>).
%% Prints a formatted message through ct:pal/2, prepending "TEST_SUITE: "
%% to the format string. Format must be a string (it is joined with ++);
%% Args is the matching argument list.
-define(LOGT(Format, Args), ct:pal("TEST_SUITE: " ++ Format, Args)).
%% Parameters of one simulated limiter client (see enter_client/2 and
%% client_loop/1):
%%   interval - delay, in milliseconds, before the next `consume` message
%%              is scheduled after a successful consume
%%   per_cost - amount passed to emqx_limiter_client:consume/2 on every
%%              attempt and added to the running total on success
%%   type     - limiter type given to emqx_limiter_server:connect/2
%%   bucket   - bucket name given to emqx_limiter_server:connect/2
%%   lifetime - milliseconds until the client receives `stop`; also the
%%              divisor used when the achieved rate is computed
%%   rates    - list of {CompareFun, [RateString]} expectations evaluated
%%              by check_rates/2 against the achieved rate
-record(client_options, { interval :: non_neg_integer()
, per_cost :: non_neg_integer()
, type :: atom()
, bucket :: atom()
, lifetime :: non_neg_integer()
, rates :: list(tuple())
}).
%% Loop state of a simulated client process (see client_loop/1):
%%   client  - limiter handle returned by emqx_limiter_server:connect/2 and
%%             replaced by the updated handle after every consume attempt
%%   pid     - process that receives the final {client, Check} report
%%   got     - sum of per_cost over all successful consumes so far
%%   options - the #client_options{} this client was started with
-record(client_state, { client :: emqx_limiter_client:limiter()
, pid :: pid()
, got :: non_neg_integer()
, options :: #client_options{}}).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
%% Common Test entry point: the list of test cases is obtained from
%% emqx_common_test_helpers:all/1 for this module.
all() ->
    emqx_common_test_helpers:all(?MODULE).
%% Suite setup: loads ?BASE_CONF against the limiter schema (crashing on
%% anything but `ok`), starts the application under test and hands the
%% Common Test config back untouched.
init_per_suite(Config) ->
    SchemaMod = emqx_limiter_schema,
    ok = emqx_config:init_load(SchemaMod, ?BASE_CONF),
    AppsToStart = [?APP],
    emqx_common_test_helpers:start_apps(AppsToStart),
    Config.
%% Suite teardown: stops the application started by init_per_suite/1.
end_per_suite(_Config) ->
    AppsToStop = [?APP],
    emqx_common_test_helpers:stop_apps(AppsToStop).
%% Per-test-case setup: nothing to prepare, the config is returned as given.
init_per_testcase(_CaseName, CtConfig) ->
    CtConfig.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
%% Replaces the `connection` limiter configuration with two zones (z1, z2)
%% and two buckets (b1, b2), restarts the connection limiter server, then
%% runs 10 clients against b1 and 5 against b2 for three seconds each.
%% Every client reports whether its achieved rate is =< the listed zone and
%% aggregated rates and =:= its per-client rate; the case passes only if all
%% 15 reports are `true`.
t_un_overload(_) ->
    OldConf = emqx:get_config([emqx_limiter]),
    GlobalRate = to_rate("infinity"),
    Zones = #{z1 => to_rate("1000/1s"),
              z2 => to_rate("1000/1s")},
    BucketB1 = #{zone => z1,
                 aggregated => to_bucket_rate("100/1s, 500"),
                 per_client => to_bucket_rate("10/1s, 50")},
    BucketB2 = #{zone => z2,
                 aggregated => to_bucket_rate("500/1s, 500"),
                 per_client => to_bucket_rate("100/1s, infinity")},
    ConnConf = #{global => GlobalRate,
                 zone => Zones,
                 bucket => #{b1 => BucketB1, b2 => BucketB2}},
    emqx_config:put([emqx_limiter], OldConf#{connection => ConnConf}),
    {ok, _} = emqx_limiter_manager:restart_server(connection),
    timer:sleep(200),

    AtMost = fun erlang:'=<'/2,
    Exactly = fun erlang:'=:='/2,
    B1Opts = #client_options{interval = 100,
                             per_cost = 1,
                             type = connection,
                             bucket = b1,
                             lifetime = timer:seconds(3),
                             rates = [{AtMost, ["1000/1s", "100/1s"]},
                                      {Exactly, ["10/1s"]}]},
    B2Opts = #client_options{interval = 100,
                             per_cost = 10,
                             type = connection,
                             bucket = b2,
                             lifetime = timer:seconds(3),
                             rates = [{AtMost, ["1000/1s", "500/1s"]},
                                      {Exactly, ["100/1s"]}]},

    %% One start_client/1 call per list element: 10 for b1, then 5 for b2.
    lists:foreach(fun start_client/1, lists:duplicate(10, B1Opts)),
    lists:foreach(fun start_client/1, lists:duplicate(5, B2Opts)),
    ExpectedReports = 10 + 5,
    ?assert(check_client_result(ExpectedReports)).
%% Same scenario as a regular rate check, but with `infinity` appearing in
%% the global rate, in zone z2 and in several bucket settings. After the
%% connection limiter server is restarted with that configuration, 8 clients
%% run against b1 and 4 against b2; b1 clients must stay =< the z1 and
%% aggregated rates and hit their per-client rate exactly, b2 clients only
%% have to hit their per-client rate exactly. All 12 reports must be `true`.
t_infinity(_) ->
    OldConf = emqx:get_config([emqx_limiter]),
    GlobalRate = to_rate("infinity"),
    Zones = #{z1 => to_rate("1000/1s"),
              z2 => to_rate("infinity")},
    BucketB1 = #{zone => z1,
                 aggregated => to_bucket_rate("100/1s, infinity"),
                 per_client => to_bucket_rate("10/1s, 100")},
    BucketB2 = #{zone => z2,
                 aggregated => to_bucket_rate("infinity, 600"),
                 per_client => to_bucket_rate("100/1s, infinity")},
    ConnConf = #{global => GlobalRate,
                 zone => Zones,
                 bucket => #{b1 => BucketB1, b2 => BucketB2}},
    emqx_config:put([emqx_limiter], OldConf#{connection => ConnConf}),
    {ok, _} = emqx_limiter_manager:restart_server(connection),
    timer:sleep(200),

    AtMost = fun erlang:'=<'/2,
    Exactly = fun erlang:'=:='/2,
    B1Opts = #client_options{interval = 100,
                             per_cost = 1,
                             type = connection,
                             bucket = b1,
                             lifetime = timer:seconds(3),
                             rates = [{AtMost, ["1000/1s", "100/1s"]},
                                      {Exactly, ["10/1s"]}]},
    B2Opts = #client_options{interval = 100,
                             per_cost = 10,
                             type = connection,
                             bucket = b2,
                             lifetime = timer:seconds(3),
                             rates = [{Exactly, ["100/1s"]}]},

    %% One start_client/1 call per list element: 8 for b1, then 4 for b2.
    lists:foreach(fun start_client/1, lists:duplicate(8, B1Opts)),
    lists:foreach(fun start_client/1, lists:duplicate(4, B2Opts)),
    ExpectedReports = 8 + 4,
    ?assert(check_client_result(ExpectedReports)).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
%% Starts one simulated client process running enter_client/2 and returns
%% its pid. The calling process is captured as the report target, so the
%% client's final {client, Check} message is delivered to the caller.
%%
%% The client is linked to the caller: a crashing client now fails the test
%% case immediately instead of surfacing only as a receive timeout, and
%% clients still running when the test case process dies abnormally are
%% terminated with it rather than left orphaned. A client that finishes
%% normally does not affect the caller.
start_client(Opts) ->
    Pid = self(),
    erlang:spawn_link(fun() -> enter_client(Opts, Pid) end).
%% Body of a simulated client process. Schedules its own `stop` after
%% Lifetime milliseconds, queues an immediate `consume`, connects to the
%% limiter for the configured type and bucket, and enters client_loop/1 with
%% a zeroed counter. Pid is the process that will receive the final report.
enter_client(#client_options{} = Opts, Pid) ->
    #client_options{type = LimiterType,
                    bucket = BucketName,
                    lifetime = LifetimeMs} = Opts,
    erlang:send_after(LifetimeMs, self(), stop),
    self() ! consume,
    Limiter = emqx_limiter_server:connect(LimiterType, BucketName),
    InitialState = #client_state{client = Limiter,
                                 pid = Pid,
                                 got = 0,
                                 options = Opts},
    client_loop(InitialState).
%% Message loop of a simulated client.
%%
%%   consume         - try to consume PerCost. On success add it to the
%%                     total and schedule the next `consume` after Interval
%%                     ms. On {pause, MS, _} schedule a {resume, Begin}
%%                     after MS ms, where Begin is the time of this attempt.
%%   {resume, Begin} - retry after a pause. On success the next `consume` is
%%                     scheduled so that roughly Interval ms separate it
%%                     from Begin (never a negative delay). On another
%%                     pause, retry later with the same Begin.
%%   stop            - compute the achieved rate, evaluate the expectations
%%                     with check_rates/2, log, send {client, Check} to Pid
%%                     and leave the loop (the process then ends).
%%
%% Elapsed time is taken from erlang:monotonic_time/1 so that wall-clock
%% adjustments cannot distort the computed delay; Begin never leaves this
%% function, so only the difference Now - Begin matters.
client_loop(#client_state{client = Client,
                          got = Got,
                          pid = Pid,
                          options = #client_options{interval = Interval,
                                                    per_cost = PerCost,
                                                    lifetime = Lifetime,
                                                    rates = Rates}} = State) ->
    receive
        consume ->
            case emqx_limiter_client:consume(PerCost, Client) of
                {ok, Client2} ->
                    erlang:send_after(Interval, self(), consume),
                    client_loop(State#client_state{client = Client2,
                                                   got = Got + PerCost});
                {pause, MS, Client2} ->
                    Begin = erlang:monotonic_time(millisecond),
                    erlang:send_after(MS, self(), {resume, Begin}),
                    client_loop(State#client_state{client = Client2})
            end;
        stop ->
            Rate = Got * emqx_limiter_schema:minimum_period() / Lifetime,
            %% Evaluate before logging so the "Checks" field shows the
            %% verdict instead of repeating the rate.
            Check = check_rates(Rate, Rates),
            ?LOGT("Got:~p, Rate is:~p Checks:~p~n", [Got, Rate, Check]),
            erlang:send(Pid, {client, Check});
        {resume, Begin} ->
            case emqx_limiter_client:consume(PerCost, Client) of
                {ok, Client2} ->
                    Now = erlang:monotonic_time(millisecond),
                    Diff = erlang:max(0, Interval - (Now - Begin)),
                    erlang:send_after(Diff, self(), consume),
                    client_loop(State#client_state{client = Client2,
                                                   got = Got + PerCost});
                {pause, MS, Client2} ->
                    erlang:send_after(MS, self(), {resume, Begin}),
                    client_loop(State#client_state{client = Client2})
            end
    end.
%% Evaluates the measured Rate against a list of {CompareFun, RateStrings}
%% expectations. An expectation holds when CompareFun(Rate, to_rate(S)) is
%% true for every string S in its list. Returns `false` at the first
%% expectation that does not hold, `true` once nothing is left to check.
check_rates(Rate, [{Compare, Limits} | Remaining]) ->
    Holds = lists:all(fun(Limit) -> Compare(Rate, to_rate(Limit)) end, Limits),
    Holds andalso check_rates(Rate, Remaining);
check_rates(_Rate, _Nothing) ->
    true.
%% Collects N {client, Check} reports from the caller's mailbox.
%%
%% Returns `true` once N reports with Check =:= true have been received,
%% and `false` as soon as one report carries `false` or no message arrives
%% within 3500 ms. Any other message is logged and skipped, and collection
%% continues for the same N. (Previously such a message ended the function
%% with the logger's return value, which is not a boolean and made the
%% caller's ?assert fail for an unrelated reason.)
check_client_result(0) ->
    true;
check_client_result(N) ->
    ?LOGT("check_client_result:~p~n", [N]),
    receive
        {client, true} ->
            check_client_result(N - 1);
        {client, false} ->
            false;
        Any ->
            ?LOGT(">>>> other:~p~n", [Any]),
            %% Unrelated message: keep waiting for the remaining reports.
            check_client_result(N)
    after 3500 ->
        ?LOGT(">>>> timeout~n", []),
        false
    end.
%% Parses a rate string with emqx_limiter_schema:to_rate/1 and returns the
%% parsed value; crashes with badmatch if the parser does not answer
%% {ok, _}.
to_rate(Str) ->
    {ok, Parsed} = emqx_limiter_schema:to_rate(Str),
    Parsed.
%% Parses a bucket rate string with emqx_limiter_schema:to_bucket_rate/1 and
%% returns the parsed value; crashes with badmatch if the parser does not
%% answer {ok, _}.
to_bucket_rate(Str) ->
    {ok, Parsed} = emqx_limiter_schema:to_bucket_rate(Str),
    Parsed.