fix(listen): ensure limiter server state consistent with updates

This commit is contained in:
Andrew Mayorov 2023-12-19 16:05:48 +01:00
parent 4796f85dff
commit 7e4049620d
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 36 additions and 32 deletions

View File

@ -228,22 +228,25 @@ start_listener(ListenerId) ->
apply_on_listener(ListenerId, fun start_listener/3). apply_on_listener(ListenerId, fun start_listener/3).
-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}. -spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) -> start_listener(Type, Name, #{bind := Bind, enable := true} = Conf) ->
case do_start_listener(Type, ListenerName, Conf) of ListenerId = listener_id(Type, Name),
Limiter = limiter(Conf),
ok = add_limiter_bucket(ListenerId, Limiter),
case do_start_listener(Type, Name, ListenerId, Conf) of
{ok, {skipped, Reason}} when {ok, {skipped, Reason}} when
Reason =:= quic_app_missing Reason =:= quic_app_missing
-> ->
?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}), ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
console_print( console_print(
"Listener ~ts is NOT started due to: ~p.~n", "Listener ~ts is NOT started due to: ~p.~n",
[listener_id(Type, ListenerName), Reason] [ListenerId, Reason]
), ),
ok; ok;
{ok, _} -> {ok, _} ->
?tp(listener_started, #{type => Type, bind => Bind}), ?tp(listener_started, #{type => Type, bind => Bind}),
console_print( console_print(
"Listener ~ts on ~ts started.~n", "Listener ~ts on ~ts started.~n",
[listener_id(Type, ListenerName), format_bind(Bind)] [ListenerId, format_bind(Bind)]
), ),
ok; ok;
{error, {already_started, Pid}} -> {error, {already_started, Pid}} ->
@ -252,8 +255,8 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) ->
}), }),
{error, {already_started, Pid}}; {error, {already_started, Pid}};
{error, Reason} -> {error, Reason} ->
ok = del_limiter_bucket(ListenerId, Limiter),
?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}), ?tp(listener_not_started, #{type => Type, bind => Bind, status => {error, Reason}}),
ListenerId = listener_id(Type, ListenerName),
BindStr = format_bind(Bind), BindStr = format_bind(Bind),
?ELOG( ?ELOG(
"Failed to start listener ~ts on ~ts: ~0p.~n", "Failed to start listener ~ts on ~ts: ~0p.~n",
@ -267,10 +270,10 @@ start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) ->
), ),
{error, {failed_to_start, Msg}} {error, {failed_to_start, Msg}}
end; end;
start_listener(Type, ListenerName, #{enable := false}) -> start_listener(Type, Name, #{enable := false}) ->
console_print( console_print(
"Listener ~ts is NOT started due to: disabled.~n", "Listener ~ts is NOT started due to: disabled.~n",
[listener_id(Type, ListenerName)] [listener_id(Type, Name)]
), ),
ok. ok.
@ -294,6 +297,8 @@ update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) ->
update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) -> update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) ->
start_listener(Type, Name, Conf); start_listener(Type, Name, Conf);
update_listener(Type, Name, OldConf, NewConf) -> update_listener(Type, Name, OldConf, NewConf) ->
Id = listener_id(Type, Name),
ok = update_limiter_bucket(Id, limiter(OldConf), limiter(NewConf)),
case do_update_listener(Type, Name, OldConf, NewConf) of case do_update_listener(Type, Name, OldConf, NewConf) of
ok -> ok ->
ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf), ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
@ -325,7 +330,7 @@ stop_listener(ListenerId) ->
stop_listener(Type, Name, #{bind := Bind} = Conf) -> stop_listener(Type, Name, #{bind := Bind} = Conf) ->
Id = listener_id(Type, Name), Id = listener_id(Type, Name),
ok = del_limiter_bucket(Id, Conf), ok = del_limiter_bucket(Id, limiter(Conf)),
ok = unregister_ocsp_stapling_refresh(Type, Name), ok = unregister_ocsp_stapling_refresh(Type, Name),
case do_stop_listener(Type, Id, Conf) of case do_stop_listener(Type, Id, Conf) of
ok -> ok ->
@ -387,21 +392,17 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
console_print(_Fmt, _Args) -> ok. console_print(_Fmt, _Args) -> ok.
-endif. -endif.
-spec do_start_listener(listener_type(), atom(), map()) -> -spec do_start_listener(listener_type(), atom(), listener_id(), map()) ->
{ok, pid() | {skipped, atom()}} | {error, term()}. {ok, pid() | {skipped, atom()}} | {error, term()}.
%% Start MQTT/TCP listener %% Start MQTT/TCP listener
do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) -> do_start_listener(Type, Name, Id, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
Id = listener_id(Type, Name),
ok = add_limiter_bucket(Id, limiter(Opts)),
esockd:open( esockd:open(
Id, Id,
ListenOn, ListenOn,
merge_default(esockd_opts(Id, Type, Name, Opts)) merge_default(esockd_opts(Id, Type, Name, Opts))
); );
%% Start MQTT/WS listener %% Start MQTT/WS listener
do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) -> do_start_listener(Type, Name, Id, Opts) when ?COWBOY_LISTENER(Type) ->
Id = listener_id(Type, Name),
ok = add_limiter_bucket(Id, limiter(Opts)),
RanchOpts = ranch_opts(Type, Opts), RanchOpts = ranch_opts(Type, Opts),
WsOpts = ws_opts(Type, Name, Opts), WsOpts = ws_opts(Type, Name, Opts),
case Type of case Type of
@ -409,7 +410,7 @@ do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) ->
wss -> cowboy:start_tls(Id, RanchOpts, WsOpts) wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
end; end;
%% Start MQTT/QUIC listener %% Start MQTT/QUIC listener
do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) -> do_start_listener(quic, Name, Id, #{bind := Bind} = Opts) ->
ListenOn = ListenOn =
case Bind of case Bind of
{Addr, Port} when tuple_size(Addr) == 4 -> {Addr, Port} when tuple_size(Addr) == 4 ->
@ -459,16 +460,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1), peer_unidi_stream_count => maps:get(peer_unidi_stream_count, Opts, 1),
peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10), peer_bidi_stream_count => maps:get(peer_bidi_stream_count, Opts, 10),
zone => zone(Opts), zone => zone(Opts),
listener => {quic, ListenerName}, listener => {quic, Name},
limiter => Limiter limiter => Limiter
}, },
StreamOpts = #{ StreamOpts = #{
stream_callback => emqx_quic_stream, stream_callback => emqx_quic_stream,
active => 1 active => 1
}, },
Id = listener_id(quic, ListenerName),
add_limiter_bucket(Id, Limiter),
quicer:spawn_listener( quicer:spawn_listener(
Id, Id,
ListenOn, ListenOn,
@ -745,18 +743,24 @@ add_limiter_bucket(Id, Limiter) ->
maps:without([client], Limiter) maps:without([client], Limiter)
). ).
del_limiter_bucket(Id, Conf) -> del_limiter_bucket(_Id, undefined) ->
case limiter(Conf) of ok;
undefined -> del_limiter_bucket(Id, Limiter) ->
ok; maps:foreach(
Limiter -> fun(Type, _) ->
lists:foreach( emqx_limiter_server:del_bucket(Id, Type)
fun(Type) -> end,
emqx_limiter_server:del_bucket(Id, Type) Limiter
end, ).
maps:keys(Limiter)
) update_limiter_bucket(Id, Limiter, undefined) ->
end. del_limiter_bucket(Id, Limiter);
update_limiter_bucket(Id, undefined, Limiter) ->
add_limiter_bucket(Id, Limiter);
update_limiter_bucket(Id, OldLimiter, NewLimiter) ->
ok = add_limiter_bucket(Id, NewLimiter),
Outdated = maps:without(maps:keys(NewLimiter), OldLimiter),
del_limiter_bucket(Id, Outdated).
diff_confs(NewConfs, OldConfs) -> diff_confs(NewConfs, OldConfs) ->
emqx_utils:diff_lists( emqx_utils:diff_lists(