diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index a27c534e2..f7c76cade 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -563,29 +563,8 @@ handle_in( process_disconnect(ReasonCode, Properties, NChannel); handle_in(?AUTH_PACKET(), Channel) -> handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> - shutdown(shutdown_count(frame_error, Reason), Channel); -handle_in( - {frame_error, #{cause := frame_too_large} = R}, Channel = #channel{conn_state = connecting} -) -> - shutdown( - shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel - ); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) -> - shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); -handle_in( - {frame_error, #{cause := frame_too_large}}, Channel = #channel{conn_state = ConnState} -) when - ConnState =:= connected orelse ConnState =:= reauthenticating --> - handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = ConnState}) when - ConnState =:= connected orelse ConnState =:= reauthenticating --> - handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{conn_state = disconnected}) -> - ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), - {ok, Channel}; +handle_in({frame_error, Reason}, Channel) -> + handle_frame_error(Reason, Channel); handle_in(Packet, Channel) -> ?SLOG(error, #{msg => "disconnecting_due_to_unexpected_message", packet => Packet}), handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel). @@ -1017,6 +996,37 @@ not_nacked({deliver, _Topic, Msg}) -> true end. +%%-------------------------------------------------------------------- +%% Handle Frame Error +%%-------------------------------------------------------------------- + +handle_frame_error( + Reason, + Channel = #channel{conn_state = idle} +) -> + shutdown(shutdown_count(frame_error, Reason), Channel); +handle_frame_error( + #{cause := frame_too_large} = R, Channel = #channel{conn_state = connecting} +) -> + shutdown( + shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel + ); +handle_frame_error(Reason, Channel = #channel{conn_state = connecting}) -> + shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); +handle_frame_error( + #{cause := frame_too_large}, Channel = #channel{conn_state = ConnState} +) when + ConnState =:= connected orelse ConnState =:= reauthenticating +-> + handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); +handle_frame_error(Reason, Channel = #channel{conn_state = ConnState}) when + ConnState =:= connected orelse ConnState =:= reauthenticating +-> + handle_out(disconnect, {?RC_MALFORMED_PACKET, Reason}, Channel); +handle_frame_error(Reason, Channel = #channel{conn_state = disconnected}) -> + ?SLOG(error, #{msg => "malformed_mqtt_message", reason => Reason}), + {ok, Channel}. + %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- @@ -2629,8 +2639,7 @@ save_alias(outbound, AliasId, Topic, TopicAliases = #{outbound := Aliases}) -> NAliases = maps:put(Topic, AliasId, Aliases), TopicAliases#{outbound => NAliases}. --compile({inline, [reply/2, shutdown/2, shutdown/3, sp/1, flag/1]}). - +-compile({inline, [reply/2, shutdown/2, shutdown/3]}). reply(Reply, Channel) -> {reply, Reply, Channel}. diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index a1c9084dd..f83a739ad 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -1134,7 +1134,7 @@ validate_connect_reserved(0) -> ok; validate_connect_reserved(1) -> ?PARSE_ERR(reserved_connect_flag). %% MQTT-v3.1.1-[MQTT-3.1.2-13], MQTT-v5.0-[MQTT-3.1.2-11] -validate_connect_will(false, _, WillQos) when WillQos > 0 -> ?PARSE_ERR(invalid_will_qos); +validate_connect_will(false, _, WillQoS) when WillQoS > 0 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-14], MQTT-v5.0-[MQTT-3.1.2-12] validate_connect_will(true, _, WillQoS) when WillQoS > 2 -> ?PARSE_ERR(invalid_will_qos); %% MQTT-v3.1.1-[MQTT-3.1.2-15], MQTT-v5.0-[MQTT-3.1.2-13] diff --git a/apps/emqx/src/emqx_quic_connection.erl b/apps/emqx/src/emqx_quic_connection.erl index df9520d90..c63014cea 100644 --- a/apps/emqx/src/emqx_quic_connection.erl +++ b/apps/emqx/src/emqx_quic_connection.erl @@ -62,7 +62,7 @@ streams := [{pid(), quicer:stream_handle()}], %% New stream opts stream_opts := map(), - %% If conneciton is resumed from session ticket + %% If connection is resumed from session ticket is_resumed => boolean(), %% mqtt message serializer config serialize => undefined, @@ -70,8 +70,8 @@ }. -type cb_ret() :: quicer_lib:cb_ret(). -%% @doc Data streams initializions are started in parallel with control streams, data streams are blocked -%% for the activation from control stream after it is accepted as a legit conneciton. +%% @doc Data streams initializations are started in parallel with control streams, data streams are blocked +%% for the activation from control stream after it is accepted as a legit connection. %% For security, the initial number of allowed data streams from client should be limited by %% 'peer_bidi_stream_count` & 'peer_unidi_stream_count` -spec activate_data_streams(pid(), { @@ -80,7 +80,7 @@ activate_data_streams(ConnOwner, {PS, Serialize, Channel}) -> gen_server:call(ConnOwner, {activate_data_streams, {PS, Serialize, Channel}}, infinity). -%% @doc conneciton owner init callback +%% @doc connection owner init callback -spec init(map()) -> {ok, cb_state()}. init(#{stream_opts := SOpts} = S) when is_list(SOpts) -> init(S#{stream_opts := maps:from_list(SOpts)});