Merge pull request #9994 from sstrigler/EMQX-3688-emqx-mgmt

emqx mgmt code coverage and cleanup
This commit is contained in:
Stefan Strigler 2023-02-22 14:30:36 +01:00 committed by GitHub
commit 0cfa5e2ce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 472 additions and 78 deletions

View File

@ -22,6 +22,8 @@
-export([
all/1,
init_per_testcase/3,
end_per_testcase/3,
boot_modules/1,
start_apps/1,
start_apps/2,
@ -150,6 +152,19 @@ all(Suite) ->
string:substr(atom_to_list(F), 1, 2) == "t_"
]).
init_per_testcase(Module, TestCase, Config) ->
case erlang:function_exported(Module, TestCase, 2) of
true -> Module:TestCase(init, Config);
false -> Config
end.
end_per_testcase(Module, TestCase, Config) ->
case erlang:function_exported(Module, TestCase, 2) of
true -> Module:TestCase('end', Config);
false -> ok
end,
Config.
%% set emqx app boot modules
-spec boot_modules(all | list(atom())) -> ok.
boot_modules(Mods) ->

View File

@ -16,4 +16,4 @@
-define(MANAGEMENT_SHARD, emqx_management_shard).
-define(MAX_ROW_LIMIT, 100).
-define(DEFAULT_ROW_LIMIT, 100).

View File

