From 8a73c62f662cc986ad0cf820ac15851216f77acd Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 20 Mar 2019 10:14:52 +0800 Subject: [PATCH] Fix followed packet parse failure (#2333) To fix issue#2303(https://github.com/emqx/emqx/issues/2303) It will report the following error, when a connection sends a TCP frame contained many of MQTT packet and followed a split MQTT packet. --- src/emqx_connection.erl | 24 +++++++++---------- test/emqx_protocol_SUITE.erl | 45 ++++++++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index beacbb255..74f98b2f0 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -192,10 +192,10 @@ idle(enter, _, State) -> idle(timeout, _Timeout, State) -> {stop, idle_timeout, State}; -idle(cast, {incoming, Packet, PState}, _State) -> +idle(cast, {incoming, Packet}, State) -> handle_packet(Packet, fun(NState) -> {next_state, connected, reset_parser(NState)} - end, PState); + end, State); idle(EventType, Content, State) -> ?HANDLE(EventType, Content, State). @@ -208,12 +208,12 @@ connected(enter, _, _State) -> keep_state_and_data; %% Handle Input -connected(cast, {incoming, Packet = ?PACKET(Type), PState}, _State) -> +connected(cast, {incoming, Packet = ?PACKET(Type)}, State) -> _ = emqx_metrics:received(Packet), (Type == ?PUBLISH) andalso emqx_pd:update_counter(incoming_pubs, 1), handle_packet(Packet, fun(NState) -> - {keep_state, reset_parser(NState)} - end, PState); + {keep_state, NState} + end, State); %% Handle Output connected(info, {deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> @@ -373,14 +373,14 @@ terminate(Reason, _StateName, #state{transport = Transport, %% Process incoming data process_incoming(<<>>, Packets, State) -> - {keep_state, State, next_events({Packets, State})}; + {keep_state, State, next_events(Packets)}; process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> try emqx_frame:parse(Data, ParseState) of {ok, Packet, Rest} -> process_incoming(Rest, [Packet|Packets], reset_parser(State)); {more, NewParseState} -> - {keep_state, State#state{parse_state = NewParseState}, next_events({Packets, State})}; + {keep_state, State#state{parse_state = NewParseState}, next_events(Packets)}; {error, Reason} -> shutdown(Reason, State) catch @@ -392,12 +392,10 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> reset_parser(State = #state{proto_state = ProtoState}) -> State#state{parse_state = emqx_protocol:parser(ProtoState)}. -next_events([]) -> - []; -next_events([{Packet, State}]) -> - {next_event, cast, {incoming, Packet, State}}; -next_events({Packets, State}) -> - [next_events([{Packet, State}]) || Packet <- lists:reverse(Packets)]. +next_events(Packets) when is_list(Packets) -> + [next_events(Packet) || Packet <- lists:reverse(Packets)]; +next_events(Packet) -> + {next_event, cast, {incoming, Packet}}. %%------------------------------------------------------------------------------ %% Handle incoming packet diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index c447b8e4a..8df878147 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -36,7 +36,8 @@ all() -> {group, mqtt_common}, {group, mqttv4}, {group, mqttv5}, - {group, acl} + {group, acl}, + {group, frame_partial} ]. groups() -> @@ -51,7 +52,9 @@ groups() -> [connect_v5, subscribe_v5]}, {acl, [sequence], - [acl_deny_action_ct]}]. + [acl_deny_action_ct]}, + {frame_partial, [sequence], + [handle_followed_packet]}]. init_per_suite(Config) -> [start_apps(App, SchemaFile, ConfigFile) || @@ -97,6 +100,44 @@ with_connection(DoFun) -> % emqx_client_sock:close(Sock) % end. +handle_followed_packet(_Config) -> + ConnPkt = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, + PartialPkt1 = <<50,182,1,0,4,116,101,115,116,0,1,48,48,48,48,48,48,48,48,48,48,48,48,48, + 48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48, + 48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48, + 48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48, + 48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>, + PartialPkt2 = <<48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48, + 48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48, + 48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48>>, + + %% This is a PUBLISH message (Qos=1) + PubPkt = <>, + ComplexPkt = <>, + + AssertConnAck = fun(R) -> ?assertEqual({ok, <<32,2,0,0>>}, R) end, + AssertPubAck = fun(R) -> ?assertEqual({ok, <<64,2,0,1>>}, R) end, + + {ok, Sock} = gen_tcp:connect("127.0.0.1", 1883, [{active, false}, binary]), + + %% CONNECT + ok = gen_tcp:send(Sock, ConnPkt), + AssertConnAck(gen_tcp:recv(Sock, 4, 500)), + + %% Once Publish + ok = gen_tcp:send(Sock, PubPkt), + AssertPubAck(gen_tcp:recv(Sock, 4, 500)), + + %% Complex Packet + ok = gen_tcp:send(Sock, ComplexPkt), + AssertPubAck(gen_tcp:recv(Sock, 4, 500)), + AssertPubAck(gen_tcp:recv(Sock, 4, 500)), + AssertPubAck(gen_tcp:recv(Sock, 4, 500)), + + ok = gen_tcp:send(Sock, PartialPkt2), + AssertPubAck(gen_tcp:recv(Sock, 4, 500)), + gen_tcp:close(Sock). + connect_v4(_) -> with_connection(fun([Sock]) -> emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))),