diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index f7b2eb644..7fb700c55 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -283,7 +283,7 @@ list_client_subscriptions(ClientId) -> end. client_subscriptions(Node, ClientId) when Node =:= node() -> - emqx_broker:subscriptions(ClientId); + {Node, emqx_broker:subscriptions(ClientId)}; client_subscriptions(Node, ClientId) -> rpc_call(Node, client_subscriptions, [Node, ClientId]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 42217e6ad..1bc01e6d6 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -389,15 +389,6 @@ clients_authz_cache_api() -> {"/clients/:clientid/authz_cache", Metadata, authz_cache}. clients_subscriptions_api() -> - SubscriptionSchema = #{ - type => object, - properties => #{ - topic => #{ - type => string}, - qos => #{ - type => integer, - enum => [0,1,2]}} - }, Metadata = #{ get => #{ description => <<"Get client subscriptions">>, @@ -409,7 +400,7 @@ clients_subscriptions_api() -> }], responses => #{ <<"200">> => - emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, SubscriptionSchema)}} + emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}} }, {"/clients/:clientid/subscriptions", Metadata, subscriptions}. @@ -513,9 +504,9 @@ subscribe_batch(post, Request) -> subscriptions(get, Request) -> ClientID = cowboy_req:binding(clientid, Request), - Subs0 = emqx_mgmt:list_client_subscriptions(ClientID), + {Node, Subs0} = emqx_mgmt:list_client_subscriptions(ClientID), Subs = lists:map(fun({Topic, SubOpts}) -> - #{topic => Topic, qos => maps:get(qos, SubOpts)} + #{node => Node, clientid => ClientID, topic => Topic, qos => maps:get(qos, SubOpts)} end, Subs0), {200, Subs}. diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 0a098116f..27e8c898a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -67,6 +67,12 @@ subscriptions_api() -> description => <<"Client ID">>, schema => #{type => string} }, + #{ + name => node, + in => query, + description => <<"Node name">>, + schema => #{type => string} + }, #{ name => qos, in => query, @@ -101,6 +107,8 @@ subscription_schema() -> subscription => #{ type => object, properties => #{ + node => #{ + type => string}, topic => #{ type => string}, clientid => #{ @@ -115,8 +123,12 @@ subscriptions(get, Request) -> list(Params). list(Params) -> - {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}. - + case proplists:get_value(<<"node">>, Params, undefined) of + undefined -> + {200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}; + Node -> + {200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, ?SUBS_QS_SCHEMA, ?query_fun)} + end. format(Items) when is_list(Items) -> [format(Item) || Item <- Items]; @@ -126,10 +138,20 @@ format({{Subscriber, Topic}, Options}) -> format({_Subscriber, Topic, Options = #{share := Group}}) -> QoS = maps:get(qos, Options), - #{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS}; + #{ + topic => filename:join([<<"$share">>, Group, Topic]), + clientid => maps:get(subid, Options), + qos => QoS, + node => node() + }; format({_Subscriber, Topic, Options}) -> QoS = maps:get(qos, Options), - #{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}. + #{ + topic => Topic, + clientid => maps:get(subid, Options), + qos => QoS, + node => node() + }. %%-------------------------------------------------------------------- %% Query Function