Merge pull request #6868 from HJianBo/fix-stomp-frame-parser
Fix the sticky tcp stream parsing
This commit is contained in:
commit
34c489da40
|
@ -123,6 +123,8 @@ parse(<<>>, Parser) ->
|
||||||
|
|
||||||
parse(Bytes, #{phase := body, length := Len, state := State}) ->
|
parse(Bytes, #{phase := body, length := Len, state := State}) ->
|
||||||
parse(body, Bytes, State, Len);
|
parse(body, Bytes, State, Len);
|
||||||
|
parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
|
||||||
|
parse(Phase, Bytes, State);
|
||||||
|
|
||||||
parse(Bytes, Parser = #{pre := Pre}) ->
|
parse(Bytes, Parser = #{pre := Pre}) ->
|
||||||
parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
|
parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
|
||||||
|
@ -153,6 +155,8 @@ parse(command, <<?LF, Rest/binary>>, State = #parser_state{acc = Acc}) ->
|
||||||
parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
|
parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
|
||||||
parse(command, <<Ch:8, Rest/binary>>, State) ->
|
parse(command, <<Ch:8, Rest/binary>>, State) ->
|
||||||
parse(command, Rest, acc(Ch, State));
|
parse(command, Rest, acc(Ch, State));
|
||||||
|
parse(command, <<>>, State) ->
|
||||||
|
{more, #{phase => command, state => State}};
|
||||||
|
|
||||||
parse(headers, <<?LF, Rest/binary>>, State) ->
|
parse(headers, <<?LF, Rest/binary>>, State) ->
|
||||||
parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
|
parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
|
||||||
|
@ -165,11 +169,15 @@ parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
|
||||||
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
|
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
|
||||||
parse(hdname, <<Ch:8, Rest/binary>>, State) ->
|
parse(hdname, <<Ch:8, Rest/binary>>, State) ->
|
||||||
parse(hdname, Rest, acc(Ch, State));
|
parse(hdname, Rest, acc(Ch, State));
|
||||||
|
parse(hdname, <<>>, State) ->
|
||||||
|
{more, #{phase => hdname, state => State}};
|
||||||
|
|
||||||
parse(hdvalue, <<?LF, Rest/binary>>, State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
|
parse(hdvalue, <<?LF, Rest/binary>>, State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
|
||||||
parse(headers, Rest, State#parser_state{headers = add_header(Name, Acc, Headers), hdname = undefined, acc = <<>>});
|
parse(headers, Rest, State#parser_state{headers = add_header(Name, Acc, Headers), hdname = undefined, acc = <<>>});
|
||||||
parse(hdvalue, <<Ch:8, Rest/binary>>, State) ->
|
parse(hdvalue, <<Ch:8, Rest/binary>>, State) ->
|
||||||
parse(hdvalue, Rest, acc(Ch, State)).
|
parse(hdvalue, Rest, acc(Ch, State));
|
||||||
|
parse(hdvalue, <<>>, State) ->
|
||||||
|
{more, #{phase => hdvalue, state => State}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
parse(body, <<>>, State, Length) ->
|
parse(body, <<>>, State, Length) ->
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-module(emqx_stomp_SUITE).
|
-module(emqx_stomp_SUITE).
|
||||||
|
|
||||||
-include_lib("emqx_stomp/include/emqx_stomp.hrl").
|
-include_lib("emqx_stomp/include/emqx_stomp.hrl").
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
@ -324,6 +325,40 @@ t_ack(_) ->
|
||||||
body = _}, _} = parse(Data4)
|
body = _}, _} = parse(Data4)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
t_1000_msg_send(_) ->
|
||||||
|
with_connection(fun(Sock) ->
|
||||||
|
gen_tcp:send(Sock, serialize(<<"CONNECT">>,
|
||||||
|
[{<<"accept-version">>, ?STOMP_VER},
|
||||||
|
{<<"host">>, <<"127.0.0.1:61613">>},
|
||||||
|
{<<"login">>, <<"guest">>},
|
||||||
|
{<<"passcode">>, <<"guest">>},
|
||||||
|
{<<"heart-beat">>, <<"0,0">>}])),
|
||||||
|
{ok, Data} = gen_tcp:recv(Sock, 0),
|
||||||
|
{ok, #stomp_frame{command = <<"CONNECTED">>,
|
||||||
|
headers = _,
|
||||||
|
body = _}, _} = parse(Data),
|
||||||
|
|
||||||
|
Topic = <<"/queue/foo">>,
|
||||||
|
SendFun = fun() ->
|
||||||
|
gen_tcp:send(Sock, serialize(<<"SEND">>,
|
||||||
|
[{<<"destination">>, Topic}],
|
||||||
|
<<"msgtest">>))
|
||||||
|
end,
|
||||||
|
|
||||||
|
RecvFun = fun() ->
|
||||||
|
receive
|
||||||
|
{deliver, Topic, _Msg}->
|
||||||
|
ok
|
||||||
|
after 100 ->
|
||||||
|
?assert(false, "waiting message timeout")
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
|
||||||
|
emqx:subscribe(Topic),
|
||||||
|
lists:foreach(fun(_) -> SendFun() end, lists:seq(1, 1000)),
|
||||||
|
lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000))
|
||||||
|
end).
|
||||||
|
|
||||||
with_connection(DoFun) ->
|
with_connection(DoFun) ->
|
||||||
{ok, Sock} = gen_tcp:connect({127, 0, 0, 1},
|
{ok, Sock} = gen_tcp:connect({127, 0, 0, 1},
|
||||||
61613,
|
61613,
|
||||||
|
|
Loading…
Reference in New Issue