150 lines
4.6 KiB
Erlang
150 lines
4.6 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_slow_subs_SUITE).
|
|
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include("include/emqx_mqtt.hrl").
|
|
-include_lib("include/emqx.hrl").
|
|
-define(LANTENCY, 101).
|
|
|
|
%-define(LOGT(Format, Args), ct:pal(Format, Args)).
|
|
|
|
-define(TOPK_TAB, emqx_slow_subs_topk).
|
|
-define(NOW, erlang:system_time(millisecond)).
|
|
|
|
%% @doc Common Test entry point: the suite consists of three groups,
%% `whole', `internal' and `response', run in that order.
all() ->
    [{group, GroupName} || GroupName <- [whole, internal, response]].
|
|
|
|
%% @doc Common Test group definitions. Every group runs the same list of
%% test cases, as returned by `emqx_ct:all(?MODULE)', with no group
%% properties set.
groups() ->
    TestCases = emqx_ct:all(?MODULE),
    [{GroupName, [], TestCases} || GroupName <- [whole, internal, response]].
|
|
|
|
%% @doc Suite-level setup: starts the `emqx' application through
%% `emqx_ct_helpers:start_apps/1' and hands the Config back unchanged.
init_per_suite(Config) ->
    Apps = [emqx],
    emqx_ct_helpers:start_apps(Apps),
    Config.
|
|
|
|
%% @doc Suite-level teardown: stops the `emqx' application through
%% `emqx_ct_helpers:stop_apps/1' and returns the Config it was given.
end_per_suite(Config) ->
    Apps = [emqx],
    emqx_ct_helpers:stop_apps(Apps),
    Config.
|
|
|
|
%% @doc Per-testcase setup: loads `emqx_mod_slow_subs' with the base
%% configuration, using the name of the currently running group (read from
%% `tc_group_properties' in Config) as the `stats_type'.
init_per_testcase(_TestCase, Config) ->
    GroupProps = proplists:get_value(tc_group_properties, Config),
    GroupName = proplists:get_value(name, GroupProps),
    Conf = base_conf(GroupName),
    emqx_mod_slow_subs:load(Conf),
    Config.
|
|
|
|
%% @doc Per-testcase teardown: unloads `emqx_mod_slow_subs' (with an empty
%% option list) and returns `ok'.
end_per_testcase(_TestCase, _Config) ->
    emqx_mod_slow_subs:unload([]),
    ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Test Cases
|
|
%%--------------------------------------------------------------------
|
|
%% @doc Publishes back-dated messages to subscribed clients and checks that
%% the slow-subs top-k table is populated and later emptied.
%%
%% Steps:
%%   1. start the subscriber clients and give them time to subscribe;
%%   2. publish 10 QoS 1 and 10 QoS 2 messages whose timestamp is
%%      `?LANTENCY' ms in the past;
%%   3. check the size of `?TOPK_TAB' and the timespans reported by
%%      `emqx_slow_subs_api:get_history/0';
%%   4. wait a further 3 seconds and check that the table is empty.
%%
%% The client processes are always told to stop, even when an assertion
%% fails, so a failing run does not leave blocked clients behind.
t_log_and_pub(Config) ->
    %% Subscribe first so the published messages have receivers.
    Subs = [{<<"/test1/+">>, ?QOS_1}, {<<"/test2/+">>, ?QOS_2}],
    Clients = start_client(Subs, Config),
    try
        timer:sleep(1500),
        Now = ?NOW,

        %% Publish 10 messages on topics built from TopicFmt, each one
        %% back-dated by ?LANTENCY ms relative to Now.
        PublishBatch =
            fun(TopicFmt, QoS) ->
                lists:foreach(
                    fun(I) ->
                        Topic = list_to_binary(io_lib:format(TopicFmt, [I])),
                        Msg = emqx_message:make(undefined, QoS, Topic, <<"Hello">>),
                        emqx:publish(Msg#message{timestamp = Now - ?LANTENCY})
                    end,
                    lists:seq(1, 10))
            end,
        PublishBatch("/test1/~p", ?QOS_1),
        PublishBatch("/test2/~p", ?QOS_2),

        timer:sleep(2000),
        Size = ets:info(?TOPK_TAB, size),
        %% The size is checked as a range rather than an exact value: some
        %% records may already have expired, and the table may briefly hold
        %% more than 5 entries because of race conditions.
        ?assert(Size =< 8 andalso Size >= 3,
                unicode:characters_to_binary(io_lib:format("size is :~p~n", [Size]))),

        %% Every recorded timespan must be at least the injected latency and
        %% less than the wall-clock time elapsed since Now was taken.
        ?assert(
            lists:all(
                fun(#{timespan := Ts}) ->
                    Ts >= ?LANTENCY andalso Ts < ?NOW - Now
                end,
                emqx_slow_subs_api:get_history())),

        %% After another 3 seconds the table is expected to be empty.
        timer:sleep(3000),
        ?assert(ets:info(?TOPK_TAB, size) =:= 0)
    after
        %% Always release the client processes blocked in client/3.
        lists:foreach(fun(Client) -> Client ! stop end, Clients)
    end,
    ok.
|
|
%% @doc Builds the slow-subs option list used by this suite: threshold 100,
%% top-k size 5, an expire interval of `timer:seconds(3)', and the given
%% `Type' as `stats_type'.
base_conf(Type) ->
    ExpireInterval = timer:seconds(3),
    [{threshold, 100},
     {top_k_num, 5},
     {expire_interval, ExpireInterval},
     {stats_type, Type}].
|
|
|
|
%% @doc Spawns 10 client processes (indices 1..10), each running
%% `client/3' with the given subscriptions and Config, and returns their
%% pids in index order.
start_client(Subs, Config) ->
    lists:map(
        fun(Index) ->
            spawn(fun() -> client(Index, Subs, Config) end)
        end,
        lists:seq(1, 10)).
|
|
|
|
%% @doc Body of one test client process.
%%
%% Connects an emqtt client using options derived from the current group
%% name and the index `I', subscribes to one entry of `Subs' (chosen by
%% `I rem length(Subs) + 1'), then blocks until it receives `stop'.
client(I, Subs, Config) ->
    GroupProps = proplists:get_value(tc_group_properties, Config),
    GroupName = proplists:get_value(name, GroupProps),
    Options = make_conn_options(GroupName, I),
    {ok, Conn} = emqtt:start_link(Options),
    {ok, _} = emqtt:connect(Conn),

    %% Spread the clients across the subscriptions by index.
    SubCount = length(Subs),
    Position = I rem SubCount + 1,
    Chosen = lists:nth(Position, Subs),
    _ = emqtt:subscribe(Conn, Chosen),

    %% Stay alive until the test case sends `stop'.
    receive
        stop -> ok
    end.
|
|
|
|
%% @doc Drains `{deliver, _, #message{}}' messages from the mailbox.
%%
%% Each payload is decoded as JSON into a map. The length of its
%% `<<"logs">>' list is prepended to `Acc'. Returns `Acc' once no matching
%% message has arrived for 500 ms, so the most recently received count is
%% at the head of the list.
try_receive(Acc) ->
    receive
        {deliver, _, #message{payload = Payload}} ->
            Decoded = emqx_json:decode(Payload, [return_maps]),
            #{<<"logs">> := Logs} = Decoded,
            LogCount = length(Logs),
            try_receive([LogCount | Acc])
    after 500 ->
        Acc
    end.
|
|
|
|
%% @doc Builds the emqtt connection options for client index `I'.
%%
%% For the `response' group the base options are prefixed with a
%% `msg_handler' whose `publish' callback sleeps for 50 ms and whose
%% `disconnected' callback returns `ok'. Every other group gets only the
%% base options: host, a client id of the form "slow_subs_<I>" (as produced
%% by `io_lib:format/2'), username and password.
make_conn_options(response, I) ->
    Handlers = #{publish => fun(_) -> timer:sleep(50) end,
                 disconnected => fun(_) -> ok end},
    BaseOptions = make_conn_options(whole, I),
    [{msg_handler, Handlers} | BaseOptions];
make_conn_options(_Group, I) ->
    ClientId = io_lib:format("slow_subs_~p", [I]),
    [{host, "localhost"},
     {clientid, ClientId},
     {username, <<"plain">>},
     {password, <<"plain">>}].
|