Merge pull request #8356 from DDDHuang/pubsub_api

Pubsub api
This commit is contained in:
DDDHuang 2022-07-01 17:24:18 +08:00 committed by GitHub
commit 25c37a63ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 279 additions and 59 deletions

View File

@ -13,6 +13,7 @@
{emqx_gateway_http,1}. {emqx_gateway_http,1}.
{emqx_license,1}. {emqx_license,1}.
{emqx_management,1}. {emqx_management,1}.
{emqx_management,2}.
{emqx_mgmt_api_plugins,1}. {emqx_mgmt_api_plugins,1}.
{emqx_mgmt_cluster,1}. {emqx_mgmt_cluster,1}.
{emqx_mgmt_trace,1}. {emqx_mgmt_trace,1}.

View File

@ -1,7 +1,25 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"5.0.0",[{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}]}, [{"5.0.0",[
{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]},
{add_module,emqx_management_proto_v2},
{load_module,emqx_mgmt_api_clients,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_api_publish,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_api_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_api_configs,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_util,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt,brutal_purge,soft_purge,[]}
]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"5.0.0",[{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]}]}, [{"5.0.0",[
{load_module,emqx_mgmt_cli,brutal_purge,soft_purge,[]},
{delete_module,emqx_management_proto_v2},
{load_module,emqx_mgmt_api_clients,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_api_publish,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_api_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_api_configs,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt_util,brutal_purge,soft_purge,[]},
{load_module,emqx_mgmt,brutal_purge,soft_purge,[]}
]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -83,7 +83,9 @@
do_subscribe/2, do_subscribe/2,
publish/1, publish/1,
unsubscribe/2, unsubscribe/2,
do_unsubscribe/2 do_unsubscribe/2,
unsubscribe_batch/2,
do_unsubscribe_batch/2
]). ]).
%% Alarms %% Alarms
@ -151,7 +153,7 @@ get_sys_memory() ->
end. end.
node_info(Node) -> node_info(Node) ->
wrap_rpc(emqx_management_proto_v1:node_info(Node)). wrap_rpc(emqx_management_proto_v2:node_info(Node)).
stopped_node_info(Node) -> stopped_node_info(Node) ->
#{name => Node, node_status => 'Stopped'}. #{name => Node, node_status => 'Stopped'}.
@ -171,7 +173,7 @@ broker_info() ->
Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}. Info#{node => node(), otp_release => otp_rel(), node_status => 'Running'}.
broker_info(Node) -> broker_info(Node) ->
wrap_rpc(emqx_management_proto_v1:broker_info(Node)). wrap_rpc(emqx_management_proto_v2:broker_info(Node)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Metrics and Stats %% Metrics and Stats
@ -355,7 +357,7 @@ do_call_client(ClientId, Req) ->
%% @private %% @private
call_client(Node, ClientId, Req) -> call_client(Node, ClientId, Req) ->
wrap_rpc(emqx_management_proto_v1:call_client(Node, ClientId, Req)). wrap_rpc(emqx_management_proto_v2:call_client(Node, ClientId, Req)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Subscriptions %% Subscriptions
@ -374,7 +376,7 @@ do_list_subscriptions() ->
end. end.
list_subscriptions(Node) -> list_subscriptions(Node) ->
wrap_rpc(emqx_management_proto_v1:list_subscriptions(Node)). wrap_rpc(emqx_management_proto_v2:list_subscriptions(Node)).
list_subscriptions_via_topic(Topic, FormatFun) -> list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([ lists:append([
@ -402,7 +404,7 @@ subscribe(ClientId, TopicTables) ->
subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables). subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
subscribe([Node | Nodes], ClientId, TopicTables) -> subscribe([Node | Nodes], ClientId, TopicTables) ->
case wrap_rpc(emqx_management_proto_v1:subscribe(Node, ClientId, TopicTables)) of case wrap_rpc(emqx_management_proto_v2:subscribe(Node, ClientId, TopicTables)) of
{error, _} -> subscribe(Nodes, ClientId, TopicTables); {error, _} -> subscribe(Nodes, ClientId, TopicTables);
{subscribe, Res} -> {subscribe, Res, Node} {subscribe, Res} -> {subscribe, Res, Node}
end; end;
@ -417,7 +419,6 @@ do_subscribe(ClientId, TopicTables) ->
[{_, Pid}] -> Pid ! {subscribe, TopicTables} [{_, Pid}] -> Pid ! {subscribe, TopicTables}
end. end.
%%TODO: ???
publish(Msg) -> publish(Msg) ->
emqx_metrics:inc_msg(Msg), emqx_metrics:inc_msg(Msg),
emqx:publish(Msg). emqx:publish(Msg).
@ -430,7 +431,7 @@ unsubscribe(ClientId, Topic) ->
-spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) -> -spec unsubscribe([node()], emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, channel_not_found}. {unsubscribe, _} | {error, channel_not_found}.
unsubscribe([Node | Nodes], ClientId, Topic) -> unsubscribe([Node | Nodes], ClientId, Topic) ->
case wrap_rpc(emqx_management_proto_v1:unsubscribe(Node, ClientId, Topic)) of case wrap_rpc(emqx_management_proto_v2:unsubscribe(Node, ClientId, Topic)) of
{error, _} -> unsubscribe(Nodes, ClientId, Topic); {error, _} -> unsubscribe(Nodes, ClientId, Topic);
Re -> Re Re -> Re
end; end;
@ -445,6 +446,29 @@ do_unsubscribe(ClientId, Topic) ->
[{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]} [{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
end. end.
-spec unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe, _} | {error, channel_not_found}.
unsubscribe_batch(ClientId, Topics) ->
unsubscribe_batch(mria_mnesia:running_nodes(), ClientId, Topics).
-spec unsubscribe_batch([node()], emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe_batch, _} | {error, channel_not_found}.
unsubscribe_batch([Node | Nodes], ClientId, Topics) ->
case wrap_rpc(emqx_management_proto_v2:unsubscribe_batch(Node, ClientId, Topics)) of
{error, _} -> unsubscribe_batch(Nodes, ClientId, Topics);
Re -> Re
end;
unsubscribe_batch([], _ClientId, _Topics) ->
{error, channel_not_found}.
-spec do_unsubscribe_batch(emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe_batch, _} | {error, _}.
do_unsubscribe_batch(ClientId, Topics) ->
case ets:lookup(emqx_channel, ClientId) of
[] -> {error, channel_not_found};
[{_, Pid}] -> Pid ! {unsubscribe, [emqx_topic:parse(Topic) || Topic <- Topics]}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Get Alarms %% Get Alarms
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -39,8 +39,9 @@
subscriptions/2, subscriptions/2,
authz_cache/2, authz_cache/2,
subscribe/2, subscribe/2,
unsubscribe/2,
subscribe_batch/2, subscribe_batch/2,
unsubscribe/2,
unsubscribe_batch/2,
set_keepalive/2 set_keepalive/2
]). ]).
@ -88,7 +89,9 @@ paths() ->
"/clients/:clientid/authorization/cache", "/clients/:clientid/authorization/cache",
"/clients/:clientid/subscriptions", "/clients/:clientid/subscriptions",
"/clients/:clientid/subscribe", "/clients/:clientid/subscribe",
"/clients/:clientid/subscribe/bulk",
"/clients/:clientid/unsubscribe", "/clients/:clientid/unsubscribe",
"/clients/:clientid/unsubscribe/bulk",
"/clients/:clientid/keepalive" "/clients/:clientid/keepalive"
]. ].
@ -293,6 +296,21 @@ schema("/clients/:clientid/subscribe") ->
} }
} }
}; };
schema("/clients/:clientid/subscribe/bulk") ->
#{
'operationId' => subscribe_batch,
post => #{
description => <<"Subscribe">>,
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscribe))),
responses => #{
200 => hoconsc:array(hoconsc:ref(emqx_mgmt_api_subscriptions, subscription)),
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>
)
}
}
};
schema("/clients/:clientid/unsubscribe") -> schema("/clients/:clientid/unsubscribe") ->
#{ #{
'operationId' => unsubscribe, 'operationId' => unsubscribe,
@ -308,6 +326,21 @@ schema("/clients/:clientid/unsubscribe") ->
} }
} }
}; };
schema("/clients/:clientid/unsubscribe/bulk") ->
#{
'operationId' => unsubscribe_batch,
post => #{
description => <<"Unsubscribe">>,
parameters => [{clientid, hoconsc:mk(binary(), #{in => path})}],
'requestBody' => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, unsubscribe))),
responses => #{
204 => <<"Unsubscribe OK">>,
404 => emqx_dashboard_swagger:error_codes(
['CLIENTID_NOT_FOUND'], <<"Client id not found">>
)
}
}
};
schema("/clients/:clientid/keepalive") -> schema("/clients/:clientid/keepalive") ->
#{ #{
'operationId' => set_keepalive, 'operationId' => set_keepalive,
@ -543,11 +576,6 @@ subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo), Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo),
subscribe(Opts#{clientid => ClientID}). subscribe(Opts#{clientid => ClientID}).
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Topic = maps:get(<<"topic">>, TopicInfo),
unsubscribe(#{clientid => ClientID, topic => Topic}).
%% TODO: batch
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) -> subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
Topics = Topics =
[ [
@ -556,6 +584,14 @@ subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}
], ],
subscribe_batch(#{clientid => ClientID, topics => Topics}). subscribe_batch(#{clientid => ClientID, topics => Topics}).
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
Topic = maps:get(<<"topic">>, TopicInfo),
unsubscribe(#{clientid => ClientID, topic => Topic}).
unsubscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
Topics = [Topic || #{<<"topic">> := Topic} <- TopicInfos],
unsubscribe_batch(#{clientid => ClientID, topics => Topics}).
subscriptions(get, #{bindings := #{clientid := ClientID}}) -> subscriptions(get, #{bindings := #{clientid := ClientID}}) ->
case emqx_mgmt:list_client_subscriptions(ClientID) of case emqx_mgmt:list_client_subscriptions(ClientID) of
[] -> [] ->
@ -668,9 +704,20 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
{error, Reason} -> {error, Reason} ->
Message = list_to_binary(io_lib:format("~p", [Reason])), Message = list_to_binary(io_lib:format("~p", [Reason])),
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}}; {500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
{ok, Node} -> {ok, SubInfo} ->
Response = Sub#{node => Node}, {200, SubInfo}
{200, Response} end.
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
case lookup(#{clientid => ClientID}) of
{200, _} ->
ArgList = [
[ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
|| #{topic := Topic} = Sub <- Topics
],
{200, emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList)};
{404, ?CLIENT_ID_NOT_FOUND} ->
{404, ?CLIENT_ID_NOT_FOUND}
end. end.
unsubscribe(#{clientid := ClientID, topic := Topic}) -> unsubscribe(#{clientid := ClientID, topic := Topic}) ->
@ -681,12 +728,14 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
{204} {204}
end. end.
subscribe_batch(#{clientid := ClientID, topics := Topics}) -> unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
ArgList = [ case lookup(#{clientid => ClientID}) of
[ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)] {200, _} ->
|| #{topic := Topic} = Sub <- Topics _ = emqx_mgmt:unsubscribe_batch(ClientID, Topics),
], {204};
emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList). {404, ?CLIENT_ID_NOT_FOUND} ->
{404, ?CLIENT_ID_NOT_FOUND}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% internal function %% internal function
@ -700,7 +749,7 @@ do_subscribe(ClientID, Topic0, Options) ->
{subscribe, Subscriptions, Node} -> {subscribe, Subscriptions, Node} ->
case proplists:is_defined(Topic, Subscriptions) of case proplists:is_defined(Topic, Subscriptions) of
true -> true ->
{ok, Node}; {ok, Options#{node => Node, clientid => ClientID, topic => Topic}};
false -> false ->
{error, unknow_error} {error, unknow_error}
end end

View File

@ -305,7 +305,7 @@ configs(get, Params, _Req) ->
Node = maps:get(node, Params, node()), Node = maps:get(node, Params, node()),
case case
lists:member(Node, mria_mnesia:running_nodes()) andalso lists:member(Node, mria_mnesia:running_nodes()) andalso
emqx_management_proto_v1:get_full_config(Node) emqx_management_proto_v2:get_full_config(Node)
of of
false -> false ->
Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])), Message = list_to_binary(io_lib:format("Bad node ~p, reason not found", [Node])),

View File

@ -445,7 +445,7 @@ list_listeners() ->
[list_listeners(Node) || Node <- mria_mnesia:running_nodes()]. [list_listeners(Node) || Node <- mria_mnesia:running_nodes()].
list_listeners(Node) -> list_listeners(Node) ->
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)). wrap_rpc(emqx_management_proto_v2:list_listeners(Node)).
listener_status_by_id(NodeL) -> listener_status_by_id(NodeL) ->
Listeners = maps:to_list(listener_status_by_id(NodeL, #{})), Listeners = maps:to_list(listener_status_by_id(NodeL, #{})),

View File

@ -61,7 +61,7 @@ schema("/publish/bulk") ->
} }
}. }.
fields(publish_message) -> fields(message) ->
[ [
{topic, {topic,
hoconsc:mk(binary(), #{ hoconsc:mk(binary(), #{
@ -75,7 +75,7 @@ fields(publish_message) ->
required => false, required => false,
default => 0 default => 0
})}, })},
{from, {clientid,
hoconsc:mk(binary(), #{ hoconsc:mk(binary(), #{
desc => <<"From client ID">>, desc => <<"From client ID">>,
required => false, required => false,
@ -94,34 +94,76 @@ fields(publish_message) ->
default => false default => false
})} })}
]; ];
fields(publish_message) ->
[
{payload_encoding,
hoconsc:mk(hoconsc:enum([plain, base64]), #{
desc => <<"MQTT Payload Encoding, base64 or plain">>,
required => false,
default => plain
})}
] ++ fields(message);
fields(publish_message_info) -> fields(publish_message_info) ->
[ [
{id, {id,
hoconsc:mk(binary(), #{ hoconsc:mk(binary(), #{
desc => <<"Internal Message ID">> desc => <<"Internal Message ID">>
})} })}
] ++ fields(publish_message). ] ++ fields(message).
publish(post, #{body := Body}) -> publish(post, #{body := Body}) ->
Message = message(Body), case message(Body) of
{ok, Message} ->
_ = emqx_mgmt:publish(Message), _ = emqx_mgmt:publish(Message),
{200, format_message(Message)}. {200, format_message(Message)};
{error, R} ->
{400, 'BAD_REQUEST', to_binary(R)}
end.
publish_batch(post, #{body := Body}) -> publish_batch(post, #{body := Body}) ->
Messages = messages(Body), case messages(Body) of
{ok, Messages} ->
_ = [emqx_mgmt:publish(Message) || Message <- Messages], _ = [emqx_mgmt:publish(Message) || Message <- Messages],
{200, format_message(Messages)}. {200, format_message(Messages)};
{error, R} ->
{400, 'BAD_REQUEST', to_binary(R)}
end.
message(Map) -> message(Map) ->
From = maps:get(<<"from">>, Map, http_api), Encoding = maps:get(<<"payload_encoding">>, Map, plain),
case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of
{ok, Payload} ->
From = maps:get(<<"clientid">>, Map, http_api),
QoS = maps:get(<<"qos">>, Map, 0), QoS = maps:get(<<"qos">>, Map, 0),
Topic = maps:get(<<"topic">>, Map), Topic = maps:get(<<"topic">>, Map),
Payload = maps:get(<<"payload">>, Map),
Retain = maps:get(<<"retain">>, Map, false), Retain = maps:get(<<"retain">>, Map, false),
emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}). {ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})};
{error, R} ->
{error, R}
end.
encode_payload(plain, Payload) ->
{ok, Payload};
encode_payload(base64, Payload) ->
try
{ok, base64:decode(Payload)}
catch
_:_ ->
{error, {decode_base64_payload_failed, Payload}}
end.
messages(List) -> messages(List) ->
[message(MessageMap) || MessageMap <- List]. messages(List, []).
messages([], Res) ->
{ok, lists:reverse(Res)};
messages([MessageMap | List], Res) ->
case message(MessageMap) of
{ok, Message} ->
messages(List, [Message | Res]);
{error, R} ->
{error, R}
end.
format_message(Messages) when is_list(Messages) -> format_message(Messages) when is_list(Messages) ->
[format_message(Message) || Message <- Messages]; [format_message(Message) || Message <- Messages];
@ -134,7 +176,7 @@ format_message(#message{
topic => Topic, topic => Topic,
payload => Payload, payload => Payload,
retain => maps:get(retain, Flags, false), retain => maps:get(retain, Flags, false),
from => to_binary(From) clientid => to_binary(From)
}. }.
to_binary(Data) when is_binary(Data) -> to_binary(Data) when is_binary(Data) ->

