Merge pull request #4858 from zmstone/fix-frame-parse-split-function-clause
fix(emqx_frame): no need to split incoming bytes
commit 37c559a08d
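The per-file headers of this diff were lost in this capture; from the content, the first two hunks appear to come from the macOS GitHub Actions workflow, the next two from the application upgrade (.appup) instructions, and the remaining hunks from emqx_frame.erl and its test suite.

The gist of the frame change: instead of pre-splitting each incoming chunk at the expected frame boundary, the parser now appends the whole chunk to the buffered body and lets parse_frame/4 cut the frame at the declared remaining length, returning any surplus bytes as the unparsed rest. Below is a minimal sketch of that buffering rule, assuming a plain binary buffer (emqx_frame itself may buffer via a queue, per the ?Q macro in the hunks further down); the module and function names are illustrative, not part of emqx:

    -module(frame_buffer_sketch).
    -export([feed/3]).

    %% feed(Buffer, Incoming, Length): append first, cut at Length afterwards.
    feed(Buffer, Incoming, Length) ->
        All = <<Buffer/binary, Incoming/binary>>,
        case byte_size(All) >= Length of
            true ->
                %% Enough bytes for one frame: take it, keep the surplus for the next packet.
                <<Frame:Length/binary, Rest/binary>> = All,
                {frame, Frame, Rest};
            false ->
                %% Still short of Length bytes: keep accumulating.
                {more, All}
        end.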
@@ -53,7 +53,7 @@ jobs:
     strategy:
       matrix:
         erl_otp:
-        - 23.2.7.2
+        - 23.2.7.2-emqx-2
 
     steps:
     - uses: actions/checkout@v1

@@ -73,15 +73,16 @@ jobs:
         brew install curl zip unzip gnu-sed kerl unixodbc freetds
         echo "/usr/local/bin" >> $GITHUB_PATH
         git config --global credential.helper store
-    - uses: actions/cache@v2
-      id: cache
-      with:
-        path: ~/.kerl
-        key: erl${{ matrix.erl_otp }}-macos10.15
+    # - uses: actions/cache@v2
+    #   id: cache
+    #   with:
+    #     path: ~/.kerl
+    #     key: erl${{ matrix.erl_otp }}-macos10.15
     - name: build erlang
-      if: steps.cache.outputs.cache-hit != 'true'
+      # if: steps.cache.outputs.cache-hit != 'true'
      timeout-minutes: 60
      run: |
+        export OTP_GITHUB_URL="https://github.com/emqx/otp"
        kerl build ${{ matrix.erl_otp }}
        kerl install ${{ matrix.erl_otp }} $HOME/.kerl/${{ matrix.erl_otp }}
    - name: build

@@ -3,6 +3,7 @@
 [
   {"4.3.1", [
     {load_module, emqx_connection, brutal_purge, soft_purge, []},
+    {load_module, emqx_frame, brutal_purge, soft_purge, []},
     {load_module, emqx_cm, brutal_purge, soft_purge, []},
     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},

@@ -30,6 +31,7 @@
 [
   {"4.3.1", [
     {load_module, emqx_connection, brutal_purge, soft_purge, []},
+    {load_module, emqx_frame, brutal_purge, soft_purge, []},
     {load_module, emqx_cm, brutal_purge, soft_purge, []},
     {load_module, emqx_congestion, brutal_purge, soft_purge, []},
     {load_module, emqx_node_dump, brutal_purge, soft_purge, []},

@@ -121,17 +121,8 @@ parse(Bin, {{body, #{hdr := Header,
                      len := Length,
                      rest := Body}
               }, Options}) when is_binary(Bin) ->
-    BodyBytes = body_bytes(Body),
-    {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin),
-    NewBody = append_body(Body, NewBodyPart),
-    parse_frame(NewBody, Tail, Header, Length, Options).
-
-%% split given binary with the first N bytes
-split(N, Bin) when N =< 0 ->
-    {Bin, <<>>};
-split(N, Bin) when N =< size(Bin) ->
-    <<H:N/binary, T/binary>> = Bin,
-    {H, T}.
+    NewBody = append_body(Body, Bin),
+    parse_frame(NewBody, Header, Length, Options).
 
 parse_remaining_len(<<>>, Header, Options) ->
     {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}};

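To make the change above concrete (the numbers are illustrative, not from the source): with Length = 10, 8 bytes already buffered and a 5-byte chunk arriving, the old clause computed split(8 + 5 - 10, Bin) and relied on split/2 to divide the incoming chunk at the frame boundary; the new clause appends all 5 bytes, and parse_frame/4 (next hunk) binary-matches <<FrameBin:Length/binary, Rest/binary>> out of the flattened body, so the 3 surplus bytes come back as Rest for the next packet.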
@@ -178,19 +169,15 @@ append_body(H, T) when is_binary(H) ->
 append_body(?Q(Bytes, Q), T) ->
     ?Q(Bytes + iolist_size(T), queue:in(T, Q)).
 
-flatten_body(Body, Tail) when is_binary(Body) -> <<Body/binary, Tail/binary>>;
-flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]).
+flatten_body(Body) when is_binary(Body) -> Body;
+flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
 
+parse_frame(Body, Header, 0, Options) ->
+    {ok, packet(Header), flatten_body(Body), ?none(Options)};
 parse_frame(Body, Header, Length, Options) ->
-    %% already appended
-    parse_frame(Body, _SplitTail = <<>>, Header, Length, Options).
-
-parse_frame(Body, Tail, Header, 0, Options) ->
-    {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)};
-parse_frame(Body, Tail, Header, Length, Options) ->
     case body_bytes(Body) >= Length of
         true ->
-            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body, Tail),
+            <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
             case parse_packet(Header, FrameBin, Options) of
                 {Variable, Payload} ->
                     {ok, packet(Header, Variable, Payload), Rest, ?none(Options)};

@@ -202,7 +189,7 @@ parse_frame(Body, Tail, Header, Length, Options) ->
         false ->
             {more, {{body, #{hdr => Header,
                              len => Length,
-                             rest => append_body(Body, Tail)
+                             rest => Body
                             }}, Options}}
     end.
 

@@ -58,7 +58,8 @@ groups() ->
        t_serialize_parse_connack_v5
       ]},
     {publish, [parallel],
-      [t_serialize_parse_qos0_publish,
+      [t_parse_sticky_frames,
+       t_serialize_parse_qos0_publish,
        t_serialize_parse_qos1_publish,
        t_serialize_parse_qos2_publish,
        t_serialize_parse_publish_v5

@@ -286,6 +287,24 @@ t_serialize_parse_connack_v5(_) ->
     Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props),
     ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
 
+t_parse_sticky_frames(_) ->
+    Payload = lists:duplicate(10, 0),
+    P = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
+                                                  dup = false,
+                                                  qos = ?QOS_0,
+                                                  retain = false},
+                     variable = #mqtt_packet_publish{topic_name = <<"a/b">>,
+                                                     packet_id = undefined},
+                     payload = iolist_to_binary(Payload)
+                    },
+    Bin = serialize_to_binary(P),
+    Size = size(Bin),
+    <<H:(Size-2)/binary, TailTwoBytes/binary>> = Bin,
+    {more, PState1} = emqx_frame:parse(H), %% needs 2 more bytes
+    %% feed 3 bytes as if the next 1 byte belongs to the next packet.
+    {ok, _, <<42>>, PState2} = emqx_frame:parse(iolist_to_binary([TailTwoBytes, 42]), PState1),
+    ?assertMatch({none, _}, PState2).
+
 t_serialize_parse_qos0_publish(_) ->
     Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>,
     Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
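The new test drives the parser the way a connection process would: a chunk two bytes short of a full PUBLISH yields {more, State}, and feeding the missing two bytes plus one stray byte yields the packet, the leftover <<42>>, and a fresh {none, _} state. A hedged sketch of that calling pattern, using only the parse/1 and parse/2 shapes exercised above (frame_collect_sketch and collect/1 are illustrative names, not an emqx API):

    -module(frame_collect_sketch).
    -export([collect/1]).

    %% Accumulate whole packets from a list of arbitrarily chopped binaries.
    collect(Chunks) ->
        collect(Chunks, undefined, []).

    collect([], _State, Acc) ->
        lists:reverse(Acc);
    collect([Chunk | Rest], State, Acc) ->
        Result = case State of
                     undefined -> emqx_frame:parse(Chunk);        %% first bytes seen
                     _         -> emqx_frame:parse(Chunk, State)  %% resume a partial frame
                 end,
        case Result of
            {more, NewState} ->
                collect(Rest, NewState, Acc);
            {ok, Packet, <<>>, NewState} ->
                collect(Rest, NewState, [Packet | Acc]);
            {ok, Packet, Leftover, NewState} ->
                %% Leftover already belongs to the next packet; push it back in front.
                collect([Leftover | Rest], NewState, [Packet | Acc])
        end.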