feat: configurable TCP keepalive

This commit is contained in:
Shawn 2023-05-29 17:45:50 +08:00
parent 3e7ec9d008
commit d6282e9156
9 changed files with 144 additions and 31 deletions

View File

@ -1072,6 +1072,29 @@ zone.external.strict_mode = false
## Value: String
## zone.external.response_information = example
## Enable TCP keepalive for MQTT connections over TCP or SSL.
##
## The value is three comma separated numbers in the format of
## 'Idle,Interval,Probes'
##
## Idle: The number of seconds a connection needs to be idle before
## the server begins to send out keep-alive probes (Linux default 7200).
##
## Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
##
## Probes: The maximum number of TCP keep-alive probes to send before
## giving up and killing the connection if no response is
## obtained from the other end (Linux default 9).
##
## For example "240,30,5" means: EMQX should start sending TCP keepalive probes
## after the connection is in idel for 240 seconds,
## and the probes are sent every 30 seconds until a response is received from the MQTT
## client, if it misses 5 consecutive responses, EMQX should close the connection.
##
## Value: string
## Default: none
## zone.external.tcp_keepalive = none
##--------------------------------------------------------------------
## Internal Zone
@ -1177,6 +1200,29 @@ zone.internal.strict_mode = false
## Value: true | false
zone.internal.bypass_auth_plugins = true
## Enable TCP keepalive for MQTT connections over TCP or SSL.
##
## The value is three comma separated numbers in the format of
## 'Idle,Interval,Probes'
##
## Idle: The number of seconds a connection needs to be idle before
## the server begins to send out keep-alive probes (Linux default 7200).
##
## Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
##
## Probes: The maximum number of TCP keep-alive probes to send before
## giving up and killing the connection if no response is
## obtained from the other end (Linux default 9).
##
## For example "240,30,5" means: EMQX should start sending TCP keepalive probes
## after the connection is in idel for 240 seconds,
## and the probes are sent every 30 seconds until a response is received from the MQTT
## client, if it misses 5 consecutive responses, EMQX should close the connection.
##
## Value: string
## Default: none
## zone.internal.tcp_keepalive = none
## CONFIG_SECTION_END=zones ====================================================
## CONFIG_SECTION_BGN=listeners ================================================

View File

@ -1286,6 +1286,12 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Set TCP-keepalive options.
{mapping, "zone.$name.tcp_keepalive", "emqx.zones", [
{default, "none"},
{datatype, string}
]}.
{translation, "emqx.zones", fun(Conf) ->
Ratelimit = fun(Val) ->
[L, D] = string:tokens(Val, ", "),
@ -1371,6 +1377,15 @@ end}.
{quota, {conn_messages_routing, Ratelimit(Val)}};
(["quota", "overall_messages_routing"], Val) ->
{quota, {overall_messages_routing, Ratelimit(Val)}};
(["tcp_keepalive"], Val) ->
V = case Val of
"none" ->
false;
_ ->
[Idle, Interval, Probes] = string:tokens(Val, ", "),
{list_to_integer(Idle), list_to_integer(Interval), list_to_integer(Probes)}
end,
{tcp_keepalive, V};
([Opt], Val) ->
{list_to_atom(Opt), Val}
end,

View File

@ -68,6 +68,7 @@ for f in \
sasl_auth.so \
snappyer.so \
odbcserver \
ezstd_nif.so \
; do
find "${REL_DIR}"/lib/ -name "$f" -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \;
done

View File

@ -64,17 +64,24 @@ mkdir -p _upgrade_base
pushd _upgrade_base
otp_vsn_for() {
case "$PROFILE" in
*-ee-*)
../scripts/relup-base-vsns.escript otp-vsn-for "${1#[e|v]}" ../data/relup-paths-ee.eterm
;;
*)
../scripts/relup-base-vsns.escript otp-vsn-for "${1#[e|v]}" ../data/relup-paths.eterm
;;
esac
}
for tag in $(../scripts/relup-base-vsns.sh $EDITION | xargs echo -n); do
filename="$PROFILE-${tag#[e|v]}-otp$(otp_vsn_for "$tag")-$SYSTEM-$ARCH.zip"
url="https://packages.emqx.io/$DIR/$tag/$filename"
echo "downloading base package from ${url} ..."
if [ -f "$filename" ]; then
echo "file $filename already downloaded; skikpped"
echo "file $filename already downloaded; skipped"
continue
fi
curl -L -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+"
echo "downloading base package from ${url} ..."
curl -L -o "${filename}" "${url}"
if [ "$SYSTEM" != "centos6" ]; then

View File

@ -57,7 +57,15 @@ parse_sections([Line | Lines], Parse, Section, Sections) ->
dump_sections([]) -> ok;
dump_sections([{Name, Lines0} | Rest]) ->
case is_skipped(Name) of
true ->
dump_sections(Rest);
false ->
Filename = filename:join(["etc", iolist_to_binary([Name, ".conf.seg"])]),
Lines = [[L, "\n"] || L <- Lines0],
ok = file:write_file(Filename, Lines),
dump_sections(Rest).
dump_sections(Rest)
end.
is_skipped(Name) ->
Name =:= <<"modules">>.

View File