View File

@ -202,23 +202,29 @@ json_content_schema(Schema, Desc) ->
%%%============================================================================================== %%%==============================================================================================
batch_operation(Module, Function, ArgsList) -> batch_operation(Module, Function, ArgsList) ->
Failed = batch_operation(Module, Function, ArgsList, []), {Succeed, Failed} = batch_operation(Module, Function, ArgsList, {[], []}),
Len = erlang:length(Failed), case erlang:length(Failed) of
Success = erlang:length(ArgsList) - Len, 0 ->
Succeed;
_FLen ->
Fun = Fun =
fun({Args, Reason}, Detail) -> fun({Args, Reason}, Detail) ->
[#{data => Args, reason => io_lib:format("~p", [Reason])} | Detail] [
#{data => Args, reason => list_to_binary(io_lib:format("~p", [Reason]))}
| Detail
]
end, end,
#{success => Success, failed => Len, detail => lists:foldl(Fun, [], Failed)}. #{succeed => Succeed, failed => lists:foldl(Fun, [], Failed)}
end.
batch_operation(_Module, _Function, [], Failed) -> batch_operation(_Module, _Function, [], {Succeed, Failed}) ->
lists:reverse(Failed); {lists:reverse(Succeed), lists:reverse(Failed)};
batch_operation(Module, Function, [Args | ArgsList], Failed) -> batch_operation(Module, Function, [Args | ArgsList], {Succeed, Failed}) ->
case erlang:apply(Module, Function, Args) of case erlang:apply(Module, Function, Args) of
ok -> {ok, Res} ->
batch_operation(Module, Function, ArgsList, Failed); batch_operation(Module, Function, ArgsList, {[Res | Succeed], Failed});
{error, Reason} -> {error, Reason} ->
batch_operation(Module, Function, ArgsList, [{Args, Reason} | Failed]) batch_operation(Module, Function, ArgsList, {Succeed, [{Args, Reason} | Failed]})
end. end.
properties(Props) -> properties(Props) ->

