Introduce the 'strict_mode' option and validate MQTT header (#2898)
Introduce the 'strict_mode' option and validate MQTT header
This commit is contained in:
parent
35822ff97a
commit
681ae511a8
|
@ -408,9 +408,10 @@ process_incoming(Data, State) ->
|
||||||
process_incoming(<<>>, Packets, State) ->
|
process_incoming(<<>>, Packets, State) ->
|
||||||
{keep_state, State, next_incoming_events(Packets)};
|
{keep_state, State, next_incoming_events(Packets)};
|
||||||
|
|
||||||
process_incoming(Data, Packets, State = #connection{parse_state = ParseState, chan_state = ChanState}) ->
|
process_incoming(Data, Packets, State = #connection{parse_state = ParseState,
|
||||||
|
chan_state = ChanState}) ->
|
||||||
try emqx_frame:parse(Data, ParseState) of
|
try emqx_frame:parse(Data, ParseState) of
|
||||||
{ok, NParseState} ->
|
{more, NParseState} ->
|
||||||
NState = State#connection{parse_state = NParseState},
|
NState = State#connection{parse_state = NParseState},
|
||||||
{keep_state, NState, next_incoming_events(Packets)};
|
{keep_state, NState, next_incoming_events(Packets)};
|
||||||
{ok, Packet, Rest, NParseState} ->
|
{ok, Packet, Rest, NParseState} ->
|
||||||
|
|
|
@ -34,21 +34,23 @@
|
||||||
, parse_result/0
|
, parse_result/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-type(options() :: #{max_size => 1..?MAX_PACKET_SIZE,
|
-type(options() :: #{strict_mode => boolean(),
|
||||||
|
max_size => 1..?MAX_PACKET_SIZE,
|
||||||
version => emqx_types:version()
|
version => emqx_types:version()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-opaque(parse_state() :: {none, options()} | {more, cont_fun()}).
|
-opaque(parse_state() :: {none, options()} | cont_fun()).
|
||||||
|
|
||||||
-opaque(parse_result() :: {ok, parse_state()}
|
-opaque(parse_result() :: {more, cont_fun()}
|
||||||
| {ok, emqx_types:packet(), binary(), parse_state()}).
|
| {ok, emqx_types:packet(), binary(), parse_state()}).
|
||||||
|
|
||||||
-type(cont_fun() :: fun((binary()) -> parse_result())).
|
-type(cont_fun() :: fun((binary()) -> parse_result())).
|
||||||
|
|
||||||
-define(none(Opts), {none, Opts}).
|
-define(none(Opts), {none, Opts}).
|
||||||
-define(more(Cont), {more, Cont}).
|
|
||||||
-define(DEFAULT_OPTIONS,
|
-define(DEFAULT_OPTIONS,
|
||||||
#{max_size => ?MAX_PACKET_SIZE,
|
#{strict_mode => false,
|
||||||
|
max_size => ?MAX_PACKET_SIZE,
|
||||||
version => ?MQTT_PROTO_V4
|
version => ?MQTT_PROTO_V4
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -78,17 +80,26 @@ parse(Bin) ->
|
||||||
|
|
||||||
-spec(parse(binary(), parse_state()) -> parse_result()).
|
-spec(parse(binary(), parse_state()) -> parse_result()).
|
||||||
parse(<<>>, {none, Options}) ->
|
parse(<<>>, {none, Options}) ->
|
||||||
{ok, ?more(fun(Bin) -> parse(Bin, {none, Options}) end)};
|
{more, fun(Bin) -> parse(Bin, {none, Options}) end};
|
||||||
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Options}) ->
|
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
|
||||||
parse_remaining_len(Rest, #mqtt_packet_header{type = Type,
|
{none, Options = #{strict_mode := StrictMode}}) ->
|
||||||
|
%% Validate header if strict mode.
|
||||||
|
StrictMode andalso validate_header(Type, Dup, QoS, Retain),
|
||||||
|
Header = #mqtt_packet_header{type = Type,
|
||||||
dup = bool(Dup),
|
dup = bool(Dup),
|
||||||
qos = fixqos(Type, QoS),
|
qos = QoS,
|
||||||
retain = bool(Retain)}, Options);
|
retain = bool(Retain)
|
||||||
parse(Bin, {more, Cont}) when is_binary(Bin), is_function(Cont) ->
|
},
|
||||||
|
Header1 = case fixqos(Type, QoS) of
|
||||||
|
QoS -> Header;
|
||||||
|
FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS}
|
||||||
|
end,
|
||||||
|
parse_remaining_len(Rest, Header1, Options);
|
||||||
|
parse(Bin, Cont) when is_binary(Bin), is_function(Cont) ->
|
||||||
Cont(Bin).
|
Cont(Bin).
|
||||||
|
|
||||||
parse_remaining_len(<<>>, Header, Options) ->
|
parse_remaining_len(<<>>, Header, Options) ->
|
||||||
{ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Options) end)};
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end};
|
||||||
parse_remaining_len(Rest, Header, Options) ->
|
parse_remaining_len(Rest, Header, Options) ->
|
||||||
parse_remaining_len(Rest, Header, 1, 0, Options).
|
parse_remaining_len(Rest, Header, 1, 0, Options).
|
||||||
|
|
||||||
|
@ -96,7 +107,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
|
||||||
when Length > MaxSize ->
|
when Length > MaxSize ->
|
||||||
error(mqtt_frame_too_large);
|
error(mqtt_frame_too_large);
|
||||||
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
|
||||||
{ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end)};
|
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
|
||||||
%% Match DISCONNECT without payload
|
%% Match DISCONNECT without payload
|
||||||
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
|
||||||
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
|
||||||
|
@ -132,9 +143,9 @@ parse_frame(Bin, Header, Length, Options) ->
|
||||||
{ok, packet(Header, Variable), Rest, ?none(Options)}
|
{ok, packet(Header, Variable), Rest, ?none(Options)}
|
||||||
end;
|
end;
|
||||||
TooShortBin ->
|
TooShortBin ->
|
||||||
{ok, ?more(fun(BinMore) ->
|
{more, fun(BinMore) ->
|
||||||
parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, Options)
|
parse_frame(<<TooShortBin/binary, BinMore/binary>>, Header, Length, Options)
|
||||||
end)}
|
end}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
packet(Header) ->
|
packet(Header) ->
|
||||||
|
@ -189,6 +200,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
||||||
?QOS_0 -> {undefined, Rest};
|
?QOS_0 -> {undefined, Rest};
|
||||||
_ -> parse_packet_id(Rest)
|
_ -> parse_packet_id(Rest)
|
||||||
end,
|
end,
|
||||||
|
(PacketId =/= undefined) andalso validate_packet_id(PacketId),
|
||||||
{Properties, Payload} = parse_properties(Rest1, Ver),
|
{Properties, Payload} = parse_properties(Rest1, Ver),
|
||||||
{#mqtt_packet_publish{topic_name = TopicName,
|
{#mqtt_packet_publish{topic_name = TopicName,
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
|
@ -196,10 +208,12 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>, _Options)
|
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big>>, _Options)
|
||||||
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
#mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
|
#mqtt_packet_puback{packet_id = PacketId, reason_code = 0};
|
||||||
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode, Rest/binary>>,
|
||||||
#{version := Ver = ?MQTT_PROTO_V5})
|
#{version := Ver = ?MQTT_PROTO_V5})
|
||||||
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
{Properties, <<>>} = parse_properties(Rest, Ver),
|
{Properties, <<>>} = parse_properties(Rest, Ver),
|
||||||
#mqtt_packet_puback{packet_id = PacketId,
|
#mqtt_packet_puback{packet_id = PacketId,
|
||||||
reason_code = ReasonCode,
|
reason_code = ReasonCode,
|
||||||
|
@ -207,6 +221,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <<PacketId:16/big, ReasonCode,
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{version := Ver}) ->
|
#{version := Ver}) ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
TopicFilters = parse_topic_filters(subscribe, Rest1),
|
TopicFilters = parse_topic_filters(subscribe, Rest1),
|
||||||
#mqtt_packet_subscribe{packet_id = PacketId,
|
#mqtt_packet_subscribe{packet_id = PacketId,
|
||||||
|
@ -215,6 +230,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <<PacketId:16/big, Rest/bin
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{version := Ver}) ->
|
#{version := Ver}) ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
#mqtt_packet_suback{packet_id = PacketId,
|
#mqtt_packet_suback{packet_id = PacketId,
|
||||||
properties = Properties,
|
properties = Properties,
|
||||||
|
@ -222,6 +238,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <<PacketId:16/big, Rest/binary
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{version := Ver}) ->
|
#{version := Ver}) ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
TopicFilters = parse_topic_filters(unsubscribe, Rest1),
|
TopicFilters = parse_topic_filters(unsubscribe, Rest1),
|
||||||
#mqtt_packet_unsubscribe{packet_id = PacketId,
|
#mqtt_packet_unsubscribe{packet_id = PacketId,
|
||||||
|
@ -229,9 +246,11 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <<PacketId:16/big, Rest/b
|
||||||
topic_filters = TopicFilters};
|
topic_filters = TopicFilters};
|
||||||
|
|
||||||
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big>>, _Options) ->
|
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big>>, _Options) ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
#mqtt_packet_unsuback{packet_id = PacketId};
|
#mqtt_packet_unsuback{packet_id = PacketId};
|
||||||
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/binary>>,
|
parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <<PacketId:16/big, Rest/binary>>,
|
||||||
#{version := Ver}) ->
|
#{version := Ver}) ->
|
||||||
|
ok = validate_packet_id(PacketId),
|
||||||
{Properties, Rest1} = parse_properties(Rest, Ver),
|
{Properties, Rest1} = parse_properties(Rest, Ver),
|
||||||
ReasonCodes = parse_reason_codes(Rest1),
|
ReasonCodes = parse_reason_codes(Rest1),
|
||||||
#mqtt_packet_unsuback{packet_id = PacketId,
|
#mqtt_packet_unsuback{packet_id = PacketId,
|
||||||
|
@ -260,12 +279,12 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
|
||||||
parse_will_message(Packet, Bin) ->
|
parse_will_message(Packet, Bin) ->
|
||||||
{Packet, Bin}.
|
{Packet, Bin}.
|
||||||
|
|
||||||
% protocol_approved(Ver, Name) ->
|
|
||||||
% lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
|
||||||
|
|
||||||
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
|
||||||
{PacketId, Rest}.
|
{PacketId, Rest}.
|
||||||
|
|
||||||
|
validate_packet_id(0) -> error(bad_packet_id);
|
||||||
|
validate_packet_id(_) -> ok.
|
||||||
|
|
||||||
parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 ->
|
||||||
{undefined, Bin};
|
{undefined, Bin};
|
||||||
%% TODO: version mess?
|
%% TODO: version mess?
|
||||||
|
@ -336,7 +355,8 @@ parse_property(<<16#26, Bin/binary>>, Props) ->
|
||||||
{Pair, Rest} = parse_utf8_pair(Bin),
|
{Pair, Rest} = parse_utf8_pair(Bin),
|
||||||
case maps:find('User-Property', Props) of
|
case maps:find('User-Property', Props) of
|
||||||
{ok, UserProps} ->
|
{ok, UserProps} ->
|
||||||
parse_property(Rest,Props#{'User-Property' := [Pair|UserProps]});
|
UserProps1 = lists:append(UserProps, [Pair]),
|
||||||
|
parse_property(Rest, Props#{'User-Property' := UserProps1});
|
||||||
error ->
|
error ->
|
||||||
parse_property(Rest, Props#{'User-Property' => [Pair]})
|
parse_property(Rest, Props#{'User-Property' => [Pair]})
|
||||||
end;
|
end;
|
||||||
|
@ -357,7 +377,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
||||||
{Value + Len * Multiplier, Rest}.
|
{Value + Len * Multiplier, Rest}.
|
||||||
|
|
||||||
parse_topic_filters(subscribe, Bin) ->
|
parse_topic_filters(subscribe, Bin) ->
|
||||||
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0}}
|
[{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => validate_subqos(QoS), rc => 0}}
|
||||||
|| <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
|
|| <<Len:16/big, Topic:Len/binary, _:2, Rh:2, Rap:1, Nl:1, QoS:2>> <= Bin];
|
||||||
|
|
||||||
parse_topic_filters(unsubscribe, Bin) ->
|
parse_topic_filters(unsubscribe, Bin) ->
|
||||||
|
@ -638,6 +658,30 @@ serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
|
||||||
serialize_variable_byte_integer(N) ->
|
serialize_variable_byte_integer(N) ->
|
||||||
<<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
|
<<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
|
||||||
|
|
||||||
|
%% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags
|
||||||
|
validate_header(?CONNECT, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?CONNACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok;
|
||||||
|
validate_header(?PUBLISH, _, ?QOS_1, _) -> ok;
|
||||||
|
validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok;
|
||||||
|
validate_header(?PUBACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PUBREC, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PUBREL, 0, 1, 0) -> ok;
|
||||||
|
validate_header(?PUBCOMP, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?SUBSCRIBE, 0, 1, 0) -> ok;
|
||||||
|
validate_header(?SUBACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok;
|
||||||
|
validate_header(?UNSUBACK, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PINGREQ, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?PINGRESP, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?DISCONNECT, 0, 0, 0) -> ok;
|
||||||
|
validate_header(?AUTH, 0, 0, 0) -> ok;
|
||||||
|
validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header).
|
||||||
|
|
||||||
|
validate_subqos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
|
||||||
|
QoS;
|
||||||
|
validate_subqos(_) -> error(bad_subqos).
|
||||||
|
|
||||||
bool(0) -> false;
|
bool(0) -> false;
|
||||||
bool(1) -> true.
|
bool(1) -> true.
|
||||||
|
|
||||||
|
|
|
@ -338,9 +338,10 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) ->
|
||||||
process_incoming(<<>>, State) ->
|
process_incoming(<<>>, State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_state = ChanState}) ->
|
process_incoming(Data, State = #ws_connection{parse_state = ParseState,
|
||||||
|
chan_state = ChanState}) ->
|
||||||
try emqx_frame:parse(Data, ParseState) of
|
try emqx_frame:parse(Data, ParseState) of
|
||||||
{ok, NParseState} ->
|
{more, NParseState} ->
|
||||||
{ok, State#ws_connection{parse_state = NParseState}};
|
{ok, State#ws_connection{parse_state = NParseState}};
|
||||||
{ok, Packet, Rest, NParseState} ->
|
{ok, Packet, Rest, NParseState} ->
|
||||||
self() ! {incoming, Packet},
|
self() ! {incoming, Packet},
|
||||||
|
|
|
@ -20,10 +20,17 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
-include_lib("proper/include/proper.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
|
||||||
|
|
||||||
|
%%-define(PROPTEST(F), ?assert(proper:quickcheck(F()))).
|
||||||
|
-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{to_file, user}]))).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[{group, connect},
|
[{group, parse},
|
||||||
|
{group, connect},
|
||||||
{group, connack},
|
{group, connack},
|
||||||
{group, publish},
|
{group, publish},
|
||||||
{group, puback},
|
{group, puback},
|
||||||
|
@ -36,7 +43,11 @@ all() ->
|
||||||
{group, auth}].
|
{group, auth}].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
[{connect, [parallel],
|
[{parse, [parallel],
|
||||||
|
[t_parse_cont,
|
||||||
|
t_parse_frame_too_large
|
||||||
|
]},
|
||||||
|
{connect, [parallel],
|
||||||
[t_serialize_parse_connect,
|
[t_serialize_parse_connect,
|
||||||
t_serialize_parse_v3_connect,
|
t_serialize_parse_v3_connect,
|
||||||
t_serialize_parse_v4_connect,
|
t_serialize_parse_v4_connect,
|
||||||
|
@ -57,6 +68,7 @@ groups() ->
|
||||||
]},
|
]},
|
||||||
{puback, [parallel],
|
{puback, [parallel],
|
||||||
[t_serialize_parse_puback,
|
[t_serialize_parse_puback,
|
||||||
|
t_serialize_parse_puback_v3_4,
|
||||||
t_serialize_parse_puback_v5,
|
t_serialize_parse_puback_v5,
|
||||||
t_serialize_parse_pubrec,
|
t_serialize_parse_pubrec,
|
||||||
t_serialize_parse_pubrec_v5,
|
t_serialize_parse_pubrec_v5,
|
||||||
|
@ -105,19 +117,48 @@ init_per_group(_Group, Config) ->
|
||||||
end_per_group(_Group, _Config) ->
|
end_per_group(_Group, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_parse_cont(_) ->
|
||||||
|
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
|
||||||
|
ParseState = emqx_frame:initial_parse_state(),
|
||||||
|
<<HdrBin:1/binary, LenBin:1/binary, RestBin/binary>> = serialize_to_binary(Packet),
|
||||||
|
{more, ContParse} = emqx_frame:parse(<<>>, ParseState),
|
||||||
|
{more, ContParse1} = emqx_frame:parse(HdrBin, ContParse),
|
||||||
|
{more, ContParse2} = emqx_frame:parse(LenBin, ContParse1),
|
||||||
|
{more, ContParse3} = emqx_frame:parse(<<>>, ContParse2),
|
||||||
|
{ok, Packet, <<>>, _} = emqx_frame:parse(RestBin, ContParse3).
|
||||||
|
|
||||||
|
t_parse_frame_too_large(_) ->
|
||||||
|
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
|
||||||
|
?catch_error(mqtt_frame_too_large, parse_serialize(Packet, #{max_size => 256})),
|
||||||
|
?catch_error(mqtt_frame_too_large, parse_serialize(Packet, #{max_size => 512})),
|
||||||
|
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
|
||||||
|
|
||||||
t_serialize_parse_connect(_) ->
|
t_serialize_parse_connect(_) ->
|
||||||
Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{}),
|
?PROPTEST(prop_serialize_parse_connect).
|
||||||
?assertEqual(Packet1, parse_serialize(Packet1)),
|
|
||||||
Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{
|
prop_serialize_parse_connect() ->
|
||||||
|
?FORALL(Opts = #{version := ProtoVer}, parse_opts(),
|
||||||
|
begin
|
||||||
|
ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES),
|
||||||
|
DefaultProps = if ProtoVer == ?MQTT_PROTO_V5 ->
|
||||||
|
#{};
|
||||||
|
true -> undefined
|
||||||
|
end,
|
||||||
|
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{
|
||||||
|
proto_name = ProtoName,
|
||||||
|
proto_ver = ProtoVer,
|
||||||
client_id = <<"clientId">>,
|
client_id = <<"clientId">>,
|
||||||
will_qos = ?QOS_1,
|
will_qos = ?QOS_1,
|
||||||
will_flag = true,
|
will_flag = true,
|
||||||
will_retain = true,
|
will_retain = true,
|
||||||
will_topic = <<"will">>,
|
will_topic = <<"will">>,
|
||||||
|
will_props = DefaultProps,
|
||||||
will_payload = <<"bye">>,
|
will_payload = <<"bye">>,
|
||||||
clean_start = true
|
clean_start = true,
|
||||||
|
properties = DefaultProps
|
||||||
}),
|
}),
|
||||||
?assertEqual(Packet2, parse_serialize(Packet2)).
|
ok == ?assertEqual(Packet, parse_serialize(Packet, Opts))
|
||||||
|
end).
|
||||||
|
|
||||||
t_serialize_parse_v3_connect(_) ->
|
t_serialize_parse_v3_connect(_) ->
|
||||||
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
|
||||||
|
@ -130,16 +171,19 @@ t_serialize_parse_v3_connect(_) ->
|
||||||
clean_start = true,
|
clean_start = true,
|
||||||
keepalive = 60
|
keepalive = 60
|
||||||
}),
|
}),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
{ok, Packet, <<>>, PState} = emqx_frame:parse(Bin),
|
||||||
|
?assertMatch({none, #{version := ?MQTT_PROTO_V3}}, PState).
|
||||||
|
|
||||||
t_serialize_parse_v4_connect(_) ->
|
t_serialize_parse_v4_connect(_) ->
|
||||||
Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,
|
Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117,
|
||||||
98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>,
|
||||||
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4,
|
Packet = ?CONNECT_PACKET(
|
||||||
|
#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4,
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
client_id = <<"mosqpub/10451-iMac.loca">>,
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_start = true,
|
clean_start = true,
|
||||||
keepalive = 60}),
|
keepalive = 60
|
||||||
|
}),
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
||||||
|
|
||||||
|
@ -185,8 +229,7 @@ t_serialize_parse_v5_connect(_) ->
|
||||||
|
|
||||||
t_serialize_parse_connect_without_clientid(_) ->
|
t_serialize_parse_connect_without_clientid(_) ->
|
||||||
Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>,
|
||||||
Packet = ?CONNECT_PACKET(
|
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4,
|
||||||
#mqtt_packet_connect{proto_ver = 4,
|
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
client_id = <<>>,
|
client_id = <<>>,
|
||||||
clean_start = true,
|
clean_start = true,
|
||||||
|
@ -237,7 +280,9 @@ t_serialize_parse_bridge_connect(_) ->
|
||||||
will_payload = <<"0">>
|
will_payload = <<"0">>
|
||||||
}},
|
}},
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)),
|
||||||
|
Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{is_bridge = true}),
|
||||||
|
?assertEqual(Packet1, parse_serialize(Packet1)).
|
||||||
|
|
||||||
t_serialize_parse_connack(_) ->
|
t_serialize_parse_connack(_) ->
|
||||||
Packet = ?CONNACK_PACKET(?RC_SUCCESS),
|
Packet = ?CONNACK_PACKET(?RC_SUCCESS),
|
||||||
|
@ -275,7 +320,7 @@ t_serialize_parse_qos0_publish(_) ->
|
||||||
packet_id = undefined},
|
packet_id = undefined},
|
||||||
payload = <<"hello">>},
|
payload = <<"hello">>},
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})).
|
||||||
|
|
||||||
t_serialize_parse_qos1_publish(_) ->
|
t_serialize_parse_qos1_publish(_) ->
|
||||||
Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>,
|
Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>,
|
||||||
|
@ -287,11 +332,16 @@ t_serialize_parse_qos1_publish(_) ->
|
||||||
packet_id = 1},
|
packet_id = 1},
|
||||||
payload = <<"haha">>},
|
payload = <<"haha">>},
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))).
|
||||||
|
|
||||||
t_serialize_parse_qos2_publish(_) ->
|
t_serialize_parse_qos2_publish(_) ->
|
||||||
Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, payload()),
|
Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)).
|
Bin = <<52,9,0,5,84,111,112,105,99,0,1>>,
|
||||||
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))).
|
||||||
|
|
||||||
t_serialize_parse_publish_v5(_) ->
|
t_serialize_parse_publish_v5(_) ->
|
||||||
Props = #{'Payload-Format-Indicator' => 1,
|
Props = #{'Payload-Format-Indicator' => 1,
|
||||||
|
@ -307,7 +357,16 @@ t_serialize_parse_publish_v5(_) ->
|
||||||
t_serialize_parse_puback(_) ->
|
t_serialize_parse_puback(_) ->
|
||||||
Packet = ?PUBACK_PACKET(1),
|
Packet = ?PUBACK_PACKET(1),
|
||||||
?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)),
|
?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)).
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))).
|
||||||
|
|
||||||
|
t_serialize_parse_puback_v3_4(_) ->
|
||||||
|
Bin = <<64,2,0,1>>,
|
||||||
|
Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = 1},
|
||||||
|
?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V3)),
|
||||||
|
?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V4)),
|
||||||
|
?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V3})),
|
||||||
|
?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V4})).
|
||||||
|
|
||||||
t_serialize_parse_puback_v5(_) ->
|
t_serialize_parse_puback_v5(_) ->
|
||||||
Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
@ -316,7 +375,8 @@ t_serialize_parse_puback_v5(_) ->
|
||||||
t_serialize_parse_pubrec(_) ->
|
t_serialize_parse_pubrec(_) ->
|
||||||
Packet = ?PUBREC_PACKET(1),
|
Packet = ?PUBREC_PACKET(1),
|
||||||
?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)),
|
?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)).
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))).
|
||||||
|
|
||||||
t_serialize_parse_pubrec_v5(_) ->
|
t_serialize_parse_pubrec_v5(_) ->
|
||||||
Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
@ -326,7 +386,12 @@ t_serialize_parse_pubrel(_) ->
|
||||||
Packet = ?PUBREL_PACKET(1),
|
Packet = ?PUBREL_PACKET(1),
|
||||||
Bin = serialize_to_binary(Packet),
|
Bin = serialize_to_binary(Packet),
|
||||||
?assertEqual(<<6:4,2:4,2,0,1>>, Bin),
|
?assertEqual(<<6:4,2:4,2,0,1>>, Bin),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)).
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
|
%% PUBREL with bad qos 0
|
||||||
|
Bin0 = <<6:4,0:4,2,0,1>>,
|
||||||
|
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
||||||
|
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))).
|
||||||
|
|
||||||
t_serialize_parse_pubrel_v5(_) ->
|
t_serialize_parse_pubrel_v5(_) ->
|
||||||
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
@ -336,7 +401,8 @@ t_serialize_parse_pubcomp(_) ->
|
||||||
Packet = ?PUBCOMP_PACKET(1),
|
Packet = ?PUBCOMP_PACKET(1),
|
||||||
Bin = serialize_to_binary(Packet),
|
Bin = serialize_to_binary(Packet),
|
||||||
?assertEqual(<<7:4,0:4,2,0,1>>, Bin),
|
?assertEqual(<<7:4,0:4,2,0,1>>, Bin),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)).
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))).
|
||||||
|
|
||||||
t_serialize_parse_pubcomp_v5(_) ->
|
t_serialize_parse_pubcomp_v5(_) ->
|
||||||
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
|
||||||
|
@ -344,13 +410,18 @@ t_serialize_parse_pubcomp_v5(_) ->
|
||||||
|
|
||||||
t_serialize_parse_subscribe(_) ->
|
t_serialize_parse_subscribe(_) ->
|
||||||
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
|
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
|
||||||
Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>,
|
Bin = <<?SUBSCRIBE:4,2:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
|
||||||
TopicOpts = #{nl => 0 , rap => 0, rc => 0, rh => 0, qos => 2},
|
TopicOpts = #{nl => 0 , rap => 0, rc => 0, rh => 0, qos => 2},
|
||||||
TopicFilters = [{<<"TopicA">>, TopicOpts}],
|
TopicFilters = [{<<"TopicA">>, TopicOpts}],
|
||||||
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
|
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
%%ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]),
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
%% SUBSCRIBE with bad qos 0
|
||||||
|
Bin0 = <<?SUBSCRIBE:4,0:4,11,0,2,0,6,84,111,112,105,99,65,2>>,
|
||||||
|
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
||||||
|
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))),
|
||||||
|
?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))).
|
||||||
|
|
||||||
t_serialize_parse_subscribe_v5(_) ->
|
t_serialize_parse_subscribe_v5(_) ->
|
||||||
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}},
|
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}},
|
||||||
|
@ -360,7 +431,8 @@ t_serialize_parse_subscribe_v5(_) ->
|
||||||
|
|
||||||
t_serialize_parse_suback(_) ->
|
t_serialize_parse_suback(_) ->
|
||||||
Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]),
|
Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]),
|
||||||
?assertEqual(Packet, parse_serialize(Packet)).
|
?assertEqual(Packet, parse_serialize(Packet)),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))).
|
||||||
|
|
||||||
t_serialize_parse_suback_v5(_) ->
|
t_serialize_parse_suback_v5(_) ->
|
||||||
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
|
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
|
||||||
|
@ -369,11 +441,17 @@ t_serialize_parse_suback_v5(_) ->
|
||||||
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
||||||
|
|
||||||
t_serialize_parse_unsubscribe(_) ->
|
t_serialize_parse_unsubscribe(_) ->
|
||||||
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
%% UNSUBSCRIBE(Q1, R1, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
||||||
|
Bin = <<?UNSUBSCRIBE:4,2:4,10,0,2,0,6,84,111,112,105,99,65>>,
|
||||||
Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]),
|
Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]),
|
||||||
Bin = <<162,10,0,2,0,6,84,111,112,105,99,65>>,
|
|
||||||
?assertEqual(Bin, serialize_to_binary(Packet)),
|
?assertEqual(Bin, serialize_to_binary(Packet)),
|
||||||
?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)).
|
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
|
||||||
|
%% UNSUBSCRIBE with bad qos
|
||||||
|
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
|
||||||
|
Bin0 = <<?UNSUBSCRIBE:4,0:4,10,0,2,0,6,84,111,112,105,99,65>>,
|
||||||
|
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
|
||||||
|
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
|
||||||
|
?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))).
|
||||||
|
|
||||||
t_serialize_parse_unsubscribe_v5(_) ->
|
t_serialize_parse_unsubscribe_v5(_) ->
|
||||||
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
|
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
|
||||||
|
@ -419,12 +497,18 @@ t_serialize_parse_auth_v5(_) ->
|
||||||
#{'Authentication-Method' => <<"oauth2">>,
|
#{'Authentication-Method' => <<"oauth2">>,
|
||||||
'Authentication-Data' => <<"3zekkd">>,
|
'Authentication-Data' => <<"3zekkd">>,
|
||||||
'Reason-String' => <<"success">>,
|
'Reason-String' => <<"success">>,
|
||||||
'User-Property' => [{<<"key">>, <<"val">>}]
|
'User-Property' => [{<<"key1">>, <<"val1">>},
|
||||||
|
{<<"key2">>, <<"val2">>}]
|
||||||
}),
|
}),
|
||||||
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).
|
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})),
|
||||||
|
?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5,
|
||||||
|
strict_mode => true})).
|
||||||
|
|
||||||
|
parse_opts() ->
|
||||||
|
?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)).
|
||||||
|
|
||||||
parse_serialize(Packet) ->
|
parse_serialize(Packet) ->
|
||||||
parse_serialize(Packet, #{}).
|
parse_serialize(Packet, #{strict_mode => true}).
|
||||||
|
|
||||||
parse_serialize(Packet, Opts) when is_map(Opts) ->
|
parse_serialize(Packet, Opts) when is_map(Opts) ->
|
||||||
Ver = maps:get(version, Opts, ?MQTT_PROTO_V4),
|
Ver = maps:get(version, Opts, ?MQTT_PROTO_V4),
|
||||||
|
@ -436,6 +520,13 @@ parse_serialize(Packet, Opts) when is_map(Opts) ->
|
||||||
serialize_to_binary(Packet) ->
|
serialize_to_binary(Packet) ->
|
||||||
iolist_to_binary(emqx_frame:serialize(Packet)).
|
iolist_to_binary(emqx_frame:serialize(Packet)).
|
||||||
|
|
||||||
payload() ->
|
serialize_to_binary(Packet, Ver) ->
|
||||||
iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]).
|
iolist_to_binary(emqx_frame:serialize(Packet, Ver)).
|
||||||
|
|
||||||
|
parse_to_packet(Bin, Opts) ->
|
||||||
|
PState = emqx_frame:initial_parse_state(Opts),
|
||||||
|
{ok, Packet, <<>>, _} = emqx_frame:parse(Bin, PState),
|
||||||
|
Packet.
|
||||||
|
|
||||||
|
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue