diff --git a/src/emqx.appup.src b/src/emqx.appup.src index c36611f85..87ca0c005 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,48 +1,9 @@ %% -*-: erlang -*- -{DefaultLen, DefaultSize} = - case WordSize = erlang:system_info(wordsize) of - 8 -> % arch_64 - {10000, cuttlefish_bytesize:parse("64MB")}; - 4 -> % arch_32 - {1000, cuttlefish_bytesize:parse("32MB")} - end, -{"4.2.3", - [ - {"4.2.2", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.2.1", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]}, - {"4.2.0", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []}, - {apply, {application, set_env, - [emqx, force_shutdown_policy, - #{message_queue_len => DefaultLen, - max_heap_size => DefaultSize div WordSize}]}} - ]} - ], - [ - {"4.2.2", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []} - ]}, - {"4.2.1", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]}, - {"4.2.0", [ - {load_module, emqx_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_channel, brutal_purge, soft_purge, []}, - {load_module, emqx_mod_topic_metrics, brutal_purge, soft_purge, []}, - {load_module, emqx_json, brutal_purge, soft_purge, []} - ]} - ] +{VSN, + [ + {<<".*">>, []} + ], + [ + {<<".*">>, []} + ] }. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2ab7f6c1e..8e8f7117d 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -80,8 +80,8 @@ limit_timer :: maybe(reference()), %% Parse State parse_state :: emqx_frame:parse_state(), - %% Serialize function - serialize :: emqx_frame:serialize_fun(), + %% Serialize options + serialize :: emqx_frame:serialize_opts(), %% Channel State channel :: emqx_channel:channel(), %% GC State @@ -203,7 +203,7 @@ init_state(Transport, Socket, Options) -> Limiter = emqx_limiter:init(Zone, PubLimit, BytesIn, RateLimit), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_fun(), + Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Options), GcState = emqx_zone:init_gc_state(Zone), StatsTimer = emqx_zone:stats_timer(Zone), @@ -337,7 +337,7 @@ handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl -> handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer}) -> ok = emqx_misc:cancel_timer(IdleTimer), - Serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{serialize = Serialize, idle_timer = undefined }, @@ -578,7 +578,7 @@ handle_outgoing(Packet, State) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case Serialize(Packet) of + case emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 1c27548f7..cd41a935c 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -31,26 +31,53 @@ , serialize/2 ]). +%% The new version APIs to avoid saving +%% anonymous func +-export([ parse2/1 + , parse2/2 + , serialize_opts/0 + , serialize_opts/1 + , serialize_pkt/2 + ]). + -export_type([ options/0 , parse_state/0 , parse_result/0 , serialize_fun/0 ]). +-export_type([ parse_state2/0 + , parse_result2/0 + , serialize_opts/0 + ]). + -type(options() :: #{strict_mode => boolean(), max_size => 1..?MAX_PACKET_SIZE, version => emqx_types:version() }). -type(parse_state() :: {none, options()} | cont_fun()). +-type(parse_state2() :: {none, options()} | {cont_state(), options()}). -type(parse_result() :: {more, cont_fun()} | {ok, emqx_types:packet(), binary(), parse_state()}). +-type(parse_result2() :: {more, parse_state()} + | {ok, emqx_types:packet(), binary(), parse_state()}). + -type(cont_fun() :: fun((binary()) -> parse_result())). +-type(cont_state() :: {Stage :: len | body, + State :: #{hdr := #mqtt_packet_header{}, + len := {pos_integer(), non_neg_integer()} | non_neg_integer(), + rest => binary() + } + }). + -type(serialize_fun() :: fun((emqx_types:packet()) -> iodata())). +-type(serialize_opts() :: options()). + -define(none(Options), {none, Options}). -define(DEFAULT_OPTIONS, @@ -81,6 +108,89 @@ merge_opts(Options) -> %% Parse MQTT Frame %%-------------------------------------------------------------------- +-spec(parse2(binary()) -> parse_result2()). +parse2(Bin) -> + parse2(Bin, initial_parse_state()). + +-spec(parse2(binary(), parse_state()) -> parse_result2()). +parse2(<<>>, {none, Options}) -> + {more, {none, Options}}; +parse2(<>, + {none, Options = #{strict_mode := StrictMode}}) -> + %% Validate header if strict mode. + StrictMode andalso validate_header(Type, Dup, QoS, Retain), + Header = #mqtt_packet_header{type = Type, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) + }, + Header1 = case fixqos(Type, QoS) of + QoS -> Header; + FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} + end, + parse_remaining_len2(Rest, Header1, Options); + +parse2(Bin, {{len, #{hdr := Header, + len := {Multiplier, Length}} + }, Options}) when is_binary(Bin) -> + parse_remaining_len2(Bin, Header, Multiplier, Length, Options); +parse2(Bin, {{body, #{hdr := Header, + len := Length, + rest := Rest} + }, Options}) when is_binary(Bin) -> + parse_frame2(<>, Header, Length, Options). + +parse_remaining_len2(<<>>, Header, Options) -> + {more, {{len, #{hdr => Header, len => {1, 0}}}, Options}}; +parse_remaining_len2(Rest, Header, Options) -> + parse_remaining_len2(Rest, Header, 1, 0, Options). + +parse_remaining_len2(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) + when Length > MaxSize -> + error(frame_too_large); +parse_remaining_len2(<<>>, Header, Multiplier, Length, Options) -> + {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; +%% Match DISCONNECT without payload +parse_remaining_len2(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> + Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), + {ok, Packet, Rest, ?none(Options)}; +%% Match PINGREQ. +parse_remaining_len2(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame2(Rest, Header, 0, Options); +%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... +parse_remaining_len2(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame2(Rest, Header, 2, Options); +parse_remaining_len2(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> + parse_remaining_len2(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); +parse_remaining_len2(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, + Options = #{max_size := MaxSize}) -> + FrameLen = Value + Len * Multiplier, + if + FrameLen > MaxSize -> error(frame_too_large); + true -> parse_frame2(Rest, Header, FrameLen, Options) + end. + +parse_frame2(Bin, Header, 0, Options) -> + {ok, packet(Header), Bin, ?none(Options)}; + +parse_frame2(Bin, Header, Length, Options) -> + case Bin of + <> -> + case parse_packet(Header, FrameBin, Options) of + {Variable, Payload} -> + {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; + Variable = #mqtt_packet_connect{proto_ver = Ver} -> + {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; + Variable -> + {ok, packet(Header, Variable), Rest, ?none(Options)} + end; + TooShortBin -> + {more, {{body, #{hdr => Header, len => Length, rest => TooShortBin}}, Options}} + end. + +%% Deprecated parse funcs +%% It should be removed after 4.2.x + -spec(parse(binary()) -> parse_result()). parse(Bin) -> parse(Bin, initial_parse_state()). @@ -443,6 +553,20 @@ serialize_fun(#{version := Ver, max_size := MaxSize}) -> end end. +serialize_opts() -> + ?DEFAULT_OPTIONS. + +serialize_opts(#mqtt_packet_connect{proto_ver = ProtoVer, properties = ConnProps}) -> + MaxSize = get_property('Maximum-Packet-Size', ConnProps, ?MAX_PACKET_SIZE), + #{version => ProtoVer, max_size => MaxSize}. + +serialize_pkt(Packet, #{version := Ver, max_size := MaxSize}) -> + IoData = serialize(Packet, Ver), + case is_too_large(IoData, MaxSize) of + true -> <<>>; + false -> IoData + end. + -spec(serialize(emqx_types:packet()) -> iodata()). serialize(Packet) -> serialize(Packet, ?MQTT_PROTO_V4). diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index 26172f761..fad897d92 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -53,6 +53,8 @@ -type(limiter() :: #limiter{}). +-dialyzer({nowarn_function, [consume/3]}). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 33b882768..ee0e24dbb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -70,8 +70,8 @@ limit_timer :: maybe(reference()), %% Parse State parse_state :: emqx_frame:parse_state(), - %% Serialize Fun - serialize :: emqx_frame:serialize_fun(), + %% Serialize options + serialize :: emqx_frame:serialize_opts(), %% Channel channel :: emqx_channel:channel(), %% GC State @@ -231,7 +231,7 @@ websocket_init([Req, Opts]) -> MQTTPiggyback = proplists:get_value(mqtt_piggyback, Opts, multiple), FrameOpts = emqx_zone:mqtt_frame_options(Zone), ParseState = emqx_frame:initial_parse_state(FrameOpts), - Serialize = emqx_frame:serialize_fun(), + Serialize = emqx_frame:serialize_opts(), Channel = emqx_channel:init(ConnInfo, Opts), GcState = emqx_zone:init_gc_state(Zone), StatsTimer = emqx_zone:stats_timer(Zone), @@ -292,7 +292,7 @@ websocket_info({cast, Msg}, State) -> handle_info(Msg, State); websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - Serialize = emqx_frame:serialize_fun(ConnPkt), + Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{serialize = Serialize}, handle_incoming(Packet, cancel_idle_timer(NState)); @@ -544,7 +544,7 @@ handle_outgoing(Packets, State = #state{active_n = ActiveN, mqtt_piggyback = MQT serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case Serialize(Packet) of + case emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 5d4e146db..7ef60bc69 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -26,6 +26,7 @@ all() -> [{group, parse}, + {group, parse2}, {group, connect}, {group, connack}, {group, publish}, @@ -44,6 +45,8 @@ groups() -> [t_parse_cont, t_parse_frame_too_large ]}, + {parse2, [parallel], + [t_parse_cont2]}, {connect, [parallel], [t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, @@ -129,6 +132,16 @@ t_parse_frame_too_large(_) -> ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). +t_parse_cont2(_) -> + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), + ParseState = emqx_frame:initial_parse_state(), + <> = serialize_to_binary(Packet), + {more, ContParse} = emqx_frame:parse2(<<>>, ParseState), + {more, ContParse1} = emqx_frame:parse2(HdrBin, ContParse), + {more, ContParse2} = emqx_frame:parse2(LenBin, ContParse1), + {more, ContParse3} = emqx_frame:parse2(<<>>, ContParse2), + {ok, Packet, <<>>, _} = emqx_frame:parse2(RestBin, ContParse3). + t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, @@ -509,7 +522,7 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)), ParseState = emqx_frame:initial_parse_state(Opts), - {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState), + {ok, NPacket, <<>>, _} = emqx_frame:parse2(Bin, ParseState), NPacket. serialize_to_binary(Packet) ->