diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index e0ab47f73..1ac660d4b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -512,8 +512,15 @@ handle_in(?PACKET(?CMD_ABORT, Headers), handle_out(receipt, receipt_id(Headers), NChannel) end); -handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) -> - shutdown_with_recepit(normal, receipt_id(Headers), Channel); +handle_in(?PACKET(?CMD_DISCONNECT, Headers), + Channel = #channel{conn_state = connected}) -> + Outgoings = case receipt_id(Headers) of + undefined -> [{close, normal}]; + ReceiptId -> + [{outgoing, receipt_frame(ReceiptId)}, + {close, normal}] + end, + {ok, Outgoings, Channel}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> ?SLOG(error, #{ msg => "unexpected_frame_error" @@ -765,7 +772,6 @@ handle_info({sock_closed, Reason}, %emqx_zone:enable_flapping_detect(Zone) % andalso emqx_flapping:detect(ClientInfo), NChannel = ensure_disconnected(Reason, Channel), - %% XXX: Session keepper detect here shutdown(Reason, NChannel); handle_info({sock_closed, Reason}, @@ -918,20 +924,9 @@ reply(Reply, Channel) -> shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. -shutdown_with_recepit(Reason, ReceiptId, Channel) -> - case ReceiptId of - undefined -> - {shutdown, Reason, Channel}; - _ -> - {shutdown, Reason, receipt_frame(ReceiptId), Channel} - end. - shutdown(Reason, AckFrame, Channel) -> {shutdown, Reason, AckFrame, Channel}. -%shutdown_and_reply(Reason, Reply, Channel) -> -% {shutdown, Reason, Reply, Channel}. - shutdown_and_reply(Reason, Reply, OutPkt, Channel) -> {shutdown, Reason, Reply, OutPkt, Channel}. diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index 314b2d884..7158cbcdb 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -158,6 +158,8 @@ parse(<>, Phase =:= hdvalue -> parse(Phase, Rest, acc(unescape(Ch), State)); +parse(<>, Parser = #{phase := none}) -> + {more, Parser}; parse(Bytes, #{phase := none, state := State}) -> parse(command, Bytes, State).