Merge pull request #10933 from paulozulato/feat-configurable-tcp-keepalive

feat(connection): configurable TCP keepalive
This commit is contained in:
Paulo Zulato 2023-06-07 11:30:43 -03:00 committed by GitHub
commit 63d42091e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 125 additions and 21 deletions

View File

@ -49,7 +49,7 @@
-export([
async_set_keepalive/3,
async_set_keepalive/4,
async_set_keepalive/5,
async_set_socket_options/2
]).
@ -273,16 +273,30 @@ stats(#state{
%% NOTE: This API sets TCP socket options, which has nothing to do with
%% the MQTT layer's keepalive (PINGREQ and PINGRESP).
async_set_keepalive(Idle, Interval, Probes) ->
async_set_keepalive(self(), Idle, Interval, Probes).
async_set_keepalive(os:type(), self(), Idle, Interval, Probes).
async_set_keepalive(Pid, Idle, Interval, Probes) ->
async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) ->
Options = [
{keepalive, true},
{raw, 6, 4, <<Idle:32/native>>},
{raw, 6, 5, <<Interval:32/native>>},
{raw, 6, 6, <<Probes:32/native>>}
],
async_set_socket_options(Pid, Options).
async_set_socket_options(Pid, Options);
async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) ->
Options = [
{keepalive, true},
{raw, 6, 16#10, <<Idle:32/native>>},
{raw, 6, 16#101, <<Interval:32/native>>},
{raw, 6, 16#102, <<Probes:32/native>>}
],
async_set_socket_options(Pid, Options);
async_set_keepalive(OS, _Pid, _Idle, _Interval, _Probes) ->
?SLOG(warning, #{
msg => "Unsupported operation: set TCP keepalive",
os => OS
}),
ok.
%% @doc Set custom socket options.
%% This API is made async because the call might be originated from
@ -353,6 +367,9 @@ init_state(
false -> disabled
end,
IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),
set_tcp_keepalive(Listener),
IdleTimer = start_timer(IdleTimeout, idle_timeout),
#state{
transport = Transport,
@ -948,8 +965,15 @@ handle_cast(
}
) ->
case Transport:setopts(Socket, Opts) of
ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
ok ->
?tp(debug, "custom_socket_options_successfully", #{opts => Opts});
{error, einval} ->
%% socket is already closed, ignore this error
?tp(debug, "socket already closed", #{reason => socket_already_closed}),
ok;
Err ->
%% other errors
?tp(error, "failed_to_set_custom_socket_option", #{reason => Err})
end,
State;
handle_cast(Req, State) ->
@ -1199,6 +1223,19 @@ inc_counter(Key, Inc) ->
_ = emqx_pd:inc_counter(Key, Inc),
ok.
set_tcp_keepalive({quic, _Listener}) ->
ok;
set_tcp_keepalive({Type, Id}) ->
Conf = emqx_config:get_listener_conf(Type, Id, [tcp_options, keepalive], <<"none">>),
case iolist_to_binary(Conf) of
<<"none">> ->
ok;
Value ->
%% the value is already validated by schema, so we do not validate it again.
{Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(Value),
async_set_keepalive(Idle, Interval, Probes)
end.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------

View File

@ -786,7 +786,7 @@ ssl_opts(Opts) ->
tcp_opts(Opts) ->
maps:to_list(
maps:without(
[active_n],
[active_n, keepalive],
maps:get(tcp_options, Opts, #{})
)
).

View File

@ -95,7 +95,9 @@
non_empty_string/1,
validations/0,
naive_env_interpolation/1,
validate_server_ssl_opts/1
validate_server_ssl_opts/1,
validate_tcp_keepalive/1,
parse_tcp_keepalive/1
]).
-export([qos/0]).
@ -1389,6 +1391,15 @@ fields("tcp_opts") ->
default => true,
desc => ?DESC(fields_tcp_opts_reuseaddr)
}
)},
{"keepalive",
sc(
string(),
#{
default => <<"none">>,
desc => ?DESC(fields_tcp_opts_keepalive),
validator => fun validate_tcp_keepalive/1
}
)}
];
fields("listener_ssl_opts") ->
@ -2842,6 +2853,44 @@ validate_alarm_actions(Actions) ->
Error -> {error, Error}
end.
validate_tcp_keepalive(Value) ->
case iolist_to_binary(Value) of
<<"none">> ->
ok;
_ ->
_ = parse_tcp_keepalive(Value),
ok
end.
%% @doc This function is used as value validator and also run-time parser.
parse_tcp_keepalive(Str) ->
try
[Idle, Interval, Probes] = binary:split(iolist_to_binary(Str), <<",">>, [global]),
%% use 10 times the Linux defaults as range limit
IdleInt = parse_ka_int(Idle, "Idle", 1, 7200_0),
IntervalInt = parse_ka_int(Interval, "Interval", 1, 75_0),
ProbesInt = parse_ka_int(Probes, "Probes", 1, 9_0),
{IdleInt, IntervalInt, ProbesInt}
catch
error:_ ->
throw(#{
reason => "Not comma separated positive integers of 'Idle,Interval,Probes' format",
value => Str
})
end.
parse_ka_int(Bin, Name, Min, Max) ->
I = binary_to_integer(string:trim(Bin)),
case I >= Min andalso I =< Max of
true ->
I;
false ->
Msg = io_lib:format("TCP-Keepalive '~s' value must be in the rage of [~p, ~p].", [
Name, Min, Max
]),
throw(#{reason => lists:flatten(Msg), value => I})
end.
user_lookup_fun_tr(Lookup, #{make_serializable := true}) ->
fmt_user_lookup_fun(Lookup);
user_lookup_fun_tr(Lookup, _) ->

View File

@ -219,13 +219,15 @@ t_async_set_keepalive('end', _Config) ->
t_async_set_keepalive(_) ->
case os:type() of
{unix, darwin} ->
%% Mac OSX don't support the feature
ok;
do_async_set_keepalive(16#10, 16#101, 16#102);
{unix, linux} ->
do_async_set_keepalive(4, 5, 6);
_ ->
do_async_set_keepalive()
%% don't support the feature on other OS
ok
end.
do_async_set_keepalive() ->
do_async_set_keepalive(OptKeepIdle, OptKeepInterval, OptKeepCount) ->
ClientID = <<"client-tcp-keepalive">>,
{ok, Client} = emqtt:start_link([
{host, "localhost"},
@ -247,19 +249,19 @@ do_async_set_keepalive() ->
Transport = maps:get(transport, State),
Socket = maps:get(socket, State),
?assert(is_port(Socket)),
Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}],
Opts = [{raw, 6, OptKeepIdle, 4}, {raw, 6, OptKeepInterval, 4}, {raw, 6, OptKeepCount, 4}],
{ok, [
{raw, 6, 4, <<Idle:32/native>>},
{raw, 6, 5, <<Interval:32/native>>},
{raw, 6, 6, <<Probes:32/native>>}
{raw, 6, OptKeepIdle, <<Idle:32/native>>},
{raw, 6, OptKeepInterval, <<Interval:32/native>>},
{raw, 6, OptKeepCount, <<Probes:32/native>>}
]} = Transport:getopts(Socket, Opts),
ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]),
emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1),
emqx_connection:async_set_keepalive(os:type(), Pid, Idle + 1, Interval + 1, Probes + 1),
{ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000),
{ok, [
{raw, 6, 4, <<NewIdle:32/native>>},
{raw, 6, 5, <<NewInterval:32/native>>},
{raw, 6, 6, <<NewProbes:32/native>>}
{raw, 6, OptKeepIdle, <<NewIdle:32/native>>},
{raw, 6, OptKeepInterval, <<NewInterval:32/native>>},
{raw, 6, OptKeepCount, <<NewProbes:32/native>>}
]} = Transport:getopts(Socket, Opts),
?assertEqual(NewIdle, Idle + 1),
?assertEqual(NewInterval, Interval + 1),

View File

@ -723,7 +723,8 @@ examples_listener() ->
buffer => <<"10KB">>,
high_watermark => <<"1MB">>,
nodelay => false,
reuseaddr => true
reuseaddr => true,
keepalive => "none"
}
}
},

View File

@ -0,0 +1 @@
Add support for configuring TCP keep-alive in MQTT/TCP and MQTT/SSL listeners

View File

@ -975,6 +975,20 @@ fields_tcp_opts_nodelay.desc:
fields_tcp_opts_nodelay.label:
"""TCP_NODELAY"""
fields_tcp_opts_keepalive.desc:
"""
Enable TCP keepalive for MQTT connections over TCP or SSL.
The value is three comma separated numbers in the format of 'Idle,Interval,Probes'
- Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
- Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
- Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
For example "240,30,5" means: EMQX should start sending TCP keepalive probes after the connection is in idle for 240 seconds, and the probes are sent every 30 seconds until a response is received from the MQTT client, if it misses 5 consecutive responses, EMQX should close the connection.
Default: 'none'
"""
fields_tcp_opts_keepalive.label:
"""TCP keepalive options"""
sysmon_top_db_username.desc:
"""Username of the PostgreSQL database"""