diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index 5dd9a317c..ae13fcf14 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -542,4 +542,9 @@ -define(SHARE(Group, Topic), emqx_topic:join([<>, Group, Topic])). -define(IS_SHARE(Topic), case Topic of <> -> true; _ -> false end). +-define(FRAME_PARSE_ERROR(Reason), {frame_parse_error, Reason}). +-define(FRAME_SERIALIZE_ERROR(Reason), {frame_serialize_error, Reason}). +-define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_PARSE_ERROR(Reason))). +-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw(?FRAME_SERIALIZE_ERROR(Reason))). + -endif. diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index 26eb346a4..8d0e74313 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -644,10 +644,21 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, [Packet|Packets], NState) catch - error:Reason:Stk -> - ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", - [Reason, Stk, Data]), - {[{frame_error, Reason}|Packets], State} + throw : ?FRAME_PARSE_ERROR(Reason) -> + ?SLOG(info, #{ reason => Reason + , at_state => emqx_frame:describe_state(ParseState) + , input_bytes => Data + , parsed_packets => Packets + }), + {[{frame_error, Reason} | Packets], State}; + error : Reason : Stacktrace -> + ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState) + , input_bytes => Data + , parsed_packets => Packets + , exception => Reason + , stacktrace => Stacktrace + }), + {[{frame_error, Reason} | Packets], State} end. -compile({inline, [next_incoming_msgs/1]}). @@ -696,7 +707,7 @@ handle_outgoing(Packet, State) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case emqx_frame:serialize_pkt(Packet, Serialize) of + try emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), @@ -705,6 +716,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), Data + catch + %% Maybe Never happen. + throw : ?FRAME_SERIALIZE_ERROR(Reason) -> + ?SLOG(info, #{ reason => Reason + , input_packet => Packet}), + erlang:error(?FRAME_SERIALIZE_ERROR(Reason)); + error : Reason : Stacktrace -> + ?SLOG(error, #{ input_packet => Packet + , exception => Reason + , stacktrace => Stacktrace}), + erlang:error(frame_serialize_error) end end. diff --git a/apps/emqx/src/emqx_frame.erl b/apps/emqx/src/emqx_frame.erl index cea94eec8..2fe1b6d1a 100644 --- a/apps/emqx/src/emqx_frame.erl +++ b/apps/emqx/src/emqx_frame.erl @@ -34,6 +34,10 @@ , serialize/2 ]). + +-export([ describe_state/1 + ]). + -export_type([ options/0 , parse_state/0 , parse_result/0 @@ -47,7 +51,9 @@ version => emqx_types:proto_ver() }). --type(parse_state() :: {none, options()} | {cont_state(), options()}). +-define(NONE(Options), {none, Options}). + +-type(parse_state() :: ?NONE(options()) | {cont_state(), options()}). -type(parse_result() :: {more, parse_state()} | {ok, emqx_types:packet(), binary(), parse_state()}). @@ -61,27 +67,45 @@ -type(serialize_opts() :: options()). --define(none(Options), {none, Options}). - -define(DEFAULT_OPTIONS, #{strict_mode => false, max_size => ?MAX_PACKET_SIZE, version => ?MQTT_PROTO_V4 }). +-define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)). +-define(SERIALIZE_ERR(Reason), ?THROW_SERIALIZE_ERROR(Reason)). + +-define(MULTIPLIER_MAX, 16#200000). + -dialyzer({no_match, [serialize_utf8_string/2]}). +%% @doc Describe state for logging. +describe_state(?NONE(_Opts)) -> <<"clean">>; +describe_state({{len, _}, _Opts}) -> <<"parsing_varint_length">>; +describe_state({{body, State}, _Opts}) -> + #{ hdr := Hdr + , len := Len + } = State, + Desc = #{ parsed_header => Hdr + , expected_bytes => Len + }, + case maps:get(rest, State, undefined) of + undefined -> Desc; + Body -> Desc#{received_bytes => body_bytes(Body)} + end. + %%-------------------------------------------------------------------- %% Init Parse State %%-------------------------------------------------------------------- --spec(initial_parse_state() -> {none, options()}). +-spec(initial_parse_state() -> ?NONE(options())). initial_parse_state() -> initial_parse_state(#{}). --spec(initial_parse_state(options()) -> {none, options()}). +-spec(initial_parse_state(options()) -> ?NONE(options())). initial_parse_state(Options) when is_map(Options) -> - ?none(maps:merge(?DEFAULT_OPTIONS, Options)). + ?NONE(maps:merge(?DEFAULT_OPTIONS, Options)). %%-------------------------------------------------------------------- %% Parse MQTT Frame @@ -92,10 +116,10 @@ parse(Bin) -> parse(Bin, initial_parse_state()). -spec(parse(binary(), parse_state()) -> parse_result()). -parse(<<>>, {none, Options}) -> - {more, {none, Options}}; +parse(<<>>, ?NONE(Options)) -> + {more, ?NONE(Options)}; parse(<>, - {none, Options = #{strict_mode := StrictMode}}) -> + ?NONE(Options = #{strict_mode := StrictMode})) -> %% Validate header if strict mode. StrictMode andalso validate_header(Type, Dup, QoS, Retain), Header = #mqtt_packet_header{type = Type, @@ -123,14 +147,14 @@ parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> - error(frame_too_large); + ?PARSE_ERR(frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; %% Match DISCONNECT without payload parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), - {ok, Packet, Rest, ?none(Options)}; + {ok, Packet, Rest, ?NONE(Options)}; %% Match PINGREQ. parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 0, Options); @@ -138,21 +162,22 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_frame(Rest, Header, 2, Options); parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options) - when Multiplier > 2097152 -> - error(malformed_variable_byte_integer); + when Multiplier > ?MULTIPLIER_MAX -> + ?PARSE_ERR(malformed_variable_byte_integer); parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options = #{max_size := MaxSize}) -> FrameLen = Value + Len * Multiplier, case FrameLen > MaxSize of - true -> error(frame_too_large); + true -> ?PARSE_ERR(frame_too_large); false -> parse_frame(Rest, Header, FrameLen, Options) end. body_bytes(B) when is_binary(B) -> size(B); body_bytes(?Q(Bytes, _)) -> Bytes. +append_body(H, <<>>) -> H; append_body(H, T) when is_binary(H) andalso size(H) < 1024 -> <>; append_body(H, T) when is_binary(H) -> @@ -165,18 +190,18 @@ flatten_body(Body) when is_binary(Body) -> Body; flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)). parse_frame(Body, Header, 0, Options) -> - {ok, packet(Header), flatten_body(Body), ?none(Options)}; + {ok, packet(Header), flatten_body(Body), ?NONE(Options)}; parse_frame(Body, Header, Length, Options) -> case body_bytes(Body) >= Length of true -> <> = flatten_body(Body), case parse_packet(Header, FrameBin, Options) of {Variable, Payload} -> - {ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; + {ok, packet(Header, Variable, Payload), Rest, ?NONE(Options)}; Variable = #mqtt_packet_connect{proto_ver = Ver} -> - {ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; + {ok, packet(Header, Variable), Rest, ?NONE(Options#{version := Ver})}; Variable -> - {ok, packet(Header, Variable), Rest, ?none(Options)} + {ok, packet(Header, Variable), Rest, ?NONE(Options)} end; false -> {more, {{body, #{hdr => Header, @@ -420,10 +445,16 @@ parse_property(<<16#28, Val, Bin/binary>>, Props) -> parse_property(<<16#29, Val, Bin/binary>>, Props) -> parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); parse_property(<<16#2A, Val, Bin/binary>>, Props) -> - parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). + parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}); +parse_property(<>, _Props) -> + ?PARSE_ERR(#{invalid_property_code => Property}). +%% TODO: invalid property in specific packet. parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). +parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) + when Multiplier > ?MULTIPLIER_MAX -> + ?PARSE_ERR(malformed_variable_byte_integer); parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> @@ -441,7 +472,23 @@ parse_reason_codes(Bin) -> parse_utf8_pair(<>) -> - {{Key, Val}, Rest}. + {{Key, Val}, Rest}; +parse_utf8_pair(<>) + when LenK > byte_size(Rest) -> + ?PARSE_ERR(#{ hint => user_property_not_enough_bytes + , parsed_key_length => LenK + , remaining_bytes_length => byte_size(Rest)}); +parse_utf8_pair(<>) + when LenV > byte_size(Rest) -> + ?PARSE_ERR(#{ hint => malformed_user_property_value + , parsed_key_length => LenK + , parsed_value_length => LenV + , remaining_bytes_length => byte_size(Rest)}); +parse_utf8_pair(Bin) + when 4 > byte_size(Bin) -> + ?PARSE_ERR(#{ hint => user_property_not_enough_bytes + , total_bytes => byte_size(Bin)}). parse_utf8_string(Bin, false) -> {undefined, Bin}; @@ -449,10 +496,26 @@ parse_utf8_string(Bin, true) -> parse_utf8_string(Bin). parse_utf8_string(<>) -> - {Str, Rest}. + {Str, Rest}; +parse_utf8_string(<>) + when Len > byte_size(Rest) -> + ?PARSE_ERR(#{ hint => malformed_utf8_string + , parsed_length => Len + , remaining_bytes_length => byte_size(Rest)}); +parse_utf8_string(Bin) + when 2 > byte_size(Bin) -> + ?PARSE_ERR(malformed_utf8_string_length). parse_binary_data(<>) -> - {Data, Rest}. + {Data, Rest}; +parse_binary_data(<>) + when Len > byte_size(Rest) -> + ?PARSE_ERR(#{ hint => malformed_binary_data + , parsed_length => Len + , remaining_bytes_length => byte_size(Rest)}); +parse_binary_data(Bin) + when 2 > byte_size(Bin) -> + ?PARSE_ERR(malformed_binary_data_length). %%-------------------------------------------------------------------- %% Serialize MQTT Packet @@ -719,7 +782,7 @@ serialize_binary_data(Bin) -> [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. serialize_utf8_string(undefined, false) -> - error(utf8_string_undefined); + ?SERIALIZE_ERR(utf8_string_undefined); serialize_utf8_string(undefined, true) -> <<>>; serialize_utf8_string(String, _AllowNull) -> @@ -767,13 +830,13 @@ validate_header(?PINGREQ, 0, 0, 0) -> ok; validate_header(?PINGRESP, 0, 0, 0) -> ok; validate_header(?DISCONNECT, 0, 0, 0) -> ok; validate_header(?AUTH, 0, 0, 0) -> ok; -validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header). +validate_header(_Type, _Dup, _QoS, _Rt) -> ?PARSE_ERR(bad_frame_header). -compile({inline, [validate_packet_id/1]}). -validate_packet_id(0) -> error(bad_packet_id); +validate_packet_id(0) -> ?PARSE_ERR(bad_packet_id); validate_packet_id(_) -> ok. -validate_subqos([3|_]) -> error(bad_subqos); +validate_subqos([3|_]) -> ?PARSE_ERR(bad_subqos); validate_subqos([_|T]) -> validate_subqos(T); validate_subqos([]) -> ok. diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index 32a81c26a..bbd1fb62d 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -547,9 +547,19 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) -> NState = State#state{parse_state = NParseState}, parse_incoming(Rest, postpone({incoming, Packet}, NState)) catch - error:Reason:Stk -> - ?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p", - [Reason, Stk, Data]), + throw : ?FRAME_PARSE_ERROR(Reason) -> + ?SLOG(info, #{ reason => Reason + , at_state => emqx_frame:describe_state(ParseState) + , input_bytes => Data + }), + FrameError = {frame_error, Reason}, + postpone({incoming, FrameError}, State); + error : Reason : Stacktrace -> + ?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState) + , input_bytes => Data + , exception => Reason + , stacktrace => Stacktrace + }), FrameError = {frame_error, Reason}, postpone({incoming, FrameError}, State) end. @@ -617,7 +627,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback, serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> fun(Packet) -> - case emqx_frame:serialize_pkt(Packet, Serialize) of + try emqx_frame:serialize_pkt(Packet, Serialize) of <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", [emqx_packet:format(Packet)]), ok = emqx_metrics:inc('delivery.dropped.too_large'), @@ -626,6 +636,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), ok = inc_outgoing_stats(Packet), Data + catch + %% Maybe Never happen. + throw : ?FRAME_SERIALIZE_ERROR(Reason) -> + ?SLOG(info, #{ reason => Reason + , input_packet => Packet}), + erlang:error(?FRAME_SERIALIZE_ERROR(Reason)); + error : Reason : Stacktrace -> + ?SLOG(error, #{ input_packet => Packet + , exception => Reason + , stacktrace => Stacktrace}), + erlang:error(frame_serialize_error) end end. @@ -791,4 +812,4 @@ get_ws_opts(Type, Listener, Key) -> emqx_config:get_listener_conf(Type, Listener, [websocket, Key]). get_active_n(Type, Listener) -> - emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). \ No newline at end of file + emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). diff --git a/apps/emqx/test/emqx_frame_SUITE.erl b/apps/emqx/test/emqx_frame_SUITE.erl index 09206cee1..6d3bccd99 100644 --- a/apps/emqx/test/emqx_frame_SUITE.erl +++ b/apps/emqx/test/emqx_frame_SUITE.erl @@ -22,7 +22,9 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). + +-define(ASSERT_FRAME_THROW(Reason, Expr), + ?assertThrow(?FRAME_PARSE_ERROR(Reason), Expr)). all() -> [{group, parse}, @@ -113,7 +115,7 @@ init_per_group(_Group, Config) -> Config. end_per_group(_Group, _Config) -> - ok. + ok. t_parse_cont(_) -> Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), @@ -127,15 +129,15 @@ t_parse_cont(_) -> t_parse_frame_too_large(_) -> Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)), - ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 256})), - ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), + ?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 256})), + ?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). t_parse_frame_malformed_variable_byte_integer(_) -> - MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>, + MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 6) >>, ParseState = emqx_frame:initial_parse_state(#{}), - ?catch_error(malformed_variable_byte_integer, - emqx_frame:parse(MalformedPayload, ParseState)). + ?ASSERT_FRAME_THROW(malformed_variable_byte_integer, + emqx_frame:parse(MalformedPayload, ParseState)). t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, @@ -329,7 +331,7 @@ t_serialize_parse_qos1_publish(_) -> ?assertEqual(Bin, serialize_to_binary(Packet)), ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))), + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))), %% strict_mode = false _ = parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>), #{strict_mode => false}). @@ -340,7 +342,7 @@ t_serialize_parse_qos2_publish(_) -> ?assertEqual(Bin, serialize_to_binary(Packet)), ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))), + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))), %% strict_mode = false _ = parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>), #{strict_mode => false}). @@ -360,7 +362,7 @@ t_serialize_parse_puback(_) -> ?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)), ?assertEqual(Packet, parse_serialize(Packet)), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))), + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))), %% strict_mode = false ?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), #{strict_mode => false}). @@ -381,7 +383,7 @@ t_serialize_parse_pubrec(_) -> ?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)), ?assertEqual(Packet, parse_serialize(Packet)), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))), + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))), %% strict_mode = false ?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), #{strict_mode => false}). @@ -397,11 +399,11 @@ t_serialize_parse_pubrel(_) -> %% PUBREL with bad qos 0 Bin0 = <<6:4,0:4,2,0,1>>, ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), - ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), %% strict_mode = false ?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), #{strict_mode => false}), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))). + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))). t_serialize_parse_pubrel_v5(_) -> Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -415,7 +417,7 @@ t_serialize_parse_pubcomp(_) -> %% strict_mode = false ?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), #{strict_mode => false}), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))). + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))). t_serialize_parse_pubcomp_v5(_) -> Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -434,12 +436,12 @@ t_serialize_parse_subscribe(_) -> ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), %% strict_mode = false _ = parse_to_packet(Bin0, #{strict_mode => false}), - ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), %% strict_mode = false _ = parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters), #{strict_mode => false}), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))), - ?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))). + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))), + ?ASSERT_FRAME_THROW(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))). t_serialize_parse_subscribe_v5(_) -> TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}}, @@ -453,7 +455,7 @@ t_serialize_parse_suback(_) -> %% strict_mode = false _ = parse_serialize(?SUBACK_PACKET(0, [?QOS_0]), #{strict_mode => false}), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))). + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))). t_serialize_parse_suback_v5(_) -> Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>, @@ -471,11 +473,11 @@ t_serialize_parse_unsubscribe(_) -> %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) Bin0 = <>, ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), - ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), %% strict_mode = false _ = parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]), #{strict_mode => false}), %% strict_mode = true - ?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))). + ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))). t_serialize_parse_unsubscribe_v5(_) -> Props = #{'User-Property' => [{<<"key">>, <<"val">>}]}, @@ -550,4 +552,3 @@ parse_to_packet(Bin, Opts) -> Packet. payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). - diff --git a/apps/emqx/test/emqx_inflight_SUITE.erl b/apps/emqx/test/emqx_inflight_SUITE.erl index a819e788b..6e5d8f5cc 100644 --- a/apps/emqx/test/emqx_inflight_SUITE.erl +++ b/apps/emqx/test/emqx_inflight_SUITE.erl @@ -20,10 +20,9 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). --include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). all() -> emqx_ct:all(?MODULE). - + t_contain(_) -> Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()), ?assert(emqx_inflight:contain(k, Inflight)), @@ -41,12 +40,12 @@ t_insert(_) -> ?assertEqual(2, emqx_inflight:size(Inflight)), ?assertEqual({value, 1}, emqx_inflight:lookup(a, Inflight)), ?assertEqual({value, 2}, emqx_inflight:lookup(b, Inflight)), - ?catch_error({key_exists, a}, emqx_inflight:insert(a, 1, Inflight)). + ?assertError({key_exists, a}, emqx_inflight:insert(a, 1, Inflight)). t_update(_) -> Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()), ?assertEqual(Inflight, emqx_inflight:update(k, v, Inflight)), - ?catch_error(function_clause, emqx_inflight:update(badkey, v, Inflight)). + ?assertError(function_clause, emqx_inflight:update(badkey, v, Inflight)). t_resize(_) -> Inflight = emqx_inflight:insert(k, v, emqx_inflight:new(2)), diff --git a/apps/emqx/test/emqx_mqtt_props_SUITE.erl b/apps/emqx/test/emqx_mqtt_props_SUITE.erl index 2e96182b0..b4dcd2f18 100644 --- a/apps/emqx/test/emqx_mqtt_props_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_props_SUITE.erl @@ -21,7 +21,6 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). all() -> emqx_ct:all(?MODULE). @@ -30,14 +29,14 @@ t_id(_) -> fun({Id, Prop}) -> ?assertEqual(Id, emqx_mqtt_props:id(element(1, Prop))) end), - ?catch_error({bad_property, 'Bad-Property'}, emqx_mqtt_props:id('Bad-Property')). + ?assertError({bad_property, 'Bad-Property'}, emqx_mqtt_props:id('Bad-Property')). t_name(_) -> foreach_prop( fun({Id, Prop}) -> ?assertEqual(emqx_mqtt_props:name(Id), element(1, Prop)) end), - ?catch_error({unsupported_property, 16#FF}, emqx_mqtt_props:name(16#FF)). + ?assertError({unsupported_property, 16#FF}, emqx_mqtt_props:name(16#FF)). t_filter(_) -> ConnProps = #{'Session-Expiry-Interval' => 1, @@ -60,7 +59,7 @@ t_validate(_) -> }, ok = emqx_mqtt_props:validate(ConnProps), BadProps = #{'Unknown-Property' => 10}, - ?catch_error({bad_property,'Unknown-Property'}, + ?assertError({bad_property,'Unknown-Property'}, emqx_mqtt_props:validate(BadProps)). t_validate_value(_) -> @@ -68,11 +67,11 @@ t_validate_value(_) -> ok = emqx_mqtt_props:validate(#{'Reason-String' => <<"Unknown Reason">>}), ok = emqx_mqtt_props:validate(#{'User-Property' => {<<"Prop">>, <<"Val">>}}), ok = emqx_mqtt_props:validate(#{'User-Property' => [{<<"Prop">>, <<"Val">>}]}), - ?catch_error({bad_property_value, {'Payload-Format-Indicator', 16#FFFF}}, + ?assertError({bad_property_value, {'Payload-Format-Indicator', 16#FFFF}}, emqx_mqtt_props:validate(#{'Payload-Format-Indicator' => 16#FFFF})), - ?catch_error({bad_property_value, {'Server-Keep-Alive', 16#FFFFFF}}, + ?assertError({bad_property_value, {'Server-Keep-Alive', 16#FFFFFF}}, emqx_mqtt_props:validate(#{'Server-Keep-Alive' => 16#FFFFFF})), - ?catch_error({bad_property_value, {'Will-Delay-Interval', -16#FF}}, + ?assertError({bad_property_value, {'Will-Delay-Interval', -16#FF}}, emqx_mqtt_props:validate(#{'Will-Delay-Interval' => -16#FF})). foreach_prop(Fun) -> @@ -86,4 +85,4 @@ foreach_prop(Fun) -> % error('TODO'). % t_get(_) -> -% error('TODO'). \ No newline at end of file +% error('TODO'). diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index 0cccb74bb..e8262a8ec 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). --include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). -import(emqx_topic, [ wildcard/1 @@ -126,21 +125,21 @@ t_validate(_) -> true = validate({filter, <<"abc/#">>}), true = validate({filter, <<"x">>}), true = validate({name, <<"x//y">>}), - true = validate({filter, <<"sport/tennis/#">>}), - ok = ?catch_error(empty_topic, validate({name, <<>>})), - ok = ?catch_error(topic_name_error, validate({name, <<"abc/#">>})), - ok = ?catch_error(topic_too_long, validate({name, long_topic()})), - ok = ?catch_error('topic_invalid_#', validate({filter, <<"abc/#/1">>})), - ok = ?catch_error(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})), - ok = ?catch_error(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})), - ok = ?catch_error(topic_invalid_char, validate({filter, <<"sport/tennis#">>})), - ok = ?catch_error('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})). + true = validate({filter, <<"sport/tennis/#">>}), + ?assertError(empty_topic, validate({name, <<>>})), + ?assertError(topic_name_error, validate({name, <<"abc/#">>})), + ?assertError(topic_too_long, validate({name, long_topic()})), + ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})), + ?assertError(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})), + ?assertError(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})), + ?assertError(topic_invalid_char, validate({filter, <<"sport/tennis#">>})), + ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})). t_sigle_level_validate(_) -> true = validate({filter, <<"+">>}), true = validate({filter, <<"+/tennis/#">>}), true = validate({filter, <<"sport/+/player1">>}), - ok = ?catch_error(topic_invalid_char, validate({filter, <<"sport+">>})). + ?assertError(topic_invalid_char, validate({filter, <<"sport+">>})). t_prepend(_) -> ?assertEqual(<<"ab">>, prepend(undefined, <<"ab">>)), @@ -192,14 +191,14 @@ long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]). t_parse(_) -> - ok = ?catch_error({invalid_topic_filter, <<"$queue/t">>}, - parse(<<"$queue/t">>, #{share => <<"g">>})), - ok = ?catch_error({invalid_topic_filter, <<"$share/g/t">>}, - parse(<<"$share/g/t">>, #{share => <<"g">>})), - ok = ?catch_error({invalid_topic_filter, <<"$share/t">>}, - parse(<<"$share/t">>)), - ok = ?catch_error({invalid_topic_filter, <<"$share/+/t">>}, - parse(<<"$share/+/t">>)), + ?assertError({invalid_topic_filter, <<"$queue/t">>}, + parse(<<"$queue/t">>, #{share => <<"g">>})), + ?assertError({invalid_topic_filter, <<"$share/g/t">>}, + parse(<<"$share/g/t">>, #{share => <<"g">>})), + ?assertError({invalid_topic_filter, <<"$share/t">>}, + parse(<<"$share/t">>)), + ?assertError({invalid_topic_filter, <<"$share/+/t">>}, + parse(<<"$share/+/t">>)), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),