Send DISCONNECT on packet parse error

This commit is contained in:
terry-xiaoyu 2019-09-05 18:16:00 +08:00
parent cc4ee065a4
commit 1e4c51f080
3 changed files with 20 additions and 4 deletions

View File

@ -402,7 +402,7 @@ process_incoming(Data, State) ->
process_incoming(<<>>, Packets, State) -> process_incoming(<<>>, Packets, State) ->
{keep_state, State, next_incoming_events(Packets)}; {keep_state, State, next_incoming_events(Packets)};
process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) -> process_incoming(Data, Packets, State = #connection{parse_state = ParseState, chan_state = ChanState}) ->
try emqx_frame:parse(Data, ParseState) of try emqx_frame:parse(Data, ParseState) of
{ok, NParseState} -> {ok, NParseState} ->
NState = State#connection{parse_state = NParseState}, NState = State#connection{parse_state = NParseState},
@ -416,7 +416,14 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) -
error:Reason:Stk -> error:Reason:Stk ->
?LOG(error, "Parse failed for ~p~n\ ?LOG(error, "Parse failed for ~p~n\
Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), Stacktrace:~p~nError data:~p", [Reason, Stk, Data]),
shutdown(parse_error, State) case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of
{stop, Reason0, OutPackets, NChanState} ->
Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end,
NState = State#connection{chan_state = NChanState},
handle_outgoing(OutPackets, Shutdown, NState);
{stop, Reason0, NChanState} ->
stop(Reason0, State#connection{chan_state = NChanState})
end
end. end.
-compile({inline, [next_incoming_events/1]}). -compile({inline, [next_incoming_events/1]}).

View File

@ -24,6 +24,7 @@
, text/1 , text/1
, text/2 , text/2
, connack_error/1 , connack_error/1
, mqtt_frame_error/1
]). ]).
-export([compat/2]). -export([compat/2]).
@ -175,3 +176,5 @@ connack_error(banned) -> ?RC_BANNED;
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD; connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
connack_error(_) -> ?RC_NOT_AUTHORIZED. connack_error(_) -> ?RC_NOT_AUTHORIZED.
mqtt_frame_error(mqtt_frame_too_large) -> ?RC_PACKET_TOO_LARGE;
mqtt_frame_error(_) -> ?RC_MALFORMED_PACKET.

View File

@ -336,7 +336,7 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) ->
process_incoming(<<>>, State) -> process_incoming(<<>>, State) ->
{ok, State}; {ok, State};
process_incoming(Data, State = #ws_connection{parse_state = ParseState}) -> process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_state = ChanState}) ->
try emqx_frame:parse(Data, ParseState) of try emqx_frame:parse(Data, ParseState) of
{ok, NParseState} -> {ok, NParseState} ->
{ok, State#ws_connection{parse_state = NParseState}}; {ok, State#ws_connection{parse_state = NParseState}};
@ -350,7 +350,13 @@ process_incoming(Data, State = #ws_connection{parse_state = ParseState}) ->
error:Reason:Stk -> error:Reason:Stk ->
?LOG(error, "Parse failed for ~p~n\ ?LOG(error, "Parse failed for ~p~n\
Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]),
stop(parse_error, State) case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of
{stop, Reason0, OutPackets, NChanState} ->
NState = State#ws_connection{chan_state = NChanState},
stop(Reason0, enqueue(OutPackets, NState));
{stop, Reason0, NChanState} ->
stop(Reason0, State#ws_connection{chan_state = NChanState})
end
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------