654 lines
21 KiB
Erlang
654 lines
21 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt).
|
|
|
|
-include("emqx_mgmt.hrl").
|
|
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
|
-elvis([{elvis_style, god_modules, disable}]).
|
|
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
%% Nodes and Brokers API
|
|
-export([ list_nodes/0
|
|
, lookup_node/1
|
|
, list_brokers/0
|
|
, lookup_broker/1
|
|
, node_info/1
|
|
, broker_info/1
|
|
]).
|
|
|
|
%% Metrics and Stats
|
|
-export([ get_metrics/0
|
|
, get_metrics/1
|
|
, get_stats/0
|
|
, get_stats/1
|
|
]).
|
|
|
|
%% Clients, Sessions
|
|
-export([ lookup_client/2
|
|
, lookup_client/3
|
|
, kickout_client/1
|
|
, list_authz_cache/1
|
|
, list_client_subscriptions/1
|
|
, client_subscriptions/2
|
|
, clean_authz_cache/1
|
|
, clean_authz_cache/2
|
|
, clean_authz_cache_all/0
|
|
, clean_authz_cache_all/1
|
|
, set_ratelimit_policy/2
|
|
, set_quota_policy/2
|
|
, set_keepalive/2
|
|
]).
|
|
|
|
%% Internal funcs
|
|
-export([call_client/3]).
|
|
|
|
%% Subscriptions
|
|
-export([ list_subscriptions/1
|
|
, list_subscriptions_via_topic/2
|
|
, list_subscriptions_via_topic/3
|
|
, lookup_subscriptions/1
|
|
, lookup_subscriptions/2
|
|
]).
|
|
|
|
%% Routes
|
|
-export([ lookup_routes/1
|
|
]).
|
|
|
|
%% PubSub
|
|
-export([ subscribe/2
|
|
, do_subscribe/2
|
|
, publish/1
|
|
, unsubscribe/2
|
|
, do_unsubscribe/2
|
|
]).
|
|
|
|
%% Plugins
|
|
-export([ list_plugins/0
|
|
, list_plugins/1
|
|
, load_plugin/2
|
|
, unload_plugin/2
|
|
, reload_plugin/2
|
|
]).
|
|
|
|
%% Listeners
|
|
-export([ list_listeners/0
|
|
, list_listeners/1
|
|
, list_listeners_by_id/1
|
|
, get_listener/2
|
|
, manage_listener/2
|
|
, update_listener/2
|
|
, update_listener/3
|
|
, remove_listener/1
|
|
, remove_listener/2
|
|
]).
|
|
|
|
%% Alarms
|
|
-export([ get_alarms/1
|
|
, get_alarms/2
|
|
, deactivate/2
|
|
, delete_all_deactivated_alarms/0
|
|
, delete_all_deactivated_alarms/1
|
|
]).
|
|
|
|
%% Banned
|
|
-export([ create_banned/1
|
|
, delete_banned/1
|
|
]).
|
|
|
|
%% Common Table API
|
|
-export([ item/2
|
|
, max_row_limit/0
|
|
]).
|
|
|
|
-export([ return/0
|
|
, return/1]).
|
|
|
|
-define(APP, emqx_management).
|
|
|
|
-elvis([{elvis_style, god_modules, disable}]).
|
|
|
|
%% TODO: remove these functions once every API uses minirest version 1.x
|
|
%% @doc Legacy response helper that takes no argument and always yields `ok'.
return() -> ok.
|
|
%% @doc Legacy response helper: the argument is ignored and `ok' is returned.
return(_Ignored) -> ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Node Info
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc List every database node known to Mnesia.
%% Running nodes come first as `{Node, Info}' pairs built with node_info/1;
%% nodes that are known but not running are appended as the maps produced by
%% stopped_node_info/1.
%% NOTE(review): running entries are tuples while stopped entries are bare
%% maps - confirm that callers really expect this mixed shape.
list_nodes() ->
    RunningNodes = mnesia:system_info(running_db_nodes),
    StoppedNodes = mnesia:system_info(db_nodes) -- RunningNodes,
    StoppedInfos = [stopped_node_info(Node) || Node <- StoppedNodes],
    RunningInfos = lists:map(fun(Node) -> {Node, node_info(Node)} end, RunningNodes),
    RunningInfos ++ StoppedInfos.
|
|
|
|
%% @doc Look up a single node; this is an alias for node_info/1.
lookup_node(Node) ->
    node_info(Node).
|
|
|
|
%% @doc Collect runtime information about `Node'.
%% On the local node the result is a map that starts from the load figures of
%% emqx_vm:loads/0 (values converted to binaries) and adds memory, process,
%% file-descriptor, connection, uptime and version entries. For any other node
%% the same function is evaluated there through rpc_call/3, so an
%% `{error, Reason}' tuple may be returned instead of a map.
node_info(Local) when Local =:= node() ->
    Memory = emqx_vm:get_memory(),
    Loads = maps:from_list([{Key, list_to_binary(Load)} || {Key, Load} <- emqx_vm:loads()]),
    BrokerInfo = emqx_sys:info(),
    %% The check_io data is flattened and de-duplicated before the lookup.
    CheckIo = lists:usort(lists:flatten(erlang:system_info(check_io))),
    Details = #{ node => node()
               , otp_release => iolist_to_binary(otp_rel())
               , memory_total => proplists:get_value(allocated, Memory)
               , memory_used => proplists:get_value(total, Memory)
               , process_available => erlang:system_info(process_limit)
               , process_used => erlang:system_info(process_count)
               , max_fds => proplists:get_value(max_fds, CheckIo)
               , connections => ets:info(emqx_channel, size)
               , node_status => 'Running'
               , uptime => proplists:get_value(uptime, BrokerInfo)
               , version => iolist_to_binary(proplists:get_value(version, BrokerInfo))
               },
    %% Entries in Details win over load entries with the same key.
    maps:merge(Loads, Details);
node_info(Remote) ->
    rpc_call(Remote, node_info, [Remote]).
|
|
|
|
%% @doc Build the placeholder info map for a node that is not running.
%% The map carries only the node name and the status `Stopped'.
stopped_node_info(Node) ->
    maps:from_list([{name, Node}, {node_status, 'Stopped'}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Brokers
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Return `{Node, BrokerInfo}' for every node reported by
%% mria_mnesia:running_nodes/0.
list_brokers() ->
    lists:map(fun(Node) -> {Node, broker_info(Node)} end,
              mria_mnesia:running_nodes()).
|
|
|
|
%% @doc Look up the broker information of one node; alias for broker_info/1.
lookup_broker(Node) -> broker_info(Node).
|
|
|
|
%% @doc Broker information for `Node'.
%% Locally this is the emqx_sys:info/0 list turned into a map of binaries,
%% extended with the node name, the OTP release string and the status
%% `Running'. For other nodes the call is forwarded through rpc_call/3.
broker_info(Local) when Local =:= node() ->
    SysInfo = [{Key, iolist_to_binary(Val)} || {Key, Val} <- emqx_sys:info()],
    Extra = #{ node => Local
             , otp_release => iolist_to_binary(otp_rel())
             , node_status => 'Running'
             },
    maps:merge(maps:from_list(SysInfo), Extra);
broker_info(Remote) ->
    rpc_call(Remote, broker_info, [Remote]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Metrics and Stats
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Cluster-wide metrics: the per-node metric lists of all running nodes
%% are summed key by key with nodes_info_count/1.
get_metrics() ->
    PerNode = lists:map(fun get_metrics/1, mria_mnesia:running_nodes()),
    nodes_info_count(PerNode).
|
|
|
|
%% @doc Metrics of a single node: emqx_metrics:all/0 locally, otherwise the
%% same call forwarded to the target node through rpc_call/3.
get_metrics(Local) when Local =:= node() ->
    emqx_metrics:all();
get_metrics(Remote) ->
    rpc_call(Remote, get_metrics, [Remote]).
|
|
|
|
%% @doc Cluster-wide statistics as a map.
%% The keys listed in GlobalKeys are taken once, from the local node only, and
%% are removed from each node's list before summing. All remaining keys are
%% summed over the running nodes.
get_stats() ->
    GlobalKeys = [ 'retained.count'
                 , 'retained.max'
                 , 'routes.count'
                 , 'routes.max'
                 , 'subscriptions.shared.count'
                 , 'subscriptions.shared.max'
                 ],
    PerNode = [delete_keys(get_stats(Node), GlobalKeys)
               || Node <- mria_mnesia:running_nodes()],
    Summed = nodes_info_count(PerNode),
    LocalStats = maps:from_list(get_stats(node())),
    maps:merge(Summed, maps:with(GlobalKeys, LocalStats)).
|
|
|
|
%% @doc Remove every entry whose key is listed in `Keys' from the property
%% list `List'.
delete_keys(List, Keys) ->
    lists:foldl(fun proplists:delete/2, List, Keys).
|
|
|
|
%% @doc Statistics of a single node: emqx_stats:getstats/0 locally, otherwise
%% the same call forwarded to the target node through rpc_call/3.
get_stats(Local) when Local =:= node() ->
    emqx_stats:getstats();
get_stats(Remote) ->
    rpc_call(Remote, get_stats, [Remote]).
|
|
|
|
%% @doc Sum a list of `{Key, Number}' lists into one map.
%% `PropList' holds one key/value list per node; values that share a key are
%% added together, starting from 0.
nodes_info_count(PropList) ->
    AddEntry =
        fun({Key, Value}, Totals) ->
                Totals#{Key => maps:get(Key, Totals, 0) + Value}
        end,
    AddNode =
        fun(NodeEntries, Totals) ->
                lists:foldl(AddEntry, Totals, NodeEntries)
        end,
    lists:foldl(AddNode, #{}, PropList).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Clients
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Look a client up on every running node, by `{clientid, Id}' or by
%% `{username, Name}', and concatenate the per-node result lists.
%% `FormatFun' is handed unchanged to lookup_client/3.
lookup_client({clientid, _} = By, FormatFun) ->
    PerNode = lists:map(fun(Node) -> lookup_client(Node, By, FormatFun) end,
                        mria_mnesia:running_nodes()),
    lists:append(PerNode);
lookup_client({username, _} = By, FormatFun) ->
    PerNode = lists:map(fun(Node) -> lookup_client(Node, By, FormatFun) end,
                        mria_mnesia:running_nodes()),
    lists:append(PerNode).
|
|
|
|
%% @doc Look a client up on one node.
%% On the local node, with a `{Module, Function}' formatter, the channel-info
%% rows are read from ETS and each row is passed through `Module:Function/1'.
%% Every other combination is forwarded to `Node' through rpc_call/3.
lookup_client(Node, {clientid, ClientId}, {M, F}) when Node =:= node() ->
    %% emqx_channel gives the keys under which emqx_channel_info stores the rows.
    [M:F(ChanInfo) || ChanKey <- ets:lookup(emqx_channel, ClientId),
                      ChanInfo <- ets:lookup(emqx_channel_info, ChanKey)];
lookup_client(Node, {clientid, ClientId}, FormatFun) ->
    rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]);
lookup_client(Node, {username, Username}, {M, F}) when Node =:= node() ->
    %% Select whole rows whose clientinfo username equals Username exactly.
    RowPattern = {'_', #{clientinfo => #{username => '$1'}}, '_'},
    SameUser = {'=:=', '$1', Username},
    Rows = ets:select(emqx_channel_info, [{RowPattern, [SameUser], ['$_']}]),
    [M:F(Row) || Row <- Rows];
lookup_client(Node, {username, Username}, FormatFun) ->
    rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
|
|
|
|
%% @doc Kick a client out of the cluster.
%% Returns `{error, not_found}' when lookup_client/2 finds nothing. Otherwise
%% the kick is attempted on every running node and the per-node results are
%% reduced with check_results/1.
kickout_client({ClientID, FormatFun}) ->
    case lookup_client({clientid, ClientID}, FormatFun) of
        [] ->
            {error, not_found};
        _Found ->
            Kick = fun(Node) -> kickout_client(Node, ClientID) end,
            check_results(lists:map(Kick, mria_mnesia:running_nodes()))
    end.
|
|
|
|
%% Kick the session of ClientId on one node: emqx_cm:kick_session/1 locally,
%% otherwise the same call forwarded to the target node.
kickout_client(Local, ClientId) when Local =:= node() ->
    emqx_cm:kick_session(ClientId);
kickout_client(Remote, ClientId) ->
    rpc_call(Remote, kickout_client, [Remote, ClientId]).
|
|
|
|
%% @doc Send the `list_authz_cache' request to the client's connection via
%% call_client/2.
list_authz_cache(ClientId) ->
    Request = list_authz_cache,
    call_client(ClientId, Request).
|
|
|
|
%% @doc Find the subscriptions of a client across the running nodes.
%% Every node is asked through client_subscriptions/2. The first answer that
%% is neither an error nor empty is returned, and `[]' is returned when no
%% node has subscriptions for the client.
%%
%% The local branch of client_subscriptions/2 answers `{Node, Subscriptions}',
%% so an empty answer is `{Node, []}' rather than a bare `[]'. Both forms are
%% treated as empty; otherwise a node without subscriptions could be picked
%% ahead of the node that actually holds them.
list_client_subscriptions(ClientId) ->
    Results = [client_subscriptions(Node, ClientId) || Node <- mria_mnesia:running_nodes()],
    HasSubscriptions =
        fun({error, _}) -> false;
           ({_Node, []}) -> false;
           ([]) -> false;
           (_) -> true
        end,
    case lists:filter(HasSubscriptions, Results) of
        [] -> [];
        [Result | _] -> Result
    end.
|
|
|
|
%% @doc Subscriptions of a client on one node.
%% Locally the result is `{Node, emqx_broker:subscriptions(ClientId)}'; for
%% other nodes the call is forwarded through rpc_call/3.
client_subscriptions(Local, ClientId) when Local =:= node() ->
    Subscriptions = emqx_broker:subscriptions(ClientId),
    {Local, Subscriptions};
client_subscriptions(Remote, ClientId) ->
    rpc_call(Remote, client_subscriptions, [Remote, ClientId]).
|
|
|
|
%% @doc Ask every running node to clean the authorization cache of a client
%% and reduce the per-node answers with check_results/1.
clean_authz_cache(ClientId) ->
    Clean = fun(Node) -> clean_authz_cache(Node, ClientId) end,
    check_results(lists:map(Clean, mria_mnesia:running_nodes())).
|
|
|
|
|
|
%% @doc Clean the authorization cache of a client on one node.
%% Locally the message `clean_authz_cache' is sent to the last pid returned by
%% emqx_cm:lookup_channels/1, and `{error, not_found}' is returned when there
%% is no channel. For other nodes the call is forwarded through rpc_call/3.
clean_authz_cache(Local, ClientId) when Local =:= node() ->
    case emqx_cm:lookup_channels(ClientId) of
        [] ->
            {error, not_found};
        Pids when is_list(Pids) ->
            %% Fire-and-forget: the channel process is not asked for a reply.
            lists:last(Pids) ! clean_authz_cache,
            ok
    end;
clean_authz_cache(Remote, ClientId) ->
    rpc_call(Remote, clean_authz_cache, [Remote, ClientId]).
|
|
|
|
%% @doc Drain the authorization cache on every running node.
%% Returns `ok' when every node answered `ok', otherwise
%% `{error, [{Node, Answer}]}' listing the nodes that answered something else.
clean_authz_cache_all() ->
    Results = lists:map(fun(Node) -> {Node, clean_authz_cache_all(Node)} end,
                        mria_mnesia:running_nodes()),
    case [Entry || {_Node, Answer} = Entry <- Results, Answer =/= ok] of
        [] -> ok;
        BadNodes -> {error, BadNodes}
    end.
|
|
|
|
%% @doc Drain the authorization cache on one node:
%% emqx_authz_cache:drain_cache/0 locally, otherwise the same call forwarded
%% to the target node through rpc_call/3.
clean_authz_cache_all(Local) when Local =:= node() ->
    emqx_authz_cache:drain_cache();
clean_authz_cache_all(Remote) ->
    rpc_call(Remote, clean_authz_cache_all, [Remote]).
|
|
|
|
%% @doc Send `{ratelimit, Policy}' to the client's connection via call_client/2.
set_ratelimit_policy(ClientId, Policy) ->
    Request = {ratelimit, Policy},
    call_client(ClientId, Request).
|
|
|
|
%% @doc Send `{quota, Policy}' to the client's connection via call_client/2.
set_quota_policy(ClientId, Policy) ->
    Request = {quota, Policy},
    call_client(ClientId, Request).
|
|
|
|
%% @doc Send `{keepalive, Interval}' to the client's connection via
%% call_client/2.
set_keepalive(ClientId, Interval) ->
    Request = {keepalive, Interval},
    call_client(ClientId, Request).
|
|
|
|
%% @private
%% Issue Req to the client on every running node and return the first answer
%% that is not an `{error, _}' tuple. Returns `{error, not_found}' when every
%% node answered with an error.
call_client(ClientId, Req) ->
    Results = lists:map(fun(Node) -> call_client(Node, ClientId, Req) end,
                        mria_mnesia:running_nodes()),
    IsError = fun({error, _}) -> true;
                 (_) -> false
              end,
    %% Skipping the leading errors leaves the first usable answer at the head.
    case lists:dropwhile(IsError, Results) of
        [] -> {error, not_found};
        [Result | _] -> Result
    end.
|
|
|
|
%% @private
%% Issue Req to the client on one node. Locally the last pid returned by
%% emqx_cm:lookup_channels/1 is used and the request is dispatched to the
%% `call/2' function of the connection module found in the channel info.
%% `{error, not_found}' is returned when there is no channel or no channel
%% info. For other nodes the call is forwarded through rpc_call/3.
call_client(Local, ClientId, Req) when Local =:= node() ->
    case emqx_cm:lookup_channels(ClientId) of
        [] ->
            {error, not_found};
        Pids when is_list(Pids) ->
            ChanPid = lists:last(Pids),
            case emqx_cm:get_chan_info(ClientId, ChanPid) of
                undefined ->
                    {error, not_found};
                #{conninfo := #{conn_mod := ConnMod}} ->
                    ConnMod:call(ChanPid, Req)
            end
    end;
call_client(Remote, ClientId, Req) ->
    rpc_call(Remote, call_client, [Remote, ClientId, Req]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Subscriptions
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc List all subscriptions stored on one node.
%% Locally the whole `mqtt_subproperty' ETS table is read and each row is
%% formatted with item/2. `max_row_limit' is thrown when the table holds more
%% rows than max_row_limit/0 allows. For other nodes the call is forwarded
%% through rpc_call/3.
%% NOTE(review): confirm that the `mqtt_subproperty' table still exists in
%% this release.
list_subscriptions(Local) when Local =:= node() ->
    Table = mqtt_subproperty,
    case check_row_limit([Table]) of
        ok ->
            lists:map(fun(Sub) -> item(subscription, Sub) end, ets:tab2list(Table));
        false ->
            throw(max_row_limit)
    end;
list_subscriptions(Remote) ->
    rpc_call(Remote, list_subscriptions, [Remote]).
|
|
|
|
%% @doc Collect the subscriptions to `Topic' from every running node and
%% concatenate the per-node lists.
list_subscriptions_via_topic(Topic, FormatFun) ->
    PerNode = lists:map(
                fun(Node) -> list_subscriptions_via_topic(Node, Topic, FormatFun) end,
                mria_mnesia:running_nodes()),
    lists:append(PerNode).
|
|
|
|
|
|
%% @doc Subscriptions to `Topic' on one node.
%% On the local node, with a `{Module, Function}' formatter, the matching
%% `emqx_suboption' rows are selected and the whole list is passed to
%% `Module:Function/1'. Otherwise the call is forwarded through rpc_call/3.
list_subscriptions_via_topic(Node, Topic, {M, F}) when Node =:= node() ->
    %% Keys are 2-tuples; the second element is compared with Topic exactly.
    RowPattern = {{'_', '$1'}, '_'},
    SameTopic = {'=:=', '$1', Topic},
    Rows = ets:select(emqx_suboption, [{RowPattern, [SameTopic], ['$_']}]),
    M:F(Rows);
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
    rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
|
|
|
|
%% @doc Collect the subscription rows of a client from every running node and
%% concatenate them.
lookup_subscriptions(ClientId) ->
    PerNode = lists:map(fun(Node) -> lookup_subscriptions(Node, ClientId) end,
                        mria_mnesia:running_nodes()),
    lists:append(PerNode).
|
|
|
|
%% @doc Subscription rows of a client on one node.
%% Locally the client id is resolved to a pid through the `emqx_subid' table
%% and all `emqx_suboption' rows keyed by that pid are returned; `[]' is
%% returned when the client id is unknown. For other nodes the call is
%% forwarded through rpc_call/3.
lookup_subscriptions(Local, ClientId) when Local =:= node() ->
    case ets:lookup(emqx_subid, ClientId) of
        [{_SubId, SubPid}] ->
            ets:match_object(emqx_suboption, {{SubPid, '_'}, '_'});
        [] ->
            []
    end;
lookup_subscriptions(Remote, ClientId) ->
    rpc_call(Remote, lookup_subscriptions, [Remote, ClientId]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Routes
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Look up the routes of a topic; delegates to emqx_router:lookup_routes/1.
lookup_routes(Topic) -> emqx_router:lookup_routes(Topic).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Subscribe a client to `TopicTables', trying the running nodes one
%% after another until one of them accepts the request.
subscribe(ClientId, TopicTables) ->
    Nodes = mria_mnesia:running_nodes(),
    subscribe(Nodes, ClientId, TopicTables).
|
|
|
|
%% Try do_subscribe/2 on each node in turn. The first answer that is not an
%% `{error, _}' tuple is returned; `{error, channel_not_found}' is returned
%% once the node list is exhausted.
subscribe([], _ClientId, _TopicTables) ->
    {error, channel_not_found};
subscribe([Node | Remaining], ClientId, TopicTables) ->
    case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
        {error, _} ->
            subscribe(Remaining, ClientId, TopicTables);
        Reply ->
            Reply
    end.
|
|
|
|
%% @doc Ask the local channel process of a client to subscribe.
%% The pid is taken from the `emqx_channel' table and `{subscribe,
%% TopicTables}' is sent to it; the sent message is also the return value.
%% Returns `{error, channel_not_found}' when the client has no local channel.
do_subscribe(ClientId, TopicTables) ->
    case ets:lookup(emqx_channel, ClientId) of
        [{_Key, ChanPid}] ->
            erlang:send(ChanPid, {subscribe, TopicTables});
        [] ->
            {error, channel_not_found}
    end.
|
|
|
|
%% @doc Publish a message: the message metrics are incremented first, then
%% the message is handed to emqx:publish/1, whose result is returned.
%% TODO: the original author left an open question on this function - review.
publish(Message) ->
    emqx_metrics:inc_msg(Message),
    emqx:publish(Message).
|
|
|
|
%% @doc Unsubscribe a client from `Topic', trying the running nodes one after
%% another until one of them accepts the request.
unsubscribe(ClientId, Topic) ->
    Nodes = mria_mnesia:running_nodes(),
    unsubscribe(Nodes, ClientId, Topic).
|
|
|
|
%% Try do_unsubscribe/2 on each node in turn. The first answer that is not an
%% `{error, _}' tuple is returned; `{error, channel_not_found}' is returned
%% once the node list is exhausted.
unsubscribe([], _ClientId, _Topic) ->
    {error, channel_not_found};
unsubscribe([Node | Remaining], ClientId, Topic) ->
    case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
        {error, _} ->
            unsubscribe(Remaining, ClientId, Topic);
        Reply ->
            Reply
    end.
|
|
|
|
%% @doc Ask the local channel process of a client to unsubscribe.
%% The pid is taken from the `emqx_channel' table and `{unsubscribe,
%% [ParsedTopic]}' is sent to it; the sent message is also the return value.
%% Returns `{error, channel_not_found}' when the client has no local channel.
do_unsubscribe(ClientId, Topic) ->
    case ets:lookup(emqx_channel, ClientId) of
        [{_Key, ChanPid}] ->
            erlang:send(ChanPid, {unsubscribe, [emqx_topic:parse(Topic)]});
        [] ->
            {error, channel_not_found}
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Plugins
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Return `{Node, Plugins}' for every running node.
list_plugins() ->
    lists:map(fun(Node) -> {Node, list_plugins(Node)} end,
              mria_mnesia:running_nodes()).
|
|
|
|
%% @doc Plugins of one node: emqx_plugins:list/0 locally, otherwise the same
%% call forwarded to the target node through rpc_call/3.
list_plugins(Local) when Local =:= node() ->
    emqx_plugins:list();
list_plugins(Remote) ->
    rpc_call(Remote, list_plugins, [Remote]).
|
|
|
|
%% @doc Load a plugin on one node: emqx_plugins:load/1 locally, otherwise the
%% same call forwarded to the target node through rpc_call/3.
load_plugin(Local, Plugin) when Local =:= node() ->
    emqx_plugins:load(Plugin);
load_plugin(Remote, Plugin) ->
    rpc_call(Remote, load_plugin, [Remote, Plugin]).
|
|
|
|
%% @doc Unload a plugin on one node: emqx_plugins:unload/1 locally, otherwise
%% the same call forwarded to the target node through rpc_call/3.
unload_plugin(Local, Plugin) when Local =:= node() ->
    emqx_plugins:unload(Plugin);
unload_plugin(Remote, Plugin) ->
    rpc_call(Remote, unload_plugin, [Remote, Plugin]).
|
|
|
|
%% @doc Reload a plugin on one node: emqx_plugins:reload/1 locally, otherwise
%% the same call forwarded to the target node through rpc_call/3.
reload_plugin(Local, Plugin) when Local =:= node() ->
    emqx_plugins:reload(Plugin);
reload_plugin(Remote, Plugin) ->
    rpc_call(Remote, reload_plugin, [Remote, Plugin]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Listeners
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Listeners of all running nodes, concatenated into one flat list.
list_listeners() ->
    PerNode = lists:map(fun list_listeners/1, mria_mnesia:running_nodes()),
    lists:append(PerNode).
|
|
|
|
%% @doc Listeners of one node.
%% Locally every `{Id, Conf}' from emqx_listeners:list/0 becomes the `Conf'
%% map extended with `node' and `id' keys. For other nodes the call is
%% forwarded through rpc_call/3.
list_listeners(Local) when Local =:= node() ->
    lists:map(fun({Id, Conf}) -> Conf#{node => Local, id => Id} end,
              emqx_listeners:list());
list_listeners(Remote) ->
    rpc_call(Remote, list_listeners, [Remote]).
|
|
|
|
%% @doc All listeners in the cluster whose `id' equals `Id'.
list_listeners_by_id(Id) ->
    AllListeners = list_listeners(),
    listener_id_filter(Id, AllListeners).
|
|
|
|
%% @doc Fetch the listener `Id' of `Node'.
%% Returns the listener map when exactly one listener matches, and
%% `{error, not_found}' when none does.
get_listener(Node, Id) ->
    Matches = listener_id_filter(Id, list_listeners(Node)),
    case Matches of
        [Listener] ->
            Listener;
        [] ->
            {error, not_found}
    end.
|
|
|
|
%% Keep only the listener maps whose `id' equals Id exactly.
%% Every element of Listeners must be a map with an `id' key.
listener_id_filter(Id, Listeners) ->
    lists:filter(fun(#{id := ListenerId}) -> ListenerId =:= Id end, Listeners).
|
|
|
|
|
|
%% @doc Start, stop or restart a listener.
%% When `Param' names the local node, the `emqx_listeners' function called
%% `Operation' is applied to the listener id. Otherwise the request is
%% forwarded to the node named in `Param' through rpc_call/3.
-spec manage_listener( Operation :: start_listener
                                  | stop_listener
                                  | restart_listener
                     , Param :: map()) ->
    ok | {error, Reason :: term()}.
manage_listener(Operation, #{id := ID, node := Node}) when Node =:= node() ->
    emqx_listeners:Operation(ID);
manage_listener(Operation, #{node := Node} = Param) ->
    rpc_call(Node, manage_listener, [Operation, Param]).
|
|
|
|
%% @doc Update the listener `Id' on every running node; returns the list of
%% per-node results.
update_listener(Id, Config) ->
    lists:map(fun(Node) -> update_listener(Node, Id, Config) end,
              mria_mnesia:running_nodes()).
|
|
|
|
%% @doc Update the configuration of listener `Id' on one node.
%% Locally the id is parsed into type and name and the config path
%% `[listeners, Type, Name]' is updated. On success the raw config is
%% returned, extended with `node', `id' and `running => true'. An invalid id
%% or a failed update yields an `{error, _}' tuple. For other nodes the call
%% is forwarded through rpc_call/3.
update_listener(Local, Id, Config) when Local =:= node() ->
    case emqx_listeners:parse_listener_id(Id) of
        {error, {invalid_listener_id, Id}} = InvalidId ->
            InvalidId;
        {Type, Name} ->
            ConfPath = [listeners, Type, Name],
            case emqx:update_config(ConfPath, Config, #{}) of
                {error, Reason} ->
                    {error, Reason};
                {ok, #{raw_config := RawConf}} ->
                    RawConf#{node => Local, id => Id, running => true}
            end
    end;
update_listener(Remote, Id, Config) ->
    rpc_call(Remote, update_listener, [Remote, Id, Config]).
|
|
|
|
%% @doc Remove the listener `Id' on every running node; returns the list of
%% per-node results.
remove_listener(Id) ->
    lists:map(fun(Node) -> remove_listener(Node, Id) end,
              mria_mnesia:running_nodes()).
|
|
|
|
%% @doc Remove the configuration of listener `Id' on one node.
%% Locally the config path `[listeners, Type, Name]' is removed and `ok' is
%% returned on success. A failed removal raises an error with the reason
%% instead of returning `{error, Reason}', and an id that does not parse to
%% `{Type, Name}' fails the match.
%% NOTE(review): update_listener/3 returns error tuples in the same
%% situations - confirm the raise here is intended.
%% For other nodes the call is forwarded through rpc_call/3.
remove_listener(Local, Id) when Local =:= node() ->
    {Type, Name} = emqx_listeners:parse_listener_id(Id),
    ConfPath = [listeners, Type, Name],
    case emqx:remove_config(ConfPath, #{}) of
        {error, Reason} ->
            erlang:error(Reason);
        {ok, _Removed} ->
            ok
    end;
remove_listener(Remote, Id) ->
    rpc_call(Remote, remove_listener, [Remote, Id]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Get Alarms
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Return `{Node, Alarms}' of the given type for every running node.
get_alarms(Type) ->
    lists:map(fun(Node) -> {Node, get_alarms(Node, Type)} end,
              mria_mnesia:running_nodes()).
|
|
|
|
%% @doc Alarms of the given type on one node.
%% Locally the alarms from emqx_alarm:get_alarms/1 are extended with a
%% `duration' field. For other nodes the call is forwarded through rpc_call/3.
get_alarms(Local, Type) when Local =:= node() ->
    Alarms = emqx_alarm:get_alarms(Type),
    add_duration_field(Alarms);
get_alarms(Remote, Type) ->
    rpc_call(Remote, get_alarms, [Remote, Type]).
|
|
|
|
%% @doc Deactivate the alarm `Name' on one node: emqx_alarm:deactivate/1
%% locally, otherwise the same call forwarded to the target node.
deactivate(Local, Name) when Local =:= node() ->
    emqx_alarm:deactivate(Name);
deactivate(Remote, Name) ->
    rpc_call(Remote, deactivate, [Remote, Name]).
|
|
|
|
%% @doc Delete the deactivated alarms on every running node; returns the list
%% of per-node results.
delete_all_deactivated_alarms() ->
    lists:map(fun delete_all_deactivated_alarms/1, mria_mnesia:running_nodes()).
|
|
|
|
%% @doc Delete the deactivated alarms on one node.
%% Locally this calls emqx_alarm:delete_all_deactivated_alarms/0; for other
%% nodes the same function is invoked there through rpc_call/3.
%%
%% The remote call must name this function. The previous name,
%% `delete_deactivated_alarms', is not exported by this module, so every
%% remote call came back as an error and nothing was deleted.
delete_all_deactivated_alarms(Node) when Node =:= node() ->
    emqx_alarm:delete_all_deactivated_alarms();
delete_all_deactivated_alarms(Node) ->
    rpc_call(Node, delete_all_deactivated_alarms, [Node]).
|
|
|
|
%% Add a `duration' field to every alarm, using the current system time in
%% microseconds as "now" for alarms that are still active.
add_duration_field(Alarms) ->
    add_duration_field(Alarms, erlang:system_time(microsecond), []).
|
|
|
|
%% Add a `duration' field to each alarm and push the result onto Acc.
%% Active alarms get `Now - activate_at', deactivated alarms get
%% `deactivate_at - activate_at'. The result is not reversed, so the alarms
%% come out in the opposite order to the input, in front of the initial Acc.
add_duration_field(Alarms, Now, Acc) ->
    WithDuration =
        fun(#{activated := true, activate_at := ActivateAt} = Alarm, Done) ->
                [Alarm#{duration => Now - ActivateAt} | Done];
           (#{ activated := false
             , activate_at := ActivateAt
             , deactivate_at := DeactivateAt} = Alarm, Done) ->
                [Alarm#{duration => DeactivateAt - ActivateAt} | Done]
        end,
    lists:foldl(WithDuration, Acc, Alarms).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Banned API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Create a ban entry; delegates to emqx_banned:create/1.
create_banned(Banned) -> emqx_banned:create(Banned).
|
|
|
|
%% @doc Delete a ban entry; delegates to emqx_banned:delete/1.
delete_banned(Who) -> emqx_banned:delete(Who).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Common Table API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Convert a table row into a map.
%% A `subscription' row `{{Topic, ClientId}, Options}' becomes a map with the
%% keys `topic', `clientid' and `options'. A `route', given either as a
%% `#route{}' record or as a `{Topic, Node}' pair, becomes a map with the
%% keys `topic' and `node'.
item(subscription, {{Topic, ClientId}, Options}) ->
    #{clientid => ClientId, topic => Topic, options => Options};
item(route, #route{} = Route) ->
    #{node => Route#route.dest, topic => Route#route.topic};
item(route, {Topic, Node}) ->
    #{node => Node, topic => Topic}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal Functions.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Call a function of this module on Node via rpc:call/4.
%% A `{badrpc, Reason}' answer is turned into `{error, Reason}'; any other
%% answer is returned unchanged.
rpc_call(Node, Fun, Args) ->
    Reply = rpc:call(Node, ?MODULE, Fun, Args),
    case Reply of
        {badrpc, Reason} -> {error, Reason};
        _ -> Reply
    end.
|
|
|
|
%% Build the string "OtpVersion/Version" from emqx_vm:get_otp_version/0 and
%% erlang:system_info(version).
otp_rel() ->
    OtpVersion = emqx_vm:get_otp_version(),
    SystemVersion = erlang:system_info(version),
    lists:concat([OtpVersion, "/", SystemVersion]).
|
|
|
|
%% Check the given tables against the limit from max_row_limit/0.
%% Returns `ok' when none of them exceeds it, `false' otherwise.
check_row_limit(Tables) ->
    Limit = max_row_limit(),
    check_row_limit(Tables, Limit).
|
|
|
|
%% Walk the tables and stop at the first one whose size exceeds Limit.
%% Returns `false' in that case and `ok' when every table is within the limit.
check_row_limit([], _Limit) ->
    ok;
check_row_limit([Tab | Remaining], Limit) ->
    case table_size(Tab) =< Limit of
        true -> check_row_limit(Remaining, Limit);
        false -> false
    end.
|
|
|
|
%% Reduce a list of per-node results to one answer: `ok' when at least one
%% element is exactly `ok', otherwise the last element of the list.
%% Results must not be empty when no element is `ok'.
check_results(Results) ->
    case lists:member(ok, Results) of
        true -> ok;
        false -> lists:last(Results)
    end.
|
|
|
|
%% @doc Row limit used by the table-listing APIs; the value of the
%% `MAX_ROW_LIMIT' macro.
max_row_limit() -> ?MAX_ROW_LIMIT.
|
|
|
|
%% Number of objects stored in the ETS table Tab, as reported by ets:info/2.
table_size(Tab) ->
    ets:info(Tab, size).
|
|
|