From 6b8401625991dacdc539bf07781d9f7e36479793 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 27 Apr 2022 11:34:57 +0800 Subject: [PATCH 1/3] fix(mgmt): add subscribe options into the result of the /subscriptions API --- .../src/emqx_mgmt_api_subscriptions.erl | 33 ++++++++++--------- .../test/emqx_mgmt_api_subscription_SUITE.erl | 32 ++++++++++++++---- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index be3d3f293..8bd418d43 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -72,7 +72,10 @@ fields(subscription) -> {node, hoconsc:mk(binary(), #{desc => <<"Access type">>})}, {topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})}, {clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})}, - {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}, + {nl, hoconsc:mk(integer(), #{desc => <<"No Local">>})}, + {rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>})}, + {rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>})} ]. parameters() -> @@ -163,22 +166,20 @@ format(Items) when is_list(Items) -> [format(Item) || Item <- Items]; format({{Subscriber, Topic}, Options}) -> format({Subscriber, Topic, Options}); -format({_Subscriber, Topic, Options = #{share := Group}}) -> - QoS = maps:get(qos, Options), - #{ - topic => filename:join([<<"$share">>, Group, Topic]), - clientid => maps:get(subid, Options), - qos => QoS, - node => node() - }; format({_Subscriber, Topic, Options}) -> - QoS = maps:get(qos, Options), - #{ - topic => Topic, - clientid => maps:get(subid, Options), - qos => QoS, - node => node() - }. + maps:merge( + #{ + topic => get_topic(Topic, Options), + clientid => maps:get(subid, Options), + node => node() + }, + maps:with([qos, nl, rap, rh], Options) + ). + +get_topic(Topic, #{share := Group}) -> + filename:join([<<"$share">>, Group, Topic]); +get_topic(Topic, _) -> + Topic. %%-------------------------------------------------------------------- %% Query Function diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 89c36d933..d1cf4e418 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -25,6 +25,10 @@ %% notice: integer topic for sort response -define(TOPIC1, <<"t/0000">>). +-define(TOPIC1RH, 1). +-define(TOPIC1RAP, false). +-define(TOPIC1NL, false). +-define(TOPIC1QOS, 1). -define(TOPIC2, <<"$share/test_group/t/0001">>). -define(TOPIC2_TOPIC_ONLY, <<"t/0001">>). @@ -41,9 +45,13 @@ end_per_suite(_) -> emqx_mgmt_api_test_util:end_suite(). t_subscription_api(_) -> - {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID}), + {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), {ok, _} = emqtt:connect(Client), - {ok, _, _} = emqtt:subscribe(Client, ?TOPIC1), + {ok, _, _} = emqtt:subscribe( + Client, [ + {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} + ] + ), {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), @@ -59,9 +67,21 @@ t_subscription_api(_) -> maps:get(T1, ?TOPIC_SORT) =< maps:get(T2, ?TOPIC_SORT) end, [Subscriptions1, Subscriptions2] = lists:sort(Sort, Subscriptions), - ?assertEqual(maps:get(<<"topic">>, Subscriptions1), ?TOPIC1), + + ?assertMatch( + #{ + <<"topic">> := ?TOPIC1, + <<"qos">> := ?TOPIC1QOS, + <<"nl">> := _, + <<"rap">> := _, + <<"rh">> := ?TOPIC1RH, + <<"clientid">> := ?CLIENTID, + <<"node">> := _ + }, + Subscriptions1 + ), + ?assertEqual(maps:get(<<"topic">>, Subscriptions2), ?TOPIC2), - ?assertEqual(maps:get(<<"clientid">>, Subscriptions1), ?CLIENTID), ?assertEqual(maps:get(<<"clientid">>, Subscriptions2), ?CLIENTID), QS = uri_string:compose_query([ @@ -94,8 +114,8 @@ t_subscription_api(_) -> MatchMeta = maps:get(<<"meta">>, MatchData), ?assertEqual(1, maps:get(<<"page">>, MatchMeta)), ?assertEqual(emqx_mgmt:max_row_limit(), maps:get(<<"limit">>, MatchMeta)), - ?assertEqual(2, maps:get(<<"count">>, MatchMeta)), + ?assertEqual(1, maps:get(<<"count">>, MatchMeta)), MatchSubs = maps:get(<<"data">>, MatchData), - ?assertEqual(length(MatchSubs), 2), + ?assertEqual(1, length(MatchSubs)), emqtt:disconnect(Client). From 8d01e8a6973813cd44b1cf33cc011c0f7f5f7b71 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 27 Apr 2022 11:37:58 +0800 Subject: [PATCH 2/3] fix(mgmt): add subscribe options into the result of the client subscriptions API --- .../src/emqx_mgmt_api_clients.erl | 14 +++++----- .../test/emqx_mgmt_api_clients_SUITE.erl | 26 +++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 8330c11b4..d5d5381cc 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -564,12 +564,14 @@ subscriptions(get, #{bindings := #{clientid := ClientID}}) -> {Node, Subs} -> Formatter = fun({Topic, SubOpts}) -> - #{ - node => Node, - clientid => ClientID, - topic => Topic, - qos => maps:get(qos, SubOpts) - } + maps:merge( + #{ + node => Node, + clientid => ClientID, + topic => Topic + }, + maps:with([qos, nl, rap, rh], SubOpts) + ) end, {200, lists:map(Formatter, Subs)} end. diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 897862b20..57bf25268 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -105,6 +105,32 @@ t_clients(_) -> ?assertEqual(AfterSubTopic, Topic), ?assertEqual(AfterSubQos, Qos), + %% get /clients/:clientid/subscriptions + SubscriptionsPath = emqx_mgmt_api_test_util:api_path([ + "clients", + binary_to_list(ClientId1), + "subscriptions" + ]), + {ok, SubscriptionsRes} = emqx_mgmt_api_test_util:request_api( + get, + SubscriptionsPath, + "", + AuthHeader + ), + [SubscriptionsData] = emqx_json:decode(SubscriptionsRes, [return_maps]), + ?assertMatch( + #{ + <<"clientid">> := ClientId1, + <<"nl">> := _, + <<"rap">> := _, + <<"rh">> := _, + <<"node">> := _, + <<"qos">> := Qos, + <<"topic">> := Topic + }, + SubscriptionsData + ), + %% post /clients/:clientid/unsubscribe UnSubscribePath = emqx_mgmt_api_test_util:api_path([ "clients", From d9c3cf5c97c9038aa34e53d1e3211ce24fe406c7 Mon Sep 17 00:00:00 2001 From: firest Date: Wed, 27 Apr 2022 12:42:22 +0800 Subject: [PATCH 3/3] fix(mgmt): add subscribe options in client subscribe API --- .../src/emqx_mgmt_api_clients.erl | 40 +++++++++---------- .../test/emqx_mgmt_api_clients_SUITE.erl | 12 +++--- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index d5d5381cc..997f84227 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -513,7 +513,10 @@ fields(keepalive) -> fields(subscribe) -> [ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})}, - {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}, + {nl, hoconsc:mk(integer(), #{default => 0, desc => <<"No Local">>})}, + {rap, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain as Published">>})}, + {rh, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain Handling">>})} ]; fields(unsubscribe) -> [ @@ -536,9 +539,8 @@ authz_cache(delete, #{bindings := Bindings}) -> clean_authz_cache(Bindings). subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> - Topic = maps:get(<<"topic">>, TopicInfo), - Qos = maps:get(<<"qos">>, TopicInfo, 0), - subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}). + Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo), + subscribe(Opts#{clientid => ClientID}). unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> Topic = maps:get(<<"topic">>, TopicInfo), @@ -548,11 +550,7 @@ unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> Topics = [ - begin - Topic = maps:get(<<"topic">>, TopicInfo), - Qos = maps:get(<<"qos">>, TopicInfo, 0), - #{topic => Topic, qos => Qos} - end + emqx_map_lib:unsafe_atom_key_map(TopicInfo) || TopicInfo <- TopicInfos ], subscribe_batch(#{clientid => ClientID, topics => Topics}). @@ -661,21 +659,16 @@ clean_authz_cache(#{clientid := ClientID}) -> {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} end. -subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> - case do_subscribe(ClientID, Topic, Qos) of +subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> + Opts = maps:with([qos, nl, rap, rh], Sub), + case do_subscribe(ClientID, Topic, Opts) of {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {ok, Node} -> - Response = - #{ - clientid => ClientID, - topic => Topic, - qos => Qos, - node => Node - }, + Response = Sub#{node => Node}, {200, Response} end. @@ -688,15 +681,18 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) -> end. subscribe_batch(#{clientid := ClientID, topics := Topics}) -> - ArgList = [[ClientID, Topic, Qos] || #{topic := Topic, qos := Qos} <- Topics], + ArgList = [ + [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)] + || #{topic := Topic} = Sub <- Topics + ], emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). %%-------------------------------------------------------------------- %% internal function -do_subscribe(ClientID, Topic0, Qos) -> - {Topic, Opts} = emqx_topic:parse(Topic0), - TopicTable = [{Topic, Opts#{qos => Qos}}], +do_subscribe(ClientID, Topic0, Options) -> + {Topic, Opts} = emqx_topic:parse(Topic0, Options), + TopicTable = [{Topic, Opts}], case emqx_mgmt:subscribe(ClientID, TopicTable) of {error, Reason} -> {error, Reason}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 57bf25268..24c857288 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -43,7 +43,9 @@ t_clients(_) -> AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - {ok, C1} = emqtt:start_link(#{username => Username1, clientid => ClientId1}), + {ok, C1} = emqtt:start_link(#{ + username => Username1, clientid => ClientId1, proto_ver => v5 + }), {ok, _} = emqtt:connect(C1), {ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}), {ok, _} = emqtt:connect(C2), @@ -87,7 +89,7 @@ t_clients(_) -> ?assertEqual("[]", Client1AuthzCache), %% post /clients/:clientid/subscribe - SubscribeBody = #{topic => Topic, qos => Qos}, + SubscribeBody = #{topic => Topic, qos => Qos, nl => 1, rh => 1}, SubscribePath = emqx_mgmt_api_test_util:api_path([ "clients", binary_to_list(ClientId1), @@ -121,9 +123,9 @@ t_clients(_) -> ?assertMatch( #{ <<"clientid">> := ClientId1, - <<"nl">> := _, - <<"rap">> := _, - <<"rh">> := _, + <<"nl">> := 1, + <<"rap">> := 0, + <<"rh">> := 1, <<"node">> := _, <<"qos">> := Qos, <<"topic">> := Topic