Merge pull request #10702 from zhongwencool/keepalive-backoff-rename

feat: deprecated keepalive_backoff, introduce keepalive_multiplier
This commit is contained in:
zhongwencool 2023-05-18 10:00:17 +08:00 committed by GitHub
commit bf5ee41009
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 161 additions and 80 deletions

View File

@ -1202,7 +1202,7 @@ handle_call(
} }
) -> ) ->
ClientId = info(clientid, Channel), ClientId = info(clientid, Channel),
NKeepalive = emqx_keepalive:set(interval, Interval * 1000, KeepAlive), NKeepalive = emqx_keepalive:update(timer:seconds(Interval), KeepAlive),
NConnInfo = maps:put(keepalive, Interval, ConnInfo), NConnInfo = maps:put(keepalive, Interval, ConnInfo),
NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo}, NChannel = Channel#channel{keepalive = NKeepalive, conninfo = NConnInfo},
SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}), SockInfo = maps:get(sockinfo, emqx_cm:get_chan_info(ClientId), #{}),
@ -2034,9 +2034,9 @@ ensure_keepalive_timer(0, Channel) ->
ensure_keepalive_timer(disabled, Channel) -> ensure_keepalive_timer(disabled, Channel) ->
Channel; Channel;
ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) -> ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone}}) ->
Backoff = get_mqtt_conf(Zone, keepalive_backoff), Multiplier = get_mqtt_conf(Zone, keepalive_multiplier),
RecvOct = emqx_pd:get_counter(incoming_bytes), RecvCnt = emqx_pd:get_counter(recv_pkt),
Keepalive = emqx_keepalive:init(RecvOct, round(timer:seconds(Interval) * Backoff)), Keepalive = emqx_keepalive:init(RecvCnt, round(timer:seconds(Interval) * Multiplier)),
ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}).
clear_keepalive(Channel = #channel{timers = Timers}) -> clear_keepalive(Channel = #channel{timers = Timers}) ->

View File

@ -22,7 +22,7 @@
info/1, info/1,
info/2, info/2,
check/2, check/2,
set/3 update/2
]). ]).
-elvis([{elvis_style, no_if_expression, disable}]). -elvis([{elvis_style, no_if_expression, disable}]).
@ -31,66 +31,16 @@
-record(keepalive, { -record(keepalive, {
interval :: pos_integer(), interval :: pos_integer(),
statval :: non_neg_integer(), statval :: non_neg_integer()
repeat :: non_neg_integer()
}). }).
-opaque keepalive() :: #keepalive{}. -opaque keepalive() :: #keepalive{}.
-define(MAX_INTERVAL, 65535000).
%% @doc Init keepalive. %% @doc Init keepalive.
-spec init(Interval :: non_neg_integer()) -> keepalive(). -spec init(Interval :: non_neg_integer()) -> keepalive().
init(Interval) -> init(0, Interval). init(Interval) -> init(0, Interval).
%% @doc Init keepalive.
-spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive().
init(StatVal, Interval) when Interval > 0 ->
#keepalive{
interval = Interval,
statval = StatVal,
repeat = 0
}.
%% @doc Get Info of the keepalive.
-spec info(keepalive()) -> emqx_types:infos().
info(#keepalive{
interval = Interval,
statval = StatVal,
repeat = Repeat
}) ->
#{
interval => Interval,
statval => StatVal,
repeat => Repeat
}.
-spec info(interval | statval | repeat, keepalive()) ->
non_neg_integer().
info(interval, #keepalive{interval = Interval}) ->
Interval;
info(statval, #keepalive{statval = StatVal}) ->
StatVal;
info(repeat, #keepalive{repeat = Repeat}) ->
Repeat.
%% @doc Check keepalive.
-spec check(non_neg_integer(), keepalive()) ->
{ok, keepalive()} | {error, timeout}.
check(
NewVal,
KeepAlive = #keepalive{
statval = OldVal,
repeat = Repeat
}
) ->
if
NewVal =/= OldVal ->
{ok, KeepAlive#keepalive{statval = NewVal, repeat = 0}};
Repeat < 1 ->
{ok, KeepAlive#keepalive{repeat = Repeat + 1}};
true ->
{error, timeout}
end.
%% from mqtt-v3.1.1 specific %% from mqtt-v3.1.1 specific
%% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism. %% A Keep Alive value of zero (0) has the effect of turning off the keep alive mechanism.
%% This means that, in this case, the Server is not required %% This means that, in this case, the Server is not required
@ -102,7 +52,43 @@ check(
%%The actual value of the Keep Alive is application specific; %%The actual value of the Keep Alive is application specific;
%% typically this is a few minutes. %% typically this is a few minutes.
%% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds. %% The maximum value is (65535s) 18 hours 12 minutes and 15 seconds.
%% @doc Update keepalive's interval %% @doc Init keepalive.
-spec set(interval, non_neg_integer(), keepalive()) -> keepalive(). -spec init(StatVal :: non_neg_integer(), Interval :: non_neg_integer()) -> keepalive() | undefined.
set(interval, Interval, KeepAlive) when Interval >= 0 andalso Interval =< 65535000 -> init(StatVal, Interval) when Interval > 0 andalso Interval =< ?MAX_INTERVAL ->
KeepAlive#keepalive{interval = Interval}. #keepalive{interval = Interval, statval = StatVal};
init(_, 0) ->
undefined;
init(StatVal, Interval) when Interval > ?MAX_INTERVAL -> init(StatVal, ?MAX_INTERVAL).
%% @doc Get Info of the keepalive.
-spec info(keepalive()) -> emqx_types:infos().
info(#keepalive{
interval = Interval,
statval = StatVal
}) ->
#{
interval => Interval,
statval => StatVal
}.
-spec info(interval | statval, keepalive()) ->
non_neg_integer().
info(interval, #keepalive{interval = Interval}) ->
Interval;
info(statval, #keepalive{statval = StatVal}) ->
StatVal;
info(interval, undefined) ->
0.
%% @doc Check keepalive.
-spec check(non_neg_integer(), keepalive()) ->
{ok, keepalive()} | {error, timeout}.
check(Val, #keepalive{statval = Val}) -> {error, timeout};
check(Val, KeepAlive) -> {ok, KeepAlive#keepalive{statval = Val}}.
%% @doc Update keepalive.
%% The statval of the previous keepalive will be used,
%% and normal checks will begin from the next cycle.
-spec update(non_neg_integer(), keepalive() | undefined) -> keepalive() | undefined.
update(Interval, undefined) -> init(0, Interval);
update(Interval, #keepalive{statval = StatVal}) -> init(StatVal, Interval).

View File

@ -77,6 +77,7 @@
validate_heap_size/1, validate_heap_size/1,
user_lookup_fun_tr/2, user_lookup_fun_tr/2,
validate_alarm_actions/1, validate_alarm_actions/1,
validate_keepalive_multiplier/1,
non_empty_string/1, non_empty_string/1,
validations/0, validations/0,
naive_env_interpolation/1 naive_env_interpolation/1
@ -109,7 +110,8 @@
servers_validator/2, servers_validator/2,
servers_sc/2, servers_sc/2,
convert_servers/1, convert_servers/1,
convert_servers/2 convert_servers/2,
mqtt_converter/2
]). ]).
%% tombstone types %% tombstone types
@ -150,6 +152,8 @@
-define(BIT(Bits), (1 bsl (Bits))). -define(BIT(Bits), (1 bsl (Bits))).
-define(MAX_UINT(Bits), (?BIT(Bits) - 1)). -define(MAX_UINT(Bits), (?BIT(Bits) - 1)).
-define(DEFAULT_MULTIPLIER, 1.5).
-define(DEFAULT_BACKOFF, 0.75).
namespace() -> broker. namespace() -> broker.
@ -172,6 +176,7 @@ roots(high) ->
ref("mqtt"), ref("mqtt"),
#{ #{
desc => ?DESC(mqtt), desc => ?DESC(mqtt),
converter => fun ?MODULE:mqtt_converter/2,
importance => ?IMPORTANCE_MEDIUM importance => ?IMPORTANCE_MEDIUM
} }
)}, )},
@ -522,8 +527,19 @@ fields("mqtt") ->
sc( sc(
number(), number(),
#{ #{
default => 0.75, default => ?DEFAULT_BACKOFF,
desc => ?DESC(mqtt_keepalive_backoff) %% Must add required => false, zone schema has no default.
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"keepalive_multiplier",
sc(
number(),
#{
default => ?DEFAULT_MULTIPLIER,
validator => fun ?MODULE:validate_keepalive_multiplier/1,
desc => ?DESC(mqtt_keepalive_multiplier)
} }
)}, )},
{"max_subscriptions", {"max_subscriptions",
@ -2744,6 +2760,13 @@ validate_heap_size(Siz) when is_integer(Siz) ->
validate_heap_size(_SizStr) -> validate_heap_size(_SizStr) ->
{error, invalid_heap_size}. {error, invalid_heap_size}.
validate_keepalive_multiplier(Multiplier) when
is_number(Multiplier) andalso Multiplier >= 1.0 andalso Multiplier =< 65535.0
->
ok;
validate_keepalive_multiplier(_Multiplier) ->
{error, #{reason => keepalive_multiplier_out_of_range, min => 1, max => 65535}}.
validate_alarm_actions(Actions) -> validate_alarm_actions(Actions) ->
UnSupported = lists:filter( UnSupported = lists:filter(
fun(Action) -> Action =/= log andalso Action =/= publish end, Actions fun(Action) -> Action =/= log andalso Action =/= publish end, Actions
@ -3385,3 +3408,20 @@ ensure_default_listener(Map, ListenerType) ->
cert_file(_File, client) -> undefined; cert_file(_File, client) -> undefined;
cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])). cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
mqtt_converter(#{<<"keepalive_multiplier">> := Multi} = Mqtt, _Opts) ->
case round(Multi * 100) =:= round(?DEFAULT_MULTIPLIER * 100) of
false ->
%% Multiplier is provided, and it's not default value
Mqtt;
true ->
%% Multiplier is default value, fallback to use Backoff value
%% Backoff default value was half of Multiplier default value
%% so there is no need to compare Backoff with its default.
Backoff = maps:get(<<"keepalive_backoff">>, Mqtt, ?DEFAULT_BACKOFF),
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2}
end;
mqtt_converter(#{<<"keepalive_backoff">> := Backoff} = Mqtt, _Opts) ->
Mqtt#{<<"keepalive_multiplier">> => Backoff * 2};
mqtt_converter(Mqtt, _Opts) ->
Mqtt.

View File

@ -27,20 +27,14 @@ t_check(_) ->
Keepalive = emqx_keepalive:init(60), Keepalive = emqx_keepalive:init(60),
?assertEqual(60, emqx_keepalive:info(interval, Keepalive)), ?assertEqual(60, emqx_keepalive:info(interval, Keepalive)),
?assertEqual(0, emqx_keepalive:info(statval, Keepalive)), ?assertEqual(0, emqx_keepalive:info(statval, Keepalive)),
?assertEqual(0, emqx_keepalive:info(repeat, Keepalive)),
Info = emqx_keepalive:info(Keepalive), Info = emqx_keepalive:info(Keepalive),
?assertEqual( ?assertEqual(
#{ #{
interval => 60, interval => 60,
statval => 0, statval => 0
repeat => 0
}, },
Info Info
), ),
{ok, Keepalive1} = emqx_keepalive:check(1, Keepalive), {ok, Keepalive1} = emqx_keepalive:check(1, Keepalive),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)), ?assertEqual(1, emqx_keepalive:info(statval, Keepalive1)),
?assertEqual(0, emqx_keepalive:info(repeat, Keepalive1)), ?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive1)).
{ok, Keepalive2} = emqx_keepalive:check(1, Keepalive1),
?assertEqual(1, emqx_keepalive:info(statval, Keepalive2)),
?assertEqual(1, emqx_keepalive:info(repeat, Keepalive2)),
?assertEqual({error, timeout}, emqx_keepalive:check(1, Keepalive2)).

