Add test cases for MQTT 5.0 frame
This commit is contained in:
parent
31bc091873
commit
c11e8f453b
|
@ -14,44 +14,50 @@
|
||||||
%%% limitations under the License.
|
%%% limitations under the License.
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-module(emqx_parser).
|
-module(emqx_frame).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
-export([initial_state/0, initial_state/1, parse/2]).
|
-type(options() :: #{max_packet_size => 1..?MAX_PACKET_SIZE,
|
||||||
|
version => mqtt_version()}).
|
||||||
|
|
||||||
-type(max_packet_size() :: 1..?MAX_PACKET_SIZE).
|
-type(parse_state() :: {none, options()} | cont_fun(binary())).
|
||||||
|
|
||||||
-type(option() :: {max_len, max_packet_size()}
|
-type(cont_fun(Bin) :: fun((Bin) -> {ok, mqtt_packet(), binary()}
|
||||||
| {version, mqtt_version()}).
|
| {more, cont_fun(Bin)})).
|
||||||
|
|
||||||
-type(state() :: {none, map()} | {more, fun()}).
|
-export_type([options/0, parse_state/0]).
|
||||||
|
|
||||||
-export_type([option/0, state/0]).
|
-export([initial_state/0, initial_state/1]).
|
||||||
|
-export([parse/2]).
|
||||||
|
-export([serialize/1, serialize/2]).
|
||||||
|
|
||||||
%% @doc Initialize a parser
|
-define(DEFAULT_OPTIONS, #{max_packet_size => ?MAX_PACKET_SIZE,
|
||||||
-spec(initial_state() -> {none, map()}).
|
version => ?MQTT_PROTO_V4}).
|
||||||
initial_state() -> initial_state([]).
|
|
||||||
|
|
||||||
-spec(initial_state([option()]) -> {none, map()}).
|
%%--------------------------------------------------------------------
|
||||||
initial_state(Options) when is_list(Options) ->
|
%% Init parse state
|
||||||
{none, parse_opt(Options, #{max_len => ?MAX_PACKET_SIZE,
|
%%--------------------------------------------------------------------
|
||||||
version => ?MQTT_PROTO_V4})}.
|
|
||||||
|
|
||||||
parse_opt([], Map) ->
|
-spec(initial_state() -> {none, options()}).
|
||||||
Map;
|
initial_state() ->
|
||||||
parse_opt([{version, Ver}|Opts], Map) ->
|
initial_state(#{}).
|
||||||
parse_opt(Opts, Map#{version := Ver});
|
|
||||||
parse_opt([{max_len, Len}|Opts], Map) ->
|
|
||||||
parse_opt(Opts, Map#{max_len := Len});
|
|
||||||
parse_opt([_|Opts], Map) ->
|
|
||||||
parse_opt(Opts, Map).
|
|
||||||
|
|
||||||
%% @doc Parse MQTT Packet
|
-spec(initial_state(options()) -> {none, options()}).
|
||||||
-spec(parse(binary(), {none, map()} | fun())
|
initial_state(Options) when is_map(Options) ->
|
||||||
-> {ok, mqtt_packet()} | {error, term()} | {more, fun()}).
|
{none, merge_opts(Options)}.
|
||||||
|
|
||||||
|
merge_opts(Options) ->
|
||||||
|
maps:merge(?DEFAULT_OPTIONS, Options).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Parse MQTT Frame
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(parse(binary(), parse_state())
|
||||||
|
-> {ok, mqtt_packet(), binary()} | {more, cont_fun(binary())}).
|
||||||
parse(<<>>, {none, Options}) ->
|
parse(<<>>, {none, Options}) ->
|
||||||
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
|
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
|
||||||
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Options}) ->
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Options}) ->
|
||||||
|
@ -59,31 +65,33 @@ parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Options}) ->
|
||||||
dup = bool(Dup),
|
dup = bool(Dup),
|
||||||
qos = fixqos(Type, QoS),
|
qos = fixqos(Type, QoS),
|
||||||
retain = bool(Retain)}, Options);
|
retain = bool(Retain)}, Options);
|
||||||
parse(Bin, Cont) -> Cont(Bin).
|
parse(Bin, Cont) when is_binary(Bin), is_function(Cont) ->
|
||||||
|
Cont(Bin).
|
||||||
|
|
||||||
parse_remaining_len(<<>>, Header, Options) ->
|
parse_remaining_len(<<>>, Header, Options) ->
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
|
||||||
parse_remaining_len(Rest, Header, Options) ->
|
parse_remaining_len(Rest, Header, Options) ->
|
||||||
parse_remaining_len(Rest, Header, 1, 0, Options).
|
parse_remaining_len(Rest, Header, 1, 0, Options).
|
||||||
|
|
||||||
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_len := MaxLen})
|
parse_remaining_len(_Bin, _Header, _Multiplier, Length,
|
||||||
when Length > MaxLen ->
|
#{max_packet_size := MaxSize})
|
||||||
{error, mqtt_frame_too_long};
|
when Length > MaxSize ->
|
||||||
|
error(mqtt_frame_too_large);
|
||||||
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
||||||
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
|
||||||
%% Optimize: match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
%% Match PINGREQ.
|
||||||
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
|
||||||
parse_frame(Rest, Header, 2, Options);
|
|
||||||
%% optimize: match PINGREQ...
|
|
||||||
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_frame(Rest, Header, 0, Options);
|
parse_frame(Rest, Header, 0, Options);
|
||||||
|
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
|
||||||
|
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
|
parse_frame(Rest, Header, 2, Options);
|
||||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
||||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
||||||
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
|
||||||
Options = #{max_len := MaxLen}) ->
|
Options = #{max_packet_size:= MaxSize}) ->
|
||||||
FrameLen = Value + Len * Multiplier,
|
FrameLen = Value + Len * Multiplier,
|
||||||
if
|
if
|
||||||
FrameLen > MaxLen -> error(mqtt_frame_too_long);
|
FrameLen > MaxSize -> error(mqtt_frame_too_large);
|
||||||
true -> parse_frame(Rest, Header, FrameLen, Options)
|
true -> parse_frame(Rest, Header, FrameLen, Options)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -105,6 +113,13 @@ parse_frame(Bin, Header, Length, Options) ->
|
||||||
end}
|
end}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
wrap(Header, Variable, Payload, Rest) ->
|
||||||
|
{ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}.
|
||||||
|
wrap(Header, Variable, Rest) ->
|
||||||
|
{ok, #mqtt_packet{header = Header, variable = Variable}, Rest}.
|
||||||
|
wrap(Header, Rest) ->
|
||||||
|
{ok, #mqtt_packet{header = Header}, Rest}.
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
||||||
{ProtoName, Rest} = parse_utf8_string(FrameBin),
|
{ProtoName, Rest} = parse_utf8_string(FrameBin),
|
||||||
<<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
|
<<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
|
||||||
|
@ -117,7 +132,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
|
||||||
_Reserved : 1,
|
_Reserved : 1,
|
||||||
KeepAlive : 16/big,
|
KeepAlive : 16/big,
|
||||||
Rest2/binary>> = Rest1,
|
Rest2/binary>> = Rest1,
|
||||||
case protocol_name_approved(ProtoVer, ProtoName) of
|
case protocol_approved(ProtoVer, ProtoName) of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> error(protocol_name_unapproved)
|
false -> error(protocol_name_unapproved)
|
||||||
end,
|
end,
|
||||||
|
@ -212,16 +227,6 @@ parse_packet(#mqtt_packet_header{type = ?AUTH}, <<ReasonCode, Rest/binary>>,
|
||||||
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
|
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5),
|
||||||
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
|
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
|
||||||
|
|
||||||
wrap(Header, Variable, Payload, Rest) ->
|
|
||||||
{ok, #mqtt_packet{header = Header, variable = Variable, payload = Payload}, Rest}.
|
|
||||||
wrap(Header, Variable, Rest) ->
|
|
||||||
{ok, #mqtt_packet{header = Header, variable = Variable}, Rest}.
|
|
||||||
wrap(Header, Rest) ->
|
|
||||||
{ok, #mqtt_packet{header = Header}, Rest}.
|
|
||||||
|
|
||||||
protocol_name_approved(Ver, Name) ->
|
|
||||||
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
|
||||||
|
|
||||||
parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
||||||
proto_ver = Ver}, Bin) ->
|
proto_ver = Ver}, Bin) ->
|
||||||
{Props, Rest} = parse_properties(Bin, Ver),
|
{Props, Rest} = parse_properties(Bin, Ver),
|
||||||
|
@ -233,6 +238,9 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
||||||
parse_will_message(Packet, Bin) ->
|
parse_will_message(Packet, Bin) ->
|
||||||
{Packet, Bin}.
|
{Packet, Bin}.
|
||||||
|
|
||||||
|
protocol_approved(Ver, Name) ->
|
||||||
|
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
||||||
|
|
||||||
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
||||||
{PacketId, Rest}.
|
{PacketId, Rest}.
|
||||||
|
|
||||||
|
@ -333,8 +341,9 @@ parse_topic_filters(unsubscribe, Bin) ->
|
||||||
parse_reason_codes(Bin) ->
|
parse_reason_codes(Bin) ->
|
||||||
[Code || <<Code>> <= Bin].
|
[Code || <<Code>> <= Bin].
|
||||||
|
|
||||||
parse_utf8_pair(Bin) ->
|
parse_utf8_pair(<<Len1:16/big, Key:Len1/binary,
|
||||||
[{Name, Value} || <<Len:16/big, Name:Len/binary, Len2:16/big, Value:Len2/binary>> <= Bin].
|
Len2:16/big, Val:Len2/binary, Rest/binary>>) ->
|
||||||
|
{{Key, Val}, Rest}.
|
||||||
|
|
||||||
parse_utf8_string(Bin, false) ->
|
parse_utf8_string(Bin, false) ->
|
||||||
{undefined, Bin};
|
{undefined, Bin};
|
||||||
|
@ -347,10 +356,285 @@ parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
|
||||||
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
|
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
|
||||||
{Data, Rest}.
|
{Data, Rest}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Serialize MQTT Packet
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(serialize(mqtt_packet()) -> iodata()).
|
||||||
|
serialize(Packet) ->
|
||||||
|
serialize(Packet, ?DEFAULT_OPTIONS).
|
||||||
|
|
||||||
|
-spec(serialize(mqtt_packet(), options()) -> iodata()).
|
||||||
|
serialize(#mqtt_packet{header = Header,
|
||||||
|
variable = Variable,
|
||||||
|
payload = Payload}, Options) when is_map(Options) ->
|
||||||
|
serialize(Header, serialize_variable(Variable, merge_opts(Options)),
|
||||||
|
serialize_payload(Payload)).
|
||||||
|
|
||||||
|
serialize(#mqtt_packet_header{type = Type,
|
||||||
|
dup = Dup,
|
||||||
|
qos = QoS,
|
||||||
|
retain = Retain}, VariableBin, PayloadBin)
|
||||||
|
when ?CONNECT =< Type andalso Type =< ?AUTH ->
|
||||||
|
Len = iolist_size(VariableBin) + iolist_size(PayloadBin),
|
||||||
|
true = (Len =< ?MAX_PACKET_SIZE),
|
||||||
|
[<<Type:4, (flag(Dup)):1, (flag(QoS)):2, (flag(Retain)):1>>,
|
||||||
|
serialize_remaining_len(Len), VariableBin, PayloadBin].
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_connect{
|
||||||
|
proto_name = ProtoName,
|
||||||
|
proto_ver = ProtoVer,
|
||||||
|
is_bridge = IsBridge,
|
||||||
|
clean_start = CleanStart,
|
||||||
|
will_flag = WillFlag,
|
||||||
|
will_qos = WillQos,
|
||||||
|
will_retain = WillRetain,
|
||||||
|
keepalive = KeepAlive,
|
||||||
|
properties = Properties,
|
||||||
|
client_id = ClientId,
|
||||||
|
will_props = WillProps,
|
||||||
|
will_topic = WillTopic,
|
||||||
|
will_payload = WillPayload,
|
||||||
|
username = Username,
|
||||||
|
password = Password}, _Options) ->
|
||||||
|
[serialize_binary_data(ProtoName),
|
||||||
|
<<(case IsBridge of
|
||||||
|
true -> 16#80 + ProtoVer;
|
||||||
|
false -> ProtoVer
|
||||||
|
end):8,
|
||||||
|
(flag(Username)):1,
|
||||||
|
(flag(Password)):1,
|
||||||
|
(flag(WillRetain)):1,
|
||||||
|
WillQos:2,
|
||||||
|
(flag(WillFlag)):1,
|
||||||
|
(flag(CleanStart)):1,
|
||||||
|
0:1,
|
||||||
|
KeepAlive:16/big-unsigned-integer>>,
|
||||||
|
serialize_properties(Properties, ProtoVer),
|
||||||
|
serialize_utf8_string(ClientId),
|
||||||
|
case WillFlag of
|
||||||
|
true -> [serialize_properties(WillProps, ProtoVer),
|
||||||
|
serialize_utf8_string(WillTopic),
|
||||||
|
serialize_binary_data(WillPayload)];
|
||||||
|
false -> <<>>
|
||||||
|
end,
|
||||||
|
serialize_utf8_string(Username, true),
|
||||||
|
serialize_utf8_string(Password, true)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties},
|
||||||
|
#{version := Ver}) ->
|
||||||
|
[AckFlags, ReasonCode, serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_publish{topic_name = TopicName,
|
||||||
|
packet_id = PacketId,
|
||||||
|
properties = Properties},
|
||||||
|
#{version := Ver}) ->
|
||||||
|
[serialize_utf8_string(TopicName),
|
||||||
|
if
|
||||||
|
PacketId =:= undefined -> <<>>;
|
||||||
|
true -> <<PacketId:16/big-unsigned-integer>>
|
||||||
|
end,
|
||||||
|
serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_puback{packet_id = PacketId},
|
||||||
|
#{version := Ver})
|
||||||
|
when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
|
||||||
|
<<PacketId:16/big-unsigned-integer>>;
|
||||||
|
serialize_variable(#mqtt_packet_puback{packet_id = PacketId,
|
||||||
|
reason_code = ReasonCode,
|
||||||
|
properties = Properties},
|
||||||
|
#{version := ?MQTT_PROTO_V5}) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, ReasonCode,
|
||||||
|
serialize_properties(Properties, ?MQTT_PROTO_V5)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters},
|
||||||
|
#{version := Ver}) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_topic_filters(subscribe, TopicFilters, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_suback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes},
|
||||||
|
#{version := Ver}) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_reason_codes(ReasonCodes)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
topic_filters = TopicFilters},
|
||||||
|
#{version := Ver}) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_topic_filters(unsubscribe, TopicFilters, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
|
properties = Properties,
|
||||||
|
reason_codes = ReasonCodes},
|
||||||
|
#{version := Ver}) ->
|
||||||
|
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
||||||
|
serialize_reason_codes(ReasonCodes)];
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_disconnect{}, #{version := Ver})
|
||||||
|
when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
|
||||||
|
<<>>;
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode,
|
||||||
|
properties = Properties},
|
||||||
|
#{version := Ver = ?MQTT_PROTO_V5}) ->
|
||||||
|
[ReasonCode, serialize_properties(Properties, Ver)];
|
||||||
|
serialize_variable(#mqtt_packet_disconnect{}, _Ver) ->
|
||||||
|
<<>>;
|
||||||
|
|
||||||
|
serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode,
|
||||||
|
properties = Properties},
|
||||||
|
#{version := Ver = ?MQTT_PROTO_V5}) ->
|
||||||
|
[ReasonCode, serialize_properties(Properties, Ver)];
|
||||||
|
|
||||||
|
serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) ->
|
||||||
|
<<PacketId:16/big-unsigned-integer>>;
|
||||||
|
serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) ->
|
||||||
|
<<PacketId:16/big-unsigned-integer>>;
|
||||||
|
serialize_variable(undefined, _Ver) ->
|
||||||
|
<<>>.
|
||||||
|
|
||||||
|
serialize_payload(undefined) -> <<>>;
|
||||||
|
serialize_payload(Bin) -> Bin.
|
||||||
|
|
||||||
|
serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
||||||
|
<<>>;
|
||||||
|
serialize_properties(Props, ?MQTT_PROTO_V5) ->
|
||||||
|
serialize_properties(Props).
|
||||||
|
|
||||||
|
serialize_properties(undefined) ->
|
||||||
|
<<0>>;
|
||||||
|
serialize_properties(Props) when map_size(Props) == 0 ->
|
||||||
|
<<0>>;
|
||||||
|
serialize_properties(Props) when is_map(Props) ->
|
||||||
|
Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>,
|
||||||
|
[serialize_variable_byte_integer(byte_size(Bin)), Bin].
|
||||||
|
|
||||||
|
serialize_property(_, undefined) ->
|
||||||
|
<<>>;
|
||||||
|
serialize_property('Payload-Format-Indicator', Val) ->
|
||||||
|
<<16#01, Val>>;
|
||||||
|
serialize_property('Message-Expiry-Interval', Val) ->
|
||||||
|
<<16#02, Val:32/big>>;
|
||||||
|
serialize_property('Content-Type', Val) ->
|
||||||
|
<<16#03, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Response-Topic', Val) ->
|
||||||
|
<<16#08, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Correlation-Data', Val) ->
|
||||||
|
<<16#09, (byte_size(Val)):16, Val/binary>>;
|
||||||
|
serialize_property('Subscription-Identifier', Val) ->
|
||||||
|
<<16#0B, (serialize_variable_byte_integer(Val))/binary>>;
|
||||||
|
serialize_property('Session-Expiry-Interval', Val) ->
|
||||||
|
<<16#11, Val:32/big>>;
|
||||||
|
serialize_property('Assigned-Client-Identifier', Val) ->
|
||||||
|
<<16#12, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Server-Keep-Alive', Val) ->
|
||||||
|
<<16#13, Val:16/big>>;
|
||||||
|
serialize_property('Authentication-Method', Val) ->
|
||||||
|
<<16#15, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Authentication-Data', Val) ->
|
||||||
|
<<16#16, (iolist_size(Val)):16, Val/binary>>;
|
||||||
|
serialize_property('Request-Problem-Information', Val) ->
|
||||||
|
<<16#17, Val>>;
|
||||||
|
serialize_property('Will-Delay-Interval', Val) ->
|
||||||
|
<<16#18, Val:32/big>>;
|
||||||
|
serialize_property('Request-Response-Information', Val) ->
|
||||||
|
<<16#19, Val>>;
|
||||||
|
serialize_property('Response-Information', Val) ->
|
||||||
|
<<16#1A, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Server-Reference', Val) ->
|
||||||
|
<<16#1C, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Reason-String', Val) ->
|
||||||
|
<<16#1F, (serialize_utf8_string(Val))/binary>>;
|
||||||
|
serialize_property('Receive-Maximum', Val) ->
|
||||||
|
<<16#21, Val:16/big>>;
|
||||||
|
serialize_property('Topic-Alias-Maximum', Val) ->
|
||||||
|
<<16#22, Val:16/big>>;
|
||||||
|
serialize_property('Topic-Alias', Val) ->
|
||||||
|
<<16#23, Val:16/big>>;
|
||||||
|
serialize_property('Maximum-QoS', Val) ->
|
||||||
|
<<16#24, Val>>;
|
||||||
|
serialize_property('Retain-Available', Val) ->
|
||||||
|
<<16#25, Val>>;
|
||||||
|
serialize_property('User-Property', {Key, Val}) ->
|
||||||
|
<<16#26, (serialize_utf8_pair({Key, Val}))/binary>>;
|
||||||
|
serialize_property('User-Property', Props) when is_list(Props) ->
|
||||||
|
<< <<(serialize_property('User-Property', {Key, Val}))/binary>>
|
||||||
|
|| {Key, Val} <- Props >>;
|
||||||
|
serialize_property('Maximum-Packet-Size', Val) ->
|
||||||
|
<<16#27, Val:32/big>>;
|
||||||
|
serialize_property('Wildcard-Subscription-Available', Val) ->
|
||||||
|
<<16#28, Val>>;
|
||||||
|
serialize_property('Subscription-Identifier-Available', Val) ->
|
||||||
|
<<16#29, Val>>;
|
||||||
|
serialize_property('Shared-Subscription-Available', Val) ->
|
||||||
|
<<16#2A, Val>>.
|
||||||
|
|
||||||
|
serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
|
||||||
|
<< <<(serialize_utf8_string(Topic))/binary, (serialize_subopts(SubOpts)) >>
|
||||||
|
|| {Topic, SubOpts} <- TopicFilters >>;
|
||||||
|
|
||||||
|
serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
|
||||||
|
<< <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, QoS:2>>
|
||||||
|
|| {Topic, #mqtt_subopts{qos = QoS}} <- TopicFilters >>;
|
||||||
|
|
||||||
|
serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
|
||||||
|
<< <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>.
|
||||||
|
|
||||||
|
serialize_subopts(#mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = QoS}) ->
|
||||||
|
<<?RESERVED:2, Rh:2, (flag(Rap)):1, (flag(Nl)):1, QoS:2>>.
|
||||||
|
|
||||||
|
serialize_reason_codes(undefined) ->
|
||||||
|
<<>>;
|
||||||
|
serialize_reason_codes(ReasonCodes) when is_list(ReasonCodes) ->
|
||||||
|
<< <<Code>> || Code <- ReasonCodes >>.
|
||||||
|
|
||||||
|
serialize_utf8_pair({Name, Value}) ->
|
||||||
|
<< (serialize_utf8_string(Name))/binary, (serialize_utf8_string(Value))/binary >>.
|
||||||
|
|
||||||
|
serialize_binary_data(Bin) ->
|
||||||
|
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
||||||
|
|
||||||
|
serialize_utf8_string(undefined, false) ->
|
||||||
|
error(utf8_string_undefined);
|
||||||
|
serialize_utf8_string(undefined, true) ->
|
||||||
|
<<>>;
|
||||||
|
serialize_utf8_string(String, _AllowNull) ->
|
||||||
|
serialize_utf8_string(String).
|
||||||
|
|
||||||
|
serialize_utf8_string(String) ->
|
||||||
|
StringBin = unicode:characters_to_binary(String),
|
||||||
|
Len = byte_size(StringBin),
|
||||||
|
true = (Len =< 16#ffff),
|
||||||
|
<<Len:16/big, StringBin/binary>>.
|
||||||
|
|
||||||
|
serialize_remaining_len(I) ->
|
||||||
|
serialize_variable_byte_integer(I).
|
||||||
|
|
||||||
|
serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
|
||||||
|
<<0:1, N:7>>;
|
||||||
|
serialize_variable_byte_integer(N) ->
|
||||||
|
<<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
bool(0) -> false;
|
bool(0) -> false;
|
||||||
bool(1) -> true.
|
bool(1) -> true.
|
||||||
|
|
||||||
%% Fix Issue#575
|
flag(undefined) -> ?RESERVED;
|
||||||
|
flag(false) -> 0;
|
||||||
|
flag(true) -> 1;
|
||||||
|
flag(X) when is_integer(X) -> X;
|
||||||
|
flag(B) when is_binary(B) -> 1.
|
||||||
|
|
||||||
fixqos(?PUBREL, 0) -> 1;
|
fixqos(?PUBREL, 0) -> 1;
|
||||||
fixqos(?SUBSCRIBE, 0) -> 1;
|
fixqos(?SUBSCRIBE, 0) -> 1;
|
||||||
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
fixqos(?UNSUBSCRIBE, 0) -> 1;
|
|
@ -1,294 +0,0 @@
|
||||||
%%%===================================================================
|
|
||||||
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
|
|
||||||
%%%
|
|
||||||
%%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%%% you may not use this file except in compliance with the License.
|
|
||||||
%%% You may obtain a copy of the License at
|
|
||||||
%%%
|
|
||||||
%%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%%
|
|
||||||
%%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%%% See the License for the specific language governing permissions and
|
|
||||||
%%% limitations under the License.
|
|
||||||
%%%===================================================================
|
|
||||||
|
|
||||||
-module(emqx_serializer).
|
|
||||||
|
|
||||||
-include("emqx.hrl").
|
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
|
||||||
|
|
||||||
-type(option() :: {version, mqtt_version()}).
|
|
||||||
|
|
||||||
-export_type([option/0]).
|
|
||||||
|
|
||||||
-export([serialize/1, serialize/2]).
|
|
||||||
|
|
||||||
-spec(serialize(mqtt_packet()) -> iodata()).
|
|
||||||
serialize(Packet) -> serialize(Packet, []).
|
|
||||||
|
|
||||||
-spec(serialize(mqtt_packet(), [option()]) -> iodata()).
|
|
||||||
serialize(#mqtt_packet{header = Header,
|
|
||||||
variable = Variable,
|
|
||||||
payload = Payload}, Opts) when is_list(Opts) ->
|
|
||||||
Opts1 = parse_opt(Opts, #{version => ?MQTT_PROTO_V4}),
|
|
||||||
serialize(Header, serialize_variable(Variable, Opts1), serialize_payload(Payload)).
|
|
||||||
|
|
||||||
parse_opt([], Map) ->
|
|
||||||
Map;
|
|
||||||
parse_opt([{version, Ver}|Opts], Map) ->
|
|
||||||
parse_opt(Opts, Map#{version := Ver});
|
|
||||||
parse_opt([_|Opts], Map) ->
|
|
||||||
parse_opt(Opts, Map).
|
|
||||||
|
|
||||||
serialize(#mqtt_packet_header{type = Type,
|
|
||||||
dup = Dup,
|
|
||||||
qos = Qos,
|
|
||||||
retain = Retain}, VariableData, PayloadData)
|
|
||||||
when ?CONNECT =< Type andalso Type =< ?AUTH ->
|
|
||||||
Len = iolist_size(VariableData) + iolist_size(PayloadData),
|
|
||||||
true = (Len =< ?MAX_PACKET_SIZE),
|
|
||||||
[<<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1>>,
|
|
||||||
serialize_remaining_len(Len), VariableData, PayloadData].
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_connect{proto_name = ProtoName,
|
|
||||||
proto_ver = ProtoVer,
|
|
||||||
is_bridge = IsBridge,
|
|
||||||
clean_start = CleanStart,
|
|
||||||
will_flag = WillFlag,
|
|
||||||
will_qos = WillQos,
|
|
||||||
will_retain = WillRetain,
|
|
||||||
keepalive = KeepAlive,
|
|
||||||
properties = Properties,
|
|
||||||
client_id = ClientId,
|
|
||||||
will_props = WillProps,
|
|
||||||
will_topic = WillTopic,
|
|
||||||
will_payload = WillPayload,
|
|
||||||
username = Username,
|
|
||||||
password = Password}, _Opts) ->
|
|
||||||
[serialize_binary_data(ProtoName),
|
|
||||||
<<(case IsBridge of
|
|
||||||
true -> 16#80 + ProtoVer;
|
|
||||||
false -> ProtoVer
|
|
||||||
end):8,
|
|
||||||
(opt(Username)):1,
|
|
||||||
(opt(Password)):1,
|
|
||||||
(opt(WillRetain)):1,
|
|
||||||
WillQos:2,
|
|
||||||
(opt(WillFlag)):1,
|
|
||||||
(opt(CleanStart)):1,
|
|
||||||
0:1,
|
|
||||||
KeepAlive:16/big-unsigned-integer>>,
|
|
||||||
serialize_properties(Properties, ProtoVer),
|
|
||||||
serialize_utf8_string(ClientId),
|
|
||||||
case WillFlag of
|
|
||||||
true -> [serialize_properties(WillProps, ProtoVer),
|
|
||||||
serialize_utf8_string(WillTopic),
|
|
||||||
serialize_binary_data(WillPayload)];
|
|
||||||
false -> <<>>
|
|
||||||
end,
|
|
||||||
serialize_utf8_string(Username, true),
|
|
||||||
serialize_utf8_string(Password, true)];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_connack{ack_flags = AckFlags,
|
|
||||||
reason_code = ReasonCode,
|
|
||||||
properties = Properties}, #{version := Ver}) ->
|
|
||||||
[AckFlags, ReasonCode, serialize_properties(Properties, Ver)];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_publish{topic_name = TopicName,
|
|
||||||
packet_id = PacketId,
|
|
||||||
properties = Properties}, #{version := Ver}) ->
|
|
||||||
[serialize_utf8_string(TopicName),
|
|
||||||
if
|
|
||||||
PacketId =:= undefined -> <<>>;
|
|
||||||
true -> <<PacketId:16/big-unsigned-integer>>
|
|
||||||
end,
|
|
||||||
serialize_properties(Properties, Ver)];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_puback{packet_id = PacketId}, #{version := Ver})
|
|
||||||
when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
|
|
||||||
<<PacketId:16/big-unsigned-integer>>;
|
|
||||||
serialize_variable(#mqtt_packet_puback{packet_id = PacketId,
|
|
||||||
reason_code = ReasonCode,
|
|
||||||
properties = Properties},
|
|
||||||
#{version := ?MQTT_PROTO_V5}) ->
|
|
||||||
[<<PacketId:16/big-unsigned-integer>>, ReasonCode,
|
|
||||||
serialize_properties(Properties, ?MQTT_PROTO_V5)];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_subscribe{packet_id = PacketId,
|
|
||||||
properties = Properties,
|
|
||||||
topic_filters = TopicFilters},
|
|
||||||
#{version := Ver}) ->
|
|
||||||
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
|
||||||
serialize_topic_filters(subscribe, TopicFilters, Ver)];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_suback{packet_id = PacketId,
|
|
||||||
properties = Properties,
|
|
||||||
reason_codes = ReasonCodes},
|
|
||||||
#{version := Ver}) ->
|
|
||||||
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
|
||||||
<< <<Code>> || Code <- ReasonCodes >>];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_unsubscribe{packet_id = PacketId,
|
|
||||||
properties = Properties,
|
|
||||||
topic_filters = TopicFilters},
|
|
||||||
#{version := Ver}) ->
|
|
||||||
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
|
||||||
serialize_topic_filters(unsubscribe, TopicFilters, Ver)];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_unsuback{packet_id = PacketId,
|
|
||||||
properties = Properties,
|
|
||||||
reason_codes = ReasonCodes},
|
|
||||||
#{version := Ver}) ->
|
|
||||||
[<<PacketId:16/big-unsigned-integer>>, serialize_properties(Properties, Ver),
|
|
||||||
<< <<Code>> || Code <- ReasonCodes >>];
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_disconnect{}, #{version := Ver})
|
|
||||||
when Ver == ?MQTT_PROTO_V3; Ver == ?MQTT_PROTO_V4 ->
|
|
||||||
<<>>;
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_disconnect{reason_code = ReasonCode,
|
|
||||||
properties = Properties},
|
|
||||||
#{version := Ver = ?MQTT_PROTO_V5}) ->
|
|
||||||
[ReasonCode, serialize_properties(Properties, Ver)];
|
|
||||||
serialize_variable(#mqtt_packet_disconnect{}, _Ver) ->
|
|
||||||
<<>>;
|
|
||||||
|
|
||||||
serialize_variable(#mqtt_packet_auth{reason_code = ReasonCode,
|
|
||||||
properties = Properties},
|
|
||||||
#{version := Ver = ?MQTT_PROTO_V5}) ->
|
|
||||||
[ReasonCode, serialize_properties(Properties, Ver)];
|
|
||||||
|
|
||||||
serialize_variable(PacketId, ?MQTT_PROTO_V3) when is_integer(PacketId) ->
|
|
||||||
<<PacketId:16/big-unsigned-integer>>;
|
|
||||||
serialize_variable(PacketId, ?MQTT_PROTO_V4) when is_integer(PacketId) ->
|
|
||||||
<<PacketId:16/big-unsigned-integer>>;
|
|
||||||
serialize_variable(undefined, _Ver) ->
|
|
||||||
<<>>.
|
|
||||||
|
|
||||||
serialize_payload(undefined) ->
|
|
||||||
<<>>;
|
|
||||||
serialize_payload(Bin) when is_binary(Bin); is_list(Bin) ->
|
|
||||||
Bin.
|
|
||||||
|
|
||||||
serialize_properties(_Props, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
|
||||||
<<>>;
|
|
||||||
serialize_properties(Props, ?MQTT_PROTO_V5) ->
|
|
||||||
serialize_properties(Props).
|
|
||||||
|
|
||||||
serialize_properties(undefined) ->
|
|
||||||
<<0>>;
|
|
||||||
serialize_properties(Props) when map_size(Props) == 0 ->
|
|
||||||
<<0>>;
|
|
||||||
serialize_properties(Props) when is_map(Props) ->
|
|
||||||
Bin = << <<(serialize_property(Prop, Val))/binary>> || {Prop, Val} <- maps:to_list(Props) >>,
|
|
||||||
[serialize_variable_byte_integer(byte_size(Bin)), Bin].
|
|
||||||
|
|
||||||
%% Ignore undefined
|
|
||||||
serialize_property(_, undefined) ->
|
|
||||||
<<>>;
|
|
||||||
serialize_property('Payload-Format-Indicator', Val) ->
|
|
||||||
<<16#01, Val>>;
|
|
||||||
serialize_property('Message-Expiry-Interval', Val) ->
|
|
||||||
<<16#02, Val:32/big>>;
|
|
||||||
serialize_property('Content-Type', Val) ->
|
|
||||||
<<16#03, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Response-Topic', Val) ->
|
|
||||||
<<16#08, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Correlation-Data', Val) ->
|
|
||||||
<<16#09, (byte_size(Val)):16, Val/binary>>;
|
|
||||||
serialize_property('Subscription-Identifier', Val) ->
|
|
||||||
<<16#0B, (serialize_variable_byte_integer(Val))/binary>>;
|
|
||||||
serialize_property('Session-Expiry-Interval', Val) ->
|
|
||||||
<<16#11, Val:32/big>>;
|
|
||||||
serialize_property('Assigned-Client-Identifier', Val) ->
|
|
||||||
<<16#12, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Server-Keep-Alive', Val) ->
|
|
||||||
<<16#13, Val:16/big>>;
|
|
||||||
serialize_property('Authentication-Method', Val) ->
|
|
||||||
<<16#15, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Authentication-Data', Val) ->
|
|
||||||
<<16#16, (iolist_size(Val)):16, Val/binary>>;
|
|
||||||
serialize_property('Request-Problem-Information', Val) ->
|
|
||||||
<<16#17, Val>>;
|
|
||||||
serialize_property('Will-Delay-Interval', Val) ->
|
|
||||||
<<16#18, Val:32/big>>;
|
|
||||||
serialize_property('Request-Response-Information', Val) ->
|
|
||||||
<<16#19, Val>>;
|
|
||||||
serialize_property('Response-Information', Val) ->
|
|
||||||
<<16#1A, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Server-Reference', Val) ->
|
|
||||||
<<16#1C, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Reason-String', Val) ->
|
|
||||||
<<16#1F, (serialize_utf8_string(Val))/binary>>;
|
|
||||||
serialize_property('Receive-Maximum', Val) ->
|
|
||||||
<<16#21, Val:16/big>>;
|
|
||||||
serialize_property('Topic-Alias-Maximum', Val) ->
|
|
||||||
<<16#22, Val:16/big>>;
|
|
||||||
serialize_property('Topic-Alias', Val) ->
|
|
||||||
<<16#23, Val:16/big>>;
|
|
||||||
serialize_property('Maximum-QoS', Val) ->
|
|
||||||
<<16#24, Val>>;
|
|
||||||
serialize_property('Retain-Available', Val) ->
|
|
||||||
<<16#25, Val>>;
|
|
||||||
serialize_property('User-Property', {Key, Val}) ->
|
|
||||||
<<16#26, (serialize_utf8_pair({Key, Val}))/binary>>;
|
|
||||||
serialize_property('User-Property', Props) when is_list(Props) ->
|
|
||||||
<< <<(serialize_property('User-Property', {Key, Val}))/binary>>
|
|
||||||
|| {Key, Val} <- Props >>;
|
|
||||||
serialize_property('Maximum-Packet-Size', Val) ->
|
|
||||||
<<16#27, Val:32/big>>;
|
|
||||||
serialize_property('Wildcard-Subscription-Available', Val) ->
|
|
||||||
<<16#28, Val>>;
|
|
||||||
serialize_property('Subscription-Identifier-Available', Val) ->
|
|
||||||
<<16#29, Val>>;
|
|
||||||
serialize_property('Shared-Subscription-Available', Val) ->
|
|
||||||
<<16#2A, Val>>.
|
|
||||||
|
|
||||||
serialize_topic_filters(subscribe, TopicFilters, ?MQTT_PROTO_V5) ->
|
|
||||||
<< <<(serialize_utf8_string(Topic))/binary, ?RESERVED:2, Rh:2, (opt(Rap)):1, (opt(Nl)):1, Qos:2>>
|
|
||||||
|| {Topic, #mqtt_subopts{rh = Rh, rap = Rap, nl = Nl, qos = Qos}} <- TopicFilters >>;
|
|
||||||
|
|
||||||
serialize_topic_filters(subscribe, TopicFilters, _Ver) ->
|
|
||||||
<< <<(serialize_utf8_string(Topic))/binary, ?RESERVED:6, Qos:2>>
|
|
||||||
|| {Topic, #mqtt_subopts{qos = Qos}} <- TopicFilters >>;
|
|
||||||
|
|
||||||
serialize_topic_filters(unsubscribe, TopicFilters, _Ver) ->
|
|
||||||
<< <<(serialize_utf8_string(Topic))/binary>> || Topic <- TopicFilters >>.
|
|
||||||
|
|
||||||
serialize_utf8_pair({Name, Value}) ->
|
|
||||||
<< <<(serialize_utf8_string(S))/binary,
|
|
||||||
(serialize_utf8_string(S))/binary>> || S <- [Name, Value] >>.
|
|
||||||
|
|
||||||
serialize_binary_data(Bin) ->
|
|
||||||
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
|
|
||||||
|
|
||||||
serialize_utf8_string(undefined, false) ->
|
|
||||||
error(utf8_string_undefined);
|
|
||||||
serialize_utf8_string(undefined, true) ->
|
|
||||||
<<>>;
|
|
||||||
serialize_utf8_string(String, _AllowNull) ->
|
|
||||||
serialize_utf8_string(String).
|
|
||||||
|
|
||||||
serialize_utf8_string(String) ->
|
|
||||||
StringBin = unicode:characters_to_binary(String),
|
|
||||||
Len = byte_size(StringBin),
|
|
||||||
true = (Len =< 16#ffff),
|
|
||||||
<<Len:16/big, StringBin/binary>>.
|
|
||||||
|
|
||||||
serialize_remaining_len(I) ->
|
|
||||||
serialize_variable_byte_integer(I).
|
|
||||||
|
|
||||||
serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
|
|
||||||
<<0:1, N:7>>;
|
|
||||||
serialize_variable_byte_integer(N) ->
|
|
||||||
<<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
|
|
||||||
|
|
||||||
opt(undefined) -> ?RESERVED;
|
|
||||||
opt(false) -> 0;
|
|
||||||
opt(true) -> 1;
|
|
||||||
opt(X) when is_integer(X) -> X;
|
|
||||||
opt(B) when is_binary(B) -> 1.
|
|
||||||
|
|
|
@ -0,0 +1,430 @@
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved.
|
||||||
|
%%%
|
||||||
|
%%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%%% you may not use this file except in compliance with the License.
|
||||||
|
%%% You may obtain a copy of the License at
|
||||||
|
%%%
|
||||||
|
%%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%%
|
||||||
|
%%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%%% See the License for the specific language governing permissions and
|
||||||
|
%%% limitations under the License.
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-module(emqx_frame_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-import(emqx_frame, [serialize/1, serialize/2]).
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[{group, connect},
|
||||||
|
{group, connack},
|
||||||
|
{group, publish},
|
||||||
|
{group, puback},
|
||||||
|
{group, subscribe},
|
||||||
|
{group, suback},
|
||||||
|
{group, unsubscribe},
|
||||||
|
{group, unsuback},
|
||||||
|
{group, ping},
|
||||||
|
{group, disconnect},
|
||||||
|
{group, auth}].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
[{connect, [parallel],
|
||||||
|
[serialize_parse_connect,
|
||||||
|
serialize_parse_v3_connect,
|
||||||
|
serialize_parse_v4_connect,
|
||||||
|
serialize_parse_v5_connect,
|
||||||
|
serialize_parse_connect_without_clientid,
|
||||||
|
serialize_parse_connect_with_will,
|
||||||
|
serialize_parse_bridge_connect]},
|
||||||
|
{connack, [parallel],
|
||||||
|
[serialize_parse_connack,
|
||||||
|
serialize_parse_connack_v5]},
|
||||||
|
{publish, [parallel],
|
||||||
|
[serialize_parse_qos0_publish,
|
||||||
|
serialize_parse_qos1_publish,
|
||||||
|
serialize_parse_qos2_publish,
|
||||||
|
serialize_parse_publish_v5]},
|
||||||
|
{puback, [parallel],
|
||||||
|
[serialize_parse_puback,
|
||||||
|
serialize_parse_puback_v5,
|
||||||
|
serialize_parse_pubrec,
|
||||||
|
serialize_parse_pubrec_v5,
|
||||||
|
serialize_parse_pubrel,
|
||||||
|
serialize_parse_pubrel_v5,
|
||||||
|
serialize_parse_pubcomp,
|
||||||
|
serialize_parse_pubcomp_v5]},
|
||||||
|
{subscribe, [parallel],
|
||||||
|
[serialize_parse_subscribe,
|
||||||
|
serialize_parse_subscribe_v5]},
|
||||||
|
{suback, [parallel],
|
||||||
|
[serialize_parse_suback,
|
||||||
|
serialize_parse_suback_v5]},
|
||||||
|
{unsubscribe, [parallel],
|
||||||
|
[serialize_parse_unsubscribe,
|
||||||
|
serialize_parse_unsubscribe_v5]},
|
||||||
|
{unsuback, [parallel],
|
||||||
|
[serialize_parse_unsuback,
|
||||||
|
serialize_parse_unsuback_v5]},
|
||||||
|
{ping, [parallel],
|
||||||
|
[serialize_parse_pingreq,
|
||||||
|
serialize_parse_pingresp]},
|
||||||
|
{disconnect, [parallel],
|
||||||
|
[serialize_parse_disconnect,
|
||||||
|
serialize_parse_disconnect_v5]},
|
||||||
|
{auth, [parallel],
|
||||||
|
[serialize_parse_auth_v5]}].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_group(_Group, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_group(_Group, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
serialize_parse_connect(_) ->
|
||||||
|
Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{}),
|
||||||
|
?assertEqual({ok, Packet1, <<>>}, parse_serialize(Packet1)),
|
||||||
|
Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
client_id = <<"clientId">>,
|
||||||
|
will_qos = ?QOS1,
|
||||||
|
will_flag = true,
|
||||||
|
will_retain = true,
|
||||||
|
will_topic = <<"will">>,
|
||||||
|
will_payload = <<"bye">>,
|
||||||
|
clean_start = true}),
|
||||||
|
?assertEqual({ok, Packet2, <<>>}, parse_serialize(Packet2)).
|
||||||
|
|
||||||
|
serialize_parse_v3_connect(_) ->
|
||||||
|
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
||||||
|
113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,
|
||||||
|
111,99,97>>,
|
||||||
|
Packet = ?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
|
||||||
|
proto_name = <<"MQIsdp">>,
|
||||||
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
|
clean_start = true,
|
||||||
|
keepalive = 60}),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_v4_connect(_) ->
|
||||||
|
Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,
|
||||||
|
98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||||
|
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4,
|
||||||
|
proto_name = <<"MQTT">>,
|
||||||
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
|
clean_start = true,
|
||||||
|
keepalive = 60}),
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_v5_connect(_) ->
|
||||||
|
Props = #{'Session-Expiry-Interval' => 60,
|
||||||
|
'Receive-Maximum' => 100,
|
||||||
|
'Maximum-QoS' => ?QOS_2,
|
||||||
|
'Retain-Available' => 1,
|
||||||
|
'Maximum-Packet-Size' => 1024,
|
||||||
|
'Topic-Alias-Maximum' => 10,
|
||||||
|
'Request-Response-Information' => 1,
|
||||||
|
'Request-Problem-Information' => 1,
|
||||||
|
'Authentication-Method' => <<"oauth2">>,
|
||||||
|
'Authentication-Data' => <<"33kx93k">>},
|
||||||
|
|
||||||
|
WillProps = #{'Will-Delay-Interval' => 60,
|
||||||
|
'Payload-Format-Indicator' => 1,
|
||||||
|
'Message-Expiry-Interval' => 60,
|
||||||
|
'Content-Type' => <<"text/json">>,
|
||||||
|
'Response-Topic' => <<"topic">>,
|
||||||
|
'Correlation-Data' => <<"correlateid">>,
|
||||||
|
'User-Property' => [{<<"k">>, <<"v">>}]},
|
||||||
|
Packet = ?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{proto_name = <<"MQTT">>,
|
||||||
|
proto_ver = ?MQTT_PROTO_V5,
|
||||||
|
is_bridge = false,
|
||||||
|
clean_start = true,
|
||||||
|
client_id = <<>>,
|
||||||
|
will_flag = true,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_retain = false,
|
||||||
|
keepalive = 60,
|
||||||
|
properties = Props,
|
||||||
|
will_props = WillProps,
|
||||||
|
will_topic = <<"topic">>,
|
||||||
|
will_payload = <<>>,
|
||||||
|
username = <<"device:1">>,
|
||||||
|
password = <<"passwd">>}),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_connect_without_clientid(_) ->
|
||||||
|
Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||||
|
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4,
|
||||||
|
proto_name = <<"MQTT">>,
|
||||||
|
client_id = <<>>,
|
||||||
|
clean_start = true,
|
||||||
|
keepalive = 60}),
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_connect_with_will(_) ->
|
||||||
|
Bin = <<16,67,0,6,77,81,73,115,100,112,3,206,0,60,0,23,109,111,115,113,112,
|
||||||
|
117,98,47,49,48,52,53,50,45,105,77,97,99,46,108,111,99,97,0,5,47,119,
|
||||||
|
105,108,108,0,7,119,105,108,108,109,115,103,0,4,116,101,115,116,0,6,
|
||||||
|
112,117,98,108,105,99>>,
|
||||||
|
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
|
||||||
|
variable = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3,
|
||||||
|
proto_name = <<"MQIsdp">>,
|
||||||
|
client_id = <<"mosqpub/10452-iMac.loca">>,
|
||||||
|
clean_start = true,
|
||||||
|
keepalive = 60,
|
||||||
|
will_retain = false,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_flag = true,
|
||||||
|
will_topic = <<"/will">>,
|
||||||
|
will_payload = <<"willmsg">>,
|
||||||
|
username = <<"test">>,
|
||||||
|
password = <<"public">>}},
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_bridge_connect(_) ->
|
||||||
|
Bin = <<16,86,0,6,77,81,73,115,100,112,131,44,0,60,0,19,67,95,48,48,58,48,67,
|
||||||
|
58,50,57,58,50,66,58,55,55,58,53,50,0,48,36,83,89,83,47,98,114,111,107,
|
||||||
|
101,114,47,99,111,110,110,101,99,116,105,111,110,47,67,95,48,48,58,48,
|
||||||
|
67,58,50,57,58,50,66,58,55,55,58,53,50,47,115,116,97,116,101,0,1,48>>,
|
||||||
|
Topic = <<"$SYS/broker/connection/C_00:0C:29:2B:77:52/state">>,
|
||||||
|
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
|
||||||
|
variable = #mqtt_packet_connect{client_id = <<"C_00:0C:29:2B:77:52">>,
|
||||||
|
proto_ver = 16#03,
|
||||||
|
proto_name = <<"MQIsdp">>,
|
||||||
|
is_bridge = true,
|
||||||
|
will_retain = true,
|
||||||
|
will_qos = ?QOS_1,
|
||||||
|
will_flag = true,
|
||||||
|
clean_start = false,
|
||||||
|
keepalive = 60,
|
||||||
|
will_topic = Topic,
|
||||||
|
will_payload = <<"0">>}},
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_connack(_) ->
|
||||||
|
Packet = ?CONNACK_PACKET(?RC_SUCCESS),
|
||||||
|
?assertEqual(<<32,2,0,0>>, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_connack_v5(_) ->
|
||||||
|
Props = #{'Session-Expiry-Interval' => 60,
|
||||||
|
'Receive-Maximum' => 100,
|
||||||
|
'Maximum-QoS' => ?QOS_2,
|
||||||
|
'Retain-Available' => 1,
|
||||||
|
'Maximum-Packet-Size' => 1024,
|
||||||
|
'Assigned-Client-Identifier' => <<"id">>,
|
||||||
|
'Topic-Alias-Maximum' => 10,
|
||||||
|
'Reason-String' => <<>>,
|
||||||
|
'Wildcard-Subscription-Available' => 1,
|
||||||
|
'Subscription-Identifier-Available' => 1,
|
||||||
|
'Shared-Subscription-Available' => 1,
|
||||||
|
'Server-Keep-Alive' => 60,
|
||||||
|
'Response-Information' => <<"response">>,
|
||||||
|
'Server-Reference' => <<"192.168.1.10">>,
|
||||||
|
'Authentication-Method' => <<"oauth2">>,
|
||||||
|
'Authentication-Data' => <<"33kx93k">>},
|
||||||
|
Packet = ?CONNACK_PACKET(?RC_SUCCESS, 0, Props),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_qos0_publish(_) ->
|
||||||
|
Bin = <<48,14,0,7,120,120,120,47,121,121,121,104,101,108,108,111>>,
|
||||||
|
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
dup = false,
|
||||||
|
qos = ?QOS_0,
|
||||||
|
retain = false},
|
||||||
|
variable = #mqtt_packet_publish{topic_name = <<"xxx/yyy">>,
|
||||||
|
packet_id = undefined},
|
||||||
|
payload = <<"hello">>},
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_qos1_publish(_) ->
|
||||||
|
Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>,
|
||||||
|
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
dup = false,
|
||||||
|
qos = ?QOS_1,
|
||||||
|
retain = false},
|
||||||
|
variable = #mqtt_packet_publish{topic_name = <<"a/b/c">>,
|
||||||
|
packet_id = 1},
|
||||||
|
payload = <<"haha">>},
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_qos2_publish(_) ->
|
||||||
|
Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, payload()),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_publish_v5(_) ->
|
||||||
|
Props = #{'Payload-Format-Indicator' => 1,
|
||||||
|
'Message-Expiry-Interval' => 60,
|
||||||
|
'Topic-Alias' => 16#AB,
|
||||||
|
'Response-Topic' => <<"reply">>,
|
||||||
|
'Correlation-Data' => <<"correlation-id">>,
|
||||||
|
'Subscription-Identifier' => 1,
|
||||||
|
'Content-Type' => <<"text/json">>},
|
||||||
|
Packet = ?PUBLISH_PACKET(#mqtt_packet_header{type = ?PUBLISH},
|
||||||
|
<<"$share/group/topic">>, 1, Props,
|
||||||
|
<<"payload">>),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_puback(_) ->
|
||||||
|
Packet = ?PUBACK_PACKET(1),
|
||||||
|
?assertEqual(<<64,2,0,1>>, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_puback_v5(_) ->
|
||||||
|
Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_pubrec(_) ->
|
||||||
|
Packet = ?PUBREC_PACKET(1),
|
||||||
|
?assertEqual(<<5:4,0:4,2,0,1>>, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_pubrec_v5(_) ->
|
||||||
|
Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_pubrel(_) ->
|
||||||
|
Packet = ?PUBREL_PACKET(1),
|
||||||
|
?assertEqual(<<6:4,2:4,2,0,1>>, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_pubrel_v5(_) ->
|
||||||
|
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_pubcomp(_) ->
|
||||||
|
Packet = ?PUBCOMP_PACKET(1),
|
||||||
|
?assertEqual(<<7:4,0:4,2,0,1>>, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_pubcomp_v5(_) ->
|
||||||
|
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_subscribe(_) ->
|
||||||
|
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
|
||||||
|
Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>,
|
||||||
|
TopicFilters = [{<<"TopicA">>, #mqtt_subopts{qos = 2}}],
|
||||||
|
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
|
||||||
|
?assertEqual(Bin, serialize(Packet)),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_subscribe_v5(_) ->
|
||||||
|
TopicFilters = [{<<"TopicQos0">>, #mqtt_subopts{rh = 1, qos = ?QOS_0}},
|
||||||
|
{<<"TopicQos1">>, #mqtt_subopts{rh = 1, qos =?QOS_1}}],
|
||||||
|
Packet = ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 16#FFFFFFF},
|
||||||
|
TopicFilters),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_suback(_) ->
|
||||||
|
Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_suback_v5(_) ->
|
||||||
|
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
|
||||||
|
'User-Property' => [{<<"key">>, <<"value">>}]},
|
||||||
|
[?QOS_0, ?QOS_1, 128]),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
|
||||||
|
serialize_parse_unsubscribe(_) ->
|
||||||
|
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
||||||
|
Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]),
|
||||||
|
Bin = <<162,10,0,2,0,6,84,111,112,105,99,65>>,
|
||||||
|
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
|
||||||
|
|
||||||
|
serialize_parse_unsubscribe_v5(_) ->
|
||||||
|
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
|
||||||
|
Packet = ?UNSUBSCRIBE_PACKET(10, Props, [<<"Topic1">>, <<"Topic2">>]),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_unsuback(_) ->
|
||||||
|
Packet = ?UNSUBACK_PACKET(10),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_unsuback_v5(_) ->
|
||||||
|
Packet = ?UNSUBACK_PACKET(10, #{'Reason-String' => <<"Not authorized">>,
|
||||||
|
'User-Property' => [{<<"key">>, <<"val">>}]},
|
||||||
|
[16#87, 16#87, 16#87]),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_pingreq(_) ->
|
||||||
|
PingReq = ?PACKET(?PINGREQ),
|
||||||
|
?assertEqual({ok, PingReq, <<>>}, parse_serialize(PingReq)).
|
||||||
|
|
||||||
|
serialize_parse_pingresp(_) ->
|
||||||
|
PingResp = ?PACKET(?PINGRESP),
|
||||||
|
?assertEqual({ok, PingResp, <<>>}, parse_serialize(PingResp)).
|
||||||
|
|
||||||
|
parse_disconnect(_) ->
|
||||||
|
?assertEqual({ok, ?DISCONNECT_PACKET(?RC_SUCCESS), <<>>}, parse(<<224, 0>>)).
|
||||||
|
|
||||||
|
serialize_parse_disconnect(_) ->
|
||||||
|
Packet = ?PACKET(?DISCONNECT),
|
||||||
|
?assertEqual({ok, Packet, <<>>}, parse_serialize(Packet)).
|
||||||
|
|
||||||
|
serialize_parse_disconnect_v5(_) ->
|
||||||
|
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS,
|
||||||
|
#{'Session-Expiry-Interval' => 60,
|
||||||
|
'Reason-String' => <<"server_moved">>,
|
||||||
|
'Server-Reference' => <<"192.168.1.10">>}),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
serialize_parse_auth_v5(_) ->
|
||||||
|
Packet = ?AUTH_PACKET(?RC_SUCCESS,
|
||||||
|
#{'Authentication-Method' => <<"oauth2">>,
|
||||||
|
'Authentication-Data' => <<"3zekkd">>,
|
||||||
|
'Reason-String' => <<"success">>,
|
||||||
|
'User-Property' => [{<<"key">>, <<"val">>}]}),
|
||||||
|
?assertEqual({ok, Packet, <<>>},
|
||||||
|
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
|
parse_serialize(Packet) ->
|
||||||
|
parse(iolist_to_binary(serialize(Packet))).
|
||||||
|
|
||||||
|
parse_serialize(Packet, Opts) when is_map(Opts) ->
|
||||||
|
parse(iolist_to_binary(serialize(Packet, Opts)), Opts).
|
||||||
|
|
||||||
|
parse(Bin) ->
|
||||||
|
parse(Bin, #{}).
|
||||||
|
|
||||||
|
parse(Bin, Opts) when is_map(Opts) ->
|
||||||
|
emqx_frame:parse(Bin, emqx_frame:initial_state(Opts)).
|
||||||
|
|
||||||
|
payload() ->
|
||||||
|
iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]).
|
||||||
|
|
Loading…
Reference in New Issue