diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index e46047521..3c4d787d3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -23,8 +23,7 @@ -define(LONG_QUERY_TIMEOUT, 50000). -export([ - paginate/3, - paginate/4 + paginate/3 ]). %% first_next query APIs @@ -34,6 +33,10 @@ b2i/1 ]). +-ifdef(TEST). +-export([paginate_test_format/1]). +-endif. + -export_type([ match_spec_and_filter/0 ]). @@ -58,14 +61,14 @@ -export([do_query/2, apply_total_query/1]). -paginate(Tables, Params, {Module, FormatFun}) -> - Qh = query_handle(Tables), - Count = count(Tables), - do_paginate(Qh, Count, Params, {Module, FormatFun}). - -paginate(Tables, MatchSpec, Params, {Module, FormatFun}) -> - Qh = query_handle(Tables, MatchSpec), - Count = count(Tables, MatchSpec), +-spec paginate(atom(), map(), {atom(), atom()}) -> + #{ + meta => #{page => pos_integer(), limit => pos_integer(), count => pos_integer()}, + data => list(term()) + }. +paginate(Table, Params, {Module, FormatFun}) -> + Qh = query_handle(Table), + Count = count(Table), do_paginate(Qh, Count, Params, {Module, FormatFun}). do_paginate(Qh, Count, Params, {Module, FormatFun}) -> @@ -86,57 +89,17 @@ do_paginate(Qh, Count, Params, {Module, FormatFun}) -> data => [erlang:apply(Module, FormatFun, [Row]) || Row <- Rows] }. -query_handle(Table) when is_atom(Table) -> - qlc:q([R || R <- ets:table(Table)]); -query_handle({Table, Opts}) when is_atom(Table) -> - qlc:q([R || R <- ets:table(Table, Opts)]); -query_handle([Table]) when is_atom(Table) -> - qlc:q([R || R <- ets:table(Table)]); -query_handle([{Table, Opts}]) when is_atom(Table) -> - qlc:q([R || R <- ets:table(Table, Opts)]); -query_handle(Tables) -> - % - qlc:append([query_handle(T) || T <- Tables]). +query_handle(Table) -> + qlc:q([R || R <- ets:table(Table)]). -query_handle(Table, MatchSpec) when is_atom(Table) -> - Options = {traverse, {select, MatchSpec}}, - qlc:q([R || R <- ets:table(Table, Options)]); -query_handle([Table], MatchSpec) when is_atom(Table) -> - Options = {traverse, {select, MatchSpec}}, - qlc:q([R || R <- ets:table(Table, Options)]); -query_handle(Tables, MatchSpec) -> - Options = {traverse, {select, MatchSpec}}, - qlc:append([qlc:q([E || E <- ets:table(T, Options)]) || T <- Tables]). +count(Table) -> + ets:info(Table, size). -count(Table) when is_atom(Table) -> - ets:info(Table, size); -count({Table, _}) when is_atom(Table) -> - ets:info(Table, size); -count([Table]) when is_atom(Table) -> - ets:info(Table, size); -count([{Table, _}]) when is_atom(Table) -> - ets:info(Table, size); -count(Tables) -> - lists:sum([count(T) || T <- Tables]). - -count(Table, MatchSpec) when is_atom(Table) -> - [{MatchPattern, Where, _Re}] = MatchSpec, - NMatchSpec = [{MatchPattern, Where, [true]}], - ets:select_count(Table, NMatchSpec); -count([Table], MatchSpec) when is_atom(Table) -> - count(Table, MatchSpec); -count(Tables, MatchSpec) -> - lists:sum([count(T, MatchSpec) || T <- Tables]). - -page(Params) when is_map(Params) -> - maps:get(<<"page">>, Params, 1); page(Params) -> - proplists:get_value(<<"page">>, Params, <<"1">>). + maps:get(<<"page">>, Params, 1). -limit(Params) when is_map(Params) -> - maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit()); limit(Params) -> - proplists:get_value(<<"limit">>, Params, emqx_mgmt:max_row_limit()). + maps:get(<<"limit">>, Params, emqx_mgmt:max_row_limit()). %%-------------------------------------------------------------------- %% Node Query @@ -210,8 +173,6 @@ cluster_query(Tab, QString, QSchema, MsFun, FmtFun) -> end. %% @private -do_cluster_query([], QueryState, ResultAcc) -> - finalize_query(ResultAcc, mark_complete(QueryState)); do_cluster_query( [Node | Tail] = Nodes, QueryState, @@ -605,7 +566,7 @@ to_type(V, TargetType) -> to_type_(V, atom) -> to_atom(V); to_type_(V, integer) -> to_integer(V); to_type_(V, timestamp) -> to_timestamp(V); -to_type_(V, ip) -> aton(V); +to_type_(V, ip) -> to_ip(V); to_type_(V, ip_port) -> to_ip_port(V); to_type_(V, _) -> V. @@ -624,14 +585,16 @@ to_timestamp(I) when is_integer(I) -> to_timestamp(B) when is_binary(B) -> binary_to_integer(B). -aton(B) when is_binary(B) -> - list_to_tuple([binary_to_integer(T) || T <- re:split(B, "[.]")]). +to_ip(IP0) when is_binary(IP0) -> + ensure_ok(inet:parse_address(binary_to_list(IP0))). to_ip_port(IPAddress) -> - [IP0, Port0] = string:tokens(binary_to_list(IPAddress), ":"), - {ok, IP} = inet:parse_address(IP0), - Port = list_to_integer(Port0), - {IP, Port}. + ensure_ok(emqx_schema:to_ip_port(IPAddress)). + +ensure_ok({ok, V}) -> + V; +ensure_ok({error, _R} = E) -> + throw(E). b2i(Bin) when is_binary(Bin) -> binary_to_integer(Bin); @@ -645,40 +608,115 @@ b2i(Any) -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -params2qs_test() -> +params2qs_test_() -> QSchema = [ {<<"str">>, binary}, {<<"int">>, integer}, + {<<"binatom">>, atom}, {<<"atom">>, atom}, {<<"ts">>, timestamp}, {<<"gte_range">>, integer}, {<<"lte_range">>, integer}, {<<"like_fuzzy">>, binary}, - {<<"match_topic">>, binary} + {<<"match_topic">>, binary}, + {<<"ip">>, ip}, + {<<"ip_port">>, ip_port} ], QString = [ {<<"str">>, <<"abc">>}, {<<"int">>, <<"123">>}, - {<<"atom">>, <<"connected">>}, + {<<"binatom">>, <<"connected">>}, + {<<"atom">>, ok}, {<<"ts">>, <<"156000">>}, {<<"gte_range">>, <<"1">>}, {<<"lte_range">>, <<"5">>}, {<<"like_fuzzy">>, <<"user">>}, - {<<"match_topic">>, <<"t/#">>} + {<<"match_topic">>, <<"t/#">>}, + {<<"ip">>, <<"127.0.0.1">>}, + {<<"ip_port">>, <<"127.0.0.1:8888">>} ], ExpectedQs = [ {str, '=:=', <<"abc">>}, {int, '=:=', 123}, - {atom, '=:=', connected}, + {binatom, '=:=', connected}, + {atom, '=:=', ok}, {ts, '=:=', 156000}, - {range, '>=', 1, '=<', 5} + {range, '>=', 1, '=<', 5}, + {ip, '=:=', {127, 0, 0, 1}}, + {ip_port, '=:=', {{127, 0, 0, 1}, 8888}} ], FuzzyNQString = [ {fuzzy, like, <<"user">>}, {topic, match, <<"t/#">>} ], - ?assertEqual({7, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)), - {0, {[], []}} = parse_qstring([{not_a_predefined_params, val}], QSchema). + [ + ?_assertEqual({10, {ExpectedQs, FuzzyNQString}}, parse_qstring(QString, QSchema)), + ?_assertEqual({0, {[], []}}, parse_qstring([{not_a_predefined_params, val}], QSchema)), + ?_assertEqual( + {1, {[{ip, '=:=', {0, 0, 0, 0, 0, 0, 0, 1}}], []}}, + parse_qstring([{<<"ip">>, <<"::1">>}], QSchema) + ), + ?_assertEqual( + {1, {[{ip_port, '=:=', {{0, 0, 0, 0, 0, 0, 0, 1}, 8888}}], []}}, + parse_qstring([{<<"ip_port">>, <<"::1:8888">>}], QSchema) + ), + ?_assertThrow( + {bad_value_type, {<<"ip">>, ip, <<"helloworld">>}}, + parse_qstring([{<<"ip">>, <<"helloworld">>}], QSchema) + ), + ?_assertThrow( + {bad_value_type, {<<"ip_port">>, ip_port, <<"127.0.0.1">>}}, + parse_qstring([{<<"ip_port">>, <<"127.0.0.1">>}], QSchema) + ), + ?_assertThrow( + {bad_value_type, {<<"ip_port">>, ip_port, <<"helloworld:abcd">>}}, + parse_qstring([{<<"ip_port">>, <<"helloworld:abcd">>}], QSchema) + ) + ]. +paginate_test_format(Row) -> + Row. + +paginate_test_() -> + _ = ets:new(?MODULE, [named_table]), + Size = 1000, + MyLimit = 10, + ets:insert(?MODULE, [{I, foo} || I <- lists:seq(1, Size)]), + DefaultLimit = emqx_mgmt:max_row_limit(), + NoParamsResult = paginate(?MODULE, #{}, {?MODULE, paginate_test_format}), + PaginateResults = [ + paginate( + ?MODULE, #{<<"page">> => I, <<"limit">> => MyLimit}, {?MODULE, paginate_test_format} + ) + || I <- lists:seq(1, floor(Size / MyLimit)) + ], + [ + ?_assertMatch( + #{meta := #{count := Size, page := 1, limit := DefaultLimit}}, NoParamsResult + ), + ?_assertEqual(DefaultLimit, length(maps:get(data, NoParamsResult))), + ?_assertEqual( + #{data => [], meta => #{count => Size, limit => DefaultLimit, page => 100}}, + paginate(?MODULE, #{<<"page">> => <<"100">>}, {?MODULE, paginate_test_format}) + ) + ] ++ assert_paginate_results(PaginateResults, Size, MyLimit). + +assert_paginate_results(Results, Size, Limit) -> + AllData = lists:flatten([Data || #{data := Data} <- Results]), + [ + begin + Result = lists:nth(I, Results), + [ + ?_assertMatch(#{meta := #{count := Size, limit := Limit, page := I}}, Result), + ?_assertEqual(Limit, length(maps:get(data, Result))) + ] + end + || I <- lists:seq(1, floor(Size / Limit)) + ] ++ + [ + ?_assertEqual(floor(Size / Limit), length(Results)), + ?_assertEqual(Size, length(AllData)), + ?_assertEqual(Size, sets:size(sets:from_list(AllData))) + ]. -endif. diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index a8bbfa6d9..4d0262e6a 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -67,7 +67,7 @@ t_cluster_query(_Config) -> %% assert: AllPage = Page1 + Page2 + Page3 + Page4 %% !!!Note: this equation requires that the queried tables must be ordered_set - {200, ClientsPage2} = query_clients(Node1, #{<<"page">> => 2, <<"limit">> => 5}), + {200, ClientsPage2} = query_clients(Node1, #{<<"page">> => <<"2">>, <<"limit">> => 5}), {200, ClientsPage3} = query_clients(Node2, #{<<"page">> => 3, <<"limit">> => 5}), {200, ClientsPage4} = query_clients(Node1, #{<<"page">> => 4, <<"limit">> => 5}), GetClientIds = fun(L) -> lists:map(fun(#{clientid := Id}) -> Id end, L) end, @@ -79,6 +79,78 @@ t_cluster_query(_Config) -> ) ), + %% Scroll past count + {200, ClientsPage10} = query_clients(Node1, #{<<"page">> => <<"10">>, <<"limit">> => 5}), + ?assertEqual( + #{data => [], meta => #{page => 10, limit => 5, count => 20, hasnext => false}}, + ClientsPage10 + ), + + %% Node queries + {200, ClientsNode2} = query_clients(Node1, #{<<"node">> => Node2}), + ?assertEqual({200, ClientsNode2}, query_clients(Node2, #{<<"node">> => Node2})), + ?assertMatch( + #{page := 1, limit := 100, count := 10}, + maps:get(meta, ClientsNode2) + ), + ?assertMatch(10, length(maps:get(data, ClientsNode2))), + + {200, ClientsNode2Page1} = query_clients(Node2, #{<<"node">> => Node2, <<"limit">> => 5}), + {200, ClientsNode2Page2} = query_clients(Node1, #{ + <<"node">> => Node2, <<"page">> => <<"2">>, <<"limit">> => 5 + }), + {200, ClientsNode2Page3} = query_clients(Node2, #{ + <<"node">> => Node2, <<"page">> => 3, <<"limit">> => 5 + }), + {200, ClientsNode2Page4} = query_clients(Node1, #{ + <<"node">> => Node2, <<"page">> => 4, <<"limit">> => 5 + }), + ?assertEqual( + GetClientIds(maps:get(data, ClientsNode2)), + GetClientIds( + lists:append([ + maps:get(data, Page) + || Page <- [ + ClientsNode2Page1, + ClientsNode2Page2, + ClientsNode2Page3, + ClientsNode2Page4 + ] + ]) + ) + ), + + %% Scroll past count + {200, ClientsNode2Page10} = query_clients(Node1, #{ + <<"node">> => Node2, <<"page">> => <<"10">>, <<"limit">> => 5 + }), + ?assertEqual( + #{data => [], meta => #{page => 10, limit => 5, count => 10, hasnext => false}}, + ClientsNode2Page10 + ), + + %% Query with bad params + ?assertEqual( + {400, #{ + code => <<"INVALID_PARAMETER">>, + message => <<"page_limit_invalid">> + }}, + query_clients(Node1, #{<<"page">> => -1}) + ), + ?assertEqual( + {400, #{ + code => <<"INVALID_PARAMETER">>, + message => <<"page_limit_invalid">> + }}, + query_clients(Node1, #{<<"node">> => Node1, <<"page">> => -1}) + ), + + %% Query bad node + ?assertMatch( + {500, #{code := <<"NODE_DOWN">>}}, + query_clients(Node1, #{<<"node">> => 'nonode@nohost'}) + ), + %% exact match can return non-zero total {200, ClientsNode1} = query_clients(Node2, #{<<"username">> => <<"corenode1@127.0.0.1">>}), ?assertMatch( @@ -87,11 +159,11 @@ t_cluster_query(_Config) -> ), %% fuzzy searching can't return total - {200, ClientsNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}), - MetaNode2 = maps:get(meta, ClientsNode2), + {200, ClientsFuzzyNode2} = query_clients(Node2, #{<<"like_username">> => <<"corenode2">>}), + MetaNode2 = maps:get(meta, ClientsFuzzyNode2), ?assertNotMatch(#{count := _}, MetaNode2), ?assertMatch(#{hasnext := false}, MetaNode2), - ?assertMatch(10, length(maps:get(data, ClientsNode2))), + ?assertMatch(10, length(maps:get(data, ClientsFuzzyNode2))), _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs2) @@ -101,6 +173,23 @@ t_cluster_query(_Config) -> end, ok. +t_bad_rpc(_) -> + emqx_mgmt_api_test_util:init_suite(), + process_flag(trap_exit, true), + ClientLs1 = [start_emqtt_client(node(), I, 1883) || I <- lists:seq(1, 10)], + Path = emqx_mgmt_api_test_util:api_path(["clients?limit=2&page=2"]), + try + meck:expect(mria_mnesia, running_nodes, 0, ['fake@nohost']), + {error, {_, 500, _}} = emqx_mgmt_api_test_util:request_api(get, Path), + %% good cop, bad cop + meck:expect(mria_mnesia, running_nodes, 0, [node(), 'fake@nohost']), + {error, {_, 500, _}} = emqx_mgmt_api_test_util:request_api(get, Path) + after + _ = lists:foreach(fun(C) -> emqtt:disconnect(C) end, ClientLs1), + meck:unload(mria_mnesia), + emqx_mgmt_api_test_util:end_suite() + end. + %%-------------------------------------------------------------------- %% helpers %%--------------------------------------------------------------------