From a4407764f3e896e2c14890854fd3d146ae67ef1c Mon Sep 17 00:00:00 2001 From: Paulo Zulato Date: Fri, 2 Jun 2023 16:47:47 -0300 Subject: [PATCH] feat(connection): configurable TCP keepalive Fixes https://emqx.atlassian.net/browse/EMQX-9852 --- apps/emqx/src/emqx_connection.erl | 49 +++++++++++++++--- apps/emqx/src/emqx_listeners.erl | 2 +- apps/emqx/src/emqx_schema.erl | 51 ++++++++++++++++++- apps/emqx/test/emqx_mqtt_SUITE.erl | 26 +++++----- .../src/emqx_gateway_api_listeners.erl | 3 +- changes/ce/feat-10933.en.md | 1 + rel/i18n/emqx_schema.hocon | 14 +++++ 7 files changed, 125 insertions(+), 21 deletions(-) create mode 100644 changes/ce/feat-10933.en.md diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 79654e510..385c20393 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -49,7 +49,7 @@ -export([ async_set_keepalive/3, - async_set_keepalive/4, + async_set_keepalive/5, async_set_socket_options/2 ]). @@ -273,16 +273,30 @@ stats(#state{ %% NOTE: This API sets TCP socket options, which has nothing to do with %% the MQTT layer's keepalive (PINGREQ and PINGRESP). async_set_keepalive(Idle, Interval, Probes) -> - async_set_keepalive(self(), Idle, Interval, Probes). + async_set_keepalive(os:type(), self(), Idle, Interval, Probes). -async_set_keepalive(Pid, Idle, Interval, Probes) -> +async_set_keepalive({unix, linux}, Pid, Idle, Interval, Probes) -> Options = [ {keepalive, true}, {raw, 6, 4, <>}, {raw, 6, 5, <>}, {raw, 6, 6, <>} ], - async_set_socket_options(Pid, Options). + async_set_socket_options(Pid, Options); +async_set_keepalive({unix, darwin}, Pid, Idle, Interval, Probes) -> + Options = [ + {keepalive, true}, + {raw, 6, 16#10, <>}, + {raw, 6, 16#101, <>}, + {raw, 6, 16#102, <>} + ], + async_set_socket_options(Pid, Options); +async_set_keepalive(OS, _Pid, _Idle, _Interval, _Probes) -> + ?SLOG(warning, #{ + msg => "Unsupported operation: set TCP keepalive", + os => OS + }), + ok. 
%% @doc Set custom socket options. %% This API is made async because the call might be originated from @@ -353,6 +367,9 @@ init_state( false -> disabled end, IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout), + + set_tcp_keepalive(Listener), + IdleTimer = start_timer(IdleTimeout, idle_timeout), #state{ transport = Transport, @@ -948,8 +965,15 @@ handle_cast( } ) -> case Transport:setopts(Socket, Opts) of - ok -> ?tp(info, "custom_socket_options_successfully", #{opts => Opts}); - Err -> ?tp(error, "failed_to_set_custom_socket_optionn", #{reason => Err}) + ok -> + ?tp(debug, "custom_socket_options_successfully", #{opts => Opts}); + {error, einval} -> + %% socket is already closed, ignore this error + ?tp(debug, "socket already closed", #{reason => socket_already_closed}), + ok; + Err -> + %% other errors + ?tp(error, "failed_to_set_custom_socket_option", #{reason => Err}) end, State; handle_cast(Req, State) -> @@ -1199,6 +1223,19 @@ inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. +set_tcp_keepalive({quic, _Listener}) -> + ok; +set_tcp_keepalive({Type, Id}) -> + Conf = emqx_config:get_listener_conf(Type, Id, [tcp_options, keepalive], <<"none">>), + case iolist_to_binary(Conf) of + <<"none">> -> + ok; + Value -> + %% the value is already validated by schema, so we do not validate it again. + {Idle, Interval, Probes} = emqx_schema:parse_tcp_keepalive(Value), + async_set_keepalive(Idle, Interval, Probes) + end. + %%-------------------------------------------------------------------- %% For CT tests %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 9a59db6e1..8c7f0ec17 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -786,7 +786,7 @@ ssl_opts(Opts) -> tcp_opts(Opts) -> maps:to_list( maps:without( - [active_n], + [active_n, keepalive], maps:get(tcp_options, Opts, #{}) ) ). 
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 521293f7a..d65da8316 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -95,7 +95,9 @@ non_empty_string/1, validations/0, naive_env_interpolation/1, - validate_server_ssl_opts/1 + validate_server_ssl_opts/1, + validate_tcp_keepalive/1, + parse_tcp_keepalive/1 ]). -export([qos/0]). @@ -1389,6 +1391,15 @@ fields("tcp_opts") -> default => true, desc => ?DESC(fields_tcp_opts_reuseaddr) } + )}, + {"keepalive", + sc( + string(), + #{ + default => <<"none">>, + desc => ?DESC(fields_tcp_opts_keepalive), + validator => fun validate_tcp_keepalive/1 + } )} ]; fields("listener_ssl_opts") -> @@ -2842,6 +2853,44 @@ validate_alarm_actions(Actions) -> Error -> {error, Error} end. +validate_tcp_keepalive(Value) -> + case iolist_to_binary(Value) of + <<"none">> -> + ok; + _ -> + _ = parse_tcp_keepalive(Value), + ok + end. + +%% @doc This function is used as value validator and also run-time parser. +parse_tcp_keepalive(Str) -> + try + [Idle, Interval, Probes] = binary:split(iolist_to_binary(Str), <<",">>, [global]), + %% use 10 times the Linux defaults as range limit + IdleInt = parse_ka_int(Idle, "Idle", 1, 7200_0), + IntervalInt = parse_ka_int(Interval, "Interval", 1, 75_0), + ProbesInt = parse_ka_int(Probes, "Probes", 1, 9_0), + {IdleInt, IntervalInt, ProbesInt} + catch + error:_ -> + throw(#{ + reason => "Not comma separated positive integers of 'Idle,Interval,Probes' format", + value => Str + }) + end. + +parse_ka_int(Bin, Name, Min, Max) -> + I = binary_to_integer(string:trim(Bin)), + case I >= Min andalso I =< Max of + true -> + I; + false -> + Msg = io_lib:format("TCP-Keepalive '~s' value must be in the range of [~p, ~p].", [ + Name, Min, Max + ]), + throw(#{reason => lists:flatten(Msg), value => I}) + end. 
+ user_lookup_fun_tr(Lookup, #{make_serializable := true}) -> fmt_user_lookup_fun(Lookup); user_lookup_fun_tr(Lookup, _) -> diff --git a/apps/emqx/test/emqx_mqtt_SUITE.erl b/apps/emqx/test/emqx_mqtt_SUITE.erl index d0162b34b..f03c7af83 100644 --- a/apps/emqx/test/emqx_mqtt_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_SUITE.erl @@ -219,13 +219,15 @@ t_async_set_keepalive('end', _Config) -> t_async_set_keepalive(_) -> case os:type() of {unix, darwin} -> - %% Mac OSX don't support the feature - ok; + do_async_set_keepalive(16#10, 16#101, 16#102); + {unix, linux} -> + do_async_set_keepalive(4, 5, 6); _ -> - do_async_set_keepalive() + %% don't support the feature on other OS + ok end. -do_async_set_keepalive() -> +do_async_set_keepalive(OptKeepIdle, OptKeepInterval, OptKeepCount) -> ClientID = <<"client-tcp-keepalive">>, {ok, Client} = emqtt:start_link([ {host, "localhost"}, @@ -247,19 +249,19 @@ do_async_set_keepalive() -> Transport = maps:get(transport, State), Socket = maps:get(socket, State), ?assert(is_port(Socket)), - Opts = [{raw, 6, 4, 4}, {raw, 6, 5, 4}, {raw, 6, 6, 4}], + Opts = [{raw, 6, OptKeepIdle, 4}, {raw, 6, OptKeepInterval, 4}, {raw, 6, OptKeepCount, 4}], {ok, [ - {raw, 6, 4, <>}, - {raw, 6, 5, <>}, - {raw, 6, 6, <>} + {raw, 6, OptKeepIdle, <>}, + {raw, 6, OptKeepInterval, <>}, + {raw, 6, OptKeepCount, <>} ]} = Transport:getopts(Socket, Opts), ct:pal("Idle=~p, Interval=~p, Probes=~p", [Idle, Interval, Probes]), - emqx_connection:async_set_keepalive(Pid, Idle + 1, Interval + 1, Probes + 1), + emqx_connection:async_set_keepalive(os:type(), Pid, Idle + 1, Interval + 1, Probes + 1), {ok, _} = ?block_until(#{?snk_kind := "custom_socket_options_successfully"}, 1000), {ok, [ - {raw, 6, 4, <>}, - {raw, 6, 5, <>}, - {raw, 6, 6, <>} + {raw, 6, OptKeepIdle, <>}, + {raw, 6, OptKeepInterval, <>}, + {raw, 6, OptKeepCount, <>} ]} = Transport:getopts(Socket, Opts), ?assertEqual(NewIdle, Idle + 1), ?assertEqual(NewInterval, Interval + 1), diff --git 
a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 8eea3c522..7c0b8c4bf 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -723,7 +723,8 @@ examples_listener() -> buffer => <<"10KB">>, high_watermark => <<"1MB">>, nodelay => false, - reuseaddr => true + reuseaddr => true, + keepalive => "none" } } }, diff --git a/changes/ce/feat-10933.en.md b/changes/ce/feat-10933.en.md new file mode 100644 index 000000000..dcef6029b --- /dev/null +++ b/changes/ce/feat-10933.en.md @@ -0,0 +1 @@ +Add support for configuring TCP keep-alive in MQTT/TCP and MQTT/SSL listeners. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 54d866014..c6901f937 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -975,6 +975,20 @@ fields_tcp_opts_nodelay.desc: fields_tcp_opts_nodelay.label: """TCP_NODELAY""" +fields_tcp_opts_keepalive.desc: +""" +Enable TCP keepalive for MQTT connections over TCP or SSL. +The value is three comma-separated numbers in the format of 'Idle,Interval,Probes' + - Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200). + - Interval: The number of seconds between TCP keep-alive probes (Linux default 75). + - Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9). +For example, "240,30,5" means: EMQX should start sending TCP keepalive probes after the connection has been idle for 240 seconds, and the probes are then sent every 30 seconds until a response is received from the MQTT client. If it misses 5 consecutive responses, EMQX should close the connection. 
+Default: 'none' +""" + +fields_tcp_opts_keepalive.label: +"""TCP keepalive options""" + sysmon_top_db_username.desc: """Username of the PostgreSQL database"""