201 lines
7.2 KiB
Erlang
201 lines
7.2 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt_api_subscriptions).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
|
-define(SUBS_QS_SCHEMA, {emqx_suboption,
|
|
[{<<"clientid">>, binary},
|
|
{<<"topic">>, binary},
|
|
{<<"share">>, binary},
|
|
{<<"qos">>, integer},
|
|
{<<"_match_topic">>, binary}]}).
|
|
|
|
-rest_api(#{name => list_subscriptions,
|
|
method => 'GET',
|
|
path => "/subscriptions/",
|
|
func => list,
|
|
descr => "A list of subscriptions in the cluster"}).
|
|
|
|
-rest_api(#{name => list_node_subscriptions,
|
|
method => 'GET',
|
|
path => "/nodes/:atom:node/subscriptions/",
|
|
func => list,
|
|
descr => "A list of subscriptions on a node"}).
|
|
|
|
-rest_api(#{name => lookup_client_subscriptions,
|
|
method => 'GET',
|
|
path => "/subscriptions/:bin:clientid",
|
|
func => lookup,
|
|
descr => "A list of subscriptions of a client"}).
|
|
|
|
-rest_api(#{name => lookup_client_subscriptions_with_node,
|
|
method => 'GET',
|
|
path => "/nodes/:atom:node/subscriptions/:bin:clientid",
|
|
func => lookup,
|
|
descr => "A list of subscriptions of a client on the node"}).
|
|
|
|
-export([ list/2
|
|
, lookup/2
|
|
]).
|
|
|
|
-export([ query/3
|
|
, format/1
|
|
]).
|
|
|
|
-define(query_fun, {?MODULE, query}).
|
|
-define(format_fun, {?MODULE, format}).
|
|
|
|
list(Bindings, Params) when map_size(Bindings) == 0 ->
|
|
case proplists:get_value(<<"topic">>, Params) of
|
|
undefined ->
|
|
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
|
Topic0 ->
|
|
Topic = emqx_mgmt_util:urldecode(Topic0),
|
|
Data = emqx_mgmt:list_subscriptions_via_topic(Topic, ?format_fun),
|
|
FilterData = filter_subscriptions(Data, Params),
|
|
minirest:return({ok, add_meta(Params, FilterData)})
|
|
end;
|
|
|
|
list(#{node := Node} = Bindings, Params) ->
|
|
case proplists:get_value(<<"topic">>, Params) of
|
|
undefined ->
|
|
case Node =:= node() of
|
|
true ->
|
|
minirest:return({ok, emqx_mgmt_api:node_query(Node, Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
|
false ->
|
|
case rpc:call(Node, ?MODULE, list, [Bindings, Params]) of
|
|
{badrpc, Reason} -> minirest:return({error, Reason});
|
|
Res -> Res
|
|
end
|
|
end;
|
|
Topic0 ->
|
|
Topic = emqx_mgmt_util:urldecode(Topic0),
|
|
Data = emqx_mgmt:list_subscriptions_via_topic(Node, Topic, ?format_fun),
|
|
FilterData = filter_subscriptions(Data, Params),
|
|
minirest:return({ok, add_meta(Params, FilterData)})
|
|
end.
|
|
|
|
add_meta(Params, List) ->
|
|
Page = emqx_mgmt_api:page(Params),
|
|
Limit = emqx_mgmt_api:limit(Params),
|
|
Count = erlang:length(List),
|
|
Start = (Page - 1) * Limit + 1,
|
|
Data = lists:sublist(List, Start, Limit),
|
|
#{meta => #{
|
|
page => Page,
|
|
limit => Limit,
|
|
hasnext => Start + Limit - 1 < Count,
|
|
count => Count},
|
|
data => Data,
|
|
code => 0
|
|
}.
|
|
|
|
lookup(#{node := Node, clientid := ClientId}, _Params) ->
|
|
minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)});
|
|
|
|
lookup(#{clientid := ClientId}, _Params) ->
|
|
minirest:return({ok, emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId), ?format_fun)}).
|
|
|
|
format(Items) when is_list(Items) ->
|
|
[format(Item) || Item <- Items];
|
|
|
|
format({{Subscriber, Topic}, Options}) ->
|
|
format({Subscriber, Topic, Options});
|
|
|
|
format({_Subscriber, Topic, Options = #{share := Group}}) ->
|
|
QoS = maps:get(qos, Options),
|
|
#{node => node(), topic => filename:join([<<"$share">>, Group, Topic]), clientid => maps:get(subid, Options), qos => QoS};
|
|
format({_Subscriber, Topic, Options}) ->
|
|
QoS = maps:get(qos, Options),
|
|
#{node => node(), topic => Topic, clientid => maps:get(subid, Options, ""), qos => QoS}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Query Function
|
|
%%--------------------------------------------------------------------
|
|
|
|
query({Qs, []}, Start, Limit) ->
|
|
Ms = qs2ms(Qs),
|
|
emqx_mgmt_api:select_table(emqx_suboption, Ms, Start, Limit, fun format/1);
|
|
|
|
query({Qs, Fuzzy}, Start, Limit) ->
|
|
Ms = qs2ms(Qs),
|
|
MatchFun = match_fun(Ms, Fuzzy),
|
|
emqx_mgmt_api:traverse_table(emqx_suboption, MatchFun, Start, Limit, fun format/1).
|
|
|
|
match_fun(Ms, Fuzzy) ->
|
|
MsC = ets:match_spec_compile(Ms),
|
|
fun(Rows) ->
|
|
case ets:match_spec_run(Rows, MsC) of
|
|
[] -> [];
|
|
Ls -> lists:filter(fun(E) -> run_fuzzy_match(E, Fuzzy) end, Ls)
|
|
end
|
|
end.
|
|
|
|
run_fuzzy_match(_, []) ->
|
|
true;
|
|
run_fuzzy_match(E = {{_, Topic}, _}, [{topic, match, TopicFilter}|Fuzzy]) ->
|
|
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_match(E, Fuzzy).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Query String to Match Spec
|
|
|
|
qs2ms(Qs) ->
|
|
MtchHead = qs2ms(Qs, {{'_', '_'}, #{}}),
|
|
[{MtchHead, [], ['$_']}].
|
|
|
|
qs2ms([], MtchHead) ->
|
|
MtchHead;
|
|
qs2ms([{Key, '=:=', Value} | More], MtchHead) ->
|
|
qs2ms(More, update_ms(Key, Value, MtchHead)).
|
|
|
|
update_ms(clientid, X, {{Pid, Topic}, Opts}) ->
|
|
{{Pid, Topic}, Opts#{subid => X}};
|
|
update_ms(topic, X, {{Pid, _Topic}, Opts}) ->
|
|
{{Pid, X}, Opts};
|
|
update_ms(share, X, {{Pid, Topic}, Opts}) ->
|
|
{{Pid, Topic}, Opts#{share => X}};
|
|
update_ms(qos, X, {{Pid, Topic}, Opts}) ->
|
|
{{Pid, Topic}, Opts#{qos => X}}.
|
|
|
|
filter_subscriptions(Data0, Params) ->
|
|
Data1 = filter_by_key(qos, qos(Params), Data0),
|
|
Data2 = filter_by_key(clientid, proplists:get_value(<<"clientid">>, Params), Data1),
|
|
case proplists:get_value(<<"share">>, Params) of
|
|
undefined -> Data2;
|
|
Share ->
|
|
Prefix = filename:join([<<"$share">>, Share]),
|
|
Size = byte_size(Prefix),
|
|
lists:filter(fun(#{topic := Topic}) ->
|
|
case Topic of
|
|
<<Prefix:Size/binary, _/binary>> -> true;
|
|
_ -> false
|
|
end
|
|
end,
|
|
Data2)
|
|
end.
|
|
|
|
qos(Params) ->
|
|
case proplists:get_value(<<"qos">>, Params) of
|
|
undefined -> undefined;
|
|
Qos when is_integer(Qos) -> Qos;
|
|
Qos when is_binary(Qos) -> binary_to_integer(Qos)
|
|
end.
|
|
|
|
filter_by_key(_Key, undefined, List) -> List;
|
|
filter_by_key(Key, Value, List) -> lists:filter(fun(E) -> Value =:= maps:get(Key, E) end, List).
|