146 lines
5.6 KiB
Erlang
146 lines
5.6 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_congestion).
|
|
|
|
-export([ maybe_alarm_conn_congestion/3
|
|
, cancel_alarms/3
|
|
]).
|
|
|
|
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
|
list_to_binary(
|
|
io_lib:format("~s/~s/~s",
|
|
[Reason, emqx_channel:info(clientid, Channel),
|
|
maps:get(username, emqx_channel:info(clientinfo, Channel),
|
|
<<"unknown_user">>)]))).
|
|
|
|
-define(ALARM_CONN_INFO_KEYS, [socktype, sockname, peername, clientid, username,
|
|
proto_name, proto_ver, connected_at, conn_state]).
|
|
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
|
|
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
|
|
-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]).
|
|
-define(ALARM_SENT(REASON), {alarm_sent, REASON}).
|
|
-define(ALL_ALARM_REASONS, [conn_congestion]).
|
|
-define(WONT_CLEAR_IN, 60000).
|
|
|
|
maybe_alarm_conn_congestion(Socket, Transport, Channel) ->
|
|
case is_alarm_enabled(Channel) of
|
|
false -> ok;
|
|
true ->
|
|
case is_tcp_congested(Socket, Transport) of
|
|
true -> alarm_congestion(Socket, Transport, Channel, conn_congestion);
|
|
false -> cancel_alarm_congestion(Socket, Transport, Channel, conn_congestion)
|
|
end
|
|
end.
|
|
|
|
cancel_alarms(Socket, Transport, Channel) ->
|
|
lists:foreach(fun(Reason) ->
|
|
case has_alarm_sent(Reason) of
|
|
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
|
|
false -> ok
|
|
end
|
|
end, ?ALL_ALARM_REASONS).
|
|
|
|
is_alarm_enabled(Channel) ->
|
|
emqx_zone:get_env(emqx_channel:info(zone, Channel),
|
|
conn_congestion_alarm_enabled, false).
|
|
|
|
alarm_congestion(Socket, Transport, Channel, Reason) ->
|
|
case has_alarm_sent(Reason) of
|
|
false -> do_alarm_congestion(Socket, Transport, Channel, Reason);
|
|
true ->
|
|
%% pretend we have sent an alarm again
|
|
update_alarm_sent_at(Reason)
|
|
end.
|
|
|
|
cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
|
Zone = emqx_channel:info(zone, Channel),
|
|
WontClearIn = emqx_zone:get_env(Zone, conn_congestion_min_alarm_sustain_duration,
|
|
?WONT_CLEAR_IN),
|
|
case has_alarm_sent(Reason) andalso long_time_since_last_alarm(Reason, WontClearIn) of
|
|
true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason);
|
|
false -> ok
|
|
end.
|
|
|
|
do_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
|
ok = update_alarm_sent_at(Reason),
|
|
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
|
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails),
|
|
ok.
|
|
|
|
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
|
ok = remove_alarm_sent_at(Reason),
|
|
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
|
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails),
|
|
ok.
|
|
|
|
is_tcp_congested(Socket, Transport) ->
|
|
case Transport:getstat(Socket, [send_pend]) of
|
|
{ok, [{send_pend, N}]} when N > 0 -> true;
|
|
_ -> false
|
|
end.
|
|
|
|
has_alarm_sent(Reason) ->
|
|
case get_alarm_sent_at(Reason) of
|
|
0 -> false;
|
|
_ -> true
|
|
end.
|
|
update_alarm_sent_at(Reason) ->
|
|
erlang:put(?ALARM_SENT(Reason), timenow()),
|
|
ok.
|
|
remove_alarm_sent_at(Reason) ->
|
|
erlang:erase(?ALARM_SENT(Reason)),
|
|
ok.
|
|
get_alarm_sent_at(Reason) ->
|
|
case erlang:get(?ALARM_SENT(Reason)) of
|
|
undefined -> 0;
|
|
LastSentAt -> LastSentAt
|
|
end.
|
|
long_time_since_last_alarm(Reason, WontClearIn) ->
|
|
%% only sent clears when the alarm was not triggered in the last
|
|
%% WontClearIn time
|
|
case timenow() - get_alarm_sent_at(Reason) of
|
|
Elapse when Elapse >= WontClearIn -> true;
|
|
_ -> false
|
|
end.
|
|
|
|
timenow() ->
|
|
erlang:system_time(millisecond).
|
|
|
|
%%==============================================================================
|
|
%% Alarm message
|
|
%%==============================================================================
|
|
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
|
|
ProcInfo = process_info(self(), ?PROC_INFO_KEYS),
|
|
BasicInfo = [{pid, list_to_binary(pid_to_list(self()))} | ProcInfo],
|
|
Stat = case Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS) of
|
|
{ok, Stat0} -> Stat0;
|
|
{error, _} -> []
|
|
end,
|
|
Opts = case Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS) of
|
|
{ok, Opts0} -> Opts0;
|
|
{error, _} -> []
|
|
end,
|
|
SockInfo = Stat ++ Opts,
|
|
ConnInfo = [conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS],
|
|
maps:from_list(BasicInfo ++ ConnInfo ++ SockInfo).
|
|
|
|
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
|
|
{IPStr, Port} = emqx_channel:info(Key, Channel),
|
|
{Key, iolist_to_binary([inet:ntoa(IPStr), ":", integer_to_list(Port)])};
|
|
conn_info(Key, Channel) ->
|
|
{Key, emqx_channel:info(Key, Channel)}.
|