118 lines
3.8 KiB
Erlang
118 lines
3.8 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_license_resources).
|
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-define(CHECK_INTERVAL, 5000).
|
|
|
|
-export([start_link/0,
|
|
start_link/1,
|
|
local_connection_count/0,
|
|
connection_count/0]).
|
|
|
|
%% gen_server callbacks
|
|
-export([init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% API
|
|
%%------------------------------------------------------------------------------
|
|
|
|
-spec start_link() -> {ok, pid()}.
|
|
start_link() ->
|
|
start_link(?CHECK_INTERVAL).
|
|
|
|
-spec start_link(timeout()) -> {ok, pid()}.
|
|
start_link(CheckInterval) when is_integer(CheckInterval) ->
|
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [CheckInterval], []).
|
|
|
|
-spec local_connection_count() -> non_neg_integer().
|
|
local_connection_count() ->
|
|
emqx_cm:get_connected_client_count().
|
|
|
|
-spec connection_count() -> non_neg_integer().
|
|
connection_count() ->
|
|
local_connection_count() + cached_remote_connection_count().
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%------------------------------------------------------------------------------
|
|
|
|
init([CheckInterval]) ->
|
|
_ = ets:new(?MODULE, [set, protected, named_table]),
|
|
State = ensure_timer(#{check_peer_interval => CheckInterval}),
|
|
{ok, State}.
|
|
|
|
handle_call(_Req, _From, State) ->
|
|
{noreply, State}.
|
|
|
|
handle_cast(_Msg, State) ->
|
|
{noreply, State}.
|
|
|
|
handle_info(update_resources, State) ->
|
|
true = update_resources(),
|
|
connection_quota_early_alarm(),
|
|
?tp(debug, emqx_license_resources_updated, #{}),
|
|
{noreply, ensure_timer(State)}.
|
|
|
|
terminate(_Reason, _State) ->
|
|
ok.
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Private functions
|
|
%%------------------------------------------------------------------------------
|
|
connection_quota_early_alarm() ->
|
|
connection_quota_early_alarm(emqx_license_checker:limits()).
|
|
|
|
connection_quota_early_alarm({ok, #{max_connections := Max}}) when is_integer(Max) ->
|
|
Count = connection_count(),
|
|
Low = emqx_conf:get([license, connection_low_watermark], 0.75),
|
|
High = emqx_conf:get([license, connection_high_watermark], 0.80),
|
|
if
|
|
Count > Max * High ->
|
|
HighPercent = float_to_binary(High * 100, [{decimals, 0}]),
|
|
Message = iolist_to_binary(["License: live connection number exceeds ", HighPercent, "%"]),
|
|
catch emqx_alarm:activate(license_quota, #{high_watermark => HighPercent}, Message);
|
|
Count < Max * Low ->
|
|
catch emqx_alarm:deactivate(license_quota);
|
|
true ->
|
|
ok
|
|
end;
|
|
connection_quota_early_alarm(_Limits) -> ok.
|
|
|
|
cached_remote_connection_count() ->
|
|
try ets:lookup(?MODULE, remote_connection_count) of
|
|
[{remote_connection_count, N}] -> N;
|
|
_ -> 0
|
|
catch
|
|
error:badarg -> 0
|
|
end.
|
|
|
|
update_resources() ->
|
|
ets:insert(?MODULE, {remote_connection_count, remote_connection_count()}).
|
|
|
|
ensure_timer(#{check_peer_interval := CheckInterval} = State) ->
|
|
_ = case State of
|
|
#{timer := Timer} -> erlang:cancel_timer(Timer);
|
|
_ -> ok
|
|
end,
|
|
State#{timer => erlang:send_after(CheckInterval, self(), update_resources)}.
|
|
|
|
remote_connection_count() ->
|
|
Nodes = mria_mnesia:running_nodes() -- [node()],
|
|
Results = emqx_license_proto_v1:remote_connection_counts(Nodes),
|
|
Counts = [Count || {ok, Count} <- Results],
|
|
lists:sum(Counts).
|