From 6c1129dc6afcc0bca2c9cbf984421a474300ac93 Mon Sep 17 00:00:00 2001 From: emqx-ci-robot <77955145+emqx-ci-robot@users.noreply.github.com> Date: Sat, 30 Jan 2021 10:30:18 +0800 Subject: [PATCH] Auto-pull-request-on-2021-01-29 (#4114) * fix(share sub): fix the issue that the number of subscriptions dropped to 0 during the picking subscriber and caused a crash * Connection Busy Alarms (#3992) feat(emqx_connection): improve port_busy alarm * fix(emqx_connection): tune the congestion alarm params * chore(deps): upgrade esockd to 5.7.5 * fix(appup): correct the appup file --- rebar.config | 2 +- src/emqx.appup.src | 38 +++++++- src/emqx_alarm.erl | 96 ++++++++++++-------- src/emqx_channel.erl | 44 +++++---- src/emqx_congestion.erl | 161 +++++++++++++++++++++++++++++++++ src/emqx_connection.erl | 81 ++++------------- src/emqx_os_mon.erl | 4 +- src/emqx_shared_sub.erl | 2 +- test/emqx_connection_SUITE.erl | 4 + 9 files changed, 303 insertions(+), 129 deletions(-) create mode 100644 src/emqx_congestion.erl diff --git a/rebar.config b/rebar.config index a7ae1409f..2f5e362c8 100644 --- a/rebar.config +++ b/rebar.config @@ -6,7 +6,7 @@ [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} diff --git a/src/emqx.appup.src b/src/emqx.appup.src index ed4b0695f..7322f75f5 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ {VSN, [ {"4.2.0", [ + {add_module, emqx_congestion}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, @@ -11,37 +12,68 @@ {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {resume, [esockd_acceptor,emqx_connection, emqx_ws_connection]} ]}, {"4.2.1", [ + {add_module, emqx_congestion}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]} ]}, + {<<"4.2.[23]">>, [ + {add_module, emqx_congestion}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_shared_sub, brutal_purge, soft_purge, []} + ]}, {<<".*">>, []} ], [ {"4.2.0", [ + {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_alarm, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_metrics, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}}, - {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]} + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, + {delete_module, emqx_congestion} ]}, {"4.2.1", [ + {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, {load_module, emqx_limiter, brutal_purge, soft_purge, []}, {suspend, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, {load_module, emqx_frame, brutal_purge, soft_purge, []}, {update, emqx_connection, {advanced, []}}, {update, emqx_ws_connection, {advanced, []}}, - {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]} + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {resume, [esockd_acceptor, emqx_connection, emqx_ws_connection]}, + {delete_module, emqx_congestion} + ]}, + {<<"4.2.[23]">>, [ + {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, + {load_module, emqx_os_mon, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_alarm, brutal_purge, soft_purge, []}, + {delete_module, emqx_congestion} ]}, {<<".*">>, []} ] diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index ec2cb4338..12c888e6c 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -37,6 +37,7 @@ -export([ activate/1 , activate/2 , deactivate/1 + , deactivate/2 , delete_all_deactivated_alarms/0 , get_alarms/0 , get_alarms/1 @@ -132,7 +133,10 @@ activate(Name, Details) -> gen_server:call(?MODULE, {activate_alarm, Name, Details}). deactivate(Name) -> - gen_server:call(?MODULE, {deactivate_alarm, Name}). + gen_server:call(?MODULE, {deactivate_alarm, Name, no_details}). + +deactivate(Name, Details) -> + gen_server:call(?MODULE, {deactivate_alarm, Name, Details}). delete_all_deactivated_alarms() -> gen_server:call(?MODULE, delete_all_deactivated_alarms). @@ -179,34 +183,13 @@ handle_call({activate_alarm, Name, Details}, _From, State = #state{actions = Act {reply, ok, State} end; -handle_call({deactivate_alarm, Name}, _From, State = #state{actions = Actions, - size_limit = SizeLimit}) -> +handle_call({deactivate_alarm, Name, Details}, _From, State = #state{ + actions = Actions, size_limit = SizeLimit}) -> case mnesia:dirty_read(?ACTIVATED_ALARM, Name) of [] -> {reply, {error, not_found}, State}; - [#activated_alarm{name = Name, - details = Details, - message = Message, - activate_at = ActivateAt}] -> - case SizeLimit > 0 andalso (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of - true -> - case mnesia:dirty_first(?DEACTIVATED_ALARM) of - '$end_of_table' -> - ok; - ActivateAt2 -> - mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) - end; - false -> - ok - end, - Alarm = #deactivated_alarm{activate_at = ActivateAt, - name = Name, - details = Details, - message = Message, - deactivate_at = erlang:system_time(microsecond)}, - mnesia:dirty_delete(?ACTIVATED_ALARM, Name), - mnesia:dirty_write(?DEACTIVATED_ALARM, Alarm), - do_actions(deactivate, Alarm, Actions), + [Alarm] -> + deactivate_alarm(Details, SizeLimit, Actions, Alarm), {reply, ok, State} end; @@ -254,18 +237,50 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ +deactivate_alarm(Details, SizeLimit, Actions, #activated_alarm{ + activate_at = ActivateAt, name = Name, details = Details0, + message = Msg0}) -> + case SizeLimit > 0 andalso + (mnesia:table_info(?DEACTIVATED_ALARM, size) >= SizeLimit) of + true -> + case mnesia:dirty_first(?DEACTIVATED_ALARM) of + '$end_of_table' -> ok; + ActivateAt2 -> + mnesia:dirty_delete(?DEACTIVATED_ALARM, ActivateAt2) + end; + false -> ok + end, + HistoryAlarm = make_deactivated_alarm(ActivateAt, Name, Details0, Msg0, + erlang:system_time(microsecond)), + DeActAlarm = make_deactivated_alarm(ActivateAt, Name, Details, + normalize_message(Name, Details), + erlang:system_time(microsecond)), + mnesia:dirty_write(?DEACTIVATED_ALARM, HistoryAlarm), + mnesia:dirty_delete(?ACTIVATED_ALARM, Name), + do_actions(deactivate, DeActAlarm, Actions). + +make_deactivated_alarm(ActivateAt, Name, Details, Message, DeActivateAt) -> + #deactivated_alarm{ + activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = DeActivateAt}. + deactivate_all_alarms() -> - lists:foreach(fun(#activated_alarm{name = Name, - details = Details, - message = Message, - activate_at = ActivateAt}) -> - mnesia:dirty_write(?DEACTIVATED_ALARM, - #deactivated_alarm{activate_at = ActivateAt, - name = Name, - details = Details, - message = Message, - deactivate_at = erlang:system_time(microsecond)}) - end, ets:tab2list(?ACTIVATED_ALARM)), + lists:foreach( + fun(#activated_alarm{name = Name, + details = Details, + message = Message, + activate_at = ActivateAt}) -> + mnesia:dirty_write(?DEACTIVATED_ALARM, + #deactivated_alarm{ + activate_at = ActivateAt, + name = Name, + details = Details, + message = Message, + deactivate_at = erlang:system_time(microsecond)}) + end, ets:tab2list(?ACTIVATED_ALARM)), mnesia:clear_table(?ACTIVATED_ALARM). ensure_delete_timer(State = #state{validity_period = ValidityPeriod}) -> @@ -332,6 +347,8 @@ normalize(#deactivated_alarm{activate_at = ActivateAt, deactivate_at => DeactivateAt, activated => false}. +normalize_message(Name, no_details) -> + list_to_binary(io_lib:format("~p", [Name])); normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) -> list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark])); normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) -> @@ -344,8 +361,7 @@ normalize_message(partition, #{occurred := Node}) -> list_to_binary(io_lib:format("Partition occurs at node ~s", [Node])); normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) -> list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID])); -normalize_message(<<"mqtt_conn/congested/", ClientId/binary>>, _) -> - list_to_binary(io_lib:format("MQTT connection for clientid '~s' is congested", [ClientId])); +normalize_message(<<"mqtt_conn/congested/", Info/binary>>, _) -> + list_to_binary(io_lib:format("MQTT connection congested: ~s", [Info])); normalize_message(_Name, _UnknownDetails) -> <<"Unknown alarm">>. - diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index d9356ccd7..f4c71d5d2 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -31,6 +31,7 @@ -export([ info/1 , info/2 + , set_conn_state/2 , stats/1 , caps/1 ]). @@ -87,7 +88,7 @@ pendings :: list() }). --opaque(channel() :: #channel{}). +-type(channel() :: #channel{}). -type(conn_state() :: idle | connecting | connected | disconnected). @@ -127,26 +128,26 @@ info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; info(conninfo, #channel{conninfo = ConnInfo}) -> ConnInfo; -info(zone, #channel{clientinfo = #{zone := Zone}}) -> - Zone; -info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> - ClientId; -info(username, #channel{clientinfo = #{username := Username}}) -> - Username; -info(socktype, #channel{conninfo = #{socktype := SockType}}) -> - SockType; -info(peername, #channel{conninfo = #{peername := Peername}}) -> - Peername; -info(sockname, #channel{conninfo = #{sockname := Sockname}}) -> - Sockname; -info(proto_name, #channel{conninfo = #{proto_name := ProtoName}}) -> - ProtoName; -info(proto_ver, #channel{conninfo = #{proto_ver := ProtoVer}}) -> - ProtoVer; -info(connected_at, #channel{conninfo = #{connected_at := ConnectedAt}}) -> - ConnectedAt; +info(socktype, #channel{conninfo = ConnInfo}) -> + maps:get(socktype, ConnInfo, undefined); +info(peername, #channel{conninfo = ConnInfo}) -> + maps:get(peername, ConnInfo, undefined); +info(sockname, #channel{conninfo = ConnInfo}) -> + maps:get(sockname, ConnInfo, undefined); +info(proto_name, #channel{conninfo = ConnInfo}) -> + maps:get(proto_name, ConnInfo, undefined); +info(proto_ver, #channel{conninfo = ConnInfo}) -> + maps:get(proto_ver, ConnInfo, undefined); +info(connected_at, #channel{conninfo = ConnInfo}) -> + maps:get(connected_at, ConnInfo, undefined); info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; +info(zone, #channel{clientinfo = ClientInfo}) -> + maps:get(zone, ClientInfo, undefined); +info(clientid, #channel{clientinfo = ClientInfo}) -> + maps:get(clientid, ClientInfo, undefined); +info(username, #channel{clientinfo = ClientInfo}) -> + maps:get(username, ClientInfo, undefined); info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); info(conn_state, #channel{conn_state = ConnState}) -> @@ -163,6 +164,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) -> Limits; info(timers, #channel{timers = Timers}) -> Timers. +set_conn_state(ConnState, Channel) -> + Channel#channel{conn_state = ConnState}. + %% TODO: Add more stats. -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{session = Session})-> @@ -1290,7 +1294,7 @@ packing_alias(Packet = #mqtt_packet{ }, Channel = ?IS_MQTT_V5 = #channel{topic_aliases = TopicAliases, alias_maximum = Limits}) -> case find_alias(outbound, Topic, TopicAliases) of - {ok, AliasId} -> + {ok, AliasId} -> NPublish = Publish#mqtt_packet_publish{ topic_name = <<>>, properties = maps:merge(Prop, #{'Topic-Alias' => AliasId}) diff --git a/src/emqx_congestion.erl b/src/emqx_congestion.erl new file mode 100644 index 000000000..36dc7ac9f --- /dev/null +++ b/src/emqx_congestion.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_congestion). + +-export([ maybe_alarm_port_busy/3 + , maybe_alarm_port_busy/4 + , maybe_alarm_too_many_publish/5 + , maybe_alarm_too_many_publish/6 + , cancel_alarms/3 + ]). + +-define(ALARM_CONN_CONGEST(Channel, Reason), + list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s/~s", [emqx_channel:info(clientid, Channel), + maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>), + Reason]))). + +-define(ALARM_CONN_INFO_KEYS, [ + socktype, sockname, peername, clientid, username, proto_name, proto_ver, + connected_at, conn_state +]). +-define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). +-define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). +-define(PROC_INFO_KEYS, [message_queue_len, memory, reductions]). +-define(ALARM_SENT(REASON), {alarm_sent, REASON}). +-define(ALL_ALARM_REASONS, [port_busy, too_many_publish]). +-define(CONFIRM_CLEAR(REASON), {alarm_confirm_clear, REASON}). +-define(CONFIRM_CLEAR_INTERVAL, 10000). + +maybe_alarm_port_busy(Socket, Transport, Channel) -> + maybe_alarm_port_busy(Socket, Transport, Channel, false). + +maybe_alarm_port_busy(Socket, Transport, Channel, ForceClear) -> + case is_tcp_congested(Socket, Transport) of + true -> alarm_congestion(Socket, Transport, Channel, port_busy); + false -> cancel_alarm_congestion(Socket, Transport, Channel, port_busy, + ForceClear) + end. + +maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + MaxBatchSize) -> + maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + MaxBatchSize, false). + +maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + PubMsgCount = _MaxBatchSize, _ForceClear) -> + %% we only alarm it when the process is "too busy" + alarm_congestion(Socket, Transport, Channel, too_many_publish); +maybe_alarm_too_many_publish(Socket, Transport, Channel, PubMsgCount, + _MaxBatchSize, ForceClear) when PubMsgCount == 0 -> + %% but we clear the alarm until it is really "idle", to avoid sending + %% alarms and clears too frequently + cancel_alarm_congestion(Socket, Transport, Channel, too_many_publish, + ForceClear); +maybe_alarm_too_many_publish(_Socket, _Transport, _Channel, _PubMsgCount, + _MaxBatchSize, _ForceClear) -> + ok. + +cancel_alarms(Socket, Transport, Channel) -> + lists:foreach(fun(Reason) -> + do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) + end, ?ALL_ALARM_REASONS). + +alarm_congestion(Socket, Transport, Channel, Reason) -> + case has_alarm_sent(Reason) of + false -> do_alarm_congestion(Socket, Transport, Channel, Reason); + true -> + %% pretend we have sent an alarm again + update_alarm_sent_at(Reason) + end. + +cancel_alarm_congestion(Socket, Transport, Channel, Reason, ForceClear) -> + case is_alarm_allowed_clear(Reason, ForceClear) of + true -> do_cancel_alarm_congestion(Socket, Transport, Channel, Reason); + false -> ok + end. + +do_alarm_congestion(Socket, Transport, Channel, Reason) -> + ok = update_alarm_sent_at(Reason), + AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), + emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), + ok. + +do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> + ok = remove_alarm_sent_at(Reason), + AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), + emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails), + ok. + +is_tcp_congested(Socket, Transport) -> + case Transport:getstat(Socket, [send_pend]) of + {ok, [{send_pend, N}]} when N > 0 -> true; + _ -> false + end. + +has_alarm_sent(Reason) -> + case get_alarm_sent_at(Reason) of + 0 -> false; + _ -> true + end. +update_alarm_sent_at(Reason) -> + erlang:put(?ALARM_SENT(Reason), timenow()), + ok. +remove_alarm_sent_at(Reason) -> + erlang:erase(?ALARM_SENT(Reason)), + ok. +get_alarm_sent_at(Reason) -> + case erlang:get(?ALARM_SENT(Reason)) of + undefined -> 0; + LastSentAt -> LastSentAt + end. + +is_alarm_allowed_clear(Reason, _ForceClear = true) -> + has_alarm_sent(Reason); +is_alarm_allowed_clear(Reason, _ForceClear = false) -> + %% only sent clears when the alarm was not triggered in the last + %% ?CONFIRM_CLEAR_INTERVAL time + case timenow() - get_alarm_sent_at(Reason) of + Elapse when Elapse >= ?CONFIRM_CLEAR_INTERVAL -> true; + _ -> false + end. + +timenow() -> + erlang:system_time(millisecond). + +%%============================================================================== +%% Alarm message +%%============================================================================== +tcp_congestion_alarm_details(Socket, Transport, Channel) -> + ProcInfo = process_info(self(), ?PROC_INFO_KEYS), + BasicInfo = [{pid, list_to_binary(pid_to_list(self()))} | ProcInfo], + Stat = case Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS) of + {ok, Stat0} -> Stat0; + {error, _} -> [] + end, + Opts = case Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS) of + {ok, Opts0} -> Opts0; + {error, _} -> [] + end, + SockInfo = Stat ++ Opts, + ConnInfo = [conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS], + maps:from_list(BasicInfo ++ ConnInfo ++ SockInfo). + +conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> + {IPStr, Port} = emqx_channel:info(Key, Channel), + {Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])}; +conn_info(Key, Channel) -> + {Key, emqx_channel:info(Key, Channel)}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 6e8ed9a46..91088f027 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -103,17 +103,6 @@ -define(ENABLED(X), (X =/= undefined)). --define(ALARM_TCP_CONGEST(Channel), - list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s", [emqx_channel:info(clientid, Channel), maps:get(username, emqx_channel:info(clientinfo, Channel), <<"undefined">>)]))). - - --define(ALARM_CONN_INFO_KEYS, [ - socktype, sockname, peername, - clientid, username, proto_name, proto_ver, connected_at -]). --define(ALARM_SOCK_STATS_KEYS, [send_pend, recv_cnt, recv_oct, send_cnt, send_oct]). --define(ALARM_SOCK_OPTS_KEYS, [high_watermark, high_msgq_watermark, sndbuf, recbuf, buffer]). - -dialyzer({no_match, [info/2]}). -dialyzer({nowarn_function, [ init/4 , init_state/3 @@ -272,7 +261,7 @@ recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) -> Msg -> process_msg([Msg], Parent, ensure_stats_timer(IdleTimeout, State)) after - IdleTimeout -> + IdleTimeout + 100 -> hibernate(Parent, cancel_stats_timer(State)) end. @@ -385,8 +374,12 @@ handle_msg({Passive, _Sock}, State) handle_info(activate_socket, NState1); handle_msg(Deliver = {deliver, _Topic, _Msg}, - State = #state{active_n = ActiveN}) -> - Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)], + #state{active_n = MaxBatchSize, transport = Transport, + socket = Socket, channel = Channel} = State) -> + Delivers0 = emqx_misc:drain_deliver(MaxBatchSize), + emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, + length(Delivers0), MaxBatchSize), + Delivers = [Deliver|Delivers0], with_channel(handle_deliver, [Delivers], State); %% Something sent @@ -438,10 +431,12 @@ handle_msg(Msg, State) -> %%-------------------------------------------------------------------- %% Terminate -terminate(Reason, State = #state{channel = Channel}) -> +terminate(Reason, State = #state{channel = Channel, transport = Transport, + socket = Socket}) -> ?LOG(debug, "Terminated due to ~p", [Reason]), - emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)), - emqx_channel:terminate(Reason, Channel), + Channel1 = emqx_channel:set_conn_state(disconnected, Channel), + emqx_congestion:cancel_alarms(Socket, Transport, Channel1), + emqx_channel:terminate(Reason, Channel1), close_socket(State), exit(Reason). @@ -553,8 +548,12 @@ handle_timeout(_TRef, limit_timeout, State) -> }, handle_info(activate_socket, NState); -handle_timeout(_TRef, emit_stats, State = - #state{channel = Channel}) -> +handle_timeout(_TRef, emit_stats, State = #state{active_n = MaxBatchSize, + channel = Channel, transport = Transport, socket = Socket}) -> + {_, MsgQLen} = erlang:process_info(self(), message_queue_len), + emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel, true), + emqx_congestion:maybe_alarm_too_many_publish(Socket, Transport, Channel, + MsgQLen, MaxBatchSize, true), ClientId = emqx_channel:info(clientid, Channel), emqx_cm:set_chan_stats(ClientId, stats(State)), {ok, State#state{stats_timer = undefined}}; @@ -667,7 +666,7 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), emqx_pd:inc_counter(outgoing_bytes, Oct), - maybe_warn_congestion(Socket, Transport, Channel), + emqx_congestion:maybe_alarm_port_busy(Socket, Transport, Channel), case Transport:async_send(Socket, IoData, [nosuspend]) of ok -> ok; Error = {error, _Reason} -> @@ -676,48 +675,6 @@ send(IoData, #state{transport = Transport, socket = Socket, channel = Channel}) ok end. -maybe_warn_congestion(Socket, Transport, Channel) -> - IsCongestAlarmSet = is_congestion_alarm_set(), - case is_congested(Socket, Transport) of - true when not IsCongestAlarmSet -> - ok = set_congestion_alarm(), - emqx_alarm:activate(?ALARM_TCP_CONGEST(Channel), - tcp_congestion_alarm_details(Socket, Transport, Channel)); - false when IsCongestAlarmSet -> - ok = clear_congestion_alarm(), - emqx_alarm:deactivate(?ALARM_TCP_CONGEST(Channel)); - _ -> ok - end. - -is_congested(Socket, Transport) -> - case Transport:getstat(Socket, [send_pend]) of - {ok, [{send_pend, N}]} when N > 0 -> true; - _ -> false - end. - -is_congestion_alarm_set() -> - case erlang:get(conn_congested) of - true -> true; - _ -> false - end. -set_congestion_alarm() -> - erlang:put(conn_congested, true), ok. -clear_congestion_alarm() -> - erlang:put(conn_congested, false), ok. - -tcp_congestion_alarm_details(Socket, Transport, Channel) -> - {ok, Stat} = Transport:getstat(Socket, ?ALARM_SOCK_STATS_KEYS), - {ok, Opts} = Transport:getopts(Socket, ?ALARM_SOCK_OPTS_KEYS), - SockInfo = maps:from_list(Stat ++ Opts), - ConnInfo = maps:from_list([conn_info(Key, Channel) || Key <- ?ALARM_CONN_INFO_KEYS]), - maps:merge(ConnInfo, SockInfo). - -conn_info(Key, Channel) when Key =:= sockname; Key =:= peername -> - {IPStr, Port} = emqx_channel:info(Key, Channel), - {Key, iolist_to_binary([inet:ntoa(IPStr),":",integer_to_list(Port)])}; -conn_info(Key, Channel) -> - {Key, emqx_channel:info(Key, Channel)}. - %%-------------------------------------------------------------------- %% Handle Info diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 49517d8a6..2852c917b 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -145,12 +145,12 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, case emqx_vm:cpu_util() of %% TODO: should be improved? 0 -> State#{timer := undefined}; - Busy when Busy / 100 >= CPUHighWatermark -> + Busy when Busy >= CPUHighWatermark -> emqx_alarm:activate(high_cpu_usage, #{usage => Busy, high_watermark => CPUHighWatermark, low_watermark => CPULowWatermark}), ensure_check_timer(State); - Busy when Busy / 100 =< CPULowWatermark -> + Busy when Busy =< CPULowWatermark -> emqx_alarm:deactivate(high_cpu_usage), ensure_check_timer(State); _Busy -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 71372cd90..17476199b 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -246,7 +246,7 @@ pick(Strategy, ClientId, Group, Topic, FailedSubs) -> do_pick(Strategy, ClientId, Group, Topic, FailedSubs) -> All = subscribers(Group, Topic), case All -- FailedSubs of - [] when FailedSubs =:= [] -> + [] when All =:= [] -> %% Genuinely no subscriber false; [] -> diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 2538aeecb..d360bb5ae 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -54,6 +54,7 @@ init_per_suite(Config) -> ok = meck:expect(emqx_alarm, activate, fun(_, _) -> ok end), ok = meck:expect(emqx_alarm, deactivate, fun(_) -> ok end), + ok = meck:expect(emqx_alarm, deactivate, fun(_, _) -> ok end), Config. @@ -77,6 +78,9 @@ init_per_testcase(_TestCase, Config) -> (peercert, [sock]) -> undefined end), ok = meck:expect(emqx_transport, setopts, fun(_Sock, _Opts) -> ok end), + ok = meck:expect(emqx_transport, getopts, fun(_Sock, Options) -> + {ok, [{K, 0} || K <- Options]} + end), ok = meck:expect(emqx_transport, getstat, fun(_Sock, Options) -> {ok, [{K, 0} || K <- Options]} end),