View File

@ -0,0 +1,80 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_management_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
node_info/1,
broker_info/1,
list_subscriptions/1,
list_listeners/1,
subscribe/3,
unsubscribe/3,
unsubscribe_batch/3,
call_client/3,
get_full_config/1
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.1".
-spec unsubscribe_batch(node(), emqx_types:clientid(), [emqx_types:topic()]) ->
{unsubscribe, _} | {error, _} | {badrpc, _}.
unsubscribe_batch(Node, ClientId, Topics) ->
rpc:call(Node, emqx_mgmt, do_unsubscribe_batch, [ClientId, Topics]).
-spec node_info(node()) -> map() | {badrpc, _}.
node_info(Node) ->
rpc:call(Node, emqx_mgmt, node_info, []).
-spec broker_info(node()) -> map() | {badrpc, _}.
broker_info(Node) ->
rpc:call(Node, emqx_mgmt, broker_info, []).
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
list_subscriptions(Node) ->
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
-spec list_listeners(node()) -> map() | {badrpc, _}.
list_listeners(Node) ->
rpc:call(Node, emqx_mgmt_api_listeners, do_list_listeners, []).
-spec subscribe(node(), emqx_types:clientid(), emqx_types:topic_filters()) ->
{subscribe, _} | {error, atom()} | {badrpc, _}.
subscribe(Node, ClientId, TopicTables) ->
rpc:call(Node, emqx_mgmt, do_subscribe, [ClientId, TopicTables]).
-spec unsubscribe(node(), emqx_types:clientid(), emqx_types:topic()) ->
{unsubscribe, _} | {error, _} | {badrpc, _}.
unsubscribe(Node, ClientId, Topic) ->
rpc:call(Node, emqx_mgmt, do_unsubscribe, [ClientId, Topic]).
-spec call_client(node(), emqx_types:clientid(), term()) -> term().
call_client(Node, ClientId, Req) ->
rpc:call(Node, emqx_mgmt, do_call_client, [ClientId, Req]).
-spec get_full_config(node()) -> map() | list() | {badrpc, _}.
get_full_config(Node) ->
rpc:call(Node, emqx_mgmt_api_configs, get_full_config, []).