View File

@ -655,6 +655,43 @@ password_converter_test() ->
?assertThrow("must_quote", emqx_schema:password_converter(foobar, #{})), ?assertThrow("must_quote", emqx_schema:password_converter(foobar, #{})),
ok. ok.
-define(MQTT(B, M), #{<<"keepalive_backoff">> => B, <<"keepalive_multiplier">> => M}).
keepalive_convert_test() ->
?assertEqual(undefined, emqx_schema:mqtt_converter(undefined, #{})),
DefaultBackoff = 0.75,
DefaultMultiplier = 1.5,
Default = ?MQTT(DefaultBackoff, DefaultMultiplier),
?assertEqual(Default, emqx_schema:mqtt_converter(Default, #{})),
?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})),
?assertEqual(
?MQTT(DefaultBackoff, 3), emqx_schema:mqtt_converter(?MQTT(DefaultBackoff, 3), #{})
),
?assertEqual(?MQTT(1, 2), emqx_schema:mqtt_converter(?MQTT(1, DefaultMultiplier), #{})),
?assertEqual(?MQTT(1.5, 3), emqx_schema:mqtt_converter(?MQTT(1.5, 3), #{})),
?assertEqual(#{}, emqx_schema:mqtt_converter(#{}, #{})),
?assertEqual(
#{<<"keepalive_backoff">> => 1.5, <<"keepalive_multiplier">> => 3.0},
emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => 1.5}, #{})
),
?assertEqual(
#{<<"keepalive_multiplier">> => 5.0},
emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => 5.0}, #{})
),
?assertEqual(
#{
<<"keepalive_backoff">> => DefaultBackoff,
<<"keepalive_multiplier">> => DefaultMultiplier
},
emqx_schema:mqtt_converter(#{<<"keepalive_backoff">> => DefaultBackoff}, #{})
),
?assertEqual(
#{<<"keepalive_multiplier">> => DefaultMultiplier},
emqx_schema:mqtt_converter(#{<<"keepalive_multiplier">> => DefaultMultiplier}, #{})
),
ok.
url_type_test_() -> url_type_test_() ->
[ [
?_assertEqual( ?_assertEqual(

View File

@ -546,7 +546,8 @@ fields(authz_cache) ->
]; ];
fields(keepalive) -> fields(keepalive) ->
[ [
{interval, hoconsc:mk(integer(), #{desc => <<"Keepalive time, with the unit of second">>})} {interval,
hoconsc:mk(range(0, 65535), #{desc => <<"Keepalive time, with the unit of second">>})}
]; ];
fields(subscribe) -> fields(subscribe) ->
[ [

View File

@ -244,13 +244,31 @@ t_keepalive(_Config) ->
Body = #{interval => 11}, Body = #{interval => 11},
{error, {"HTTP/1.1", 404, "Not Found"}} = {error, {"HTTP/1.1", 404, "Not Found"}} =
emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
{ok, C1} = emqtt:start_link(#{username => Username, clientid => ClientId}), %% 65535 is the max value of keepalive
MaxKeepalive = 65535,
InitKeepalive = round(MaxKeepalive / 1.5 + 1),
{ok, C1} = emqtt:start_link(#{
username => Username, clientid => ClientId, keepalive => InitKeepalive
}),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
[Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
%% will reset to max keepalive if keepalive > max keepalive
#{conninfo := #{keepalive := InitKeepalive}} = emqx_connection:info(Pid),
?assertMatch({keepalive, 65535000, _}, element(5, element(9, sys:get_state(Pid)))),
{ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body), {ok, NewClient} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body),
#{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]), #{<<"keepalive">> := 11} = emqx_utils_json:decode(NewClient, [return_maps]),
[Pid] = emqx_cm:lookup_channels(list_to_binary(ClientId)),
#{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid), #{conninfo := #{keepalive := Keepalive}} = emqx_connection:info(Pid),
?assertEqual(11, Keepalive), ?assertEqual(11, Keepalive),
%% Disable keepalive
Body1 = #{interval => 0},
{ok, NewClient1} = emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body1),
#{<<"keepalive">> := 0} = emqx_utils_json:decode(NewClient1, [return_maps]),
?assertMatch(#{conninfo := #{keepalive := 0}}, emqx_connection:info(Pid)),
%% Maximal keepalive
Body2 = #{interval => 65536},
{error, {"HTTP/1.1", 400, _}} =
emqx_mgmt_api_test_util:request_api(put, Path, <<"">>, AuthHeader, Body2),
emqtt:disconnect(C1), emqtt:disconnect(C1),
ok. ok.

View File

@ -0,0 +1,4 @@
Introduce a more straightforward configuration option `keepalive_multiplier` and
deprecate the old `keepalive_backoff` configuration.
After this enhancement, EMQX checks the client's keepalive timeout status
period by multiplying the "Client Requested Keepalive Interval" with `keepalive_multiplier`.

View File

@ -799,7 +799,7 @@ fields_tcp_opts_high_watermark.desc:
by the VM socket implementation reaches this limit.""" by the VM socket implementation reaches this limit."""
fields_tcp_opts_high_watermark.label: fields_tcp_opts_high_watermark.label:
"""TCP 高水位线""" """TCP high watermark"""
fields_mqtt_quic_listener_stateless_operation_expiration_ms.desc: fields_mqtt_quic_listener_stateless_operation_expiration_ms.desc:
"""The time limit between operations for the same endpoint, in milliseconds. Default: 100""" """The time limit between operations for the same endpoint, in milliseconds. Default: 100"""
@ -885,11 +885,12 @@ and an MQTT message is published to the system topic <code>$SYS/sysmon/long_sche
sysmon_vm_long_schedule.label: sysmon_vm_long_schedule.label:
"""Enable Long Schedule monitoring.""" """Enable Long Schedule monitoring."""
mqtt_keepalive_backoff.desc: mqtt_keepalive_multiplier.desc:
"""The coefficient EMQX uses to confirm whether the keep alive duration of the client expires. Formula: Keep Alive * Backoff * 2""" """Keep-Alive Timeout = Keep-Alive interval × Keep-Alive Multiplier.
The default value 1.5 is following the MQTT 5.0 specification. This multiplier is adjustable, providing system administrators flexibility for tailoring to their specific needs. For instance, if a client's 10-second Keep-Alive interval PINGREQ gets delayed by an extra 10 seconds, changing the multiplier to 2 lets EMQX tolerate this delay."""
mqtt_keepalive_backoff.label: mqtt_keepalive_multiplier.label:
"""Keep Alive Backoff""" """Keep Alive Multiplier"""
force_gc_bytes.desc: force_gc_bytes.desc:
"""GC the process after specified number of bytes have passed through.""" """GC the process after specified number of bytes have passed through."""