diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 3bf40272c..bfaa63443 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ [ {"4.3.1", [ {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, @@ -30,6 +31,7 @@ [ {"4.3.1", [ {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index c4a2f6ac0..37063c65f 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -121,17 +121,8 @@ parse(Bin, {{body, #{hdr := Header, len := Length, rest := Body} }, Options}) when is_binary(Bin) -> - BodyBytes = body_bytes(Body), - {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin), - NewBody = append_body(Body, NewBodyPart), - parse_frame(NewBody, Tail, Header, Length, Options). - -%% split given binary with the first N bytes -split(N, Bin) when N =< 0 -> - {Bin, <<>>}; -split(N, Bin) when N =< size(Bin) -> - <> = Bin, - {H, T}. + NewBody = append_body(Body, Bin), + parse_frame(NewBody, Header, Length, Options). parse_remaining_len(<<>>, Header, Options) -> {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; @@ -178,19 +169,15 @@ append_body(H, T) when is_binary(H) -> append_body(?Q(Bytes, Q), T) -> ?Q(Bytes + iolist_size(T), queue:in(T, Q)). -flatten_body(Body, Tail) when is_binary(Body) -> <>; -flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]). +flatten_body(Body) when is_binary(Body) -> Body; +flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)). +parse_frame(Body, Header, 0, Options) -> + {ok, packet(Header), flatten_body(Body), ?none(Options)}; parse_frame(Body, Header, Length, Options) -> - %% already appended - parse_frame(Body, _SplitTail = <<>>, Header, Length, Options). - -parse_frame(Body, Tail, Header, 0, Options) -> - {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)}; -parse_frame(Body, Tail, Header, Length, Options) -> case body_bytes(Body) >= Length of true -> - <> = flatten_body(Body, Tail), + <> = flatten_body(Body), case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; @@ -202,7 +189,7 @@ parse_frame(Body, Tail, Header, Length, Options) -> false -> {more, {{body, #{hdr => Header, len => Length, - rest => append_body(Body, Tail) + rest => Body }}, Options}} end. diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 09aa97c3e..09206cee1 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -58,7 +58,8 @@ groups() -> t_serialize_parse_connack_v5 ]}, {publish, [parallel], - [t_serialize_parse_qos0_publish, + [t_parse_sticky_frames, + t_serialize_parse_qos0_publish, t_serialize_parse_qos1_publish, t_serialize_parse_qos2_publish, t_serialize_parse_publish_v5 @@ -286,6 +287,24 @@ t_serialize_parse_connack_v5(_) -> Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). +t_parse_sticky_frames(_) -> + Payload = lists:duplicate(10, 0), + P = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = false, + qos = ?QOS_0, + retain = false}, + variable = #mqtt_packet_publish{topic_name = <<"a/b">>, + packet_id = undefined}, + payload = iolist_to_binary(Payload) + }, + Bin = serialize_to_binary(P), + Size = size(Bin), + <> = Bin, + {more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes + %% feed 3 bytes as if the next 1 byte belongs to the next packet. + {ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1), + ?assertMatch({none, _}, PState2). + t_serialize_parse_qos0_publish(_) -> Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>, Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,