diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 4c19bbcc1..13765a5e2 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -1326,19 +1326,15 @@ run_sock(State = #state{socket = Sock}) -> %%------------------------------------------------------------------------------ %% Process incomming -process_incoming(<<>>, [Packet], State) -> - {keep_state, State, {next_event, cast, Packet}}; - process_incoming(<<>>, Packets, State) -> - NextEvts = [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)], - {keep_state, State, NextEvts}; + {keep_state, State, next_events(Packets)}; process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Bytes, ParseState) of {ok, Packet, Rest} -> process_incoming(Rest, [Packet|Packets], init_parse_state(State)); {more, NewParseState} -> - {keep_state, State#state{parse_state = NewParseState}}; + {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)}; {error, Reason} -> {stop, Reason} catch @@ -1346,6 +1342,13 @@ process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) -> {stop, Error} end. +next_events([]) -> + []; +next_events([Packet]) -> + {next_event, cast, Packet}; +next_events(Packets) -> + [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)]. + %%------------------------------------------------------------------------------ %% Next packet id