diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index fedf583e2..6982b3dea 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -383,17 +383,18 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)}, {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)}, {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)}, - {server_resumption_level, 2}, + {server_resumption_level, maps:get(server_resumption_level, Opts, 2)}, {verify, maps:get(verify, SSLOpts, verify_none)} ] ++ case maps:get(cacertfile, SSLOpts, undefined) of undefined -> []; CaCertFile -> [{cacertfile, binary_to_list(CaCertFile)}] - end, + end ++ + optional_quic_listener_opts(Opts), ConnectionOpts = #{ conn_callback => emqx_quic_connection, - peer_unidi_stream_count => 1, - peer_bidi_stream_count => 10, + peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), + peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), zone => zone(Opts), listener => {quic, ListenerName}, limiter => limiter(Opts) @@ -726,3 +727,61 @@ get_ssl_options(Conf) -> error -> maps:get(<<"ssl_options">>, Conf, undefined) end. + +%% @doc Get QUIC optional settings for low level tunings. +%% @see quicer:quic_settings() +-spec optional_quic_listener_opts(map()) -> proplists:proplist(). +optional_quic_listener_opts(Conf) when is_map(Conf) -> + maps:to_list( + maps:filter( + fun(Name, _V) -> + lists:member( + Name, + quic_listener_optional_settings() + ) + end, + Conf + ) + ). + +-spec quic_listener_optional_settings() -> [atom()]. +quic_listener_optional_settings() -> + [ + max_bytes_per_key, + %% In conf schema we use handshake_idle_timeout + handshake_idle_timeout_ms, + %% In conf schema we use idle_timeout + idle_timeout_ms, + %% not use since we are server + %% tls_client_max_send_buffer, + tls_server_max_send_buffer, + stream_recv_window_default, + stream_recv_buffer_default, + conn_flow_control_window, + max_stateless_operations, + initial_window_packets, + send_idle_timeout_ms, + initial_rtt_ms, + max_ack_delay_ms, + disconnect_timeout_ms, + %% In conf schema, we use keep_alive_interval + keep_alive_interval_ms, + %% over written by conn opts + peer_bidi_stream_count, + %% over written by conn opts + peer_unidi_stream_count, + retry_memory_limit, + load_balancing_mode, + max_operations_per_drain, + send_buffering_enabled, + pacing_enabled, + migration_enabled, + datagram_receive_enabled, + server_resumption_level, + minimum_mtu, + maximum_mtu, + mtu_discovery_search_complete_timeout_us, + mtu_discovery_missing_probe_count, + max_binding_stateless_operations, + stateless_operation_expiration_ms + ]. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8d24e6937..cae187686 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -120,6 +120,9 @@ -elvis([{elvis_style, god_modules, disable}]). +-define(BIT(Bits), (1 bsl (Bits))). +-define(MAX_UINT(Bits), (?BIT(Bits) - 1)). + namespace() -> broker. tags() -> @@ -862,6 +865,80 @@ fields("mqtt_quic_listener") -> } )}, {"ciphers", ciphers_schema(quic)}, + + {"max_bytes_per_key", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(64), + "Maximum number of bytes to encrypt with a single 1-RTT encryption key" + "before initiating key update. Default: 274877906944" + )}, + {"handshake_idle_timeout_ms", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(64), + "How long a handshake can idle before it is discarded. Default: 10 000" + )}, + {"tls_server_max_send_buffer", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "How much Server TLS data to buffer. Default: 8192" + )}, + {"stream_recv_window_default", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "Initial stream receive window size. Default: 32678" + )}, + {"stream_recv_buffer_default", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "Stream initial buffer size. Default: 4096" + )}, + {"conn_flow_control_window", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "Connection-wide flow control window. Default: 16777216" + )}, + {"max_stateless_operations", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "The maximum number of stateless operations that may be queued on a worker at any one time. Default: 16" + )}, + {"initial_window_packets", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "The size (in packets) of the initial congestion window for a connection. Default: 10" + )}, + {"send_idle_timeout_ms", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "Reset congestion control after being idle for amount of time. Default: 1000" + )}, + {"initial_rtt_ms", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "Initial RTT estimate. Default: 333" + )}, + {"max_ack_delay_ms", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "How long to wait after receiving data before sending an ACK. Default: 25" + )}, + {"disconnect_timeout_ms", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(32), + "How long to wait for an ACK before declaring a path dead and disconnecting. Default: 16000" + )}, {"idle_timeout", sc( duration_ms(), @@ -870,6 +947,12 @@ fields("mqtt_quic_listener") -> desc => ?DESC(fields_mqtt_quic_listener_idle_timeout) } )}, + {"idle_timeout_ms", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(64), + "How long a connection can go idle before it is gracefully shut down. 0 to disable timeout" + )}, {"handshake_idle_timeout", sc( duration_ms(), @@ -878,6 +961,12 @@ fields("mqtt_quic_listener") -> desc => ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout) } )}, + {"handshake_idle_timeout_ms", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(64), + "How long a handshake can idle before it is discarded" + )}, {"keep_alive_interval", sc( duration_ms(), @@ -886,6 +975,105 @@ fields("mqtt_quic_listener") -> desc => ?DESC(fields_mqtt_quic_listener_keep_alive_interval) } )}, + {"keep_alive_interval_ms", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(32), + "How often to send PING frames to keep a connection alive. Default: 0 (Disabled)" + )}, + {"peer_bidi_stream_count", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(16), + "Number of bidirectional streams to allow the peer to open." + )}, + {"peer_unidi_stream_count", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(16), + "Number of unidirectional streams to allow the peer to open." + )}, + {"retry_memory_limit", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(16), + "The percentage of available memory usable for handshake connections before" + "stateless retry is used. Calculated as `N/65535`. Default: 65" + )}, + {"load_balancing_mode", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(16), + "0: Disabled, 1: SERVER_ID_IP, 2: SERVER_ID_FIXED, default: 0" + )}, + {"max_operations_per_drain", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(8), + "The maximum number of operations to drain per connection quantum. Default: 16" + )}, + {"send_buffering_enabled", + quic_feature_toggle( + "Buffer send data instead of holding application buffers until" + "sent data is acknowledged. Default: 1 (Enabled)" + )}, + {"pacing_enabled", + quic_feature_toggle( + "Pace sending to avoid overfilling buffers on the path. Default: 1 (Enabled)" + )}, + {"migration_enabled", + quic_feature_toggle( + "Enable clients to migrate IP addresses and tuples. " + "Requires a cooperative load-balancer, or no load-balancer. Default: 1 (Enabled)" + )}, + {"datagram_receive_enabled", + quic_feature_toggle( + "Advertise support for QUIC datagram extension. Reserve for the future. Default 0 (FALSE)" + )}, + {"server_resumption_level", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(8), + "Controls resumption tickets and/or 0-RTT server support. Default: 0 (No resumption)" + )}, + {"minimum_mtu", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(16), + "The minimum MTU supported by a connection. This will be used as the starting MTU. Default: 1248" + )}, + {"maximum_mtu", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(16), + "The maximum MTU supported by a connection. This will be the maximum probed value. Default: 1500" + )}, + {"mtu_discovery_search_complete_timeout_us", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(64), + "The time in microseconds to wait before reattempting MTU probing if" + "max was not reached. Default: 600000000" + )}, + {"mtu_discovery_missing_probe_count", + quic_lowlevel_settings_uint( + 1, + ?MAX_UINT(8), + "The maximum number of stateless operations that may be queued on a binding at any one time. Default: 3" + )}, + {"max_binding_stateless_operations", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(16), + "The maximum number of stateless operations that may be queued on" + "a binding at any one time. Default: 100" + )}, + {"stateless_operation_expiration_ms", + quic_lowlevel_settings_uint( + 0, + ?MAX_UINT(16), + "The time limit between operations for the same endpoint, in milliseconds. Default: 100" + )}, {"ssl_options", sc( ref("listener_quic_ssl_opts"), @@ -2638,3 +2826,30 @@ parse_port(Port) -> _:_ -> throw("bad_port_number") end. + +quic_feature_toggle(Desc) -> + sc( + %% true, false are for user facing + %% 0, 1 are for internal represtation + typerefl:alias("boolean", typerefl:union([true, false, 0, 1])), + #{ + desc => Desc, + hidden => true, + required => false, + converter => fun + (true) -> 1; + (false) -> 0; + (Other) -> Other + end + } + ). + +quic_lowlevel_settings_uint(Low, High, Desc) -> + sc( + range(Low, High), + #{ + required => false, + hidden => true, + desc => Desc + } + ). diff --git a/changes/ce/feat-10019.en.md b/changes/ce/feat-10019.en.md new file mode 100644 index 000000000..b6cc0381c --- /dev/null +++ b/changes/ce/feat-10019.en.md @@ -0,0 +1 @@ +Add low level tuning settings for QUIC listeners. diff --git a/changes/ce/feat-10019.zh.md b/changes/ce/feat-10019.zh.md new file mode 100644 index 000000000..9ef671b3d --- /dev/null +++ b/changes/ce/feat-10019.zh.md @@ -0,0 +1 @@ +为 QUIC 侦听器添加更多底层调优选项。