@ -40,7 +40,7 @@
]).
-export([ async_set_keepalive/3
, async_set_keepalive/4
, async_set_keepalive/5
, async_set_socket_options/2
]).
@ -226,15 +226,24 @@ stats(#state{transport = Transport,
%% NOTE: This API sets TCP socket options, which has nothing to do with
%% the MQTT layer's keepalive (PINGREQ and PINGRESP).
async_set_keepalive(Idle, Interval, Probes) ->
async_set_keepalive(self(), Idle, Interval, Probes).
async_set_keepalive(os:type(), self(), Idle, Interval, Probes).
async_set_keepalive(Pid, Idle, Interval, Probes) ->
async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) ->
Options = [ {keepalive, true}
, {raw, 6, 4, <<Idle:32/native>>}
, {raw, 6, 5, <<Interval:32/native>>}
, {raw, 6, 6, <<Probes:32/native>>}
],
async_set_socket_options(Pid, Options).
async_set_socket_options(Pid, Options);
async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) ->
Options = [ {keepalive, true}
, {raw, 6, 16#10, <<Idle:32/native>>}
, {raw, 6, 16#101, <<Interval:32/native>>}
, {raw, 6, 16#102, <<Probes:32/native>>}
],
async_set_socket_options(Pid, Options);
async_set_keepalive(_Unsupported, _Pid, _Idle, _Interval, _Probes) ->
ok.
%% @doc Set custom socket options.
%% This API is made async because the call might be originated from
@ -291,6 +300,12 @@ init_state(Transport, Socket, Options) ->
StatsTimer = emqx_zone:stats_timer(Zone),
IdleTimeout = emqx_zone:idle_timeout(Zone),
IdleTimer = start_timer(IdleTimeout, idle_timeout),
case emqx_zone:tcp_keepalive(Zone) of
false ->
ok;
{Idle, Interval, Probes} ->
ok = async_set_keepalive(Idle, Interval, Probes)
end,
#state{transport = Transport,
socket = Socket,
peername = Peername,
@ -769,8 +784,14 @@ handle_cast({async_set_socket_options, Opts},
socket = Socket
}) ->
case Transport:setopts(Socket, Opts) of
ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts});
Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
ok ->
?tp(debug, "custom_socket_options_successfully", #{opts => Opts});
{error, einval} ->
%% socket is already closed, ignore this error
ok;
Err ->
%% other errors
?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err})
end,
State;
handle_cast(Req, State) ->

View File

@ -79,6 +79,7 @@
, force_shutdown_policy/1
, response_information/1
, quota_policy/1
, tcp_keepalive/1
]).
-export([ init_gc_state/1
@ -223,6 +224,16 @@ response_information(Zone) ->
quota_policy(Zone) ->
get_env(Zone, quota, []).
-spec tcp_keepalive(zone()) -> false | {integer(), integer(), integer()}.
tcp_keepalive(Zone) ->
case get_env(Zone, tcp_keepalive, false) of
{_, _, _} = V ->
V;
_ ->
%% failed to parse
false
end.
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------

View File

@ -183,13 +183,15 @@ t_async_set_keepalive('end', _Config) ->
t_async_set_keepalive(_) ->
case os:type() of
{unix, darwin} ->
%% Mac OSX don't support the feature
ok;
do_async_set_keepalive(16#10, 16#101, 16#102);
{unix, linux} ->
do_async_set_keepalive(4, 5, 6);
_ ->
do_async_set_keepalive()
%% don't support the feature on other OS
ok
end.
do_async_set_keepalive() ->
do_async_set_keepalive(OptKeepIdle, OptKeepInterval, OptKeepCount) ->
ClientID = <<"client-tcp-keepalive">>,
{ok, Client} = emqtt:start_link([{host, "localhost"},
{proto_ver,v5},
@ -203,17 +205,17 @@ do_async_set_keepalive() ->
Transport = maps:get(transport, State),
Socket = maps:get(socket, State),
?assert(is_port(Socket)),
Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}],
{ok, [ {raw, 6, 4, <<Idle:32/native>>}
, {raw, 6, 5, <<Interval:32/native>>}
, {raw, 6, 6, <<Probes:32/native>>}
Opts = [{raw, 6, OptKeepIdle, 4}, {raw, 6, OptKeepInterval, 4}, {raw, 6, OptKeepCount, 4}],
{ok, [ {raw, 6, OptKeepIdle, <<Idle:32/native>>}
, {raw, 6, OptKeepInterval, <<Interval:32/native>>}
, {raw, 6, OptKeepCount, <<Probes:32/native>>}
]} = Transport:getopts(Socket, Opts),
ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]),
emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1),
emqx_connection:async_set_keepalive(os:type(), Pid, Idle + 1, Interval + 1, Probes + 1),
{ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000),
{ok, [ {raw, 6, 4, <<NewIdle:32/native>>}
, {raw, 6, 5, <<NewInterval:32/native>>}
, {raw, 6, 6, <<NewProbes:32/native>>}
{ok, [ {raw, 6, OptKeepIdle, <<NewIdle:32/native>>}
, {raw, 6, OptKeepInterval, <<NewInterval:32/native>>}
, {raw, 6, OptKeepCount, <<NewProbes:32/native>>}
]} = Transport:getopts(Socket, Opts),
?assertEqual(NewIdle, Idle + 1),
?assertEqual(NewInterval, Interval + 1),

View File

@ -65,7 +65,7 @@ end_per_group(_Group, _Config) ->
ok.
init_per_testcase(t_openssl_client, Config) ->
ct:timetrap(10_000),
ct:timetrap(30_000),
OriginalListeners = application:get_env(emqx, listeners),
DataDir = ?config(data_dir, Config),
IssuerPem = filename:join([DataDir, "ocsp-issuer.pem"]),
@ -99,6 +99,8 @@ init_per_testcase(t_openssl_client, Config) ->
OCSPOpts = [ {ocsp_stapling_enabled, true}
, {ocsp_responder_url, "http://127.0.0.1:9877"}
, {ocsp_issuer_pem, IssuerPem}
, {ocsp_refresh_http_timeout, 15_000}
, {ocsp_refresh_interval, 1_000}
],
Opts2 = emqx_misc:merge_opts(Opts1, [ {ocsp_options, OCSPOpts}
, {ssl_options, SSLOpts2}]),