feat(mqtt): add more logging context for frame_too_large error
This commit is contained in:
parent
52cfeee2b1
commit
22dabcb3ea
|
@ -540,13 +540,17 @@ handle_in(?AUTH_PACKET(), Channel) ->
|
||||||
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
|
handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
|
||||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
|
handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) ->
|
||||||
shutdown(shutdown_count(frame_error, Reason), Channel);
|
shutdown(shutdown_count(frame_error, Reason), Channel);
|
||||||
handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = connecting}) ->
|
handle_in(
|
||||||
|
{frame_error, #{hint := frame_too_large} = R}, Channel = #channel{conn_state = connecting}
|
||||||
|
) ->
|
||||||
shutdown(
|
shutdown(
|
||||||
shutdown_count(frame_error, frame_too_large), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
|
shutdown_count(frame_error, R), ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE), Channel
|
||||||
);
|
);
|
||||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
|
handle_in({frame_error, Reason}, Channel = #channel{conn_state = connecting}) ->
|
||||||
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
|
shutdown(shutdown_count(frame_error, Reason), ?CONNACK_PACKET(?RC_MALFORMED_PACKET), Channel);
|
||||||
handle_in({frame_error, frame_too_large}, Channel = #channel{conn_state = ConnState}) when
|
handle_in(
|
||||||
|
{frame_error, #{hint := frame_too_large}}, Channel = #channel{conn_state = ConnState}
|
||||||
|
) when
|
||||||
ConnState =:= connected orelse ConnState =:= reauthenticating
|
ConnState =:= connected orelse ConnState =:= reauthenticating
|
||||||
->
|
->
|
||||||
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
|
handle_out(disconnect, {?RC_PACKET_TOO_LARGE, frame_too_large}, Channel);
|
||||||
|
@ -2327,6 +2331,8 @@ shutdown(Reason, Reply, Packet, Channel) ->
|
||||||
|
|
||||||
%% process exits with {shutdown, #{shutdown_count := Kind}} will trigger
|
%% process exits with {shutdown, #{shutdown_count := Kind}} will trigger
|
||||||
%% the connection supervisor (esockd) to keep a shutdown-counter grouped by Kind
|
%% the connection supervisor (esockd) to keep a shutdown-counter grouped by Kind
|
||||||
|
shutdown_count(_Kind, #{hint := Hint} = Reason) when is_atom(Hint) ->
|
||||||
|
Reason#{shutdown_count => Hint};
|
||||||
shutdown_count(Kind, Reason) when is_map(Reason) ->
|
shutdown_count(Kind, Reason) when is_map(Reason) ->
|
||||||
Reason#{shutdown_count => Kind};
|
Reason#{shutdown_count => Kind};
|
||||||
shutdown_count(Kind, Reason) ->
|
shutdown_count(Kind, Reason) ->
|
||||||
|
|
|
@ -168,7 +168,7 @@ parse_remaining_len(Rest, Header, Options) ->
|
||||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when
|
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when
|
||||||
Length > MaxSize
|
Length > MaxSize
|
||||||
->
|
->
|
||||||
?PARSE_ERR(frame_too_large);
|
?PARSE_ERR(#{hint => frame_too_large, limit => MaxSize, received => Length});
|
||||||
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
||||||
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
|
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
|
||||||
%% Match DISCONNECT without payload
|
%% Match DISCONNECT without payload
|
||||||
|
@ -213,7 +213,7 @@ parse_remaining_len(
|
||||||
) ->
|
) ->
|
||||||
FrameLen = Value + Len * Multiplier,
|
FrameLen = Value + Len * Multiplier,
|
||||||
case FrameLen > MaxSize of
|
case FrameLen > MaxSize of
|
||||||
true -> ?PARSE_ERR(frame_too_large);
|
true -> ?PARSE_ERR(#{hint => frame_too_large, limit => MaxSize, received => FrameLen});
|
||||||
false -> parse_frame(Rest, Header, FrameLen, Options)
|
false -> parse_frame(Rest, Header, FrameLen, Options)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -493,7 +493,7 @@ format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}, Pa
|
||||||
"" -> [HeaderIO, ")"];
|
"" -> [HeaderIO, ")"];
|
||||||
VarIO -> [HeaderIO, ", ", VarIO, ")"]
|
VarIO -> [HeaderIO, ", ", VarIO, ")"]
|
||||||
end;
|
end;
|
||||||
%% receive a frame error packet, such as {frame_error,frame_too_large} or
|
%% receive a frame error packet, such as {frame_error,#{hint := frame_too_large}} or
|
||||||
%% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,hint => invalid_proto_name,received => <<"bad_name">>}}
|
%% {frame_error,#{expected => <<"'MQTT' or 'MQIsdp'">>,hint => invalid_proto_name,received => <<"bad_name">>}}
|
||||||
format(FrameError, _PayloadEncode) ->
|
format(FrameError, _PayloadEncode) ->
|
||||||
lists:flatten(io_lib:format("~tp", [FrameError])).
|
lists:flatten(io_lib:format("~tp", [FrameError])).
|
||||||
|
|
|
@ -427,19 +427,30 @@ t_handle_in_auth(_) ->
|
||||||
|
|
||||||
t_handle_in_frame_error(_) ->
|
t_handle_in_frame_error(_) ->
|
||||||
IdleChannel = channel(#{conn_state => idle}),
|
IdleChannel = channel(#{conn_state => idle}),
|
||||||
{shutdown, #{shutdown_count := frame_error, reason := frame_too_large}, _Chan} =
|
{shutdown, #{shutdown_count := frame_too_large, hint := frame_too_large}, _Chan} =
|
||||||
emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
|
emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, IdleChannel),
|
||||||
ConnectingChan = channel(#{conn_state => connecting}),
|
ConnectingChan = channel(#{conn_state => connecting}),
|
||||||
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
|
ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
|
||||||
{shutdown, #{shutdown_count := frame_error, reason := frame_too_large}, ConnackPacket, _} =
|
{shutdown,
|
||||||
emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
|
#{
|
||||||
|
shutdown_count := frame_too_large,
|
||||||
|
hint := frame_too_large,
|
||||||
|
limit := 100,
|
||||||
|
received := 101
|
||||||
|
},
|
||||||
|
ConnackPacket,
|
||||||
|
_} =
|
||||||
|
emqx_channel:handle_in(
|
||||||
|
{frame_error, #{reason => frame_too_large, received => 101, limit => 100}},
|
||||||
|
ConnectingChan
|
||||||
|
),
|
||||||
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
|
DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
|
||||||
ConnectedChan = channel(#{conn_state => connected}),
|
ConnectedChan = channel(#{conn_state => connected}),
|
||||||
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} =
|
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} =
|
||||||
emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
|
emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, ConnectedChan),
|
||||||
DisconnectedChan = channel(#{conn_state => disconnected}),
|
DisconnectedChan = channel(#{conn_state => disconnected}),
|
||||||
{ok, DisconnectedChan} =
|
{ok, DisconnectedChan} =
|
||||||
emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
|
emqx_channel:handle_in({frame_error, #{reason => frame_too_large}}, DisconnectedChan).
|
||||||
|
|
||||||
t_handle_in_expected_packet(_) ->
|
t_handle_in_expected_packet(_) ->
|
||||||
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
|
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
|
||||||
|
|
|
@ -138,8 +138,8 @@ t_parse_cont(_) ->
|
||||||
|
|
||||||
t_parse_frame_too_large(_) ->
|
t_parse_frame_too_large(_) ->
|
||||||
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
|
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
|
||||||
?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 256})),
|
?ASSERT_FRAME_THROW(#{hint := frame_too_large}, parse_serialize(Packet, #{max_size => 256})),
|
||||||
?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
?ASSERT_FRAME_THROW(#{hint := frame_too_large}, parse_serialize(Packet, #{max_size => 512})),
|
||||||
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
||||||
|
|
||||||
t_parse_frame_malformed_variable_byte_integer(_) ->
|
t_parse_frame_malformed_variable_byte_integer(_) ->
|
||||||
|
|
Loading…
Reference in New Issue