diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index ef33128d2..4b7f2b06a 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -662,6 +662,13 @@ handle_in( ] end, {ok, Outgoings, Channel}; +handle_in( + ?PACKET(?CMD_HEARTBEAT), + Channel = #channel{heartbeat = Heartbeat} +) -> + NewVal = emqx_pd:get_counter(recv_pkt), + NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, NewVal, Heartbeat), + {ok, Channel#channel{heartbeat = NewHeartbeat}}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> @@ -1125,19 +1132,9 @@ handle_timeout( {keepalive_send, NewVal}, Channel = #channel{heartbeat = HrtBt} ) -> - case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of - {error, timeout} -> - NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), - NChannel = Channel#channel{heartbeat = NHrtBt}, - {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, - reset_timer(outgoing_timer, NChannel)}; - {ok, NHrtBt} -> - {ok, - reset_timer( - outgoing_timer, - Channel#channel{heartbeat = NHrtBt} - )} - end; + NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), + NChannel = Channel#channel{heartbeat = NHrtBt}, + {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, reset_timer(outgoing_timer, NChannel)}; handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) -> Now = erlang:system_time(millisecond), NTrans = maps:filter( diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl index cfb4924da..99593155b 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl @@ -140,6 +140,9 @@ g(Key, Opts, Val) -> -spec parse(binary(), parse_state()) -> parse_result(). parse(<<>>, Parser) -> {more, Parser}; +%% treat the \n as a heartbeat frame +parse(<<$\n>>, Parser = #{phase := none}) -> + {ok, #stomp_frame{command = ?CMD_HEARTBEAT}, <<>>, Parser}; parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); parse(<>, #{phase := hdname, state := State}) -> diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 048e4f7ca..63c5e2a3f 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -190,7 +190,7 @@ t_heartbeat(_) -> {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"1000,800">>} + {<<"heart-beat">>, <<"500,800">>} ] ) ),