Merge pull request #6963 from HJianBo/fix_stomp_unexpected_linefeed
fix(stomp): fix unexpected_linefeed error if the packet truncated on …
This commit is contained in:
commit
175d6969d4
|
@ -170,7 +170,6 @@ t_subscribe_case02(_) ->
|
||||||
ReturnCode = 0,
|
ReturnCode = 0,
|
||||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
|
||||||
ClientId = ?CLIENTID,
|
|
||||||
send_connect_msg(Socket, ?CLIENTID),
|
send_connect_msg(Socket, ?CLIENTID),
|
||||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
|
||||||
|
|
|
@ -123,6 +123,8 @@ parse(<<>>, Parser) ->
|
||||||
|
|
||||||
parse(Bytes, #{phase := body, length := Len, state := State}) ->
|
parse(Bytes, #{phase := body, length := Len, state := State}) ->
|
||||||
parse(body, Bytes, State, Len);
|
parse(body, Bytes, State, Len);
|
||||||
|
parse(<<?LF, Bytes/binary>>, #{phase := hdname, state := State}) ->
|
||||||
|
parse(body, Bytes, State, content_len(State));
|
||||||
parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
|
parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
|
||||||
parse(Phase, Bytes, State);
|
parse(Phase, Bytes, State);
|
||||||
|
|
||||||
|
|
|
@ -359,6 +359,35 @@ t_1000_msg_send(_) ->
|
||||||
lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
|
lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
t_sticky_packets_truncate_after_headers(_) ->
|
||||||
|
with_connection(fun(Sock) ->
|
||||||
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
||||||
|
[{<<"accept-version">>, ?STOMP_VER},
|
||||||
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
||||||
|
{<<"login">>, <<"guest">>},
|
||||||
|
{<<"passcode">>, <<"guest">>},
|
||||||
|
{<<"heart-beat">>, <<"0,0">>}])),
|
||||||
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||||
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
||||||
|
headers = _,
|
||||||
|
body = _}, _} = parse(Data),
|
||||||
|
|
||||||
|
Topic = <<"/queue/foo">>,
|
||||||
|
|
||||||
|
emqx:subscribe(Topic),
|
||||||
|
gen_tcp:send(Sock, ["SEND\n",
|
||||||
|
"content-length:3\n",
|
||||||
|
"destination:/queue/foo\n"]),
|
||||||
|
timer:sleep(300),
|
||||||
|
gen_tcp:send(Sock, ["\nfoo",0]),
|
||||||
|
receive
|
||||||
|
{deliver, Topic, _Msg}->
|
||||||
|
ok
|
||||||
|
after 100 ->
|
||||||
|
?assert(false, "waiting message timeout")
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
with_connection(DoFun) ->
|
with_connection(DoFun) ->
|
||||||
{ok, Sock} = gen_tcp:connect({127, 0, 0, 1},
|
{ok, Sock} = gen_tcp:connect({127, 0, 0, 1},
|
||||||
61613,
|
61613,
|
||||||
|
|
Loading…
Reference in New Issue