diff --git a/src/emqx_parser.erl b/src/emqx_frame.erl similarity index 51% rename from src/emqx_parser.erl rename to src/emqx_frame.erl index a7c6707f3..7f01ee706 100644 --- a/src/emqx_parser.erl +++ b/src/emqx_frame.erl @@ -14,44 +14,50 @@ %%% limitations under the License. %%%=================================================================== --module(emqx_parser). +-module(emqx_frame). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([initial_state/0, initial_state/1, parse/2]). +-type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE, + version => mqtt_version()}). --type(max_packet_size() :: 1..?MAX_PACKET_SIZE). +-type(parse_state() :: {none, options()} | cont_fun(binary())). --type(option() :: {max_len, max_packet_size()} - | {version, mqtt_version()}). +-type(cont_fun(Bin) :: fun((Bin) -> {ok, mqtt_packet(), binary()} + | {more, cont_fun(Bin)})). --type(state() :: {none, map()} | {more, fun()}). +-export_type([options/0, parse_state/0]). --export_type([option/0, state/0]). +-export([initial_state/0, initial_state/1]). +-export([parse/2]). +-export([serialize/1, serialize/2]). -%% @doc Initialize a parser --spec(initial_state() -> {none, map()}). -initial_state() -> initial_state([]). +-define(DEFAULT_OPTIONS, #{max_packet_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4}). --spec(initial_state([option()]) -> {none, map()}). -initial_state(Options) when is_list(Options) -> - {none, parse_opt(Options, #{max_len => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4})}. +%%-------------------------------------------------------------------- +%% Init parse state +%%-------------------------------------------------------------------- -parse_opt([], Map) -> - Map; -parse_opt([{version, Ver}|Opts], Map) -> - parse_opt(Opts, Map#{version := Ver}); -parse_opt([{max_len, Len}|Opts], Map) -> - parse_opt(Opts, Map#{max_len := Len}); -parse_opt([_|Opts], Map) -> - parse_opt(Opts, Map). +-spec(initial_state() -> {none, options()}). +initial_state() -> + initial_state(#{}). -%% @doc Parse MQTT Packet --spec(parse(binary(), {none, map()} | fun()) - -> {ok, mqtt_packet()} | {error, term()} | {more, fun()}). +-spec(initial_state(options()) -> {none, options()}). +initial_state(Options) when is_map(Options) -> + {none, merge_opts(Options)}. + +merge_opts(Options) -> + maps:merge(?DEFAULT_OPTIONS, Options). + +%%-------------------------------------------------------------------- +%% Parse MQTT Frame +%%-------------------------------------------------------------------- + +-spec(parse(binary(), parse_state()) + -> {ok, mqtt_packet(), binary()} | {more, cont_fun(binary())}). parse(<<>>, {none, Options}) -> {more, fun(Bin) -> parse(Bin, {none, Options}) end}; parse(<>, {none, Options}) -> @@ -59,31 +65,33 @@ parse(<>, {none, Options}) -> dup = bool(Dup), qos = fixqos(Type, QoS), retain = bool(Retain)}, Options); -parse(Bin, Cont) -> Cont(Bin). +parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> + Cont(Bin). parse_remaining_len(<<>>, Header, Options) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). -parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_len := MaxLen}) - when Length > MaxLen -> - {error, mqtt_frame_too_long}; +parse_remaining_len(_Bin, _Header, _Multiplier, Length, + #{max_packet_size := MaxSize}) + when Length > MaxSize -> + error(mqtt_frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; -%% Optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... -parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> - parse_frame(Rest, Header, 2, Options); -%% optimize: match PINGREQ... +%% Match PINGREQ. parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 0, Options); +%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK... +parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> + parse_frame(Rest, Header, 2, Options); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, - Options = #{max_len := MaxLen}) -> + Options = #{max_packet_size:= MaxSize}) -> FrameLen = Value + Len * Multiplier, if - FrameLen > MaxLen -> error(mqtt_frame_too_long); + FrameLen > MaxSize -> error(mqtt_frame_too_large); true -> parse_frame(Rest, Header, FrameLen, Options) end. @@ -105,6 +113,13 @@ parse_frame(Bin, Header, Length, Options) -> end} end. +wrap(Header, Variable, Payload, Rest) -> + {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}. +wrap(Header, Variable, Rest) -> + {ok, #mqtt_packet{header = Header, variable = Variable}, Rest}. +wrap(Header, Rest) -> + {ok, #mqtt_packet{header = Header}, Rest}. + parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> {ProtoName, Rest} = parse_utf8_string(FrameBin), <> = Rest, @@ -117,7 +132,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> _Reserved : 1, KeepAlive : 16/big, Rest2/binary>> = Rest1, - case protocol_name_approved(ProtoVer, ProtoName) of + case protocol_approved(ProtoVer, ProtoName) of true -> ok; false -> error(protocol_name_unapproved) end, @@ -212,16 +227,6 @@ parse_packet(#mqtt_packet_header{type = ?AUTH}, <>, {Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5), #mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}. -wrap(Header, Variable, Payload, Rest) -> - {ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}. -wrap(Header, Variable, Rest) -> - {ok, #mqtt_packet{header = Header, variable = Variable}, Rest}. -wrap(Header, Rest) -> - {ok, #mqtt_packet{header = Header}, Rest}. - -protocol_name_approved(Ver, Name) -> - lists:member({Ver, Name}, ?PROTOCOL_NAMES). - parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, proto_ver = Ver}, Bin) -> {Props, Rest} = parse_properties(Bin, Ver), @@ -233,6 +238,9 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, parse_will_message(Packet, Bin) -> {Packet, Bin}. +protocol_approved(Ver, Name) -> + lists:member({Ver, Name}, ?PROTOCOL_NAMES). + parse_packet_id(<>) -> {PacketId, Rest}. @@ -333,8 +341,9 @@ parse_topic_filters(unsubscribe, Bin) -> parse_reason_codes(Bin) -> [Code || <> <= Bin]. -parse_utf8_pair(Bin) -> - [{Name, Value} || <> <= Bin]. +parse_utf8_pair(<>) -> + {{Key, Val}, Rest}. parse_utf8_string(Bin, false) -> {undefined, Bin}; @@ -347,10 +356,285 @@ parse_utf8_string(<>) -> parse_binary_data(<>) -> {Data, Rest}. +%%-------------------------------------------------------------------- +%% Serialize MQTT Packet +%%-------------------------------------------------------------------- + +-spec(serialize(mqtt_packet()) -> iodata()). +serialize(Packet) -> + serialize(Packet, ?DEFAULT_OPTIONS). + +-spec(serialize(mqtt_packet(), options()) -> iodata()). +serialize(#mqtt_packet{header = Header, + variable = Variable, + payload = Payload}, Options) when is_map(Options) -> + serialize(Header, serialize_variable(Variable, merge_opts(Options)), + serialize_payload(Payload)). + +serialize(#mqtt_packet_header{type = Type, + dup = Dup, + qos = QoS, + retain = Retain}, VariableBin, PayloadBin) + when ?CONNECT =< Type andalso Type =< ?AUTH -> + Len = iolist_size(VariableBin) + iolist_size(PayloadBin), + true = (Len =< ?MAX_PACKET_SIZE), + [<>, + serialize_remaining_len(Len), VariableBin, PayloadBin]. + +serialize_variable(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + is_bridge = IsBridge, + clean_start = CleanStart, + will_flag = WillFlag, + will_qos = WillQos, + will_retain = WillRetain, + keepalive = KeepAlive, + properties = Properties, + client_id = ClientId, + will_props = WillProps, + will_topic = WillTopic, + will_payload = WillPayload, + username = Username, + password = Password}, _Options) -> + [serialize_binary_data(ProtoName), + <<(case IsBridge of + true -> 16#80 + ProtoVer; + false -> ProtoVer + end):8, + (flag(Username)):1, + (flag(Password)):1, + (flag(WillRetain)):1, + WillQos:2, + (flag(WillFlag)):1, + (flag(CleanStart)):1, + 0:1, + KeepAlive:16/big-unsigned-integer>>, + serialize_properties(Properties, ProtoVer), + serialize_utf8_string(ClientId), + case WillFlag of + true -> [serialize_properties(WillProps, ProtoVer), + serialize_utf8_string(WillTopic), + serialize_binary_data(WillPayload)]; + false -> <<>> + end, + serialize_utf8_string(Username, true), + serialize_utf8_string(Password, true)]; + +serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags, + reason_code = ReasonCode, + properties = Properties}, + #{version := Ver}) -> + [AckFlags, ReasonCode, serialize_properties(Properties, Ver)]; + +serialize_variable(#mqtt_packet_publish{topic_name = TopicName, + packet_id = PacketId, + properties = Properties}, + #{version := Ver}) -> + [serialize_utf8_string(TopicName), + if + PacketId =:= undefined -> <<>>; + true -> <> + end, + serialize_properties(Properties, Ver)]; + +serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, + #{version := Ver}) + when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> + <>; +serialize_variable(#mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties}, + #{version := ?MQTT_PROTO_V5}) -> + [<>, ReasonCode, + serialize_properties(Properties, ?MQTT_PROTO_V5)]; + +serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters}, + #{version := Ver}) -> + [<>, serialize_properties(Properties, Ver), + serialize_topic_filters(subscribe, TopicFilters, Ver)]; + +serialize_variable(#mqtt_packet_suback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes}, + #{version := Ver}) -> + [<>, serialize_properties(Properties, Ver), + serialize_reason_codes(ReasonCodes)]; + +serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, + properties = Properties, + topic_filters = TopicFilters}, + #{version := Ver}) -> + [<>, serialize_properties(Properties, Ver), + serialize_topic_filters(unsubscribe, TopicFilters, Ver)]; + +serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes}, + #{version := Ver}) -> + [<>, serialize_properties(Properties, Ver), + serialize_reason_codes(ReasonCodes)]; + +serialize_variable(#mqtt_packet_disconnect{}, #{version := Ver}) + when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> + <<>>; + +serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode, + properties = Properties}, + #{version := Ver = ?MQTT_PROTO_V5}) -> + [ReasonCode, serialize_properties(Properties, Ver)]; +serialize_variable(#mqtt_packet_disconnect{}, _Ver) -> + <<>>; + +serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode, + properties = Properties}, + #{version := Ver = ?MQTT_PROTO_V5}) -> + [ReasonCode, serialize_properties(Properties, Ver)]; + +serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) -> + <>; +serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) -> + <>; +serialize_variable(undefined, _Ver) -> + <<>>. + +serialize_payload(undefined) -> <<>>; +serialize_payload(Bin) -> Bin. + +serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 -> + <<>>; +serialize_properties(Props, ?MQTT_PROTO_V5) -> + serialize_properties(Props). + +serialize_properties(undefined) -> + <<0>>; +serialize_properties(Props) when map_size(Props) == 0 -> + <<0>>; +serialize_properties(Props) when is_map(Props) -> + Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>, + [serialize_variable_byte_integer(byte_size(Bin)), Bin]. + +serialize_property(_, undefined) -> + <<>>; +serialize_property('Payload-Format-Indicator', Val) -> + <<16#01, Val>>; +serialize_property('Message-Expiry-Interval', Val) -> + <<16#02, Val:32/big>>; +serialize_property('Content-Type', Val) -> + <<16#03, (serialize_utf8_string(Val))/binary>>; +serialize_property('Response-Topic', Val) -> + <<16#08, (serialize_utf8_string(Val))/binary>>; +serialize_property('Correlation-Data', Val) -> + <<16#09, (byte_size(Val)):16, Val/binary>>; +serialize_property('Subscription-Identifier', Val) -> + <<16#0B, (serialize_variable_byte_integer(Val))/binary>>; +serialize_property('Session-Expiry-Interval', Val) -> + <<16#11, Val:32/big>>; +serialize_property('Assigned-Client-Identifier', Val) -> + <<16#12, (serialize_utf8_string(Val))/binary>>; +serialize_property('Server-Keep-Alive', Val) -> + <<16#13, Val:16/big>>; +serialize_property('Authentication-Method', Val) -> + <<16#15, (serialize_utf8_string(Val))/binary>>; +serialize_property('Authentication-Data', Val) -> + <<16#16, (iolist_size(Val)):16, Val/binary>>; +serialize_property('Request-Problem-Information', Val) -> + <<16#17, Val>>; +serialize_property('Will-Delay-Interval', Val) -> + <<16#18, Val:32/big>>; +serialize_property('Request-Response-Information', Val) -> + <<16#19, Val>>; +serialize_property('Response-Information', Val) -> + <<16#1A, (serialize_utf8_string(Val))/binary>>; +serialize_property('Server-Reference', Val) -> + <<16#1C, (serialize_utf8_string(Val))/binary>>; +serialize_property('Reason-String', Val) -> + <<16#1F, (serialize_utf8_string(Val))/binary>>; +serialize_property('Receive-Maximum', Val) -> + <<16#21, Val:16/big>>; +serialize_property('Topic-Alias-Maximum', Val) -> + <<16#22, Val:16/big>>; +serialize_property('Topic-Alias', Val) -> + <<16#23, Val:16/big>>; +serialize_property('Maximum-QoS', Val) -> + <<16#24, Val>>; +serialize_property('Retain-Available', Val) -> + <<16#25, Val>>; +serialize_property('User-Property', {Key, Val}) -> + <<16#26, (serialize_utf8_pair({Key, Val}))/binary>>; +serialize_property('User-Property', Props) when is_list(Props) -> + << <<(serialize_property('User-Property', {Key, Val}))/binary>> + || {Key, Val} <- Props >>; +serialize_property('Maximum-Packet-Size', Val) -> + <<16#27, Val:32/big>>; +serialize_property('Wildcard-Subscription-Available', Val) -> + <<16#28, Val>>; +serialize_property('Subscription-Identifier-Available', Val) -> + <<16#29, Val>>; +serialize_property('Shared-Subscription-Available', Val) -> + <<16#2A, Val>>. + +serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> + << <<(serialize_utf8_string(Topic))/binary, (serialize_subopts(SubOpts)) >> + || {Topic, SubOpts} <- TopicFilters >>; + +serialize_topic_filters(subscribe, TopicFilters, _Ver) -> + << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>> + || {Topic, #mqtt_subopts{qos = QoS}} <- TopicFilters >>; + +serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> + << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>. + +serialize_subopts(#mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}) -> + <>. + +serialize_reason_codes(undefined) -> + <<>>; +serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) -> + << <> || Code <- ReasonCodes >>. + +serialize_utf8_pair({Name, Value}) -> + << (serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary >>. + +serialize_binary_data(Bin) -> + [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. + +serialize_utf8_string(undefined, false) -> + error(utf8_string_undefined); +serialize_utf8_string(undefined, true) -> + <<>>; +serialize_utf8_string(String, _AllowNull) -> + serialize_utf8_string(String). + +serialize_utf8_string(String) -> + StringBin = unicode:characters_to_binary(String), + Len = byte_size(StringBin), + true = (Len =< 16#ffff), + <>. + +serialize_remaining_len(I) -> + serialize_variable_byte_integer(I). + +serialize_variable_byte_integer(N) when N =< ?LOWBITS -> + <<0:1, N:7>>; +serialize_variable_byte_integer(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + bool(0) -> false; bool(1) -> true. -%% Fix Issue#575 +flag(undefined) -> ?RESERVED; +flag(false) -> 0; +flag(true) -> 1; +flag(X) when is_integer(X) -> X; +flag(B) when is_binary(B) -> 1. + fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; diff --git a/src/emqx_serializer.erl b/src/emqx_serializer.erl deleted file mode 100644 index 670213674..000000000 --- a/src/emqx_serializer.erl +++ /dev/null @@ -1,294 +0,0 @@ -%%%=================================================================== -%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%%% -%%% Licensed under the Apache License, Version 2.0 (the "License"); -%%% you may not use this file except in compliance with the License. -%%% You may obtain a copy of the License at -%%% -%%% http://www.apache.org/licenses/LICENSE-2.0 -%%% -%%% Unless required by applicable law or agreed to in writing, software -%%% distributed under the License is distributed on an "AS IS" BASIS, -%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%%% See the License for the specific language governing permissions and -%%% limitations under the License. -%%%=================================================================== - --module(emqx_serializer). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --type(option() :: {version, mqtt_version()}). - --export_type([option/0]). - --export([serialize/1, serialize/2]). - --spec(serialize(mqtt_packet()) -> iodata()). -serialize(Packet) -> serialize(Packet, []). - --spec(serialize(mqtt_packet(), [option()]) -> iodata()). -serialize(#mqtt_packet{header = Header, - variable = Variable, - payload = Payload}, Opts) when is_list(Opts) -> - Opts1 = parse_opt(Opts, #{version => ?MQTT_PROTO_V4}), - serialize(Header, serialize_variable(Variable, Opts1), serialize_payload(Payload)). - -parse_opt([], Map) -> - Map; -parse_opt([{version, Ver}|Opts], Map) -> - parse_opt(Opts, Map#{version := Ver}); -parse_opt([_|Opts], Map) -> - parse_opt(Opts, Map). - -serialize(#mqtt_packet_header{type = Type, - dup = Dup, - qos = Qos, - retain = Retain}, VariableData, PayloadData) - when ?CONNECT =< Type andalso Type =< ?AUTH -> - Len = iolist_size(VariableData) + iolist_size(PayloadData), - true = (Len =< ?MAX_PACKET_SIZE), - [<>, - serialize_remaining_len(Len), VariableData, PayloadData]. - -serialize_variable(#mqtt_packet_connect{proto_name = ProtoName, - proto_ver = ProtoVer, - is_bridge = IsBridge, - clean_start = CleanStart, - will_flag = WillFlag, - will_qos = WillQos, - will_retain = WillRetain, - keepalive = KeepAlive, - properties = Properties, - client_id = ClientId, - will_props = WillProps, - will_topic = WillTopic, - will_payload = WillPayload, - username = Username, - password = Password}, _Opts) -> - [serialize_binary_data(ProtoName), - <<(case IsBridge of - true -> 16#80 + ProtoVer; - false -> ProtoVer - end):8, - (opt(Username)):1, - (opt(Password)):1, - (opt(WillRetain)):1, - WillQos:2, - (opt(WillFlag)):1, - (opt(CleanStart)):1, - 0:1, - KeepAlive:16/big-unsigned-integer>>, - serialize_properties(Properties, ProtoVer), - serialize_utf8_string(ClientId), - case WillFlag of - true -> [serialize_properties(WillProps, ProtoVer), - serialize_utf8_string(WillTopic), - serialize_binary_data(WillPayload)]; - false -> <<>> - end, - serialize_utf8_string(Username, true), - serialize_utf8_string(Password, true)]; - -serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags, - reason_code = ReasonCode, - properties = Properties}, #{version := Ver}) -> - [AckFlags, ReasonCode, serialize_properties(Properties, Ver)]; - -serialize_variable(#mqtt_packet_publish{topic_name = TopicName, - packet_id = PacketId, - properties = Properties}, #{version := Ver}) -> - [serialize_utf8_string(TopicName), - if - PacketId =:= undefined -> <<>>; - true -> <> - end, - serialize_properties(Properties, Ver)]; - -serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, #{version := Ver}) - when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> - <>; -serialize_variable(#mqtt_packet_puback{packet_id = PacketId, - reason_code = ReasonCode, - properties = Properties}, - #{version := ?MQTT_PROTO_V5}) -> - [<>, ReasonCode, - serialize_properties(Properties, ?MQTT_PROTO_V5)]; - -serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters}, - #{version := Ver}) -> - [<>, serialize_properties(Properties, Ver), - serialize_topic_filters(subscribe, TopicFilters, Ver)]; - -serialize_variable(#mqtt_packet_suback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes}, - #{version := Ver}) -> - [<>, serialize_properties(Properties, Ver), - << <> || Code <- ReasonCodes >>]; - -serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId, - properties = Properties, - topic_filters = TopicFilters}, - #{version := Ver}) -> - [<>, serialize_properties(Properties, Ver), - serialize_topic_filters(unsubscribe, TopicFilters, Ver)]; - -serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId, - properties = Properties, - reason_codes = ReasonCodes}, - #{version := Ver}) -> - [<>, serialize_properties(Properties, Ver), - << <> || Code <- ReasonCodes >>]; - -serialize_variable(#mqtt_packet_disconnect{}, #{version := Ver}) - when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 -> - <<>>; - -serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode, - properties = Properties}, - #{version := Ver = ?MQTT_PROTO_V5}) -> - [ReasonCode, serialize_properties(Properties, Ver)]; -serialize_variable(#mqtt_packet_disconnect{}, _Ver) -> - <<>>; - -serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode, - properties = Properties}, - #{version := Ver = ?MQTT_PROTO_V5}) -> - [ReasonCode, serialize_properties(Properties, Ver)]; - -serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) -> - <>; -serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) -> - <>; -serialize_variable(undefined, _Ver) -> - <<>>. - -serialize_payload(undefined) -> - <<>>; -serialize_payload(Bin) when is_binary(Bin); is_list(Bin) -> - Bin. - -serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 -> - <<>>; -serialize_properties(Props, ?MQTT_PROTO_V5) -> - serialize_properties(Props). - -serialize_properties(undefined) -> - <<0>>; -serialize_properties(Props) when map_size(Props) == 0 -> - <<0>>; -serialize_properties(Props) when is_map(Props) -> - Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>, - [serialize_variable_byte_integer(byte_size(Bin)), Bin]. - -%% Ignore undefined -serialize_property(_, undefined) -> - <<>>; -serialize_property('Payload-Format-Indicator', Val) -> - <<16#01, Val>>; -serialize_property('Message-Expiry-Interval', Val) -> - <<16#02, Val:32/big>>; -serialize_property('Content-Type', Val) -> - <<16#03, (serialize_utf8_string(Val))/binary>>; -serialize_property('Response-Topic', Val) -> - <<16#08, (serialize_utf8_string(Val))/binary>>; -serialize_property('Correlation-Data', Val) -> - <<16#09, (byte_size(Val)):16, Val/binary>>; -serialize_property('Subscription-Identifier', Val) -> - <<16#0B, (serialize_variable_byte_integer(Val))/binary>>; -serialize_property('Session-Expiry-Interval', Val) -> - <<16#11, Val:32/big>>; -serialize_property('Assigned-Client-Identifier', Val) -> - <<16#12, (serialize_utf8_string(Val))/binary>>; -serialize_property('Server-Keep-Alive', Val) -> - <<16#13, Val:16/big>>; -serialize_property('Authentication-Method', Val) -> - <<16#15, (serialize_utf8_string(Val))/binary>>; -serialize_property('Authentication-Data', Val) -> - <<16#16, (iolist_size(Val)):16, Val/binary>>; -serialize_property('Request-Problem-Information', Val) -> - <<16#17, Val>>; -serialize_property('Will-Delay-Interval', Val) -> - <<16#18, Val:32/big>>; -serialize_property('Request-Response-Information', Val) -> - <<16#19, Val>>; -serialize_property('Response-Information', Val) -> - <<16#1A, (serialize_utf8_string(Val))/binary>>; -serialize_property('Server-Reference', Val) -> - <<16#1C, (serialize_utf8_string(Val))/binary>>; -serialize_property('Reason-String', Val) -> - <<16#1F, (serialize_utf8_string(Val))/binary>>; -serialize_property('Receive-Maximum', Val) -> - <<16#21, Val:16/big>>; -serialize_property('Topic-Alias-Maximum', Val) -> - <<16#22, Val:16/big>>; -serialize_property('Topic-Alias', Val) -> - <<16#23, Val:16/big>>; -serialize_property('Maximum-QoS', Val) -> - <<16#24, Val>>; -serialize_property('Retain-Available', Val) -> - <<16#25, Val>>; -serialize_property('User-Property', {Key, Val}) -> - <<16#26, (serialize_utf8_pair({Key, Val}))/binary>>; -serialize_property('User-Property', Props) when is_list(Props) -> - << <<(serialize_property('User-Property', {Key, Val}))/binary>> - || {Key, Val} <- Props >>; -serialize_property('Maximum-Packet-Size', Val) -> - <<16#27, Val:32/big>>; -serialize_property('Wildcard-Subscription-Available', Val) -> - <<16#28, Val>>; -serialize_property('Subscription-Identifier-Available', Val) -> - <<16#29, Val>>; -serialize_property('Shared-Subscription-Available', Val) -> - <<16#2A, Val>>. - -serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) -> - << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:2, Rh:2, (opt(Rap)):1, (opt(Nl)):1, Qos:2>> - || {Topic, #mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = Qos}} <- TopicFilters >>; - -serialize_topic_filters(subscribe, TopicFilters, _Ver) -> - << <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, Qos:2>> - || {Topic, #mqtt_subopts{qos = Qos}} <- TopicFilters >>; - -serialize_topic_filters(unsubscribe, TopicFilters, _Ver) -> - << <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>. - -serialize_utf8_pair({Name, Value}) -> - << <<(serialize_utf8_string(S))/binary, - (serialize_utf8_string(S))/binary>> || S <- [Name, Value] >>. - -serialize_binary_data(Bin) -> - [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. - -serialize_utf8_string(undefined, false) -> - error(utf8_string_undefined); -serialize_utf8_string(undefined, true) -> - <<>>; -serialize_utf8_string(String, _AllowNull) -> - serialize_utf8_string(String). - -serialize_utf8_string(String) -> - StringBin = unicode:characters_to_binary(String), - Len = byte_size(StringBin), - true = (Len =< 16#ffff), - <>. - -serialize_remaining_len(I) -> - serialize_variable_byte_integer(I). - -serialize_variable_byte_integer(N) when N =< ?LOWBITS -> - <<0:1, N:7>>; -serialize_variable_byte_integer(N) -> - <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. - -opt(undefined) -> ?RESERVED; -opt(false) -> 0; -opt(true) -> 1; -opt(X) when is_integer(X) -> X; -opt(B) when is_binary(B) -> 1. - diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl new file mode 100644 index 000000000..c177fda8e --- /dev/null +++ b/test/emqx_frame_SUITE.erl @@ -0,0 +1,430 @@ +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== + +-module(emqx_frame_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include("emqx_mqtt.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_frame, [serialize/1, serialize/2]). + +all() -> + [{group, connect}, + {group, connack}, + {group, publish}, + {group, puback}, + {group, subscribe}, + {group, suback}, + {group, unsubscribe}, + {group, unsuback}, + {group, ping}, + {group, disconnect}, + {group, auth}]. + +groups() -> + [{connect, [parallel], + [serialize_parse_connect, + serialize_parse_v3_connect, + serialize_parse_v4_connect, + serialize_parse_v5_connect, + serialize_parse_connect_without_clientid, + serialize_parse_connect_with_will, + serialize_parse_bridge_connect]}, + {connack, [parallel], + [serialize_parse_connack, + serialize_parse_connack_v5]}, + {publish, [parallel], + [serialize_parse_qos0_publish, + serialize_parse_qos1_publish, + serialize_parse_qos2_publish, + serialize_parse_publish_v5]}, + {puback, [parallel], + [serialize_parse_puback, + serialize_parse_puback_v5, + serialize_parse_pubrec, + serialize_parse_pubrec_v5, + serialize_parse_pubrel, + serialize_parse_pubrel_v5, + serialize_parse_pubcomp, + serialize_parse_pubcomp_v5]}, + {subscribe, [parallel], + [serialize_parse_subscribe, + serialize_parse_subscribe_v5]}, + {suback, [parallel], + [serialize_parse_suback, + serialize_parse_suback_v5]}, + {unsubscribe, [parallel], + [serialize_parse_unsubscribe, + serialize_parse_unsubscribe_v5]}, + {unsuback, [parallel], + [serialize_parse_unsuback, + serialize_parse_unsuback_v5]}, + {ping, [parallel], + [serialize_parse_pingreq, + serialize_parse_pingresp]}, + {disconnect, [parallel], + [serialize_parse_disconnect, + serialize_parse_disconnect_v5]}, + {auth, [parallel], + [serialize_parse_auth_v5]}]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +serialize_parse_connect(_) -> + Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{}), + ?assertEqual({ok, Packet1, <<>>}, parse_serialize(Packet1)), + Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{ + client_id = <<"clientId">>, + will_qos = ?QOS1, + will_flag = true, + will_retain = true, + will_topic = <<"will">>, + will_payload = <<"bye">>, + clean_start = true}), + ?assertEqual({ok, Packet2, <<>>}, parse_serialize(Packet2)). + +serialize_parse_v3_connect(_) -> + Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, + 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, + 111,99,97>>, + Packet = ?CONNECT_PACKET( + #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + proto_name = <<"MQIsdp">>, + client_id = <<"mosqpub/10451-iMac.loca">>, + clean_start = true, + keepalive = 60}), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_v4_connect(_) -> + Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117, + 98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4, + proto_name = <<"MQTT">>, + client_id = <<"mosqpub/10451-iMac.loca">>, + clean_start = true, + keepalive = 60}), + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_v5_connect(_) -> + Props = #{'Session-Expiry-Interval' => 60, + 'Receive-Maximum' => 100, + 'Maximum-QoS' => ?QOS_2, + 'Retain-Available' => 1, + 'Maximum-Packet-Size' => 1024, + 'Topic-Alias-Maximum' => 10, + 'Request-Response-Information' => 1, + 'Request-Problem-Information' => 1, + 'Authentication-Method' => <<"oauth2">>, + 'Authentication-Data' => <<"33kx93k">>}, + + WillProps = #{'Will-Delay-Interval' => 60, + 'Payload-Format-Indicator' => 1, + 'Message-Expiry-Interval' => 60, + 'Content-Type' => <<"text/json">>, + 'Response-Topic' => <<"topic">>, + 'Correlation-Data' => <<"correlateid">>, + 'User-Property' => [{<<"k">>, <<"v">>}]}, + Packet = ?CONNECT_PACKET( + #mqtt_packet_connect{proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V5, + is_bridge = false, + clean_start = true, + client_id = <<>>, + will_flag = true, + will_qos = ?QOS_1, + will_retain = false, + keepalive = 60, + properties = Props, + will_props = WillProps, + will_topic = <<"topic">>, + will_payload = <<>>, + username = <<"device:1">>, + password = <<"passwd">>}), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_connect_without_clientid(_) -> + Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4, + proto_name = <<"MQTT">>, + client_id = <<>>, + clean_start = true, + keepalive = 60}), + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_connect_with_will(_) -> + Bin = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112, + 117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119, + 105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6, + 112,117,98,108,105,99>>, + Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, + variable = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + proto_name = <<"MQIsdp">>, + client_id = <<"mosqpub/10452-iMac.loca">>, + clean_start = true, + keepalive = 60, + will_retain = false, + will_qos = ?QOS_1, + will_flag = true, + will_topic = <<"/will">>, + will_payload = <<"willmsg">>, + username = <<"test">>, + password = <<"public">>}}, + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_bridge_connect(_) -> + Bin = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67, + 58,50,57,58,50,66,58,55,55,58,53,50,0,48,36,83,89,83,47,98,114,111,107, + 101,114,47,99,111,110,110,101,99,116,105,111,110,47,67,95,48,48,58,48, + 67,58,50,57,58,50,66,58,55,55,58,53,50,47,115,116,97,116,101,0,1,48>>, + Topic = <<"$SYS/broker/connection/C_00:0C:29:2B:77:52/state">>, + Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, + variable = #mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>, + proto_ver = 16#03, + proto_name = <<"MQIsdp">>, + is_bridge = true, + will_retain = true, + will_qos = ?QOS_1, + will_flag = true, + clean_start = false, + keepalive = 60, + will_topic = Topic, + will_payload = <<"0">>}}, + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_connack(_) -> + Packet = ?CONNACK_PACKET(?RC_SUCCESS), + ?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_connack_v5(_) -> + Props = #{'Session-Expiry-Interval' => 60, + 'Receive-Maximum' => 100, + 'Maximum-QoS' => ?QOS_2, + 'Retain-Available' => 1, + 'Maximum-Packet-Size' => 1024, + 'Assigned-Client-Identifier' => <<"id">>, + 'Topic-Alias-Maximum' => 10, + 'Reason-String' => <<>>, + 'Wildcard-Subscription-Available' => 1, + 'Subscription-Identifier-Available' => 1, + 'Shared-Subscription-Available' => 1, + 'Server-Keep-Alive' => 60, + 'Response-Information' => <<"response">>, + 'Server-Reference' => <<"192.168.1.10">>, + 'Authentication-Method' => <<"oauth2">>, + 'Authentication-Data' => <<"33kx93k">>}, + Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_qos0_publish(_) -> + Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>, + Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = false, + qos = ?QOS_0, + retain = false}, + variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>, + packet_id = undefined}, + payload = <<"hello">>}, + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_qos1_publish(_) -> + Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>, + Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = false, + qos = ?QOS_1, + retain = false}, + variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>, + packet_id = 1}, + payload = <<"haha">>}, + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_qos2_publish(_) -> + Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, payload()), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_publish_v5(_) -> + Props = #{'Payload-Format-Indicator' => 1, + 'Message-Expiry-Interval' => 60, + 'Topic-Alias' => 16#AB, + 'Response-Topic' => <<"reply">>, + 'Correlation-Data' => <<"correlation-id">>, + 'Subscription-Identifier' => 1, + 'Content-Type' => <<"text/json">>}, + Packet = ?PUBLISH_PACKET(#mqtt_packet_header{type = ?PUBLISH}, + <<"$share/group/topic">>, 1, Props, + <<"payload">>), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_puback(_) -> + Packet = ?PUBACK_PACKET(1), + ?assertEqual(<<64,2,0,1>>, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_puback_v5(_) -> + Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_pubrec(_) -> + Packet = ?PUBREC_PACKET(1), + ?assertEqual(<<5:4,0:4,2,0,1>>, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_pubrec_v5(_) -> + Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_pubrel(_) -> + Packet = ?PUBREL_PACKET(1), + ?assertEqual(<<6:4,2:4,2,0,1>>, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_pubrel_v5(_) -> + Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_pubcomp(_) -> + Packet = ?PUBCOMP_PACKET(1), + ?assertEqual(<<7:4,0:4,2,0,1>>, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_pubcomp_v5(_) -> + Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_subscribe(_) -> + %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) + Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, + TopicFilters = [{<<"TopicA">>, #mqtt_subopts{qos = 2}}], + Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), + ?assertEqual(Bin, serialize(Packet)), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_subscribe_v5(_) -> + TopicFilters = [{<<"TopicQos0">>, #mqtt_subopts{rh = 1, qos = ?QOS_0}}, + {<<"TopicQos1">>, #mqtt_subopts{rh = 1, qos =?QOS_1}}], + Packet = ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 16#FFFFFFF}, + TopicFilters), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_suback(_) -> + Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_suback_v5(_) -> + Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>, + 'User-Property' => [{<<"key">>, <<"value">>}]}, + [?QOS_0, ?QOS_1, 128]), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + + +serialize_parse_unsubscribe(_) -> + %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]), + Bin = <<162,10,0,2,0,6,84,111,112,105,99,65>>, + ?assertEqual(Bin, iolist_to_binary(serialize(Packet))), + ?assertEqual({ok, Packet, <<>>}, parse(Bin)). + +serialize_parse_unsubscribe_v5(_) -> + Props = #{'User-Property' => [{<<"key">>, <<"val">>}]}, + Packet = ?UNSUBSCRIBE_PACKET(10, Props, [<<"Topic1">>, <<"Topic2">>]), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_unsuback(_) -> + Packet = ?UNSUBACK_PACKET(10), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_unsuback_v5(_) -> + Packet = ?UNSUBACK_PACKET(10, #{'Reason-String' => <<"Not authorized">>, + 'User-Property' => [{<<"key">>, <<"val">>}]}, + [16#87, 16#87, 16#87]), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_pingreq(_) -> + PingReq = ?PACKET(?PINGREQ), + ?assertEqual({ok, PingReq, <<>>}, parse_serialize(PingReq)). + +serialize_parse_pingresp(_) -> + PingResp = ?PACKET(?PINGRESP), + ?assertEqual({ok, PingResp, <<>>}, parse_serialize(PingResp)). + +parse_disconnect(_) -> + ?assertEqual({ok, ?DISCONNECT_PACKET(?RC_SUCCESS), <<>>}, parse(<<224, 0>>)). + +serialize_parse_disconnect(_) -> + Packet = ?PACKET(?DISCONNECT), + ?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)). + +serialize_parse_disconnect_v5(_) -> + Packet = ?DISCONNECT_PACKET(?RC_SUCCESS, + #{'Session-Expiry-Interval' => 60, + 'Reason-String' => <<"server_moved">>, + 'Server-Reference' => <<"192.168.1.10">>}), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +serialize_parse_auth_v5(_) -> + Packet = ?AUTH_PACKET(?RC_SUCCESS, + #{'Authentication-Method' => <<"oauth2">>, + 'Authentication-Data' => <<"3zekkd">>, + 'Reason-String' => <<"success">>, + 'User-Property' => [{<<"key">>, <<"val">>}]}), + ?assertEqual({ok, Packet, <<>>}, + parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + +parse_serialize(Packet) -> + parse(iolist_to_binary(serialize(Packet))). + +parse_serialize(Packet, Opts) when is_map(Opts) -> + parse(iolist_to_binary(serialize(Packet, Opts)), Opts). + +parse(Bin) -> + parse(Bin, #{}). + +parse(Bin, Opts) when is_map(Opts) -> + emqx_frame:parse(Bin, emqx_frame:initial_state(Opts)). + +payload() -> + iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]). +