emqx/apps/emqx_management/src/emqx_mgmt_api_subscriptions...

226 lines
6.9 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt_api_subscriptions).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-export([
api_spec/0,
paths/0,
schema/1,
fields/1
]).
-export([subscriptions/2]).
-export([
qs2ms/2,
run_fuzzy_filter/2,
format/2
]).
-define(SUBS_QTABLE, emqx_suboption).
-define(SUBS_QSCHEMA, [
{<<"clientid">>, binary},
{<<"topic">>, binary},
{<<"share">>, binary},
{<<"share_group">>, binary},
{<<"qos">>, integer},
{<<"match_topic">>, binary}
]).
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
["/subscriptions"].
schema("/subscriptions") ->
#{
'operationId' => subscriptions,
get => #{
description => <<"List subscriptions">>,
tags => [<<"Subscriptions">>],
parameters => parameters(),
responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscription)), #{}),
400 => emqx_dashboard_swagger:error_codes(
['INVALID_PARAMETER'], <<"Invalid parameter">>
),
500 => emqx_dashboard_swagger:error_codes(['NODE_DOWN'], <<"Bad RPC">>)
}
}
}.
fields(subscription) ->
[
{node, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
{topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})},
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})},
{nl, hoconsc:mk(integer(), #{desc => <<"No Local">>})},
{rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>})},
{rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>})}
].
parameters() ->
[
hoconsc:ref(emqx_dashboard_swagger, page),
hoconsc:ref(emqx_dashboard_swagger, limit),
{
node,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Node name">>,
example => atom_to_list(node())
})
},
{
clientid,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Client ID">>
})
},
{
qos,
hoconsc:mk(emqx_schema:qos(), #{
in => query,
required => false,
desc => <<"QoS">>
})
},
{
topic,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Topic, url encoding">>
})
},
{
match_topic,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Match topic string, url encoding">>
})
},
{
share_group,
hoconsc:mk(binary(), #{
in => query,
required => false,
desc => <<"Shared subscription group name">>
})
}
].
subscriptions(get, #{query_string := QString}) ->
Response =
case maps:get(<<"node">>, QString, undefined) of
undefined ->
emqx_mgmt_api:cluster_query(
?SUBS_QTABLE,
QString,
?SUBS_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format/2
);
Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of
{ok, Node1} ->
emqx_mgmt_api:node_query(
Node1,
?SUBS_QTABLE,
QString,
?SUBS_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format/2
);
{error, _} ->
{error, Node0, {badrpc, <<"invalid node">>}}
end
end,
case Response of
{error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Result ->
{200, Result}
end.
format(WhichNode, {{_Subscriber, Topic}, Options}) ->
maps:merge(
#{
topic => get_topic(Topic, Options),
clientid => maps:get(subid, Options),
node => WhichNode
},
maps:with([qos, nl, rap, rh], Options)
).
get_topic(Topic, #{share := Group}) ->
emqx_topic:join([<<"$share">>, Group, Topic]);
get_topic(Topic, _) ->
Topic.
%%--------------------------------------------------------------------
%% QueryString to MatchSpec
%%--------------------------------------------------------------------
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
qs2ms(_Tab, {Qs, Fuzzy}) ->
#{match_spec => gen_match_spec(Qs), fuzzy_fun => fuzzy_filter_fun(Fuzzy)}.
gen_match_spec(Qs) ->
MtchHead = gen_match_spec(Qs, {{'_', '_'}, #{}}),
[{MtchHead, [], ['$_']}].
gen_match_spec([], MtchHead) ->
MtchHead;
gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
gen_match_spec(More, update_ms(Key, Value, MtchHead)).
update_ms(clientid, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{subid => X}};
update_ms(topic, X, {{Pid, _Topic}, Opts}) ->
{{Pid, X}, Opts};
update_ms(share_group, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{share => X}};
update_ms(qos, X, {{Pid, Topic}, Opts}) ->
{{Pid, Topic}, Opts#{qos => X}}.
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) ->
{fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
run_fuzzy_filter(_, []) ->
true;
run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).