From da0f0f947e0c4892bc72d90ccda3c9efdf85cca8 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 11:23:29 +0100 Subject: [PATCH] feat(listen): support hot config update of esockd-based listeners --- apps/emqx/src/emqx_listeners.erl | 217 +++++++++++++++---------------- 1 file changed, 107 insertions(+), 110 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 5cf631ccb..c6daf7fb2 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -55,7 +55,6 @@ ]). -export([pre_config_update/3, post_config_update/5]). --export([create_listener/3, remove_listener/3, update_listener/3]). -export([format_bind/1]). @@ -66,6 +65,11 @@ -export_type([listener_id/0]). -type listener_id() :: atom() | binary(). +-type listener_type() :: tcp | ssl | ws | wss | quic | dtls. + +-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)). +-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)). + -define(ROOT_KEY, listeners). -define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']). -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]). @@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) -> -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}. is_running(ListenerId) -> - case - [ - Running - || {Id, #{running := Running}} <- list(), - Id =:= ListenerId - ] - of - [] -> {error, not_found}; - [IsRunning] -> IsRunning + case lists:keyfind(ListenerId, 1, list()) of + {_Id, #{running := Running}} -> Running; + false -> {error, not_found} end. is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl -> @@ -229,11 +227,10 @@ start() -> start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). --spec start_listener(atom(), atom(), map()) -> ok | {error, term()}. -start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> +-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> case do_start_listener(Type, ListenerName, Conf) of {ok, {skipped, Reason}} when - Reason =:= listener_disabled; Reason =:= quic_app_missing -> ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), @@ -269,7 +266,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> ) ), {error, {failed_to_start, Msg}} - end. + end; +start_listener(Type, ListenerName, #{enable := false}) -> + console_print( + "Listener ~ts is NOT started due to: disabled.~n", + [listener_id(Type, ListenerName)] + ), + ok. %% @doc Restart all listeners -spec restart() -> ok. @@ -280,16 +283,35 @@ restart() -> restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). --spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}. -restart_listener(Type, ListenerName, {OldConf, NewConf}) -> - restart_listener(Type, ListenerName, OldConf, NewConf); +-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}. restart_listener(Type, ListenerName, Conf) -> restart_listener(Type, ListenerName, Conf, Conf). -restart_listener(Type, ListenerName, OldConf, NewConf) -> - case stop_listener(Type, ListenerName, OldConf) of - ok -> start_listener(Type, ListenerName, NewConf); - {error, Reason} -> {error, Reason} +update_listener(_Type, _Name, #{enable := false}, #{enable := false}) -> + ok; +update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> + stop_listener(Type, Name, Conf); +update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> + start_listener(Type, Name, Conf); +update_listener(Type, Name, OldConf = #{bind := Bind}, NewConf = #{bind := Bind}) -> + case do_update_listener(Type, Name, OldConf, NewConf) of + ok -> + ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), + ok; + {error, _Reason} -> + restart_listener(Type, Name, OldConf, NewConf) + end; +update_listener(Type, Name, OldConf, NewConf) -> + %% TODO + %% Again, we're not strictly required to drop live connections in this case. + restart_listener(Type, Name, OldConf, NewConf). + +restart_listener(Type, Name, OldConf, NewConf) -> + case stop_listener(Type, Name, OldConf) of + ok -> + start_listener(Type, Name, NewConf); + {error, Reason} -> + {error, Reason} end. %% @doc Stop all listeners. @@ -305,9 +327,10 @@ stop() -> stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). -stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> - Id = listener_id(Type, ListenerName), +stop_listener(Type, Name, #{bind := Bind} = Conf) -> + Id = listener_id(Type, Name), ok = del_limiter_bucket(Id, Conf), + ok = unregister_ocsp_stapling_refresh(Type, Name), case do_stop_listener(Type, Id, Conf) of ok -> console_print( @@ -325,11 +348,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> {error, Reason} end. --spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}. - -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl -> +-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}. +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) -> esockd:close(Id, ListenOn); -do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss -> +do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) -> case cowboy:stop_listener(Id) of ok -> wait_listener_stopped(ListenOn); @@ -369,39 +391,23 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args). console_print(_Fmt, _Args) -> ok. -endif. -%% Start MQTT/TCP listener --spec do_start_listener(atom(), atom(), map()) -> +-spec do_start_listener(listener_type(), atom(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}. -do_start_listener(_Type, _ListenerName, #{enable := false}) -> - {ok, {skipped, listener_disabled}}; -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == tcp; Type == ssl --> - Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), +%% Start MQTT/TCP listener +do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> + Id = listener_id(Type, Name), + ok = add_limiter_bucket(Id, limiter(Opts)), esockd:open( Id, ListenOn, - merge_default(esockd_opts(Id, Type, Opts)), - {emqx_connection, start_link, [ - #{ - listener => {Type, ListenerName}, - zone => zone(Opts), - limiter => Limiter, - enable_authn => enable_authn(Opts) - } - ]} + merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when - Type == ws; Type == wss --> +do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when ?COWBOY_LISTENER(Type) -> Id = listener_id(Type, ListenerName), - Limiter = limiter(Opts), - add_limiter_bucket(Id, Limiter), + ok = add_limiter_bucket(Id, limiter(Opts)), RanchOpts = ranch_opts(Type, ListenOn, Opts), - WsOpts = ws_opts(Type, ListenerName, Opts, Limiter), + WsOpts = ws_opts(Type, ListenerName, Opts), case Type of ws -> cowboy:start_clear(Id, RanchOpts, WsOpts); wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) @@ -476,6 +482,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> {ok, {skipped, quic_app_missing}} end. +do_update_listener(Type, Name, _OldConf, NewConf) when ?ESOCKD_LISTENER(Type) -> + Id = listener_id(Type, Name), + ListenOn = maps:get(bind, NewConf), + esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf)); +do_update_listener(_Type, _Name, _OldConf, _NewConf) -> + {error, not_supported}. + %% Update the listeners at runtime pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when V =:= undefined orelse V =:= ?TOMBSTONE_VALUE @@ -501,69 +514,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE -> - create_listener(Type, Name, NewConf); + start_listener(Type, Name, NewConf); post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) -> - update_listener(Type, Name, {OldConf, NewConf}); + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) -> - remove_listener(Type, Name, OldConf); + stop_listener(Type, Name, OldConf); post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) -> - #{enable := NewEnabled} = NewConf, - #{enable := OldEnabled} = OldConf, - case {NewEnabled, OldEnabled} of - {true, true} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}); - {true, false} -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - start_listener(Type, Name, NewConf); - {false, true} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf); - {false, false} -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf) - end; + update_listener(Type, Name, OldConf, NewConf); post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) -> ok; post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) -> #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf), - Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed), - perform_listener_changes([ - {fun ?MODULE:remove_listener/3, Removed}, - {fun ?MODULE:update_listener/3, Updated}, - {fun ?MODULE:create_listener/3, Added} - ]); + %% TODO + %% This currently lacks transactional semantics. If one of the changes fails, + %% previous changes will not be rolled back. + perform_listener_changes( + [{update, L} || L <- Changed] ++ + [{stop, L} || L <- Removed] ++ + [{start, L} || L <- Added] + ); post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) -> ok. -create_listener(Type, Name, NewConf) -> - start_listener(Type, Name, NewConf). - -remove_listener(Type, Name, OldConf) -> - ok = unregister_ocsp_stapling_refresh(Type, Name), - stop_listener(Type, Name, OldConf). - -update_listener(Type, Name, {OldConf, NewConf}) -> - ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), - restart_listener(Type, Name, {OldConf, NewConf}). - perform_listener_changes([]) -> ok; -perform_listener_changes([{Action, ConfL} | Tasks]) -> - case perform_listener_changes(Action, ConfL) of - ok -> perform_listener_changes(Tasks); +perform_listener_changes([{Action, Listener} | Rest]) -> + case perform_listener_change(Action, Listener) of + ok -> perform_listener_changes(Rest); {error, Reason} -> {error, Reason} end. -perform_listener_changes(_Action, []) -> - ok; -perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) -> - case Action(Type, Name, Diff) of - ok -> perform_listener_changes(Action, MapConf); - {error, Reason} -> {error, Reason} - end. +perform_listener_change(start, {Type, Name, Conf}) -> + start_listener(Type, Name, Conf); +perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) -> + update_listener(Type, Name, ConfOld, ConfNew); +perform_listener_change(stop, {Type, Name, Conf}) -> + stop_listener(Type, Name, Conf). -esockd_opts(ListenerId, Type, Opts0) -> +esockd_opts(ListenerId, Type, Name, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Limiter = limiter(Opts0), Opts2 = @@ -579,7 +567,16 @@ esockd_opts(ListenerId, Type, Opts0) -> end, Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])), - tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]} + tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}, + connection_mfargs => + {emqx_connection, start_link, [ + #{ + listener => {Type, Name}, + zone => zone(Opts0), + limiter => Limiter, + enable_authn => enable_authn(Opts0) + } + ]} }, maps:to_list( case Type of @@ -593,12 +590,12 @@ esockd_opts(ListenerId, Type, Opts0) -> end ). -ws_opts(Type, ListenerName, Opts, Limiter) -> +ws_opts(Type, ListenerName, Opts) -> WsPaths = [ {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{ zone => zone(Opts), listener => {Type, ListenerName}, - limiter => Limiter, + limiter => limiter(Opts), enable_authn => enable_authn(Opts) }} ], @@ -742,24 +739,24 @@ diff_confs(NewConfs, OldConfs) -> emqx_utils:diff_lists( flatten_confs(NewConfs), flatten_confs(OldConfs), - fun({Key, _}) -> Key end + fun({Type, Name, _}) -> {Type, Name} end ). -flatten_confs(Conf0) -> +flatten_confs(Confs) -> lists:flatmap( - fun({Type, Conf}) -> - do_flatten_confs(Type, Conf) + fun({Type, Listeners}) -> + do_flatten_confs(Type, Listeners) end, - maps:to_list(Conf0) + maps:to_list(Confs) ). -do_flatten_confs(Type, Conf0) -> +do_flatten_confs(Type, Listeners) -> FilterFun = fun ({_Name, ?TOMBSTONE_TYPE}) -> false; - ({Name, Conf}) -> {true, {{Type, Name}, Conf}} + ({Name, Conf}) -> {true, {Type, Name, Conf}} end, - lists:filtermap(FilterFun, maps:to_list(Conf0)). + lists:filtermap(FilterFun, maps:to_list(Listeners)). enable_authn(Opts) -> maps:get(enable_authn, Opts, true).