diff --git a/apps/emqx_management/src/emqx_mgmt_api.erl b/apps/emqx_management/src/emqx_mgmt_api.erl index c34791b57..7550e8bce 100644 --- a/apps/emqx_management/src/emqx_mgmt_api.erl +++ b/apps/emqx_management/src/emqx_mgmt_api.erl @@ -30,6 +30,10 @@ -export([do_query/5]). +-export([ page/1 + , limit/1 + ]). + paginate(Tables, Params, RowFun) -> Qh = query_handle(Tables), Count = count(Tables), diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 464df33c9..01e14987f 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -64,8 +64,10 @@ list(Bindings, Params) when map_size(Bindings) == 0 -> case proplists:get_value(<<"topic">>, Params) of undefined -> minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}); - Topic -> - minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)}) + Topic0 -> + Topic = emqx_mgmt_util:urldecode(Topic0), + Data = emqx_mgmt:list_subscriptions_via_topic(Topic, ?format_fun), + minirest:return({ok, add_meta(Params, Data)}) end; list(#{node := Node} = Bindings, Params) -> @@ -80,10 +82,28 @@ list(#{node := Node} = Bindings, Params) -> Res -> Res end end; - Topic -> - minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)}) + Topic0 -> + Topic = emqx_mgmt_util:urldecode(Topic0), + Data = emqx_mgmt:list_subscriptions_via_topic(Node, Topic, ?format_fun), + minirest:return({ok, add_meta(Params, Data)}) end. +add_meta(Params, List) -> + Page = emqx_mgmt_api:page(Params), + Limit = emqx_mgmt_api:limit(Params), + Count = erlang:length(List), + Start = (Page - 1) * Limit + 1, + Data = lists:sublist(List, Start, Limit), + #{meta => #{ + page => Page, + limit => Limit, + hasnext => Start + Limit - 1 < Count, + count => Count + }, + data => Data, + code => 0 + }. + lookup(#{node := Node, clientid := ClientId}, _Params) -> minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)}); diff --git a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl index bbec02722..5d0dab7ea 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_SUITE.erl @@ -551,6 +551,8 @@ t_routes_and_subscriptions(_) -> [Subscription] = get(<<"data">>, Result3), ?assertEqual(Topic, maps:get(<<"topic">>, Subscription)), ?assertEqual(ClientId, maps:get(<<"clientid">>, Subscription)), + ?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 1}, + get(<<"meta">>, Result3)), {ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), auth_header_()), @@ -561,6 +563,61 @@ t_routes_and_subscriptions(_) -> ok = emqtt:disconnect(C1). +t_subscription_topic(_Config) -> + ClientId = <<"myclient">>, + Topic = <<"topic">>, + Query = "topic=" ++ binary_to_list(Topic), + {ok, NonSubscription} = request_api(get, api_path(["subscriptions"]), Query, auth_header_()), + ?assertEqual([], get(<<"data">>, NonSubscription)), + ?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 0}, + get(<<"meta">>, NonSubscription)), + {ok, NonSubscription1} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), + Query, auth_header_()), + ?assertEqual([], get(<<"data">>, NonSubscription1)), + ?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 0}, + get(<<"meta">>, NonSubscription)), + + Conn = + [begin + {ok, C1} = emqtt:start_link(#{clean_start => true, proto_ver => ?MQTT_PROTO_V5, + clientid => <>}), + {ok, _} = emqtt:connect(C1), + {ok, _, [2]} = emqtt:subscribe(C1, Topic, qos2), + C1 + end|| I <- lists:seq(1,10)], + + {ok, Result3} = request_api(get, api_path(["subscriptions"]), Query, auth_header_()), + [Subscription | Subscriptions] = get(<<"data">>, Result3), + ?assertEqual(Topic, maps:get(<<"topic">>, Subscription)), + ?assertEqual(9, erlang:length(Subscriptions)), + ?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 10}, + get(<<"meta">>, Result3)), + + {ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), Query, auth_header_()), + + ?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 10}, + get(<<"meta">>, Result3)), + + Query1 = Query ++ "&_page=1&_limit=5", + {ok, Result4} = request_api(get, api_path(["subscriptions"]), Query1, auth_header_()), + ?assertMatch(#{<<"page">> := 1, <<"limit">> := 5, <<"hasnext">> := true, <<"count">> := 10}, + get(<<"meta">>, Result4)), + ?assertEqual(5, erlang:length(get(<<"data">>, Result4))), + + Query2 = Query ++ "&_page=2&_limit=5", + {ok, Result5} = request_api(get, api_path(["subscriptions"]), Query2, auth_header_()), + ?assertMatch(#{<<"page">> := 2, <<"limit">> := 5, <<"hasnext">> := false, <<"count">> := 10}, + get(<<"meta">>, Result5)), + ?assertEqual(5, erlang:length(get(<<"data">>, Result4))), + + Query3 = Query ++ "&_page=3&_limit=3", + {ok, Result6} = request_api(get, api_path(["subscriptions"]), Query3, auth_header_()), + ?assertMatch(#{<<"page">> := 3, <<"limit">> := 3, <<"hasnext">> := true, <<"count">> := 10}, + get(<<"meta">>, Result6)), + + [ok = emqtt:disconnect(C1) ||C1 <- Conn], + ok. + t_stats(_) -> {ok, _} = request_api(get, api_path(["stats"]), auth_header_()), {ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),