emqx/apps/emqx_management/src/emqx_mgmt.erl

654 lines
21 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mgmt).
-include("emqx_mgmt.hrl").
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
-elvis([{elvis_style, god_modules, disable}]).
-include_lib("stdlib/include/qlc.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% Nodes and Brokers API
-export([ list_nodes/0
, lookup_node/1
, list_brokers/0
, lookup_broker/1
, node_info/1
, broker_info/1
]).
%% Metrics and Stats
-export([ get_metrics/0
, get_metrics/1
, get_stats/0
, get_stats/1
]).
%% Clients, Sessions
-export([ lookup_client/2
, lookup_client/3
, kickout_client/1
, list_authz_cache/1
, list_client_subscriptions/1
, client_subscriptions/2
, clean_authz_cache/1
, clean_authz_cache/2
, clean_authz_cache_all/0
, clean_authz_cache_all/1
, set_ratelimit_policy/2
, set_quota_policy/2
, set_keepalive/2
]).
%% Internal funcs
-export([call_client/3]).
%% Subscriptions
-export([ list_subscriptions/1
, list_subscriptions_via_topic/2
, list_subscriptions_via_topic/3
, lookup_subscriptions/1
, lookup_subscriptions/2
]).
%% Routes
-export([ lookup_routes/1
]).
%% PubSub
-export([ subscribe/2
, do_subscribe/2
, publish/1
, unsubscribe/2
, do_unsubscribe/2
]).
%% Plugins
-export([ list_plugins/0
, list_plugins/1
, load_plugin/2
, unload_plugin/2
, reload_plugin/2
]).
%% Listeners
-export([ list_listeners/0
, list_listeners/1
, list_listeners_by_id/1
, get_listener/2
, manage_listener/2
, update_listener/2
, update_listener/3
, remove_listener/1
, remove_listener/2
]).
%% Alarms
-export([ get_alarms/1
, get_alarms/2
, deactivate/2
, delete_all_deactivated_alarms/0
, delete_all_deactivated_alarms/1
]).
%% Banned
-export([ create_banned/1
, delete_banned/1
]).
%% Common Table API
-export([ item/2
, max_row_limit/0
]).
-export([ return/0
, return/1]).
-define(APP, emqx_management).
-elvis([{elvis_style, god_modules, disable}]).
%% TODO: remove these function after all api use minirest version 1.X
return() ->
ok.
return(_Response) ->
ok.
%%--------------------------------------------------------------------
%% Node Info
%%--------------------------------------------------------------------
list_nodes() ->
Running = mnesia:system_info(running_db_nodes),
Stopped = mnesia:system_info(db_nodes) -- Running,
DownNodes = lists:map(fun stopped_node_info/1, Stopped),
[{Node, node_info(Node)} || Node <- Running] ++ DownNodes.
lookup_node(Node) -> node_info(Node).
node_info(Node) when Node =:= node() ->
Memory = emqx_vm:get_memory(),
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
BrokerInfo = emqx_sys:info(),
Info#{node => node(),
otp_release => iolist_to_binary(otp_rel()),
memory_total => proplists:get_value(allocated, Memory),
memory_used => proplists:get_value(total, Memory),
process_available => erlang:system_info(process_limit),
process_used => erlang:system_info(process_count),
max_fds => proplists:get_value(
max_fds, lists:usort(lists:flatten(erlang:system_info(check_io)))),
connections => ets:info(emqx_channel, size),
node_status => 'Running',
uptime => proplists:get_value(uptime, BrokerInfo),
version => iolist_to_binary(proplists:get_value(version, BrokerInfo))
};
node_info(Node) ->
rpc_call(Node, node_info, [Node]).
stopped_node_info(Node) ->
#{name => Node, node_status => 'Stopped'}.
%%--------------------------------------------------------------------
%% Brokers
%%--------------------------------------------------------------------
list_brokers() ->
[{Node, broker_info(Node)} || Node <- mria_mnesia:running_nodes()].
lookup_broker(Node) ->
broker_info(Node).
broker_info(Node) when Node =:= node() ->
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
Info#{node => Node, otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'};
broker_info(Node) ->
rpc_call(Node, broker_info, [Node]).
%%--------------------------------------------------------------------
%% Metrics and Stats
%%--------------------------------------------------------------------
get_metrics() ->
nodes_info_count([get_metrics(Node) || Node <- mria_mnesia:running_nodes()]).
get_metrics(Node) when Node =:= node() ->
emqx_metrics:all();
get_metrics(Node) ->
rpc_call(Node, get_metrics, [Node]).
get_stats() ->
GlobalStatsKeys =
[ 'retained.count'
, 'retained.max'
, 'routes.count'
, 'routes.max'
, 'subscriptions.shared.count'
, 'subscriptions.shared.max'
],
CountStats = nodes_info_count([
begin
Stats = get_stats(Node),
delete_keys(Stats, GlobalStatsKeys)
end || Node <- mria_mnesia:running_nodes()]),
GlobalStats = maps:with(GlobalStatsKeys, maps:from_list(get_stats(node()))),
maps:merge(CountStats, GlobalStats).
delete_keys(List, []) ->
List;
delete_keys(List, [Key | Keys]) ->
delete_keys(proplists:delete(Key, List), Keys).
get_stats(Node) when Node =:= node() ->
emqx_stats:getstats();
get_stats(Node) ->
rpc_call(Node, get_stats, [Node]).
nodes_info_count(PropList) ->
NodeCount =
fun({Key, Value}, Result) ->
Count = maps:get(Key, Result, 0),
Result#{Key => Count + Value}
end,
AllCount =
fun(StatsMap, Result) ->
lists:foldl(NodeCount, Result, StatsMap)
end,
lists:foldl(AllCount, #{}, PropList).
%%--------------------------------------------------------------------
%% Clients
%%--------------------------------------------------------------------
lookup_client({clientid, ClientId}, FormatFun) ->
lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun)
|| Node <- mria_mnesia:running_nodes()]);
lookup_client({username, Username}, FormatFun) ->
lists:append([lookup_client(Node, {username, Username}, FormatFun)
|| Node <- mria_mnesia:running_nodes()]).
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
lists:append(lists:map(
fun(Key) ->
lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key))
end, ets:lookup(emqx_channel, ClientId)));
lookup_client(Node, {clientid, ClientId}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]);
lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() ->
MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'}
, [{'=:=','$1', Username}]
, ['$_']
}],
lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec));
lookup_client(Node, {username, Username}, FormatFun) ->
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
kickout_client({ClientID, FormatFun}) ->
case lookup_client({clientid, ClientID}, FormatFun) of
[] ->
{error, not_found};
_ ->
Results = [kickout_client(Node, ClientID) || Node <- mria_mnesia:running_nodes()],
check_results(Results)
end.
kickout_client(Node, ClientId) when Node =:= node() ->
emqx_cm:kick_session(ClientId);
kickout_client(Node, ClientId) ->
rpc_call(Node, kickout_client, [Node, ClientId]).
list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache).
list_client_subscriptions(ClientId) ->
Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false;
([]) -> false;
(_) -> true
end, Results),
case Expected of
[] -> [];
[Result | _] -> Result
end.
client_subscriptions(Node, ClientId) when Node =:= node() ->
{Node, emqx_broker:subscriptions(ClientId)};
client_subscriptions(Node, ClientId) ->
rpc_call(Node, client_subscriptions, [Node, ClientId]).
clean_authz_cache(ClientId) ->
Results = [clean_authz_cache(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
check_results(Results).
clean_authz_cache(Node, ClientId) when Node =:= node() ->
case emqx_cm:lookup_channels(ClientId) of
[] ->
{error, not_found};
Pids when is_list(Pids) ->
erlang:send(lists:last(Pids), clean_authz_cache),
ok
end;
clean_authz_cache(Node, ClientId) ->
rpc_call(Node, clean_authz_cache, [Node, ClientId]).
clean_authz_cache_all() ->
Results = [{Node, clean_authz_cache_all(Node)} || Node <- mria_mnesia:running_nodes()],
case lists:filter(fun({_Node, Item}) -> Item =/= ok end, Results) of
[] -> ok;
BadNodes -> {error, BadNodes}
end.
clean_authz_cache_all(Node) when Node =:= node() ->
emqx_authz_cache:drain_cache();
clean_authz_cache_all(Node) ->
rpc_call(Node, clean_authz_cache_all, [Node]).
set_ratelimit_policy(ClientId, Policy) ->
call_client(ClientId, {ratelimit, Policy}).
set_quota_policy(ClientId, Policy) ->
call_client(ClientId, {quota, Policy}).
set_keepalive(ClientId, Interval) ->
call_client(ClientId, {keepalive, Interval}).
%% @private
call_client(ClientId, Req) ->
Results = [call_client(Node, ClientId, Req) || Node <- mria_mnesia:running_nodes()],
Expected = lists:filter(fun({error, _}) -> false;
(_) -> true
end, Results),
case Expected of
[] -> {error, not_found};
[Result | _] -> Result
end.
%% @private
call_client(Node, ClientId, Req) when Node =:= node() ->
case emqx_cm:lookup_channels(ClientId) of
[] -> {error, not_found};
Pids when is_list(Pids) ->
Pid = lists:last(Pids),
case emqx_cm:get_chan_info(ClientId, Pid) of
#{conninfo := #{conn_mod := ConnMod}} ->
erlang:apply(ConnMod, call, [Pid, Req]);
undefined -> {error, not_found}
end
end;
call_client(Node, ClientId, Req) ->
rpc_call(Node, call_client, [Node, ClientId, Req]).
%%--------------------------------------------------------------------
%% Subscriptions
%%--------------------------------------------------------------------
list_subscriptions(Node) when Node =:= node() ->
case check_row_limit([mqtt_subproperty]) of
false -> throw(max_row_limit);
ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)]
end;
list_subscriptions(Node) ->
rpc_call(Node, list_subscriptions, [Node]).
list_subscriptions_via_topic(Topic, FormatFun) ->
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|| Node <- mria_mnesia:running_nodes()]).
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()]).
lookup_subscriptions(Node, ClientId) when Node =:= node() ->
case ets:lookup(emqx_subid, ClientId) of
[] -> [];
[{_, Pid}] ->
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
end;
lookup_subscriptions(Node, ClientId) ->
rpc_call(Node, lookup_subscriptions, [Node, ClientId]).
%%--------------------------------------------------------------------
%% Routes
%%--------------------------------------------------------------------
lookup_routes(Topic) ->
emqx_router:lookup_routes(Topic).
%%--------------------------------------------------------------------
%% PubSub
%%--------------------------------------------------------------------
subscribe(ClientId, TopicTables) ->
subscribe(mria_mnesia:running_nodes(), ClientId, TopicTables).
subscribe([Node | Nodes], ClientId, TopicTables) ->
case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
Re -> Re
end;
subscribe([], _ClientId, _TopicTables) ->
{error, channel_not_found}.
do_subscribe(ClientId, TopicTables) ->
case ets:lookup(emqx_channel, ClientId) of
[] -> {error, channel_not_found};
[{_, Pid}] ->
Pid ! {subscribe, TopicTables}
end.
%%TODO: ???
publish(Msg) ->
emqx_metrics:inc_msg(Msg),
emqx:publish(Msg).
unsubscribe(ClientId, Topic) ->
unsubscribe(mria_mnesia:running_nodes(), ClientId, Topic).
unsubscribe([Node | Nodes], ClientId, Topic) ->
case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
Re -> Re
end;
unsubscribe([], _ClientId, _Topic) ->
{error, channel_not_found}.
do_unsubscribe(ClientId, Topic) ->
case ets:lookup(emqx_channel, ClientId) of
[] -> {error, channel_not_found};
[{_, Pid}] ->
Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
end.
%%--------------------------------------------------------------------
%% Plugins
%%--------------------------------------------------------------------
list_plugins() ->
[{Node, list_plugins(Node)} || Node <- mria_mnesia:running_nodes()].
list_plugins(Node) when Node =:= node() ->
emqx_plugins:list();
list_plugins(Node) ->
rpc_call(Node, list_plugins, [Node]).
load_plugin(Node, Plugin) when Node =:= node() ->
emqx_plugins:load(Plugin);
load_plugin(Node, Plugin) ->
rpc_call(Node, load_plugin, [Node, Plugin]).
unload_plugin(Node, Plugin) when Node =:= node() ->
emqx_plugins:unload(Plugin);
unload_plugin(Node, Plugin) ->
rpc_call(Node, unload_plugin, [Node, Plugin]).
reload_plugin(Node, Plugin) when Node =:= node() ->
emqx_plugins:reload(Plugin);
reload_plugin(Node, Plugin) ->
rpc_call(Node, reload_plugin, [Node, Plugin]).
%%--------------------------------------------------------------------
%% Listeners
%%--------------------------------------------------------------------
list_listeners() ->
lists:append([list_listeners(Node) || Node <- mria_mnesia:running_nodes()]).
list_listeners(Node) when Node =:= node() ->
[Conf#{node => Node, id => Id} || {Id, Conf} <- emqx_listeners:list()];
list_listeners(Node) ->
rpc_call(Node, list_listeners, [Node]).
list_listeners_by_id(Id) ->
listener_id_filter(Id, list_listeners()).
get_listener(Node, Id) ->
case listener_id_filter(Id, list_listeners(Node)) of
[] ->
{error, not_found};
[Listener] ->
Listener
end.
listener_id_filter(Id, Listeners) ->
Filter = fun(#{id := Id0}) -> Id0 =:= Id end,
lists:filter(Filter, Listeners).
-spec manage_listener( Operation :: start_listener
| stop_listener
| restart_listener
, Param :: map()) ->
ok | {error, Reason :: term()}.
manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node()->
erlang:apply(emqx_listeners, Operation, [ID]);
manage_listener(Operation, Param = #{node := Node}) ->
rpc_call(Node, manage_listener, [Operation, Param]).
update_listener(Id, Config) ->
[update_listener(Node, Id, Config) || Node <- mria_mnesia:running_nodes()].
update_listener(Node, Id, Config) when Node =:= node() ->
case emqx_listeners:parse_listener_id(Id) of
{error, {invalid_listener_id, Id}} ->
{error, {invalid_listener_id, Id}};
{Type, Name} ->
case emqx:update_config([listeners, Type, Name], Config, #{}) of
{ok, #{raw_config := RawConf}} ->
RawConf#{node => Node, id => Id, running => true};
{error, Reason} ->
{error, Reason}
end
end;
update_listener(Node, Id, Config) ->
rpc_call(Node, update_listener, [Node, Id, Config]).
remove_listener(Id) ->
[remove_listener(Node, Id) || Node <- mria_mnesia:running_nodes()].
remove_listener(Node, Id) when Node =:= node() ->
{Type, Name} = emqx_listeners:parse_listener_id(Id),
case emqx:remove_config([listeners, Type, Name], #{}) of
{ok, _} -> ok;
{error, Reason} ->
error(Reason)
end;
remove_listener(Node, Id) ->
rpc_call(Node, remove_listener, [Node, Id]).
%%--------------------------------------------------------------------
%% Get Alarms
%%--------------------------------------------------------------------
get_alarms(Type) ->
[{Node, get_alarms(Node, Type)} || Node <- mria_mnesia:running_nodes()].
get_alarms(Node, Type) when Node =:= node() ->
add_duration_field(emqx_alarm:get_alarms(Type));
get_alarms(Node, Type) ->
rpc_call(Node, get_alarms, [Node, Type]).
deactivate(Node, Name) when Node =:= node() ->
emqx_alarm:deactivate(Name);
deactivate(Node, Name) ->
rpc_call(Node, deactivate, [Node, Name]).
delete_all_deactivated_alarms() ->
[delete_all_deactivated_alarms(Node) || Node <- mria_mnesia:running_nodes()].
delete_all_deactivated_alarms(Node) when Node =:= node() ->
emqx_alarm:delete_all_deactivated_alarms();
delete_all_deactivated_alarms(Node) ->
rpc_call(Node, delete_deactivated_alarms, [Node]).
add_duration_field(Alarms) ->
Now = erlang:system_time(microsecond),
add_duration_field(Alarms, Now, []).
add_duration_field([], _Now, Acc) ->
Acc;
add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) ->
add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]);
add_duration_field( [Alarm = #{ activated := false
, activate_at := ActivateAt
, deactivate_at := DeactivateAt} | Rest]
, Now, Acc) ->
add_duration_field(Rest, Now, [Alarm#{duration => DeactivateAt - ActivateAt} | Acc]).
%%--------------------------------------------------------------------
%% Banned API
%%--------------------------------------------------------------------
create_banned(Banned) ->
emqx_banned:create(Banned).
delete_banned(Who) ->
emqx_banned:delete(Who).
%%--------------------------------------------------------------------
%% Common Table API
%%--------------------------------------------------------------------
item(subscription, {{Topic, ClientId}, Options}) ->
#{topic => Topic, clientid => ClientId, options => Options};
item(route, #route{topic = Topic, dest = Node}) ->
#{topic => Topic, node => Node};
item(route, {Topic, Node}) ->
#{topic => Topic, node => Node}.
%%--------------------------------------------------------------------
%% Internal Functions.
%%--------------------------------------------------------------------
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
otp_rel() ->
lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
check_row_limit(Tables) ->
check_row_limit(Tables, max_row_limit()).
check_row_limit([], _Limit) ->
ok;
check_row_limit([Tab | Tables], Limit) ->
case table_size(Tab) > Limit of
true -> false;
false -> check_row_limit(Tables, Limit)
end.
check_results(Results) ->
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
max_row_limit() ->
?MAX_ROW_LIMIT.
table_size(Tab) -> ets:info(Tab, size).