diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 37ce72d74..07329721a 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -94,7 +94,7 @@ limiter :: container(), %% cache operation when overload - limiter_cache :: queue:queue(cache()), + limiter_buffer :: queue:queue(cache()), %% limiter timers limiter_timer :: undefined | reference() @@ -326,7 +326,7 @@ websocket_init([Req, Opts]) -> zone = Zone, listener = {Type, Listener}, limiter_timer = undefined, - limiter_cache = queue:new() + limiter_buffer = queue:new() }, hibernate}; {denny, Reason} -> @@ -462,13 +462,13 @@ websocket_info( State ) -> return(retry_limiter(State)); -websocket_info(check_cache, #state{limiter_cache = Cache} = State) -> - case queue:peek(Cache) of +websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> return(enqueue({active, true}, State#state{sockstate = running})); {value, #cache{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_cache = queue:drop(Cache)}, - return(check_limiter(Needs, Data, Next, [check_cache], State2)) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)) end; websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); @@ -630,10 +630,10 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_cache = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> New = #cache{need = Needs, data = Data, next = WhenOk}, - State#state{limiter_cache = queue:in(New, Cache)}. + State#state{limiter_buffer = queue:in(New, Buffer)}. -spec retry_limiter(state()) -> state(). retry_limiter(#state{limiter = Limiter} = State) -> @@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined