diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 4c88059e6..f7d0541b1 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -516,6 +516,9 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, State#state{proto_state = NProtoState}); {error, Reason, NProtoState} -> shutdown(Reason, State#state{proto_state = NProtoState}); + {error, Reason, OutPacket, NProtoState} -> + Shutdown = fun(NewSt) -> shutdown(Reason, NewSt) end, + handle_outgoing(OutPacket, Shutdown, State#state{proto_state = NProtoState}); {stop, Error, NProtoState} -> stop(Error, State#state{proto_state = NProtoState}) end. diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 2872c5c14..d35c85cf5 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -416,11 +416,15 @@ handle_out({unsuback, PacketId, ReasonCodes}, PState = #protocol{proto_ver = ?MQ handle_out({unsuback, PacketId, _ReasonCodes}, PState) -> {ok, ?UNSUBACK_PACKET(PacketId), PState}; -handle_out({disconnect, ReasonCode}, PState) -> - {ok, PState}; +handle_out({disconnect, ReasonCode}, PState = #protocol{proto_ver = ?MQTT_PROTO_V5}) -> + Reason = emqx_reason_codes:name(ReasonCode), + {error, Reason, ?DISCONNECT_PACKET(ReasonCode), PState}; + +handle_out({disconnect, ReasonCode}, PState = #protocol{proto_ver = ProtoVer}) -> + {error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState}; handle_out(Packet, PState) -> - io:format("Out: ~p~n", [Packet]), + ?LOG(error, "Unexpected out:~p", [Packet]), {ok, PState}. %%-------------------------------------------------------------------- diff --git a/src/emqx_ws_channel.erl b/src/emqx_ws_channel.erl index 292294560..3bc067525 100644 --- a/src/emqx_ws_channel.erl +++ b/src/emqx_ws_channel.erl @@ -436,6 +436,8 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, SuccFun(enqueue(OutPackets, State#state{proto_state = NProtoState})); {error, Reason, NProtoState} -> stop(Reason, State#state{proto_state = NProtoState}); + {error, Reason, OutPacket, NProtoState} -> + stop(Reason, enqueue(OutPacket, State#state{proto_state = NProtoState})); {stop, Error, NProtoState} -> stop(Error, State#state{proto_state = NProtoState}) end.