Merge pull request #11987 from zmstone/1120-do-not-crash-on-einval-after-check-cache

fix(emqx_connection): handle socket activation error return
This commit is contained in:
Zaiming (Stone) Shi 2023-11-21 20:31:56 +01:00 committed by GitHub
commit 8fbdcab118
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 19 deletions

View File

@ -552,13 +552,13 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) ->
inc_counter(incoming_bytes, Len), inc_counter(incoming_bytes, Len),
ok = emqx_metrics:inc('bytes.received', Len), ok = emqx_metrics:inc('bytes.received', Len),
when_bytes_in(Len, Data, State); when_bytes_in(Len, Data, State);
handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> handle_msg(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
case queue:peek(Cache) of case queue:peek(Buffer) of
empty -> empty ->
activate_socket(State); handle_info(activate_socket, State);
{value, #pending_req{need = Needs, data = Data, next = Next}} -> {value, #pending_req{need = Needs, data = Data, next = Next}} ->
State2 = State#state{limiter_buffer = queue:drop(Cache)}, State2 = State#state{limiter_buffer = queue:drop(Buffer)},
check_limiter(Needs, Data, Next, [check_cache], State2) check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)
end; end;
handle_msg( handle_msg(
{incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
@ -1036,13 +1036,13 @@ check_limiter(
Data, Data,
WhenOk, WhenOk,
_Msgs, _Msgs,
#state{limiter_buffer = Cache} = State #state{limiter_buffer = Buffer} = State
) -> ) ->
%% if there has a retry timer, %% if there has a retry timer,
%% cache the operation and execute it after the retry is over %% Buffer the operation and execute it after the retry is over
%% the maximum length of the cache queue is equal to the active_n %% the maximum length of the buffer queue is equal to the active_n
New = #pending_req{need = Needs, data = Data, next = WhenOk}, New = #pending_req{need = Needs, data = Data, next = WhenOk},
{ok, State#state{limiter_buffer = queue:in(New, Cache)}}. {ok, State#state{limiter_buffer = queue:in(New, Buffer)}}.
%% try to perform a retry %% try to perform a retry
-spec retry_limiter(state()) -> _. -spec retry_limiter(state()) -> _.
@ -1053,7 +1053,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
{ok, Limiter2} -> {ok, Limiter2} ->
Next( Next(
Data, Data,
[check_cache], [check_limiter_buffer],
State#state{ State#state{
limiter = Limiter2, limiter = Limiter2,
limiter_timer = undefined limiter_timer = undefined

View File

@ -94,7 +94,7 @@
limiter :: container(), limiter :: container(),
%% cache operation when overload %% cache operation when overload
limiter_cache :: queue:queue(cache()), limiter_buffer :: queue:queue(cache()),
%% limiter timers %% limiter timers
limiter_timer :: undefined | reference() limiter_timer :: undefined | reference()
@ -326,7 +326,7 @@ websocket_init([Req, Opts]) ->
zone = Zone, zone = Zone,
listener = {Type, Listener}, listener = {Type, Listener},
limiter_timer = undefined, limiter_timer = undefined,
limiter_cache = queue:new() limiter_buffer = queue:new()
}, },
hibernate}; hibernate};
{denny, Reason} -> {denny, Reason} ->
@ -462,13 +462,13 @@ websocket_info(
State State
) -> ) ->
return(retry_limiter(State)); return(retry_limiter(State));
websocket_info(check_cache, #state{limiter_cache = Cache} = State) -> websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) ->
case queue:peek(Cache) of case queue:peek(Buffer) of
empty -> empty ->
return(enqueue({active, true}, State#state{sockstate = running})); return(enqueue({active, true}, State#state{sockstate = running}));
{value, #cache{need = Needs, data = Data, next = Next}} -> {value, #cache{need = Needs, data = Data, next = Next}} ->
State2 = State#state{limiter_cache = queue:drop(Cache)}, State2 = State#state{limiter_buffer = queue:drop(Buffer)},
return(check_limiter(Needs, Data, Next, [check_cache], State2)) return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2))
end; end;
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
handle_timeout(TRef, Msg, State); handle_timeout(TRef, Msg, State);
@ -630,10 +630,10 @@ check_limiter(
Data, Data,
WhenOk, WhenOk,
_Msgs, _Msgs,
#state{limiter_cache = Cache} = State #state{limiter_buffer = Buffer} = State
) -> ) ->
New = #cache{need = Needs, data = Data, next = WhenOk}, New = #cache{need = Needs, data = Data, next = WhenOk},
State#state{limiter_cache = queue:in(New, Cache)}. State#state{limiter_buffer = queue:in(New, Buffer)}.
-spec retry_limiter(state()) -> state(). -spec retry_limiter(state()) -> state().
retry_limiter(#state{limiter = Limiter} = State) -> retry_limiter(#state{limiter = Limiter} = State) ->
@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) ->
{ok, Limiter2} -> {ok, Limiter2} ->
Next( Next(
Data, Data,
[check_cache], [check_limiter_buffer],
State#state{ State#state{
limiter = Limiter2, limiter = Limiter2,
limiter_timer = undefined limiter_timer = undefined

View File

@ -0,0 +1,3 @@
Fix connection crash when trying to set TCP/SSL socket `active_n` option.
Prior to this fix, if a socket is already closed when connection process tries to set `active_n` option, it causes a `case_clause` crash.