From 1e4c51f080e667a073e4637bba45ae72de633901 Mon Sep 17 00:00:00 2001 From: terry-xiaoyu <506895667@qq.com> Date: Thu, 5 Sep 2019 18:16:00 +0800 Subject: [PATCH] Send DISCONNECT on packet parse error --- src/emqx_connection.erl | 11 +++++++++-- src/emqx_reason_codes.erl | 3 +++ src/emqx_ws_connection.erl | 10 ++++++++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index cd5956cc2..54da647fb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -402,7 +402,7 @@ process_incoming(Data, State) -> process_incoming(<<>>, Packets, State) -> {keep_state, State, next_incoming_events(Packets)}; -process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) -> +process_incoming(Data, Packets, State = #connection{parse_state = ParseState, chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of {ok, NParseState} -> NState = State#connection{parse_state = NParseState}, @@ -416,7 +416,14 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState}) - error:Reason:Stk -> ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), - shutdown(parse_error, State) + case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + {stop, Reason0, OutPackets, NChanState} -> + Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end, + NState = State#connection{chan_state = NChanState}, + handle_outgoing(OutPackets, Shutdown, NState); + {stop, Reason0, NChanState} -> + stop(Reason0, State#connection{chan_state = NChanState}) + end end. -compile({inline, [next_incoming_events/1]}). diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index df95e6022..6aad26a31 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -24,6 +24,7 @@ , text/1 , text/2 , connack_error/1 + , mqtt_frame_error/1 ]). -export([compat/2]). @@ -175,3 +176,5 @@ connack_error(banned) -> ?RC_BANNED; connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD; connack_error(_) -> ?RC_NOT_AUTHORIZED. +mqtt_frame_error(mqtt_frame_too_large) -> ?RC_PACKET_TOO_LARGE; +mqtt_frame_error(_) -> ?RC_MALFORMED_PACKET. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 8c9604064..4e6baebdf 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -336,7 +336,7 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> process_incoming(<<>>, State) -> {ok, State}; -process_incoming(Data, State = #ws_connection{parse_state = ParseState}) -> +process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of {ok, NParseState} -> {ok, State#ws_connection{parse_state = NParseState}}; @@ -350,7 +350,13 @@ process_incoming(Data, State = #ws_connection{parse_state = ParseState}) -> error:Reason:Stk -> ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), - stop(parse_error, State) + case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + {stop, Reason0, OutPackets, NChanState} -> + NState = State#ws_connection{chan_state = NChanState}, + stop(Reason0, enqueue(OutPackets, NState)); + {stop, Reason0, NChanState} -> + stop(Reason0, State#ws_connection{chan_state = NChanState}) + end end. %%--------------------------------------------------------------------