From d6282e9156a8f5a3707edcbac415fb61c900f97c Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 29 May 2023 17:45:50 +0800 Subject: [PATCH] feat: configurable TCP keepalive --- etc/emqx.conf | 46 ++++++++++++++++++++++++++++++++++ priv/emqx.schema | 25 ++++++++++++++---- scripts/macos-sign-binaries.sh | 1 + scripts/relup-base-packages.sh | 13 +++++++--- scripts/split-config.escript | 16 +++++++++--- src/emqx_connection.erl | 33 +++++++++++++++++++----- src/emqx_zone.erl | 11 ++++++++ test/emqx_mqtt_SUITE.erl | 26 ++++++++++--------- test/emqx_ocsp_cache_SUITE.erl | 4 ++- 9 files changed, 144 insertions(+), 31 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index bcd2a67c5..9abb6b414 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1072,6 +1072,29 @@ zone.external.strict_mode = false ## Value: String ## zone.external.response_information = example +## Enable TCP keepalive for MQTT connections over TCP or SSL. +## +## The value is three comma separated numbers in the format of +## 'Idle,Interval,Probes' +## +## Idle: The number of seconds a connection needs to be idle before +## the server begins to send out keep-alive probes (Linux default 7200). +## +## Interval: The number of seconds between TCP keep-alive probes (Linux default 75). +## +## Probes: The maximum number of TCP keep-alive probes to send before +## giving up and killing the connection if no response is +## obtained from the other end (Linux default 9). +## +## For example "240,30,5" means: EMQX should start sending TCP keepalive probes +## after the connection is in idel for 240 seconds, +## and the probes are sent every 30 seconds until a response is received from the MQTT +## client, if it misses 5 consecutive responses, EMQX should close the connection. +## +## Value: string +## Default: none +## zone.external.tcp_keepalive = none + ##-------------------------------------------------------------------- ## Internal Zone @@ -1177,6 +1200,29 @@ zone.internal.strict_mode = false ## Value: true | false zone.internal.bypass_auth_plugins = true +## Enable TCP keepalive for MQTT connections over TCP or SSL. +## +## The value is three comma separated numbers in the format of +## 'Idle,Interval,Probes' +## +## Idle: The number of seconds a connection needs to be idle before +## the server begins to send out keep-alive probes (Linux default 7200). +## +## Interval: The number of seconds between TCP keep-alive probes (Linux default 75). +## +## Probes: The maximum number of TCP keep-alive probes to send before +## giving up and killing the connection if no response is +## obtained from the other end (Linux default 9). +## +## For example "240,30,5" means: EMQX should start sending TCP keepalive probes +## after the connection is in idel for 240 seconds, +## and the probes are sent every 30 seconds until a response is received from the MQTT +## client, if it misses 5 consecutive responses, EMQX should close the connection. +## +## Value: string +## Default: none +## zone.internal.tcp_keepalive = none + ## CONFIG_SECTION_END=zones ==================================================== ## CONFIG_SECTION_BGN=listeners ================================================ diff --git a/priv/emqx.schema b/priv/emqx.schema index c110d0f4b..9e2487757 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -166,11 +166,11 @@ {Ip, Port} end, Options = fun(static) -> - [{seeds, [list_to_atom(S) || S <- string:tokens(cuttlefish:conf_get("cluster.static.seeds", Conf, ""), ",")]}]; + [{seeds, [list_to_atom(S) || S <- string:tokens(cuttlefish:conf_get("cluster.static.seeds", Conf, ""), ", ")]}]; (mcast) -> {ok, Addr} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.addr", Conf)), {ok, Iface} = inet:parse_address(cuttlefish:conf_get("cluster.mcast.iface", Conf)), - Ports = [list_to_integer(S) || S <- string:tokens(cuttlefish:conf_get("cluster.mcast.ports", Conf), ",")], + Ports = [list_to_integer(S) || S <- string:tokens(cuttlefish:conf_get("cluster.mcast.ports", Conf), ", ")], [{addr, Addr}, {ports, Ports}, {iface, Iface}, {ttl, cuttlefish:conf_get("cluster.mcast.ttl", Conf, 1)}, {loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}]; @@ -185,7 +185,7 @@ {list_to_atom(Name), Value} end, Options) end, - [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ",")}, + [{server, string:tokens(cuttlefish:conf_get("cluster.etcd.server", Conf), ", ")}, {prefix, cuttlefish:conf_get("cluster.etcd.prefix", Conf, "emqcl")}, {node_ttl, cuttlefish:conf_get("cluster.etcd.node_ttl", Conf, 60)}, {ssl_options, SslOpts(Conf)}]; @@ -1286,6 +1286,12 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Set TCP-keepalive options. +{mapping, "zone.$name.tcp_keepalive", "emqx.zones", [ + {default, "none"}, + {datatype, string} +]}. + {translation, "emqx.zones", fun(Conf) -> Ratelimit = fun(Val) -> [L, D] = string:tokens(Val, ", "), @@ -1371,6 +1377,15 @@ end}. {quota, {conn_messages_routing, Ratelimit(Val)}}; (["quota", "overall_messages_routing"], Val) -> {quota, {overall_messages_routing, Ratelimit(Val)}}; + (["tcp_keepalive"], Val) -> + V = case Val of + "none" -> + false; + _ -> + [Idle, Interval, Probes] = string:tokens(Val, ", "), + {list_to_integer(Idle), list_to_integer(Interval), list_to_integer(Probes)} + end, + {tcp_keepalive, V}; ([Opt], Val) -> {list_to_atom(Opt), Val} end, @@ -2227,7 +2242,7 @@ end}. end, CheckOrigin = fun(S) -> - Origins = string:tokens(S, ","), + Origins = string:tokens(S, ", "), [ list_to_binary(string:trim(O)) || O <- Origins] end, @@ -2873,7 +2888,7 @@ end}. ]}. {translation, "emqx.alarm", fun(Conf) -> - [{actions, [list_to_atom(Action) || Action <- string:tokens(cuttlefish:conf_get("alarm.actions", Conf), ",")]}, + [{actions, [list_to_atom(Action) || Action <- string:tokens(cuttlefish:conf_get("alarm.actions", Conf), ", ")]}, {size_limit, cuttlefish:conf_get("alarm.size_limit", Conf)}, {validity_period, cuttlefish:conf_get("alarm.validity_period", Conf)}] end}. diff --git a/scripts/macos-sign-binaries.sh b/scripts/macos-sign-binaries.sh index b41c8088d..898e7d6ad 100755 --- a/scripts/macos-sign-binaries.sh +++ b/scripts/macos-sign-binaries.sh @@ -68,6 +68,7 @@ for f in \ sasl_auth.so \ snappyer.so \ odbcserver \ + ezstd_nif.so \ ; do find "${REL_DIR}"/lib/ -name "$f" -exec codesign -s "${APPLE_DEVELOPER_IDENTITY}" -f --verbose=4 --timestamp --options=runtime {} \; done diff --git a/scripts/relup-base-packages.sh b/scripts/relup-base-packages.sh index cb23dcabe..ffbb40ac1 100755 --- a/scripts/relup-base-packages.sh +++ b/scripts/relup-base-packages.sh @@ -64,17 +64,24 @@ mkdir -p _upgrade_base pushd _upgrade_base otp_vsn_for() { - ../scripts/relup-base-vsns.escript otp-vsn-for "${1#[e|v]}" ../data/relup-paths.eterm + case "$PROFILE" in + *-ee-*) + ../scripts/relup-base-vsns.escript otp-vsn-for "${1#[e|v]}" ../data/relup-paths-ee.eterm + ;; + *) + ../scripts/relup-base-vsns.escript otp-vsn-for "${1#[e|v]}" ../data/relup-paths.eterm + ;; + esac } for tag in $(../scripts/relup-base-vsns.sh $EDITION | xargs echo -n); do filename="$PROFILE-${tag#[e|v]}-otp$(otp_vsn_for "$tag")-$SYSTEM-$ARCH.zip" url="https://packages.emqx.io/$DIR/$tag/$filename" - echo "downloading base package from ${url} ..." if [ -f "$filename" ]; then - echo "file $filename already downloaded; skikpped" + echo "file $filename already downloaded; skipped" continue fi + curl -L -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+" echo "downloading base package from ${url} ..." curl -L -o "${filename}" "${url}" if [ "$SYSTEM" != "centos6" ]; then diff --git a/scripts/split-config.escript b/scripts/split-config.escript index 04f94269c..53e332e3a 100755 --- a/scripts/split-config.escript +++ b/scripts/split-config.escript @@ -57,7 +57,15 @@ parse_sections([Line | Lines], Parse, Section, Sections) -> dump_sections([]) -> ok; dump_sections([{Name, Lines0} | Rest]) -> - Filename = filename:join(["etc", iolist_to_binary([Name, ".conf.seg"])]), - Lines = [[L, "\n"] || L <- Lines0], - ok = file:write_file(Filename, Lines), - dump_sections(Rest). + case is_skipped(Name) of + true -> + dump_sections(Rest); + false -> + Filename = filename:join(["etc", iolist_to_binary([Name, ".conf.seg"])]), + Lines = [[L, "\n"] || L <- Lines0], + ok = file:write_file(Filename, Lines), + dump_sections(Rest) + end. + +is_skipped(Name) -> + Name =:= <<"modules">>. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ac9d40ac1..c7258be2d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -40,7 +40,7 @@ ]). -export([ async_set_keepalive/3 - , async_set_keepalive/4 + , async_set_keepalive/5 , async_set_socket_options/2 ]). @@ -226,15 +226,24 @@ stats(#state{transport = Transport, %% NOTE: This API sets TCP socket options, which has nothing to do with %% the MQTT layer's keepalive (PINGREQ and PINGRESP). async_set_keepalive(Idle, Interval, Probes) -> - async_set_keepalive(self(), Idle, Interval, Probes). + async_set_keepalive(os:type(), self(), Idle, Interval, Probes). -async_set_keepalive(Pid, Idle, Interval, Probes) -> +async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) -> Options = [ {keepalive, true} , {raw, 6, 4, <>} , {raw, 6, 5, <>} , {raw, 6, 6, <>} ], - async_set_socket_options(Pid, Options). + async_set_socket_options(Pid, Options); +async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) -> + Options = [ {keepalive, true} + , {raw, 6, 16#10, <>} + , {raw, 6, 16#101, <>} + , {raw, 6, 16#102, <>} + ], + async_set_socket_options(Pid, Options); +async_set_keepalive(_Unsupported, _Pid, _Idle, _Interval, _Probes) -> + ok. %% @doc Set custom socket options. %% This API is made async because the call might be originated from @@ -291,6 +300,12 @@ init_state(Transport, Socket, Options) -> StatsTimer = emqx_zone:stats_timer(Zone), IdleTimeout = emqx_zone:idle_timeout(Zone), IdleTimer = start_timer(IdleTimeout, idle_timeout), + case emqx_zone:tcp_keepalive(Zone) of + false -> + ok; + {Idle, Interval, Probes} -> + ok = async_set_keepalive(Idle, Interval, Probes) + end, #state{transport = Transport, socket = Socket, peername = Peername, @@ -769,8 +784,14 @@ handle_cast({async_set_socket_options, Opts}, socket = Socket }) -> case Transport:setopts(Socket, Opts) of - ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts}); - Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err}) + ok -> + ?tp(debug, "custom_socket_options_successfully", #{opts => Opts}); + {error, einval} -> + %% socket is already closed, ignore this error + ok; + Err -> + %% other errors + ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err}) end, State; handle_cast(Req, State) -> diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index cac8d9301..75d7cb5e3 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -79,6 +79,7 @@ , force_shutdown_policy/1 , response_information/1 , quota_policy/1 + , tcp_keepalive/1 ]). -export([ init_gc_state/1 @@ -223,6 +224,16 @@ response_information(Zone) -> quota_policy(Zone) -> get_env(Zone, quota, []). +-spec tcp_keepalive(zone()) -> false | {integer(), integer(), integer()}. +tcp_keepalive(Zone) -> + case get_env(Zone, tcp_keepalive, false) of + {_, _, _} = V -> + V; + _ -> + %% failed to parse + false + end. + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/test/emqx_mqtt_SUITE.erl b/test/emqx_mqtt_SUITE.erl index b8fe10bc9..207e02345 100644 --- a/test/emqx_mqtt_SUITE.erl +++ b/test/emqx_mqtt_SUITE.erl @@ -183,13 +183,15 @@ t_async_set_keepalive('end', _Config) -> t_async_set_keepalive(_) -> case os:type() of {unix, darwin} -> - %% Mac OSX don't support the feature - ok; + do_async_set_keepalive(16#10, 16#101, 16#102); + {unix, linux} -> + do_async_set_keepalive(4, 5, 6); _ -> - do_async_set_keepalive() + %% don't support the feature on other OS + ok end. -do_async_set_keepalive() -> +do_async_set_keepalive(OptKeepIdle, OptKeepInterval, OptKeepCount) -> ClientID = <<"client-tcp-keepalive">>, {ok, Client} = emqtt:start_link([{host, "localhost"}, {proto_ver,v5}, @@ -203,17 +205,17 @@ do_async_set_keepalive() -> Transport = maps:get(transport, State), Socket = maps:get(socket, State), ?assert(is_port(Socket)), - Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}], - {ok, [ {raw, 6, 4, <>} - , {raw, 6, 5, <>} - , {raw, 6, 6, <>} + Opts = [{raw, 6, OptKeepIdle, 4}, {raw, 6, OptKeepInterval, 4}, {raw, 6, OptKeepCount, 4}], + {ok, [ {raw, 6, OptKeepIdle, <>} + , {raw, 6, OptKeepInterval, <>} + , {raw, 6, OptKeepCount, <>} ]} = Transport:getopts(Socket, Opts), ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]), - emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1), + emqx_connection:async_set_keepalive(os:type(), Pid, Idle + 1, Interval + 1, Probes + 1), {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000), - {ok, [ {raw, 6, 4, <>} - , {raw, 6, 5, <>} - , {raw, 6, 6, <>} + {ok, [ {raw, 6, OptKeepIdle, <>} + , {raw, 6, OptKeepInterval, <>} + , {raw, 6, OptKeepCount, <>} ]} = Transport:getopts(Socket, Opts), ?assertEqual(NewIdle, Idle + 1), ?assertEqual(NewInterval, Interval + 1), diff --git a/test/emqx_ocsp_cache_SUITE.erl b/test/emqx_ocsp_cache_SUITE.erl index 757950c34..ada04e4c6 100644 --- a/test/emqx_ocsp_cache_SUITE.erl +++ b/test/emqx_ocsp_cache_SUITE.erl @@ -65,7 +65,7 @@ end_per_group(_Group, _Config) -> ok. init_per_testcase(t_openssl_client, Config) -> - ct:timetrap(10_000), + ct:timetrap(30_000), OriginalListeners = application:get_env(emqx, listeners), DataDir = ?config(data_dir, Config), IssuerPem = filename:join([DataDir, "ocsp-issuer.pem"]), @@ -99,6 +99,8 @@ init_per_testcase(t_openssl_client, Config) -> OCSPOpts = [ {ocsp_stapling_enabled, true} , {ocsp_responder_url, "http://127.0.0.1:9877"} , {ocsp_issuer_pem, IssuerPem} + , {ocsp_refresh_http_timeout, 15_000} + , {ocsp_refresh_interval, 1_000} ], Opts2 = emqx_misc:merge_opts(Opts1, [ {ocsp_options, OCSPOpts} , {ssl_options, SSLOpts2}]),