From de43da881a26346ae4c091408ff6a04fedd89bc6 Mon Sep 17 00:00:00 2001 From: Zaiming Shi Date: Thu, 13 May 2021 22:52:04 +0200 Subject: [PATCH] fix(emqx_frame): poor large frame concatenation performance piror to this change, binary concatenation eats most of the CPU --- src/emqx_connection.erl | 2 +- src/emqx_frame.erl | 73 ++++++++++++++++++++++++++++++----------- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 69ffddf06..89bb06e12 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -475,7 +475,7 @@ terminate(Reason, State = #state{channel = Channel, transport = Transport, E : C : S -> ?tp(warning, unclean_terminate, #{exception => E, context => C, stacktrace => S}) end, - ?tp(debug, terminate, #{}), + ?tp(info, terminate, #{reason => Reason}), maybe_raise_excption(Reason). %% close socket, discard new state, always return ok. diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 7e5aae788..c4a2f6ac0 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -40,6 +40,8 @@ , serialize_opts/0 ]). +-define(Q(BYTES, Q), {BYTES, Q}). + -type(options() :: #{strict_mode => boolean(), max_size => 1..?MAX_PACKET_SIZE, version => emqx_types:version() @@ -50,12 +52,12 @@ -type(parse_result() :: {more, parse_state()} | {ok, emqx_types:packet(), binary(), parse_state()}). --type(cont_state() :: {Stage :: len | body, - State :: #{hdr := #mqtt_packet_header{}, - len := {pos_integer(), non_neg_integer()} | non_neg_integer(), - rest => binary() - } - }). +-type(cont_state() :: + {Stage :: len | body, + State :: #{hdr := #mqtt_packet_header{}, + len := {pos_integer(), non_neg_integer()} | non_neg_integer(), + rest => binary() | ?Q(non_neg_integer(), queue:queue(binary())) + }}). -type(serialize_opts() :: options()). @@ -117,9 +119,19 @@ parse(Bin, {{len, #{hdr := Header, parse_remaining_len(Bin, Header, Multiplier, Length, Options); parse(Bin, {{body, #{hdr := Header, len := Length, - rest := Rest} + rest := Body} }, Options}) when is_binary(Bin) -> - parse_frame(<>, Header, Length, Options). + BodyBytes = body_bytes(Body), + {NewBodyPart, Tail} = split(BodyBytes + size(Bin) - Length, Bin), + NewBody = append_body(Body, NewBodyPart), + parse_frame(NewBody, Tail, Header, Length, Options). + +%% split given binary with the first N bytes +split(N, Bin) when N =< 0 -> + {Bin, <<>>}; +split(N, Bin) when N =< size(Bin) -> + <> = Bin, + {H, T}. parse_remaining_len(<<>>, Header, Options) -> {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; @@ -132,7 +144,8 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload -parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> +parse_remaining_len(<<0:8, Rest/binary>>, + Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), {ok, Packet, Rest, ?none(Options)}; %% Match PINGREQ. @@ -149,16 +162,35 @@ parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Opti parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options = #{max_size := MaxSize}) -> FrameLen = Value + Len * Multiplier, - if - FrameLen > MaxSize -> error(frame_too_large); - true -> parse_frame(Rest, Header, FrameLen, Options) + case FrameLen > MaxSize of + true -> error(frame_too_large); + false -> parse_frame(Rest, Header, FrameLen, Options) end. -parse_frame(Bin, Header, 0, Options) -> - {ok, packet(Header), Bin, ?none(Options)}; -parse_frame(Bin, Header, Length, Options) -> - case Bin of - <> -> +body_bytes(B) when is_binary(B) -> size(B); +body_bytes(?Q(Bytes, _)) -> Bytes. + +append_body(H, T) when is_binary(H) andalso size(H) < 1024 -> + <>; +append_body(H, T) when is_binary(H) -> + Bytes = size(H) + size(T), + ?Q(Bytes, queue:from_list([H, T])); +append_body(?Q(Bytes, Q), T) -> + ?Q(Bytes + iolist_size(T), queue:in(T, Q)). + +flatten_body(Body, Tail) when is_binary(Body) -> <>; +flatten_body(?Q(_, Q), Tail) -> iolist_to_binary([queue:to_list(Q), Tail]). + +parse_frame(Body, Header, Length, Options) -> + %% already appended + parse_frame(Body, _SplitTail = <<>>, Header, Length, Options). + +parse_frame(Body, Tail, Header, 0, Options) -> + {ok, packet(Header), flatten_body(Body, Tail), ?none(Options)}; +parse_frame(Body, Tail, Header, Length, Options) -> + case body_bytes(Body) >= Length of + true -> + <> = flatten_body(Body, Tail), case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; @@ -167,8 +199,11 @@ parse_frame(Bin, Header, Length, Options) -> Variable -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; - TooShortBin -> - {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} + false -> + {more, {{body, #{hdr => Header, + len => Length, + rest => append_body(Body, Tail) + }}, Options}} end. -compile({inline, [packet/1, packet/2, packet/3]}).