feat(quic): support listener config reload

This commit is contained in:
William Yang 2024-01-08 13:57:40 +01:00
parent 973f40041d
commit 2483a4ecff
2 changed files with 277 additions and 53 deletions

View File

@ -303,6 +303,8 @@ update_listener(Type, Name, OldConf, NewConf) ->
ok ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
ok;
{skip, Error} when Type =:= quic ->
{error, {rollbacked, Error}};
{error, _Reason} ->
restart_listener(Type, Name, OldConf, NewConf)
end.
@ -411,49 +413,10 @@ do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
end;
%% Start MQTT/QUIC listener
do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
ListenOn =
case Bind of
{Addr, Port} when tuple_size(Addr) == 4 ->
%% IPv4
lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]));
{Addr, Port} when tuple_size(Addr) == 8 ->
%% IPv6
lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]));
Port ->
Port
end,
ListenOn = quic_listen_on(Bind),
case [A || {quicer, _, _} = A <- application:which_applications()] of
[_] ->
DefAcceptors = erlang:system_info(schedulers_online) * 8,
SSLOpts = maps:get(ssl_options, Opts, #{}),
ListenOpts =
[
{certfile, emqx_schema:naive_env_interpolation(maps:get(certfile, SSLOpts))},
{keyfile, emqx_schema:naive_env_interpolation(maps:get(keyfile, SSLOpts))},
{alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
{idle_timeout_ms, maps:get(idle_timeout, Opts, 0)},
{handshake_idle_timeout_ms, maps:get(handshake_idle_timeout, Opts, 10000)},
{server_resumption_level, maps:get(server_resumption_level, Opts, 2)},
{verify, maps:get(verify, SSLOpts, verify_none)}
] ++
case maps:get(cacertfile, SSLOpts, undefined) of
undefined ->
[];
<<>> ->
[];
"" ->
[];
CaCertFile ->
[{cacertfile, emqx_schema:naive_env_interpolation(CaCertFile)}]
end ++
case maps:get(password, SSLOpts, undefined) of
undefined -> [];
Password -> [{password, str(Password)}]
end ++
optional_quic_listener_opts(Opts),
ListenOpts = to_quicer_listener_opts(Opts),
Limiter = limiter(Opts),
ConnectionOpts = #{
conn_callback => emqx_quic_connection,
@ -470,7 +433,7 @@ do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
quicer:spawn_listener(
Id,
ListenOn,
{maps:from_list(ListenOpts), ConnectionOpts, StreamOpts}
{ListenOpts, ConnectionOpts, StreamOpts}
);
[] ->
{ok, {skipped, quic_app_missing}}
@ -506,6 +469,31 @@ do_update_listener(Type, Name, OldConf, NewConf) when
ok = ranch:set_protocol_options(Id, WsOpts),
%% No-op if the listener was not suspended.
ranch:resume_listener(Id);
do_update_listener(quic = Type, Name, _OldConf, NewConf) ->
case quicer:listener(listener_id(Type, Name)) of
{ok, ListenerPid} ->
case quicer_listener:reload(ListenerPid, to_quicer_listener_opts(NewConf)) of
ok ->
ok;
{error, _} = Error ->
%% @TODO: prefer: case quicer_listener:reload(ListenerPid, to_quicer_listener_opts(OldConf)) of
case quicer_listener:unlock(ListenerPid, 3000) of
ok ->
?ELOG("Failed to reload QUIC listener ~p, but Rollback success\n", [
Error
]),
{skip, Error};
RestoreErr ->
?ELOG(
"Failed to reload QUIC listener ~p, and Rollback failed as well\n",
[Error]
),
{error, {rollback_fail, RestoreErr}}
end
end;
E ->
E
end;
do_update_listener(_Type, _Name, _OldConf, _NewConf) ->
{error, not_supported}.
@ -897,18 +885,16 @@ get_ssl_options(_) ->
%% @doc Get QUIC optional settings for low level tunings.
%% @see quicer:quic_settings()
-spec optional_quic_listener_opts(map()) -> proplists:proplist().
-spec optional_quic_listener_opts(map()) -> map().
optional_quic_listener_opts(Conf) when is_map(Conf) ->
maps:to_list(
maps:filter(
fun(Name, _V) ->
lists:member(
Name,
quic_listener_optional_settings()
)
end,
Conf
)
maps:filter(
fun(Name, _V) ->
lists:member(
Name,
quic_listener_optional_settings()
)
end,
Conf
).
-spec quic_listener_optional_settings() -> [atom()].
@ -991,3 +977,44 @@ default_max_conn() ->
ensure_max_conns(<<"infinity">>) -> <<"infinity">>;
ensure_max_conns(MaxConn) when is_binary(MaxConn) -> binary_to_integer(MaxConn);
ensure_max_conns(MaxConn) -> MaxConn.
-spec quic_listen_on(X :: any()) -> quicer:listen_on().
quic_listen_on(Bind) ->
case Bind of
{Addr, Port} when tuple_size(Addr) == 4 ->
%% IPv4
lists:flatten(io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]));
{Addr, Port} when tuple_size(Addr) == 8 ->
%% IPv6
lists:flatten(io_lib:format("[~ts]:~w", [inet:ntoa(Addr), Port]));
Port ->
Port
end.
-spec to_quicer_listener_opts(map()) -> quicer:listener_opts().
to_quicer_listener_opts(Opts) ->
DefAcceptors = erlang:system_info(schedulers_online) * 8,
SSLOpts = maps:from_list(ssl_opts(Opts)),
Opts1 = maps:filter(
fun
(cacertfile, undefined) -> fasle;
(password, undefined) -> fasle;
(_, _) -> true
end,
Opts
),
Opts2 = maps:merge(
Opts#{
alpn => ["mqtt"],
conn_acceptors => max(DefAcceptors, maps:get(acceptors, Opts1, 0)),
%% @NOTE: Backward compatibility START
server_resumption_level => maps:get(server_resumption_level, Opts, 2),
idle_timeout_ms => maps:get(idle_timeout, Opts, 0),
keep_alive_interval_ms => maps:get(keep_alive_interval, Opts, 0),
handshake_idle_timeout_ms => maps:get(handshake_idle_timeout, Opts, 10000)
%% @NOTE: Backward compatibility END
},
SSLOpts
),
%% @NOTE: Optional options take precedence over required options
maps:merge(Opts2, optional_quic_listener_opts(Opts)).

