From a8e020af58e03ca8946709368b30bcb66ce35b28 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 20 Dec 2022 15:55:34 +0800 Subject: [PATCH] fix(limiter): change limiter `cache` to `buffer` --- apps/emqx/src/emqx_connection.erl | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index d604d4784..e0e86c145 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -107,8 +107,8 @@ %% Limiter limiter :: maybe(limiter()), - %% cache operation when overload - limiter_cache :: queue:queue(cache()), + %% limiter buffer for overload use + limiter_buffer :: queue:queue(pending_req()), %% limiter timers limiter_timer :: undefined | reference() @@ -120,14 +120,14 @@ next :: check_succ_handler() }). --record(cache, { +-record(pending_req, { need :: list({pos_integer(), limiter_type()}), data :: any(), next :: check_succ_handler() }). -type state() :: #state{}. --type cache() :: #cache{}. +-type pending_req() :: #pending_req{}. -define(ACTIVE_N, 100). @@ -358,7 +358,7 @@ init_state( idle_timer = IdleTimer, zone = Zone, listener = Listener, - limiter_cache = queue:new(), + limiter_buffer = queue:new(), limiter_timer = undefined }. @@ -525,12 +525,12 @@ handle_msg({quic, Data, _Sock, _, _, _}, State) -> inc_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), when_bytes_in(Oct, Data, State); -handle_msg(check_cache, #state{limiter_cache = Cache} = State) -> +handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> case queue:peek(Cache) of empty -> activate_socket(State); - {value, #cache{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_cache = queue:drop(Cache)}, + {value, #pending_req{need = Needs, data = Data, next = Next}} -> + State2 = State#state{limiter_buffer = queue:drop(Cache)}, check_limiter(Needs, Data, Next, [check_cache], State2) end; handle_msg( @@ -936,7 +936,7 @@ check_limiter( #state{ limiter = Limiter, limiter_timer = LimiterTimer, - limiter_cache = Cache + limiter_buffer = Cache } = State ) when Limiter =/= undefined -> case LimiterTimer of @@ -972,8 +972,8 @@ check_limiter( %% if there has a retry timer, %% cache the operation and execute it after the retry is over %% the maximum length of the cache queue is equal to the active_n - New = #cache{need = Needs, data = Data, next = WhenOk}, - {ok, State#state{limiter_cache = queue:in(New, Cache)}} + New = #pending_req{need = Needs, data = Data, next = WhenOk}, + {ok, State#state{limiter_buffer = queue:in(New, Cache)}} end; check_limiter(_, Data, WhenOk, Msgs, State) -> WhenOk(Data, Msgs, State).