Merge pull request #9582 from lafirest/fix/fix_field_name
fix(limiter): change limiter `cache` to `buffer`
This commit is contained in:
commit
f660724bf7
|
@ -107,8 +107,8 @@
|
||||||
%% Limiter
|
%% Limiter
|
||||||
limiter :: maybe(limiter()),
|
limiter :: maybe(limiter()),
|
||||||
|
|
||||||
%% cache operation when overload
|
%% limiter buffer for overload use
|
||||||
limiter_cache :: queue:queue(cache()),
|
limiter_buffer :: queue:queue(pending_req()),
|
||||||
|
|
||||||
%% limiter timers
|
%% limiter timers
|
||||||
limiter_timer :: undefined | reference()
|
limiter_timer :: undefined | reference()
|
||||||
|
@ -120,14 +120,14 @@
|
||||||
next :: check_succ_handler()
|
next :: check_succ_handler()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(cache, {
|
-record(pending_req, {
|
||||||
need :: list({pos_integer(), limiter_type()}),
|
need :: list({pos_integer(), limiter_type()}),
|
||||||
data :: any(),
|
data :: any(),
|
||||||
next :: check_succ_handler()
|
next :: check_succ_handler()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type state() :: #state{}.
|
-type state() :: #state{}.
|
||||||
-type cache() :: #cache{}.
|
-type pending_req() :: #pending_req{}.
|
||||||
|
|
||||||
-define(ACTIVE_N, 100).
|
-define(ACTIVE_N, 100).
|
||||||
|
|
||||||
|
@ -358,7 +358,7 @@ init_state(
|
||||||
idle_timer = IdleTimer,
|
idle_timer = IdleTimer,
|
||||||
zone = Zone,
|
zone = Zone,
|
||||||
listener = Listener,
|
listener = Listener,
|
||||||
limiter_cache = queue:new(),
|
limiter_buffer = queue:new(),
|
||||||
limiter_timer = undefined
|
limiter_timer = undefined
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -525,12 +525,12 @@ handle_msg({quic, Data, _Sock, _, _, _}, State) ->
|
||||||
inc_counter(incoming_bytes, Oct),
|
inc_counter(incoming_bytes, Oct),
|
||||||
ok = emqx_metrics:inc('bytes.received', Oct),
|
ok = emqx_metrics:inc('bytes.received', Oct),
|
||||||
when_bytes_in(Oct, Data, State);
|
when_bytes_in(Oct, Data, State);
|
||||||
handle_msg(check_cache, #state{limiter_cache = Cache} = State) ->
|
handle_msg(check_cache, #state{limiter_buffer = Cache} = State) ->
|
||||||
case queue:peek(Cache) of
|
case queue:peek(Cache) of
|
||||||
empty ->
|
empty ->
|
||||||
activate_socket(State);
|
activate_socket(State);
|
||||||
{value, #cache{need = Needs, data = Data, next = Next}} ->
|
{value, #pending_req{need = Needs, data = Data, next = Next}} ->
|
||||||
State2 = State#state{limiter_cache = queue:drop(Cache)},
|
State2 = State#state{limiter_buffer = queue:drop(Cache)},
|
||||||
check_limiter(Needs, Data, Next, [check_cache], State2)
|
check_limiter(Needs, Data, Next, [check_cache], State2)
|
||||||
end;
|
end;
|
||||||
handle_msg(
|
handle_msg(
|
||||||
|
@ -936,7 +936,7 @@ check_limiter(
|
||||||
#state{
|
#state{
|
||||||
limiter = Limiter,
|
limiter = Limiter,
|
||||||
limiter_timer = LimiterTimer,
|
limiter_timer = LimiterTimer,
|
||||||
limiter_cache = Cache
|
limiter_buffer = Cache
|
||||||
} = State
|
} = State
|
||||||
) when Limiter =/= undefined ->
|
) when Limiter =/= undefined ->
|
||||||
case LimiterTimer of
|
case LimiterTimer of
|
||||||
|
@ -972,8 +972,8 @@ check_limiter(
|
||||||
%% if there has a retry timer,
|
%% if there has a retry timer,
|
||||||
%% cache the operation and execute it after the retry is over
|
%% cache the operation and execute it after the retry is over
|
||||||
%% the maximum length of the cache queue is equal to the active_n
|
%% the maximum length of the cache queue is equal to the active_n
|
||||||
New = #cache{need = Needs, data = Data, next = WhenOk},
|
New = #pending_req{need = Needs, data = Data, next = WhenOk},
|
||||||
{ok, State#state{limiter_cache = queue:in(New, Cache)}}
|
{ok, State#state{limiter_buffer = queue:in(New, Cache)}}
|
||||||
end;
|
end;
|
||||||
check_limiter(_, Data, WhenOk, Msgs, State) ->
|
check_limiter(_, Data, WhenOk, Msgs, State) ->
|
||||||
WhenOk(Data, Msgs, State).
|
WhenOk(Data, Msgs, State).
|
||||||
|
|
Loading…
Reference in New Issue