diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index a2ffa1988..314b2d884 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -136,6 +136,8 @@ parse(<<>>, Parser) -> parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(<>, #{phase := hdname, state := State}) -> + parse(body, Bytes, State, content_len(State)); parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> parse(Phase, Bytes, State); diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index 6aa22fb3b..cd06e58f5 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -385,6 +385,34 @@ t_1000_msg_send(_) -> lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000)) end). +t_sticky_packets_truncate_after_headers(_) -> + with_connection(fun(Sock) -> + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _, _} = parse(Data), + + Topic = <<"/queue/foo">>, + + emqx:subscribe(Topic), + gen_tcp:send(Sock, ["SEND\n", + "content-length:3\n", + "destination:/queue/foo\n"]), + timer:sleep(300), + gen_tcp:send(Sock, ["\nfoo",0]), + receive + {deliver, Topic, _Msg}-> + ok + after 100 -> + ?assert(false, "waiting message timeout") + end + end). t_rest_clienit_info(_) -> with_connection(fun(Sock) -> gen_tcp:send(Sock, serialize(<<"CONNECT">>,