chore: add node param in subscription
This commit is contained in:
parent
054406322a
commit
1064278db1
|
@ -283,7 +283,7 @@ list_client_subscriptions(ClientId) ->
|
|||
end.
|
||||
|
||||
client_subscriptions(Node, ClientId) when Node =:= node() ->
|
||||
emqx_broker:subscriptions(ClientId);
|
||||
{Node, emqx_broker:subscriptions(ClientId)};
|
||||
|
||||
client_subscriptions(Node, ClientId) ->
|
||||
rpc_call(Node, client_subscriptions, [Node, ClientId]).
|
||||
|
|
|
@ -389,15 +389,6 @@ clients_authz_cache_api() ->
|
|||
{"/clients/:clientid/authz_cache", Metadata, authz_cache}.
|
||||
|
||||
clients_subscriptions_api() ->
|
||||
SubscriptionSchema = #{
|
||||
type => object,
|
||||
properties => #{
|
||||
topic => #{
|
||||
type => string},
|
||||
qos => #{
|
||||
type => integer,
|
||||
enum => [0,1,2]}}
|
||||
},
|
||||
Metadata = #{
|
||||
get => #{
|
||||
description => <<"Get client subscriptions">>,
|
||||
|
@ -409,7 +400,7 @@ clients_subscriptions_api() ->
|
|||
}],
|
||||
responses => #{
|
||||
<<"200">> =>
|
||||
emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, SubscriptionSchema)}}
|
||||
emqx_mgmt_util:response_array_schema(<<"Get client subscriptions">>, subscription)}}
|
||||
},
|
||||
{"/clients/:clientid/subscriptions", Metadata, subscriptions}.
|
||||
|
||||
|
@ -513,9 +504,9 @@ subscribe_batch(post, Request) ->
|
|||
|
||||
subscriptions(get, Request) ->
|
||||
ClientID = cowboy_req:binding(clientid, Request),
|
||||
Subs0 = emqx_mgmt:list_client_subscriptions(ClientID),
|
||||
{Node, Subs0} = emqx_mgmt:list_client_subscriptions(ClientID),
|
||||
Subs = lists:map(fun({Topic, SubOpts}) ->
|
||||
#{topic => Topic, qos => maps:get(qos, SubOpts)}
|
||||
#{node => Node, clientid => ClientID, topic => Topic, qos => maps:get(qos, SubOpts)}
|
||||
end, Subs0),
|
||||
{200, Subs}.
|
||||
|
||||
|
|
|
@ -67,6 +67,12 @@ subscriptions_api() ->
|
|||
description => <<"Client ID">>,
|
||||
schema => #{type => string}
|
||||
},
|
||||
#{
|
||||
name => node,
|
||||
in => query,
|
||||
description => <<"Node name">>,
|
||||
schema => #{type => string}
|
||||
},
|
||||
#{
|
||||
name => qos,
|
||||
in => query,
|
||||
|
@ -101,6 +107,8 @@ subscription_schema() ->
|
|||
subscription => #{
|
||||
type => object,
|
||||
properties => #{
|
||||
node => #{
|
||||
type => string},
|
||||
topic => #{
|
||||
type => string},
|
||||
clientid => #{
|
||||
|
@ -115,8 +123,12 @@ subscriptions(get, Request) ->
|
|||
list(Params).
|
||||
|
||||
list(Params) ->
|
||||
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)}.
|
||||
|
||||
case proplists:get_value(<<"node">>, Params, undefined) of
|
||||
undefined ->
|
||||
{200, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)};
|
||||
Node ->
|
||||
{200, emqx_mgmt_api:node_query(binary_to_atom(Node, utf8), Params, ?SUBS_QS_SCHEMA, ?query_fun)}
|
||||
end.
|
||||
|
||||
format(Items) when is_list(Items) ->
|
||||
[format(Item) || Item <- Items];
|
||||
|
@ -126,10 +138,20 @@ format({{Subscriber, Topic}, Options}) ->
|
|||
|
||||
format({_Subscriber, Topic, Options = #{share := Group}}) ->
|
||||
QoS = maps:get(qos, Options),
|
||||
#{topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS};
|
||||
#{
|
||||
topic => filename:join([<<"$share">>, Group, Topic]),
|
||||
clientid => maps:get(subid, Options),
|
||||
qos => QoS,
|
||||
node => node()
|
||||
};
|
||||
format({_Subscriber, Topic, Options}) ->
|
||||
QoS = maps:get(qos, Options),
|
||||
#{topic => Topic, clientid => maps:get(subid, Options), qos => QoS}.
|
||||
#{
|
||||
topic => Topic,
|
||||
clientid => maps:get(subid, Options),
|
||||
qos => QoS,
|
||||
node => node()
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Query Function
|
||||
|
|
Loading…
Reference in New Issue