Merge pull request #7794 from lafirest/fix/sub_api_return

fix(mgmt): add subscribe options support for some APIS
This commit is contained in:
lafirest 2022-04-27 16:51:21 +08:00 committed by GitHub
commit 291b6a2530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 52 deletions

View File

@ -513,7 +513,10 @@ fields(keepalive) ->
fields(subscribe) -> fields(subscribe) ->
[ [
{topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})}, {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})},
{nl, hoconsc:mk(integer(), #{default => 0, desc => <<"No Local">>})},
{rap, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain as Published">>})},
{rh, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain Handling">>})}
]; ];
fields(unsubscribe) -> fields(unsubscribe) ->
[ [
@ -536,9 +539,8 @@ authz_cache(delete, #{bindings := Bindings}) ->
clean_authz_cache(Bindings). clean_authz_cache(Bindings).
subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Topic = maps:get(<<"topic">>, TopicInfo), Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo),
Qos = maps:get(<<"qos">>, TopicInfo, 0), subscribe(Opts#{clientid => ClientID}).
subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}).
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Topic = maps:get(<<"topic">>, TopicInfo), Topic = maps:get(<<"topic">>, TopicInfo),
@ -548,11 +550,7 @@ unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
Topics = Topics =
[ [
begin emqx_map_lib:unsafe_atom_key_map(TopicInfo)
Topic = maps:get(<<"topic">>, TopicInfo),
Qos = maps:get(<<"qos">>, TopicInfo, 0),
#{topic => Topic, qos => Qos}
end
|| TopicInfo <- TopicInfos || TopicInfo <- TopicInfos
], ],
subscribe_batch(#{clientid => ClientID, topics => Topics}). subscribe_batch(#{clientid => ClientID, topics => Topics}).
@ -564,12 +562,14 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
{Node, Subs} -> {Node, Subs} ->
Formatter = Formatter =
fun({Topic, SubOpts}) -> fun({Topic, SubOpts}) ->
#{ maps:merge(
node => Node, #{
clientid => ClientID, node => Node,
topic => Topic, clientid => ClientID,
qos => maps:get(qos, SubOpts) topic => Topic
} },
maps:with([qos, nl, rap, rh], SubOpts)
)
end, end,
{200, lists:map(Formatter, Subs)} {200, lists:map(Formatter, Subs)}
end. end.
@ -659,21 +659,16 @@ clean_authz_cache(#{clientid := ClientID}) ->
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}} {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}
end. end.
subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
case do_subscribe(ClientID, Topic, Qos) of Opts = maps:with([qos, nl, rap, rh], Sub),
case do_subscribe(ClientID, Topic, Opts) of
{error, channel_not_found} -> {error, channel_not_found} ->
{404, ?CLIENT_ID_NOT_FOUND}; {404, ?CLIENT_ID_NOT_FOUND};
{error, Reason} -> {error, Reason} ->
Message = list_to_binary(io_lib:format("~p", [Reason])), Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
{ok, Node} -> {ok, Node} ->
Response = Response = Sub#{node => Node},
#{
clientid => ClientID,
topic => Topic,
qos => Qos,
node => Node
},
{200, Response} {200, Response}
end. end.
@ -686,15 +681,18 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
end. end.
subscribe_batch(#{clientid := ClientID, topics := Topics}) -> subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
ArgList = [[ClientID, Topic, Qos] || #{topic := Topic, qos := Qos} <- Topics], ArgList = [
[ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
|| #{topic := Topic} = Sub <- Topics
],
emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% internal function %% internal function
do_subscribe(ClientID, Topic0, Qos) -> do_subscribe(ClientID, Topic0, Options) ->
{Topic, Opts} = emqx_topic:parse(Topic0), {Topic, Opts} = emqx_topic:parse(Topic0, Options),
TopicTable = [{Topic, Opts#{qos => Qos}}], TopicTable = [{Topic, Opts}],
case emqx_mgmt:subscribe(ClientID, TopicTable) of case emqx_mgmt:subscribe(ClientID, TopicTable) of
{error, Reason} -> {error, Reason} ->
{error, Reason}; {error, Reason};

View File

@ -72,7 +72,10 @@ fields(subscription) ->
{node, hoconsc:mk(binary(), #{desc => <<"Access type">>})}, {node, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
{topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})}, {topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})},
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})}, {clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})},
{nl, hoconsc:mk(integer(), #{desc => <<"No Local">>})},
{rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>})},
{rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>})}
]. ].
parameters() -> parameters() ->
@ -163,22 +166,20 @@ format(Items) when is_list(Items) ->
[format(Item) || Item <- Items]; [format(Item) || Item <- Items];
format({{Subscriber, Topic}, Options}) -> format({{Subscriber, Topic}, Options}) ->
format({Subscriber, Topic, Options}); format({Subscriber, Topic, Options});
format({_Subscriber, Topic, Options = #{share := Group}}) ->
QoS = maps:get(qos, Options),
#{
topic => filename:join([<<"$share">>, Group, Topic]),
clientid => maps:get(subid, Options),
qos => QoS,
node => node()
};
format({_Subscriber, Topic, Options}) -> format({_Subscriber, Topic, Options}) ->
QoS = maps:get(qos, Options), maps:merge(
#{ #{
topic => Topic, topic => get_topic(Topic, Options),
clientid => maps:get(subid, Options), clientid => maps:get(subid, Options),
qos => QoS, node => node()
node => node() },
}. maps:with([qos, nl, rap, rh], Options)
).
get_topic(Topic, #{share := Group}) ->
filename:join([<<"$share">>, Group, Topic]);
get_topic(Topic, _) ->
Topic.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Query Function %% Query Function

View File

@ -43,7 +43,9 @@ t_clients(_) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
{ok, C1} = emqtt:start_link(#{username => Username1, clientid => ClientId1}), {ok, C1} = emqtt:start_link(#{
username => Username1, clientid => ClientId1, proto_ver => v5
}),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
{ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}), {ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}),
{ok, _} = emqtt:connect(C2), {ok, _} = emqtt:connect(C2),
@ -87,7 +89,7 @@ t_clients(_) ->
?assertEqual("[]", Client1AuthzCache), ?assertEqual("[]", Client1AuthzCache),
%% post /clients/:clientid/subscribe %% post /clients/:clientid/subscribe
SubscribeBody = #{topic => Topic, qos => Qos}, SubscribeBody = #{topic => Topic, qos => Qos, nl => 1, rh => 1},
SubscribePath = emqx_mgmt_api_test_util:api_path([ SubscribePath = emqx_mgmt_api_test_util:api_path([
"clients", "clients",
binary_to_list(ClientId1), binary_to_list(ClientId1),
@ -105,6 +107,32 @@ t_clients(_) ->
?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubTopic, Topic),
?assertEqual(AfterSubQos, Qos), ?assertEqual(AfterSubQos, Qos),
%% get /clients/:clientid/subscriptions
SubscriptionsPath = emqx_mgmt_api_test_util:api_path([
"clients",
binary_to_list(ClientId1),
"subscriptions"
]),
{ok, SubscriptionsRes} = emqx_mgmt_api_test_util:request_api(
get,
SubscriptionsPath,
"",
AuthHeader
),
[SubscriptionsData] = emqx_json:decode(SubscriptionsRes, [return_maps]),
?assertMatch(
#{
<<"clientid">> := ClientId1,
<<"nl">> := 1,
<<"rap">> := 0,
<<"rh">> := 1,
<<"node">> := _,
<<"qos">> := Qos,
<<"topic">> := Topic
},
SubscriptionsData
),
%% post /clients/:clientid/unsubscribe %% post /clients/:clientid/unsubscribe
UnSubscribePath = emqx_mgmt_api_test_util:api_path([ UnSubscribePath = emqx_mgmt_api_test_util:api_path([
"clients", "clients",

View File

@ -25,6 +25,10 @@
%% notice: integer topic for sort response %% notice: integer topic for sort response
-define(TOPIC1, <<"t/0000">>). -define(TOPIC1, <<"t/0000">>).
-define(TOPIC1RH, 1).
-define(TOPIC1RAP, false).
-define(TOPIC1NL, false).
-define(TOPIC1QOS, 1).
-define(TOPIC2, <<"$share/test_group/t/0001">>). -define(TOPIC2, <<"$share/test_group/t/0001">>).
-define(TOPIC2_TOPIC_ONLY, <<"t/0001">>). -define(TOPIC2_TOPIC_ONLY, <<"t/0001">>).
@ -41,9 +45,13 @@ end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite(). emqx_mgmt_api_test_util:end_suite().
t_subscription_api(_) -> t_subscription_api(_) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID}), {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}),
{ok, _} = emqtt:connect(Client), {ok, _} = emqtt:connect(Client),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC1), {ok, _, _} = emqtt:subscribe(
Client, [
{?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]}
]
),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2),
Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
@ -59,9 +67,21 @@ t_subscription_api(_) ->
maps:get(T1, ?TOPIC_SORT) =< maps:get(T2, ?TOPIC_SORT) maps:get(T1, ?TOPIC_SORT) =< maps:get(T2, ?TOPIC_SORT)
end, end,
[Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions), [Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions),
?assertEqual(maps:get(<<"topic">>, Subscriptions1), ?TOPIC1),
?assertMatch(
#{
<<"topic">> := ?TOPIC1,
<<"qos">> := ?TOPIC1QOS,
<<"nl">> := _,
<<"rap">> := _,
<<"rh">> := ?TOPIC1RH,
<<"clientid">> := ?CLIENTID,
<<"node">> := _
},
Subscriptions1
),
?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2), ?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2),
?assertEqual(maps:get(<<"clientid">>, Subscriptions1), ?CLIENTID),
?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID), ?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID),
QS = uri_string:compose_query([ QS = uri_string:compose_query([
@ -94,8 +114,8 @@ t_subscription_api(_) ->
MatchMeta = maps:get(<<"meta">>, MatchData), MatchMeta = maps:get(<<"meta">>, MatchData),
?assertEqual(1, maps:get(<<"page">>, MatchMeta)), ?assertEqual(1, maps:get(<<"page">>, MatchMeta)),
?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)),
?assertEqual(2, maps:get(<<"count">>, MatchMeta)), ?assertEqual(1, maps:get(<<"count">>, MatchMeta)),
MatchSubs = maps:get(<<"data">>, MatchData), MatchSubs = maps:get(<<"data">>, MatchData),
?assertEqual(length(MatchSubs), 2), ?assertEqual(1, length(MatchSubs)),
emqtt:disconnect(Client). emqtt:disconnect(Client).