diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index fe1dfa35e..5149b8b8a 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -22,6 +22,8 @@ -export([ all/1, + init_per_testcase/3, + end_per_testcase/3, boot_modules/1, start_apps/1, start_apps/2, @@ -150,6 +152,19 @@ all(Suite) -> string:substr(atom_to_list(F), 1, 2) == "t_" ]). +init_per_testcase(Module, TestCase, Config) -> + case erlang:function_exported(Module, TestCase, 2) of + true -> Module:TestCase(init, Config); + false -> Config + end. + +end_per_testcase(Module, TestCase, Config) -> + case erlang:function_exported(Module, TestCase, 2) of + true -> Module:TestCase('end', Config); + false -> ok + end, + Config. + %% set emqx app boot modules -spec boot_modules(all | list(atom())) -> ok. boot_modules(Mods) -> diff --git a/apps/emqx_management/include/emqx_mgmt.hrl b/apps/emqx_management/include/emqx_mgmt.hrl index b68a9a634..7f6b5a675 100644 --- a/apps/emqx_management/include/emqx_mgmt.hrl +++ b/apps/emqx_management/include/emqx_mgmt.hrl @@ -16,4 +16,4 @@ -define(MANAGEMENT_SHARD, emqx_management_shard). --define(MAX_ROW_LIMIT, 100). +-define(DEFAULT_ROW_LIMIT, 100). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 2d6fa854e..efa5a03bd 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -21,8 +21,6 @@ -elvis([{elvis_style, god_modules, disable}]). -include_lib("stdlib/include/qlc.hrl"). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). %% Nodes and Brokers API -export([ @@ -71,8 +69,6 @@ list_subscriptions/1, list_subscriptions_via_topic/2, list_subscriptions_via_topic/3, - lookup_subscriptions/1, - lookup_subscriptions/2, do_list_subscriptions/0 ]). @@ -105,12 +101,10 @@ %% Common Table API -export([ - max_row_limit/0, + default_row_limit/0, vm_stats/0 ]). --define(APP, emqx_management). - -elvis([{elvis_style, god_modules, disable}]). %%-------------------------------------------------------------------- @@ -162,7 +156,7 @@ node_info(Nodes) -> emqx_rpc:unwrap_erpc(emqx_management_proto_v3:node_info(Nodes)). stopped_node_info(Node) -> - #{name => Node, node_status => 'stopped'}. + {Node, #{node => Node, node_status => 'stopped'}}. vm_stats() -> Idle = @@ -194,8 +188,13 @@ lookup_broker(Node) -> Broker. broker_info() -> - Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]), - Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}. + Info = lists:foldl(fun convert_broker_info/2, #{}, emqx_sys:info()), + Info#{node => node(), otp_release => otp_rel(), node_status => 'running'}. + +convert_broker_info({uptime, Uptime}, M) -> + M#{uptime => emqx_datetime:human_readable_duration_string(Uptime)}; +convert_broker_info({K, V}, M) -> + M#{K => iolist_to_binary(V)}. broker_info(Nodes) -> emqx_rpc:unwrap_erpc(emqx_management_proto_v3:broker_info(Nodes)). @@ -265,7 +264,7 @@ lookup_client({username, Username}, FormatFun) -> || Node <- mria_mnesia:running_nodes() ]). -lookup_client(Node, Key, {M, F}) -> +lookup_client(Node, Key, FormatFun) -> case unwrap_rpc(emqx_cm_proto_v1:lookup_client(Node, Key)) of {error, Err} -> {error, Err}; @@ -273,18 +272,23 @@ lookup_client(Node, Key, {M, F}) -> lists:map( fun({Chan, Info0, Stats}) -> Info = Info0#{node => Node}, - M:F({Chan, Info, Stats}) + maybe_format(FormatFun, {Chan, Info, Stats}) end, L ) end. -kickout_client({ClientID, FormatFun}) -> - case lookup_client({clientid, ClientID}, FormatFun) of +maybe_format(undefined, A) -> + A; +maybe_format({M, F}, A) -> + M:F(A). + +kickout_client(ClientId) -> + case lookup_client({clientid, ClientId}, undefined) of [] -> {error, not_found}; _ -> - Results = [kickout_client(Node, ClientID) || Node <- mria_mnesia:running_nodes()], + Results = [kickout_client(Node, ClientId) || Node <- mria_mnesia:running_nodes()], check_results(Results) end. @@ -295,17 +299,22 @@ list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). list_client_subscriptions(ClientId) -> - Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()], - Filter = - fun - ({error, _}) -> - false; - ({_Node, List}) -> - erlang:is_list(List) andalso 0 < erlang:length(List) - end, - case lists:filter(Filter, Results) of - [] -> []; - [Result | _] -> Result + case lookup_client({clientid, ClientId}, undefined) of + [] -> + {error, not_found}; + _ -> + Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()], + Filter = + fun + ({error, _}) -> + false; + ({_Node, List}) -> + erlang:is_list(List) andalso 0 < erlang:length(List) + end, + case lists:filter(Filter, Results) of + [] -> []; + [Result | _] -> Result + end end. client_subscriptions(Node, ClientId) -> @@ -388,17 +397,11 @@ call_client(Node, ClientId, Req) -> %% Subscriptions %%-------------------------------------------------------------------- --spec do_list_subscriptions() -> [map()]. +-spec do_list_subscriptions() -> no_return(). do_list_subscriptions() -> - case check_row_limit([mqtt_subproperty]) of - false -> - throw(max_row_limit); - ok -> - [ - #{topic => Topic, clientid => ClientId, options => Options} - || {{Topic, ClientId}, Options} <- ets:tab2list(mqtt_subproperty) - ] - end. + %% [FIXME] Add function to `emqx_broker` that returns list of subscriptions + %% and either redirect from here or bpapi directly (EMQX-8993). + throw(not_implemented). list_subscriptions(Node) -> unwrap_rpc(emqx_management_proto_v3:list_subscriptions(Node)). @@ -415,12 +418,6 @@ list_subscriptions_via_topic(Node, Topic, _FormatFun = {M, F}) -> Result -> M:F(Result) end. -lookup_subscriptions(ClientId) -> - lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]). - -lookup_subscriptions(Node, ClientId) -> - unwrap_rpc(emqx_broker_proto_v1:list_client_subscriptions(Node, ClientId)). - %%-------------------------------------------------------------------- %% PubSub %%-------------------------------------------------------------------- @@ -556,24 +553,11 @@ unwrap_rpc(Res) -> otp_rel() -> iolist_to_binary([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]). -check_row_limit(Tables) -> - check_row_limit(Tables, max_row_limit()). - -check_row_limit([], _Limit) -> - ok; -check_row_limit([Tab | Tables], Limit) -> - case table_size(Tab) > Limit of - true -> false; - false -> check_row_limit(Tables, Limit) - end. - check_results(Results) -> case lists:any(fun(Item) -> Item =:= ok end, Results) of true -> ok; false -> unwrap_rpc(lists:last(Results)) end. -max_row_limit() -> - ?MAX_ROW_LIMIT. - -table_size(Tab) -> ets:info(Tab, size). +default_row_limit() -> + ?DEFAULT_ROW_LIMIT. diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index 3c4d787d3..a0a40533d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -98,8 +98,8 @@ count(Table) -> page(Params) -> maps:get(<<"page">>, Params, 1). -limit(Params) -> - maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit()). +limit(Params) when is_map(Params) -> + maps:get(<<"limit">>, Params, emqx_mgmt:default_row_limit()). %%-------------------------------------------------------------------- %% Node Query @@ -683,7 +683,7 @@ paginate_test_() -> Size = 1000, MyLimit = 10, ets:insert(?MODULE, [{I, foo} || I <- lists:seq(1, Size)]), - DefaultLimit = emqx_mgmt:max_row_limit(), + DefaultLimit = emqx_mgmt:default_row_limit(), NoParamsResult = paginate(?MODULE, #{}, {?MODULE, paginate_test_format}), PaginateResults = [ paginate( diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 571f190f2..cac3edaed 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -274,11 +274,10 @@ schema("/clients/:clientid/subscriptions") -> responses => #{ 200 => hoconsc:mk( hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)), #{} + ), + 404 => emqx_dashboard_swagger:error_codes( + ['CLIENTID_NOT_FOUND'], <<"Client ID not found">> ) - %% returns [] if client not existed in cluster - %404 => emqx_dashboard_swagger:error_codes( - % ['CLIENTID_NOT_FOUND'], <<"Client ID not found">> - %) } } }; @@ -599,6 +598,8 @@ unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfo subscriptions(get, #{bindings := #{clientid := ClientID}}) -> case emqx_mgmt:list_client_subscriptions(ClientID) of + {error, not_found} -> + {404, ?CLIENTID_NOT_FOUND}; [] -> {200, []}; {Node, Subs} -> @@ -677,7 +678,7 @@ lookup(#{clientid := ClientID}) -> end. kickout(#{clientid := ClientID}) -> - case emqx_mgmt:kickout_client({ClientID, ?FORMAT_FUN}) of + case emqx_mgmt:kickout_client(ClientID) of {error, not_found} -> {404, ?CLIENTID_NOT_FOUND}; _ -> diff --git a/apps/emqx_management/src/emqx_mgmt_util.erl b/apps/emqx_management/src/emqx_mgmt_util.erl index c0d9e6036..b81b39b07 100644 --- a/apps/emqx_management/src/emqx_mgmt_util.erl +++ b/apps/emqx_management/src/emqx_mgmt_util.erl @@ -302,7 +302,7 @@ page_params() -> name => limit, in => query, description => <<"Page size">>, - schema => #{type => integer, default => emqx_mgmt:max_row_limit()} + schema => #{type => integer, default => emqx_mgmt:default_row_limit()} } ]. diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl new file mode 100644 index 000000000..4619905cb --- /dev/null +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -0,0 +1,387 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_mgmt_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-export([ident/1]). + +-define(FORMATFUN, {?MODULE, ident}). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), + Config. + +end_per_suite(_) -> + emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). + +init_per_testcase(TestCase, Config) -> + meck:expect(mria_mnesia, running_nodes, 0, [node()]), + emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config). + +end_per_testcase(TestCase, Config) -> + meck:unload(mria_mnesia), + emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config). + +t_list_nodes(init, Config) -> + meck:expect( + mria_mnesia, + cluster_nodes, + fun + (running) -> [node()]; + (stopped) -> ['stopped@node'] + end + ), + Config; +t_list_nodes('end', _Config) -> + ok. + +t_list_nodes(_) -> + NodeInfos = emqx_mgmt:list_nodes(), + Node = node(), + ?assertMatch( + [ + {Node, #{node := Node, node_status := 'running'}}, + {'stopped@node', #{node := 'stopped@node', node_status := 'stopped'}} + ], + NodeInfos + ). + +t_lookup_node(init, Config) -> + meck:new(os, [passthrough, unstick, no_link]), + OsType = os:type(), + meck:expect(os, type, 0, {win32, winME}), + [{os_type, OsType} | Config]; +t_lookup_node('end', Config) -> + %% We need to restore the original behavior so that rebar3 doesn't crash. If + %% we'd `meck:unload(os)` or not set `no_link` then `ct` crashes calling + %% `os` with "The code server called the unloaded module `os'". + OsType = ?config(os_type, Config), + meck:expect(os, type, 0, OsType), + ok. + +t_lookup_node(_) -> + Node = node(), + ?assertMatch( + #{node := Node, node_status := 'running', memory_total := 0}, + emqx_mgmt:lookup_node(node()) + ), + ?assertMatch( + {error, _}, + emqx_mgmt:lookup_node('fake@nohost') + ), + ok. + +t_list_brokers(_) -> + Node = node(), + ?assertMatch( + [{Node, #{node := Node, node_status := running, uptime := _}}], + emqx_mgmt:list_brokers() + ). + +t_lookup_broker(_) -> + Node = node(), + ?assertMatch( + #{node := Node, node_status := running, uptime := _}, + emqx_mgmt:lookup_broker(Node) + ). + +t_get_metrics(_) -> + Metrics = emqx_mgmt:get_metrics(), + ?assert(maps:size(Metrics) > 0), + ?assertMatch( + Metrics, maps:from_list(emqx_mgmt:get_metrics(node())) + ). + +t_lookup_client(init, Config) -> + setup_clients(Config); +t_lookup_client('end', Config) -> + disconnect_clients(Config). + +t_lookup_client(_Config) -> + [{Chan, Info, Stats}] = emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN), + ?assertEqual( + [{Chan, Info, Stats}], + emqx_mgmt:lookup_client({username, <<"user1">>}, ?FORMATFUN) + ), + ?assertEqual([], emqx_mgmt:lookup_client({clientid, <<"notfound">>}, ?FORMATFUN)), + meck:expect(mria_mnesia, running_nodes, 0, [node(), 'fake@nonode']), + ?assertMatch( + [_ | {error, nodedown}], emqx_mgmt:lookup_client({clientid, <<"client1">>}, ?FORMATFUN) + ). + +t_kickout_client(init, Config) -> + process_flag(trap_exit, true), + setup_clients(Config); +t_kickout_client('end', _Config) -> + ok. + +t_kickout_client(Config) -> + [C | _] = ?config(clients, Config), + ok = emqx_mgmt:kickout_client(<<"client1">>), + receive + {'EXIT', C, Reason} -> + ?assertEqual({shutdown, tcp_closed}, Reason); + Foo -> + error({unexpected, Foo}) + after 1000 -> + error(timeout) + end, + ?assertEqual({error, not_found}, emqx_mgmt:kickout_client(<<"notfound">>)). + +t_list_authz_cache(init, Config) -> + setup_clients(Config); +t_list_authz_cache('end', Config) -> + disconnect_clients(Config). + +t_list_authz_cache(_) -> + ?assertNotMatch({error, _}, emqx_mgmt:list_authz_cache(<<"client1">>)), + ?assertMatch({error, not_found}, emqx_mgmt:list_authz_cache(<<"notfound">>)). + +t_list_client_subscriptions(init, Config) -> + setup_clients(Config); +t_list_client_subscriptions('end', Config) -> + disconnect_clients(Config). + +t_list_client_subscriptions(Config) -> + [Client | _] = ?config(clients, Config), + ?assertEqual([], emqx_mgmt:list_client_subscriptions(<<"client1">>)), + emqtt:subscribe(Client, <<"t/#">>), + ?assertMatch({_, [{<<"t/#">>, _Opts}]}, emqx_mgmt:list_client_subscriptions(<<"client1">>)), + ?assertEqual({error, not_found}, emqx_mgmt:list_client_subscriptions(<<"notfound">>)). + +t_clean_cache(init, Config) -> + setup_clients(Config); +t_clean_cache('end', Config) -> + disconnect_clients(Config). + +t_clean_cache(_Config) -> + ?assertNotMatch( + {error, _}, + emqx_mgmt:clean_authz_cache(<<"client1">>) + ), + ?assertNotMatch( + {error, _}, + emqx_mgmt:clean_authz_cache_all() + ), + ?assertNotMatch( + {error, _}, + emqx_mgmt:clean_pem_cache_all() + ), + meck:expect(mria_mnesia, running_nodes, 0, [node(), 'fake@nonode']), + ?assertMatch( + {error, [{'fake@nonode', {error, _}}]}, + emqx_mgmt:clean_authz_cache_all() + ), + ?assertMatch( + {error, [{'fake@nonode', {error, _}}]}, + emqx_mgmt:clean_pem_cache_all() + ). + +t_set_client_props(init, Config) -> + setup_clients(Config); +t_set_client_props('end', Config) -> + disconnect_clients(Config). + +t_set_client_props(_Config) -> + ?assertEqual( + % [FIXME] not implemented at this point? + ignored, + emqx_mgmt:set_ratelimit_policy(<<"client1">>, foo) + ), + ?assertEqual( + {error, not_found}, + emqx_mgmt:set_ratelimit_policy(<<"notfound">>, foo) + ), + ?assertEqual( + % [FIXME] not implemented at this point? + ignored, + emqx_mgmt:set_quota_policy(<<"client1">>, foo) + ), + ?assertEqual( + {error, not_found}, + emqx_mgmt:set_quota_policy(<<"notfound">>, foo) + ), + ?assertEqual( + ok, + emqx_mgmt:set_keepalive(<<"client1">>, 3600) + ), + ?assertMatch( + {error, _}, + emqx_mgmt:set_keepalive(<<"client1">>, true) + ), + ?assertEqual( + {error, not_found}, + emqx_mgmt:set_keepalive(<<"notfound">>, 3600) + ), + ok. + +t_list_subscriptions_via_topic(init, Config) -> + setup_clients(Config); +t_list_subscriptions_via_topic('end', Config) -> + disconnect_clients(Config). + +t_list_subscriptions_via_topic(Config) -> + [Client | _] = ?config(clients, Config), + ?assertEqual([], emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)), + emqtt:subscribe(Client, <<"t/#">>), + ?assertMatch( + [{{<<"t/#">>, _SubPid}, _Opts}], + emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN) + ). + +t_pubsub_api(init, Config) -> + setup_clients(Config); +t_pubsub_api('end', Config) -> + disconnect_clients(Config). + +-define(TT(Topic), {Topic, #{qos => 0}}). + +t_pubsub_api(Config) -> + [Client | _] = ?config(clients, Config), + ?assertEqual([], emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN)), + ?assertMatch( + {subscribe, _, _}, + emqx_mgmt:subscribe(<<"client1">>, [?TT(<<"t/#">>), ?TT(<<"t1/#">>), ?TT(<<"t2/#">>)]) + ), + timer:sleep(100), + ?assertMatch( + [{{<<"t/#">>, _SubPid}, _Opts}], + emqx_mgmt:list_subscriptions_via_topic(<<"t/#">>, ?FORMATFUN) + ), + Message = emqx_message:make(?MODULE, 0, <<"t/foo">>, <<"helloworld">>, #{}, #{}), + emqx_mgmt:publish(Message), + Recv = + receive + {publish, #{client_pid := Client, payload := <<"helloworld">>}} -> + ok + after 100 -> + timeout + end, + ?assertEqual(ok, Recv), + ?assertEqual({error, channel_not_found}, emqx_mgmt:subscribe(<<"notfound">>, [?TT(<<"t/#">>)])), + ?assertNotMatch({error, _}, emqx_mgmt:unsubscribe(<<"client1">>, <<"t/#">>)), + ?assertEqual({error, channel_not_found}, emqx_mgmt:unsubscribe(<<"notfound">>, <<"t/#">>)), + Node = node(), + ?assertMatch( + {Node, [{<<"t1/#">>, _}, {<<"t2/#">>, _}]}, + emqx_mgmt:list_client_subscriptions(<<"client1">>) + ), + ?assertMatch( + {unsubscribe, [{<<"t1/#">>, _}, {<<"t2/#">>, _}]}, + emqx_mgmt:unsubscribe_batch(<<"client1">>, [<<"t1/#">>, <<"t2/#">>]) + ), + timer:sleep(100), + ?assertMatch([], emqx_mgmt:list_client_subscriptions(<<"client1">>)), + ?assertEqual( + {error, channel_not_found}, + emqx_mgmt:unsubscribe_batch(<<"notfound">>, [<<"t1/#">>, <<"t2/#">>]) + ). + +t_alarms(init, Config) -> + [ + emqx_mgmt:deactivate(Node, Name) + || {Node, ActiveAlarms} <- emqx_mgmt:get_alarms(activated), #{name := Name} <- ActiveAlarms + ], + emqx_mgmt:delete_all_deactivated_alarms(), + Config; +t_alarms('end', Config) -> + Config. + +t_alarms(_) -> + Node = node(), + ?assertEqual( + [{node(), []}], + emqx_mgmt:get_alarms(all) + ), + emqx_alarm:activate(foo), + ?assertMatch( + [{Node, [#{name := foo, activated := true, duration := _}]}], + emqx_mgmt:get_alarms(all) + ), + emqx_alarm:activate(bar), + ?assertMatch( + [{Node, [#{name := foo, activated := true}, #{name := bar, activated := true}]}], + sort_alarms(emqx_mgmt:get_alarms(all)) + ), + ?assertEqual( + ok, + emqx_mgmt:deactivate(node(), bar) + ), + ?assertMatch( + [{Node, [#{name := foo, activated := true}, #{name := bar, activated := false}]}], + sort_alarms(emqx_mgmt:get_alarms(all)) + ), + ?assertMatch( + [{Node, [#{name := foo, activated := true}]}], + emqx_mgmt:get_alarms(activated) + ), + ?assertMatch( + [{Node, [#{name := bar, activated := false}]}], + emqx_mgmt:get_alarms(deactivated) + ), + ?assertEqual( + [ok], + emqx_mgmt:delete_all_deactivated_alarms() + ), + ?assertMatch( + [{Node, [#{name := foo, activated := true}]}], + emqx_mgmt:get_alarms(all) + ), + ?assertEqual( + {error, not_found}, + emqx_mgmt:deactivate(node(), bar) + ). + +t_banned(_) -> + Banned = #{ + who => {clientid, <<"TestClient">>}, + by => <<"banned suite">>, + reason => <<"test">>, + at => erlang:system_time(second), + until => erlang:system_time(second) + 1 + }, + ?assertMatch( + {ok, _}, + emqx_mgmt:create_banned(Banned) + ), + ?assertEqual( + ok, + emqx_mgmt:delete_banned({clientid, <<"TestClient">>}) + ). + +%%% helpers +ident(Arg) -> + Arg. + +sort_alarms([{Node, Alarms}]) -> + [{Node, lists:sort(fun(#{activate_at := A}, #{activate_at := B}) -> A < B end, Alarms)}]. + +setup_clients(Config) -> + {ok, C} = emqtt:start_link([{clientid, <<"client1">>}, {username, <<"user1">>}]), + {ok, _} = emqtt:connect(C), + [{clients, [C]} | Config]. + +disconnect_clients(Config) -> + Clients = ?config(clients, Config), + lists:foreach(fun emqtt:disconnect/1, Clients). diff --git a/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl index 2c61651bf..69ace16e8 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_alarms_SUITE.erl @@ -62,5 +62,5 @@ get_alarms(AssertCount, Activated) -> Limit = maps:get(<<"limit">>, Meta), Count = maps:get(<<"count">>, Meta), ?assertEqual(Page, 1), - ?assertEqual(Limit, emqx_mgmt:max_row_limit()), + ?assertEqual(Limit, emqx_mgmt:default_row_limit()), ?assert(Count >= AssertCount). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 1a74d3af6..9f26f8542 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -64,7 +64,7 @@ t_clients(_) -> ClientsLimit = maps:get(<<"limit">>, ClientsMeta), ClientsCount = maps:get(<<"count">>, ClientsMeta), ?assertEqual(ClientsPage, 1), - ?assertEqual(ClientsLimit, emqx_mgmt:max_row_limit()), + ?assertEqual(ClientsLimit, emqx_mgmt:default_row_limit()), ?assertEqual(ClientsCount, 2), %% get /clients/:clientid @@ -78,7 +78,14 @@ t_clients(_) -> %% delete /clients/:clientid kickout Client2Path = emqx_mgmt_api_test_util:api_path(["clients", binary_to_list(ClientId2)]), {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client2Path), - timer:sleep(300), + Kick = + receive + {'EXIT', C2, _} -> + ok + after 300 -> + timeout + end, + ?assertEqual(ok, Kick), AfterKickoutResponse2 = emqx_mgmt_api_test_util:request_api(get, Client2Path), ?assertEqual({error, {"HTTP/1.1", 404, "Not Found"}}, AfterKickoutResponse2), @@ -107,7 +114,7 @@ t_clients(_) -> SubscribeBody ), timer:sleep(100), - [{AfterSubTopic, #{qos := AfterSubQos}}] = emqx_mgmt:lookup_subscriptions(ClientId1), + {_, [{AfterSubTopic, #{qos := AfterSubQos}}]} = emqx_mgmt:list_client_subscriptions(ClientId1), ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos), @@ -152,7 +159,7 @@ t_clients(_) -> UnSubscribeBody ), timer:sleep(100), - ?assertEqual([], emqx_mgmt:lookup_subscriptions(Client1)), + ?assertEqual([], emqx_mgmt:list_client_subscriptions(ClientId1)), %% testcase cleanup, kickout client1 {ok, _} = emqx_mgmt_api_test_util:request_api(delete, Client1Path), @@ -272,7 +279,7 @@ t_client_id_not_found(_Config) -> %% Client kickout ?assertMatch({error, {Http, _, Body}}, ReqFun(delete, PathFun([]))), %% Client Subscription list - ?assertMatch({ok, {{"HTTP/1.1", 200, "OK"}, _, "[]"}}, ReqFun(get, PathFun(["subscriptions"]))), + ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["subscriptions"]))), %% AuthZ Cache lookup ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["authorization", "cache"]))), %% AuthZ Cache clean diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 2ab213e30..ccfa30037 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -57,7 +57,7 @@ t_subscription_api(Config) -> Data = emqx_json:decode(Response, [return_maps]), Meta = maps:get(<<"meta">>, Data), ?assertEqual(1, maps:get(<<"page">>, Meta)), - ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)), + ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)), ?assertEqual(2, maps:get(<<"count">>, Meta)), Subscriptions = maps:get(<<"data">>, Data), ?assertEqual(length(Subscriptions), 2), @@ -95,7 +95,7 @@ t_subscription_api(Config) -> DataTopic2 = #{<<"meta">> := Meta2} = request_json(get, QS, Headers), ?assertEqual(1, maps:get(<<"page">>, Meta2)), - ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta2)), + ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta2)), ?assertEqual(1, maps:get(<<"count">>, Meta2)), SubscriptionsList2 = maps:get(<<"data">>, DataTopic2), ?assertEqual(length(SubscriptionsList2), 1). @@ -120,7 +120,7 @@ t_subscription_fuzzy_search(Config) -> MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers), ?assertEqual(1, maps:get(<<"page">>, MatchMeta1)), - ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta1)), + ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, MatchMeta1)), %% count is undefined in fuzzy searching ?assertNot(maps:is_key(<<"count">>, MatchMeta1)), ?assertMatch(3, length(maps:get(<<"data">>, MatchData1))), diff --git a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl index 8f9b224ef..0c2e684b4 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_topics_SUITE.erl @@ -52,7 +52,7 @@ t_nodes_api(Config) -> RoutesData = emqx_json:decode(Response, [return_maps]), Meta = maps:get(<<"meta">>, RoutesData), ?assertEqual(1, maps:get(<<"page">>, Meta)), - ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, Meta)), + ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)), ?assertEqual(1, maps:get(<<"count">>, Meta)), Data = maps:get(<<"data">>, RoutesData), Route = erlang:hd(Data), diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index fa11b00f4..7b1337140 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -166,7 +166,7 @@ config(put, #{body := Body}) -> %%------------------------------------------------------------------------------ lookup_retained(get, #{query_string := Qs}) -> Page = maps:get(<<"page">>, Qs, 1), - Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:max_row_limit()), + Limit = maps:get(<<"limit">>, Qs, emqx_mgmt:default_row_limit()), {ok, Msgs} = emqx_retainer_mnesia:page_read(undefined, undefined, Page, Limit), {200, #{ data => [format_message(Msg) || Msg <- Msgs],