@ -21,8 +21,6 @@
-elvis([{elvis_style, god_modules, disable}]).
-include_lib("stdlib/include/qlc.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% Nodes and Brokers API
-export([
@ -71,8 +69,6 @@
list_subscriptions/1,
list_subscriptions_via_topic/2,
list_subscriptions_via_topic/3,
lookup_subscriptions/1,
lookup_subscriptions/2,
do_list_subscriptions/0
]).
@ -105,12 +101,10 @@
%% Common Table API
-export([
max_row_limit/0,
default_row_limit/0,
vm_stats/0
]).
-define(APP, emqx_management).
-elvis([{elvis_style, god_modules, disable}]).
%%--------------------------------------------------------------------
@ -162,7 +156,7 @@ node_info(Nodes) ->
emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)).
stopped_node_info(Node) ->
#{name => Node, node_status => 'stopped'}.
{Node, #{node => Node, node_status => 'stopped'}}.
vm_stats() ->
Idle =
@ -194,8 +188,13 @@ lookup_broker(Node) ->
Broker.
broker_info() ->
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}.
Info = lists:foldl(fun convert_broker_info/2, #{}, emqx_sys:info()),
Info#{node => node(), otp_release => otp_rel(), node_status => 'running'}.
convert_broker_info({uptime, Uptime}, M) ->
M#{uptime => emqx_datetime:human_readable_duration_string(Uptime)};
convert_broker_info({K, V}, M) ->
M#{K => iolist_to_binary(V)}.
broker_info(Nodes) ->
emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)).
@ -265,7 +264,7 @@ lookup_client({username, Username}, FormatFun) ->
|| Node <- mria_mnesia:running_nodes()
]).
lookup_client(Node, Key, {M, F}) ->
lookup_client(Node, Key, FormatFun) ->
case unwrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of
{error, Err} ->
{error, Err};
@ -273,18 +272,23 @@ lookup_client(Node, Key, {M, F}) ->
lists:map(
fun({Chan, Info0, Stats}) ->
Info = Info0#{node => Node},
M:F({Chan, Info, Stats})
maybe_format(FormatFun, {Chan, Info, Stats})
end,
L
)
end.
kickout_client({ClientID, FormatFun}) ->
case lookup_client({clientid, ClientID}, FormatFun) of
maybe_format(undefined, A) ->
A;
maybe_format({M, F}, A) ->
M:F(A).
kickout_client(ClientId) ->
case lookup_client({clientid, ClientId}, undefined) of
[] ->
{error, not_found};
_ ->
Results = [kickout_client(Node, ClientID) || Node <- mria_mnesia:running_nodes()],
Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
check_results(Results)
end.
@ -295,17 +299,22 @@ list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache).
list_client_subscriptions(ClientId) ->
Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
Filter =
fun
({error, _}) ->
false;
({_Node, List}) ->
erlang:is_list(List) andalso 0 < erlang:length(List)
end,
case lists:filter(Filter, Results) of
[] -> [];
[Result | _] -> Result
case lookup_client({clientid, ClientId}, undefined) of
[] ->
{error, not_found};
_ ->
Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
Filter =
fun
({error, _}) ->
false;
({_Node, List}) ->
erlang:is_list(List) andalso 0 < erlang:length(List)
end,
case lists:filter(Filter, Results) of
[] -> [];
[Result | _] -> Result
end
end.
client_subscriptions(Node, ClientId) ->
@ -388,17 +397,11 @@ call_client(Node, ClientId, Req) ->
%% Subscriptions
%%--------------------------------------------------------------------
-spec do_list_subscriptions() -> [map()].
-spec do_list_subscriptions() -> no_return().
do_list_subscriptions() ->
case check_row_limit([mqtt_subproperty]) of
false ->
throw(max_row_limit);
ok ->
[
#{topic => Topic, clientid => ClientId, options => Options}
|| {{Topic, ClientId}, Options} <- ets:tab2list(mqtt_subproperty)
]
end.
%% [FIXME] Add function to `emqx_broker` that returns list of subscriptions
%% and either redirect from here or bpapi directly (EMQX-8993).
throw(not_implemented).
list_subscriptions(Node) ->
unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)).
@ -415,12 +418,6 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) ->
Result -> M:F(Result)
end.
lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
lookup_subscriptions(Node, ClientId) ->
unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)).
%%--------------------------------------------------------------------
%% PubSub
%%--------------------------------------------------------------------
@ -556,24 +553,11 @@ unwrap_rpc(Res) ->
otp_rel() ->
iolist_to_binary([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
check_row_limit(Tables) ->
check_row_limit(Tables, max_row_limit()).
check_row_limit([], _Limit) ->
ok;
check_row_limit([Tab | Tables], Limit) ->
case table_size(Tab) > Limit of
true -> false;
false -> check_row_limit(Tables, Limit)
end.
check_results(Results) ->
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> unwrap_rpc(lists:last(Results))
end.
max_row_limit() ->
?MAX_ROW_LIMIT.
table_size(Tab) -> ets:info(Tab, size).
default_row_limit() ->
?DEFAULT_ROW_LIMIT.

View File

@ -98,8 +98,8 @@ count(Table) ->
page(Params) ->
maps:get(<<"page">>, Params, 1).
limit(Params) ->
maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit()).
limit(Params) when is_map(Params) ->
maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()).
%%--------------------------------------------------------------------
%% Node Query
@ -683,7 +683,7 @@ paginate_test_() ->
Size = 1000,
MyLimit = 10,
ets:insert(?MODULE, [{I, foo} || I <- lists:seq(1, Size)]),
DefaultLimit = emqx_mgmt:max_row_limit(),
DefaultLimit = emqx_mgmt:default_row_limit(),
NoParamsResult = paginate(?MODULE, #{}, {?MODULE, paginate_test_format}),
PaginateResults = [
paginate(

View File

@ -274,11 +274,10 @@ schema("/clients/:clientid/subscriptions") ->
responses => #{
200 => hoconsc:mk(
hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)), #{}
),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
)
%% returns [] if client not existed in cluster
%404 => emqx_dashboard_swagger:error_codes(
% ['CLIENTID_NOT_FOUND'], <<"Client ID not found">>
%)
}
}
};
@ -599,6 +598,8 @@ unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfo
subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
case emqx_mgmt:list_client_subscriptions(ClientID) of
{error, not_found} ->
{404, ?CLIENTID_NOT_FOUND};
[] ->
{200, []};
{Node, Subs} ->
@ -677,7 +678,7 @@ lookup(#{clientid := ClientID}) ->
end.
kickout(#{clientid := ClientID}) ->
case emqx_mgmt:kickout_client({ClientID, ?FORMAT_FUN}) of
case emqx_mgmt:kickout_client(ClientID) of
{error, not_found} ->
{404, ?CLIENTID_NOT_FOUND};
_ ->

View File

@ -302,7 +302,7 @@ page_params() ->
name => limit,
in => query,
description => <<"Page size">>,
schema => #{type => integer, default => emqx_mgmt:max_row_limit()}
schema => #{type => integer, default => emqx_mgmt:default_row_limit()}
}
].

