From 75dab4dff021333e37aa95feedb5968e993387cf Mon Sep 17 00:00:00 2001 From: chengshq Date: Wed, 24 Apr 2024 01:42:26 +0800 Subject: [PATCH] fix: STOMP heartbeat --- .../src/bhvrs/emqx_gateway_conn.erl | 5 ++++- .../src/emqx_stomp_channel.erl | 22 ++++++++----------- .../src/emqx_stomp_frame.erl | 4 +++- .../test/emqx_stomp_SUITE.erl | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 8df531a43..041a8e6f1 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -710,7 +710,10 @@ parse_incoming( ) -> try FrameMod:parse(Data, ParseState) of {more, NParseState} -> - {Packets, State#state{parse_state = NParseState}}; + if + Data == <<$\n>> -> {[Data], State#state{parse_state = NParseState}}; + true -> {Packets, State#state{parse_state = NParseState}} + end; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet | Packets], NState) diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 71458f15e..a3f744280 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -653,6 +653,12 @@ handle_in( ] end, {ok, Outgoings, Channel}; +handle_in( + <<$\n>>, + Channel = #channel{heartbeat = Heartbeat} +) -> + NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat), + {ok, Channel#channel{heartbeat = NewHeartbeat}}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> @@ -1116,19 +1122,9 @@ handle_timeout( {keepalive_send, NewVal}, Channel = #channel{heartbeat = HrtBt} ) -> - case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of - {error, timeout} -> - NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), - NChannel = Channel#channel{heartbeat = NHrtBt}, - {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, - reset_timer(outgoing_timer, NChannel)}; - {ok, NHrtBt} -> - {ok, - reset_timer( - outgoing_timer, - Channel#channel{heartbeat = NHrtBt} - )} - end; + NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), + NChannel = Channel#channel{heartbeat = NHrtBt}, + {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, reset_timer(outgoing_timer, NChannel)}; handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) -> Now = erlang:system_time(millisecond), NTrans = maps:filter( diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl index cfb4924da..9921ad318 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl @@ -346,7 +346,9 @@ serialize_pkt( serialize_pkt(header, {Name, Val}) when is_integer(Val) -> [escape(Name), ?COLON, integer_to_list(Val), ?LF]; serialize_pkt(header, {Name, Val}) -> - [escape(Name), ?COLON, escape(Val), ?LF]. + [escape(Name), ?COLON, escape(Val), ?LF]; +serialize_pkt(<<$\n>>, _SerializeOpts) -> + <<$\n>>. escape(Bin) when is_binary(Bin) -> <<<<(escape(Ch))/binary>> || <> <= Bin>>; diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 64d95dc42..606635a85 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -153,7 +153,7 @@ t_heartbeat(_) -> {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"1000,800">>} + {<<"heart-beat">>, <<"500,800">>} ] ) ),