refactor(emqx_mgmt): Decorate RPCs

This commit is contained in:
k32 2022-01-13 20:46:33 +01:00
parent aed010da05
commit 288f9254ba
5 changed files with 133 additions and 43 deletions

View File

@ -28,6 +28,7 @@
, get_cache_ttl/0 , get_cache_ttl/0
, is_enabled/0 , is_enabled/0
, drain_cache/0 , drain_cache/0
, drain_cache/1
]). ]).
%% export for test %% export for test
@ -154,6 +155,16 @@ drain_cache() ->
_ = persistent_term:put(drain_k(), time_now()), _ = persistent_term:put(drain_k(), time_now()),
ok. ok.
-spec drain_cache(emqx_types:clientid()) -> ok | {error, not_found}.
drain_cache(ClientId) ->
case emqx_cm:lookup_channels(ClientId) of
[] ->
{error, not_found};
Pids when is_list(Pids) ->
erlang:send(lists:last(Pids), clean_authz_cache),
ok
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -19,8 +19,12 @@
-behaviour(emqx_bpapi). -behaviour(emqx_bpapi).
-export([ introduced_in/0 -export([ introduced_in/0
, forward/3 , forward/3
, forward_async/3 , forward_async/3
, client_subscriptions/2
, kickout_client/2
]). ]).
-include("bpapi.hrl"). -include("bpapi.hrl").
@ -37,3 +41,13 @@ forward(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
-spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true. -spec forward_async(node(), emqx_types:topic(), emqx_types:delivery()) -> true.
forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) -> forward_async(Node, Topic, Delivery = #delivery{}) when is_binary(Topic) ->
emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]). emqx_rpc:cast(Topic, Node, emqx_broker, dispatch, [Topic, Delivery]).
-spec client_subscriptions(node(), emqx_types:clientid()) ->
[{emqx_types:topic(), emqx_types:subopts()}]
| emqx_rpc:badrpc().
client_subscriptions(Node, ClientId) ->
rpc:call(Node, emqx_broker, subscriptions, [ClientId]).
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
kickout_client(Node, ClientId) ->
rpc:call(Node, emqx_cm, kick_session, [ClientId]).

View File

@ -21,7 +21,14 @@
-include("bpapi.hrl"). -include("bpapi.hrl").
-export([ introduced_in/0 -export([ introduced_in/0
, is_running/1 , is_running/1
, get_stats/1
, get_metrics/1
, clean_authz_cache/1
, clean_authz_cache/2
]). ]).
introduced_in() -> introduced_in() ->
@ -30,3 +37,22 @@ introduced_in() ->
-spec is_running(node()) -> boolean() | {badrpc, term()}. -spec is_running(node()) -> boolean() | {badrpc, term()}.
is_running(Node) -> is_running(Node) ->
rpc:call(Node, emqx, is_running, []). rpc:call(Node, emqx, is_running, []).
-spec get_stats(node()) -> emqx_stats:stats() | {badrpc, _}.
get_stats(Node) ->
rpc:call(Node, emqx_stats, getstats, []).
-spec get_metrics(node()) -> [{emqx_metrics:metric_name(), non_neg_integer()}] | {badrpc, _}.
get_metrics(Node) ->
rpc:call(Node, emqx_metrics, all, []).
-spec clean_authz_cache(node(), emqx_types:clientid()) ->
ok
| {error, not_found}
| {badrpc, _}.
clean_authz_cache(Node, ClientId) ->
rpc:call(Node, emqx_authz_cache, drain_cache, [ClientId]).
-spec clean_authz_cache(node()) -> ok | {badrpc, _}.
clean_authz_cache(Node) ->
rpc:call(Node, emqx_authz_cache, drain_cache, []).

View File

