From 7e4049620d0682aef994976e31b290fdd5be220a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 19 Dec 2023 16:05:48 +0100 Subject: [PATCH] fix(listen): ensure limiter server state consistent with updates --- apps/emqx/src/emqx_listeners.erl | 68 +++++++++++++++++--------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index f82c2fe10..dc1f6d9ad 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -228,22 +228,25 @@ start_listener(ListenerId) -> apply_on_listener(ListenerId, fun start_listener/3). -spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. -start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> - case do_start_listener(Type, ListenerName, Conf) of +start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) -> + ListenerId = listener_id(Type, Name), + Limiter = limiter(Conf), + ok = add_limiter_bucket(ListenerId, Limiter), + case do_start_listener(Type, Name, ListenerId, Conf) of {ok, {skipped, Reason}} when Reason =:= quic_app_missing -> ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), console_print( "Listener ~ts is NOT started due to: ~p.~n", - [listener_id(Type, ListenerName), Reason] + [ListenerId, Reason] ), ok; {ok, _} -> ?tp(listener_started, #{type => Type, bind => Bind}), console_print( "Listener ~ts on ~ts started.~n", - [listener_id(Type, ListenerName), format_bind(Bind)] + [ListenerId, format_bind(Bind)] ), ok; {error, {already_started, Pid}} -> @@ -252,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> }), {error, {already_started, Pid}}; {error, Reason} -> + ok = del_limiter_bucket(ListenerId, Limiter), ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}), - ListenerId = listener_id(Type, ListenerName), BindStr = format_bind(Bind), ?ELOG( "Failed to start listener ~ts on ~ts: ~0p.~n", @@ -267,10 +270,10 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> ), {error, {failed_to_start, Msg}} end; -start_listener(Type, ListenerName, #{enable := false}) -> +start_listener(Type, Name, #{enable := false}) -> console_print( "Listener ~ts is NOT started due to: disabled.~n", - [listener_id(Type, ListenerName)] + [listener_id(Type, Name)] ), ok. @@ -294,6 +297,8 @@ update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) -> update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> start_listener(Type, Name, Conf); update_listener(Type, Name, OldConf, NewConf) -> + Id = listener_id(Type, Name), + ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)), case do_update_listener(Type, Name, OldConf, NewConf) of ok -> ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), @@ -325,7 +330,7 @@ stop_listener(ListenerId) -> stop_listener(Type, Name, #{bind := Bind} = Conf) -> Id = listener_id(Type, Name), - ok = del_limiter_bucket(Id, Conf), + ok = del_limiter_bucket(Id, limiter(Conf)), ok = unregister_ocsp_stapling_refresh(Type, Name), case do_stop_listener(Type, Id, Conf) of ok -> @@ -387,21 +392,17 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args). console_print(_Fmt, _Args) -> ok. -endif. --spec do_start_listener(listener_type(), atom(), map()) -> +-spec do_start_listener(listener_type(), atom(), listener_id(), map()) -> {ok, pid() | {skipped, atom()}} | {error, term()}. %% Start MQTT/TCP listener -do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> - Id = listener_id(Type, Name), - ok = add_limiter_bucket(Id, limiter(Opts)), +do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> esockd:open( Id, ListenOn, merge_default(esockd_opts(Id, Type, Name, Opts)) ); %% Start MQTT/WS listener -do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) -> - Id = listener_id(Type, Name), - ok = add_limiter_bucket(Id, limiter(Opts)), +do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) -> RanchOpts = ranch_opts(Type, Opts), WsOpts = ws_opts(Type, Name, Opts), case Type of @@ -409,7 +410,7 @@ do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) -> wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) end; %% Start MQTT/QUIC listener -do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> +do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) -> ListenOn = case Bind of {Addr, Port} when tuple_size(Addr) == 4 -> @@ -459,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), zone => zone(Opts), - listener => {quic, ListenerName}, + listener => {quic, Name}, limiter => Limiter }, StreamOpts = #{ stream_callback => emqx_quic_stream, active => 1 }, - - Id = listener_id(quic, ListenerName), - add_limiter_bucket(Id, Limiter), quicer:spawn_listener( Id, ListenOn, @@ -745,18 +743,24 @@ add_limiter_bucket(Id, Limiter) -> maps:without([client], Limiter) ). -del_limiter_bucket(Id, Conf) -> - case limiter(Conf) of - undefined -> - ok; - Limiter -> - lists:foreach( - fun(Type) -> - emqx_limiter_server:del_bucket(Id, Type) - end, - maps:keys(Limiter) - ) - end. +del_limiter_bucket(_Id, undefined) -> + ok; +del_limiter_bucket(Id, Limiter) -> + maps:foreach( + fun(Type, _) -> + emqx_limiter_server:del_bucket(Id, Type) + end, + Limiter + ). + +update_limiter_bucket(Id, Limiter, undefined) -> + del_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, undefined, Limiter) -> + add_limiter_bucket(Id, Limiter); +update_limiter_bucket(Id, OldLimiter, NewLimiter) -> + ok = add_limiter_bucket(Id, NewLimiter), + Outdated = maps:without(maps:keys(NewLimiter), OldLimiter), + del_limiter_bucket(Id, Outdated). diff_confs(NewConfs, OldConfs) -> emqx_utils:diff_lists(