fix(stomp): fix the sticky tcp stream parsing

This commit is contained in:
JianBo He 2022-01-13 14:33:11 +08:00
parent 44ea853059
commit cce0b1ca34
2 changed files with 10 additions and 2 deletions

View File

@ -649,7 +649,7 @@ parse_incoming(Data, Packets,
, reason => Reason , reason => Reason
, stacktrace => Stk , stacktrace => Stk
}), }),
{[{frame_error, Reason}|Packets], State} {[{frame_error, Reason} | Packets], State}
end. end.
next_incoming_msgs([Packet]) -> next_incoming_msgs([Packet]) ->

View File

@ -136,6 +136,8 @@ parse(<<>>, Parser) ->
parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(Bytes, #{phase := body, length := Len, state := State}) ->
parse(body, Bytes, State, Len); parse(body, Bytes, State, Len);
parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none ->
parse(Phase, Bytes, State);
parse(Bytes, Parser = #{pre := Pre}) -> parse(Bytes, Parser = #{pre := Pre}) ->
parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser)); parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
@ -162,6 +164,8 @@ parse(command, <<?LF, Rest/binary>>, State = #parser_state{acc = Acc}) ->
parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>}); parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
parse(command, <<Ch:8, Rest/binary>>, State) -> parse(command, <<Ch:8, Rest/binary>>, State) ->
parse(command, Rest, acc(Ch, State)); parse(command, Rest, acc(Ch, State));
parse(command, <<>>, State) ->
{more, #{phase => command, state => State}};
parse(headers, <<?LF, Rest/binary>>, State) -> parse(headers, <<?LF, Rest/binary>>, State) ->
parse(body, Rest, State, content_len(State#parser_state{acc = <<>>})); parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@ -174,6 +178,8 @@ parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>}); parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
parse(hdname, <<Ch:8, Rest/binary>>, State) -> parse(hdname, <<Ch:8, Rest/binary>>, State) ->
parse(hdname, Rest, acc(Ch, State)); parse(hdname, Rest, acc(Ch, State));
parse(hdname, <<>>, State) ->
{more, #{phase => hdname, state => State}};
parse(hdvalue, <<?LF, Rest/binary>>, parse(hdvalue, <<?LF, Rest/binary>>,
State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) -> State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
@ -183,7 +189,9 @@ parse(hdvalue, <<?LF, Rest/binary>>,
}, },
parse(headers, Rest, NState); parse(headers, Rest, NState);
parse(hdvalue, <<Ch:8, Rest/binary>>, State) -> parse(hdvalue, <<Ch:8, Rest/binary>>, State) ->
parse(hdvalue, Rest, acc(Ch, State)). parse(hdvalue, Rest, acc(Ch, State));
parse(hdvalue, <<>>, State) ->
{more, #{phase => hdvalue, state => State}}.
%% @private %% @private
parse(body, <<>>, State, Length) -> parse(body, <<>>, State, Length) ->