From 2483a4ecff69d4941b3ee1f4a259fc836567abf1 Mon Sep 17 00:00:00 2001 From: William Yang Date: Mon, 8 Jan 2024 13:57:40 +0100 Subject: [PATCH] feat(quic): support listener config reload --- apps/emqx/src/emqx_listeners.erl | 133 +++++++++------- apps/emqx/test/emqx_listeners_SUITE.erl | 197 ++++++++++++++++++++++++ 2 files changed, 277 insertions(+), 53 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index dc1f6d9ad..74417f3b7 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -303,6 +303,8 @@ update_listener(Type, Name, OldConf, NewConf) -> ok -> ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), ok; + {skip, Error} when Type =:= quic -> + {error, {rollbacked, Error}}; {error, _Reason} -> restart_listener(Type, Name, OldConf, NewConf) end. @@ -411,49 +413,10 @@ do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) -> end; %% Start MQTT/QUIC listener do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) -> - ListenOn = - case Bind of - {Addr, Port} when tuple_size(Addr) == 4 -> - %% IPv4 - lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port])); - {Addr, Port} when tuple_size(Addr) == 8 -> - %% IPv6 - lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port])); - Port -> - Port - end, - + ListenOn = quic_listen_on(Bind), case [A || {quicer, _, _} = A <- application:which_applications()] of [_] -> - DefAcceptors = erlang:system_info(schedulers_online) * 8, - SSLOpts = maps:get(ssl_options, Opts, #{}), - ListenOpts = - [ - {certfile, emqx_schema:naive_env_interpolation(maps:get(certfile, SSLOpts))}, - {keyfile, emqx_schema:naive_env_interpolation(maps:get(keyfile, SSLOpts))}, - {alpn, ["mqtt"]}, - {conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])}, - {keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)}, - {idle_timeout_ms, maps:get(idle_timeout, Opts, 0)}, - {handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)}, - {server_resumption_level, maps:get(server_resumption_level, Opts, 2)}, - {verify, maps:get(verify, SSLOpts, verify_none)} - ] ++ - case maps:get(cacertfile, SSLOpts, undefined) of - undefined -> - []; - <<>> -> - []; - "" -> - []; - CaCertFile -> - [{cacertfile, emqx_schema:naive_env_interpolation(CaCertFile)}] - end ++ - case maps:get(password, SSLOpts, undefined) of - undefined -> []; - Password -> [{password, str(Password)}] - end ++ - optional_quic_listener_opts(Opts), + ListenOpts = to_quicer_listener_opts(Opts), Limiter = limiter(Opts), ConnectionOpts = #{ conn_callback => emqx_quic_connection, @@ -470,7 +433,7 @@ do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) -> quicer:spawn_listener( Id, ListenOn, - {maps:from_list(ListenOpts), ConnectionOpts, StreamOpts} + {ListenOpts, ConnectionOpts, StreamOpts} ); [] -> {ok, {skipped, quic_app_missing}} @@ -506,6 +469,31 @@ do_update_listener(Type, Name, OldConf, NewConf) when ok = ranch:set_protocol_options(Id, WsOpts), %% No-op if the listener was not suspended. ranch:resume_listener(Id); +do_update_listener(quic = Type, Name, _OldConf, NewConf) -> + case quicer:listener(listener_id(Type, Name)) of + {ok, ListenerPid} -> + case quicer_listener:reload(ListenerPid, to_quicer_listener_opts(NewConf)) of + ok -> + ok; + {error, _} = Error -> + %% @TODO: prefer: case quicer_listener:reload(ListenerPid, to_quicer_listener_opts(OldConf)) of + case quicer_listener:unlock(ListenerPid, 3000) of + ok -> + ?ELOG("Failed to reload QUIC listener ~p, but Rollback success\n", [ + Error + ]), + {skip, Error}; + RestoreErr -> + ?ELOG( + "Failed to reload QUIC listener ~p, and Rollback failed as well\n", + [Error] + ), + {error, {rollback_fail, RestoreErr}} + end + end; + E -> + E + end; do_update_listener(_Type, _Name, _OldConf, _NewConf) -> {error, not_supported}. @@ -897,18 +885,16 @@ get_ssl_options(_) -> %% @doc Get QUIC optional settings for low level tunings. %% @see quicer:quic_settings() --spec optional_quic_listener_opts(map()) -> proplists:proplist(). +-spec optional_quic_listener_opts(map()) -> map(). optional_quic_listener_opts(Conf) when is_map(Conf) -> - maps:to_list( - maps:filter( - fun(Name, _V) -> - lists:member( - Name, - quic_listener_optional_settings() - ) - end, - Conf - ) + maps:filter( + fun(Name, _V) -> + lists:member( + Name, + quic_listener_optional_settings() + ) + end, + Conf ). -spec quic_listener_optional_settings() -> [atom()]. @@ -991,3 +977,44 @@ default_max_conn() -> ensure_max_conns(<<"infinity">>) -> <<"infinity">>; ensure_max_conns(MaxConn) when is_binary(MaxConn) -> binary_to_integer(MaxConn); ensure_max_conns(MaxConn) -> MaxConn. + +-spec quic_listen_on(X :: any()) -> quicer:listen_on(). +quic_listen_on(Bind) -> + case Bind of + {Addr, Port} when tuple_size(Addr) == 4 -> + %% IPv4 + lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port])); + {Addr, Port} when tuple_size(Addr) == 8 -> + %% IPv6 + lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port])); + Port -> + Port + end. + +-spec to_quicer_listener_opts(map()) -> quicer:listener_opts(). +to_quicer_listener_opts(Opts) -> + DefAcceptors = erlang:system_info(schedulers_online) * 8, + SSLOpts = maps:from_list(ssl_opts(Opts)), + Opts1 = maps:filter( + fun + (cacertfile, undefined) -> fasle; + (password, undefined) -> fasle; + (_, _) -> true + end, + Opts + ), + Opts2 = maps:merge( + Opts#{ + alpn => ["mqtt"], + conn_acceptors => max(DefAcceptors, maps:get(acceptors, Opts1, 0)), + %% @NOTE: Backward compatibility START + server_resumption_level => maps:get(server_resumption_level, Opts, 2), + idle_timeout_ms => maps:get(idle_timeout, Opts, 0), + keep_alive_interval_ms => maps:get(keep_alive_interval, Opts, 0), + handshake_idle_timeout_ms => maps:get(handshake_idle_timeout, Opts, 10000) + %% @NOTE: Backward compatibility END + }, + SSLOpts + ), + %% @NOTE: Optional options take precedence over required options + maps:merge(Opts2, optional_quic_listener_opts(Opts)). diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index 476f02eb3..542016131 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -363,6 +363,188 @@ t_wss_update_opts(Config) -> ok = emqtt:stop(C3) end). +t_quic_update_opts(Config) -> + ListenerType = quic, + ConnectFun = connect_fun(ListenerType), + PrivDir = ?config(priv_dir, Config), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ListenerType), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(ListenerType, updated, Conf, fun() -> + %% Client connects successfully. + C1 = ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: another set of cert/key files. + {ok, _} = emqx:update_config( + [listeners, ListenerType, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + ?assertError( + {transport_down, #{error := _, status := Status}} when + (Status =:= bad_certificate orelse + Status =:= cert_untrusted_root orelse + Status =:= handshake_failure), + ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]) + ), + + C2 = ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. + {ok, _} = emqx:update_config( + [listeners, ListenerType, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + ?assertExceptionOneOf( + {exit, _}, + {error, _}, + ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]) + ), + + C3 = ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + end). + +t_quic_update_opts_fail(Config) -> + ListenerType = quic, + ConnectFun = connect_fun(ListenerType), + PrivDir = ?config(priv_dir, Config), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ListenerType), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(ListenerType, updated, Conf, fun() -> + %% GIVEN: an working Listener that client could connect to. + C1 = ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]), + + %% WHEN: reload the listener with invalid SSL options (certfile and keyfile missmatch). + UpdateResult1 = emqx:update_config( + [listeners, ListenerType, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") + } + }} + ), + + %% THEN: Reload failed but old listener is rollbacked. + ?assertMatch( + {error, {post_config_update, emqx_listeners, {{rollbacked, {error, tls_error}}, _}}}, + UpdateResult1 + ), + + %% THEN: Client with old TLS options could still connect + C2 = ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]), + + %% WHEN: Change the listener SSL configuration again + UpdateResult2 = emqx:update_config( + [listeners, ListenerType, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + %% THEN: update should success + ?assertMatch({ok, _}, UpdateResult2), + + %% THEN: Client with old TLS options could not connect + %% Unable to connect with old SSL options, server's cert is signed by another CA. + ?assertError( + {transport_down, #{error := _, status := Status}} when + (Status =:= bad_certificate orelse + Status =:= cert_untrusted_root orelse + Status =:= handshake_failure), + ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]) + ), + + %% THEN: Client with new TLS options could connect + C3 = ConnectFun(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + end). + with_listener(Type, Name, Config, Then) -> {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}), try @@ -379,6 +561,14 @@ emqtt_connect_ssl(Host, Port, SSLOpts) -> ssl_opts => SSLOpts }). +emqtt_connect_quic(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:quic_connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ssl => true, + ssl_opts => SSLOpts + }). + emqtt_connect_wss(Host, Port, SSLOpts) -> emqtt_connect(fun emqtt:ws_connect/1, #{ hosts => [{Host, Port}], @@ -440,3 +630,10 @@ generate_tls_certs(Config) -> format_bind(Bind) -> iolist_to_binary(emqx_listeners:format_bind(Bind)). + +connect_fun(ssl) -> + fun emqtt_connect_ssl/3; +connect_fun(quic) -> + fun emqtt_connect_quic/3; +connect_fun(wss) -> + fun emqtt_connect_wss/3.