Merge branch 'stable/e4.2.0' of https://github.com/emqx/emqx into stable/e4.2.0

This commit is contained in:
turtled 2020-11-13 10:40:28 +08:00
commit 41a8f2f811
3 changed files with 40 additions and 5 deletions

View File

@ -2,14 +2,16 @@
[
{"4.2.0", [
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, [emqx_channel]},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
]}
],
[
{"4.2.0", [
{load_module, emqx_alarm, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, []},
{load_module, emqx_connection, brutal_purge, soft_purge, [emqx_channel]},
{load_module, emqx_channel, brutal_purge, soft_purge, []},
{load_module, emqx_metrics, brutal_purge, soft_purge, []}
]}
]

View File

@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) ->
Zone;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(username, #channel{clientinfo = #{username := Username}}) ->
Username;
info(socktype, #channel{conninfo = #{socktype := SockType}}) ->
SockType;
info(peername, #channel{conninfo = #{peername := Peername}}) ->
Peername;
info(sockname, #channel{conninfo = #{sockname := Sockname}}) ->
Sockname;
info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) ->
ProtoName;
info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) ->
ProtoVer;
info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) ->
ConnectedAt;
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->

View File

@ -107,6 +107,13 @@
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", [emqx_channel:info(clientid, Channel), maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>)]))).
-define(ALARM_CONN_INFO_KEYS, [
socktype, sockname, peername,
clientid, username, proto_name, proto_ver, connected_at
]).
-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]).
-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]).
-dialyzer({no_match, [info/2]}).
-dialyzer({nowarn_function, [ init/4
, init_state/3
@ -616,10 +623,9 @@ maybe_warn_congestion(Socket, Transport, Channel) ->
IsCongestAlarmSet = is_congestion_alarm_set(),
case is_congested(Socket, Transport) of
true when not IsCongestAlarmSet ->
{ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]),
{ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]),
ok = set_congestion_alarm(),
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts));
emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel),
tcp_congestion_alarm_details(Socket, Transport, Channel));
false when IsCongestAlarmSet ->
ok = clear_congestion_alarm(),
emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel));
@ -642,6 +648,19 @@ set_congestion_alarm() ->
clear_congestion_alarm() ->
erlang:put(conn_congested, false), ok.
tcp_congestion_alarm_details(Socket, Transport, Channel) ->
{ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS),
{ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS),
SockInfo = maps:from_list(Stat ++ Opts),
ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]),
maps:merge(ConnInfo, SockInfo).
conn_info(Key, Channel) when Key =:= sockname; Key =:= peername ->
{IPStr, Port} = emqx_channel:info(Key, Channel),
{Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])};
conn_info(Key, Channel) ->
{Key, emqx_channel:info(Key, Channel)}.
%%--------------------------------------------------------------------
%% Handle Info