diff --git a/apps/emqx_stomp/src/emqx_stomp_frame.erl b/apps/emqx_stomp/src/emqx_stomp_frame.erl index fa37c2f64..51f23e778 100644 --- a/apps/emqx_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_stomp/src/emqx_stomp_frame.erl @@ -123,6 +123,8 @@ parse(<<>>, Parser) -> parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(<>, #{phase := hdname, state := State}) -> + parse(body, Bytes, State, content_len(State)); parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> parse(Phase, Bytes, State); diff --git a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl index e2599ab51..f4503d791 100644 --- a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl @@ -359,6 +359,35 @@ t_1000_msg_send(_) -> lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000)) end). +t_sticky_packets_truncate_after_headers(_) -> + with_connection(fun(Sock) -> + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _} = parse(Data), + + Topic = <<"/queue/foo">>, + + emqx:subscribe(Topic), + gen_tcp:send(Sock, ["SEND\n", + "content-length:3\n", + "destination:/queue/foo\n"]), + timer:sleep(300), + gen_tcp:send(Sock, ["\nfoo",0]), + receive + {deliver, Topic, _Msg}-> + ok + after 100 -> + ?assert(false, "waiting message timeout") + end + end). + with_connection(DoFun) -> {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, 61613,