226 lines
6.9 KiB
Erlang
226 lines
6.9 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt_api_subscriptions).
|
|
|
|
-behaviour(minirest_api).
|
|
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
-export([
|
|
api_spec/0,
|
|
paths/0,
|
|
schema/1,
|
|
fields/1
|
|
]).
|
|
|
|
-export([subscriptions/2]).
|
|
|
|
-export([
|
|
qs2ms/2,
|
|
run_fuzzy_filter/2,
|
|
format/2
|
|
]).
|
|
|
|
-define(SUBS_QTABLE, emqx_suboption).
|
|
|
|
-define(SUBS_QSCHEMA, [
|
|
{<<"clientid">>, binary},
|
|
{<<"topic">>, binary},
|
|
{<<"share">>, binary},
|
|
{<<"share_group">>, binary},
|
|
{<<"qos">>, integer},
|
|
{<<"match_topic">>, binary}
|
|
]).
|
|
|
|
api_spec() ->
|
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
|
|
|
paths() ->
|
|
["/subscriptions"].
|
|
|
|
schema("/subscriptions") ->
|
|
#{
|
|
'operationId' => subscriptions,
|
|
get => #{
|
|
description => <<"List subscriptions">>,
|
|
tags => [<<"Subscriptions">>],
|
|
parameters => parameters(),
|
|
responses => #{
|
|
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscription)), #{}),
|
|
400 => emqx_dashboard_swagger:error_codes(
|
|
['INVALID_PARAMETER'], <<"Invalid parameter">>
|
|
),
|
|
500 => emqx_dashboard_swagger:error_codes(['NODE_DOWN'], <<"Bad RPC">>)
|
|
}
|
|
}
|
|
}.
|
|
|
|
fields(subscription) ->
|
|
[
|
|
{node, hoconsc:mk(binary(), #{desc => <<"Access type">>})},
|
|
{topic, hoconsc:mk(binary(), #{desc => <<"Topic name">>})},
|
|
{clientid, hoconsc:mk(binary(), #{desc => <<"Client identifier">>})},
|
|
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})},
|
|
{nl, hoconsc:mk(integer(), #{desc => <<"No Local">>})},
|
|
{rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>})},
|
|
{rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>})}
|
|
].
|
|
|
|
parameters() ->
|
|
[
|
|
hoconsc:ref(emqx_dashboard_swagger, page),
|
|
hoconsc:ref(emqx_dashboard_swagger, limit),
|
|
{
|
|
node,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Node name">>,
|
|
example => atom_to_list(node())
|
|
})
|
|
},
|
|
{
|
|
clientid,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Client ID">>
|
|
})
|
|
},
|
|
{
|
|
qos,
|
|
hoconsc:mk(emqx_schema:qos(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"QoS">>
|
|
})
|
|
},
|
|
{
|
|
topic,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Topic, url encoding">>
|
|
})
|
|
},
|
|
{
|
|
match_topic,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Match topic string, url encoding">>
|
|
})
|
|
},
|
|
{
|
|
share_group,
|
|
hoconsc:mk(binary(), #{
|
|
in => query,
|
|
required => false,
|
|
desc => <<"Shared subscription group name">>
|
|
})
|
|
}
|
|
].
|
|
|
|
subscriptions(get, #{query_string := QString}) ->
|
|
Response =
|
|
case maps:get(<<"node">>, QString, undefined) of
|
|
undefined ->
|
|
emqx_mgmt_api:cluster_query(
|
|
?SUBS_QTABLE,
|
|
QString,
|
|
?SUBS_QSCHEMA,
|
|
fun ?MODULE:qs2ms/2,
|
|
fun ?MODULE:format/2
|
|
);
|
|
Node0 ->
|
|
case emqx_misc:safe_to_existing_atom(Node0) of
|
|
{ok, Node1} ->
|
|
emqx_mgmt_api:node_query(
|
|
Node1,
|
|
?SUBS_QTABLE,
|
|
QString,
|
|
?SUBS_QSCHEMA,
|
|
fun ?MODULE:qs2ms/2,
|
|
fun ?MODULE:format/2
|
|
);
|
|
{error, _} ->
|
|
{error, Node0, {badrpc, <<"invalid node">>}}
|
|
end
|
|
end,
|
|
case Response of
|
|
{error, page_limit_invalid} ->
|
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
|
{error, Node, {badrpc, R}} ->
|
|
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
|
|
{500, #{code => <<"NODE_DOWN">>, message => Message}};
|
|
Result ->
|
|
{200, Result}
|
|
end.
|
|
|
|
format(WhichNode, {{_Subscriber, Topic}, Options}) ->
|
|
maps:merge(
|
|
#{
|
|
topic => get_topic(Topic, Options),
|
|
clientid => maps:get(subid, Options),
|
|
node => WhichNode
|
|
},
|
|
maps:with([qos, nl, rap, rh], Options)
|
|
).
|
|
|
|
get_topic(Topic, #{share := Group}) ->
|
|
emqx_topic:join([<<"$share">>, Group, Topic]);
|
|
get_topic(Topic, _) ->
|
|
Topic.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% QueryString to MatchSpec
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
|
|
qs2ms(_Tab, {Qs, Fuzzy}) ->
|
|
#{match_spec => gen_match_spec(Qs), fuzzy_fun => fuzzy_filter_fun(Fuzzy)}.
|
|
|
|
gen_match_spec(Qs) ->
|
|
MtchHead = gen_match_spec(Qs, {{'_', '_'}, #{}}),
|
|
[{MtchHead, [], ['$_']}].
|
|
|
|
gen_match_spec([], MtchHead) ->
|
|
MtchHead;
|
|
gen_match_spec([{Key, '=:=', Value} | More], MtchHead) ->
|
|
gen_match_spec(More, update_ms(Key, Value, MtchHead)).
|
|
|
|
update_ms(clientid, X, {{Pid, Topic}, Opts}) ->
|
|
{{Pid, Topic}, Opts#{subid => X}};
|
|
update_ms(topic, X, {{Pid, _Topic}, Opts}) ->
|
|
{{Pid, X}, Opts};
|
|
update_ms(share_group, X, {{Pid, Topic}, Opts}) ->
|
|
{{Pid, Topic}, Opts#{share => X}};
|
|
update_ms(qos, X, {{Pid, Topic}, Opts}) ->
|
|
{{Pid, Topic}, Opts#{qos => X}}.
|
|
|
|
fuzzy_filter_fun([]) ->
|
|
undefined;
|
|
fuzzy_filter_fun(Fuzzy) ->
|
|
{fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
|
|
|
|
run_fuzzy_filter(_, []) ->
|
|
true;
|
|
run_fuzzy_filter(E = {{_, Topic}, _}, [{topic, match, TopicFilter} | Fuzzy]) ->
|
|
emqx_topic:match(Topic, TopicFilter) andalso run_fuzzy_filter(E, Fuzzy).
|