From 979e495a1e4f0bba655d3a35703d3b644a8eae8a Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 24 May 2021 20:51:34 +0200 Subject: [PATCH 1/2] fix(emqx_frame): no need to split incoming bytes Prior to this commit, there was a bug in emqx_frame:split/2 the tail number of bytes was used for header number of bytes whens split. As a result, if the tail happens to be longer then haeder, the parsing state becomes invalid and it crashes when the next packet arrives The split was a over-engineered micro-optimization, so it has been deleted instead of fixed --- src/emqx.appup.src | 2 ++ src/emqx_frame.erl | 29 ++++++++--------------------- test/emqx_frame_SUITE.erl | 21 ++++++++++++++++++++- 3 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 3bf40272c..bfaa63443 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -3,6 +3,7 @@ [ {"4.3.1", [ {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, @@ -30,6 +31,7 @@ [ {"4.3.1", [ {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_frame, brutal_purge, soft_purge, []}, {load_module, emqx_cm, brutal_purge, soft_purge, []}, {load_module, emqx_congestion, brutal_purge, soft_purge, []}, {load_module, emqx_node_dump, brutal_purge, soft_purge, []}, diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index c4a2f6ac0..37063c65f 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -121,17 +121,8 @@ parse(Bin, {{body, #{hdr := Header, len := Length, rest := Body} }, Options}) when is_binary(Bin) -> - BodyBytes = body_bytes(Body), - {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin), - NewBody = append_body(Body, NewBodyPart), - parse_frame(NewBody, Tail, Header, Length, Options). - -%% split given binary with the first N bytes -split(N, Bin) when N =< 0 -> - {Bin, <<>>}; -split(N, Bin) when N =< size(Bin) -> - <> = Bin, - {H, T}. + NewBody = append_body(Body, Bin), + parse_frame(NewBody, Header, Length, Options). parse_remaining_len(<<>>, Header, Options) -> {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; @@ -178,19 +169,15 @@ append_body(H, T) when is_binary(H) -> append_body(?Q(Bytes, Q), T) -> ?Q(Bytes + iolist_size(T), queue:in(T, Q)). -flatten_body(Body, Tail) when is_binary(Body) -> <>; -flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]). +flatten_body(Body) when is_binary(Body) -> Body; +flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)). +parse_frame(Body, Header, 0, Options) -> + {ok, packet(Header), flatten_body(Body), ?none(Options)}; parse_frame(Body, Header, Length, Options) -> - %% already appended - parse_frame(Body, _SplitTail = <<>>, Header, Length, Options). - -parse_frame(Body, Tail, Header, 0, Options) -> - {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)}; -parse_frame(Body, Tail, Header, Length, Options) -> case body_bytes(Body) >= Length of true -> - <> = flatten_body(Body, Tail), + <> = flatten_body(Body), case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; @@ -202,7 +189,7 @@ parse_frame(Body, Tail, Header, Length, Options) -> false -> {more, {{body, #{hdr => Header, len => Length, - rest => append_body(Body, Tail) + rest => Body }}, Options}} end. diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 09aa97c3e..09206cee1 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -58,7 +58,8 @@ groups() -> t_serialize_parse_connack_v5 ]}, {publish, [parallel], - [t_serialize_parse_qos0_publish, + [t_parse_sticky_frames, + t_serialize_parse_qos0_publish, t_serialize_parse_qos1_publish, t_serialize_parse_qos2_publish, t_serialize_parse_publish_v5 @@ -286,6 +287,24 @@ t_serialize_parse_connack_v5(_) -> Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). +t_parse_sticky_frames(_) -> + Payload = lists:duplicate(10, 0), + P = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = false, + qos = ?QOS_0, + retain = false}, + variable = #mqtt_packet_publish{topic_name = <<"a/b">>, + packet_id = undefined}, + payload = iolist_to_binary(Payload) + }, + Bin = serialize_to_binary(P), + Size = size(Bin), + <> = Bin, + {more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes + %% feed 3 bytes as if the next 1 byte belongs to the next packet. + {ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1), + ?assertMatch({none, _}, PState2). + t_serialize_parse_qos0_publish(_) -> Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>, Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, From 6015b4ac4a854a0230057f5e79c192304b7b8d48 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Mon, 24 May 2021 21:12:15 +0200 Subject: [PATCH 2/2] build: disable macos build cache for now --- .github/workflows/build_slim_packages.yaml | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 573b84e1b..be3d46bc1 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -53,7 +53,7 @@ jobs: strategy: matrix: erl_otp: - - 23.2.7.2 + - 23.2.7.2-emqx-2 steps: - uses: actions/checkout@v1 @@ -73,15 +73,16 @@ jobs: brew install curl zip unzip gnu-sed kerl unixodbc freetds echo "/usr/local/bin" >> $GITHUB_PATH git config --global credential.helper store - - uses: actions/cache@v2 - id: cache - with: - path: ~/.kerl - key: erl${{ matrix.erl_otp }}-macos10.15 + # - uses: actions/cache@v2 + # id: cache + # with: + # path: ~/.kerl + # key: erl${{ matrix.erl_otp }}-macos10.15 - name: build erlang - if: steps.cache.outputs.cache-hit != 'true' + # if: steps.cache.outputs.cache-hit != 'true' timeout-minutes: 60 run: | + export OTP_GITHUB_URL="https://github.com/emqx/otp" kerl build ${{ matrix.erl_otp }} kerl install ${{ matrix.erl_otp }} $HOME/.kerl/${{ matrix.erl_otp }} - name: build