diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index d5d5381cc..997f84227 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -513,7 +513,10 @@ fields(keepalive) -> fields(subscribe) -> [ {topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})}, - {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})} + {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}, + {nl, hoconsc:mk(integer(), #{default => 0, desc => <<"No Local">>})}, + {rap, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain as Published">>})}, + {rh, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain Handling">>})} ]; fields(unsubscribe) -> [ @@ -536,9 +539,8 @@ authz_cache(delete, #{bindings := Bindings}) -> clean_authz_cache(Bindings). subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> - Topic = maps:get(<<"topic">>, TopicInfo), - Qos = maps:get(<<"qos">>, TopicInfo, 0), - subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}). + Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo), + subscribe(Opts#{clientid => ClientID}). unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> Topic = maps:get(<<"topic">>, TopicInfo), @@ -548,11 +550,7 @@ unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) -> subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> Topics = [ - begin - Topic = maps:get(<<"topic">>, TopicInfo), - Qos = maps:get(<<"qos">>, TopicInfo, 0), - #{topic => Topic, qos => Qos} - end + emqx_map_lib:unsafe_atom_key_map(TopicInfo) || TopicInfo <- TopicInfos ], subscribe_batch(#{clientid => ClientID, topics => Topics}). @@ -661,21 +659,16 @@ clean_authz_cache(#{clientid := ClientID}) -> {500, #{code => <<"UNKNOW_ERROR">>, message => Message}} end. -subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) -> - case do_subscribe(ClientID, Topic, Qos) of +subscribe(#{clientid := ClientID, topic := Topic} = Sub) -> + Opts = maps:with([qos, nl, rap, rh], Sub), + case do_subscribe(ClientID, Topic, Opts) of {error, channel_not_found} -> {404, ?CLIENT_ID_NOT_FOUND}; {error, Reason} -> Message = list_to_binary(io_lib:format("~p", [Reason])), {500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {ok, Node} -> - Response = - #{ - clientid => ClientID, - topic => Topic, - qos => Qos, - node => Node - }, + Response = Sub#{node => Node}, {200, Response} end. @@ -688,15 +681,18 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) -> end. subscribe_batch(#{clientid := ClientID, topics := Topics}) -> - ArgList = [[ClientID, Topic, Qos] || #{topic := Topic, qos := Qos} <- Topics], + ArgList = [ + [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)] + || #{topic := Topic} = Sub <- Topics + ], emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). %%-------------------------------------------------------------------- %% internal function -do_subscribe(ClientID, Topic0, Qos) -> - {Topic, Opts} = emqx_topic:parse(Topic0), - TopicTable = [{Topic, Opts#{qos => Qos}}], +do_subscribe(ClientID, Topic0, Options) -> + {Topic, Opts} = emqx_topic:parse(Topic0, Options), + TopicTable = [{Topic, Opts}], case emqx_mgmt:subscribe(ClientID, TopicTable) of {error, Reason} -> {error, Reason}; diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 57bf25268..24c857288 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -43,7 +43,9 @@ t_clients(_) -> AuthHeader = emqx_mgmt_api_test_util:auth_header_(), - {ok, C1} = emqtt:start_link(#{username => Username1, clientid => ClientId1}), + {ok, C1} = emqtt:start_link(#{ + username => Username1, clientid => ClientId1, proto_ver => v5 + }), {ok, _} = emqtt:connect(C1), {ok, C2} = emqtt:start_link(#{username => Username2, clientid => ClientId2}), {ok, _} = emqtt:connect(C2), @@ -87,7 +89,7 @@ t_clients(_) -> ?assertEqual("[]", Client1AuthzCache), %% post /clients/:clientid/subscribe - SubscribeBody = #{topic => Topic, qos => Qos}, + SubscribeBody = #{topic => Topic, qos => Qos, nl => 1, rh => 1}, SubscribePath = emqx_mgmt_api_test_util:api_path([ "clients", binary_to_list(ClientId1), @@ -121,9 +123,9 @@ t_clients(_) -> ?assertMatch( #{ <<"clientid">> := ClientId1, - <<"nl">> := _, - <<"rap">> := _, - <<"rh">> := _, + <<"nl">> := 1, + <<"rap">> := 0, + <<"rh">> := 1, <<"node">> := _, <<"qos">> := Qos, <<"topic">> := Topic