diff --git a/apps/emqx/include/asserts.hrl b/apps/emqx/include/asserts.hrl index 489c47862..63dad42aa 100644 --- a/apps/emqx/include/asserts.hrl +++ b/apps/emqx/include/asserts.hrl @@ -83,6 +83,28 @@ end)() ). +-define(assertExceptionOneOf(CT1, CT2, EXPR), + (fun() -> + X__Attrs = [ + {module, ?MODULE}, + {line, ?LINE}, + {expression, (??EXPR)}, + {pattern, "[ " ++ (??CT1) ++ ", " ++ (??CT2) ++ " ]"} + ], + X__Exc = + try (EXPR) of + X__V -> erlang:error({assertException, [{unexpected_success, X__V} | X__Attrs]}) + catch + X__C:X__T:X__S -> {X__C, X__T, X__S} + end, + case {element(1, X__Exc), element(2, X__Exc)} of + CT1 -> ok; + CT2 -> ok; + _ -> erlang:error({assertException, [{unexpected_exception, X__Exc} | X__Attrs]}) + end + end)() +). + -define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin __TEST_CASE = ?FUNCTION_NAME, (fun diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index db8bc6ce7..931e2bb8c 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.40.3"}}}, diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 5cf631ccb..dc1f6d9ad 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -55,7 +55,6 @@ ]). -export([pre_config_update/3, post_config_update/5]). --export([create_listener/3, remove_listener/3, update_listener/3]). -export([format_bind/1]). @@ -66,6 +65,11 @@ -export_type([listener_id/0]). -type listener_id() :: atom() | binary(). +-type listener_type() :: tcp | ssl | ws | wss | quic | dtls. + +-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)). +-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)). + -define(ROOT_KEY, listeners). -define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). @@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) -> -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> - case - [ - Running - || {Id, #{running := Running}} <- list(), - Id =:= ListenerId - ] - of - [] -> {error, not_found}; - [IsRunning] -> IsRunning + case lists:keyfind(ListenerId, 1, list()) of + {_Id, #{running := Running}} -> Running; + false -> {error, not_found} end. is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> @@ -229,24 +227,26 @@ start() -> start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). --spec start_listener(atom(), atom(), map()) -> ok | {error, term()}. -start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - case do_start_listener(Type, ListenerName, Conf) of +-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) -> + ListenerId = listener_id(Type, Name), + Limiter = limiter(Conf), + ok = add_limiter_bucket(ListenerId, Limiter), + case do_start_listener(Type, Name, ListenerId, Conf) of {ok, {skipped, Reason}} when - Reason =:= listener_disabled; Reason =:= quic_app_missing -> ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), console_print( "Listener ~ts is NOT started due to: ~p.~n", - [listener_id(Type, ListenerName), Reason] + [ListenerId, Reason] ), ok; {ok, _} -> ?tp(listener_started, #{type => Type, bind => Bind}), console_print( "Listener ~ts on ~ts started.~n", - [listener_id(Type, ListenerName), format_bind(Bind)] + [ListenerId, format_bind(Bind)] ), ok; {error, {already_started, Pid}} -> @@ -255,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> }), {error, {already_started, Pid}}; {error, Reason} -> + ok = del_limiter_bucket(ListenerId, Limiter), ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}), - ListenerId = listener_id(Type, ListenerName), BindStr = format_bind(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p.~n", @@ -269,7 +269,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ) ), {error, {failed_to_start, Msg}} - end. + end; +start_listener(Type, Name, #{enable := false}) -> + console_print( + "Listener ~ts is NOT started due to: disabled.~n", + [listener_id(Type, Name)] + ), + ok. %% @doc Restart all listeners -spec restart() -> ok. @@ -280,16 +286,33 @@ restart() -> restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). --spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}. -restart_listener(Type, ListenerName, {OldConf, NewConf}) -> - restart_listener(Type, ListenerName, OldConf, NewConf); +-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}. restart_listener(Type, ListenerName, Conf) -> restart_listener(Type, ListenerName, Conf, Conf). -restart_listener(Type, ListenerName, OldConf, NewConf) -> - case stop_listener(Type, ListenerName, OldConf) of - ok -> start_listener(Type, ListenerName, NewConf); - {error, Reason} -> {error, Reason} +update_listener(_Type, _Name, #{enable := false}, #{enable := false}) -> + ok; +update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> + stop_listener(Type, Name, Conf); +update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> + start_listener(Type, Name, Conf); +update_listener(Type, Name, OldConf, NewConf) -> + Id = listener_id(Type, Name), + ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)), + case do_update_listener(Type, Name, OldConf, NewConf) of + ok -> + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + ok; + {error, _Reason} -> + restart_listener(Type, Name, OldConf, NewConf) + end. + +restart_listener(Type, Name, OldConf, NewConf) -> + case stop_listener(Type, Name, OldConf) of + ok -> + start_listener(Type, Name, NewConf); + {error, Reason} -> + {error, Reason} end. %% @doc Stop all listeners. @@ -305,9 +328,10 @@ stop() -> stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). -stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - Id = listener_id(Type, ListenerName), - ok = del_limiter_bucket(Id, Conf), +stop_listener(Type, Name, #{bind := Bind} = Conf) -> + Id = listener_id(Type, Name), + ok = del_limiter_bucket(Id, limiter(Conf)), + ok = unregister_ocsp_stapling_refresh(Type, Name), case do_stop_listener(Type, Id, Conf) of ok -> console_print( @@ -325,11 +349,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> {error, Reason} end. --spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}. - -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl -> +-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) -> esockd:close(Id, ListenOn); -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss -> +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) -> case cowboy:stop_listener(Id) of ok -> wait_listener_stopped(ListenOn); @@ -369,45 +392,25 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args). console_print(_Fmt, _Args) -> ok. -endif. -%% Start MQTT/TCP listener --spec do_start_listener(atom(), atom(), map()) -> +-spec do_start_listener(listener_type(), atom(), listener_id(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}. -do_start_listener(_Type, _ListenerName, #{enable := false}) -> - {ok, {skipped, listener_disabled}}; -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == tcp; Type == ssl --> - Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), +%% Start MQTT/TCP listener +do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> esockd:open( Id, ListenOn, - merge_default(esockd_opts(Id, Type, Opts)), - {emqx_connection, start_link, [ - #{ - listener => {Type, ListenerName}, - zone => zone(Opts), - limiter => Limiter, - enable_authn => enable_authn(Opts) - } - ]} + merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == ws; Type == wss --> - Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), - RanchOpts = ranch_opts(Type, ListenOn, Opts), - WsOpts = ws_opts(Type, ListenerName, Opts, Limiter), +do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) -> + RanchOpts = ranch_opts(Type, Opts), + WsOpts = ws_opts(Type, Name, Opts), case Type of ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) end; %% Start MQTT/QUIC listener -do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> +do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) -> ListenOn = case Bind of {Addr, Port} when tuple_size(Addr) == 4 -> @@ -457,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), zone => zone(Opts), - listener => {quic, ListenerName}, + listener => {quic, Name}, limiter => Limiter }, StreamOpts = #{ stream_callback => emqx_quic_stream, active => 1 }, - - Id = listener_id(quic, ListenerName), - add_limiter_bucket(Id, Limiter), quicer:spawn_listener( Id, ListenOn, @@ -476,6 +476,39 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {ok, {skipped, quic_app_missing}} end. +do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when + ?ESOCKD_LISTENER(Type) +-> + Id = listener_id(Type, Name), + case maps:get(bind, OldConf) of + ListenOn -> + esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); + _Different -> + %% TODO + %% Again, we're not strictly required to drop live connections in this case. + {error, not_supported} + end; +do_update_listener(Type, Name, OldConf, NewConf) when + ?COWBOY_LISTENER(Type) +-> + Id = listener_id(Type, Name), + RanchOpts = ranch_opts(Type, NewConf), + WsOpts = ws_opts(Type, Name, NewConf), + case ranch_opts(Type, OldConf) of + RanchOpts -> + %% Transport options did not change, no need to touch the listener. + ok; + _Different -> + %% Transport options changed, we need to tear down the listener. + ok = ranch:suspend_listener(Id), + ok = ranch:set_transport_options(Id, RanchOpts) + end, + ok = ranch:set_protocol_options(Id, WsOpts), + %% No-op if the listener was not suspended. + ranch:resume_listener(Id); +do_update_listener(_Type, _Name, _OldConf, _NewConf) -> + {error, not_supported}. + %% Update the listeners at runtime pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when V =:= undefined orelse V =:= ?TOMBSTONE_VALUE @@ -501,69 +534,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE -> - create_listener(Type, Name, NewConf); + start_listener(Type, Name, NewConf); post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> - update_listener(Type, Name, {OldConf, NewConf}); + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) -> - remove_listener(Type, Name, OldConf); + stop_listener(Type, Name, OldConf); post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> - #{enable := NewEnabled} = NewConf, - #{enable := OldEnabled} = OldConf, - case {NewEnabled, OldEnabled} of - {true, true} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}); - {true, false} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - start_listener(Type, Name, NewConf); - {false, true} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf); - {false, false} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf) - end; + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) -> ok; post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) -> #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf), - Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed), - perform_listener_changes([ - {fun ?MODULE:remove_listener/3, Removed}, - {fun ?MODULE:update_listener/3, Updated}, - {fun ?MODULE:create_listener/3, Added} - ]); + %% TODO + %% This currently lacks transactional semantics. If one of the changes fails, + %% previous changes will not be rolled back. + perform_listener_changes( + [{update, L} || L <- Changed] ++ + [{stop, L} || L <- Removed] ++ + [{start, L} || L <- Added] + ); post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. -create_listener(Type, Name, NewConf) -> - start_listener(Type, Name, NewConf). - -remove_listener(Type, Name, OldConf) -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf). - -update_listener(Type, Name, {OldConf, NewConf}) -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}). - perform_listener_changes([]) -> ok; -perform_listener_changes([{Action, ConfL} | Tasks]) -> - case perform_listener_changes(Action, ConfL) of - ok -> perform_listener_changes(Tasks); +perform_listener_changes([{Action, Listener} | Rest]) -> + case perform_listener_change(Action, Listener) of + ok -> perform_listener_changes(Rest); {error, Reason} -> {error, Reason} end. -perform_listener_changes(_Action, []) -> - ok; -perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) -> - case Action(Type, Name, Diff) of - ok -> perform_listener_changes(Action, MapConf); - {error, Reason} -> {error, Reason} - end. +perform_listener_change(start, {Type, Name, Conf}) -> + start_listener(Type, Name, Conf); +perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) -> + update_listener(Type, Name, ConfOld, ConfNew); +perform_listener_change(stop, {Type, Name, Conf}) -> + stop_listener(Type, Name, Conf). -esockd_opts(ListenerId, Type, Opts0) -> +esockd_opts(ListenerId, Type, Name, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = @@ -579,7 +587,16 @@ esockd_opts(ListenerId, Type, Opts0) -> end, Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])), - tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]} + tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}, + connection_mfargs => + {emqx_connection, start_link, [ + #{ + listener => {Type, Name}, + zone => zone(Opts0), + limiter => Limiter, + enable_authn => enable_authn(Opts0) + } + ]} }, maps:to_list( case Type of @@ -593,20 +610,21 @@ esockd_opts(ListenerId, Type, Opts0) -> end ). -ws_opts(Type, ListenerName, Opts, Limiter) -> - WsPaths = [ - {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ +ws_opts(Type, ListenerName, Opts) -> + WsPath = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), + WsRoutes = [ + {WsPath, emqx_ws_connection, #{ zone => zone(Opts), listener => {Type, ListenerName}, - limiter => Limiter, + limiter => limiter(Opts), enable_authn => enable_authn(Opts) }} ], - Dispatch = cowboy_router:compile([{'_', WsPaths}]), + Dispatch = cowboy_router:compile([{'_', WsRoutes}]), ProxyProto = maps:get(proxy_protocol, Opts, false), #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}. -ranch_opts(Type, ListenOn, Opts) -> +ranch_opts(Type, Opts = #{bind := ListenOn}) -> NumAcceptors = maps:get(acceptors, Opts, 4), MaxConnections = maps:get(max_connections, Opts, 1024), SocketOpts = @@ -725,41 +743,47 @@ add_limiter_bucket(Id, Limiter) -> maps:without([client], Limiter) ). -del_limiter_bucket(Id, Conf) -> - case limiter(Conf) of - undefined -> - ok; - Limiter -> - lists:foreach( - fun(Type) -> - emqx_limiter_server:del_bucket(Id, Type) - end, - maps:keys(Limiter) - ) - end. +del_limiter_bucket(_Id, undefined) -> + ok; +del_limiter_bucket(Id, Limiter) -> + maps:foreach( + fun(Type, _) -> + emqx_limiter_server:del_bucket(Id, Type) + end, + Limiter + ). + +update_limiter_bucket(Id, Limiter, undefined) -> + del_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, undefined, Limiter) -> + add_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, OldLimiter, NewLimiter) -> + ok = add_limiter_bucket(Id, NewLimiter), + Outdated = maps:without(maps:keys(NewLimiter), OldLimiter), + del_limiter_bucket(Id, Outdated). diff_confs(NewConfs, OldConfs) -> emqx_utils:diff_lists( flatten_confs(NewConfs), flatten_confs(OldConfs), - fun({Key, _}) -> Key end + fun({Type, Name, _}) -> {Type, Name} end ). -flatten_confs(Conf0) -> +flatten_confs(Confs) -> lists:flatmap( - fun({Type, Conf}) -> - do_flatten_confs(Type, Conf) + fun({Type, Listeners}) -> + do_flatten_confs(Type, Listeners) end, - maps:to_list(Conf0) + maps:to_list(Confs) ). -do_flatten_confs(Type, Conf0) -> +do_flatten_confs(Type, Listeners) -> FilterFun = fun ({_Name, ?TOMBSTONE_TYPE}) -> false; - ({Name, Conf}) -> {true, {{Type, Name}, Conf}} + ({Name, Conf}) -> {true, {Type, Name, Conf}} end, - lists:filtermap(FilterFun, maps:to_list(Conf0)). + lists:filtermap(FilterFun, maps:to_list(Listeners)). enable_authn(Opts) -> maps:get(enable_authn, Opts, true). diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index b8d0c39f6..476f02eb3 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -20,122 +20,46 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CERTS_PATH(CertName), filename:join(["../../lib/emqx/etc/certs/", CertName])). - -define(SERVER_KEY_PASSWORD, "sErve7r8Key$!"). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - NewConfig = generate_config(), - application:ensure_all_started(esockd), - application:ensure_all_started(quicer), - application:ensure_all_started(cowboy), generate_tls_certs(Config), - lists:foreach(fun set_app_env/1, NewConfig), - Config. + WorkDir = emqx_cth_suite:work_dir(Config), + Apps = emqx_cth_suite:start([quicer, emqx], #{work_dir => WorkDir}), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - application:stop(esockd), - application:stop(cowboy). +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(Case, Config) when - Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp + Case =:= t_start_stop_listeners; + Case =:= t_restart_listeners; + Case =:= t_restart_listeners_with_hibernate_after_disabled -> - catch emqx_config_handler:stop(), - Port = emqx_common_test_helpers:select_free_port(tcp), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - PureListeners2 = PureListeners#{ - tcp => #{ - listener_test => #{ - bind => {"127.0.0.1", Port}, - max_connections => 4321, - limiter => #{} - } - } - }, - emqx_config:put([listeners], PureListeners2), - - ok = emqx_listeners:start(), - [ - {prev_listener_conf, PrevListeners}, - {tcp_port, Port} - | Config - ]; -init_per_testcase(t_wss_conn, Config) -> - catch emqx_config_handler:stop(), - Port = emqx_common_test_helpers:select_free_port(ssl), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - PureListeners2 = PureListeners#{ - wss => #{ - listener_test => #{ - bind => {{127, 0, 0, 1}, Port}, - limiter => #{}, - ssl_options => #{ - cacertfile => ?CERTS_PATH("cacert.pem"), - certfile => ?CERTS_PATH("cert.pem"), - keyfile => ?CERTS_PATH("key.pem") - } - } - } - }, - emqx_config:put([listeners], PureListeners2), - - ok = emqx_listeners:start(), - [ - {prev_listener_conf, PrevListeners}, - {wss_port, Port} - | Config - ]; + ok = emqx_listeners:stop(), + Config; init_per_testcase(_, Config) -> - catch emqx_config_handler:stop(), - {ok, _} = emqx_config_handler:start_link(), - PrevListeners = emqx_config:get([listeners], #{}), - PureListeners = remove_default_limiter(PrevListeners), - emqx_config:put([listeners], PureListeners), - [ - {prev_listener_conf, PrevListeners} - | Config - ]. + ok = emqx_listeners:start(), + Config. -end_per_testcase(Case, Config) when - Case =:= t_max_conns_tcp; Case =:= t_current_conns_tcp --> - PrevListener = ?config(prev_listener_conf, Config), - emqx_listeners:stop(), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), - ok; -end_per_testcase(t_wss_conn, Config) -> - PrevListener = ?config(prev_listener_conf, Config), - emqx_listeners:stop(), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), - ok; -end_per_testcase(_, Config) -> - PrevListener = ?config(prev_listener_conf, Config), - emqx_config:put([listeners], PrevListener), - _ = emqx_config_handler:stop(), +end_per_testcase(_, _Config) -> ok. t_start_stop_listeners(_) -> ok = emqx_listeners:start(), - ?assertException(error, _, emqx_listeners:start_listener({ws, {"127.0.0.1", 8083}, []})), + ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})), ok = emqx_listeners:stop(). t_restart_listeners(_) -> ok = emqx_listeners:start(), ok = emqx_listeners:stop(), - %% flakyness: eaddrinuse - timer:sleep(timer:seconds(2)), ok = emqx_listeners:restart(), ok = emqx_listeners:stop(). @@ -168,77 +92,315 @@ t_restart_listeners_with_hibernate_after_disabled(_Config) -> ), ok = emqx_listeners:start(), ok = emqx_listeners:stop(), - %% flakyness: eaddrinuse - timer:sleep(timer:seconds(2)), ok = emqx_listeners:restart(), ok = emqx_listeners:stop(), emqx_config:put([listeners], OldLConf). -t_max_conns_tcp(Config) -> +t_max_conns_tcp(_Config) -> %% Note: Using a string representation for the bind address like %% "127.0.0.1" does not work - ?assertEqual( - 4321, - emqx_listeners:max_conns('tcp:listener_test', {{127, 0, 0, 1}, ?config(tcp_port, Config)}) - ). + Port = emqx_common_test_helpers:select_free_port(tcp), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"max_connections">> => 4321, + <<"limiter">> => #{} + }, + with_listener(tcp, maxconns, Conf, fun() -> + ?assertEqual( + 4321, + emqx_listeners:max_conns('tcp:maxconns', {{127, 0, 0, 1}, Port}) + ) + end). -t_current_conns_tcp(Config) -> - ?assertEqual( - 0, - emqx_listeners:current_conns('tcp:listener_test', { - {127, 0, 0, 1}, ?config(tcp_port, Config) - }) - ). +t_current_conns_tcp(_Config) -> + Port = emqx_common_test_helpers:select_free_port(tcp), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"max_connections">> => 42, + <<"limiter">> => #{} + }, + with_listener(tcp, curconns, Conf, fun() -> + ?assertEqual( + 0, + emqx_listeners:current_conns('tcp:curconns', {{127, 0, 0, 1}, Port}) + ) + end). t_wss_conn(Config) -> - {ok, Socket} = ssl:connect( - {127, 0, 0, 1}, ?config(wss_port, Config), [{verify, verify_none}], 1000 - ), - ok = ssl:close(Socket). + PrivDir = ?config(priv_dir, Config), + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"limiter">> => #{}, + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }, + with_listener(wss, wssconn, Conf, fun() -> + {ok, Socket} = ssl:connect({127, 0, 0, 1}, Port, [{verify, verify_none}], 1000), + ok = ssl:close(Socket) + end). t_quic_conn(Config) -> + PrivDir = ?config(priv_dir, Config), Port = emqx_common_test_helpers:select_free_port(quic), - DataDir = ?config(data_dir, Config), - SSLOpts = #{ - password => ?SERVER_KEY_PASSWORD, - certfile => filename:join(DataDir, "server-password.pem"), - cacertfile => filename:join(DataDir, "ca.pem"), - keyfile => filename:join(DataDir, "server-password.key") + Conf = #{ + <<"bind">> => format_bind({"127.0.0.1", Port}), + <<"ssl_options">> => #{ + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") + } }, - emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, Port, #{ssl_options => SSLOpts}), - ct:pal("~p", [emqx_listeners:list()]), - {ok, Conn} = quicer:connect( - {127, 0, 0, 1}, - Port, - [ - {verify, verify_none}, - {alpn, ["mqtt"]} - ], - 1000 - ), - ok = quicer:close_connection(Conn), - emqx_listeners:stop_listener(quic, ?FUNCTION_NAME, #{bind => Port}). + with_listener(quic, ?FUNCTION_NAME, Conf, fun() -> + {ok, Conn} = quicer:connect( + {127, 0, 0, 1}, + Port, + [ + {verify, verify_none}, + {alpn, ["mqtt"]} + ], + 1000 + ), + ok = quicer:close_connection(Conn) + end). t_ssl_password_cert(Config) -> + PrivDir = ?config(priv_dir, Config), Port = emqx_common_test_helpers:select_free_port(ssl), - DataDir = ?config(data_dir, Config), SSLOptsPWD = #{ - password => ?SERVER_KEY_PASSWORD, - certfile => filename:join(DataDir, "server-password.pem"), - cacertfile => filename:join(DataDir, "ca.pem"), - keyfile => filename:join(DataDir, "server-password.key") + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key") }, LConf = #{ - enable => true, - bind => {{127, 0, 0, 1}, Port}, - mountpoint => <<>>, - zone => default, - ssl_options => SSLOptsPWD + <<"enable">> => true, + <<"bind">> => format_bind({{127, 0, 0, 1}, Port}), + <<"ssl_options">> => SSLOptsPWD }, - ok = emqx_listeners:start_listener(ssl, ?FUNCTION_NAME, LConf), - {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]), - ssl:close(SSLSocket), - emqx_listeners:stop_listener(ssl, ?FUNCTION_NAME, LConf). + with_listener(ssl, ?FUNCTION_NAME, LConf, fun() -> + {ok, SSLSocket} = ssl:connect("127.0.0.1", Port, [{verify, verify_none}]), + ssl:close(SSLSocket) + end). + +t_ssl_update_opts(Config) -> + PrivDir = ?config(priv_dir, Config), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(ssl, updated, Conf, fun() -> + %% Client connects successfully. + C1 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: another set of cert/key files. + {ok, _} = emqx:update_config( + [listeners, ssl, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + ?assertError( + {tls_alert, {unknown_ca, _}}, + emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} | ClientSSLOpts + ]) + ), + + C2 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. + {ok, _} = emqx:update_config( + [listeners, ssl, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + ?assertExceptionOneOf( + {error, {ssl_error, _Socket, {tls_alert, {certificate_required, _}}}}, + {error, closed}, + emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} | ClientSSLOpts + ]) + ), + + C3 = emqtt_connect_ssl(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + end). + +t_wss_update_opts(Config) -> + PrivDir = ?config(priv_dir, Config), + Host = "127.0.0.1", + Port = emqx_common_test_helpers:select_free_port(ssl), + Conf = #{ + <<"enable">> => true, + <<"bind">> => format_bind({Host, Port}), + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca.pem"), + <<"certfile">> => filename:join(PrivDir, "server-password.pem"), + <<"keyfile">> => filename:join(PrivDir, "server-password.key"), + <<"password">> => ?SERVER_KEY_PASSWORD, + <<"verify">> => verify_none + } + }, + ClientSSLOpts = [ + {verify, verify_peer}, + {customize_hostname_check, [{match_fun, fun(_, _) -> true end}]} + ], + with_listener(wss, updated, Conf, fun() -> + %% Start a client. + C1 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca.pem")} + | ClientSSLOpts + ]), + + %% Change the listener SSL configuration. + %% 1. Another set of (password protected) cert/key files. + %% 2. Require peer certificate. + {ok, _} = emqx:update_config( + [listeners, wss, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"cacertfile">> => filename:join(PrivDir, "ca-next.pem"), + <<"certfile">> => filename:join(PrivDir, "server.pem"), + <<"keyfile">> => filename:join(PrivDir, "server.key") + } + }} + ), + + %% Unable to connect with old SSL options, server's cert is signed by another CA. + %% Due to a bug `emqtt` exits with `badmatch` in this case. + ?assertExit( + _Badmatch, + emqtt_connect_wss(Host, Port, ClientSSLOpts) + ), + + C2 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} + | ClientSSLOpts + ]), + + %% Change the listener SSL configuration: require peer certificate. + {ok, _} = emqx:update_config( + [listeners, wss, updated], + {update, #{ + <<"ssl_options">> => #{ + <<"verify">> => verify_peer, + <<"fail_if_no_peer_cert">> => true + } + }} + ), + + %% Unable to connect with old SSL options, certificate is now required. + %% Due to a bug `emqtt` does not instantly report that socket was closed. + ?assertError( + timeout, + emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")} + | ClientSSLOpts + ]) + ), + + C3 = emqtt_connect_wss(Host, Port, [ + {cacertfile, filename:join(PrivDir, "ca-next.pem")}, + {certfile, filename:join(PrivDir, "client.pem")}, + {keyfile, filename:join(PrivDir, "client.key")} + | ClientSSLOpts + ]), + + %% Both pre- and post-update clients should be alive. + ?assertEqual(pong, emqtt:ping(C1)), + ?assertEqual(pong, emqtt:ping(C2)), + ?assertEqual(pong, emqtt:ping(C3)), + + ok = emqtt:stop(C1), + ok = emqtt:stop(C2), + ok = emqtt:stop(C3) + end). + +with_listener(Type, Name, Config, Then) -> + {ok, _} = emqx:update_config([listeners, Type, Name], {create, Config}), + try + Then() + after + emqx:update_config([listeners, Type, Name], ?TOMBSTONE_CONFIG_CHANGE_REQ) + end. + +emqtt_connect_ssl(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ssl => true, + ssl_opts => SSLOpts + }). + +emqtt_connect_wss(Host, Port, SSLOpts) -> + emqtt_connect(fun emqtt:ws_connect/1, #{ + hosts => [{Host, Port}], + connect_timeout => 1, + ws_transport_options => [ + {protocols, [http]}, + {transport, tls}, + {tls_opts, SSLOpts} + ] + }). + +emqtt_connect(Connect, Opts) -> + case emqtt:start_link(Opts) of + {ok, Client} -> + true = erlang:unlink(Client), + case Connect(Client) of + {ok, _} -> Client; + {error, Reason} -> error(Reason, [Opts]) + end; + {error, Reason} -> + error(Reason, [Opts]) + end. t_format_bind(_) -> ?assertEqual( @@ -266,67 +428,15 @@ t_format_bind(_) -> lists:flatten(emqx_listeners:format_bind(":1883")) ). -render_config_file() -> - Path = local_path(["etc", "emqx.conf"]), - {ok, Temp} = file:read_file(Path), - Vars0 = mustache_vars(), - Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], - Targ = bbmustache:render(Temp, Vars), - NewName = Path ++ ".rendered", - ok = file:write_file(NewName, Targ), - NewName. - -mustache_vars() -> - [ - {platform_data_dir, local_path(["data"])}, - {platform_etc_dir, local_path(["etc"])} - ]. - -generate_config() -> - ConfFile = render_config_file(), - {ok, Conf} = hocon:load(ConfFile, #{format => richmap}), - hocon_tconf:generate(emqx_schema, Conf). - -set_app_env({App, Lists}) -> - lists:foreach( - fun - ({authz_file, _Var}) -> - application:set_env(App, authz_file, local_path(["etc", "authz.conf"])); - ({Par, Var}) -> - application:set_env(App, Par, Var) - end, - Lists - ). - -local_path(Components, Module) -> - filename:join([get_base_dir(Module) | Components]). - -local_path(Components) -> - local_path(Components, ?MODULE). - -get_base_dir(Module) -> - {file, Here} = code:is_loaded(Module), - filename:dirname(filename:dirname(Here)). - -get_base_dir() -> - get_base_dir(?MODULE). - -remove_default_limiter(Listeners) -> - maps:map( - fun(_, X) -> - maps:map( - fun(_, E) -> - maps:remove(limiter, E) - end, - X - ) - end, - Listeners - ). - generate_tls_certs(Config) -> - DataDir = ?config(data_dir, Config), - emqx_common_test_helpers:gen_ca(DataDir, "ca"), - emqx_common_test_helpers:gen_host_cert("server-password", "ca", DataDir, #{ + PrivDir = ?config(priv_dir, Config), + emqx_common_test_helpers:gen_ca(PrivDir, "ca"), + emqx_common_test_helpers:gen_ca(PrivDir, "ca-next"), + emqx_common_test_helpers:gen_host_cert("server", "ca-next", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("client", "ca-next", PrivDir, #{}), + emqx_common_test_helpers:gen_host_cert("server-password", "ca", PrivDir, #{ password => ?SERVER_KEY_PASSWORD }). + +format_bind(Bind) -> + iolist_to_binary(emqx_listeners:format_bind(Bind)). diff --git a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl index 76e11ef00..74a488abb 100644 --- a/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway_exproto/test/emqx_exproto_SUITE.erl @@ -663,9 +663,11 @@ tcp_opts() -> udp_opts() -> #{ - recbuf => 1024, - sndbuf => 1024, - buffer => 1024, + %% NOTE + %% Making those too small will lead to inability to accept connections. + recbuf => 2048, + sndbuf => 2048, + buffer => 2048, reuseaddr => true }. diff --git a/changes/ce/feat-12201.en.md b/changes/ce/feat-12201.en.md new file mode 100644 index 000000000..4247ec6e9 --- /dev/null +++ b/changes/ce/feat-12201.en.md @@ -0,0 +1,10 @@ +Support hot update of TCP/SSL/WS/WSS MQTT listeners configuration, which allows changing most of the configuration parameters without restarting the listener and disconnecting the clients. + +In case of TCP/SSL listeners, changes to the following parameters still require full listener restart: + * `bind` + * `tcp_options.backlog` + +In case of WS/WSS listeners, any parameter can be freely changed without losing the connected clients. However, changing transport related parameters will cause listening socket to be re-opened, namely: + * `bind` + * `tcp_options.*` + * `ssl_options.*` diff --git a/mix.exs b/mix.exs index 58a951092..6f88ef0b4 100644 --- a/mix.exs +++ b/mix.exs @@ -53,7 +53,7 @@ defmodule EMQXUmbrella.MixProject do {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.6", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true}, - {:esockd, github: "emqx/esockd", tag: "5.9.9", override: true}, + {:esockd, github: "emqx/esockd", tag: "5.11.1", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.8.0-emqx-2", override: true}, {:ekka, github: "emqx/ekka", tag: "0.17.0", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "3.3.1", override: true}, diff --git a/rebar.config b/rebar.config index 3c5169688..ffdc5472a 100644 --- a/rebar.config +++ b/rebar.config @@ -69,7 +69,7 @@ , {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.9"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.1"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-2"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.17.0"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}