View File

@ -363,6 +363,188 @@ t_wss_update_opts(Config) ->
ok = emqtt:stop(C3)
end).
t_quic_update_opts(Config) ->
ListenerType = quic,
ConnectFun = connect_fun(ListenerType),
PrivDir = ?config(priv_dir, Config),
Host = "127.0.0.1",
Port = emqx_common_test_helpers:select_free_port(ListenerType),
Conf = #{
<<"enable">> => true,
<<"bind">> => format_bind({Host, Port}),
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"password">> => ?SERVER_KEY_PASSWORD,
<<"certfile">> => filename:join(PrivDir, "server-password.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key"),
<<"verify">> => verify_none
}
},
ClientSSLOpts = [
{verify, verify_peer},
{customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
],
with_listener(ListenerType, updated, Conf, fun() ->
%% Client connects successfully.
C1 = ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
]),
%% Change the listener SSL configuration: another set of cert/key files.
{ok, _} = emqx:update_config(
[listeners, ListenerType, updated],
{update, #{
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
<<"certfile">> => filename:join(PrivDir, "server.pem"),
<<"keyfile">> => filename:join(PrivDir, "server.key")
}
}}
),
%% Unable to connect with old SSL options, server's cert is signed by another CA.
?assertError(
{transport_down, #{error := _, status := Status}} when
(Status =:= bad_certificate orelse
Status =:= cert_untrusted_root orelse
Status =:= handshake_failure),
ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
])
),
C2 = ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
]),
%% Change the listener SSL configuration: require peer certificate.
{ok, _} = emqx:update_config(
[listeners, ListenerType, updated],
{update, #{
<<"ssl_options">> => #{
<<"verify">> => verify_peer,
<<"fail_if_no_peer_cert">> => true
}
}}
),
%% Unable to connect with old SSL options, certificate is now required.
?assertExceptionOneOf(
{exit, _},
{error, _},
ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
])
),
C3 = ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")},
{certfile, filename:join(PrivDir, "client.pem")},
{keyfile, filename:join(PrivDir, "client.key")}
| ClientSSLOpts
]),
%% Both pre- and post-update clients should be alive.
?assertEqual(pong, emqtt:ping(C1)),
?assertEqual(pong, emqtt:ping(C2)),
?assertEqual(pong, emqtt:ping(C3)),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2),
ok = emqtt:stop(C3)
end).
t_quic_update_opts_fail(Config) ->
ListenerType = quic,
ConnectFun = connect_fun(ListenerType),
PrivDir = ?config(priv_dir, Config),
Host = "127.0.0.1",
Port = emqx_common_test_helpers:select_free_port(ListenerType),
Conf = #{
<<"enable">> => true,
<<"bind">> => format_bind({Host, Port}),
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca.pem"),
<<"password">> => ?SERVER_KEY_PASSWORD,
<<"certfile">> => filename:join(PrivDir, "server-password.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key"),
<<"verify">> => verify_none
}
},
ClientSSLOpts = [
{verify, verify_peer},
{customize_hostname_check, [{match_fun, fun(_, _) -> true end}]}
],
with_listener(ListenerType, updated, Conf, fun() ->
%% GIVEN: an working Listener that client could connect to.
C1 = ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
]),
%% WHEN: reload the listener with invalid SSL options (certfile and keyfile missmatch).
UpdateResult1 = emqx:update_config(
[listeners, ListenerType, updated],
{update, #{
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
<<"certfile">> => filename:join(PrivDir, "server.pem"),
<<"keyfile">> => filename:join(PrivDir, "server-password.key")
}
}}
),
%% THEN: Reload failed but old listener is rollbacked.
?assertMatch(
{error, {post_config_update, emqx_listeners, {{rollbacked, {error, tls_error}}, _}}},
UpdateResult1
),
%% THEN: Client with old TLS options could still connect
C2 = ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
]),
%% WHEN: Change the listener SSL configuration again
UpdateResult2 = emqx:update_config(
[listeners, ListenerType, updated],
{update, #{
<<"ssl_options">> => #{
<<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"),
<<"certfile">> => filename:join(PrivDir, "server.pem"),
<<"keyfile">> => filename:join(PrivDir, "server.key")
}
}}
),
%% THEN: update should success
?assertMatch({ok, _}, UpdateResult2),
%% THEN: Client with old TLS options could not connect
%% Unable to connect with old SSL options, server's cert is signed by another CA.
?assertError(
{transport_down, #{error := _, status := Status}} when
(Status =:= bad_certificate orelse
Status =:= cert_untrusted_root orelse
Status =:= handshake_failure),
ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts
])
),
%% THEN: Client with new TLS options could connect
C3 = ConnectFun(Host, Port, [
{cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts
]),
%% Both pre- and post-update clients should be alive.
?assertEqual(pong, emqtt:ping(C1)),
?assertEqual(pong, emqtt:ping(C2)),
?assertEqual(pong, emqtt:ping(C3)),
ok = emqtt:stop(C1),
ok = emqtt:stop(C2),
ok = emqtt:stop(C3)
end).
with_listener(Type, Name, Config, Then) ->
{ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}),
try
@ -379,6 +561,14 @@ emqtt_connect_ssl(Host, Port, SSLOpts) ->
ssl_opts => SSLOpts
}).
emqtt_connect_quic(Host, Port, SSLOpts) ->
emqtt_connect(fun emqtt:quic_connect/1, #{
hosts => [{Host, Port}],
connect_timeout => 1,
ssl => true,
ssl_opts => SSLOpts
}).
emqtt_connect_wss(Host, Port, SSLOpts) ->
emqtt_connect(fun emqtt:ws_connect/1, #{
hosts => [{Host, Port}],
@ -440,3 +630,10 @@ generate_tls_certs(Config) ->
format_bind(Bind) ->
iolist_to_binary(emqx_listeners:format_bind(Bind)).
connect_fun(ssl) ->
fun emqtt_connect_ssl/3;
connect_fun(quic) ->
fun emqtt_connect_quic/3;
connect_fun(wss) ->
fun emqtt_connect_wss/3.