Add 'next_events/1' and fix the 'process_incoming/3' function

This commit is contained in:
Feng Lee 2019-01-10 09:17:21 +08:00 committed by Feng Lee
parent 3e15ac0bbb
commit 5ccaaed34c
1 changed files with 9 additions and 6 deletions

View File

@ -1326,19 +1326,15 @@ run_sock(State = #state{socket = Sock}) ->
%%------------------------------------------------------------------------------
%% Process incomming
process_incoming(<<>>, [Packet], State) ->
{keep_state, State, {next_event, cast, Packet}};
process_incoming(<<>>, Packets, State) ->
NextEvts = [{next_event, cast, Packet} || Packet <- lists:reverse(Packets)],
{keep_state, State, NextEvts};
{keep_state, State, next_events(Packets)};
process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Bytes, ParseState) of
{ok, Packet, Rest} ->
process_incoming(Rest, [Packet|Packets], init_parse_state(State));
{more, NewParseState} ->
{keep_state, State#state{parse_state = NewParseState}};
{keep_state, State#state{parse_state = NewParseState}, next_events(Packets)};
{error, Reason} ->
{stop, Reason}
catch
@ -1346,6 +1342,13 @@ process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
{stop, Error}
end.
next_events([]) ->
[];
next_events([Packet]) ->
{next_event, cast, Packet};
next_events(Packets) ->
[{next_event, cast, Packet} || Packet <- lists:reverse(Packets)].
%%------------------------------------------------------------------------------
%% Next packet id