Merge pull request #6814 from zhongwencool/subscribe-api-meta
fix(api): sub api return meta
This commit is contained in:
commit
4af01bb601
|
@ -30,6 +30,10 @@
|
||||||
|
|
||||||
-export([do_query/5]).
|
-export([do_query/5]).
|
||||||
|
|
||||||
|
-export([ page/1
|
||||||
|
, limit/1
|
||||||
|
]).
|
||||||
|
|
||||||
paginate(Tables, Params, RowFun) ->
|
paginate(Tables, Params, RowFun) ->
|
||||||
Qh = query_handle(Tables),
|
Qh = query_handle(Tables),
|
||||||
Count = count(Tables),
|
Count = count(Tables),
|
||||||
|
|
|
@ -64,8 +64,10 @@ list(Bindings, Params) when map_size(Bindings) == 0 ->
|
||||||
case proplists:get_value(<<"topic">>, Params) of
|
case proplists:get_value(<<"topic">>, Params) of
|
||||||
undefined ->
|
undefined ->
|
||||||
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
||||||
Topic ->
|
Topic0 ->
|
||||||
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
Topic = emqx_mgmt_util:urldecode(Topic0),
|
||||||
|
Data = emqx_mgmt:list_subscriptions_via_topic(Topic, ?format_fun),
|
||||||
|
minirest:return({ok, add_meta(Params, Data)})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
list(#{node := Node} = Bindings, Params) ->
|
list(#{node := Node} = Bindings, Params) ->
|
||||||
|
@ -80,10 +82,28 @@ list(#{node := Node} = Bindings, Params) ->
|
||||||
Res -> Res
|
Res -> Res
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
Topic ->
|
Topic0 ->
|
||||||
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
Topic = emqx_mgmt_util:urldecode(Topic0),
|
||||||
|
Data = emqx_mgmt:list_subscriptions_via_topic(Node, Topic, ?format_fun),
|
||||||
|
minirest:return({ok, add_meta(Params, Data)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
add_meta(Params, List) ->
|
||||||
|
Page = emqx_mgmt_api:page(Params),
|
||||||
|
Limit = emqx_mgmt_api:limit(Params),
|
||||||
|
Count = erlang:length(List),
|
||||||
|
Start = (Page - 1) * Limit + 1,
|
||||||
|
Data = lists:sublist(List, Start, Limit),
|
||||||
|
#{meta => #{
|
||||||
|
page => Page,
|
||||||
|
limit => Limit,
|
||||||
|
hasnext => Start + Limit - 1 < Count,
|
||||||
|
count => Count
|
||||||
|
},
|
||||||
|
data => Data,
|
||||||
|
code => 0
|
||||||
|
}.
|
||||||
|
|
||||||
lookup(#{node := Node, clientid := ClientId}, _Params) ->
|
lookup(#{node := Node, clientid := ClientId}, _Params) ->
|
||||||
minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)});
|
minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)});
|
||||||
|
|
||||||
|
|
|
@ -551,6 +551,8 @@ t_routes_and_subscriptions(_) ->
|
||||||
[Subscription] = get(<<"data">>, Result3),
|
[Subscription] = get(<<"data">>, Result3),
|
||||||
?assertEqual(Topic, maps:get(<<"topic">>, Subscription)),
|
?assertEqual(Topic, maps:get(<<"topic">>, Subscription)),
|
||||||
?assertEqual(ClientId, maps:get(<<"clientid">>, Subscription)),
|
?assertEqual(ClientId, maps:get(<<"clientid">>, Subscription)),
|
||||||
|
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 1},
|
||||||
|
get(<<"meta">>, Result3)),
|
||||||
|
|
||||||
{ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), auth_header_()),
|
{ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), auth_header_()),
|
||||||
|
|
||||||
|
@ -561,6 +563,61 @@ t_routes_and_subscriptions(_) ->
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_subscription_topic(_Config) ->
|
||||||
|
ClientId = <<"myclient">>,
|
||||||
|
Topic = <<"topic">>,
|
||||||
|
Query = "topic=" ++ binary_to_list(Topic),
|
||||||
|
{ok, NonSubscription} = request_api(get, api_path(["subscriptions"]), Query, auth_header_()),
|
||||||
|
?assertEqual([], get(<<"data">>, NonSubscription)),
|
||||||
|
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 0},
|
||||||
|
get(<<"meta">>, NonSubscription)),
|
||||||
|
{ok, NonSubscription1} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]),
|
||||||
|
Query, auth_header_()),
|
||||||
|
?assertEqual([], get(<<"data">>, NonSubscription1)),
|
||||||
|
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 0},
|
||||||
|
get(<<"meta">>, NonSubscription)),
|
||||||
|
|
||||||
|
Conn =
|
||||||
|
[begin
|
||||||
|
{ok, C1} = emqtt:start_link(#{clean_start => true, proto_ver => ?MQTT_PROTO_V5,
|
||||||
|
clientid => <<ClientId/binary, (integer_to_binary(I))/binary>>}),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
{ok, _, [2]} = emqtt:subscribe(C1, Topic, qos2),
|
||||||
|
C1
|
||||||
|
end|| I <- lists:seq(1,10)],
|
||||||
|
|
||||||
|
{ok, Result3} = request_api(get, api_path(["subscriptions"]), Query, auth_header_()),
|
||||||
|
[Subscription | Subscriptions] = get(<<"data">>, Result3),
|
||||||
|
?assertEqual(Topic, maps:get(<<"topic">>, Subscription)),
|
||||||
|
?assertEqual(9, erlang:length(Subscriptions)),
|
||||||
|
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 10},
|
||||||
|
get(<<"meta">>, Result3)),
|
||||||
|
|
||||||
|
{ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), Query, auth_header_()),
|
||||||
|
|
||||||
|
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 10},
|
||||||
|
get(<<"meta">>, Result3)),
|
||||||
|
|
||||||
|
Query1 = Query ++ "&_page=1&_limit=5",
|
||||||
|
{ok, Result4} = request_api(get, api_path(["subscriptions"]), Query1, auth_header_()),
|
||||||
|
?assertMatch(#{<<"page">> := 1, <<"limit">> := 5, <<"hasnext">> := true, <<"count">> := 10},
|
||||||
|
get(<<"meta">>, Result4)),
|
||||||
|
?assertEqual(5, erlang:length(get(<<"data">>, Result4))),
|
||||||
|
|
||||||
|
Query2 = Query ++ "&_page=2&_limit=5",
|
||||||
|
{ok, Result5} = request_api(get, api_path(["subscriptions"]), Query2, auth_header_()),
|
||||||
|
?assertMatch(#{<<"page">> := 2, <<"limit">> := 5, <<"hasnext">> := false, <<"count">> := 10},
|
||||||
|
get(<<"meta">>, Result5)),
|
||||||
|
?assertEqual(5, erlang:length(get(<<"data">>, Result4))),
|
||||||
|
|
||||||
|
Query3 = Query ++ "&_page=3&_limit=3",
|
||||||
|
{ok, Result6} = request_api(get, api_path(["subscriptions"]), Query3, auth_header_()),
|
||||||
|
?assertMatch(#{<<"page">> := 3, <<"limit">> := 3, <<"hasnext">> := true, <<"count">> := 10},
|
||||||
|
get(<<"meta">>, Result6)),
|
||||||
|
|
||||||
|
[ok = emqtt:disconnect(C1) ||C1 <- Conn],
|
||||||
|
ok.
|
||||||
|
|
||||||
t_stats(_) ->
|
t_stats(_) ->
|
||||||
{ok, _} = request_api(get, api_path(["stats"]), auth_header_()),
|
{ok, _} = request_api(get, api_path(["stats"]), auth_header_()),
|
||||||
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),
|
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),
|
||||||
|
|
Loading…
Reference in New Issue