diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 8a63b28d8..51653d3c0 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -649,7 +649,7 @@ parse_incoming(Data, Packets, , reason => Reason , stacktrace => Stk }), - {[{frame_error, Reason}|Packets], State} + {[{frame_error, Reason} | Packets], State} end. next_incoming_msgs([Packet]) -> diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index e079e5fe6..a2ffa1988 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -136,6 +136,8 @@ parse(<<>>, Parser) -> parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> + parse(Phase, Bytes, State); parse(Bytes, Parser = #{pre := Pre}) -> parse(<
>, maps:without([pre], Parser));
@@ -162,6 +164,8 @@ parse(command, <>, State = #parser_state{acc = Acc}) ->
     parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
 parse(command, <>, State) ->
     parse(command, Rest, acc(Ch, State));
+parse(command, <<>>, State) ->
+    {more, #{phase => command, state => State}};
 
 parse(headers, <>, State) ->
     parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
@@ -174,6 +178,8 @@ parse(hdname, <>, State = #parser_state{acc = Acc}) ->
     parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
 parse(hdname, <>, State) ->
     parse(hdname, Rest, acc(Ch, State));
+parse(hdname, <<>>, State) ->
+    {more, #{phase => hdname, state => State}};
 
 parse(hdvalue, <>,
       State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
@@ -183,7 +189,9 @@ parse(hdvalue, <>,
                                },
     parse(headers, Rest, NState);
 parse(hdvalue, <>, State) ->
-    parse(hdvalue, Rest, acc(Ch, State)).
+    parse(hdvalue, Rest, acc(Ch, State));
+parse(hdvalue, <<>>, State) ->
+    {more, #{phase => hdvalue, state => State}}.
 
 %% @private
 parse(body, <<>>, State, Length) ->