fix(stomp): fix client.disconnect not trigger
This commit is contained in:
parent
5682dcb72e
commit
053f9b422c
|
@ -512,8 +512,15 @@ handle_in(?PACKET(?CMD_ABORT, Headers),
|
||||||
handle_out(receipt, receipt_id(Headers), NChannel)
|
handle_out(receipt, receipt_id(Headers), NChannel)
|
||||||
end);
|
end);
|
||||||
|
|
||||||
handle_in(?PACKET(?CMD_DISCONNECT, Headers), Channel) ->
|
handle_in(?PACKET(?CMD_DISCONNECT, Headers),
|
||||||
shutdown_with_recepit(normal, receipt_id(Headers), Channel);
|
Channel = #channel{conn_state = connected}) ->
|
||||||
|
Outgoings = case receipt_id(Headers) of
|
||||||
|
undefined -> [{close, normal}];
|
||||||
|
ReceiptId ->
|
||||||
|
[{outgoing, receipt_frame(ReceiptId)},
|
||||||
|
{close, normal}]
|
||||||
|
end,
|
||||||
|
{ok, Outgoings, Channel};
|
||||||
|
|
||||||
handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
|
handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) ->
|
||||||
?SLOG(error, #{ msg => "unexpected_frame_error"
|
?SLOG(error, #{ msg => "unexpected_frame_error"
|
||||||
|
@ -765,7 +772,6 @@ handle_info({sock_closed, Reason},
|
||||||
%emqx_zone:enable_flapping_detect(Zone)
|
%emqx_zone:enable_flapping_detect(Zone)
|
||||||
% andalso emqx_flapping:detect(ClientInfo),
|
% andalso emqx_flapping:detect(ClientInfo),
|
||||||
NChannel = ensure_disconnected(Reason, Channel),
|
NChannel = ensure_disconnected(Reason, Channel),
|
||||||
%% XXX: Session keepper detect here
|
|
||||||
shutdown(Reason, NChannel);
|
shutdown(Reason, NChannel);
|
||||||
|
|
||||||
handle_info({sock_closed, Reason},
|
handle_info({sock_closed, Reason},
|
||||||
|
@ -918,20 +924,9 @@ reply(Reply, Channel) ->
|
||||||
shutdown(Reason, Channel) ->
|
shutdown(Reason, Channel) ->
|
||||||
{shutdown, Reason, Channel}.
|
{shutdown, Reason, Channel}.
|
||||||
|
|
||||||
shutdown_with_recepit(Reason, ReceiptId, Channel) ->
|
|
||||||
case ReceiptId of
|
|
||||||
undefined ->
|
|
||||||
{shutdown, Reason, Channel};
|
|
||||||
_ ->
|
|
||||||
{shutdown, Reason, receipt_frame(ReceiptId), Channel}
|
|
||||||
end.
|
|
||||||
|
|
||||||
shutdown(Reason, AckFrame, Channel) ->
|
shutdown(Reason, AckFrame, Channel) ->
|
||||||
{shutdown, Reason, AckFrame, Channel}.
|
{shutdown, Reason, AckFrame, Channel}.
|
||||||
|
|
||||||
%shutdown_and_reply(Reason, Reply, Channel) ->
|
|
||||||
% {shutdown, Reason, Reply, Channel}.
|
|
||||||
|
|
||||||
shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
|
shutdown_and_reply(Reason, Reply, OutPkt, Channel) ->
|
||||||
{shutdown, Reason, Reply, OutPkt, Channel}.
|
{shutdown, Reason, Reply, OutPkt, Channel}.
|
||||||
|
|
||||||
|
|
|
@ -158,6 +158,8 @@ parse(<<?BSL, Ch:8, Rest/binary>>,
|
||||||
Phase =:= hdvalue ->
|
Phase =:= hdvalue ->
|
||||||
parse(Phase, Rest, acc(unescape(Ch), State));
|
parse(Phase, Rest, acc(unescape(Ch), State));
|
||||||
|
|
||||||
|
parse(<<?LF>>, Parser = #{phase := none}) ->
|
||||||
|
{more, Parser};
|
||||||
parse(Bytes, #{phase := none, state := State}) ->
|
parse(Bytes, #{phase := none, state := State}) ->
|
||||||
parse(command, Bytes, State).
|
parse(command, Bytes, State).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue