diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index d9670b858..399bd3d08 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -41,6 +41,10 @@ , parse_listener_id/1 ]). +-export([post_config_update/4]). + +-define(CONF_KEY_PATH, [listeners]). + %% @doc List configured listeners. -spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]). list() -> @@ -88,6 +92,9 @@ is_running(quic, _ListenerId, _Conf)-> %% @doc Start all listeners. -spec(start() -> ok). start() -> + %% The ?MODULE:start/0 will be called by emqx_app when emqx get started, + %% so we install the config handler here. + ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), foreach_listeners(fun start_listener/3). -spec start_listener(atom()) -> ok | {error, term()}. @@ -102,7 +109,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) -> console_print("- Skip - starting listener ~s on ~s ~n due to ~p", [listener_id(Type, ListenerName), format_addr(Bind), Reason]); {ok, _} -> - console_print("Start listener ~s on ~s successfully.~n", + console_print("Listener ~s on ~s started.~n", [listener_id(Type, ListenerName), format_addr(Bind)]); {error, {already_started, Pid}} -> {error, {already_started, Pid}}; @@ -122,27 +129,47 @@ restart_listener(ListenerId) -> apply_on_listener(ListenerId, fun restart_listener/3). -spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}). +restart_listener(Type, ListenerName, {OldConf, NewConf}) -> + restart_listener(Type, ListenerName, OldConf, NewConf); restart_listener(Type, ListenerName, Conf) -> - case stop_listener(Type, ListenerName, Conf) of - ok -> start_listener(Type, ListenerName, Conf); + restart_listener(Type, ListenerName, Conf, Conf). + +restart_listener(Type, ListenerName, OldConf, NewConf) -> + case stop_listener(Type, ListenerName, OldConf) of + ok -> start_listener(Type, ListenerName, NewConf); Error -> Error end. %% @doc Stop all listeners. -spec(stop() -> ok). stop() -> + %% The ?MODULE:stop/0 will be called by emqx_app when emqx is going to shutdown, + %% so we uninstall the config handler here. + _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), foreach_listeners(fun stop_listener/3). -spec(stop_listener(atom()) -> ok | {error, term()}). stop_listener(ListenerId) -> apply_on_listener(ListenerId, fun stop_listener/3). --spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}). -stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl -> +stop_listener(Type, ListenerName, #{bind := Bind} = Conf) -> + case do_stop_listener(Type, ListenerName, Conf) of + ok -> + console_print("Listener ~s on ~s stopped.~n", + [listener_id(Type, ListenerName), format_addr(Bind)]), + ok; + {error, Reason} -> + ?ELOG("Failed to stop listener ~s on ~s: ~0p~n", + [listener_id(Type, ListenerName), format_addr(Bind), Reason]), + {error, Reason} + end. + +-spec(do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}). +do_stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl -> esockd:close(listener_id(Type, ListenerName), ListenOn); -stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss -> +do_stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss -> cowboy:stop_listener(listener_id(Type, ListenerName)); -stop_listener(quic, ListenerName, _Conf) -> +do_stop_listener(quic, ListenerName, _Conf) -> quicer:stop_listener(listener_id(quic, ListenerName)). -ifndef(TEST). @@ -201,6 +228,32 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) -> {ok, {skipped, quic_app_missing}} end. +%% Update the listeners at runtime +post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) -> + #{added := Added, removed := Removed, changed := Updated} + = diff_listeners(NewListeners, OldListeners), + perform_listener_changes(fun stop_listener/3, Removed), + perform_listener_changes(fun start_listener/3, Added), + perform_listener_changes(fun restart_listener/3, Updated). + +perform_listener_changes(Action, MapConfs) -> + lists:foreach(fun + ({Id, Conf}) -> + {Type, Name} = parse_listener_id(Id), + Action(Type, Name, Conf) + end, maps:to_list(MapConfs)). + +diff_listeners(NewListeners, OldListeners) -> + emqx_map_lib:diff_maps(flatten_listeners(NewListeners), flatten_listeners(OldListeners)). + +flatten_listeners(Conf0) -> + maps:from_list( + lists:append([do_flatten_listeners(Type, Conf) + || {Type, Conf} <- maps:to_list(Conf0)])). + +do_flatten_listeners(Type, Conf0) -> + [{listener_id(Type, Name), Conf} || {Name, Conf} <- maps:to_list(Conf0)]. + esockd_opts(Type, Opts0) -> Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0), Opts2 = case emqx_config:get_zone_conf(zone(Opts0), [rate_limit, max_conn_rate]) of diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index 2ed25d22d..d5e851971 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -131,20 +131,20 @@ jsonable_map(Map, JsonableFun) -> deep_convert(Map, fun binary_string_kv/3, [JsonableFun]). -spec diff_maps(map(), map()) -> - #{added := [map()], identical := [map()], removed := [map()], - changed := [#{any() => {OldValue::any(), NewValue::any()}}]}. + #{added := map(), identical := map(), removed := map(), + changed := #{any() => {OldValue::any(), NewValue::any()}}}. diff_maps(NewMap, OldMap) -> - InitR = #{identical => [], changed => [], removed => []}, + InitR = #{identical => #{}, changed => #{}, removed => #{}}, {Result, RemInNew} = lists:foldl(fun({OldK, OldV}, {Result0 = #{identical := I, changed := U, removed := D}, RemNewMap}) -> Result1 = case maps:find(OldK, NewMap) of error -> - Result0#{removed => [#{OldK => OldV} | D]}; + Result0#{removed => D#{OldK => OldV}}; {ok, NewV} when NewV == OldV -> - Result0#{identical => [#{OldK => OldV} | I]}; + Result0#{identical => I#{OldK => OldV}}; {ok, NewV} -> - Result0#{changed => [#{OldK => {OldV, NewV}} | U]} + Result0#{changed => U#{OldK => {OldV, NewV}}} end, {Result1, maps:remove(OldK, RemNewMap)} end, {InitR, NewMap}, maps:to_list(OldMap)),