diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index b840cad48..47acdb8fe 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -408,9 +408,10 @@ process_incoming(Data, State) -> process_incoming(<<>>, Packets, State) -> {keep_state, State, next_incoming_events(Packets)}; -process_incoming(Data, Packets, State = #connection{parse_state = ParseState, chan_state = ChanState}) -> +process_incoming(Data, Packets, State = #connection{parse_state = ParseState, + chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of - {ok, NParseState} -> + {more, NParseState} -> NState = State#connection{parse_state = NParseState}, {keep_state, NState, next_incoming_events(Packets)}; {ok, Packet, Rest, NParseState} -> diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index c0115cea8..9795e60a3 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -34,22 +34,24 @@ , parse_result/0 ]). --type(options() :: #{max_size => 1..?MAX_PACKET_SIZE, - version => emqx_types:version() +-type(options() :: #{strict_mode => boolean(), + max_size => 1..?MAX_PACKET_SIZE, + version => emqx_types:version() }). --opaque(parse_state() :: {none, options()} | {more, cont_fun()}). +-opaque(parse_state() :: {none, options()} | cont_fun()). --opaque(parse_result() :: {ok, parse_state()} +-opaque(parse_result() :: {more, cont_fun()} | {ok, emqx_types:packet(), binary(), parse_state()}). -type(cont_fun() :: fun((binary()) -> parse_result())). -define(none(Opts), {none, Opts}). --define(more(Cont), {more, Cont}). + -define(DEFAULT_OPTIONS, - #{max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 + #{strict_mode => false, + max_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4 }). %%-------------------------------------------------------------------- @@ -78,17 +80,26 @@ parse(Bin) -> -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> - {ok, ?more(fun(Bin) -> parse(Bin, {none, Options}) end)}; -parse(<>, {none, Options}) -> - parse_remaining_len(Rest, #mqtt_packet_header{type = Type, - dup = bool(Dup), - qos = fixqos(Type, QoS), - retain = bool(Retain)}, Options); -parse(Bin, {more, Cont}) when is_binary(Bin), is_function(Cont) -> + {more, fun(Bin) -> parse(Bin, {none, Options}) end}; +parse(<>, + {none, Options = #{strict_mode := StrictMode}}) -> + %% Validate header if strict mode. + StrictMode andalso validate_header(Type, Dup, QoS, Retain), + Header = #mqtt_packet_header{type = Type, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) + }, + Header1 = case fixqos(Type, QoS) of + QoS -> Header; + FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} + end, + parse_remaining_len(Rest, Header1, Options); +parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> Cont(Bin). parse_remaining_len(<<>>, Header, Options) -> - {ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Options) end)}; + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). @@ -96,7 +107,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> error(mqtt_frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end)}; + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; %% Match DISCONNECT without payload parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), @@ -132,9 +143,9 @@ parse_frame(Bin, Header, Length, Options) -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; TooShortBin -> - {ok, ?more(fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end)} + {more, fun(BinMore) -> + parse_frame(<>, Header, Length, Options) + end} end. packet(Header) -> @@ -189,6 +200,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, + (PacketId =/= undefined) andalso validate_packet_id(PacketId), {Properties, Payload} = parse_properties(Rest1, Ver), {#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, @@ -196,10 +208,12 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, parse_packet(#mqtt_packet_header{type = PubAck}, <>, _Options) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> + ok = validate_packet_id(PacketId), #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{version := Ver = ?MQTT_PROTO_V5}) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> + ok = validate_packet_id(PacketId), {Properties, <<>>} = parse_properties(Rest, Ver), #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, @@ -207,6 +221,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), TopicFilters = parse_topic_filters(subscribe, Rest1), #mqtt_packet_subscribe{packet_id = PacketId, @@ -215,6 +230,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), #mqtt_packet_suback{packet_id = PacketId, properties = Properties, @@ -222,6 +238,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), TopicFilters = parse_topic_filters(unsubscribe, Rest1), #mqtt_packet_unsubscribe{packet_id = PacketId, @@ -229,9 +246,11 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <>, _Options) -> + ok = validate_packet_id(PacketId), #mqtt_packet_unsuback{packet_id = PacketId}; parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_unsuback{packet_id = PacketId, @@ -260,12 +279,12 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, parse_will_message(Packet, Bin) -> {Packet, Bin}. -% protocol_approved(Ver, Name) -> -% lists:member({Ver, Name}, ?PROTOCOL_NAMES). - parse_packet_id(<>) -> {PacketId, Rest}. +validate_packet_id(0) -> error(bad_packet_id); +validate_packet_id(_) -> ok. + parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> {undefined, Bin}; %% TODO: version mess? @@ -336,7 +355,8 @@ parse_property(<<16#26, Bin/binary>>, Props) -> {Pair, Rest} = parse_utf8_pair(Bin), case maps:find('User-Property', Props) of {ok, UserProps} -> - parse_property(Rest,Props#{'User-Property' := [Pair|UserProps]}); + UserProps1 = lists:append(UserProps, [Pair]), + parse_property(Rest, Props#{'User-Property' := UserProps1}); error -> parse_property(Rest, Props#{'User-Property' => [Pair]}) end; @@ -357,7 +377,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> {Value + Len * Multiplier, Rest}. parse_topic_filters(subscribe, Bin) -> - [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0}} + [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => validate_subqos(QoS), rc => 0}} || <> <= Bin]; parse_topic_filters(unsubscribe, Bin) -> @@ -638,6 +658,30 @@ serialize_variable_byte_integer(N) when N =< ?LOWBITS -> serialize_variable_byte_integer(N) -> <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. +%% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags +validate_header(?CONNECT, 0, 0, 0) -> ok; +validate_header(?CONNACK, 0, 0, 0) -> ok; +validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok; +validate_header(?PUBLISH, _, ?QOS_1, _) -> ok; +validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok; +validate_header(?PUBACK, 0, 0, 0) -> ok; +validate_header(?PUBREC, 0, 0, 0) -> ok; +validate_header(?PUBREL, 0, 1, 0) -> ok; +validate_header(?PUBCOMP, 0, 0, 0) -> ok; +validate_header(?SUBSCRIBE, 0, 1, 0) -> ok; +validate_header(?SUBACK, 0, 0, 0) -> ok; +validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok; +validate_header(?UNSUBACK, 0, 0, 0) -> ok; +validate_header(?PINGREQ, 0, 0, 0) -> ok; +validate_header(?PINGRESP, 0, 0, 0) -> ok; +validate_header(?DISCONNECT, 0, 0, 0) -> ok; +validate_header(?AUTH, 0, 0, 0) -> ok; +validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header). + +validate_subqos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> + QoS; +validate_subqos(_) -> error(bad_subqos). + bool(0) -> false; bool(1) -> true. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 58c989453..ff0f618ab 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -338,9 +338,10 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> process_incoming(<<>>, State) -> {ok, State}; -process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_state = ChanState}) -> +process_incoming(Data, State = #ws_connection{parse_state = ParseState, + chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of - {ok, NParseState} -> + {more, NParseState} -> {ok, State#ws_connection{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> self() ! {incoming, Packet}, diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index cde76bfb3..e1991c359 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -20,10 +20,17 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). + +%%-define(PROPTEST(F), ?assert(proper:quickcheck(F()))). +-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{to_file, user}]))). all() -> - [{group, connect}, + [{group, parse}, + {group, connect}, {group, connack}, {group, publish}, {group, puback}, @@ -36,7 +43,11 @@ all() -> {group, auth}]. groups() -> - [{connect, [parallel], + [{parse, [parallel], + [t_parse_cont, + t_parse_frame_too_large + ]}, + {connect, [parallel], [t_serialize_parse_connect, t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, @@ -57,6 +68,7 @@ groups() -> ]}, {puback, [parallel], [t_serialize_parse_puback, + t_serialize_parse_puback_v3_4, t_serialize_parse_puback_v5, t_serialize_parse_pubrec, t_serialize_parse_pubrec_v5, @@ -105,19 +117,48 @@ init_per_group(_Group, Config) -> end_per_group(_Group, _Config) -> ok. +t_parse_cont(_) -> + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), + ParseState = emqx_frame:initial_parse_state(), + <> = serialize_to_binary(Packet), + {more, ContParse} = emqx_frame:parse(<<>>, ParseState), + {more, ContParse1} = emqx_frame:parse(HdrBin, ContParse), + {more, ContParse2} = emqx_frame:parse(LenBin, ContParse1), + {more, ContParse3} = emqx_frame:parse(<<>>, ContParse2), + {ok, Packet, <<>>, _} = emqx_frame:parse(RestBin, ContParse3). + +t_parse_frame_too_large(_) -> + Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)), + ?catch_error(mqtt_frame_too_large, parse_serialize(Packet, #{max_size => 256})), + ?catch_error(mqtt_frame_too_large, parse_serialize(Packet, #{max_size => 512})), + ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). + t_serialize_parse_connect(_) -> - Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{}), - ?assertEqual(Packet1, parse_serialize(Packet1)), - Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{ - client_id = <<"clientId">>, - will_qos = ?QOS_1, - will_flag = true, - will_retain = true, - will_topic = <<"will">>, - will_payload = <<"bye">>, - clean_start = true - }), - ?assertEqual(Packet2, parse_serialize(Packet2)). + ?PROPTEST(prop_serialize_parse_connect). + +prop_serialize_parse_connect() -> + ?FORALL(Opts = #{version := ProtoVer}, parse_opts(), + begin + ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), + DefaultProps = if ProtoVer == ?MQTT_PROTO_V5 -> + #{}; + true -> undefined + end, + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + client_id = <<"clientId">>, + will_qos = ?QOS_1, + will_flag = true, + will_retain = true, + will_topic = <<"will">>, + will_props = DefaultProps, + will_payload = <<"bye">>, + clean_start = true, + properties = DefaultProps + }), + ok == ?assertEqual(Packet, parse_serialize(Packet, Opts)) + end). t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, @@ -130,16 +171,19 @@ t_serialize_parse_v3_connect(_) -> clean_start = true, keepalive = 60 }), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + {ok, Packet, <<>>, PState} = emqx_frame:parse(Bin), + ?assertMatch({none, #{version := ?MQTT_PROTO_V3}}, PState). t_serialize_parse_v4_connect(_) -> Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<"mosqpub/10451-iMac.loca">>, - clean_start = true, - keepalive = 60}), + Packet = ?CONNECT_PACKET( + #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<"mosqpub/10451-iMac.loca">>, + clean_start = true, + keepalive = 60 + }), ?assertEqual(Bin, serialize_to_binary(Packet)), ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). @@ -185,13 +229,12 @@ t_serialize_parse_v5_connect(_) -> t_serialize_parse_connect_without_clientid(_) -> Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, - Packet = ?CONNECT_PACKET( - #mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<>>, - clean_start = true, - keepalive = 60 - }), + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + clean_start = true, + keepalive = 60 + }), ?assertEqual(Bin, serialize_to_binary(Packet)), ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). @@ -237,7 +280,9 @@ t_serialize_parse_bridge_connect(_) -> will_payload = <<"0">> }}, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)), + Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{is_bridge = true}), + ?assertEqual(Packet1, parse_serialize(Packet1)). t_serialize_parse_connack(_) -> Packet = ?CONNACK_PACKET(?RC_SUCCESS), @@ -275,7 +320,7 @@ t_serialize_parse_qos0_publish(_) -> packet_id = undefined}, payload = <<"hello">>}, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})). t_serialize_parse_qos1_publish(_) -> Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>, @@ -287,11 +332,16 @@ t_serialize_parse_qos1_publish(_) -> packet_id = 1}, payload = <<"haha">>}, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))). t_serialize_parse_qos2_publish(_) -> - Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, payload()), - ?assertEqual(Packet, parse_serialize(Packet)). + Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), + Bin = <<52,9,0,5,84,111,112,105,99,0,1>>, + ?assertEqual(Packet, parse_serialize(Packet)), + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))). t_serialize_parse_publish_v5(_) -> Props = #{'Payload-Format-Indicator' => 1, @@ -307,7 +357,16 @@ t_serialize_parse_publish_v5(_) -> t_serialize_parse_puback(_) -> Packet = ?PUBACK_PACKET(1), ?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))). + +t_serialize_parse_puback_v3_4(_) -> + Bin = <<64,2,0,1>>, + Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = 1}, + ?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V3)), + ?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V4)), + ?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V3})), + ?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V4})). t_serialize_parse_puback_v5(_) -> Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -316,7 +375,8 @@ t_serialize_parse_puback_v5(_) -> t_serialize_parse_pubrec(_) -> Packet = ?PUBREC_PACKET(1), ?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))). t_serialize_parse_pubrec_v5(_) -> Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -326,7 +386,12 @@ t_serialize_parse_pubrel(_) -> Packet = ?PUBREL_PACKET(1), Bin = serialize_to_binary(Packet), ?assertEqual(<<6:4,2:4,2,0,1>>, Bin), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + %% PUBREL with bad qos 0 + Bin0 = <<6:4,0:4,2,0,1>>, + ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), + ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))). t_serialize_parse_pubrel_v5(_) -> Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -336,7 +401,8 @@ t_serialize_parse_pubcomp(_) -> Packet = ?PUBCOMP_PACKET(1), Bin = serialize_to_binary(Packet), ?assertEqual(<<7:4,0:4,2,0,1>>, Bin), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))). t_serialize_parse_pubcomp_v5(_) -> Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -344,13 +410,18 @@ t_serialize_parse_pubcomp_v5(_) -> t_serialize_parse_subscribe(_) -> %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) - Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, + Bin = <>, TopicOpts = #{nl => 0 , rap => 0, rc => 0, rh => 0, qos => 2}, TopicFilters = [{<<"TopicA">>, TopicOpts}], Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), ?assertEqual(Bin, serialize_to_binary(Packet)), - %%ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + %% SUBSCRIBE with bad qos 0 + Bin0 = <>, + ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), + ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))), + ?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))). t_serialize_parse_subscribe_v5(_) -> TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}, @@ -360,7 +431,8 @@ t_serialize_parse_subscribe_v5(_) -> t_serialize_parse_suback(_) -> Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))). t_serialize_parse_suback_v5(_) -> Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>, @@ -369,11 +441,17 @@ t_serialize_parse_suback_v5(_) -> ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). t_serialize_parse_unsubscribe(_) -> - %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + %% UNSUBSCRIBE(Q1, R1, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + Bin = <>, Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]), - Bin = <<162,10,0,2,0,6,84,111,112,105,99,65>>, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + %% UNSUBSCRIBE with bad qos + %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + Bin0 = <>, + ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), + ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))). t_serialize_parse_unsubscribe_v5(_) -> Props = #{'User-Property' => [{<<"key">>, <<"val">>}]}, @@ -419,12 +497,18 @@ t_serialize_parse_auth_v5(_) -> #{'Authentication-Method' => <<"oauth2">>, 'Authentication-Data' => <<"3zekkd">>, 'Reason-String' => <<"success">>, - 'User-Property' => [{<<"key">>, <<"val">>}] + 'User-Property' => [{<<"key1">>, <<"val1">>}, + {<<"key2">>, <<"val2">>}] }), - ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})), + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5, + strict_mode => true})). + +parse_opts() -> + ?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)). parse_serialize(Packet) -> - parse_serialize(Packet, #{}). + parse_serialize(Packet, #{strict_mode => true}). parse_serialize(Packet, Opts) when is_map(Opts) -> Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), @@ -436,6 +520,13 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> serialize_to_binary(Packet) -> iolist_to_binary(emqx_frame:serialize(Packet)). -payload() -> - iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]). +serialize_to_binary(Packet, Ver) -> + iolist_to_binary(emqx_frame:serialize(Packet, Ver)). + +parse_to_packet(Bin, Opts) -> + PState = emqx_frame:initial_parse_state(Opts), + {ok, Packet, <<>>, _} = emqx_frame:parse(Bin, PState), + Packet. + +payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).