emqx/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUIT...

203 lines
7.0 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_dashboard_monitor_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_dashboard_SUITE, [auth_header_/0]).
-include_lib("eunit/include/eunit.hrl").
-include("emqx_dashboard.hrl").
-define(SERVER, "http://127.0.0.1:18083").
-define(BASE_PATH, "/api/v5").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([]),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite([]).
t_monitor_samplers_all(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
Size = mnesia:table_info(emqx_dashboard_monitor, size),
All = emqx_dashboard_monitor:samplers(all, infinity),
All2 = emqx_dashboard_monitor:samplers(),
?assert(erlang:length(All) == Size),
?assert(erlang:length(All2) == Size),
ok.
t_monitor_samplers_latest(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
Samplers = emqx_dashboard_monitor:samplers(node(), 2),
Latest = emqx_dashboard_monitor:samplers(node(), 1),
?assert(erlang:length(Samplers) == 2),
?assert(erlang:length(Latest) == 1),
?assert(hd(Latest) == lists:nth(2, Samplers)),
ok.
t_monitor_sampler_format(_Config) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
Latest = hd(emqx_dashboard_monitor:samplers(node(), 1)),
SamplerKeys = maps:keys(Latest),
[?assert(lists:member(SamplerName, SamplerKeys)) || SamplerName <- ?SAMPLER_LIST],
ok.
t_monitor_api(_) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, Samplers} = request(["monitor"], "latest=20"),
?assert(erlang:length(Samplers) >= 2),
Fun =
fun(Sampler) ->
Keys = [binary_to_atom(Key, utf8) || Key <- maps:keys(Sampler)],
[?assert(lists:member(SamplerName, Keys)) || SamplerName <- ?SAMPLER_LIST]
end,
[Fun(Sampler) || Sampler <- Samplers],
{ok, NodeSamplers} = request(["monitor", "nodes", node()]),
[Fun(NodeSampler) || NodeSampler <- NodeSamplers],
ok.
t_monitor_current_api(_) ->
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
],
{ok, NodeRate} = request(["monitor_current", "nodes", node()]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), NodeRate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
],
ok.
t_monitor_current_api_live_connections(_) ->
process_flag(trap_exit, true),
ClientId = <<"live_conn_tests">>,
ClientId1 = <<"live_conn_tests1">>,
{ok, C} = emqtt:start_link([{clean_start, false}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
ok = emqtt:disconnect(C),
{ok, C1} = emqtt:start_link([{clean_start, true}, {clientid, ClientId1}]),
{ok, _} = emqtt:connect(C1),
ok = waiting_emqx_stats_and_monitor_update('live_connections.max'),
{ok, Rate} = request(["monitor_current"]),
?assertEqual(1, maps:get(<<"live_connections">>, Rate)),
?assertEqual(2, maps:get(<<"connections">>, Rate)),
%% clears
ok = emqtt:disconnect(C1),
{ok, C2} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]),
{ok, _} = emqtt:connect(C2),
ok = emqtt:disconnect(C2).
t_monitor_reset(_) ->
restart_monitor(),
{ok, Rate} = request(["monitor_current"]),
[
?assert(maps:is_key(atom_to_binary(Key, utf8), Rate))
|| Key <- maps:values(?DELTA_SAMPLER_RATE_MAP) ++ ?GAUGE_SAMPLER_LIST
],
timer:sleep(?DEFAULT_SAMPLE_INTERVAL * 2 * 1000 + 20),
{ok, Samplers} = request(["monitor"], "latest=1"),
?assertEqual(1, erlang:length(Samplers)),
ok.
t_monitor_api_error(_) ->
{error, {404, #{<<"code">> := <<"NOT_FOUND">>}}} =
request(["monitor", "nodes", 'emqx@127.0.0.2']),
{error, {404, #{<<"code">> := <<"NOT_FOUND">>}}} =
request(["monitor_current", "nodes", 'emqx@127.0.0.2']),
{error, {400, #{<<"code">> := <<"BAD_REQUEST">>}}} =
request(["monitor"], "latest=0"),
{error, {400, #{<<"code">> := <<"BAD_REQUEST">>}}} =
request(["monitor"], "latest=-1"),
ok.
request(Path) ->
request(Path, "").
request(Path, QS) ->
Url = url(Path, QS),
do_request_api(get, {Url, [auth_header_()]}).
url(Parts, QS) ->
case QS of
"" ->
?SERVER ++ filename:join([?BASE_PATH | Parts]);
_ ->
?SERVER ++ filename:join([?BASE_PATH | Parts]) ++ "?" ++ QS
end.
do_request_api(Method, Request) ->
ct:pal("Req ~p ~p~n", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return}} when
Code >= 200 andalso Code =< 299
->
ct:pal("Resp ~p ~p~n", [Code, Return]),
{ok, emqx_utils_json:decode(Return, [return_maps])};
{ok, {{"HTTP/1.1", Code, _}, _, Return}} ->
ct:pal("Resp ~p ~p~n", [Code, Return]),
{error, {Code, emqx_utils_json:decode(Return, [return_maps])}};
{error, Reason} ->
{error, Reason}
end.
restart_monitor() ->
OldMonitor = erlang:whereis(emqx_dashboard_monitor),
erlang:exit(OldMonitor, kill),
?assertEqual(ok, wait_new_monitor(OldMonitor, 10)).
wait_new_monitor(_OldMonitor, Count) when Count =< 0 -> timeout;
wait_new_monitor(OldMonitor, Count) ->
NewMonitor = erlang:whereis(emqx_dashboard_monitor),
case is_pid(NewMonitor) andalso NewMonitor =/= OldMonitor of
true ->
ok;
false ->
timer:sleep(100),
wait_new_monitor(OldMonitor, Count - 1)
end.
waiting_emqx_stats_and_monitor_update(WaitKey) ->
Self = self(),
meck:new(emqx_stats, [passthrough]),
meck:expect(
emqx_stats,
setstat,
fun(Stat, MaxStat, Val) ->
(Stat =:= WaitKey orelse MaxStat =:= WaitKey) andalso (Self ! updated),
meck:passthrough([Stat, MaxStat, Val])
end
),
receive
updated -> ok
after 5000 ->
error(waiting_emqx_stats_update_timeout)
end,
meck:unload([emqx_stats]),
%% manually call monitor update
_ = emqx_dashboard_monitor:current_rate(),
ok.