Merge pull request #6758 from k32/bpapi-mgmt
refactor(emqx_mgmt): Decorate RPCs
This commit is contained in:
commit
52441c92fa
|
@ -28,6 +28,7 @@
|
||||||
, get_cache_ttl/0
|
, get_cache_ttl/0
|
||||||
, is_enabled/0
|
, is_enabled/0
|
||||||
, drain_cache/0
|
, drain_cache/0
|
||||||
|
, drain_cache/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% export for test
|
%% export for test
|
||||||
|
@ -154,6 +155,16 @@ drain_cache() ->
|
||||||
_ = persistent_term:put(drain_k(), time_now()),
|
_ = persistent_term:put(drain_k(), time_now()),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
-spec drain_cache(emqx_types:clientid()) -> ok | {error, not_found}.
|
||||||
|
drain_cache(ClientId) ->
|
||||||
|
case emqx_cm:lookup_channels(ClientId) of
|
||||||
|
[] ->
|
||||||
|
{error, not_found};
|
||||||
|
Pids when is_list(Pids) ->
|
||||||
|
erlang:send(lists:last(Pids), clean_authz_cache),
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -56,6 +56,8 @@
|
||||||
|
|
||||||
-export([ lookup_channels/1
|
-export([ lookup_channels/1
|
||||||
, lookup_channels/2
|
, lookup_channels/2
|
||||||
|
|
||||||
|
, lookup_client/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Test/debug interface
|
%% Test/debug interface
|
||||||
|
@ -80,8 +82,16 @@
|
||||||
, get_connected_client_count/0
|
, get_connected_client_count/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export_type([ channel_info/0
|
||||||
|
]).
|
||||||
|
|
||||||
-type(chan_pid() :: pid()).
|
-type(chan_pid() :: pid()).
|
||||||
|
|
||||||
|
-type(channel_info() :: { _Chan :: {emqx_types:clientid(), pid()}
|
||||||
|
, _Info :: emqx_types:infos()
|
||||||
|
, _Stats :: emqx_types:stats()
|
||||||
|
}).
|
||||||
|
|
||||||
%% Tables for channel management.
|
%% Tables for channel management.
|
||||||
-define(CHAN_TAB, emqx_channel).
|
-define(CHAN_TAB, emqx_channel).
|
||||||
-define(CHAN_CONN_TAB, emqx_channel_conn).
|
-define(CHAN_CONN_TAB, emqx_channel_conn).
|
||||||
|
@ -502,6 +512,18 @@ lookup_channels(global, ClientId) ->
|
||||||
lookup_channels(local, ClientId) ->
|
lookup_channels(local, ClientId) ->
|
||||||
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
|
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
|
||||||
|
|
||||||
|
-spec lookup_client({clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
|
||||||
|
[channel_info()].
|
||||||
|
lookup_client({username, Username}) ->
|
||||||
|
MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'}
|
||||||
|
, [{'=:=','$1', Username}]
|
||||||
|
, ['$_']
|
||||||
|
}],
|
||||||
|
ets:select(emqx_channel_info, MatchSpec);
|
||||||
|
lookup_client({clientid, ClientId}) ->
|
||||||
|
[Rec || Key <- ets:lookup(emqx_channel, ClientId)
|
||||||
|
, Rec <- ets:lookup(emqx_channel_info, Key)].
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
rpc_call(Node, Fun, Args, Timeout) ->
|
rpc_call(Node, Fun, Args, Timeout) ->
|
||||||
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
|
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
|
||||||
|
|
|
@ -19,8 +19,13 @@
|
||||||
-behaviour(emqx_bpapi).
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
-export([ introduced_in/0
|
-export([ introduced_in/0
|
||||||
|
|
||||||
, forward/3
|
, forward/3
|
||||||
, forward_async/3
|
, forward_async/3
|
||||||
|
, client_subscriptions/2
|
||||||
|
|
||||||
|
, lookup_client/2
|
||||||
|
, kickout_client/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("bpapi.hrl").
|
-include("bpapi.hrl").
|
||||||
|
@ -37,3 +42,18 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
|
||||||
-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true.
|
-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true.
|
||||||
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
|
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
|
||||||
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
|
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
|
||||||
|
|
||||||
|
-spec client_subscriptions(node(), emqx_types:clientid()) ->
|
||||||
|
[{emqx_types:topic(), emqx_types:subopts()}]
|
||||||
|
| emqx_rpc:badrpc().
|
||||||
|
client_subscriptions(Node, ClientId) ->
|
||||||
|
rpc:call(Node, emqx_broker, subscriptions, [ClientId]).
|
||||||
|
|
||||||
|
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
|
||||||
|
kickout_client(Node, ClientId) ->
|
||||||
|
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
|
||||||
|
|
||||||
|
-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
|
||||||
|
[emqx_cm:channel_info()] | {badrpc, _}.
|
||||||
|
lookup_client(Node, Key) ->
|
||||||
|
rpc:call(Node, emqx_cm, lookup_client, [Key]).
|
||||||
|
|
|
@ -21,7 +21,14 @@
|
||||||
-include("bpapi.hrl").
|
-include("bpapi.hrl").
|
||||||
|
|
||||||
-export([ introduced_in/0
|
-export([ introduced_in/0
|
||||||
|
|
||||||
, is_running/1
|
, is_running/1
|
||||||
|
|
||||||
|
, get_stats/1
|
||||||
|
, get_metrics/1
|
||||||
|
|
||||||
|
, clean_authz_cache/1
|
||||||
|
, clean_authz_cache/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
introduced_in() ->
|
introduced_in() ->
|
||||||
|
@ -30,3 +37,22 @@ introduced_in() ->
|
||||||
-spec is_running(node()) -> boolean() | {badrpc, term()}.
|
-spec is_running(node()) -> boolean() | {badrpc, term()}.
|
||||||
is_running(Node) ->
|
is_running(Node) ->
|
||||||
rpc:call(Node, emqx, is_running, []).
|
rpc:call(Node, emqx, is_running, []).
|
||||||
|
|
||||||
|
-spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}.
|
||||||
|
get_stats(Node) ->
|
||||||
|
rpc:call(Node, emqx_stats, getstats, []).
|
||||||
|
|
||||||
|
-spec get_metrics(node()) -> [{emqx_metrics:metric_name(), non_neg_integer()}] | {badrpc, _}.
|
||||||
|
get_metrics(Node) ->
|
||||||
|
rpc:call(Node, emqx_metrics, all, []).
|
||||||
|
|
||||||
|
-spec clean_authz_cache(node(), emqx_types:clientid()) ->
|
||||||
|
ok
|
||||||
|
| {error, not_found}
|
||||||
|
| {badrpc, _}.
|
||||||
|
clean_authz_cache(Node, ClientId) ->
|
||||||
|
rpc:call(Node, emqx_authz_cache, drain_cache, [ClientId]).
|
||||||
|
|
||||||
|
-spec clean_authz_cache(node()) -> ok | {badrpc, _}.
|
||||||
|
clean_authz_cache(Node) ->
|
||||||
|
rpc:call(Node, emqx_authz_cache, drain_cache, []).
|
||||||
|
|
|
@ -29,7 +29,9 @@
|
||||||
, lookup_node/1
|
, lookup_node/1
|
||||||
, list_brokers/0
|
, list_brokers/0
|
||||||
, lookup_broker/1
|
, lookup_broker/1
|
||||||
|
, node_info/0
|
||||||
, node_info/1
|
, node_info/1
|
||||||
|
, broker_info/0
|
||||||
, broker_info/1
|
, broker_info/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -65,6 +67,8 @@
|
||||||
, list_subscriptions_via_topic/3
|
, list_subscriptions_via_topic/3
|
||||||
, lookup_subscriptions/1
|
, lookup_subscriptions/1
|
||||||
, lookup_subscriptions/2
|
, lookup_subscriptions/2
|
||||||
|
|
||||||
|
, do_list_subscriptions/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Routes
|
%% Routes
|
||||||
|
@ -80,7 +84,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Listeners
|
%% Listeners
|
||||||
-export([ list_listeners/0
|
-export([ do_list_listeners/0
|
||||||
|
, list_listeners/0
|
||||||
, list_listeners/1
|
, list_listeners/1
|
||||||
, list_listeners_by_id/1
|
, list_listeners_by_id/1
|
||||||
, get_listener/2
|
, get_listener/2
|
||||||
|
@ -116,6 +121,12 @@
|
||||||
|
|
||||||
-elvis([{elvis_style, god_modules, disable}]).
|
-elvis([{elvis_style, god_modules, disable}]).
|
||||||
|
|
||||||
|
-export_type([listener_manage_op/0]).
|
||||||
|
|
||||||
|
-type listener_manage_op() :: start_listener
|
||||||
|
| stop_listener
|
||||||
|
| restart_listener.
|
||||||
|
|
||||||
%% TODO: remove these function after all api use minirest version 1.X
|
%% TODO: remove these function after all api use minirest version 1.X
|
||||||
return() ->
|
return() ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -134,7 +145,7 @@ list_nodes() ->
|
||||||
|
|
||||||
lookup_node(Node) -> node_info(Node).
|
lookup_node(Node) -> node_info(Node).
|
||||||
|
|
||||||
node_info(Node) when Node =:= node() ->
|
node_info() ->
|
||||||
Memory = emqx_vm:get_memory(),
|
Memory = emqx_vm:get_memory(),
|
||||||
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
|
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
|
||||||
BrokerInfo = emqx_sys:info(),
|
BrokerInfo = emqx_sys:info(),
|
||||||
|
@ -151,9 +162,10 @@ node_info(Node) when Node =:= node() ->
|
||||||
node_status => 'Running',
|
node_status => 'Running',
|
||||||
uptime => proplists:get_value(uptime, BrokerInfo),
|
uptime => proplists:get_value(uptime, BrokerInfo),
|
||||||
version => iolist_to_binary(proplists:get_value(version, BrokerInfo))
|
version => iolist_to_binary(proplists:get_value(version, BrokerInfo))
|
||||||
};
|
}.
|
||||||
|
|
||||||
node_info(Node) ->
|
node_info(Node) ->
|
||||||
rpc_call(Node, node_info, [Node]).
|
wrap_rpc(emqx_management_proto_v1:node_info(Node)).
|
||||||
|
|
||||||
stopped_node_info(Node) ->
|
stopped_node_info(Node) ->
|
||||||
#{name => Node, node_status => 'Stopped'}.
|
#{name => Node, node_status => 'Stopped'}.
|
||||||
|
@ -168,12 +180,12 @@ list_brokers() ->
|
||||||
lookup_broker(Node) ->
|
lookup_broker(Node) ->
|
||||||
broker_info(Node).
|
broker_info(Node).
|
||||||
|
|
||||||
broker_info(Node) when Node =:= node() ->
|
broker_info() ->
|
||||||
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
|
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
|
||||||
Info#{node => Node, otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'};
|
Info#{node => node(), otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'}.
|
||||||
|
|
||||||
broker_info(Node) ->
|
broker_info(Node) ->
|
||||||
rpc_call(Node, broker_info, [Node]).
|
wrap_rpc(emqx_management_proto_v1:broker_info(Node)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Metrics and Stats
|
%% Metrics and Stats
|
||||||
|
@ -182,10 +194,8 @@ broker_info(Node) ->
|
||||||
get_metrics() ->
|
get_metrics() ->
|
||||||
nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
|
nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
|
||||||
|
|
||||||
get_metrics(Node) when Node =:= node() ->
|
|
||||||
emqx_metrics:all();
|
|
||||||
get_metrics(Node) ->
|
get_metrics(Node) ->
|
||||||
rpc_call(Node, get_metrics, [Node]).
|
wrap_rpc(emqx_proto_v1:get_metrics(Node)).
|
||||||
|
|
||||||
get_stats() ->
|
get_stats() ->
|
||||||
GlobalStatsKeys =
|
GlobalStatsKeys =
|
||||||
|
@ -209,10 +219,8 @@ delete_keys(List, []) ->
|
||||||
delete_keys(List, [Key | Keys]) ->
|
delete_keys(List, [Key | Keys]) ->
|
||||||
delete_keys(proplists:delete(Key, List), Keys).
|
delete_keys(proplists:delete(Key, List), Keys).
|
||||||
|
|
||||||
get_stats(Node) when Node =:= node() ->
|
|
||||||
emqx_stats:getstats();
|
|
||||||
get_stats(Node) ->
|
get_stats(Node) ->
|
||||||
rpc_call(Node, get_stats, [Node]).
|
wrap_rpc(emqx_proto_v1:get_stats(Node)).
|
||||||
|
|
||||||
nodes_info_count(PropList) ->
|
nodes_info_count(PropList) ->
|
||||||
NodeCount =
|
NodeCount =
|
||||||
|
@ -239,24 +247,11 @@ lookup_client({username, Username}, FormatFun) ->
|
||||||
lists:append([lookup_client(Node, {username, Username}, FormatFun)
|
lists:append([lookup_client(Node, {username, Username}, FormatFun)
|
||||||
|| Node <- mria_mnesia:running_nodes()]).
|
|| Node <- mria_mnesia:running_nodes()]).
|
||||||
|
|
||||||
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
|
lookup_client(Node, Key, {M, F}) ->
|
||||||
lists:append(lists:map(
|
case wrap_rpc(emqx_broker_proto_v1:lookup_client(Node, Key)) of
|
||||||
fun(Key) ->
|
{error, Err} -> {error, Err};
|
||||||
lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key))
|
L -> lists:map(fun M:F/1, L)
|
||||||
end, ets:lookup(emqx_channel, ClientId)));
|
end.
|
||||||
|
|
||||||
lookup_client(Node, {clientid, ClientId}, FormatFun) ->
|
|
||||||
rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]);
|
|
||||||
|
|
||||||
lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() ->
|
|
||||||
MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'}
|
|
||||||
, [{'=:=','$1', Username}]
|
|
||||||
, ['$_']
|
|
||||||
}],
|
|
||||||
lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec));
|
|
||||||
|
|
||||||
lookup_client(Node, {username, Username}, FormatFun) ->
|
|
||||||
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
|
|
||||||
|
|
||||||
kickout_client({ClientID, FormatFun}) ->
|
kickout_client({ClientID, FormatFun}) ->
|
||||||
case lookup_client({clientid, ClientID}, FormatFun) of
|
case lookup_client({clientid, ClientID}, FormatFun) of
|
||||||
|
@ -267,11 +262,8 @@ kickout_client({ClientID, FormatFun}) ->
|
||||||
check_results(Results)
|
check_results(Results)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
kickout_client(Node, ClientId) when Node =:= node() ->
|
|
||||||
emqx_cm:kick_session(ClientId);
|
|
||||||
|
|
||||||
kickout_client(Node, ClientId) ->
|
kickout_client(Node, ClientId) ->
|
||||||
rpc_call(Node, kickout_client, [Node, ClientId]).
|
wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)).
|
||||||
|
|
||||||
list_authz_cache(ClientId) ->
|
list_authz_cache(ClientId) ->
|
||||||
call_client(ClientId, list_authz_cache).
|
call_client(ClientId, list_authz_cache).
|
||||||
|
@ -287,27 +279,15 @@ list_client_subscriptions(ClientId) ->
|
||||||
[Result | _] -> Result
|
[Result | _] -> Result
|
||||||
end.
|
end.
|
||||||
|
|
||||||
client_subscriptions(Node, ClientId) when Node =:= node() ->
|
|
||||||
{Node, emqx_broker:subscriptions(ClientId)};
|
|
||||||
|
|
||||||
client_subscriptions(Node, ClientId) ->
|
client_subscriptions(Node, ClientId) ->
|
||||||
rpc_call(Node, client_subscriptions, [Node, ClientId]).
|
{Node, wrap_rpc(emqx_broker_proto_v1:client_subscriptions(Node, ClientId))}.
|
||||||
|
|
||||||
clean_authz_cache(ClientId) ->
|
clean_authz_cache(ClientId) ->
|
||||||
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
|
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
|
||||||
check_results(Results).
|
check_results(Results).
|
||||||
|
|
||||||
|
|
||||||
clean_authz_cache(Node, ClientId) when Node =:= node() ->
|
|
||||||
case emqx_cm:lookup_channels(ClientId) of
|
|
||||||
[] ->
|
|
||||||
{error, not_found};
|
|
||||||
Pids when is_list(Pids) ->
|
|
||||||
erlang:send(lists:last(Pids), clean_authz_cache),
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
clean_authz_cache(Node, ClientId) ->
|
clean_authz_cache(Node, ClientId) ->
|
||||||
rpc_call(Node, clean_authz_cache, [Node, ClientId]).
|
wrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).
|
||||||
|
|
||||||
clean_authz_cache_all() ->
|
clean_authz_cache_all() ->
|
||||||
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
|
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
|
||||||
|
@ -316,11 +296,8 @@ clean_authz_cache_all() ->
|
||||||
BadNodes -> {error, BadNodes}
|
BadNodes -> {error, BadNodes}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
clean_authz_cache_all(Node) when Node =:= node() ->
|
|
||||||
emqx_authz_cache:drain_cache();
|
|
||||||
|
|
||||||
clean_authz_cache_all(Node) ->
|
clean_authz_cache_all(Node) ->
|
||||||
rpc_call(Node, clean_authz_cache_all, [Node]).
|
wrap_rpc(emqx_proto_v1:clean_authz_cache(Node)).
|
||||||
|
|
||||||
set_ratelimit_policy(ClientId, Policy) ->
|
set_ratelimit_policy(ClientId, Policy) ->
|
||||||
call_client(ClientId, {ratelimit, Policy}).
|
call_client(ClientId, {ratelimit, Policy}).
|
||||||
|
@ -363,14 +340,15 @@ call_client(Node, ClientId, Req) ->
|
||||||
%% Subscriptions
|
%% Subscriptions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
list_subscriptions(Node) when Node =:= node() ->
|
-spec do_list_subscriptions() -> [map()].
|
||||||
|
do_list_subscriptions() ->
|
||||||
case check_row_limit([mqtt_subproperty]) of
|
case check_row_limit([mqtt_subproperty]) of
|
||||||
false -> throw(max_row_limit);
|
false -> throw(max_row_limit);
|
||||||
ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)]
|
ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)]
|
||||||
end;
|
end.
|
||||||
|
|
||||||
list_subscriptions(Node) ->
|
list_subscriptions(Node) ->
|
||||||
rpc_call(Node, list_subscriptions, [Node]).
|
wrap_rpc(emqx_management_proto_v1:list_subscriptions(Node)).
|
||||||
|
|
||||||
list_subscriptions_via_topic(Topic, FormatFun) ->
|
list_subscriptions_via_topic(Topic, FormatFun) ->
|
||||||
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|
||||||
|
@ -455,14 +433,14 @@ do_unsubscribe(ClientId, Topic) ->
|
||||||
%% Listeners
|
%% Listeners
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
do_list_listeners() ->
|
||||||
|
[Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()].
|
||||||
|
|
||||||
list_listeners() ->
|
list_listeners() ->
|
||||||
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
|
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
|
||||||
|
|
||||||
list_listeners(Node) when Node =:= node() ->
|
|
||||||
[Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
|
|
||||||
|
|
||||||
list_listeners(Node) ->
|
list_listeners(Node) ->
|
||||||
rpc_call(Node, list_listeners, [Node]).
|
wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
|
||||||
|
|
||||||
list_listeners_by_id(Id) ->
|
list_listeners_by_id(Id) ->
|
||||||
listener_id_filter(Id, list_listeners()).
|
listener_id_filter(Id, list_listeners()).
|
||||||
|
@ -479,10 +457,7 @@ listener_id_filter(Id, Listeners) ->
|
||||||
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
|
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
|
||||||
lists:filter(Filter, Listeners).
|
lists:filter(Filter, Listeners).
|
||||||
|
|
||||||
|
-spec manage_listener( listener_manage_op()
|
||||||
-spec manage_listener( Operation :: start_listener
|
|
||||||
| stop_listener
|
|
||||||
| restart_listener
|
|
||||||
, Param :: map()) ->
|
, Param :: map()) ->
|
||||||
ok | {error, Reason :: term()}.
|
ok | {error, Reason :: term()}.
|
||||||
manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
|
manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
|
||||||
|
@ -593,6 +568,11 @@ rpc_call(Node, Fun, Args) ->
|
||||||
Res -> Res
|
Res -> Res
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
wrap_rpc({badrpc, Reason}) ->
|
||||||
|
{error, Reason};
|
||||||
|
wrap_rpc(Res) ->
|
||||||
|
Res.
|
||||||
|
|
||||||
otp_rel() ->
|
otp_rel() ->
|
||||||
lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
|
lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
|
||||||
|
|
||||||
|
@ -610,7 +590,7 @@ check_row_limit([Tab | Tables], Limit) ->
|
||||||
check_results(Results) ->
|
check_results(Results) ->
|
||||||
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> lists:last(Results)
|
false -> wrap_rpc(lists:last(Results))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
max_row_limit() ->
|
max_row_limit() ->
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_management_proto_v1).
|
||||||
|
|
||||||
|
-behaviour(emqx_bpapi).
|
||||||
|
|
||||||
|
-export([ introduced_in/0
|
||||||
|
|
||||||
|
, node_info/1
|
||||||
|
, broker_info/1
|
||||||
|
, list_subscriptions/1
|
||||||
|
|
||||||
|
, list_listeners/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-include_lib("emqx/include/bpapi.hrl").
|
||||||
|
|
||||||
|
introduced_in() ->
|
||||||
|
"5.0.0".
|
||||||
|
|
||||||
|
-spec node_info(node()) -> map() | {badrpc, _}.
|
||||||
|
node_info(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, node_info, []).
|
||||||
|
|
||||||
|
-spec broker_info(node()) -> map() | {badrpc, _}.
|
||||||
|
broker_info(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, broker_info, []).
|
||||||
|
|
||||||
|
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
|
||||||
|
list_subscriptions(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
|
||||||
|
|
||||||
|
-spec list_listeners(node()) -> [map()] | {badrpc, _}.
|
||||||
|
list_listeners(Node) ->
|
||||||
|
rpc:call(Node, emqx_mgmt, do_list_listeners, []).
|
Loading…
Reference in New Issue