From 78f7d01b1ceb056f52f2f68286a2ce47e6dc61ea Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 18 Dec 2023 20:52:57 +0100 Subject: [PATCH 01/12] test(listen): simplify test suite setup / teardown --- apps/emqx/test/emqx_listeners_SUITE.erl | 309 ++++++++---------------- 1 file changed, 105 insertions(+), 204 deletions(-) diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index b8d0c39f6..eabc2792f 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -20,122 +20,45 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CERTS_PATH(CertName), filename:join(["../../lib/emqx/etc/certs/", CertName])). - -define(SERVER_KEY_PASSWORD, "sErve7r8Key$!"). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - NewConfig = generate_config(), - application:ensure_all_started(esockd), - application:ensure_all_started(quicer), - application:ensure_all_started(cowboy), generate_tls_certs(Config), - lists:foreach(fun set_app_env/1, NewConfig), - Config. + WorkDir = emqx_cth_suite:work_dir(Config), + Apps = emqx_cth_suite:start([quicer, emqx], #{work_dir => WorkDir}), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - application:stop(esockd), - application:stop(cowboy). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) when - Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp + Case =:= t_start_stop_listeners; + Case =:= t_restart_listeners; + Case =:= t_restart_listeners_with_hibernate_after_disabled -> - catch emqx_config_handler:stop(), - Port = emqx_common_test_helpers:select_free_port(tcp), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - PureListeners2 = PureListeners#{ - tcp => #{ - listener_test => #{ - bind => {"127.0.0.1", Port}, - max_connections => 4321, - limiter => #{} - } - } - }, - emqx_config:put([listeners], PureListeners2), - - ok = emqx_listeners:start(), - [ - {prev_listener_conf, PrevListeners}, - {tcp_port, Port} - | Config - ]; -init_per_testcase(t_wss_conn, Config) -> - catch emqx_config_handler:stop(), - Port = emqx_common_test_helpers:select_free_port(ssl), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - PureListeners2 = PureListeners#{ - wss => #{ - listener_test => #{ - bind => {{127, 0, 0, 1}, Port}, - limiter => #{}, - ssl_options => #{ - cacertfile => ?CERTS_PATH("cacert.pem"), - certfile => ?CERTS_PATH("cert.pem"), - keyfile => ?CERTS_PATH("key.pem") - } - } - } - }, - emqx_config:put([listeners], PureListeners2), - - ok = emqx_listeners:start(), - [ - {prev_listener_conf, PrevListeners}, - {wss_port, Port} - | Config - ]; + ok = emqx_listeners:stop(), + Config; init_per_testcase(_, Config) -> - catch emqx_config_handler:stop(), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - emqx_config:put([listeners], PureListeners), - [ - {prev_listener_conf, PrevListeners} - | Config - ]. + ok = emqx_listeners:start(), + Config. -end_per_testcase(Case, Config) when - Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp --> - PrevListener = ?config(prev_listener_conf, Config), - emqx_listeners:stop(), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), - ok; -end_per_testcase(t_wss_conn, Config) -> - PrevListener = ?config(prev_listener_conf, Config), - emqx_listeners:stop(), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), - ok; -end_per_testcase(_, Config) -> - PrevListener = ?config(prev_listener_conf, Config), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), +end_per_testcase(_, _Config) -> ok. t_start_stop_listeners(_) -> ok = emqx_listeners:start(), - ?assertException(error, _, emqx_listeners:start_listener({ws, {"127.0.0.1", 8083}, []})), + ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})), ok = emqx_listeners:stop(). t_restart_listeners(_) -> ok = emqx_listeners:start(), ok = emqx_listeners:stop(), - %% flakyness: eaddrinuse - timer:sleep(timer:seconds(2)), ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). @@ -168,77 +91,108 @@ t_restart_listeners_with_hibernate_after_disabled(_Config) -> ), ok = emqx_listeners:start(), ok = emqx_listeners:stop(), - %% flakyness: eaddrinuse - timer:sleep(timer:seconds(2)), ok = emqx_listeners:restart(), ok = emqx_listeners:stop(), emqx_config:put([listeners], OldLConf). -t_max_conns_tcp(Config) -> +t_max_conns_tcp(_Config) -> %% Note: Using a string representation for the bind address like %% "127.0.0.1" does not work - ?assertEqual( - 4321, - emqx_listeners:max_conns('tcp:listener_test', {{127, 0, 0, 1}, ?config(tcp_port, Config)}) - ). + Port = emqx_common_test_helpers:select_free_port(tcp), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"max_connections">> => 4321, + <<"limiter">> => #{} + }, + with_listener(tcp, maxconns, Conf, fun() -> + ?assertEqual( + 4321, + emqx_listeners:max_conns('tcp:maxconns', {{127, 0, 0, 1}, Port}) + ) + end). -t_current_conns_tcp(Config) -> - ?assertEqual( - 0, - emqx_listeners:current_conns('tcp:listener_test', { - {127, 0, 0, 1}, ?config(tcp_port, Config) - }) - ). +t_current_conns_tcp(_Config) -> + Port = emqx_common_test_helpers:select_free_port(tcp), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"max_connections">> => 42, + <<"limiter">> => #{} + }, + with_listener(tcp, curconns, Conf, fun() -> + ?assertEqual( + 0, + emqx_listeners:current_conns('tcp:curconns', {{127, 0, 0, 1}, Port}) + ) + end). t_wss_conn(Config) -> - {ok, Socket} = ssl:connect( - {127, 0, 0, 1}, ?config(wss_port, Config), [{verify, verify_none}], 1000 - ), - ok = ssl:close(Socket). + PrivDir = ?config(priv_dir, Config), + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"limiter">> => #{}, + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }, + with_listener(wss, wssconn, Conf, fun() -> + {ok, Socket} = ssl:connect({127, 0, 0, 1}, Port, [{verify, verify_none}], 1000), + ok = ssl:close(Socket) + end). t_quic_conn(Config) -> + PrivDir = ?config(priv_dir, Config), Port = emqx_common_test_helpers:select_free_port(quic), - DataDir = ?config(data_dir, Config), - SSLOpts = #{ - password => ?SERVER_KEY_PASSWORD, - certfile => filename:join(DataDir, "server-password.pem"), - cacertfile => filename:join(DataDir, "ca.pem"), - keyfile => filename:join(DataDir, "server-password.key") + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"ssl_options">> => #{ + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") + } }, - emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, Port, #{ssl_options => SSLOpts}), - ct:pal("~p", [emqx_listeners:list()]), - {ok, Conn} = quicer:connect( - {127, 0, 0, 1}, - Port, - [ - {verify, verify_none}, - {alpn, ["mqtt"]} - ], - 1000 - ), - ok = quicer:close_connection(Conn), - emqx_listeners:stop_listener(quic, ?FUNCTION_NAME, #{bind => Port}). + with_listener(quic, ?FUNCTION_NAME, Conf, fun() -> + {ok, Conn} = quicer:connect( + {127, 0, 0, 1}, + Port, + [ + {verify, verify_none}, + {alpn, ["mqtt"]} + ], + 1000 + ), + ok = quicer:close_connection(Conn) + end). t_ssl_password_cert(Config) -> + PrivDir = ?config(priv_dir, Config), Port = emqx_common_test_helpers:select_free_port(ssl), - DataDir = ?config(data_dir, Config), SSLOptsPWD = #{ - password => ?SERVER_KEY_PASSWORD, - certfile => filename:join(DataDir, "server-password.pem"), - cacertfile => filename:join(DataDir, "ca.pem"), - keyfile => filename:join(DataDir, "server-password.key") + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") }, LConf = #{ - enable => true, - bind => {{127, 0, 0, 1}, Port}, - mountpoint => <<>>, - zone => default, - ssl_options => SSLOptsPWD + <<"enable">> => true, + <<"bind">> => format_bind({{127, 0, 0, 1}, Port}), + <<"ssl_options">> => SSLOptsPWD }, - ok = emqx_listeners:start_listener(ssl, ?FUNCTION_NAME, LConf), - {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]), - ssl:close(SSLSocket), - emqx_listeners:stop_listener(ssl, ?FUNCTION_NAME, LConf). + with_listener(ssl, ?FUNCTION_NAME, LConf, fun() -> + {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]), + ssl:close(SSLSocket) + end). + +with_listener(Type, Name, Config, Then) -> + {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}), + try + Then() + after + emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ) + end. t_format_bind(_) -> ?assertEqual( @@ -266,67 +220,14 @@ t_format_bind(_) -> lists:flatten(emqx_listeners:format_bind(":1883")) ). -render_config_file() -> - Path = local_path(["etc", "emqx.conf"]), - {ok, Temp} = file:read_file(Path), - Vars0 = mustache_vars(), - Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], - Targ = bbmustache:render(Temp, Vars), - NewName = Path ++ ".rendered", - ok = file:write_file(NewName, Targ), - NewName. - -mustache_vars() -> - [ - {platform_data_dir, local_path(["data"])}, - {platform_etc_dir, local_path(["etc"])} - ]. - -generate_config() -> - ConfFile = render_config_file(), - {ok, Conf} = hocon:load(ConfFile, #{format => richmap}), - hocon_tconf:generate(emqx_schema, Conf). - -set_app_env({App, Lists}) -> - lists:foreach( - fun - ({authz_file, _Var}) -> - application:set_env(App, authz_file, local_path(["etc", "authz.conf"])); - ({Par, Var}) -> - application:set_env(App, Par, Var) - end, - Lists - ). - -local_path(Components, Module) -> - filename:join([get_base_dir(Module) | Components]). - -local_path(Components) -> - local_path(Components, ?MODULE). - -get_base_dir(Module) -> - {file, Here} = code:is_loaded(Module), - filename:dirname(filename:dirname(Here)). - -get_base_dir() -> - get_base_dir(?MODULE). - -remove_default_limiter(Listeners) -> - maps:map( - fun(_, X) -> - maps:map( - fun(_, E) -> - maps:remove(limiter, E) - end, - X - ) - end, - Listeners - ). - generate_tls_certs(Config) -> - DataDir = ?config(data_dir, Config), - emqx_common_test_helpers:gen_ca(DataDir, "ca"), - emqx_common_test_helpers:gen_host_cert("server-password", "ca", DataDir, #{ + PrivDir = ?config(priv_dir, Config), + emqx_common_test_helpers:gen_ca(PrivDir, "ca"), + emqx_common_test_helpers:gen_host_cert("server", "ca", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("client", "ca", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("server-password", "ca", PrivDir, #{ password => ?SERVER_KEY_PASSWORD }). + +format_bind(Bind) -> + iolist_to_binary(emqx_listeners:format_bind(Bind)). From cafd384466f869ac8d29ba606aefe8f2c3c8fa2e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 11:15:09 +0100 Subject: [PATCH 02/12] chore: upgrade to esockd 5.11.0 --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 763e3ace9..6a53107ca 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.0"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.3"}}}, diff --git a/mix.exs b/mix.exs index 338bf7812..645420d9f 100644 --- a/mix.exs +++ b/mix.exs @@ -53,7 +53,7 @@ defmodule EMQXUmbrella.MixProject do {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, - {:esockd, github: "emqx/esockd", tag: "5.9.9", override: true}, + {:esockd, github: "emqx/esockd", tag: "5.11.0", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, {:ekka, github: "emqx/ekka", tag: "0.17.0", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true}, diff --git a/rebar.config b/rebar.config index 661049f11..c92bcb1d1 100644 --- a/rebar.config +++ b/rebar.config @@ -69,7 +69,7 @@ , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.0"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}} From da0f0f947e0c4892bc72d90ccda3c9efdf85cca8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 11:23:29 +0100 Subject: [PATCH 03/12] feat(listen): support hot config update of esockd-based listeners --- apps/emqx/src/emqx_listeners.erl | 217 +++++++++++++++---------------- 1 file changed, 107 insertions(+), 110 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 5cf631ccb..c6daf7fb2 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -55,7 +55,6 @@ ]). -export([pre_config_update/3, post_config_update/5]). --export([create_listener/3, remove_listener/3, update_listener/3]). -export([format_bind/1]). @@ -66,6 +65,11 @@ -export_type([listener_id/0]). -type listener_id() :: atom() | binary(). +-type listener_type() :: tcp | ssl | ws | wss | quic | dtls. + +-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)). +-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)). + -define(ROOT_KEY, listeners). -define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). @@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) -> -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> - case - [ - Running - || {Id, #{running := Running}} <- list(), - Id =:= ListenerId - ] - of - [] -> {error, not_found}; - [IsRunning] -> IsRunning + case lists:keyfind(ListenerId, 1, list()) of + {_Id, #{running := Running}} -> Running; + false -> {error, not_found} end. is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> @@ -229,11 +227,10 @@ start() -> start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). --spec start_listener(atom(), atom(), map()) -> ok | {error, term()}. -start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> +-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> case do_start_listener(Type, ListenerName, Conf) of {ok, {skipped, Reason}} when - Reason =:= listener_disabled; Reason =:= quic_app_missing -> ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), @@ -269,7 +266,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ) ), {error, {failed_to_start, Msg}} - end. + end; +start_listener(Type, ListenerName, #{enable := false}) -> + console_print( + "Listener ~ts is NOT started due to: disabled.~n", + [listener_id(Type, ListenerName)] + ), + ok. %% @doc Restart all listeners -spec restart() -> ok. @@ -280,16 +283,35 @@ restart() -> restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). --spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}. -restart_listener(Type, ListenerName, {OldConf, NewConf}) -> - restart_listener(Type, ListenerName, OldConf, NewConf); +-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}. restart_listener(Type, ListenerName, Conf) -> restart_listener(Type, ListenerName, Conf, Conf). -restart_listener(Type, ListenerName, OldConf, NewConf) -> - case stop_listener(Type, ListenerName, OldConf) of - ok -> start_listener(Type, ListenerName, NewConf); - {error, Reason} -> {error, Reason} +update_listener(_Type, _Name, #{enable := false}, #{enable := false}) -> + ok; +update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> + stop_listener(Type, Name, Conf); +update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> + start_listener(Type, Name, Conf); +update_listener(Type, Name, OldConf = #{bind := Bind}, NewConf = #{bind := Bind}) -> + case do_update_listener(Type, Name, OldConf, NewConf) of + ok -> + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + ok; + {error, _Reason} -> + restart_listener(Type, Name, OldConf, NewConf) + end; +update_listener(Type, Name, OldConf, NewConf) -> + %% TODO + %% Again, we're not strictly required to drop live connections in this case. + restart_listener(Type, Name, OldConf, NewConf). + +restart_listener(Type, Name, OldConf, NewConf) -> + case stop_listener(Type, Name, OldConf) of + ok -> + start_listener(Type, Name, NewConf); + {error, Reason} -> + {error, Reason} end. %% @doc Stop all listeners. @@ -305,9 +327,10 @@ stop() -> stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). -stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - Id = listener_id(Type, ListenerName), +stop_listener(Type, Name, #{bind := Bind} = Conf) -> + Id = listener_id(Type, Name), ok = del_limiter_bucket(Id, Conf), + ok = unregister_ocsp_stapling_refresh(Type, Name), case do_stop_listener(Type, Id, Conf) of ok -> console_print( @@ -325,11 +348,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> {error, Reason} end. --spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}. - -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl -> +-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) -> esockd:close(Id, ListenOn); -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss -> +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) -> case cowboy:stop_listener(Id) of ok -> wait_listener_stopped(ListenOn); @@ -369,39 +391,23 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args). console_print(_Fmt, _Args) -> ok. -endif. -%% Start MQTT/TCP listener --spec do_start_listener(atom(), atom(), map()) -> +-spec do_start_listener(listener_type(), atom(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}. -do_start_listener(_Type, _ListenerName, #{enable := false}) -> - {ok, {skipped, listener_disabled}}; -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == tcp; Type == ssl --> - Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), +%% Start MQTT/TCP listener +do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> + Id = listener_id(Type, Name), + ok = add_limiter_bucket(Id, limiter(Opts)), esockd:open( Id, ListenOn, - merge_default(esockd_opts(Id, Type, Opts)), - {emqx_connection, start_link, [ - #{ - listener => {Type, ListenerName}, - zone => zone(Opts), - limiter => Limiter, - enable_authn => enable_authn(Opts) - } - ]} + merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == ws; Type == wss --> +do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when ?COWBOY_LISTENER(Type) -> Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), + ok = add_limiter_bucket(Id, limiter(Opts)), RanchOpts = ranch_opts(Type, ListenOn, Opts), - WsOpts = ws_opts(Type, ListenerName, Opts, Limiter), + WsOpts = ws_opts(Type, ListenerName, Opts), case Type of ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) @@ -476,6 +482,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {ok, {skipped, quic_app_missing}} end. +do_update_listener(Type, Name, _OldConf, NewConf) when ?ESOCKD_LISTENER(Type) -> + Id = listener_id(Type, Name), + ListenOn = maps:get(bind, NewConf), + esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); +do_update_listener(_Type, _Name, _OldConf, _NewConf) -> + {error, not_supported}. + %% Update the listeners at runtime pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when V =:= undefined orelse V =:= ?TOMBSTONE_VALUE @@ -501,69 +514,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE -> - create_listener(Type, Name, NewConf); + start_listener(Type, Name, NewConf); post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> - update_listener(Type, Name, {OldConf, NewConf}); + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) -> - remove_listener(Type, Name, OldConf); + stop_listener(Type, Name, OldConf); post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> - #{enable := NewEnabled} = NewConf, - #{enable := OldEnabled} = OldConf, - case {NewEnabled, OldEnabled} of - {true, true} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}); - {true, false} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - start_listener(Type, Name, NewConf); - {false, true} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf); - {false, false} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf) - end; + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) -> ok; post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) -> #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf), - Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed), - perform_listener_changes([ - {fun ?MODULE:remove_listener/3, Removed}, - {fun ?MODULE:update_listener/3, Updated}, - {fun ?MODULE:create_listener/3, Added} - ]); + %% TODO + %% This currently lacks transactional semantics. If one of the changes fails, + %% previous changes will not be rolled back. + perform_listener_changes( + [{update, L} || L <- Changed] ++ + [{stop, L} || L <- Removed] ++ + [{start, L} || L <- Added] + ); post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. -create_listener(Type, Name, NewConf) -> - start_listener(Type, Name, NewConf). - -remove_listener(Type, Name, OldConf) -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf). - -update_listener(Type, Name, {OldConf, NewConf}) -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}). - perform_listener_changes([]) -> ok; -perform_listener_changes([{Action, ConfL} | Tasks]) -> - case perform_listener_changes(Action, ConfL) of - ok -> perform_listener_changes(Tasks); +perform_listener_changes([{Action, Listener} | Rest]) -> + case perform_listener_change(Action, Listener) of + ok -> perform_listener_changes(Rest); {error, Reason} -> {error, Reason} end. -perform_listener_changes(_Action, []) -> - ok; -perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) -> - case Action(Type, Name, Diff) of - ok -> perform_listener_changes(Action, MapConf); - {error, Reason} -> {error, Reason} - end. +perform_listener_change(start, {Type, Name, Conf}) -> + start_listener(Type, Name, Conf); +perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) -> + update_listener(Type, Name, ConfOld, ConfNew); +perform_listener_change(stop, {Type, Name, Conf}) -> + stop_listener(Type, Name, Conf). -esockd_opts(ListenerId, Type, Opts0) -> +esockd_opts(ListenerId, Type, Name, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = @@ -579,7 +567,16 @@ esockd_opts(ListenerId, Type, Opts0) -> end, Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])), - tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]} + tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}, + connection_mfargs => + {emqx_connection, start_link, [ + #{ + listener => {Type, Name}, + zone => zone(Opts0), + limiter => Limiter, + enable_authn => enable_authn(Opts0) + } + ]} }, maps:to_list( case Type of @@ -593,12 +590,12 @@ esockd_opts(ListenerId, Type, Opts0) -> end ). -ws_opts(Type, ListenerName, Opts, Limiter) -> +ws_opts(Type, ListenerName, Opts) -> WsPaths = [ {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ zone => zone(Opts), listener => {Type, ListenerName}, - limiter => Limiter, + limiter => limiter(Opts), enable_authn => enable_authn(Opts) }} ], @@ -742,24 +739,24 @@ diff_confs(NewConfs, OldConfs) -> emqx_utils:diff_lists( flatten_confs(NewConfs), flatten_confs(OldConfs), - fun({Key, _}) -> Key end + fun({Type, Name, _}) -> {Type, Name} end ). -flatten_confs(Conf0) -> +flatten_confs(Confs) -> lists:flatmap( - fun({Type, Conf}) -> - do_flatten_confs(Type, Conf) + fun({Type, Listeners}) -> + do_flatten_confs(Type, Listeners) end, - maps:to_list(Conf0) + maps:to_list(Confs) ). -do_flatten_confs(Type, Conf0) -> +do_flatten_confs(Type, Listeners) -> FilterFun = fun ({_Name, ?TOMBSTONE_TYPE}) -> false; - ({Name, Conf}) -> {true, {{Type, Name}, Conf}} + ({Name, Conf}) -> {true, {Type, Name, Conf}} end, - lists:filtermap(FilterFun, maps:to_list(Conf0)). + lists:filtermap(FilterFun, maps:to_list(Listeners)). enable_authn(Opts) -> maps:get(enable_authn, Opts, true). From fb2d4544b661ca665b0e12034acc4b18ca740385 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 11:25:02 +0100 Subject: [PATCH 04/12] test(listen): verify that SSL options are hot-updateable --- apps/emqx/test/emqx_listeners_SUITE.erl | 80 +++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index eabc2792f..8a986f9a8 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -186,6 +186,78 @@ t_ssl_password_cert(Config) -> ssl:close(SSLSocket) end). +t_ssl_update_opts(Config) -> + PrivDir = ?config(priv_dir, Config), + CACertfile = filename:join(PrivDir, "ca.pem"), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => CACertfile, + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key"), + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {cacertfile, CACertfile}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(ssl, updated, Conf, fun() -> + ClientOpts1 = #{ + hosts => [{Host, Port}], + ssl => true, + ssl_opts => ClientSSLOpts + }, + C1 = emqtt_connect(ClientOpts1), + + %% Change the listener SSL configuration. + %% 1. Another set of (password protected) cert/key files. + %% 2. Require peer certificate. + {ok, _} = emqx:update_config( + [listeners, ssl, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"cacertfile">> => CACertfile, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + ?assertError( + {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}, + emqtt_connect(ClientOpts1) + ), + + ClientOpts2 = ClientOpts1#{ + ssl_opts := [ + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ] + }, + C2 = emqtt_connect(ClientOpts2), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + %% TODO + %% Would be nice to verify that the server picked up new cert/key pair. + %% However, there's no usable API for that in `emqtt` at the moment. + %% ?assertEqual(<<"server">>, esockd_peercert:common_name(Cert1)), + %% ?assertEqual(<<"server-password">>, esockd_peercert:common_name(Cert2)), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2) + end). + with_listener(Type, Name, Config, Then) -> {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}), try @@ -194,6 +266,14 @@ with_listener(Type, Name, Config, Then) -> emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ) end. +emqtt_connect(Options) -> + {ok, Client} = emqtt:start_link(Options), + true = erlang:unlink(Client), + case emqtt:connect(Client) of + {ok, _} -> Client; + {error, Reason} -> error(Reason) + end. + t_format_bind(_) -> ?assertEqual( ":1883", From a20ef0376b223b64dcc4ad41c5df4e220cbc4565 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 13:01:06 +0100 Subject: [PATCH 05/12] test(exproto): fix too small buffer size for listener These options have not been picked up by the listener before esockd 5.11.0, so it was not apparent that they are not working. --- apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl index 76e11ef00..74a488abb 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl @@ -663,9 +663,11 @@ tcp_opts() -> udp_opts() -> #{ - recbuf => 1024, - sndbuf => 1024, - buffer => 1024, + %% NOTE + %% Making those too small will lead to inability to accept connections. + recbuf => 2048, + sndbuf => 2048, + buffer => 2048, reuseaddr => true }. From 68bbbec5566a1a267d44d90df281ab7725e70e7b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 14:35:29 +0100 Subject: [PATCH 06/12] chore: bump to esockd 5.11.1 With a typespec fix. --- apps/emqx/rebar.config | 2 +- mix.exs | 2 +- rebar.config | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 6a53107ca..50548f645 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.0"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.3"}}}, diff --git a/mix.exs b/mix.exs index 645420d9f..10bbefa27 100644 --- a/mix.exs +++ b/mix.exs @@ -53,7 +53,7 @@ defmodule EMQXUmbrella.MixProject do {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, - {:esockd, github: "emqx/esockd", tag: "5.11.0", override: true}, + {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, {:ekka, github: "emqx/ekka", tag: "0.17.0", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.0", override: true}, diff --git a/rebar.config b/rebar.config index c92bcb1d1..185730832 100644 --- a/rebar.config +++ b/rebar.config @@ -69,7 +69,7 @@ , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.0"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.0"}}} From 036048b9ab94fbe27426cb216f7d5465e2c3f4b3 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 14:36:25 +0100 Subject: [PATCH 07/12] feat(listen): support hot update of WS/WSS options as well However, currently any transport option changes will cause listener to restart, which slightly impacts the availability of the broker. --- apps/emqx/src/emqx_listeners.erl | 57 ++++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index c6daf7fb2..f82c2fe10 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -293,18 +293,14 @@ update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> stop_listener(Type, Name, Conf); update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> start_listener(Type, Name, Conf); -update_listener(Type, Name, OldConf = #{bind := Bind}, NewConf = #{bind := Bind}) -> +update_listener(Type, Name, OldConf, NewConf) -> case do_update_listener(Type, Name, OldConf, NewConf) of ok -> ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), ok; {error, _Reason} -> restart_listener(Type, Name, OldConf, NewConf) - end; -update_listener(Type, Name, OldConf, NewConf) -> - %% TODO - %% Again, we're not strictly required to drop live connections in this case. - restart_listener(Type, Name, OldConf, NewConf). + end. restart_listener(Type, Name, OldConf, NewConf) -> case stop_listener(Type, Name, OldConf) of @@ -403,11 +399,11 @@ do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER( merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when ?COWBOY_LISTENER(Type) -> - Id = listener_id(Type, ListenerName), +do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) -> + Id = listener_id(Type, Name), ok = add_limiter_bucket(Id, limiter(Opts)), - RanchOpts = ranch_opts(Type, ListenOn, Opts), - WsOpts = ws_opts(Type, ListenerName, Opts), + RanchOpts = ranch_opts(Type, Opts), + WsOpts = ws_opts(Type, Name, Opts), case Type of ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) @@ -482,10 +478,36 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {ok, {skipped, quic_app_missing}} end. -do_update_listener(Type, Name, _OldConf, NewConf) when ?ESOCKD_LISTENER(Type) -> +do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when + ?ESOCKD_LISTENER(Type) +-> Id = listener_id(Type, Name), - ListenOn = maps:get(bind, NewConf), - esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); + case maps:get(bind, OldConf) of + ListenOn -> + esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); + _Different -> + %% TODO + %% Again, we're not strictly required to drop live connections in this case. + {error, not_supported} + end; +do_update_listener(Type, Name, OldConf, NewConf) when + ?COWBOY_LISTENER(Type) +-> + Id = listener_id(Type, Name), + RanchOpts = ranch_opts(Type, NewConf), + WsOpts = ws_opts(Type, Name, NewConf), + case ranch_opts(Type, OldConf) of + RanchOpts -> + %% Transport options did not change, no need to touch the listener. + ok; + _Different -> + %% Transport options changed, we need to tear down the listener. + ok = ranch:suspend_listener(Id), + ok = ranch:set_transport_options(Id, RanchOpts) + end, + ok = ranch:set_protocol_options(Id, WsOpts), + %% No-op if the listener was not suspended. + ranch:resume_listener(Id); do_update_listener(_Type, _Name, _OldConf, _NewConf) -> {error, not_supported}. @@ -591,19 +613,20 @@ esockd_opts(ListenerId, Type, Name, Opts0) -> ). ws_opts(Type, ListenerName, Opts) -> - WsPaths = [ - {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ + WsPath = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), + WsRoutes = [ + {WsPath, emqx_ws_connection, #{ zone => zone(Opts), listener => {Type, ListenerName}, limiter => limiter(Opts), enable_authn => enable_authn(Opts) }} ], - Dispatch = cowboy_router:compile([{'_', WsPaths}]), + Dispatch = cowboy_router:compile([{'_', WsRoutes}]), ProxyProto = maps:get(proxy_protocol, Opts, false), #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. -ranch_opts(Type, ListenOn, Opts) -> +ranch_opts(Type, Opts = #{bind := ListenOn}) -> NumAcceptors = maps:get(acceptors, Opts, 4), MaxConnections = maps:get(max_connections, Opts, 1024), SocketOpts = From 4796f85dff06b94f161e7dd2575aecca00707901 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 14:38:00 +0100 Subject: [PATCH 08/12] test(listen): verify SSL options of WSS listener are hot updateable --- apps/emqx/test/emqx_listeners_SUITE.erl | 119 +++++++++++++++++++----- 1 file changed, 98 insertions(+), 21 deletions(-) diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index 8a986f9a8..203a30cde 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -207,12 +207,7 @@ t_ssl_update_opts(Config) -> {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} ], with_listener(ssl, updated, Conf, fun() -> - ClientOpts1 = #{ - hosts => [{Host, Port}], - ssl => true, - ssl_opts => ClientSSLOpts - }, - C1 = emqtt_connect(ClientOpts1), + C1 = emqtt_connect_ssl(Host, Port, ClientSSLOpts), %% Change the listener SSL configuration. %% 1. Another set of (password protected) cert/key files. @@ -234,17 +229,14 @@ t_ssl_update_opts(Config) -> %% Unable to connect with old SSL options, certificate is now required. ?assertError( {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}, - emqtt_connect(ClientOpts1) + emqtt_connect_ssl(Host, Port, ClientSSLOpts) ), - ClientOpts2 = ClientOpts1#{ - ssl_opts := [ - {certfile, filename:join(PrivDir, "client.pem")}, - {keyfile, filename:join(PrivDir, "client.key")} - | ClientSSLOpts - ] - }, - C2 = emqtt_connect(ClientOpts2), + C2 = emqtt_connect_ssl(Host, Port, [ + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), %% Both pre- and post-update clients should be alive. ?assertEqual(pong, emqtt:ping(C1)), @@ -258,6 +250,68 @@ t_ssl_update_opts(Config) -> ok = emqtt:stop(C2) end). +t_wss_update_opts(Config) -> + PrivDir = ?config(priv_dir, Config), + CACertfile = filename:join(PrivDir, "ca.pem"), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => CACertfile, + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key"), + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {cacertfile, CACertfile}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(wss, updated, Conf, fun() -> + %% Start a client. + C1 = emqtt_connect_wss(Host, Port, ClientSSLOpts), + + %% Change the listener SSL configuration. + %% 1. Another set of (password protected) cert/key files. + %% 2. Require peer certificate. + {ok, _} = emqx:update_config( + [listeners, wss, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"cacertfile">> => CACertfile, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + ?assertError( + %% Due to a bug `emqtt` does not instantly report that socket was closed. + timeout, + emqtt_connect_wss(Host, Port, ClientSSLOpts) + ), + + % C2 = emqx_cth_mqttws:connect(Host, Port, [ + C2 = emqtt_connect_wss(Host, Port, [ + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2) + end). + with_listener(Type, Name, Config, Then) -> {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}), try @@ -266,12 +320,35 @@ with_listener(Type, Name, Config, Then) -> emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ) end. -emqtt_connect(Options) -> - {ok, Client} = emqtt:start_link(Options), - true = erlang:unlink(Client), - case emqtt:connect(Client) of - {ok, _} -> Client; - {error, Reason} -> error(Reason) +emqtt_connect_ssl(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ssl => true, + ssl_opts => SSLOpts + }). + +emqtt_connect_wss(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:ws_connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ws_transport_options => [ + {protocols, [http]}, + {transport, tls}, + {tls_opts, SSLOpts} + ] + }). + +emqtt_connect(Connect, Opts) -> + case emqtt:start_link(Opts) of + {ok, Client} -> + true = erlang:unlink(Client), + case Connect(Client) of + {ok, _} -> Client; + {error, Reason} -> error(Reason, [Opts]) + end; + {error, Reason} -> + error(Reason, [Opts]) end. t_format_bind(_) -> From 7e4049620d0682aef994976e31b290fdd5be220a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 16:05:48 +0100 Subject: [PATCH 09/12] fix(listen): ensure limiter server state consistent with updates --- apps/emqx/src/emqx_listeners.erl | 68 +++++++++++++++++--------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index f82c2fe10..dc1f6d9ad 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -228,22 +228,25 @@ start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). -spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. -start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> - case do_start_listener(Type, ListenerName, Conf) of +start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) -> + ListenerId = listener_id(Type, Name), + Limiter = limiter(Conf), + ok = add_limiter_bucket(ListenerId, Limiter), + case do_start_listener(Type, Name, ListenerId, Conf) of {ok, {skipped, Reason}} when Reason =:= quic_app_missing -> ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), console_print( "Listener ~ts is NOT started due to: ~p.~n", - [listener_id(Type, ListenerName), Reason] + [ListenerId, Reason] ), ok; {ok, _} -> ?tp(listener_started, #{type => Type, bind => Bind}), console_print( "Listener ~ts on ~ts started.~n", - [listener_id(Type, ListenerName), format_bind(Bind)] + [ListenerId, format_bind(Bind)] ), ok; {error, {already_started, Pid}} -> @@ -252,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> }), {error, {already_started, Pid}}; {error, Reason} -> + ok = del_limiter_bucket(ListenerId, Limiter), ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}), - ListenerId = listener_id(Type, ListenerName), BindStr = format_bind(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p.~n", @@ -267,10 +270,10 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> ), {error, {failed_to_start, Msg}} end; -start_listener(Type, ListenerName, #{enable := false}) -> +start_listener(Type, Name, #{enable := false}) -> console_print( "Listener ~ts is NOT started due to: disabled.~n", - [listener_id(Type, ListenerName)] + [listener_id(Type, Name)] ), ok. @@ -294,6 +297,8 @@ update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> start_listener(Type, Name, Conf); update_listener(Type, Name, OldConf, NewConf) -> + Id = listener_id(Type, Name), + ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)), case do_update_listener(Type, Name, OldConf, NewConf) of ok -> ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), @@ -325,7 +330,7 @@ stop_listener(ListenerId) -> stop_listener(Type, Name, #{bind := Bind} = Conf) -> Id = listener_id(Type, Name), - ok = del_limiter_bucket(Id, Conf), + ok = del_limiter_bucket(Id, limiter(Conf)), ok = unregister_ocsp_stapling_refresh(Type, Name), case do_stop_listener(Type, Id, Conf) of ok -> @@ -387,21 +392,17 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args). console_print(_Fmt, _Args) -> ok. -endif. --spec do_start_listener(listener_type(), atom(), map()) -> +-spec do_start_listener(listener_type(), atom(), listener_id(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}. %% Start MQTT/TCP listener -do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> - Id = listener_id(Type, Name), - ok = add_limiter_bucket(Id, limiter(Opts)), +do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> esockd:open( Id, ListenOn, merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) -> - Id = listener_id(Type, Name), - ok = add_limiter_bucket(Id, limiter(Opts)), +do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) -> RanchOpts = ranch_opts(Type, Opts), WsOpts = ws_opts(Type, Name, Opts), case Type of @@ -409,7 +410,7 @@ do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) -> wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) end; %% Start MQTT/QUIC listener -do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> +do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) -> ListenOn = case Bind of {Addr, Port} when tuple_size(Addr) == 4 -> @@ -459,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), zone => zone(Opts), - listener => {quic, ListenerName}, + listener => {quic, Name}, limiter => Limiter }, StreamOpts = #{ stream_callback => emqx_quic_stream, active => 1 }, - - Id = listener_id(quic, ListenerName), - add_limiter_bucket(Id, Limiter), quicer:spawn_listener( Id, ListenOn, @@ -745,18 +743,24 @@ add_limiter_bucket(Id, Limiter) -> maps:without([client], Limiter) ). -del_limiter_bucket(Id, Conf) -> - case limiter(Conf) of - undefined -> - ok; - Limiter -> - lists:foreach( - fun(Type) -> - emqx_limiter_server:del_bucket(Id, Type) - end, - maps:keys(Limiter) - ) - end. +del_limiter_bucket(_Id, undefined) -> + ok; +del_limiter_bucket(Id, Limiter) -> + maps:foreach( + fun(Type, _) -> + emqx_limiter_server:del_bucket(Id, Type) + end, + Limiter + ). + +update_limiter_bucket(Id, Limiter, undefined) -> + del_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, undefined, Limiter) -> + add_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, OldLimiter, NewLimiter) -> + ok = add_limiter_bucket(Id, NewLimiter), + Outdated = maps:without(maps:keys(NewLimiter), OldLimiter), + del_limiter_bucket(Id, Outdated). diff_confs(NewConfs, OldConfs) -> emqx_utils:diff_lists( From 8d30bb2b80b34945979d583d410725e77c3b8126 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 16:39:55 +0100 Subject: [PATCH 10/12] chore: add changelog entry --- changes/ce/feat-12201.en.md | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 changes/ce/feat-12201.en.md diff --git a/changes/ce/feat-12201.en.md b/changes/ce/feat-12201.en.md new file mode 100644 index 000000000..4247ec6e9 --- /dev/null +++ b/changes/ce/feat-12201.en.md @@ -0,0 +1,10 @@ +Support hot update of TCP/SSL/WS/WSS MQTT listeners configuration, which allows changing most of the configuration parameters without restarting the listener and disconnecting the clients. + +In case of TCP/SSL listeners, changes to the following parameters still require full listener restart: + * `bind` + * `tcp_options.backlog` + +In case of WS/WSS listeners, any parameter can be freely changed without losing the connected clients. However, changing transport related parameters will cause listening socket to be re-opened, namely: + * `bind` + * `tcp_options.*` + * `ssl_options.*` From baf46c9aa2895ca7b00d2df6c535b0ad3f67bd0f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 22:48:38 +0100 Subject: [PATCH 11/12] test(listen): also change CA on update and verify clients notice --- apps/emqx/test/emqx_listeners_SUITE.erl | 126 +++++++++++++++++------- 1 file changed, 88 insertions(+), 38 deletions(-) diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index 203a30cde..1dab8f4c4 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -188,38 +188,58 @@ t_ssl_password_cert(Config) -> t_ssl_update_opts(Config) -> PrivDir = ?config(priv_dir, Config), - CACertfile = filename:join(PrivDir, "ca.pem"), Host = "127.0.0.1", Port = emqx_common_test_helpers:select_free_port(ssl), Conf = #{ <<"enable">> => true, <<"bind">> => format_bind({Host, Port}), <<"ssl_options">> => #{ - <<"cacertfile">> => CACertfile, - <<"certfile">> => filename:join(PrivDir, "server.pem"), - <<"keyfile">> => filename:join(PrivDir, "server.key"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), <<"verify">> => verify_none } }, ClientSSLOpts = [ {verify, verify_peer}, - {cacertfile, CACertfile}, {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} ], with_listener(ssl, updated, Conf, fun() -> - C1 = emqtt_connect_ssl(Host, Port, ClientSSLOpts), + %% Client connects successfully. + C1 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]), - %% Change the listener SSL configuration. - %% 1. Another set of (password protected) cert/key files. - %% 2. Require peer certificate. + %% Change the listener SSL configuration: another set of cert/key files. + {ok, _} = emqx:update_config( + [listeners, ssl, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + ?assertError( + {tls_alert, {unknown_ca, _}}, + emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]) + ), + + C2 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. {ok, _} = emqx:update_config( [listeners, ssl, updated], {update, #{ <<"ssl_options">> => #{ - <<"password">> => ?SERVER_KEY_PASSWORD, - <<"cacertfile">> => CACertfile, - <<"certfile">> => filename:join(PrivDir, "server-password.pem"), - <<"keyfile">> => filename:join(PrivDir, "server-password.key"), <<"verify">> => verify_peer, <<"fail_if_no_peer_cert">> => true } @@ -229,10 +249,13 @@ t_ssl_update_opts(Config) -> %% Unable to connect with old SSL options, certificate is now required. ?assertError( {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}, - emqtt_connect_ssl(Host, Port, ClientSSLOpts) + emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]) ), - C2 = emqtt_connect_ssl(Host, Port, [ + C3 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, {certfile, filename:join(PrivDir, "client.pem")}, {keyfile, filename:join(PrivDir, "client.key")} | ClientSSLOpts @@ -241,38 +264,38 @@ t_ssl_update_opts(Config) -> %% Both pre- and post-update clients should be alive. ?assertEqual(pong, emqtt:ping(C1)), ?assertEqual(pong, emqtt:ping(C2)), - %% TODO - %% Would be nice to verify that the server picked up new cert/key pair. - %% However, there's no usable API for that in `emqtt` at the moment. - %% ?assertEqual(<<"server">>, esockd_peercert:common_name(Cert1)), - %% ?assertEqual(<<"server-password">>, esockd_peercert:common_name(Cert2)), + ?assertEqual(pong, emqtt:ping(C3)), + ok = emqtt:stop(C1), - ok = emqtt:stop(C2) + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) end). t_wss_update_opts(Config) -> PrivDir = ?config(priv_dir, Config), - CACertfile = filename:join(PrivDir, "ca.pem"), Host = "127.0.0.1", Port = emqx_common_test_helpers:select_free_port(ssl), Conf = #{ <<"enable">> => true, <<"bind">> => format_bind({Host, Port}), <<"ssl_options">> => #{ - <<"cacertfile">> => CACertfile, - <<"certfile">> => filename:join(PrivDir, "server.pem"), - <<"keyfile">> => filename:join(PrivDir, "server.key"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"password">> => ?SERVER_KEY_PASSWORD, <<"verify">> => verify_none } }, ClientSSLOpts = [ {verify, verify_peer}, - {cacertfile, CACertfile}, {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} ], with_listener(wss, updated, Conf, fun() -> %% Start a client. - C1 = emqtt_connect_wss(Host, Port, ClientSSLOpts), + C1 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} + | ClientSSLOpts + ]), %% Change the listener SSL configuration. %% 1. Another set of (password protected) cert/key files. @@ -281,10 +304,30 @@ t_wss_update_opts(Config) -> [listeners, wss, updated], {update, #{ <<"ssl_options">> => #{ - <<"password">> => ?SERVER_KEY_PASSWORD, - <<"cacertfile">> => CACertfile, - <<"certfile">> => filename:join(PrivDir, "server-password.pem"), - <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + %% Due to a bug `emqtt` exits with `badmatch` in this case. + ?assertExit( + _Badmatch, + emqtt_connect_wss(Host, Port, ClientSSLOpts) + ), + + C2 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} + | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. + {ok, _} = emqx:update_config( + [listeners, wss, updated], + {update, #{ + <<"ssl_options">> => #{ <<"verify">> => verify_peer, <<"fail_if_no_peer_cert">> => true } @@ -292,14 +335,17 @@ t_wss_update_opts(Config) -> ), %% Unable to connect with old SSL options, certificate is now required. + %% Due to a bug `emqtt` does not instantly report that socket was closed. ?assertError( - %% Due to a bug `emqtt` does not instantly report that socket was closed. timeout, - emqtt_connect_wss(Host, Port, ClientSSLOpts) + emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} + | ClientSSLOpts + ]) ), - % C2 = emqx_cth_mqttws:connect(Host, Port, [ - C2 = emqtt_connect_wss(Host, Port, [ + C3 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, {certfile, filename:join(PrivDir, "client.pem")}, {keyfile, filename:join(PrivDir, "client.key")} | ClientSSLOpts @@ -308,8 +354,11 @@ t_wss_update_opts(Config) -> %% Both pre- and post-update clients should be alive. ?assertEqual(pong, emqtt:ping(C1)), ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + ok = emqtt:stop(C1), - ok = emqtt:stop(C2) + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) end). with_listener(Type, Name, Config, Then) -> @@ -380,8 +429,9 @@ t_format_bind(_) -> generate_tls_certs(Config) -> PrivDir = ?config(priv_dir, Config), emqx_common_test_helpers:gen_ca(PrivDir, "ca"), - emqx_common_test_helpers:gen_host_cert("server", "ca", PrivDir, #{}), - emqx_common_test_helpers:gen_host_cert("client", "ca", PrivDir, #{}), + emqx_common_test_helpers:gen_ca(PrivDir, "ca-next"), + emqx_common_test_helpers:gen_host_cert("server", "ca-next", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("client", "ca-next", PrivDir, #{}), emqx_common_test_helpers:gen_host_cert("server-password", "ca", PrivDir, #{ password => ?SERVER_KEY_PASSWORD }). From a6c26ce9925ade9a6b1d9bec87111a02b4647056 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 23:36:52 +0100 Subject: [PATCH 12/12] test(listen): stabilize flaky testcase --- apps/emqx/include/asserts.hrl | 22 ++++++++++++++++++++++ apps/emqx/test/emqx_listeners_SUITE.erl | 6 ++++-- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index 489c47862..63dad42aa 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -83,6 +83,28 @@ end)() ). +-define(assertExceptionOneOf(CT1, CT2, EXPR), + (fun() -> + X__Attrs = [ + {module, ?MODULE}, + {line, ?LINE}, + {expression, (??EXPR)}, + {pattern, "[ " ++ (??CT1) ++ ", " ++ (??CT2) ++ " ]"} + ], + X__Exc = + try (EXPR) of + X__V -> erlang:error({assertException, [{unexpected_success, X__V} | X__Attrs]}) + catch + X__C:X__T:X__S -> {X__C, X__T, X__S} + end, + case {element(1, X__Exc), element(2, X__Exc)} of + CT1 -> ok; + CT2 -> ok; + _ -> erlang:error({assertException, [{unexpected_exception, X__Exc} | X__Attrs]}) + end + end)() +). + -define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin __TEST_CASE = ?FUNCTION_NAME, (fun diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index 1dab8f4c4..476f02eb3 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_schema.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -247,8 +248,9 @@ t_ssl_update_opts(Config) -> ), %% Unable to connect with old SSL options, certificate is now required. - ?assertError( - {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}, + ?assertExceptionOneOf( + {error, {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}}, + {error, closed}, emqtt_connect_ssl(Host, Port, [ {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts ])