From 22dabcb3eabc7afc95afd4989654ab01666ba90f Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Sat, 17 Feb 2024 11:08:30 +0100 Subject: [PATCH] feat(mqtt): add more logging context for frame_too_large error --- apps/emqx/src/emqx_channel.erl | 12 +++++++++--- apps/emqx/src/emqx_frame.erl | 4 ++-- apps/emqx/src/emqx_packet.erl | 2 +- apps/emqx/test/emqx_channel_SUITE.erl | 23 +++++++++++++++++------ apps/emqx/test/emqx_frame_SUITE.erl | 4 ++-- 5 files changed, 31 insertions(+), 14 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 130ec99c8..9b57f25cd 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -540,13 +540,17 @@ handle_in(?AUTH_PACKET(), Channel) -> handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(shutdown_count(frame_error, Reason), Channel); -handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connecting}) -> +handle_in( + {frame_error, #{hint := frame_too_large} = R}, Channel = #channel{conn_state = connecting} +) -> shutdown( - shutdown_count(frame_error, frame_too_large), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel + shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel ); handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) -> shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel); -handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = ConnState}) when +handle_in( + {frame_error, #{hint := frame_too_large}}, Channel = #channel{conn_state = ConnState} +) when ConnState =:= connected orelse ConnState =:= reauthenticating -> handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel); @@ -2327,6 +2331,8 @@ shutdown(Reason, Reply, Packet, Channel) -> %% process exits with {shutdown, #{shutdown_count := Kind}} will trigger %% the connection supervisor (esockd) to keep a shutdown-counter grouped by Kind +shutdown_count(_Kind, #{hint := Hint} = Reason) when is_atom(Hint) -> + Reason#{shutdown_count => Hint}; shutdown_count(Kind, Reason) when is_map(Reason) -> Reason#{shutdown_count => Kind}; shutdown_count(Kind, Reason) -> diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index 22170b6de..c5ce7dc8f 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -168,7 +168,7 @@ parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> - ?PARSE_ERR(frame_too_large); + ?PARSE_ERR(#{hint => frame_too_large, limit => MaxSize, received => Length}); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload @@ -213,7 +213,7 @@ parse_remaining_len( ) -> FrameLen = Value + Len * Multiplier, case FrameLen > MaxSize of - true -> ?PARSE_ERR(frame_too_large); + true -> ?PARSE_ERR(#{hint => frame_too_large, limit => MaxSize, received => FrameLen}); false -> parse_frame(Rest, Header, FrameLen, Options) end. diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 542dc8b3b..29c13d99e 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -493,7 +493,7 @@ format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Pa "" -> [HeaderIO, ")"]; VarIO -> [HeaderIO, ", ", VarIO, ")"] end; -%% receive a frame error packet, such as {frame_error,frame_too_large} or +%% receive a frame error packet, such as {frame_error,#{hint := frame_too_large}} or %% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,hint => invalid_proto_name,received => <<"bad_name">>}} format(FrameError, _PayloadEncode) -> lists:flatten(io_lib:format("~tp", [FrameError])). diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index ca038ac85..dfee02dc1 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -427,19 +427,30 @@ t_handle_in_auth(_) -> t_handle_in_frame_error(_) -> IdleChannel = channel(#{conn_state => idle}), - {shutdown, #{shutdown_count := frame_error, reason := frame_too_large}, _Chan} = - emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel), + {shutdown, #{shutdown_count := frame_too_large, hint := frame_too_large}, _Chan} = + emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, IdleChannel), ConnectingChan = channel(#{conn_state => connecting}), ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), - {shutdown, #{shutdown_count := frame_error, reason := frame_too_large}, ConnackPacket, _} = - emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan), + {shutdown, + #{ + shutdown_count := frame_too_large, + hint := frame_too_large, + limit := 100, + received := 101 + }, + ConnackPacket, + _} = + emqx_channel:handle_in( + {frame_error, #{reason => frame_too_large, received => 101, limit => 100}}, + ConnectingChan + ), DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE), ConnectedChan = channel(#{conn_state => connected}), {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} = - emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan), + emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, ConnectedChan), DisconnectedChan = channel(#{conn_state => disconnected}), {ok, DisconnectedChan} = - emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan). + emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, DisconnectedChan). t_handle_in_expected_packet(_) -> Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 23e8972e9..935a0d8ec 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -138,8 +138,8 @@ t_parse_cont(_) -> t_parse_frame_too_large(_) -> Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)), - ?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 256})), - ?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 512})), + ?ASSERT_FRAME_THROW(#{hint := frame_too_large}, parse_serialize(Packet, #{max_size => 256})), + ?ASSERT_FRAME_THROW(#{hint := frame_too_large}, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). t_parse_frame_malformed_variable_byte_integer(_) ->