refactor(emqx_ws_connection): rename cache to buffer for limiter
This commit is contained in:
parent
7f078295c1
commit
b02711af79
|
@ -94,7 +94,7 @@
|
||||||
limiter :: container(),
|
limiter :: container(),
|
||||||
|
|
||||||
%% cache operation when overload
|
%% cache operation when overload
|
||||||
limiter_cache :: queue:queue(cache()),
|
limiter_buffer :: queue:queue(cache()),
|
||||||
|
|
||||||
%% limiter timers
|
%% limiter timers
|
||||||
limiter_timer :: undefined | reference()
|
limiter_timer :: undefined | reference()
|
||||||
|
@ -326,7 +326,7 @@ websocket_init([Req, Opts]) ->
|
||||||
zone = Zone,
|
zone = Zone,
|
||||||
listener = {Type, Listener},
|
listener = {Type, Listener},
|
||||||
limiter_timer = undefined,
|
limiter_timer = undefined,
|
||||||
limiter_cache = queue:new()
|
limiter_buffer = queue:new()
|
||||||
},
|
},
|
||||||
hibernate};
|
hibernate};
|
||||||
{denny, Reason} ->
|
{denny, Reason} ->
|
||||||
|
@ -462,13 +462,13 @@ websocket_info(
|
||||||
State
|
State
|
||||||
) ->
|
) ->
|
||||||
return(retry_limiter(State));
|
return(retry_limiter(State));
|
||||||
websocket_info(check_cache, #state{limiter_cache = Cache} = State) ->
|
websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
|
||||||
case queue:peek(Cache) of
|
case queue:peek(Buffer) of
|
||||||
empty ->
|
empty ->
|
||||||
return(enqueue({active, true}, State#state{sockstate = running}));
|
return(enqueue({active, true}, State#state{sockstate = running}));
|
||||||
{value, #cache{need = Needs, data = Data, next = Next}} ->
|
{value, #cache{need = Needs, data = Data, next = Next}} ->
|
||||||
State2 = State#state{limiter_cache = queue:drop(Cache)},
|
State2 = State#state{limiter_buffer = queue:drop(Buffer)},
|
||||||
return(check_limiter(Needs, Data, Next, [check_cache], State2))
|
return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2))
|
||||||
end;
|
end;
|
||||||
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
|
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
|
||||||
handle_timeout(TRef, Msg, State);
|
handle_timeout(TRef, Msg, State);
|
||||||
|
@ -630,10 +630,10 @@ check_limiter(
|
||||||
Data,
|
Data,
|
||||||
WhenOk,
|
WhenOk,
|
||||||
_Msgs,
|
_Msgs,
|
||||||
#state{limiter_cache = Cache} = State
|
#state{limiter_buffer = Buffer} = State
|
||||||
) ->
|
) ->
|
||||||
New = #cache{need = Needs, data = Data, next = WhenOk},
|
New = #cache{need = Needs, data = Data, next = WhenOk},
|
||||||
State#state{limiter_cache = queue:in(New, Cache)}.
|
State#state{limiter_buffer = queue:in(New, Buffer)}.
|
||||||
|
|
||||||
-spec retry_limiter(state()) -> state().
|
-spec retry_limiter(state()) -> state().
|
||||||
retry_limiter(#state{limiter = Limiter} = State) ->
|
retry_limiter(#state{limiter = Limiter} = State) ->
|
||||||
|
@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
|
||||||
{ok, Limiter2} ->
|
{ok, Limiter2} ->
|
||||||
Next(
|
Next(
|
||||||
Data,
|
Data,
|
||||||
[check_cache],
|
[check_limiter_buffer],
|
||||||
State#state{
|
State#state{
|
||||||
limiter = Limiter2,
|
limiter = Limiter2,
|
||||||
limiter_timer = undefined
|
limiter_timer = undefined
|
||||||
|
|
Loading…
Reference in New Issue