diff --git a/apps/emqx/src/emqx_congestion.erl b/apps/emqx/src/emqx_congestion.erl index fa2788bc1..ca94ded90 100644 --- a/apps/emqx/src/emqx_congestion.erl +++ b/apps/emqx/src/emqx_congestion.erl @@ -20,6 +20,8 @@ , cancel_alarms/3 ]). +-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]). + -define(ALARM_CONN_CONGEST(Channel, Reason), list_to_binary( io_lib:format("~ts/~ts/~ts", @@ -78,14 +80,14 @@ cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> do_alarm_congestion(Socket, Transport, Channel, Reason) -> ok = update_alarm_sent_at(Reason), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), - Message = io_lib:format("connection congested: ~ts", [AlarmDetails]), + Message = io_lib:format("connection congested: ~0p", [AlarmDetails]), emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message), ok. do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) -> ok = remove_alarm_sent_at(Reason), AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel), - Message = io_lib:format("connection congested: ~ts", [AlarmDetails]), + Message = io_lib:format("connection congested: ~0p", [AlarmDetails]), emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message), ok. diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index 5d2c31441..f96ac6ca7 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -335,7 +335,8 @@ t_handle_info(_) -> t_ensure_rate_limit(_) -> WhenOk = fun emqx_connection:next_incoming_msgs/3, - {ok, [], State} = emqx_connection:check_limiter([], [], WhenOk, [], st(#{limiter => undefined})), + {ok, [], State} = emqx_connection:check_limiter([], [], WhenOk, [], + st(#{limiter => undefined})), ?assertEqual(undefined, emqx_connection:info(limiter, State)), Limiter = init_limiter(), @@ -344,7 +345,8 @@ t_ensure_rate_limit(_) -> ok = meck:expect(emqx_htb_limiter, check, fun(_, Client) -> {pause, 3000, undefined, Client} end), - {ok, State2} = emqx_connection:check_limiter([{1000, bytes_in}], [], WhenOk, [], st(#{limiter => Limiter})), + {ok, State2} = emqx_connection:check_limiter([{1000, bytes_in}], [], + WhenOk, [], st(#{limiter => Limiter})), meck:unload(emqx_htb_limiter), ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]), ?assertNotEqual(undefined, emqx_connection:info(limiter_timer, State2)). @@ -432,6 +434,32 @@ t_oom_shutdown(_) -> end, Opts), ok. +t_cancel_congestion_alarm(_) -> + Opts = #{trap_exit => false}, + ok = meck:expect(emqx_transport, getstat, + fun(_Sock, [send_pend]) -> + %% simulate congestion + {ok, [{send_pend, 999}]}; + (_Sock, Options) -> + {ok, [{K, 0} || K <- Options]} + end), + with_conn( + fun(Pid) -> + #{ channel := Channel + , transport := Transport + , socket := Socket + } = emqx_connection:get_state(Pid), + %% precondition + Zone = emqx_channel:info(zone, Channel), + true = emqx_config:get_zone_conf(Zone, [conn_congestion, enable_alarm]), + %% should not raise errors + ok = emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel), + %% should not raise errors either + ok = emqx_congestion:cancel_alarms(Socket, Transport, Channel), + ok + end, Opts), + ok. + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -524,7 +552,10 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), + emqx_channel:init(ConnInfo, #{ zone => default + , limiter => limiter_cfg() + , listener => {tcp, default} + }), maps:merge(#{clientinfo => ClientInfo, session => Session, conn_state => connected