diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 117cb3fe2..f17126607 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -334,26 +334,23 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(?PACKET(?PINGREQ), Channel) -> {ok, ?PACKET(?PINGRESP), Channel}; -handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{session = Session, - conninfo = ConnInfo = #{expiry_interval := OldInterval}}) -> - OldInterval = emqx_session:info(expiry_interval, Session), - Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Props, OldInterval), +handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> + #{proto_ver := ProtoVer, expiry_interval := OldInterval} = ConnInfo, + Interval = emqx_mqtt_props:get('Session-Expiry-Interval', Properties, OldInterval), case OldInterval =:= 0 andalso Interval =/= OldInterval of true -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); false -> Reason = case ReasonCode of ?RC_SUCCESS -> normal; - _ -> - ProtoVer = emqx_protocol:info(proto_ver, Protocol), - emqx_reason_codes:name(ReasonCode, ProtoVer) + _ -> emqx_reason_codes:name(ReasonCode, ProtoVer) end, - {wait_session_expire, {shutdown, Reason}, - Channel#channel{session = emqx_session:update_expiry_interval(Interval, Session), - protocol = case ReasonCode of - ?RC_SUCCESS -> emqx_protocol:clear_will_msg(Protocol); - _ -> Protocol - end}} + Channel1 = Channel#channel{conninfo = ConnInfo#{expiry_interval := Interval}}, + Channel2 = case ReasonCode of + ?RC_SUCCESS -> Channel1#channel{will_msg = undefined}; + _ -> Channel1 + end, + {wait_session_expire, {shutdown, Reason}, Channel2} end; handle_in(?AUTH_PACKET(), Channel) -> @@ -694,7 +691,6 @@ handle_info(disconnected, Channel = #channel{connected = false}) -> handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval}, client = ClientInfo = #{zone := Zone}, - session = Session, will_msg = WillMsg}) -> emqx_zone:enable_flapping_detect(Zone) andalso emqx_flapping:detect(ClientInfo), Channel1 = ensure_disconnected(Channel), @@ -835,16 +831,12 @@ will_delay_interval(WillMsg) -> terminate(normal, #channel{conninfo = ConnInfo, client = ClientInfo}) -> ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); -terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, client = ClientInfo,}) +terminate({shutdown, Reason}, #channel{conninfo = ConnInfo, client = ClientInfo}) when Reason =:= kicked orelse Reason =:= discarded orelse Reason =:= takeovered -> ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]); terminate(Reason, #channel{conninfo = ConnInfo, client = ClientInfo, will_msg = WillMsg}) -> publish_will_msg(WillMsg), ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]). - if - Protocol == undefined -> ok; - true -> publish_will_msg(emqx_protocol:info(will_msg, Protocol)) - end. -spec(received(pos_integer(), channel()) -> channel()). received(Oct, Channel) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 03e15cce6..96fb7be9b 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -360,8 +360,7 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> process_incoming(<<>>, State) -> {ok, State}; -process_incoming(Data, State = #ws_connection{parse_state = ParseState, - chan_state = ChanState}) -> +process_incoming(Data, State = #ws_connection{parse_state = ParseState}) -> try emqx_frame:parse(Data, ParseState) of {more, NParseState} -> {ok, State#ws_connection{parse_state = NParseState}};