diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 71b89f153..d9356ccd7 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -131,6 +131,20 @@ info(zone, #channel{clientinfo = #{zone := Zone}}) -> Zone; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; +info(username, #channel{clientinfo = #{username := Username}}) -> + Username; +info(socktype, #channel{conninfo = #{socktype := SockType}}) -> + SockType; +info(peername, #channel{conninfo = #{peername := Peername}}) -> + Peername; +info(sockname, #channel{conninfo = #{sockname := Sockname}}) -> + Sockname; +info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) -> + ProtoName; +info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) -> + ProtoVer; +info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) -> + ConnectedAt; info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 0714d1798..f5635a147 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -104,7 +104,16 @@ -define(ENABLED(X), (X =/= undefined)). -define(ALARM_TCP_CONGEST(Channel), - list_to_binary(io_lib:format("mqtt_conn/congested/~s", [emqx_channel:info(clientid, Channel)]))). + list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", + [emqx_channel:info(clientid, Channel), + emqx_channel:info(username, Channel)]))). + +-define(ALARM_CONN_INFO_KEYS, [ + socktype, sockname, peername, + clientid, username, proto_name, proto_ver, connected_at +]). +-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). +-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 @@ -616,10 +625,9 @@ maybe_warn_congestion(Socket, Transport, Channel) -> IsCongestAlarmSet = is_congestion_alarm_set(), case is_congested(Socket, Transport) of true when not IsCongestAlarmSet -> - {ok, Stat} = Transport:getstat(Socket, [recv_cnt, recv_oct, send_cnt, send_oct]), - {ok, Opts} = Transport:getopts(Socket, [high_watermark,high_msgq_watermark, sndbuf, recbuf, buffer]), ok = set_congestion_alarm(), - emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), maps:from_list(Stat++Opts)); + emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), + tcp_congestion_alarm_details(Socket, Transport, Channel)); false when IsCongestAlarmSet -> ok = clear_congestion_alarm(), emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)); @@ -642,6 +650,19 @@ set_congestion_alarm() -> clear_congestion_alarm() -> erlang:put(conn_congested, false), ok. +tcp_congestion_alarm_details(Socket, Transport, Channel) -> + {ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS), + {ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS), + SockInfo = maps:from_list(Stat ++ Opts), + ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]), + maps:merge(ConnInfo, SockInfo). + +conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> + {IPStr, Port} = emqx_channel:info(Key, Channel), + {Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])}; +conn_info(Key, Channel) -> + {Key, emqx_channel:info(Key, Channel)}. + %%-------------------------------------------------------------------- %% Handle Info