From 8ec3b1db5df25dc5d024944eed15acc8d87af8ee Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 16:51:39 +0100 Subject: [PATCH 1/4] fix(emqx_connection): handle socket activation error return --- apps/emqx/src/emqx_connection.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index db36fbea9..31281b8c2 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -555,7 +555,7 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) -> handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> case queue:peek(Cache) of empty -> - activate_socket(State); + handle_info(activate_socket, State); {value, #pending_req{need = Needs, data = Data, next = Next}} -> State2 = State#state{limiter_buffer = queue:drop(Cache)}, check_limiter(Needs, Data, Next, [check_cache], State2) From ec19247271aac074d67f8a9e81b0e9542a4ff0be Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 16:55:26 +0100 Subject: [PATCH 2/4] refactor: rename limiter buffer related messages and var names --- apps/emqx/src/emqx_connection.erl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 31281b8c2..11d42f9dd 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -552,13 +552,13 @@ handle_msg({quic, Data, _Stream, #{len := Len}}, State) when is_binary(Data) -> inc_counter(incoming_bytes, Len), ok = emqx_metrics:inc('bytes.received', Len), when_bytes_in(Len, Data, State); -handle_msg(check_cache, #state{limiter_buffer = Cache} = State) -> - case queue:peek(Cache) of +handle_msg(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> handle_info(activate_socket, State); {value, #pending_req{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_buffer = queue:drop(Cache)}, - check_limiter(Needs, Data, Next, [check_cache], State2) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + check_limiter(Needs, Data, Next, [check_limiter_buffer], State2) end; handle_msg( {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, @@ -1036,13 +1036,13 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_buffer = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> %% if there has a retry timer, - %% cache the operation and execute it after the retry is over - %% the maximum length of the cache queue is equal to the active_n + %% Buffer the operation and execute it after the retry is over + %% the maximum length of the buffer queue is equal to the active_n New = #pending_req{need = Needs, data = Data, next = WhenOk}, - {ok, State#state{limiter_buffer = queue:in(New, Cache)}}. + {ok, State#state{limiter_buffer = queue:in(New, Buffer)}}. %% try to perform a retry -spec retry_limiter(state()) -> _. @@ -1053,7 +1053,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined From 7f078295c1d3a60803c885bd4b38af63702cf646 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 17:44:39 +0100 Subject: [PATCH 3/4] docs: add changelog for PR 11987 --- changes/ce/fix-11987.en.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changes/ce/fix-11987.en.md diff --git a/changes/ce/fix-11987.en.md b/changes/ce/fix-11987.en.md new file mode 100644 index 000000000..4d85cff41 --- /dev/null +++ b/changes/ce/fix-11987.en.md @@ -0,0 +1,3 @@ +Fix connection crash when trying to set TCP/SSL socket `active_n` option. + +Prior to this fix, if a socket is already closed when connection process tries to set `active_n` option, it causes a `case_clause` crash. From b02711af79d6b914beb55f6aefaa18bf60860c75 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Nov 2023 17:47:20 +0100 Subject: [PATCH 4/4] refactor(emqx_ws_connection): rename cache to buffer for limiter --- apps/emqx/src/emqx_ws_connection.erl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 37ce72d74..07329721a 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -94,7 +94,7 @@ limiter :: container(), %% cache operation when overload - limiter_cache :: queue:queue(cache()), + limiter_buffer :: queue:queue(cache()), %% limiter timers limiter_timer :: undefined | reference() @@ -326,7 +326,7 @@ websocket_init([Req, Opts]) -> zone = Zone, listener = {Type, Listener}, limiter_timer = undefined, - limiter_cache = queue:new() + limiter_buffer = queue:new() }, hibernate}; {denny, Reason} -> @@ -462,13 +462,13 @@ websocket_info( State ) -> return(retry_limiter(State)); -websocket_info(check_cache, #state{limiter_cache = Cache} = State) -> - case queue:peek(Cache) of +websocket_info(check_limiter_buffer, #state{limiter_buffer = Buffer} = State) -> + case queue:peek(Buffer) of empty -> return(enqueue({active, true}, State#state{sockstate = running})); {value, #cache{need = Needs, data = Data, next = Next}} -> - State2 = State#state{limiter_cache = queue:drop(Cache)}, - return(check_limiter(Needs, Data, Next, [check_cache], State2)) + State2 = State#state{limiter_buffer = queue:drop(Buffer)}, + return(check_limiter(Needs, Data, Next, [check_limiter_buffer], State2)) end; websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); @@ -630,10 +630,10 @@ check_limiter( Data, WhenOk, _Msgs, - #state{limiter_cache = Cache} = State + #state{limiter_buffer = Buffer} = State ) -> New = #cache{need = Needs, data = Data, next = WhenOk}, - State#state{limiter_cache = queue:in(New, Cache)}. + State#state{limiter_buffer = queue:in(New, Buffer)}. -spec retry_limiter(state()) -> state(). retry_limiter(#state{limiter = Limiter} = State) -> @@ -644,7 +644,7 @@ retry_limiter(#state{limiter = Limiter} = State) -> {ok, Limiter2} -> Next( Data, - [check_cache], + [check_limiter_buffer], State#state{ limiter = Limiter2, limiter_timer = undefined