@ -29,7 +29,9 @@
, lookup_node/1 , lookup_node/1
, list_brokers/0 , list_brokers/0
, lookup_broker/1 , lookup_broker/1
, node_info/0
, node_info/1 , node_info/1
, broker_info/0
, broker_info/1 , broker_info/1
]). ]).
@ -65,6 +67,8 @@
, list_subscriptions_via_topic/3 , list_subscriptions_via_topic/3
, lookup_subscriptions/1 , lookup_subscriptions/1
, lookup_subscriptions/2 , lookup_subscriptions/2
, do_list_subscriptions/0
]). ]).
%% Routes %% Routes
@ -80,7 +84,8 @@
]). ]).
%% Listeners %% Listeners
-export([ list_listeners/0 -export([ do_list_listeners/0
, list_listeners/0
, list_listeners/1 , list_listeners/1
, list_listeners_by_id/1 , list_listeners_by_id/1
, get_listener/2 , get_listener/2
@ -134,7 +139,7 @@ list_nodes() ->
lookup_node(Node) -> node_info(Node). lookup_node(Node) -> node_info(Node).
node_info(Node) when Node =:= node() -> node_info() ->
Memory = emqx_vm:get_memory(), Memory = emqx_vm:get_memory(),
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]), Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
BrokerInfo = emqx_sys:info(), BrokerInfo = emqx_sys:info(),
@ -151,9 +156,10 @@ node_info(Node) when Node =:= node() ->
node_status => 'Running', node_status => 'Running',
uptime => proplists:get_value(uptime, BrokerInfo), uptime => proplists:get_value(uptime, BrokerInfo),
version => iolist_to_binary(proplists:get_value(version, BrokerInfo)) version => iolist_to_binary(proplists:get_value(version, BrokerInfo))
}; }.
node_info(Node) -> node_info(Node) ->
rpc_call(Node, node_info, [Node]). wrap_rpc(emqx_management_proto_v1:node_info(Node)).
stopped_node_info(Node) -> stopped_node_info(Node) ->
#{name => Node, node_status => 'Stopped'}. #{name => Node, node_status => 'Stopped'}.
@ -168,12 +174,12 @@ list_brokers() ->
lookup_broker(Node) -> lookup_broker(Node) ->
broker_info(Node). broker_info(Node).
broker_info(Node) when Node =:= node() -> broker_info() ->
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]), Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
Info#{node => Node, otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'}; Info#{node => node(), otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'}.
broker_info(Node) -> broker_info(Node) ->
rpc_call(Node, broker_info, [Node]). wrap_rpc(emqx_management_proto_v1:broker_info(Node)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Metrics and Stats %% Metrics and Stats
@ -182,10 +188,8 @@ broker_info(Node) ->
get_metrics() -> get_metrics() ->
nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]). nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
get_metrics(Node) when Node =:= node() ->
emqx_metrics:all();
get_metrics(Node) -> get_metrics(Node) ->
rpc_call(Node, get_metrics, [Node]). wrap_rpc(emqx_proto_v1:get_metrics(Node)).
get_stats() -> get_stats() ->
GlobalStatsKeys = GlobalStatsKeys =
@ -209,10 +213,8 @@ delete_keys(List, []) ->
delete_keys(List, [Key | Keys]) -> delete_keys(List, [Key | Keys]) ->
delete_keys(proplists:delete(Key, List), Keys). delete_keys(proplists:delete(Key, List), Keys).
get_stats(Node) when Node =:= node() ->
emqx_stats:getstats();
get_stats(Node) -> get_stats(Node) ->
rpc_call(Node, get_stats, [Node]). wrap_rpc(emqx_proto_v1:get_stats(Node)).
nodes_info_count(PropList) -> nodes_info_count(PropList) ->
NodeCount = NodeCount =
@ -267,11 +269,8 @@ kickout_client({ClientID, FormatFun}) ->
check_results(Results) check_results(Results)
end. end.
kickout_client(Node, ClientId) when Node =:= node() ->
emqx_cm:kick_session(ClientId);
kickout_client(Node, ClientId) -> kickout_client(Node, ClientId) ->
rpc_call(Node, kickout_client, [Node, ClientId]). wrap_rpc(emqx_broker_proto_v1:kickout_client(Node, ClientId)).
list_authz_cache(ClientId) -> list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache). call_client(ClientId, list_authz_cache).
@ -287,27 +286,15 @@ list_client_subscriptions(ClientId) ->
[Result | _] -> Result [Result | _] -> Result
end. end.
client_subscriptions(Node, ClientId) when Node =:= node() ->
{Node, emqx_broker:subscriptions(ClientId)};
client_subscriptions(Node, ClientId) -> client_subscriptions(Node, ClientId) ->
rpc_call(Node, client_subscriptions, [Node, ClientId]). {Node, wrap_rpc(emqx_broker_proto_v1:client_subscriptions(Node, ClientId))}.
clean_authz_cache(ClientId) -> clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()], Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
check_results(Results). check_results(Results).
clean_authz_cache(Node, ClientId) when Node =:= node() ->
case emqx_cm:lookup_channels(ClientId) of
[] ->
{error, not_found};
Pids when is_list(Pids) ->
erlang:send(lists:last(Pids), clean_authz_cache),
ok
end;
clean_authz_cache(Node, ClientId) -> clean_authz_cache(Node, ClientId) ->
rpc_call(Node, clean_authz_cache, [Node, ClientId]). wrap_rpc(emqx_proto_v1:clean_authz_cache(Node, ClientId)).
clean_authz_cache_all() -> clean_authz_cache_all() ->
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()], Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
@ -316,11 +303,8 @@ clean_authz_cache_all() ->
BadNodes -> {error, BadNodes} BadNodes -> {error, BadNodes}
end. end.
clean_authz_cache_all(Node) when Node =:= node() ->
emqx_authz_cache:drain_cache();
clean_authz_cache_all(Node) -> clean_authz_cache_all(Node) ->
rpc_call(Node, clean_authz_cache_all, [Node]). wrap_rpc(emqx_proto_v1:clean_authz_cache(Node)).
set_ratelimit_policy(ClientId, Policy) -> set_ratelimit_policy(ClientId, Policy) ->
call_client(ClientId, {ratelimit, Policy}). call_client(ClientId, {ratelimit, Policy}).
@ -363,14 +347,15 @@ call_client(Node, ClientId, Req) ->
%% Subscriptions %% Subscriptions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
list_subscriptions(Node) when Node =:= node() -> -spec do_list_subscriptions() -> [map()].
do_list_subscriptions() ->
case check_row_limit([mqtt_subproperty]) of case check_row_limit([mqtt_subproperty]) of
false -> throw(max_row_limit); false -> throw(max_row_limit);
ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)] ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)]
end; end.
list_subscriptions(Node) -> list_subscriptions(Node) ->
rpc_call(Node, list_subscriptions, [Node]). wrap_rpc(emqx_management_proto_v1:list_subscriptions(Node)).
list_subscriptions_via_topic(Topic, FormatFun) -> list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun) lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
@ -455,14 +440,14 @@ do_unsubscribe(ClientId, Topic) ->
%% Listeners %% Listeners
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
do_list_listeners() ->
[Conf#{node => node(), id => Id} || {Id, Conf} <- emqx_listeners:list()].
list_listeners() -> list_listeners() ->
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]). lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
list_listeners(Node) when Node =:= node() ->
[Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
list_listeners(Node) -> list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]). wrap_rpc(emqx_management_proto_v1:list_listeners(Node)).
list_listeners_by_id(Id) -> list_listeners_by_id(Id) ->
listener_id_filter(Id, list_listeners()). listener_id_filter(Id, list_listeners()).
@ -593,6 +578,11 @@ rpc_call(Node, Fun, Args) ->
Res -> Res Res -> Res
end. end.
wrap_rpc({badrpc, Reason}) ->
{error, Reason};
wrap_rpc(Res) ->
Res.
otp_rel() -> otp_rel() ->
lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]). lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
@ -610,7 +600,7 @@ check_row_limit([Tab | Tables], Limit) ->
check_results(Results) -> check_results(Results) ->
case lists:any(fun(Item) -> Item =:= ok end, Results) of case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok; true -> ok;
false -> lists:last(Results) false -> wrap_rpc(lists:last(Results))
end. end.
max_row_limit() -> max_row_limit() ->

View File

@ -0,0 +1,49 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_management_proto_v1).
-behaviour(emqx_bpapi).
-export([ introduced_in/0
, node_info/1
, broker_info/1
, list_subscriptions/1
, list_listeners/1
]).
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.0".
-spec node_info(node()) -> map() | {badrpc, _}.
node_info(Node) ->
rpc:call(Node, emqx_mgmt, node_info, []).
-spec broker_info(node()) -> map() | {badrpc, _}.
broker_info(Node) ->
rpc:call(Node, emqx_mgmt, broker_info, []).
-spec list_subscriptions(node()) -> [map()] | {badrpc, _}.
list_subscriptions(Node) ->
rpc:call(Node, emqx_mgmt, do_list_subscriptions, []).
-spec list_listeners(node()) -> [map()] | {badrpc, _}.
list_listeners(Node) ->
rpc:call(Node, emqx_mgmt, do_list_listeners, []).