fix(limiter): change limiter `cache` to `buffer`

This commit is contained in:
firest 2022-12-20 15:55:34 +08:00
parent 71d40c0490
commit a8e020af58
1 changed files with 11 additions and 11 deletions

View File

@ -107,8 +107,8 @@
%% Limiter %% Limiter
limiter :: maybe(limiter()), limiter :: maybe(limiter()),
%% cache operation when overload %% limiter buffer for overload use
limiter_cache :: queue:queue(cache()), limiter_buffer :: queue:queue(pending_req()),
%% limiter timers %% limiter timers
limiter_timer :: undefined | reference() limiter_timer :: undefined | reference()
@ -120,14 +120,14 @@
next :: check_succ_handler() next :: check_succ_handler()
}). }).
-record(cache, { -record(pending_req, {
need :: list({pos_integer(), limiter_type()}), need :: list({pos_integer(), limiter_type()}),
data :: any(), data :: any(),
next :: check_succ_handler() next :: check_succ_handler()
}). }).
-type state() :: #state{}. -type state() :: #state{}.
-type cache() :: #cache{}. -type pending_req() :: #pending_req{}.
-define(ACTIVE_N, 100). -define(ACTIVE_N, 100).
@ -358,7 +358,7 @@ init_state(
idle_timer = IdleTimer, idle_timer = IdleTimer,
zone = Zone, zone = Zone,
listener = Listener, listener = Listener,
limiter_cache = queue:new(), limiter_buffer = queue:new(),
limiter_timer = undefined limiter_timer = undefined
}. }.
@ -525,12 +525,12 @@ handle_msg({quic, Data, _Sock, _, _, _}, State) ->
inc_counter(incoming_bytes, Oct), inc_counter(incoming_bytes, Oct),
ok = emqx_metrics:inc('bytes.received', Oct), ok = emqx_metrics:inc('bytes.received', Oct),
when_bytes_in(Oct, Data, State); when_bytes_in(Oct, Data, State);
handle_msg(check_cache, #state{limiter_cache = Cache} = State) -> handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
case queue:peek(Cache) of case queue:peek(Cache) of
empty -> empty ->
activate_socket(State); activate_socket(State);
{value, #cache{need = Needs, data = Data, next = Next}} -> {value, #pending_req{need = Needs, data = Data, next = Next}} ->
State2 = State#state{limiter_cache = queue:drop(Cache)}, State2 = State#state{limiter_buffer = queue:drop(Cache)},
check_limiter(Needs, Data, Next, [check_cache], State2) check_limiter(Needs, Data, Next, [check_cache], State2)
end; end;
handle_msg( handle_msg(
@ -936,7 +936,7 @@ check_limiter(
#state{ #state{
limiter = Limiter, limiter = Limiter,
limiter_timer = LimiterTimer, limiter_timer = LimiterTimer,
limiter_cache = Cache limiter_buffer = Cache
} = State } = State
) when Limiter =/= undefined -> ) when Limiter =/= undefined ->
case LimiterTimer of case LimiterTimer of
@ -972,8 +972,8 @@ check_limiter(
%% if there has a retry timer, %% if there has a retry timer,
%% cache the operation and execute it after the retry is over %% cache the operation and execute it after the retry is over
%% the maximum length of the cache queue is equal to the active_n %% the maximum length of the cache queue is equal to the active_n
New = #cache{need = Needs, data = Data, next = WhenOk}, New = #pending_req{need = Needs, data = Data, next = WhenOk},
{ok, State#state{limiter_cache = queue:in(New, Cache)}} {ok, State#state{limiter_buffer = queue:in(New, Cache)}}
end; end;
check_limiter(_, Data, WhenOk, Msgs, State) -> check_limiter(_, Data, WhenOk, Msgs, State) ->
WhenOk(Data, Msgs, State). WhenOk(Data, Msgs, State).