View File

@ -0,0 +1,387 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-export([ident/1]).
-define(FORMATFUN, {?MODULE, ident}).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
Config.
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
init_per_testcase(TestCase, Config) ->
meck:expect(mria_mnesia, running_nodes, 0, [node()]),
emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config).
end_per_testcase(TestCase, Config) ->
meck:unload(mria_mnesia),
emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config).
t_list_nodes(init, Config) ->
meck:expect(
mria_mnesia,
cluster_nodes,
fun
(running) -> [node()];
(stopped) -> ['stopped@node']
end
),
Config;
t_list_nodes('end', _Config) ->
ok.
t_list_nodes(_) ->
NodeInfos = emqx_mgmt:list_nodes(),
Node = node(),
?assertMatch(
[
{Node, #{node := Node, node_status := 'running'}},
{'stopped@node', #{node := 'stopped@node', node_status := 'stopped'}}
],
NodeInfos
).
t_lookup_node(init, Config) ->
meck:new(os, [passthrough, unstick, no_link]),
OsType = os:type(),
meck:expect(os, type, 0, {win32, winME}),
[{os_type, OsType} | Config];
t_lookup_node('end', Config) ->
%% We need to restore the original behavior so that rebar3 doesn't crash. If
%% we'd `meck:unload(os)` or not set `no_link` then `ct` crashes calling
%% `os` with "The code server called the unloaded module `os'".
OsType = ?config(os_type, Config),
meck:expect(os, type, 0, OsType),
ok.
t_lookup_node(_) ->
Node = node(),
?assertMatch(
#{node := Node, node_status := 'running', memory_total := 0},
emqx_mgmt:lookup_node(node())
),
?assertMatch(
{error, _},
emqx_mgmt:lookup_node('fake@nohost')
),
ok.
t_list_brokers(_) ->
Node = node(),
?assertMatch(
[{Node, #{node := Node, node_status := running, uptime := _}}],
emqx_mgmt:list_brokers()
).
t_lookup_broker(_) ->
Node = node(),
?assertMatch(
#{node := Node, node_status := running, uptime := _},
emqx_mgmt:lookup_broker(Node)
).
t_get_metrics(_) ->
Metrics = emqx_mgmt:get_metrics(),
?assert(maps:size(Metrics) > 0),
?assertMatch(
Metrics, maps:from_list(emqx_mgmt:get_metrics(node()))
).
t_lookup_client(init, Config) ->
setup_clients(Config);
t_lookup_client('end', Config) ->
disconnect_clients(Config).
t_lookup_client(_Config) ->
[{Chan, Info, Stats}] = emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN),
?assertEqual(
[{Chan, Info, Stats}],
emqx_mgmt:lookup_client({username, <<"user1">>}, ?FORMATFUN)
),
?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)),
meck:expect(mria_mnesia, running_nodes, 0, [node(), 'fake@nonode']),
?assertMatch(
[_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN)
).
t_kickout_client(init, Config) ->
process_flag(trap_exit, true),
setup_clients(Config);
t_kickout_client('end', _Config) ->
ok.
t_kickout_client(Config) ->
[C | _] = ?config(clients, Config),
ok = emqx_mgmt:kickout_client(<<"client1">>),
receive
{'EXIT', C, Reason} ->
?assertEqual({shutdown, tcp_closed}, Reason);
Foo ->
error({unexpected, Foo})
after 1000 ->
error(timeout)
end,
?assertEqual({error, not_found}, emqx_mgmt:kickout_client(<<"notfound">>)).
t_list_authz_cache(init, Config) ->
setup_clients(Config);
t_list_authz_cache('end', Config) ->
disconnect_clients(Config).
t_list_authz_cache(_) ->
?assertNotMatch({error, _}, emqx_mgmt:list_authz_cache(<<"client1">>)),
?assertMatch({error, not_found}, emqx_mgmt:list_authz_cache(<<"notfound">>)).
t_list_client_subscriptions(init, Config) ->
setup_clients(Config);
t_list_client_subscriptions('end', Config) ->
disconnect_clients(Config).
t_list_client_subscriptions(Config) ->
[Client | _] = ?config(clients, Config),
?assertEqual([], emqx_mgmt:list_client_subscriptions(<<"client1">>)),
emqtt:subscribe(Client, <<"t/#">>),
?assertMatch({_, [{<<"t/#">>, _Opts}]}, emqx_mgmt:list_client_subscriptions(<<"client1">>)),
?assertEqual({error, not_found}, emqx_mgmt:list_client_subscriptions(<<"notfound">>)).
t_clean_cache(init, Config) ->
setup_clients(Config);
t_clean_cache('end', Config) ->
disconnect_clients(Config).
t_clean_cache(_Config) ->
?assertNotMatch(
{error, _},
emqx_mgmt:clean_authz_cache(<<"client1">>)
),
?assertNotMatch(
{error, _},
emqx_mgmt:clean_authz_cache_all()
),
?assertNotMatch(
{error, _},
emqx_mgmt:clean_pem_cache_all()
),
meck:expect(mria_mnesia, running_nodes, 0, [node(), 'fake@nonode']),
?assertMatch(
{error, [{'fake@nonode', {error, _}}]},
emqx_mgmt:clean_authz_cache_all()
),
?assertMatch(
{error, [{'fake@nonode', {error, _}}]},
emqx_mgmt:clean_pem_cache_all()
).
t_set_client_props(init, Config) ->
setup_clients(Config);
t_set_client_props('end', Config) ->
disconnect_clients(Config).
t_set_client_props(_Config) ->
?assertEqual(
% [FIXME] not implemented at this point?
ignored,
emqx_mgmt:set_ratelimit_policy(<<"client1">>, foo)
),
?assertEqual(
{error, not_found},
emqx_mgmt:set_ratelimit_policy(<<"notfound">>, foo)
),
?assertEqual(
% [FIXME] not implemented at this point?
ignored,
emqx_mgmt:set_quota_policy(<<"client1">>, foo)
),
?assertEqual(
{error, not_found},
emqx_mgmt:set_quota_policy(<<"notfound">>, foo)
),
?assertEqual(
ok,
emqx_mgmt:set_keepalive(<<"client1">>, 3600)
),
?assertMatch(
{error, _},
emqx_mgmt:set_keepalive(<<"client1">>, true)
),
?assertEqual(
{error, not_found},
emqx_mgmt:set_keepalive(<<"notfound">>, 3600)
),
ok.
t_list_subscriptions_via_topic(init, Config) ->
setup_clients(Config);
t_list_subscriptions_via_topic('end', Config) ->
disconnect_clients(Config).
t_list_subscriptions_via_topic(Config) ->
[Client | _] = ?config(clients, Config),
?assertEqual([], emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)),
emqtt:subscribe(Client, <<"t/#">>),
?assertMatch(
[{{<<"t/#">>, _SubPid}, _Opts}],
emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)
).
t_pubsub_api(init, Config) ->
setup_clients(Config);
t_pubsub_api('end', Config) ->
disconnect_clients(Config).
-define(TT(Topic), {Topic, #{qos => 0}}).
t_pubsub_api(Config) ->
[Client | _] = ?config(clients, Config),
?assertEqual([], emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)),
?assertMatch(
{subscribe, _, _},
emqx_mgmt:subscribe(<<"client1">>, [?TT(<<"t/#">>), ?TT(<<"t1/#">>), ?TT(<<"t2/#">>)])
),
timer:sleep(100),
?assertMatch(
[{{<<"t/#">>, _SubPid}, _Opts}],
emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)
),
Message = emqx_message:make(?MODULE, 0, <<"t/foo">>, <<"helloworld">>, #{}, #{}),
emqx_mgmt:publish(Message),
Recv =
receive
{publish, #{client_pid := Client, payload := <<"helloworld">>}} ->
ok
after 100 ->
timeout
end,
?assertEqual(ok, Recv),
?assertEqual({error, channel_not_found}, emqx_mgmt:subscribe(<<"notfound">>, [?TT(<<"t/#">>)])),
?assertNotMatch({error, _}, emqx_mgmt:unsubscribe(<<"client1">>, <<"t/#">>)),
?assertEqual({error, channel_not_found}, emqx_mgmt:unsubscribe(<<"notfound">>, <<"t/#">>)),
Node = node(),
?assertMatch(
{Node, [{<<"t1/#">>, _}, {<<"t2/#">>, _}]},
emqx_mgmt:list_client_subscriptions(<<"client1">>)
),
?assertMatch(
{unsubscribe, [{<<"t1/#">>, _}, {<<"t2/#">>, _}]},
emqx_mgmt:unsubscribe_batch(<<"client1">>, [<<"t1/#">>, <<"t2/#">>])
),
timer:sleep(100),
?assertMatch([], emqx_mgmt:list_client_subscriptions(<<"client1">>)),
?assertEqual(
{error, channel_not_found},
emqx_mgmt:unsubscribe_batch(<<"notfound">>, [<<"t1/#">>, <<"t2/#">>])
).
t_alarms(init, Config) ->
[
emqx_mgmt:deactivate(Node, Name)
|| {Node, ActiveAlarms} <- emqx_mgmt:get_alarms(activated), #{name := Name} <- ActiveAlarms
],
emqx_mgmt:delete_all_deactivated_alarms(),
Config;
t_alarms('end', Config) ->
Config.
t_alarms(_) ->
Node = node(),
?assertEqual(
[{node(), []}],
emqx_mgmt:get_alarms(all)
),
emqx_alarm:activate(foo),
?assertMatch(
[{Node, [#{name := foo, activated := true, duration := _}]}],
emqx_mgmt:get_alarms(all)
),
emqx_alarm:activate(bar),
?assertMatch(
[{Node, [#{name := foo, activated := true}, #{name := bar, activated := true}]}],
sort_alarms(emqx_mgmt:get_alarms(all))
),
?assertEqual(
ok,
emqx_mgmt:deactivate(node(), bar)
),
?assertMatch(
[{Node, [#{name := foo, activated := true}, #{name := bar, activated := false}]}],
sort_alarms(emqx_mgmt:get_alarms(all))
),
?assertMatch(
[{Node, [#{name := foo, activated := true}]}],
emqx_mgmt:get_alarms(activated)
),
?assertMatch(
[{Node, [#{name := bar, activated := false}]}],
emqx_mgmt:get_alarms(deactivated)
),
?assertEqual(
[ok],
emqx_mgmt:delete_all_deactivated_alarms()
),
?assertMatch(
[{Node, [#{name := foo, activated := true}]}],
emqx_mgmt:get_alarms(all)
),
?assertEqual(
{error, not_found},
emqx_mgmt:deactivate(node(), bar)
).
t_banned(_) ->
Banned = #{
who => {clientid, <<"TestClient">>},
by => <<"banned suite">>,
reason => <<"test">>,
at => erlang:system_time(second),
until => erlang:system_time(second) + 1
},
?assertMatch(
{ok, _},
emqx_mgmt:create_banned(Banned)
),
?assertEqual(
ok,
emqx_mgmt:delete_banned({clientid, <<"TestClient">>})
).
%%% helpers
ident(Arg) ->
Arg.
sort_alarms([{Node, Alarms}]) ->
[{Node, lists:sort(fun(#{activate_at := A}, #{activate_at := B}) -> A < B end, Alarms)}].
setup_clients(Config) ->
{ok, C} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"user1">>}]),
{ok, _} = emqtt:connect(C),
[{clients, [C]} | Config].
disconnect_clients(Config) ->
Clients = ?config(clients, Config),
lists:foreach(fun emqtt:disconnect/1, Clients).

View File

@ -62,5 +62,5 @@ get_alarms(AssertCount, Activated) ->
Limit = maps:get(<<"limit">>, Meta),
Count = maps:get(<<"count">>, Meta),
?assertEqual(Page, 1),
?assertEqual(Limit, emqx_mgmt:max_row_limit()),
?assertEqual(Limit, emqx_mgmt:default_row_limit()),
?assert(Count >= AssertCount).

View File

@ -64,7 +64,7 @@ t_clients(_) ->
ClientsLimit = maps:get(<<"limit">>, ClientsMeta),
ClientsCount = maps:get(<<"count">>, ClientsMeta),
?assertEqual(ClientsPage, 1),
?assertEqual(ClientsLimit, emqx_mgmt:max_row_limit()),
?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()),
?assertEqual(ClientsCount, 2),
%% get /clients/:clientid
@ -78,7 +78,14 @@ t_clients(_) ->
%% delete /clients/:clientid kickout
Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]),
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path),
timer:sleep(300),
Kick =
receive
{'EXIT', C2, _} ->
ok
after 300 ->
timeout
end,
?assertEqual(ok, Kick),
AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path),
?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse2),
@ -107,7 +114,7 @@ t_clients(_) ->
SubscribeBody
),
timer:sleep(100),
[{AfterSubTopic, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1),
{_, [{AfterSubTopic, #{qos := AfterSubQos}}]} = emqx_mgmt:list_client_subscriptions(ClientId1),
?assertEqual(AfterSubTopic, Topic),
?assertEqual(AfterSubQos, Qos),
@ -152,7 +159,7 @@ t_clients(_) ->
UnSubscribeBody
),
timer:sleep(100),
?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)),
?assertEqual([], emqx_mgmt:list_client_subscriptions(ClientId1)),
%% testcase cleanup, kickout client1
{ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path),
@ -272,7 +279,7 @@ t_client_id_not_found(_Config) ->
%% Client kickout
?assertMatch({error, {Http, _, Body}}, ReqFun(delete, PathFun([]))),
%% Client Subscription list
?assertMatch({ok, {{"HTTP/1.1", 200, "OK"}, _, "[]"}}, ReqFun(get, PathFun(["subscriptions"]))),
?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["subscriptions"]))),
%% AuthZ Cache lookup
?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["authorization", "cache"]))),
%% AuthZ Cache clean

View File

@ -57,7 +57,7 @@ t_subscription_api(Config) ->
Data = emqx_json:decode(Response, [return_maps]),
Meta = maps:get(<<"meta">>, Data),
?assertEqual(1, maps:get(<<"page">>, Meta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(2, maps:get(<<"count">>, Meta)),
Subscriptions = maps:get(<<"data">>, Data),
?assertEqual(length(Subscriptions), 2),
@ -95,7 +95,7 @@ t_subscription_api(Config) ->
DataTopic2 = #{<<"meta">> := Meta2} = request_json(get, QS, Headers),
?assertEqual(1, maps:get(<<"page">>, Meta2)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta2)),
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta2)),
?assertEqual(1, maps:get(<<"count">>, Meta2)),
SubscriptionsList2 = maps:get(<<"data">>, DataTopic2),
?assertEqual(length(SubscriptionsList2), 1).
@ -120,7 +120,7 @@ t_subscription_fuzzy_search(Config) ->
MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers),
?assertEqual(1, maps:get(<<"page">>, MatchMeta1)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta1)),
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, MatchMeta1)),
%% count is undefined in fuzzy searching
?assertNot(maps:is_key(<<"count">>, MatchMeta1)),
?assertMatch(3, length(maps:get(<<"data">>, MatchData1))),

View File

@ -52,7 +52,7 @@ t_nodes_api(Config) ->
RoutesData = emqx_json:decode(Response, [return_maps]),
Meta = maps:get(<<"meta">>, RoutesData),
?assertEqual(1, maps:get(<<"page">>, Meta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(1, maps:get(<<"count">>, Meta)),
Data = maps:get(<<"data">>, RoutesData),
Route = erlang:hd(Data),

View File

@ -166,7 +166,7 @@ config(put, #{body := Body}) ->
%%------------------------------------------------------------------------------
lookup_retained(get, #{query_string := Qs}) ->
Page = maps:get(<<"page">>, Qs, 1),
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:max_row_limit()),
Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()),
{ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit),
{200, #{
data => [format_message(Msg) || Msg <- Msgs],