Merge pull request #7215 from thalesmg/fix-congestion-log
fix(congestion): fix congestion message formatting
This commit is contained in:
commit
514e374868
|
@ -20,6 +20,8 @@
|
||||||
, cancel_alarms/3
|
, cancel_alarms/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_congestion]}}]).
|
||||||
|
|
||||||
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
||||||
list_to_binary(
|
list_to_binary(
|
||||||
io_lib:format("~ts/~ts/~ts",
|
io_lib:format("~ts/~ts/~ts",
|
||||||
|
@ -78,14 +80,14 @@ cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
do_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
do_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
ok = update_alarm_sent_at(Reason),
|
ok = update_alarm_sent_at(Reason),
|
||||||
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
||||||
Message = io_lib:format("connection congested: ~ts", [AlarmDetails]),
|
Message = io_lib:format("connection congested: ~0p", [AlarmDetails]),
|
||||||
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message),
|
emqx_alarm:activate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
do_cancel_alarm_congestion(Socket, Transport, Channel, Reason) ->
|
||||||
ok = remove_alarm_sent_at(Reason),
|
ok = remove_alarm_sent_at(Reason),
|
||||||
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
AlarmDetails = tcp_congestion_alarm_details(Socket, Transport, Channel),
|
||||||
Message = io_lib:format("connection congested: ~ts", [AlarmDetails]),
|
Message = io_lib:format("connection congested: ~0p", [AlarmDetails]),
|
||||||
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message),
|
emqx_alarm:deactivate(?ALARM_CONN_CONGEST(Channel, Reason), AlarmDetails, Message),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -335,7 +335,8 @@ t_handle_info(_) ->
|
||||||
|
|
||||||
t_ensure_rate_limit(_) ->
|
t_ensure_rate_limit(_) ->
|
||||||
WhenOk = fun emqx_connection:next_incoming_msgs/3,
|
WhenOk = fun emqx_connection:next_incoming_msgs/3,
|
||||||
{ok, [], State} = emqx_connection:check_limiter([], [], WhenOk, [], st(#{limiter => undefined})),
|
{ok, [], State} = emqx_connection:check_limiter([], [], WhenOk, [],
|
||||||
|
st(#{limiter => undefined})),
|
||||||
?assertEqual(undefined, emqx_connection:info(limiter, State)),
|
?assertEqual(undefined, emqx_connection:info(limiter, State)),
|
||||||
|
|
||||||
Limiter = init_limiter(),
|
Limiter = init_limiter(),
|
||||||
|
@ -344,7 +345,8 @@ t_ensure_rate_limit(_) ->
|
||||||
|
|
||||||
ok = meck:expect(emqx_htb_limiter, check,
|
ok = meck:expect(emqx_htb_limiter, check,
|
||||||
fun(_, Client) -> {pause, 3000, undefined, Client} end),
|
fun(_, Client) -> {pause, 3000, undefined, Client} end),
|
||||||
{ok, State2} = emqx_connection:check_limiter([{1000, bytes_in}], [], WhenOk, [], st(#{limiter => Limiter})),
|
{ok, State2} = emqx_connection:check_limiter([{1000, bytes_in}], [],
|
||||||
|
WhenOk, [], st(#{limiter => Limiter})),
|
||||||
meck:unload(emqx_htb_limiter),
|
meck:unload(emqx_htb_limiter),
|
||||||
ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_htb_limiter, [passthrough, no_history, no_link]),
|
||||||
?assertNotEqual(undefined, emqx_connection:info(limiter_timer, State2)).
|
?assertNotEqual(undefined, emqx_connection:info(limiter_timer, State2)).
|
||||||
|
@ -432,6 +434,32 @@ t_oom_shutdown(_) ->
|
||||||
end, Opts),
|
end, Opts),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_cancel_congestion_alarm(_) ->
|
||||||
|
Opts = #{trap_exit => false},
|
||||||
|
ok = meck:expect(emqx_transport, getstat,
|
||||||
|
fun(_Sock, [send_pend]) ->
|
||||||
|
%% simulate congestion
|
||||||
|
{ok, [{send_pend, 999}]};
|
||||||
|
(_Sock, Options) ->
|
||||||
|
{ok, [{K, 0} || K <- Options]}
|
||||||
|
end),
|
||||||
|
with_conn(
|
||||||
|
fun(Pid) ->
|
||||||
|
#{ channel := Channel
|
||||||
|
, transport := Transport
|
||||||
|
, socket := Socket
|
||||||
|
} = emqx_connection:get_state(Pid),
|
||||||
|
%% precondition
|
||||||
|
Zone = emqx_channel:info(zone, Channel),
|
||||||
|
true = emqx_config:get_zone_conf(Zone, [conn_congestion, enable_alarm]),
|
||||||
|
%% should not raise errors
|
||||||
|
ok = emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
|
||||||
|
%% should not raise errors either
|
||||||
|
ok = emqx_congestion:cancel_alarms(Socket, Transport, Channel),
|
||||||
|
ok
|
||||||
|
end, Opts),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -524,7 +552,10 @@ channel(InitFields) ->
|
||||||
maps:fold(fun(Field, Value, Channel) ->
|
maps:fold(fun(Field, Value, Channel) ->
|
||||||
emqx_channel:set_field(Field, Value, Channel)
|
emqx_channel:set_field(Field, Value, Channel)
|
||||||
end,
|
end,
|
||||||
emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}),
|
emqx_channel:init(ConnInfo, #{ zone => default
|
||||||
|
, limiter => limiter_cfg()
|
||||||
|
, listener => {tcp, default}
|
||||||
|
}),
|
||||||
maps:merge(#{clientinfo => ClientInfo,
|
maps:merge(#{clientinfo => ClientInfo,
|
||||||
session => Session,
|
session => Session,
|
||||||
conn_state => connected
|
conn_state => connected
|
||||||
|
|
Loading…
Reference in New Issue