From 4942f6f75a2a1b303cd7339fcd1614cfb3e3378c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Fri, 21 Jun 2024 18:07:33 +0800 Subject: [PATCH] feat: improve keepalive_multiplier and keepalive_check_interval --- apps/emqx/src/emqx_channel.erl | 33 ++-- apps/emqx/src/emqx_connection.erl | 4 +- apps/emqx/src/emqx_keepalive.erl | 98 ++++++++--- apps/emqx/src/emqx_schema.erl | 10 +- apps/emqx/src/emqx_ws_connection.erl | 3 +- apps/emqx/test/emqx_config_SUITE.erl | 1 + apps/emqx/test/emqx_keepalive_SUITE.erl | 166 +++++++++++++++++- .../src/emqx_coap_channel.erl | 8 +- .../src/emqx_coap_schema.erl | 8 +- .../src/emqx_gateway_coap.app.src | 2 +- .../test/emqx_coap_SUITE.erl | 7 +- .../src/emqx_exproto_channel.erl | 4 +- .../src/emqx_gateway_exproto.app.src | 2 +- .../src/emqx_gateway_gbt32960.app.src | 2 +- .../src/emqx_gbt32960_channel.erl | 2 +- .../src/emqx_gateway_jt808.app.src | 2 +- .../src/emqx_jt808_channel.erl | 2 +- .../src/emqx_gateway_mqttsn.app.src | 2 +- .../src/emqx_mqttsn_channel.erl | 4 +- .../test/emqx_mgmt_api_clients_SUITE.erl | 2 +- rel/i18n/emqx_schema.hocon | 9 + 21 files changed, 301 insertions(+), 70 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index c1a9cc162..1a24cd260 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -544,8 +544,10 @@ handle_in( {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PACKET(?PINGREQ), Channel) -> - {ok, ?PACKET(?PINGRESP), Channel}; +handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) -> + {ok, NKeepalive} = emqx_keepalive:check(Keepalive), + NChannel = Channel#channel{keepalive = NKeepalive}, + {ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)}; handle_in( ?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo} @@ -1229,11 +1231,12 @@ handle_call( {keepalive, Interval}, Channel = #channel{ keepalive = KeepAlive, - conninfo = ConnInfo + conninfo = ConnInfo, + clientinfo = #{zone := Zone} } ) -> ClientId = info(clientid, Channel), - NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive), + NKeepalive = emqx_keepalive:update(Zone, Interval, KeepAlive), NConnInfo = maps:put(keepalive, Interval, ConnInfo), NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo}, SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), @@ -1333,22 +1336,22 @@ die_if_test_compiled() -> | {shutdown, Reason :: term(), channel()}. handle_timeout( _TRef, - {keepalive, _StatVal}, + keepalive, Channel = #channel{keepalive = undefined} ) -> {ok, Channel}; handle_timeout( _TRef, - {keepalive, _StatVal}, + keepalive, Channel = #channel{conn_state = disconnected} ) -> {ok, Channel}; handle_timeout( _TRef, - {keepalive, StatVal}, + keepalive, Channel = #channel{keepalive = Keepalive} ) -> - case emqx_keepalive:check(StatVal, Keepalive) of + case emqx_keepalive:check(Keepalive) of {ok, NKeepalive} -> NChannel = Channel#channel{keepalive = NKeepalive}, {ok, reset_timer(keepalive, NChannel)}; @@ -1459,10 +1462,16 @@ reset_timer(Name, Time, Channel) -> ensure_timer(Name, Time, clean_timer(Name, Channel)). clean_timer(Name, Channel = #channel{timers = Timers}) -> - Channel#channel{timers = maps:remove(Name, Timers)}. + case maps:take(Name, Timers) of + error -> + Channel; + {TRef, NTimers} -> + ok = emqx_utils:cancel_timer(TRef), + Channel#channel{timers = NTimers} + end. interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_delivery, #channel{session = Session}) -> emqx_session:info(retry_interval, Session); interval(expire_awaiting_rel, #channel{session = Session}) -> @@ -2320,9 +2329,7 @@ ensure_keepalive_timer(0, Channel) -> ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Multiplier = get_mqtt_conf(Zone, keepalive_multiplier), - RecvCnt = emqx_pd:get_counter(recv_pkt), - Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)), + Keepalive = emqx_keepalive:init(Zone, Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). clear_keepalive(Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index ed62fb63c..517a5cc2f 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -729,9 +729,7 @@ handle_timeout( disconnected -> {ok, State}; _ -> - %% recv_pkt: valid MQTT message - RecvCnt = emqx_pd:get_counter(recv_pkt), - handle_timeout(TRef, {keepalive, RecvCnt}, State) + with_channel(handle_timeout, [TRef, keepalive], State) end; handle_timeout(TRef, Msg, State) -> with_channel(handle_timeout, [TRef, Msg], State). diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 8ed685db2..785893d2d 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -19,10 +19,12 @@ -export([ init/1, init/2, + init/3, info/1, info/2, + check/1, check/2, - update/2 + update/3 ]). -elvis([{elvis_style, no_if_expression, disable}]). @@ -30,8 +32,12 @@ -export_type([keepalive/0]). -record(keepalive, { - interval :: pos_integer(), - statval :: non_neg_integer() + check_interval :: pos_integer(), + %% the received packets since last keepalive check + statval :: non_neg_integer(), + %% The number of idle intervals allowed before disconnecting the client. + idle_milliseconds = 0 :: non_neg_integer(), + max_idle_millisecond :: pos_integer() }). -opaque keepalive() :: #keepalive{}. @@ -39,7 +45,11 @@ %% @doc Init keepalive. -spec init(Interval :: non_neg_integer()) -> keepalive(). -init(Interval) -> init(0, Interval). +init(Interval) -> init(default, 0, Interval). + +init(Zone, Interval) -> + RecvCnt = emqx_pd:get_counter(recv_pkt), + init(Zone, RecvCnt, Interval). %% from mqtt-v3.1.1 specific %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. @@ -53,42 +63,88 @@ init(Interval) -> init(0, Interval). %% typically this is a few minutes. %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds. %% @doc Init keepalive. --spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined. -init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL -> - #keepalive{interval = Interval, statval = StatVal}; -init(_, 0) -> +-spec init( + Zone :: atom(), + StatVal :: non_neg_integer(), + Second :: non_neg_integer() +) -> keepalive() | undefined. +init(Zone, StatVal, Second) when Second > 0 andalso Second =< ?MAX_INTERVAL -> + #{keepalive_multiplier := Mul, keepalive_check_interval := CheckInterval} = + emqx_config:get_zone_conf(Zone, [mqtt]), + MilliSeconds = timer:seconds(Second), + Interval = emqx_utils:clamp(CheckInterval, 1000, max(MilliSeconds div 2, 1000)), + MaxIdleMs = ceil(MilliSeconds * Mul), + #keepalive{ + check_interval = Interval, + statval = StatVal, + idle_milliseconds = 0, + max_idle_millisecond = MaxIdleMs + }; +init(_Zone, _, 0) -> undefined; -init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL). +init(Zone, StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(Zone, StatVal, ?MAX_INTERVAL). %% @doc Get Info of the keepalive. -spec info(keepalive()) -> emqx_types:infos(). info(#keepalive{ - interval = Interval, - statval = StatVal + check_interval = Interval, + statval = StatVal, + idle_milliseconds = IdleIntervals, + max_idle_millisecond = MaxMs }) -> #{ - interval => Interval, - statval => StatVal + check_interval => Interval, + statval => StatVal, + idle_milliseconds => IdleIntervals, + max_idle_millisecond => MaxMs }. --spec info(interval | statval, keepalive()) -> +-spec info(check_interval | statval | idle_milliseconds, keepalive()) -> non_neg_integer(). -info(interval, #keepalive{interval = Interval}) -> +info(check_interval, #keepalive{check_interval = Interval}) -> Interval; info(statval, #keepalive{statval = StatVal}) -> StatVal; -info(interval, undefined) -> +info(idle_milliseconds, #keepalive{idle_milliseconds = Val}) -> + Val; +info(check_interval, undefined) -> 0. +check(Keepalive = #keepalive{}) -> + RecvCnt = emqx_pd:get_counter(recv_pkt), + check(RecvCnt, Keepalive); +check(Keepalive) -> + {ok, Keepalive}. + %% @doc Check keepalive. -spec check(non_neg_integer(), keepalive()) -> {ok, keepalive()} | {error, timeout}. -check(Val, #keepalive{statval = Val}) -> {error, timeout}; -check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}. + +check( + NewVal, + #keepalive{ + statval = NewVal, + idle_milliseconds = IdleAcc, + check_interval = Interval, + max_idle_millisecond = Max + } +) when IdleAcc + Interval >= Max -> + {error, timeout}; +check( + NewVal, + #keepalive{ + statval = NewVal, + idle_milliseconds = IdleAcc, + check_interval = Interval + } = KeepAlive +) -> + {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = IdleAcc + Interval}}; +check(NewVal, #keepalive{} = KeepAlive) -> + {ok, KeepAlive#keepalive{statval = NewVal, idle_milliseconds = 0}}. %% @doc Update keepalive. %% The statval of the previous keepalive will be used, %% and normal checks will begin from the next cycle. --spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. -update(Interval, undefined) -> init(0, Interval); -update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval). +-spec update(atom(), non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. +update(Zone, Interval, undefined) -> init(Zone, 0, Interval); +update(Zone, Interval, #keepalive{statval = StatVal}) -> init(Zone, StatVal, Interval). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 9b96ad20a..6b02a4d4b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -3613,9 +3613,17 @@ mqtt_general() -> desc => ?DESC(mqtt_keepalive_multiplier) } )}, + {"keepalive_check_interval", + sc( + timeout_duration(), + #{ + default => <<"30s">>, + desc => ?DESC(mqtt_keepalive_check_interval) + } + )}, {"retry_interval", sc( - duration(), + timeout_duration(), #{ default => <<"30s">>, desc => ?DESC(mqtt_retry_interval) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 038f3e98e..e46bdc313 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -555,8 +555,7 @@ handle_info(Info, State) -> handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) -> shutdown(idle_timeout, State); handle_timeout(TRef, keepalive, State) when is_reference(TRef) -> - RecvOct = emqx_pd:get_counter(recv_oct), - handle_timeout(TRef, {keepalive, RecvOct}, State); + with_channel(handle_timeout, [TRef, keepalive], State); handle_timeout( TRef, emit_stats, diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index 28f542f81..568f5de20 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -428,6 +428,7 @@ zone_global_defaults() -> ignore_loop_deliver => false, keepalive_backoff => 0.75, keepalive_multiplier => 1.5, + keepalive_check_interval => 30000, max_awaiting_rel => 100, max_clientid_len => 65535, max_inflight => 32, diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl index 7773774a7..84f66b3a5 100644 --- a/apps/emqx/test/emqx_keepalive_SUITE.erl +++ b/apps/emqx/test/emqx_keepalive_SUITE.erl @@ -19,22 +19,180 @@ -compile(export_all). -compile(nowarn_export_all). +-include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> emqx_common_test_helpers:all(?MODULE). +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "listeners {" + "tcp.default.bind = 1883," + "ssl.default = marked_for_deletion," + "quic.default = marked_for_deletion," + "ws.default = marked_for_deletion," + "wss.default = marked_for_deletion" + "}"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(apps, Config)). + +t_check_keepalive_default_timeout(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000), + erlang:process_flag(trap_exit, true), + ClientID = <<"default">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(5000, CheckInterval), + %% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000) + %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000) + %% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout + Timeout = CheckInterval * 3, + %% connector but not send a packet. + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)). + +t_check_keepalive_other_timeout(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 2000), + erlang:process_flag(trap_exit, true), + ClientID = <<"other">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + {ok, _, [0]} = emqtt:subscribe(C, <<"mytopic">>, []), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + %%CheckInterval = ceil(keepalive_check_factor() * KeepaliveSec * 1000), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(2000, CheckInterval), + %% when keepalive_check_interval is 2s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% subscribe T1(packet = 2, idle_milliseconds = 0) + %% check1 T2(packet = 2, idle_milliseconds = 1 * CheckInterval = 2000) + %% check2 T3(packet = 2, idle_milliseconds = 2 * CheckInterval = 4000) + %% check3 T4(packet = 2, idle_milliseconds = 3 * CheckInterval = 6000) + %% check4 T5(packet = 2, idle_milliseconds = 4 * CheckInterval = 8000) + %% check4 T6(packet = 2, idle_milliseconds = 5 * CheckInterval = 10000) + %% check4 T7(packet = 2, idle_milliseconds = 6 * CheckInterval = 12000) + %% check4 T8(packet = 2, idle_milliseconds = 7 * CheckInterval = 14000) + %% check4 T9(packet = 2, idle_milliseconds = 8 * CheckInterval = 16000) > 15000 timeout + Timeout = CheckInterval * 9, + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200), Timeout). + +t_check_keepalive_ping_reset_timer(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 100000), + erlang:process_flag(trap_exit, true), + ClientID = <<"ping_reset">>, + KeepaliveSec = 10, + {ok, C} = emqtt:start_link([ + {keepalive, KeepaliveSec}, + {clientid, binary_to_list(ClientID)} + ]), + {ok, _} = emqtt:connect(C), + emqtt:pause(C), + ct:sleep(1000), + emqtt:resume(C), + pong = emqtt:ping(C), + emqtt:pause(C), + [ChannelPid] = emqx_cm:lookup_channels(ClientID), + erlang:link(ChannelPid), + CheckInterval = emqx_utils:clamp(keepalive_check_interval(), 1000, 5000), + ?assertMatch(5000, CheckInterval), + %% when keepalive_check_interval is 30s and keepalive_multiplier is 1.5 + %% connect T0(packet = 1, idle_milliseconds = 0) + %% sleep 1000ms + %% ping (packet = 2, idle_milliseconds = 0) restart timer + %% check1 T1(packet = 1, idle_milliseconds = 1 * CheckInterval = 5000) + %% check2 T2(packet = 1, idle_milliseconds = 2 * CheckInterval = 10000) + %% check2 T3(packet = 1, idle_milliseconds = 3 * CheckInterval = 15000) -> timeout + Timeout = CheckInterval * 3, + ?assertMatch( + no_keepalive_timeout_received, + receive_msg_in_time(ChannelPid, C, Timeout - 200), + Timeout - 200 + ), + ?assertMatch(ok, receive_msg_in_time(ChannelPid, C, 1200)). + t_check(_) -> + emqx_config:put_zone_conf(default, [mqtt, keepalive_multiplier], 1.5), + emqx_config:put_zone_conf(default, [mqtt, keepalive_check_interval], 30000), Keepalive = emqx_keepalive:init(60), - ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)), + ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive)), ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)), Info = emqx_keepalive:info(Keepalive), ?assertEqual( #{ - interval => 60, - statval => 0 + check_interval => 30000, + statval => 0, + idle_milliseconds => 0, + %% 60 * 1.5 * 1000 + max_idle_millisecond => 90000 }, Info ), {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive), ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)), - ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)). + {ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1), + ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), + {ok, Keepalive3} = emqx_keepalive:check(1, Keepalive2), + ?assertEqual(1, emqx_keepalive:info(statval, Keepalive3)), + ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive3)), + + Keepalive4 = emqx_keepalive:init(90), + ?assertEqual(30000, emqx_keepalive:info(check_interval, Keepalive4)), + + Keepalive5 = emqx_keepalive:init(1), + ?assertEqual(1000, emqx_keepalive:info(check_interval, Keepalive5)), + ok. + +keepalive_multiplier() -> + emqx_config:get_zone_conf(default, [mqtt, keepalive_multiplier]). + +keepalive_check_interval() -> + emqx_config:get_zone_conf(default, [mqtt, keepalive_check_interval]). + +receive_msg_in_time(ChannelPid, C, Timeout) -> + receive + {'EXIT', ChannelPid, {shutdown, keepalive_timeout}} -> + receive + {'EXIT', C, {shutdown, tcp_closed}} -> + ok + after 500 -> + throw(no_tcp_closed_from_mqtt_client) + end + after Timeout -> + no_keepalive_timeout_received + end. diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index fbab1ff14..844677d12 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -85,7 +85,7 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). --define(DEF_IDLE_TIME, timer:seconds(30)). +-define(DEF_IDLE_SECONDS, 30). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). @@ -149,7 +149,7 @@ init( mountpoint => Mountpoint } ), - Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_TIME), + Heartbeat = maps:get(heartbeat, Config, ?DEF_IDLE_SECONDS), #channel{ ctx = Ctx, conninfo = ConnInfo, @@ -378,7 +378,7 @@ ensure_keepalive_timer(Channel) -> ensure_keepalive_timer(fun ensure_timer/4, Channel). ensure_keepalive_timer(Fun, #channel{keepalive = KeepAlive} = Channel) -> - Heartbeat = emqx_keepalive:info(interval, KeepAlive), + Heartbeat = emqx_keepalive:info(check_interval, KeepAlive), Fun(keepalive, Heartbeat, keepalive, Channel). check_auth_state(Msg, #channel{connection_required = false} = Channel) -> @@ -495,7 +495,7 @@ enrich_conninfo( ) -> case Queries of #{<<"clientid">> := ClientId} -> - Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)), + Interval = emqx_keepalive:info(check_interval, KeepAlive), NConnInfo = ConnInfo#{ clientid => ClientId, proto_name => <<"CoAP">>, diff --git a/apps/emqx_gateway_coap/src/emqx_coap_schema.erl b/apps/emqx_gateway_coap/src/emqx_coap_schema.erl index 7d38a2bb6..61d4b7376 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_schema.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_schema.erl @@ -19,12 +19,6 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). --type duration() :: non_neg_integer(). - --typerefl_from_string({duration/0, emqx_schema, to_duration}). - --reflect_type([duration/0]). - %% config schema provides -export([namespace/0, fields/1, desc/1]). @@ -34,7 +28,7 @@ fields(coap) -> [ {heartbeat, sc( - duration(), + emqx_schema:duration_s(), #{ default => <<"30s">>, desc => ?DESC(coap_heartbeat) diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index 3a715eac4..e9c1f2b4a 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.8"}, + {vsn, "0.1.9"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index 3201d5dbf..bd403a463 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -100,7 +100,7 @@ init_per_testcase(t_heartbeat, Config) -> OldConf = emqx:get_raw_config([gateway, coap]), {ok, _} = emqx_gateway_conf:update_gateway( coap, - OldConf#{<<"heartbeat">> => <<"800ms">>} + OldConf#{<<"heartbeat">> => <<"1s">>} ), [ {old_conf, OldConf}, @@ -216,8 +216,9 @@ t_heartbeat(Config) -> [], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) ), - - timer:sleep(Heartbeat * 2), + %% The minimum timeout time is 1 second. + %% 1.5 * Heartbeat + 0.5 * Heartbeat(< 1s) = 1.5 * 1 + 1 = 2.5 + timer:sleep(Heartbeat * 2 + 1000), ?assertEqual( [], emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>) diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index c145506c9..93646acbf 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -715,7 +715,7 @@ ensure_keepalive_timer(Interval, Channel) when Interval =< 0 -> Channel; ensure_keepalive_timer(Interval, Channel) -> StatVal = emqx_gateway_conn:keepalive_stats(recv), - Keepalive = emqx_keepalive:init(StatVal, timer:seconds(Interval)), + Keepalive = emqx_keepalive:init(default, StatVal, Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). ensure_timer(Name, Channel = #channel{timers = Timers}) -> @@ -746,7 +746,7 @@ interval(force_close_idle, #channel{conninfo = #{idle_timeout := IdleTimeout}}) interval(force_close, _) -> 15000; interval(keepalive, #channel{keepalive = Keepalive}) -> - emqx_keepalive:info(interval, Keepalive). + emqx_keepalive:info(check_interval, Keepalive). %%-------------------------------------------------------------------- %% Dispatch diff --git a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src index 34fcca216..fe237779b 100644 --- a/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src +++ b/apps/emqx_gateway_exproto/src/emqx_gateway_exproto.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_exproto, [ {description, "ExProto Gateway"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {applications, [kernel, stdlib, grpc, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src index 123b60203..bcb54e0f1 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src +++ b/apps/emqx_gateway_gbt32960/src/emqx_gateway_gbt32960.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_gbt32960, [ {description, "GBT32960 Gateway"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 9652290d3..809a79f7d 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -506,7 +506,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_timer, #channel{retx_interval = RetxIntv}) -> RetxIntv. diff --git a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src index 8e5157695..8d1e33f74 100644 --- a/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src +++ b/apps/emqx_gateway_jt808/src/emqx_gateway_jt808.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_jt808, [ {description, "JT/T 808 Gateway"}, - {vsn, "0.0.3"}, + {vsn, "0.1.0"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl index 876f623e9..a74214a1c 100644 --- a/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl +++ b/apps/emqx_gateway_jt808/src/emqx_jt808_channel.erl @@ -616,7 +616,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(alive_timer, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_timer, #channel{retx_interval = RetxIntv}) -> RetxIntv. diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 45a1d5da7..585410356 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.2.0"}, + {vsn, "0.2.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index 501308ea0..c9e109c3f 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -430,7 +430,7 @@ ensure_keepalive(Channel = #channel{conninfo = ConnInfo}) -> ensure_keepalive_timer(0, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel) -> - Keepalive = emqx_keepalive:init(round(timer:seconds(Interval))), + Keepalive = emqx_keepalive:init(Interval), ensure_timer(keepalive, Channel#channel{keepalive = Keepalive}). %%-------------------------------------------------------------------- @@ -2245,7 +2245,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> Channel#channel{timers = maps:remove(Name, Timers)}. interval(keepalive, #channel{keepalive = KeepAlive}) -> - emqx_keepalive:info(interval, KeepAlive); + emqx_keepalive:info(check_interval, KeepAlive); interval(retry_delivery, #channel{session = Session}) -> emqx_mqttsn_session:info(retry_interval, Session); interval(expire_awaiting_rel, #channel{session = Session}) -> diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 2c71e9822..9557c3214 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -1109,7 +1109,7 @@ t_keepalive(_Config) -> [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), %% will reset to max keepalive if keepalive > max keepalive #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid), - ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))), + ?assertMatch({keepalive, _, _, _, 65536500}, element(5, element(9, sys:get_state(Pid)))), {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index e80f36817..eca281646 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -855,6 +855,15 @@ The default value 1.5 is following the MQTT 5.0 specification. This multiplier i mqtt_keepalive_multiplier.label: """Keep Alive Multiplier""" +mqtt_keepalive_check_interval.desc: +"""The frequency of checking for incoming MQTT packets determines how often the server will check for new MQTT packets. +If a certain amount of time passes without any packets being sent from the client,this time will be added up. +Once the accumulated time exceeds the keepalive interval * the keepalive multiplier, the connection will be terminated. +The default is set to 30 seconds, with a minimum value of 1 second and a maximum value of Interval/2.""" + +mqtt_keepalive_check_interval.label: +"""Keep Alive Check Interval""" + force_gc_bytes.desc: """GC the process after specified number of bytes have passed through."""