emqx/lib-ce/emqx_modules/test/emqx_st_statistics_SUITE.erl

113 lines
3.4 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("include/emqx.hrl").
%-define(LOGT(Format, Args), ct:pal(Format, Args)).
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(NOW, erlang:system_time(millisecond)).
%% Common Test callback: the list of test cases is obtained from
%% emqx_ct:all/1 for this module.
all() ->
    emqx_ct:all(?MODULE).
%% Suite setup: starts the emqx application through emqx_ct_helpers and
%% hands the Common Test Config back unchanged.
init_per_suite(Config) ->
    emqx_ct_helpers:start_apps([emqx]),
    Config.
%% Suite teardown: stops the emqx application started in init_per_suite/1
%% and returns Config unchanged.
end_per_suite(Config) ->
    emqx_ct_helpers:stop_apps([emqx]),
    Config.
%% Per-test setup: loads emqx_mod_st_statistics with the settings from
%% base_conf/0, regardless of which test case is about to run.
init_per_testcase(_TestCase, Config) ->
    emqx_mod_st_statistics:load(base_conf()),
    Config.
%% Per-test teardown: unloads emqx_mod_st_statistics (the module is given
%% `undefined' as its argument) and always returns ok.
end_per_testcase(_TestCase, _Config) ->
    emqx_mod_st_statistics:unload(undefined),
    ok.
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
%% End-to-end check of the slow-topic statistics module.
%%
%% The test process subscribes to "$slow_topics", ten MQTT clients subscribe
%% to "/test1" .. "/test10", and ten messages whose timestamps are back-dated
%% by one second are published to those topics. After waiting, the test
%% expects the log table to be empty, the top-k table to hold 3 entries, and
%% two notices to arrive carrying 3 and then 2 log entries.
%%
%% The subscriber processes are told to stop even when an assertion fails,
%% so a failing run does not leave them blocked in their receive.
t_log_and_pub(_) ->
    %% Sub topic first
    SubBase = "/test",
    emqx:subscribe("$slow_topics"),
    Clients = start_client(SubBase),
    try
        %% Presumably gives the clients time to connect and subscribe.
        timer:sleep(1000),
        Now = ?NOW,
        %% publish
        ?assertEqual(0, ets:info(?LOG_TAB, size)),
        lists:foreach(
            fun(I) ->
                Topic = list_to_binary(io_lib:format("~s~p", [SubBase, I])),
                Msg = emqx_message:make(Topic, <<"Hello">>),
                %% Back-date the message by 1000 ms; presumably this makes the
                %% measured latency exceed threshold_time from base_conf/0.
                emqx:publish(Msg#message{timestamp = Now - 1000})
            end,
            lists:seq(1, 10)
        ),
        %% Longer than the 2000 ms time_window configured in base_conf/0.
        timer:sleep(2400),
        ?assertEqual(0, ets:info(?LOG_TAB, size)),
        ?assertEqual(3, ets:info(?TOPK_TAB, size)),
        try_receive(3),
        try_receive(2)
    after
        lists:foreach(fun(Client) -> Client ! stop end, Clients)
    end,
    ok.
%% Settings passed to emqx_mod_st_statistics:load/1 before every test case.
%% The notes on individual options relate the values to what t_log_and_pub/1
%% checks; the option semantics themselves are defined by the module under
%% test and are not visible here.
base_conf() ->
    [
        %% t_log_and_pub/1 expects the top-k table to hold 3 entries.
        {top_k_num, 3},
        {threshold_time, 10},
        {notice_qos, 0},
        %% Together with max_log_num this presumably yields notices of
        %% 3 and then 2 logs -- TODO confirm against the module.
        {notice_batch_size, 3},
        %% The test process subscribes to this topic to receive notices.
        {notice_topic,"$slow_topics"},
        %% The test sleeps 2400 ms, i.e. longer than this value.
        {time_window, 2000},
        {max_log_num, 5}
    ].
%% Spawns ten subscriber processes, one per topic Base ++ "1" .. Base ++ "10",
%% and returns their pids. Each process runs client/1 on its topic.
%%
%% The processes are linked to the caller so that a client which crashes
%% while connecting or subscribing fails the calling test immediately,
%% instead of dying unnoticed and surfacing later as an unrelated timeout.
start_client(Base) ->
    [
        spawn_link(fun() ->
            Topic = iolist_to_binary(io_lib:format("~s~p", [Base, I])),
            client(Topic)
        end)
     || I <- lists:seq(1, 10)
    ].
%% Body of a single subscriber process: starts an emqtt client for
%% "localhost" that uses Topic as its client id, connects, subscribes to
%% Topic, and then blocks until the message `stop' is received.
%% Any unexpected reply from emqtt crashes the process via badmatch.
client(Topic) ->
    Options = [
        {host, "localhost"},
        {clientid, Topic},
        {username, <<"plain">>},
        {password, <<"plain">>}
    ],
    {ok, Conn} = emqtt:start_link(Options),
    {ok, _} = emqtt:connect(Conn),
    {ok, _, _} = emqtt:subscribe(Conn, Topic),
    %% No timeout on purpose: the process lives until it is told to stop.
    receive
        stop -> ok
    end.
%% Waits up to 500 ms for one `{deliver, _, #message{}}' message in the
%% caller's mailbox, decodes its payload as JSON and asserts that the
%% <<"logs">> entry contains exactly L elements.
%% Fails the test case with a descriptive reason if nothing arrives in time.
try_receive(L) ->
    receive
        {deliver, _, #message{payload = Payload}} ->
            #{<<"logs">> := Logs} = emqx_json:decode(Payload, [return_maps]),
            %% Expected value first, so a failure report labels the
            %% expected and actual sides correctly.
            ?assertEqual(L, length(Logs))
    after 500 ->
        ct:fail("no delivery carrying ~p logs arrived within 500 ms", [L])
    end.