From 75dab4dff021333e37aa95feedb5968e993387cf Mon Sep 17 00:00:00 2001 From: chengshq Date: Wed, 24 Apr 2024 01:42:26 +0800 Subject: [PATCH 1/3] fix: STOMP heartbeat --- .../src/bhvrs/emqx_gateway_conn.erl | 5 ++++- .../src/emqx_stomp_channel.erl | 22 ++++++++----------- .../src/emqx_stomp_frame.erl | 4 +++- .../test/emqx_stomp_SUITE.erl | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 8df531a43..041a8e6f1 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -710,7 +710,10 @@ parse_incoming( ) -> try FrameMod:parse(Data, ParseState) of {more, NParseState} -> - {Packets, State#state{parse_state = NParseState}}; + if + Data == <<$\n>> -> {[Data], State#state{parse_state = NParseState}}; + true -> {Packets, State#state{parse_state = NParseState}} + end; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet | Packets], NState) diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 71458f15e..a3f744280 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -653,6 +653,12 @@ handle_in( ] end, {ok, Outgoings, Channel}; +handle_in( + <<$\n>>, + Channel = #channel{heartbeat = Heartbeat} +) -> + NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat), + {ok, Channel#channel{heartbeat = NewHeartbeat}}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> @@ -1116,19 +1122,9 @@ handle_timeout( {keepalive_send, NewVal}, Channel = #channel{heartbeat = HrtBt} ) -> - case emqx_stomp_heartbeat:check(outgoing, NewVal, HrtBt) of - {error, timeout} -> - NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), - NChannel = Channel#channel{heartbeat = NHrtBt}, - {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, - reset_timer(outgoing_timer, NChannel)}; - {ok, NHrtBt} -> - {ok, - reset_timer( - outgoing_timer, - Channel#channel{heartbeat = NHrtBt} - )} - end; + NHrtBt = emqx_stomp_heartbeat:reset(outgoing, NewVal, HrtBt), + NChannel = Channel#channel{heartbeat = NHrtBt}, + {ok, {outgoing, emqx_stomp_frame:make(?CMD_HEARTBEAT)}, reset_timer(outgoing_timer, NChannel)}; handle_timeout(_TRef, clean_trans, Channel = #channel{transaction = Trans}) -> Now = erlang:system_time(millisecond), NTrans = maps:filter( diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl index cfb4924da..9921ad318 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl @@ -346,7 +346,9 @@ serialize_pkt( serialize_pkt(header, {Name, Val}) when is_integer(Val) -> [escape(Name), ?COLON, integer_to_list(Val), ?LF]; serialize_pkt(header, {Name, Val}) -> - [escape(Name), ?COLON, escape(Val), ?LF]. + [escape(Name), ?COLON, escape(Val), ?LF]; +serialize_pkt(<<$\n>>, _SerializeOpts) -> + <<$\n>>. escape(Bin) when is_binary(Bin) -> <<<<(escape(Ch))/binary>> || <> <= Bin>>; diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 64d95dc42..606635a85 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -153,7 +153,7 @@ t_heartbeat(_) -> {<<"host">>, <<"127.0.0.1:61613">>}, {<<"login">>, <<"guest">>}, {<<"passcode">>, <<"guest">>}, - {<<"heart-beat">>, <<"1000,800">>} + {<<"heart-beat">>, <<"500,800">>} ] ) ), From d2b6e41cd1b85be3052cab9619932bc2a8d029c2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 6 May 2024 14:39:41 +0800 Subject: [PATCH 2/3] chore(stomp): parse \n as heartbeat frame --- apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl | 5 +---- apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl | 2 +- apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl | 7 ++++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 041a8e6f1..8df531a43 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -710,10 +710,7 @@ parse_incoming( ) -> try FrameMod:parse(Data, ParseState) of {more, NParseState} -> - if - Data == <<$\n>> -> {[Data], State#state{parse_state = NParseState}}; - true -> {Packets, State#state{parse_state = NParseState}} - end; + {Packets, State#state{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet | Packets], NState) diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index a3f744280..cd2359269 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -654,7 +654,7 @@ handle_in( end, {ok, Outgoings, Channel}; handle_in( - <<$\n>>, + ?PACKET(?CMD_HEARTBEAT), Channel = #channel{heartbeat = Heartbeat} ) -> NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat), diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl index 9921ad318..99593155b 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl @@ -140,6 +140,9 @@ g(Key, Opts, Val) -> -spec parse(binary(), parse_state()) -> parse_result(). parse(<<>>, Parser) -> {more, Parser}; +%% treat the \n as a heartbeat frame +parse(<<$\n>>, Parser = #{phase := none}) -> + {ok, #stomp_frame{command = ?CMD_HEARTBEAT}, <<>>, Parser}; parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); parse(<>, #{phase := hdname, state := State}) -> @@ -346,9 +349,7 @@ serialize_pkt( serialize_pkt(header, {Name, Val}) when is_integer(Val) -> [escape(Name), ?COLON, integer_to_list(Val), ?LF]; serialize_pkt(header, {Name, Val}) -> - [escape(Name), ?COLON, escape(Val), ?LF]; -serialize_pkt(<<$\n>>, _SerializeOpts) -> - <<$\n>>. + [escape(Name), ?COLON, escape(Val), ?LF]. escape(Bin) when is_binary(Bin) -> <<<<(escape(Ch))/binary>> || <> <= Bin>>; From cabe2ae10007d3bebd2f5918c97f2a8d1d00a0cd Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 6 May 2024 14:58:27 +0800 Subject: [PATCH 3/3] chore: fix dialyzer warning --- apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index cd2359269..ceada3c26 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -657,7 +657,8 @@ handle_in( ?PACKET(?CMD_HEARTBEAT), Channel = #channel{heartbeat = Heartbeat} ) -> - NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat), + NewVal = emqx_pd:get_counter(recv_pkt), + NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, NewVal, Heartbeat), {ok, Channel#channel{heartbeat = NewHeartbeat}}; handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel);