diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8efe02ea6..a2d88c647 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1202,7 +1202,7 @@ handle_call( } ) -> ClientId = info(clientid, Channel), - NKeepalive = emqx_keepalive:set(interval, Interval * 1000, KeepAlive), + NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive), NConnInfo = maps:put(keepalive, Interval, ConnInfo), NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo}, SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), @@ -2034,9 +2034,9 @@ ensure_keepalive_timer(0, Channel) -> ensure_keepalive_timer(disabled, Channel) -> Channel; ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> - Backoff = get_mqtt_conf(Zone, keepalive_backoff), - RecvOct = emqx_pd:get_counter(incoming_bytes), - Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)), + Multiplier = get_mqtt_conf(Zone, keepalive_multiplier), + RecvCnt = emqx_pd:get_counter(recv_pkt), + Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). clear_keepalive(Channel = #channel{timers = Timers}) -> diff --git a/apps/emqx/src/emqx_keepalive.erl b/apps/emqx/src/emqx_keepalive.erl index 9ba11e23f..c0a1c7657 100644 --- a/apps/emqx/src/emqx_keepalive.erl +++ b/apps/emqx/src/emqx_keepalive.erl @@ -22,7 +22,7 @@ info/1, info/2, check/2, - set/3 + update/2 ]). -elvis([{elvis_style, no_if_expression, disable}]). @@ -31,66 +31,16 @@ -record(keepalive, { interval :: pos_integer(), - statval :: non_neg_integer(), - repeat :: non_neg_integer() + statval :: non_neg_integer() }). -opaque keepalive() :: #keepalive{}. +-define(MAX_INTERVAL, 65535000). %% @doc Init keepalive. -spec init(Interval :: non_neg_integer()) -> keepalive(). init(Interval) -> init(0, Interval). -%% @doc Init keepalive. --spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive(). -init(StatVal, Interval) when Interval > 0 -> - #keepalive{ - interval = Interval, - statval = StatVal, - repeat = 0 - }. - -%% @doc Get Info of the keepalive. --spec info(keepalive()) -> emqx_types:infos(). -info(#keepalive{ - interval = Interval, - statval = StatVal, - repeat = Repeat -}) -> - #{ - interval => Interval, - statval => StatVal, - repeat => Repeat - }. - --spec info(interval | statval | repeat, keepalive()) -> - non_neg_integer(). -info(interval, #keepalive{interval = Interval}) -> - Interval; -info(statval, #keepalive{statval = StatVal}) -> - StatVal; -info(repeat, #keepalive{repeat = Repeat}) -> - Repeat. - -%% @doc Check keepalive. --spec check(non_neg_integer(), keepalive()) -> - {ok, keepalive()} | {error, timeout}. -check( - NewVal, - KeepAlive = #keepalive{ - statval = OldVal, - repeat = Repeat - } -) -> - if - NewVal =/= OldVal -> - {ok, KeepAlive#keepalive{statval = NewVal, repeat = 0}}; - Repeat < 1 -> - {ok, KeepAlive#keepalive{repeat = Repeat + 1}}; - true -> - {error, timeout} - end. - %% from mqtt-v3.1.1 specific %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. %% This means that, in this case, the Server is not required @@ -102,7 +52,43 @@ check( %%The actual value of the Keep Alive is application specific; %% typically this is a few minutes. %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds. -%% @doc Update keepalive's interval --spec set(interval, non_neg_integer(), keepalive()) -> keepalive(). -set(interval, Interval, KeepAlive) when Interval >= 0 andalso Interval =< 65535000 -> - KeepAlive#keepalive{interval = Interval}. +%% @doc Init keepalive. +-spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined. +init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL -> + #keepalive{interval = Interval, statval = StatVal}; +init(_, 0) -> + undefined; +init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL). + +%% @doc Get Info of the keepalive. +-spec info(keepalive()) -> emqx_types:infos(). +info(#keepalive{ + interval = Interval, + statval = StatVal +}) -> + #{ + interval => Interval, + statval => StatVal + }. + +-spec info(interval | statval, keepalive()) -> + non_neg_integer(). +info(interval, #keepalive{interval = Interval}) -> + Interval; +info(statval, #keepalive{statval = StatVal}) -> + StatVal; +info(interval, undefined) -> + 0. + +%% @doc Check keepalive. +-spec check(non_neg_integer(), keepalive()) -> + {ok, keepalive()} | {error, timeout}. +check(Val, #keepalive{statval = Val}) -> {error, timeout}; +check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}. + +%% @doc Update keepalive. +%% The statval of the previous keepalive will be used, +%% and normal checks will begin from the next cycle. +-spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined. +update(Interval, undefined) -> init(0, Interval); +update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 5a66ad5a0..0e9a29f3d 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -77,6 +77,7 @@ validate_heap_size/1, user_lookup_fun_tr/2, validate_alarm_actions/1, + validate_keepalive_multiplier/1, non_empty_string/1, validations/0, naive_env_interpolation/1 @@ -109,7 +110,8 @@ servers_validator/2, servers_sc/2, convert_servers/1, - convert_servers/2 + convert_servers/2, + mqtt_converter/2 ]). %% tombstone types @@ -150,6 +152,8 @@ -define(BIT(Bits), (1 bsl (Bits))). -define(MAX_UINT(Bits), (?BIT(Bits) - 1)). +-define(DEFAULT_MULTIPLIER, 1.5). +-define(DEFAULT_BACKOFF, 0.75). namespace() -> broker. @@ -172,6 +176,7 @@ roots(high) -> ref("mqtt"), #{ desc => ?DESC(mqtt), + converter => fun ?MODULE:mqtt_converter/2, importance => ?IMPORTANCE_MEDIUM } )}, @@ -522,8 +527,19 @@ fields("mqtt") -> sc( number(), #{ - default => 0.75, - desc => ?DESC(mqtt_keepalive_backoff) + default => ?DEFAULT_BACKOFF, + %% Must add required => false, zone schema has no default. + required => false, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"keepalive_multiplier", + sc( + number(), + #{ + default => ?DEFAULT_MULTIPLIER, + validator => fun ?MODULE:validate_keepalive_multiplier/1, + desc => ?DESC(mqtt_keepalive_multiplier) } )}, {"max_subscriptions", @@ -2744,6 +2760,13 @@ validate_heap_size(Siz) when is_integer(Siz) -> validate_heap_size(_SizStr) -> {error, invalid_heap_size}. +validate_keepalive_multiplier(Multiplier) when + is_number(Multiplier) andalso Multiplier >= 1.0 andalso Multiplier =< 65535.0 +-> + ok; +validate_keepalive_multiplier(_Multiplier) -> + {error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}. + validate_alarm_actions(Actions) -> UnSupported = lists:filter( fun(Action) -> Action =/= log andalso Action =/= publish end, Actions @@ -3385,3 +3408,20 @@ ensure_default_listener(Map, ListenerType) -> cert_file(_File, client) -> undefined; cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])). + +mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) -> + case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of + false -> + %% Multiplier is provided, and it's not default value + Mqtt; + true -> + %% Multiplier is default value, fallback to use Backoff value + %% Backoff default value was half of Multiplier default value + %% so there is no need to compare Backoff with its default. + Backoff = maps:get(<<"keepalive_backoff">>, Mqtt, ?DEFAULT_BACKOFF), + Mqtt#{<<"keepalive_multiplier">> => Backoff * 2} + end; +mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) -> + Mqtt#{<<"keepalive_multiplier">> => Backoff * 2}; +mqtt_converter(Mqtt, _Opts) -> + Mqtt. diff --git a/apps/emqx/test/emqx_keepalive_SUITE.erl b/apps/emqx/test/emqx_keepalive_SUITE.erl index dce55409e..480beeaa4 100644 --- a/apps/emqx/test/emqx_keepalive_SUITE.erl +++ b/apps/emqx/test/emqx_keepalive_SUITE.erl @@ -27,20 +27,14 @@ t_check(_) -> Keepalive = emqx_keepalive:init(60), ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)), ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)), - ?assertEqual(0, emqx_keepalive:info(repeat, Keepalive)), Info = emqx_keepalive:info(Keepalive), ?assertEqual( #{ interval => 60, - statval => 0, - repeat => 0 + statval => 0 }, Info ), {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive), ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)), - ?assertEqual(0, emqx_keepalive:info(repeat, Keepalive1)), - {ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1), - ?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)), - ?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)), - ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)). + ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)). diff --git a/apps/emqx/test/emqx_schema_tests.erl b/apps/emqx/test/emqx_schema_tests.erl index 81991f26e..3dcfa331e 100644 --- a/apps/emqx/test/emqx_schema_tests.erl +++ b/apps/emqx/test/emqx_schema_tests.erl @@ -655,6 +655,43 @@ password_converter_test() -> ?assertThrow("must_quote", emqx_schema:password_converter(foobar, #{})), ok. +-define(MQTT(B, M), #{<<"keepalive_backoff">> => B, <<"keepalive_multiplier">> => M}). + +keepalive_convert_test() -> + ?assertEqual(undefined, emqx_schema:mqtt_converter(undefined, #{})), + DefaultBackoff = 0.75, + DefaultMultiplier = 1.5, + Default = ?MQTT(DefaultBackoff, DefaultMultiplier), + ?assertEqual(Default, emqx_schema:mqtt_converter(Default, #{})), + ?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})), + ?assertEqual( + ?MQTT(DefaultBackoff, 3), emqx_schema:mqtt_converter(?MQTT(DefaultBackoff, 3), #{}) + ), + ?assertEqual(?MQTT(1, 2), emqx_schema:mqtt_converter(?MQTT(1, DefaultMultiplier), #{})), + ?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})), + + ?assertEqual(#{}, emqx_schema:mqtt_converter(#{}, #{})), + ?assertEqual( + #{<<"keepalive_backoff">> => 1.5, <<"keepalive_multiplier">> => 3.0}, + emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => 1.5}, #{}) + ), + ?assertEqual( + #{<<"keepalive_multiplier">> => 5.0}, + emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => 5.0}, #{}) + ), + ?assertEqual( + #{ + <<"keepalive_backoff">> => DefaultBackoff, + <<"keepalive_multiplier">> => DefaultMultiplier + }, + emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => DefaultBackoff}, #{}) + ), + ?assertEqual( + #{<<"keepalive_multiplier">> => DefaultMultiplier}, + emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => DefaultMultiplier}, #{}) + ), + ok. + url_type_test_() -> [ ?_assertEqual( diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 681c851bf..846a1d466 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -546,7 +546,8 @@ fields(authz_cache) -> ]; fields(keepalive) -> [ - {interval, hoconsc:mk(integer(), #{desc => <<"Keepalive time, with the unit of second">>})} + {interval, + hoconsc:mk(range(0, 65535), #{desc => <<"Keepalive time, with the unit of second">>})} ]; fields(subscribe) -> [ diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 6d7733b22..89838c346 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -244,13 +244,31 @@ t_keepalive(_Config) -> Body = #{interval => 11}, {error, {"HTTP/1.1", 404, "Not Found"}} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), - {ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}), + %% 65535 is the max value of keepalive + MaxKeepalive = 65535, + InitKeepalive = round(MaxKeepalive / 1.5 + 1), + {ok, C1} = emqtt:start_link(#{ + username => Username, clientid => ClientId, keepalive => InitKeepalive + }), {ok, _} = emqtt:connect(C1), + [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), + %% will reset to max keepalive if keepalive > max keepalive + #{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid), + ?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))), + {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), - [Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)), #{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid), ?assertEqual(11, Keepalive), + %% Disable keepalive + Body1 = #{interval => 0}, + {ok, NewClient1} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body1), + #{<<"keepalive">> := 0} = emqx_utils_json:decode(NewClient1, [return_maps]), + ?assertMatch(#{conninfo := #{keepalive := 0}}, emqx_connection:info(Pid)), + %% Maximal keepalive + Body2 = #{interval => 65536}, + {error, {"HTTP/1.1", 400, _}} = + emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body2), emqtt:disconnect(C1), ok. diff --git a/changes/ce/feat-10702.en.md b/changes/ce/feat-10702.en.md new file mode 100644 index 000000000..936103848 --- /dev/null +++ b/changes/ce/feat-10702.en.md @@ -0,0 +1,4 @@ +Introduce a more straightforward configuration option `keepalive_multiplier` and +deprecate the old `keepalive_backoff` configuration. +After this enhancement, EMQX checks the client's keepalive timeout status +period by multiplying the "Client Requested Keepalive Interval" with `keepalive_multiplier`. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index ac8b7e8a4..ad63b4ba9 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -799,7 +799,7 @@ fields_tcp_opts_high_watermark.desc: by the VM socket implementation reaches this limit.""" fields_tcp_opts_high_watermark.label: -"""TCP 高水位线""" +"""TCP high watermark""" fields_mqtt_quic_listener_stateless_operation_expiration_ms.desc: """The time limit between operations for the same endpoint, in milliseconds. Default: 100""" @@ -885,11 +885,12 @@ and an MQTT message is published to the system topic $SYS/sysmon/long_sche sysmon_vm_long_schedule.label: """Enable Long Schedule monitoring.""" -mqtt_keepalive_backoff.desc: -"""The coefficient EMQX uses to confirm whether the keep alive duration of the client expires. Formula: Keep Alive * Backoff * 2""" +mqtt_keepalive_multiplier.desc: +"""Keep-Alive Timeout = Keep-Alive interval × Keep-Alive Multiplier. +The default value 1.5 is following the MQTT 5.0 specification. This multiplier is adjustable, providing system administrators flexibility for tailoring to their specific needs. For instance, if a client's 10-second Keep-Alive interval PINGREQ gets delayed by an extra 10 seconds, changing the multiplier to 2 lets EMQX tolerate this delay.""" -mqtt_keepalive_backoff.label: -"""Keep Alive Backoff""" +mqtt_keepalive_multiplier.label: +"""Keep Alive Multiplier""" force_gc_bytes.desc: """GC the process after specified number of bytes have passed through."""