255 lines
8.0 KiB
Erlang
255 lines
8.0 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt_api_topics).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
-include_lib("typerefl/include/types.hrl").
|
|
-include_lib("hocon/include/hoconsc.hrl").
|
|
|
|
%% API
|
|
-behaviour(minirest_api).
|
|
|
|
-export([
|
|
api_spec/0,
|
|
paths/0,
|
|
schema/1,
|
|
fields/1,
|
|
namespace/0
|
|
]).
|
|
|
|
-export([
|
|
topics/2,
|
|
topic/2
|
|
]).
|
|
|
|
-define(TOPIC_NOT_FOUND, 'TOPIC_NOT_FOUND').
|
|
|
|
-define(TOPICS_QUERY_SCHEMA, [{<<"topic">>, binary}, {<<"node">>, atom}]).
|
|
-define(TAGS, [<<"Topics">>]).
|
|
|
|
namespace() -> undefined.
|
|
|
|
api_spec() ->
|
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true, translate_body => true}).
|
|
|
|
paths() ->
|
|
["/topics", "/topics/:topic"].
|
|
|
|
schema("/topics") ->
|
|
#{
|
|
'operationId' => topics,
|
|
get => #{
|
|
description => ?DESC(topic_list),
|
|
tags => ?TAGS,
|
|
parameters => [
|
|
topic_param(query),
|
|
node_param(),
|
|
hoconsc:ref(emqx_dashboard_swagger, page),
|
|
hoconsc:ref(emqx_dashboard_swagger, limit)
|
|
],
|
|
responses => #{
|
|
200 => [
|
|
{data, hoconsc:mk(hoconsc:array(hoconsc:ref(topic)), #{})},
|
|
{meta, hoconsc:mk(hoconsc:ref(emqx_dashboard_swagger, meta), #{})}
|
|
]
|
|
}
|
|
}
|
|
};
|
|
schema("/topics/:topic") ->
|
|
#{
|
|
'operationId' => topic,
|
|
get => #{
|
|
description => ?DESC(topic_info_by_name),
|
|
tags => ?TAGS,
|
|
parameters => [topic_param(path)],
|
|
responses => #{
|
|
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(topic)), #{}),
|
|
404 =>
|
|
emqx_dashboard_swagger:error_codes(['TOPIC_NOT_FOUND'], <<"Topic not found">>)
|
|
}
|
|
}
|
|
}.
|
|
|
|
fields(topic) ->
|
|
[
|
|
{topic,
|
|
hoconsc:mk(binary(), #{
|
|
desc => <<"Topic Name">>,
|
|
required => true
|
|
})},
|
|
{node,
|
|
hoconsc:mk(binary(), #{
|
|
desc => <<"Node">>,
|
|
required => true
|
|
})},
|
|
{session,
|
|
hoconsc:mk(binary(), #{
|
|
desc => <<"Session ID">>,
|
|
required => false
|
|
})}
|
|
].
|
|
|
|
%%%==============================================================================================
|
|
%% parameters trans
|
|
topics(get, #{query_string := Qs}) ->
|
|
do_list(Qs).
|
|
|
|
topic(get, #{bindings := Bindings}) ->
|
|
lookup(Bindings).
|
|
|
|
%%%==============================================================================================
|
|
%% api apply
|
|
do_list(Params) ->
|
|
try
|
|
Pager = parse_pager_params(Params),
|
|
{_, Query} = emqx_mgmt_api:parse_qstring(Params, ?TOPICS_QUERY_SCHEMA),
|
|
Stream = mk_topic_stream(qs2ms(Query)),
|
|
QResult = eval_topic_query(Stream, Pager, emqx_mgmt_api:init_query_result()),
|
|
{200, format_list_response(Pager, Query, QResult)}
|
|
catch
|
|
throw:{error, page_limit_invalid} ->
|
|
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
|
|
error:{invalid_topic_filter, _} ->
|
|
{400, #{code => <<"INVALID_PARAMTER">>, message => <<"topic_filter_invalid">>}}
|
|
end.
|
|
|
|
lookup(#{topic := Topic}) ->
|
|
case emqx_router:lookup_routes(Topic) of
|
|
[] ->
|
|
{404, #{code => ?TOPIC_NOT_FOUND, message => <<"Topic not found">>}};
|
|
Routes when is_list(Routes) ->
|
|
Formatted = [format(Route) || Route <- Routes],
|
|
{200, Formatted}
|
|
end.
|
|
|
|
%%%==============================================================================================
|
|
%% internal
|
|
|
|
parse_pager_params(Params) ->
|
|
try emqx_mgmt_api:parse_pager_params(Params) of
|
|
Pager = #{} ->
|
|
Pager;
|
|
false ->
|
|
throw({error, page_limit_invalid})
|
|
catch
|
|
error:badarg ->
|
|
throw({error, page_limit_invalid})
|
|
end.
|
|
|
|
-spec qs2ms({list(), list()}) -> tuple().
|
|
qs2ms({Qs, _}) ->
|
|
lists:foldl(fun gen_match_spec/2, {'_', '_'}, Qs).
|
|
|
|
gen_match_spec({topic, '=:=', QTopic}, {_MTopic, MNode}) when is_atom(MNode) ->
|
|
case emqx_topic:parse(QTopic) of
|
|
{#share{group = Group, topic = Topic}, _SubOpts} ->
|
|
{Topic, {Group, MNode}};
|
|
{Topic, _SubOpts} ->
|
|
{Topic, MNode}
|
|
end;
|
|
gen_match_spec({node, '=:=', QNode}, {MTopic, _MDest}) ->
|
|
{MTopic, QNode}.
|
|
|
|
mk_topic_stream(Spec = {MTopic, _MDest = '_'}) ->
|
|
emqx_utils_stream:chain(emqx_router:stream(Spec), mk_persistent_topic_stream(MTopic));
|
|
mk_topic_stream(Spec) ->
|
|
%% NOTE: Assuming that no persistent topic ever matches a query with `node` filter.
|
|
emqx_router:stream(Spec).
|
|
|
|
mk_persistent_topic_stream(Spec) ->
|
|
case emqx_persistent_message:is_persistence_enabled() of
|
|
true ->
|
|
emqx_persistent_session_ds_router:stream(Spec);
|
|
false ->
|
|
emqx_utils_stream:empty()
|
|
end.
|
|
|
|
eval_count() ->
|
|
emqx_router:stats(n_routes) + eval_persistent_count().
|
|
|
|
eval_persistent_count() ->
|
|
case emqx_persistent_message:is_persistence_enabled() of
|
|
true ->
|
|
emqx_persistent_session_ds_router:stats(n_routes);
|
|
false ->
|
|
0
|
|
end.
|
|
|
|
eval_topic_query(Stream, QState = #{limit := Limit}, QResult) ->
|
|
case emqx_utils_stream:consume(Limit, Stream) of
|
|
{Rows, NStream} ->
|
|
case emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult) of
|
|
{more, NQResult} ->
|
|
eval_topic_query(NStream, QState, NQResult);
|
|
{enough, NQResult} ->
|
|
finalize_query(false, NQResult)
|
|
end;
|
|
Rows when is_list(Rows) ->
|
|
{_, NQResult} = emqx_mgmt_api:accumulate_query_rows(node(), Rows, QState, QResult),
|
|
finalize_query(true, NQResult)
|
|
end.
|
|
|
|
finalize_query(Complete, QResult = #{overflow := Overflow}) ->
|
|
HasNext = Overflow orelse not Complete,
|
|
QResult#{complete => Complete, hasnext => HasNext}.
|
|
|
|
format_list_response(Meta, Query, QResult = #{rows := RowsAcc}) ->
|
|
#{
|
|
meta => format_response_meta(Meta, Query, QResult),
|
|
data => lists:flatmap(
|
|
fun({_Node, Rows}) -> [format(R) || R <- Rows] end,
|
|
RowsAcc
|
|
)
|
|
}.
|
|
|
|
format_response_meta(Meta, _Query, #{hasnext := HasNext, complete := true, cursor := Cursor}) ->
|
|
Meta#{hasnext => HasNext, count => Cursor};
|
|
format_response_meta(Meta, _Query = {[], []}, #{hasnext := HasNext}) ->
|
|
Meta#{hasnext => HasNext, count => eval_count()};
|
|
format_response_meta(Meta, _Query, #{hasnext := HasNext}) ->
|
|
Meta#{hasnext => HasNext}.
|
|
|
|
format(#route{topic = Topic, dest = {Group, Node}}) ->
|
|
#{topic => ?SHARE(Group, Topic), node => Node};
|
|
format(#route{topic = Topic, dest = Node}) when is_atom(Node) ->
|
|
#{topic => Topic, node => Node};
|
|
format(#route{topic = Topic, dest = SessionId}) when is_binary(SessionId) ->
|
|
#{topic => Topic, session => SessionId}.
|
|
|
|
topic_param(In) ->
|
|
{
|
|
topic,
|
|
hoconsc:mk(binary(), #{
|
|
desc => <<"Topic Name">>,
|
|
in => In,
|
|
required => (In == path),
|
|
example => <<"">>
|
|
})
|
|
}.
|
|
|
|
node_param() ->
|
|
{
|
|
node,
|
|
hoconsc:mk(binary(), #{
|
|
desc => <<"Node Name">>,
|
|
in => query,
|
|
required => false,
|
|
example => node()
|
|
})
|
|
}.
|