616 lines
19 KiB
Erlang
616 lines
19 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mgmt).
|
|
|
|
-include("emqx_mgmt.hrl").
|
|
|
|
-include_lib("stdlib/include/qlc.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx_mqtt.hrl").
|
|
|
|
%% Nodes and Brokers API
|
|
-export([ list_nodes/0
|
|
, lookup_node/1
|
|
, list_brokers/0
|
|
, lookup_broker/1
|
|
, node_info/1
|
|
, broker_info/1
|
|
]).
|
|
|
|
%% Metrics and Stats
|
|
-export([ get_metrics/0
|
|
, get_metrics/1
|
|
, get_stats/0
|
|
, get_stats/1
|
|
]).
|
|
|
|
%% Clients, Sessions
|
|
-export([ lookup_client/2
|
|
, lookup_client/3
|
|
, kickout_client/1
|
|
, list_acl_cache/1
|
|
, clean_acl_cache/1
|
|
, clean_acl_cache/2
|
|
, clean_acl_cache_all/0
|
|
, clean_acl_cache_all/1
|
|
, set_ratelimit_policy/2
|
|
, set_quota_policy/2
|
|
]).
|
|
|
|
-export([ clean_pem_cache/0
|
|
, clean_pem_cache/1
|
|
]).
|
|
|
|
%% Internal funcs
|
|
-export([call_client/3]).
|
|
|
|
%% Subscriptions
|
|
-export([ list_subscriptions/1
|
|
, list_subscriptions_via_topic/2
|
|
, list_subscriptions_via_topic/3
|
|
, lookup_subscriptions/2
|
|
, lookup_subscriptions/3
|
|
]).
|
|
|
|
%% Routes
|
|
-export([ lookup_routes/1
|
|
]).
|
|
|
|
%% PubSub
|
|
-export([ subscribe/2
|
|
, do_subscribe/2
|
|
, publish/1
|
|
, unsubscribe/2
|
|
, do_unsubscribe/2
|
|
]).
|
|
|
|
%% Plugins
|
|
-export([ list_plugins/0
|
|
, list_plugins/1
|
|
, load_plugin/2
|
|
, unload_plugin/2
|
|
, reload_plugin/2
|
|
]).
|
|
|
|
%% Listeners
|
|
-export([ list_listeners/0
|
|
, list_listeners/1
|
|
, restart_listener/2
|
|
]).
|
|
|
|
%% Alarms
|
|
-export([ get_alarms/1
|
|
, get_alarms/2
|
|
, deactivate/2
|
|
, delete_all_deactivated_alarms/0
|
|
, delete_all_deactivated_alarms/1
|
|
]).
|
|
|
|
%% Banned
|
|
-export([ create_banned/1
|
|
, delete_banned/1
|
|
]).
|
|
|
|
-ifndef(EMQX_ENTERPRISE).
|
|
|
|
-export([ enable_telemetry/0
|
|
, disable_telemetry/0
|
|
, get_telemetry_status/0
|
|
, get_telemetry_data/0
|
|
]).
|
|
|
|
-endif.
|
|
|
|
%% Common Table API
|
|
-export([ item/2
|
|
, max_row_limit/0
|
|
]).
|
|
|
|
-define(MAX_ROW_LIMIT, 10000).
|
|
|
|
-define(APP, emqx_management).
|
|
%%--------------------------------------------------------------------
|
|
%% Node Info
|
|
%%--------------------------------------------------------------------
|
|
|
|
list_nodes() ->
|
|
Running = mnesia:system_info(running_db_nodes),
|
|
Stopped = mnesia:system_info(db_nodes) -- Running,
|
|
DownNodes = lists:map(fun stopped_node_info/1, Stopped),
|
|
[{Node, node_info(Node)} || Node <- Running] ++ DownNodes.
|
|
|
|
lookup_node(Node) -> node_info(Node).
|
|
|
|
node_info(Node) when Node =:= node() ->
|
|
{UsedRatio, Total} = get_sys_memory(),
|
|
Info = maps:from_list([{K, list_to_binary(V)} || {K, V} <- emqx_vm:loads()]),
|
|
BrokerInfo = emqx_sys:info(),
|
|
Info#{node => node(),
|
|
otp_release => iolist_to_binary(otp_rel()),
|
|
memory_total => Total,
|
|
memory_used => erlang:round(Total * UsedRatio),
|
|
process_available => erlang:system_info(process_limit),
|
|
process_used => erlang:system_info(process_count),
|
|
max_fds =>
|
|
proplists:get_value( max_fds
|
|
, lists:usort(lists:flatten(erlang:system_info(check_io)))),
|
|
connections => ets:info(emqx_channel, size),
|
|
node_status => 'Running',
|
|
uptime => iolist_to_binary(proplists:get_value(uptime, BrokerInfo)),
|
|
version => iolist_to_binary(proplists:get_value(version, BrokerInfo))
|
|
};
|
|
node_info(Node) ->
|
|
rpc_call(Node, node_info, [Node]).
|
|
|
|
get_sys_memory() ->
|
|
case os:type() of
|
|
{unix, linux} ->
|
|
load_ctl:get_sys_memory();
|
|
_ ->
|
|
{0, 0}
|
|
end.
|
|
|
|
stopped_node_info(Node) ->
|
|
#{name => Node, node_status => 'Stopped'}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Brokers
|
|
%%--------------------------------------------------------------------
|
|
|
|
list_brokers() ->
|
|
[{Node, broker_info(Node)} || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
lookup_broker(Node) ->
|
|
broker_info(Node).
|
|
|
|
broker_info(Node) when Node =:= node() ->
|
|
Info = maps:from_list([{K, iolist_to_binary(V)} || {K, V} <- emqx_sys:info()]),
|
|
Info#{node => Node, otp_release => iolist_to_binary(otp_rel()), node_status => 'Running'};
|
|
|
|
broker_info(Node) ->
|
|
rpc_call(Node, broker_info, [Node]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Metrics and Stats
|
|
%%--------------------------------------------------------------------
|
|
|
|
get_metrics() ->
|
|
[{Node, get_metrics(Node)} || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
get_metrics(Node) when Node =:= node() ->
|
|
emqx_metrics:all();
|
|
get_metrics(Node) ->
|
|
rpc_call(Node, get_metrics, [Node]).
|
|
|
|
get_stats() ->
|
|
[{Node, get_stats(Node)} || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
get_stats(Node) when Node =:= node() ->
|
|
emqx_stats:getstats();
|
|
get_stats(Node) ->
|
|
rpc_call(Node, get_stats, [Node]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Clients
|
|
%%--------------------------------------------------------------------
|
|
|
|
lookup_client({clientid, ClientId}, FormatFun) ->
|
|
lists:append([lookup_client(Node, {clientid, ClientId}, FormatFun)
|
|
|| Node <- ekka_mnesia:running_nodes()]);
|
|
|
|
lookup_client({username, Username}, FormatFun) ->
|
|
lists:append([lookup_client(Node, {username, Username}, FormatFun)
|
|
|| Node <- ekka_mnesia:running_nodes()]).
|
|
|
|
lookup_client(Node, {clientid, ClientId}, {M,F}) when Node =:= node() ->
|
|
lists:append(lists:map(
|
|
fun(Key) ->
|
|
lists:map(fun M:F/1, ets:lookup(emqx_channel_info, Key))
|
|
end, ets:lookup(emqx_channel, ClientId)));
|
|
|
|
lookup_client(Node, {clientid, ClientId}, FormatFun) ->
|
|
rpc_call(Node, lookup_client, [Node, {clientid, ClientId}, FormatFun]);
|
|
|
|
lookup_client(Node, {username, Username}, {M,F}) when Node =:= node() ->
|
|
MatchSpec = [{ {'_', #{clientinfo => #{username => '$1'}}, '_'}
|
|
, [{'=:=','$1', Username}]
|
|
, ['$_']
|
|
}],
|
|
lists:map(fun M:F/1, ets:select(emqx_channel_info, MatchSpec));
|
|
|
|
lookup_client(Node, {username, Username}, FormatFun) ->
|
|
rpc_call(Node, lookup_client, [Node, {username, Username}, FormatFun]).
|
|
|
|
kickout_client(ClientId) ->
|
|
Results = [kickout_client(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
|
check_every_ok(Results).
|
|
|
|
kickout_client(Node, ClientId) when Node =:= node() ->
|
|
emqx_cm:kick_session(ClientId);
|
|
|
|
kickout_client(Node, ClientId) ->
|
|
rpc_call(Node, kickout_client, [Node, ClientId]).
|
|
|
|
list_acl_cache(ClientId) ->
|
|
call_client(ClientId, list_acl_cache).
|
|
|
|
clean_acl_cache(ClientId) ->
|
|
Results = [clean_acl_cache(Node, ClientId) || Node <- ekka_mnesia:running_nodes()],
|
|
check_every_ok(Results).
|
|
|
|
clean_acl_cache(Node, ClientId) when Node =:= node() ->
|
|
case emqx_cm:lookup_channels(ClientId) of
|
|
[] ->
|
|
{error, not_found};
|
|
Pids when is_list(Pids) ->
|
|
erlang:send(lists:last(Pids), clean_acl_cache),
|
|
ok
|
|
end;
|
|
clean_acl_cache(Node, ClientId) ->
|
|
rpc_call(Node, clean_acl_cache, [Node, ClientId]).
|
|
|
|
clean_acl_cache_all() ->
|
|
for_nodes(fun clean_acl_cache_all/1).
|
|
|
|
for_nodes(F) ->
|
|
Results = [{Node, F(Node)} || Node <- ekka_mnesia:running_nodes()],
|
|
case lists:filter(fun({_Node, Res}) -> Res =/= ok end, Results) of
|
|
[] -> ok;
|
|
BadNodes -> {error, BadNodes}
|
|
end.
|
|
|
|
clean_acl_cache_all(Node) when Node =:= node() ->
|
|
emqx_acl_cache:drain_cache();
|
|
clean_acl_cache_all(Node) ->
|
|
rpc_call(Node, clean_acl_cache_all, [Node]).
|
|
|
|
set_ratelimit_policy(ClientId, Policy) ->
|
|
call_client(ClientId, {ratelimit, Policy}).
|
|
|
|
set_quota_policy(ClientId, Policy) ->
|
|
call_client(ClientId, {quota, Policy}).
|
|
|
|
clean_pem_cache() ->
|
|
for_nodes(fun clean_pem_cache/1).
|
|
|
|
clean_pem_cache(Node) when Node =:= node() ->
|
|
_ = ssl_pem_cache:clear(),
|
|
ok;
|
|
clean_pem_cache(Node) ->
|
|
rpc_call(Node, ?FUNCTION_NAME, [Node]).
|
|
|
|
%% @private
|
|
call_client(ClientId, Req) ->
|
|
Results = [call_client(Node, ClientId, Req) || Node <- ekka_mnesia:running_nodes()],
|
|
Expected = lists:filter(fun({error, _}) -> false;
|
|
(_) -> true
|
|
end, Results),
|
|
case Expected of
|
|
[] -> {error, not_found};
|
|
[Result | _] -> Result
|
|
end.
|
|
|
|
%% @private
|
|
call_client(Node, ClientId, Req) when Node =:= node() ->
|
|
case emqx_cm:lookup_channels(ClientId) of
|
|
[] -> {error, not_found};
|
|
Pids when is_list(Pids) ->
|
|
Pid = lists:last(Pids),
|
|
case emqx_cm:get_chan_info(ClientId, Pid) of
|
|
#{conninfo := #{conn_mod := ConnMod}} ->
|
|
erlang:apply(ConnMod, call, [Pid, Req]);
|
|
undefined -> {error, not_found}
|
|
end
|
|
end;
|
|
call_client(Node, ClientId, Req) ->
|
|
rpc_call(Node, call_client, [Node, ClientId, Req]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Subscriptions
|
|
%%--------------------------------------------------------------------
|
|
|
|
list_subscriptions(Node) when Node =:= node() ->
|
|
case check_row_limit([mqtt_subproperty]) of
|
|
false -> throw(max_row_limit);
|
|
ok -> [item(subscription, Sub) || Sub <- ets:tab2list(mqtt_subproperty)]
|
|
end;
|
|
|
|
list_subscriptions(Node) ->
|
|
rpc_call(Node, list_subscriptions, [Node]).
|
|
|
|
list_subscriptions_via_topic(Topic, FormatFun) ->
|
|
lists:append([list_subscriptions_via_topic(Node, Topic, FormatFun)
|
|
|| Node <- ekka_mnesia:running_nodes()]).
|
|
|
|
list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
|
|
MatchSpec = [{{{'_', '$1'}, '_'}, [{'=:=','$1', Topic}], ['$_']}],
|
|
erlang:apply(M, F, [ets:select(emqx_suboption, MatchSpec)]);
|
|
|
|
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
|
|
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
|
|
|
|
lookup_subscriptions(ClientId, FormatFun) ->
|
|
lists:append([lookup_subscriptions(Node, ClientId, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
|
|
|
|
lookup_subscriptions(Node, ClientId, {M, F}) when Node =:= node() ->
|
|
Result = case ets:lookup(emqx_subid, ClientId) of
|
|
[] -> [];
|
|
[{_, Pid}] ->
|
|
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
|
|
end,
|
|
%% format at the called node
|
|
erlang:apply(M, F, [Result]);
|
|
|
|
lookup_subscriptions(Node, ClientId, FormatFun) ->
|
|
rpc_call(Node, lookup_subscriptions, [Node, ClientId, FormatFun]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Routes
|
|
%%--------------------------------------------------------------------
|
|
|
|
lookup_routes(Topic) ->
|
|
emqx_router:lookup_routes(Topic).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% PubSub
|
|
%%--------------------------------------------------------------------
|
|
|
|
subscribe(ClientId, TopicTables) ->
|
|
subscribe(ekka_mnesia:running_nodes(), ClientId, TopicTables).
|
|
|
|
subscribe([Node | Nodes], ClientId, TopicTables) ->
|
|
case rpc_call(Node, do_subscribe, [ClientId, TopicTables]) of
|
|
{error, _} -> subscribe(Nodes, ClientId, TopicTables);
|
|
Re -> Re
|
|
end;
|
|
|
|
subscribe([], _ClientId, _TopicTables) ->
|
|
{error, channel_not_found}.
|
|
|
|
do_subscribe(ClientId, TopicTables) ->
|
|
case ets:lookup(emqx_channel, ClientId) of
|
|
[] -> {error, channel_not_found};
|
|
[{_, Pid}] ->
|
|
Pid ! {subscribe, TopicTables}
|
|
end.
|
|
|
|
%%TODO: ???
|
|
publish(Msg) ->
|
|
emqx_metrics:inc_msg(Msg),
|
|
emqx:publish(Msg).
|
|
|
|
unsubscribe(ClientId, Topic) ->
|
|
unsubscribe(ekka_mnesia:running_nodes(), ClientId, Topic).
|
|
|
|
unsubscribe([Node | Nodes], ClientId, Topic) ->
|
|
case rpc_call(Node, do_unsubscribe, [ClientId, Topic]) of
|
|
{error, _} -> unsubscribe(Nodes, ClientId, Topic);
|
|
Re -> Re
|
|
end;
|
|
|
|
unsubscribe([], _ClientId, _Topic) ->
|
|
{error, channel_not_found}.
|
|
|
|
do_unsubscribe(ClientId, Topic) ->
|
|
case ets:lookup(emqx_channel, ClientId) of
|
|
[] -> {error, channel_not_found};
|
|
[{_, Pid}] ->
|
|
Pid ! {unsubscribe, [emqx_topic:parse(Topic)]}
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Plugins
|
|
%%--------------------------------------------------------------------
|
|
|
|
list_plugins() ->
|
|
[{Node, list_plugins(Node)} || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
list_plugins(Node) when Node =:= node() ->
|
|
emqx_plugins:list();
|
|
list_plugins(Node) ->
|
|
rpc_call(Node, list_plugins, [Node]).
|
|
|
|
load_plugin(Node, Plugin) when Node =:= node() ->
|
|
emqx_plugins:load(Plugin);
|
|
load_plugin(Node, Plugin) ->
|
|
rpc_call(Node, load_plugin, [Node, Plugin]).
|
|
|
|
unload_plugin(Node, Plugin) when Node =:= node() ->
|
|
emqx_plugins:unload(Plugin);
|
|
unload_plugin(Node, Plugin) ->
|
|
rpc_call(Node, unload_plugin, [Node, Plugin]).
|
|
|
|
reload_plugin(Node, Plugin) when Node =:= node() ->
|
|
emqx_plugins:reload(Plugin);
|
|
reload_plugin(Node, Plugin) ->
|
|
rpc_call(Node, reload_plugin, [Node, Plugin]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Listeners
|
|
%%--------------------------------------------------------------------
|
|
|
|
list_listeners() ->
|
|
[{Node, list_listeners(Node)} || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
list_listeners(Node) when Node =:= node() ->
|
|
Tcp = lists:map(fun({{Protocol, ListenOn}, _Pid}) ->
|
|
#{protocol => Protocol,
|
|
listen_on => ListenOn,
|
|
identifier => emqx_listeners:find_id_by_listen_on(ListenOn),
|
|
acceptors => esockd:get_acceptors({Protocol, ListenOn}),
|
|
max_conns => esockd:get_max_connections({Protocol, ListenOn}),
|
|
current_conns => esockd:get_current_connections({Protocol, ListenOn}),
|
|
shutdown_count => esockd:get_shutdown_count({Protocol, ListenOn})}
|
|
end, esockd:listeners()),
|
|
Http = lists:map(fun({Protocol, Opts}) ->
|
|
#{protocol => Protocol,
|
|
listen_on => format_http_bind(Opts),
|
|
acceptors => maps:get( num_acceptors
|
|
, proplists:get_value(transport_options, Opts, #{}), 0),
|
|
max_conns => proplists:get_value(max_connections, Opts),
|
|
current_conns => proplists:get_value(all_connections, Opts),
|
|
shutdown_count => []}
|
|
end, ranch:info()),
|
|
Tcp ++ Http;
|
|
|
|
list_listeners(Node) ->
|
|
rpc_call(Node, list_listeners, [Node]).
|
|
|
|
restart_listener(Node, Identifier) when Node =:= node() ->
|
|
emqx_listeners:restart_listener(Identifier);
|
|
|
|
restart_listener(Node, Identifier) ->
|
|
rpc_call(Node, restart_listener, [Node, Identifier]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Get Alarms
|
|
%%--------------------------------------------------------------------
|
|
|
|
get_alarms(Type) ->
|
|
[{Node, get_alarms(Node, Type)} || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
get_alarms(Node, Type) when Node =:= node() ->
|
|
add_duration_field(emqx_alarm:get_alarms(Type));
|
|
get_alarms(Node, Type) ->
|
|
rpc_call(Node, get_alarms, [Node, Type]).
|
|
|
|
deactivate(Node, Name) when Node =:= node() ->
|
|
emqx_alarm:deactivate(Name);
|
|
deactivate(Node, Name) ->
|
|
rpc_call(Node, deactivate, [Node, Name]).
|
|
|
|
delete_all_deactivated_alarms() ->
|
|
[delete_all_deactivated_alarms(Node) || Node <- ekka_mnesia:running_nodes()].
|
|
|
|
delete_all_deactivated_alarms(Node) when Node =:= node() ->
|
|
emqx_alarm:delete_all_deactivated_alarms();
|
|
delete_all_deactivated_alarms(Node) ->
|
|
rpc_call(Node, emqx_alarm, delete_all_deactivated_alarms, []).
|
|
|
|
add_duration_field(Alarms) ->
|
|
Now = erlang:system_time(microsecond),
|
|
add_duration_field(Alarms, Now, []).
|
|
|
|
add_duration_field([], _Now, Acc) ->
|
|
Acc;
|
|
add_duration_field([Alarm = #{activated := true, activate_at := ActivateAt} | Rest], Now, Acc) ->
|
|
add_duration_field(Rest, Now, [Alarm#{duration => Now - ActivateAt} | Acc]);
|
|
add_duration_field([Alarm = #{ activated := false
|
|
, activate_at := ActivateAt
|
|
, deactivate_at := DeactivateAt}
|
|
| Rest], Now, Acc) ->
|
|
add_duration_field(Rest, Now, [Alarm#{duration => DeactivateAt - ActivateAt} | Acc]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Banned API
|
|
%%--------------------------------------------------------------------
|
|
|
|
create_banned(Banned) ->
|
|
emqx_banned:create(Banned).
|
|
|
|
delete_banned(Who) ->
|
|
emqx_banned:delete(Who).
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Telemetry API
|
|
%%--------------------------------------------------------------------
|
|
|
|
-ifndef(EMQX_ENTERPRISE).
|
|
|
|
enable_telemetry() ->
|
|
lists:foreach(fun enable_telemetry/1,ekka_mnesia:running_nodes()).
|
|
|
|
enable_telemetry(Node) when Node =:= node() ->
|
|
emqx_telemetry:enable();
|
|
enable_telemetry(Node) ->
|
|
rpc_call(Node, enable_telemetry, [Node]).
|
|
|
|
disable_telemetry() ->
|
|
lists:foreach(fun disable_telemetry/1,ekka_mnesia:running_nodes()).
|
|
|
|
disable_telemetry(Node) when Node =:= node() ->
|
|
emqx_telemetry:disable();
|
|
disable_telemetry(Node) ->
|
|
rpc_call(Node, disable_telemetry, [Node]).
|
|
|
|
get_telemetry_status() ->
|
|
[{enabled, emqx_telemetry:is_enabled()}].
|
|
|
|
get_telemetry_data() ->
|
|
emqx_telemetry:get_telemetry().
|
|
|
|
-endif.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Common Table API
|
|
%%--------------------------------------------------------------------
|
|
|
|
item(subscription, {{Topic, ClientId}, Options}) ->
|
|
#{topic => Topic, clientid => ClientId, options => Options};
|
|
|
|
item(route, #route{topic = Topic, dest = Node}) ->
|
|
#{topic => Topic, node => Node};
|
|
item(route, {Topic, Node}) ->
|
|
#{topic => Topic, node => Node}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal Functions.
|
|
%%--------------------------------------------------------------------
|
|
|
|
rpc_call(Node, Fun, Args) ->
|
|
rpc_call(Node, ?MODULE, Fun, Args).
|
|
|
|
rpc_call(Node, Mod, Fun, Args) ->
|
|
case rpc:call(Node, Mod, Fun, Args) of
|
|
{badrpc, Reason} -> {error, Reason};
|
|
Res -> Res
|
|
end.
|
|
|
|
otp_rel() ->
|
|
lists:concat([emqx_vm:get_otp_version(), "/", erlang:system_info(version)]).
|
|
|
|
check_row_limit(Tables) ->
|
|
check_row_limit(Tables, max_row_limit()).
|
|
|
|
check_row_limit([], _Limit) ->
|
|
ok;
|
|
check_row_limit([Tab | Tables], Limit) ->
|
|
case table_size(Tab) > Limit of
|
|
true -> false;
|
|
false -> check_row_limit(Tables, Limit)
|
|
end.
|
|
|
|
check_every_ok(Results) ->
|
|
case lists:any(fun(Item) -> Item =:= ok end, Results) of
|
|
true -> ok;
|
|
false -> lists:last(Results)
|
|
end.
|
|
|
|
max_row_limit() ->
|
|
application:get_env(?APP, max_row_limit, ?MAX_ROW_LIMIT).
|
|
|
|
table_size(Tab) -> ets:info(Tab, size).
|
|
|
|
format_http_bind(Opts) ->
|
|
Port = proplists:get_value(port, Opts),
|
|
case proplists:get_value(ip, Opts) of
|
|
undefined -> Port;
|
|
IP -> {IP, Port}
|
|
end.
|