Merge pull request #5820 from JimMoen/refactor-frame-parse-logging

Refactor frame parse logging with bug fix.
This commit is contained in:
Zaiming (Stone) Shi 2021-09-28 19:31:39 +02:00 committed by GitHub
commit 83e7ab681b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 197 additions and 88 deletions

View File

@ -542,4 +542,9 @@
-define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])). -define(SHARE(Group, Topic), emqx_topic:join([<<?SHARE>>, Group, Topic])).
-define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end). -define(IS_SHARE(Topic), case Topic of <<?SHARE, _/binary>> -> true; _ -> false end).
-define(FRAME_PARSE_ERROR(Reason), {frame_parse_error, Reason}).
-define(FRAME_SERIALIZE_ERROR(Reason), {frame_serialize_error, Reason}).
-define(THROW_FRAME_ERROR(Reason), erlang:throw(?FRAME_PARSE_ERROR(Reason))).
-define(THROW_SERIALIZE_ERROR(Reason), erlang:throw(?FRAME_SERIALIZE_ERROR(Reason))).
-endif. -endif.

View File

@ -644,10 +644,21 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
NState = State#state{parse_state = NParseState}, NState = State#state{parse_state = NParseState},
parse_incoming(Rest, [Packet|Packets], NState) parse_incoming(Rest, [Packet|Packets], NState)
catch catch
error:Reason:Stk -> throw : ?FRAME_PARSE_ERROR(Reason) ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data:~0p", ?SLOG(info, #{ reason => Reason
[Reason, Stk, Data]), , at_state => emqx_frame:describe_state(ParseState)
{[{frame_error, Reason}|Packets], State} , input_bytes => Data
, parsed_packets => Packets
}),
{[{frame_error, Reason} | Packets], State};
error : Reason : Stacktrace ->
?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
, input_bytes => Data
, parsed_packets => Packets
, exception => Reason
, stacktrace => Stacktrace
}),
{[{frame_error, Reason} | Packets], State}
end. end.
-compile({inline, [next_incoming_msgs/1]}). -compile({inline, [next_incoming_msgs/1]}).
@ -696,7 +707,7 @@ handle_outgoing(Packet, State) ->
serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) -> fun(Packet) ->
case emqx_frame:serialize_pkt(Packet, Serialize) of try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!", <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
[emqx_packet:format(Packet)]), [emqx_packet:format(Packet)]),
ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped.too_large'),
@ -705,6 +716,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
ok = inc_outgoing_stats(Packet), ok = inc_outgoing_stats(Packet),
Data Data
catch
%% Maybe Never happen.
throw : ?FRAME_SERIALIZE_ERROR(Reason) ->
?SLOG(info, #{ reason => Reason
, input_packet => Packet}),
erlang:error(?FRAME_SERIALIZE_ERROR(Reason));
error : Reason : Stacktrace ->
?SLOG(error, #{ input_packet => Packet
, exception => Reason
, stacktrace => Stacktrace}),
erlang:error(frame_serialize_error)
end end
end. end.

View File

@ -34,6 +34,10 @@
, serialize/2 , serialize/2
]). ]).
-export([ describe_state/1
]).
-export_type([ options/0 -export_type([ options/0
, parse_state/0 , parse_state/0
, parse_result/0 , parse_result/0
@ -47,7 +51,9 @@
version => emqx_types:proto_ver() version => emqx_types:proto_ver()
}). }).
-type(parse_state() :: {none, options()} | {cont_state(), options()}). -define(NONE(Options), {none, Options}).
-type(parse_state() :: ?NONE(options()) | {cont_state(), options()}).
-type(parse_result() :: {more, parse_state()} -type(parse_result() :: {more, parse_state()}
| {ok, emqx_types:packet(), binary(), parse_state()}). | {ok, emqx_types:packet(), binary(), parse_state()}).
@ -61,27 +67,45 @@
-type(serialize_opts() :: options()). -type(serialize_opts() :: options()).
-define(none(Options), {none, Options}).
-define(DEFAULT_OPTIONS, -define(DEFAULT_OPTIONS,
#{strict_mode => false, #{strict_mode => false,
max_size => ?MAX_PACKET_SIZE, max_size => ?MAX_PACKET_SIZE,
version => ?MQTT_PROTO_V4 version => ?MQTT_PROTO_V4
}). }).
-define(PARSE_ERR(Reason), ?THROW_FRAME_ERROR(Reason)).
-define(SERIALIZE_ERR(Reason), ?THROW_SERIALIZE_ERROR(Reason)).
-define(MULTIPLIER_MAX, 16#200000).
-dialyzer({no_match, [serialize_utf8_string/2]}). -dialyzer({no_match, [serialize_utf8_string/2]}).
%% @doc Describe state for logging.
describe_state(?NONE(_Opts)) -> <<"clean">>;
describe_state({{len, _}, _Opts}) -> <<"parsing_varint_length">>;
describe_state({{body, State}, _Opts}) ->
#{ hdr := Hdr
, len := Len
} = State,
Desc = #{ parsed_header => Hdr
, expected_bytes => Len
},
case maps:get(rest, State, undefined) of
undefined -> Desc;
Body -> Desc#{received_bytes => body_bytes(Body)}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Init Parse State %% Init Parse State
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(initial_parse_state() -> {none, options()}). -spec(initial_parse_state() -> ?NONE(options())).
initial_parse_state() -> initial_parse_state() ->
initial_parse_state(#{}). initial_parse_state(#{}).
-spec(initial_parse_state(options()) -> {none, options()}). -spec(initial_parse_state(options()) -> ?NONE(options())).
initial_parse_state(Options) when is_map(Options) -> initial_parse_state(Options) when is_map(Options) ->
?none(maps:merge(?DEFAULT_OPTIONS, Options)). ?NONE(maps:merge(?DEFAULT_OPTIONS, Options)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Parse MQTT Frame %% Parse MQTT Frame
@ -92,10 +116,10 @@ parse(Bin) ->
parse(Bin, initial_parse_state()). parse(Bin, initial_parse_state()).
-spec(parse(binary(), parse_state()) -> parse_result()). -spec(parse(binary(), parse_state()) -> parse_result()).
parse(<<>>, {none, Options}) -> parse(<<>>, ?NONE(Options)) ->
{more, {none, Options}}; {more, ?NONE(Options)};
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>,
{none, Options = #{strict_mode := StrictMode}}) -> ?NONE(Options = #{strict_mode := StrictMode})) ->
%% Validate header if strict mode. %% Validate header if strict mode.
StrictMode andalso validate_header(Type, Dup, QoS, Retain), StrictMode andalso validate_header(Type, Dup, QoS, Retain),
Header = #mqtt_packet_header{type = Type, Header = #mqtt_packet_header{type = Type,
@ -123,14 +147,14 @@ parse_remaining_len(Rest, Header, Options) ->
parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize})
when Length > MaxSize -> when Length > MaxSize ->
error(frame_too_large); ?PARSE_ERR(frame_too_large);
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
{more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}}; {more, {{len, #{hdr => Header, len => {Multiplier, Length}}}, Options}};
%% Match DISCONNECT without payload %% Match DISCONNECT without payload
parse_remaining_len(<<0:8, Rest/binary>>, parse_remaining_len(<<0:8, Rest/binary>>,
Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) ->
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
{ok, Packet, Rest, ?none(Options)}; {ok, Packet, Rest, ?NONE(Options)};
%% Match PINGREQ. %% Match PINGREQ.
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) -> parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 0, Options); parse_frame(Rest, Header, 0, Options);
@ -138,21 +162,22 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) -> parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options); parse_frame(Rest, Header, 2, Options);
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options) parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
when Multiplier > 2097152 -> when Multiplier > ?MULTIPLIER_MAX ->
error(malformed_variable_byte_integer); ?PARSE_ERR(malformed_variable_byte_integer);
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) -> parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options); parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
Options = #{max_size := MaxSize}) -> Options = #{max_size := MaxSize}) ->
FrameLen = Value + Len * Multiplier, FrameLen = Value + Len * Multiplier,
case FrameLen > MaxSize of case FrameLen > MaxSize of
true -> error(frame_too_large); true -> ?PARSE_ERR(frame_too_large);
false -> parse_frame(Rest, Header, FrameLen, Options) false -> parse_frame(Rest, Header, FrameLen, Options)
end. end.
body_bytes(B) when is_binary(B) -> size(B); body_bytes(B) when is_binary(B) -> size(B);
body_bytes(?Q(Bytes, _)) -> Bytes. body_bytes(?Q(Bytes, _)) -> Bytes.
append_body(H, <<>>) -> H;
append_body(H, T) when is_binary(H) andalso size(H) < 1024 -> append_body(H, T) when is_binary(H) andalso size(H) < 1024 ->
<<H/binary, T/binary>>; <<H/binary, T/binary>>;
append_body(H, T) when is_binary(H) -> append_body(H, T) when is_binary(H) ->
@ -165,18 +190,18 @@ flatten_body(Body) when is_binary(Body) -> Body;
flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)). flatten_body(?Q(_, Q)) -> iolist_to_binary(queue:to_list(Q)).
parse_frame(Body, Header, 0, Options) -> parse_frame(Body, Header, 0, Options) ->
{ok, packet(Header), flatten_body(Body), ?none(Options)}; {ok, packet(Header), flatten_body(Body), ?NONE(Options)};
parse_frame(Body, Header, Length, Options) -> parse_frame(Body, Header, Length, Options) ->
case body_bytes(Body) >= Length of case body_bytes(Body) >= Length of
true -> true ->
<<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body), <<FrameBin:Length/binary, Rest/binary>> = flatten_body(Body),
case parse_packet(Header, FrameBin, Options) of case parse_packet(Header, FrameBin, Options) of
{Variable, Payload} -> {Variable, Payload} ->
{ok, packet(Header, Variable, Payload), Rest, ?none(Options)}; {ok, packet(Header, Variable, Payload), Rest, ?NONE(Options)};
Variable = #mqtt_packet_connect{proto_ver = Ver} -> Variable = #mqtt_packet_connect{proto_ver = Ver} ->
{ok, packet(Header, Variable), Rest, ?none(Options#{version := Ver})}; {ok, packet(Header, Variable), Rest, ?NONE(Options#{version := Ver})};
Variable -> Variable ->
{ok, packet(Header, Variable), Rest, ?none(Options)} {ok, packet(Header, Variable), Rest, ?NONE(Options)}
end; end;
false -> false ->
{more, {{body, #{hdr => Header, {more, {{body, #{hdr => Header,
@ -420,10 +445,16 @@ parse_property(<<16#28, Val, Bin/binary>>, Props) ->
parse_property(<<16#29, Val, Bin/binary>>, Props) -> parse_property(<<16#29, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val});
parse_property(<<16#2A, Val, Bin/binary>>, Props) -> parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). parse_property(Bin, Props#{'Shared-Subscription-Available' => Val});
parse_property(<<Property:8, _Rest/binary>>, _Props) ->
?PARSE_ERR(#{invalid_property_code => Property}).
%% TODO: invalid property in specific packet.
parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin) ->
parse_variable_byte_integer(Bin, 1, 0). parse_variable_byte_integer(Bin, 1, 0).
parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value)
when Multiplier > ?MULTIPLIER_MAX ->
?PARSE_ERR(malformed_variable_byte_integer);
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
@ -441,7 +472,23 @@ parse_reason_codes(Bin) ->
parse_utf8_pair(<<Len1:16/big, Key:Len1/binary, parse_utf8_pair(<<Len1:16/big, Key:Len1/binary,
Len2:16/big, Val:Len2/binary, Rest/binary>>) -> Len2:16/big, Val:Len2/binary, Rest/binary>>) ->
{{Key, Val}, Rest}. {{Key, Val}, Rest};
parse_utf8_pair(<<LenK:16/big, Rest/binary>>)
when LenK > byte_size(Rest) ->
?PARSE_ERR(#{ hint => user_property_not_enough_bytes
, parsed_key_length => LenK
, remaining_bytes_length => byte_size(Rest)});
parse_utf8_pair(<<LenK:16/big, _Key:LenK/binary, %% key maybe malformed
LenV:16/big, Rest/binary>>)
when LenV > byte_size(Rest) ->
?PARSE_ERR(#{ hint => malformed_user_property_value
, parsed_key_length => LenK
, parsed_value_length => LenV
, remaining_bytes_length => byte_size(Rest)});
parse_utf8_pair(Bin)
when 4 > byte_size(Bin) ->
?PARSE_ERR(#{ hint => user_property_not_enough_bytes
, total_bytes => byte_size(Bin)}).
parse_utf8_string(Bin, false) -> parse_utf8_string(Bin, false) ->
{undefined, Bin}; {undefined, Bin};
@ -449,10 +496,26 @@ parse_utf8_string(Bin, true) ->
parse_utf8_string(Bin). parse_utf8_string(Bin).
parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>) -> parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
{Str, Rest}. {Str, Rest};
parse_utf8_string(<<Len:16/big, Rest/binary>>)
when Len > byte_size(Rest) ->
?PARSE_ERR(#{ hint => malformed_utf8_string
, parsed_length => Len
, remaining_bytes_length => byte_size(Rest)});
parse_utf8_string(Bin)
when 2 > byte_size(Bin) ->
?PARSE_ERR(malformed_utf8_string_length).
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) -> parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
{Data, Rest}. {Data, Rest};
parse_binary_data(<<Len:16/big, Rest/binary>>)
when Len > byte_size(Rest) ->
?PARSE_ERR(#{ hint => malformed_binary_data
, parsed_length => Len
, remaining_bytes_length => byte_size(Rest)});
parse_binary_data(Bin)
when 2 > byte_size(Bin) ->
?PARSE_ERR(malformed_binary_data_length).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Serialize MQTT Packet %% Serialize MQTT Packet
@ -719,7 +782,7 @@ serialize_binary_data(Bin) ->
[<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin]. [<<(byte_size(Bin)):16/big-unsigned-integer>>, Bin].
serialize_utf8_string(undefined, false) -> serialize_utf8_string(undefined, false) ->
error(utf8_string_undefined); ?SERIALIZE_ERR(utf8_string_undefined);
serialize_utf8_string(undefined, true) -> serialize_utf8_string(undefined, true) ->
<<>>; <<>>;
serialize_utf8_string(String, _AllowNull) -> serialize_utf8_string(String, _AllowNull) ->
@ -767,13 +830,13 @@ validate_header(?PINGREQ, 0, 0, 0) -> ok;
validate_header(?PINGRESP, 0, 0, 0) -> ok; validate_header(?PINGRESP, 0, 0, 0) -> ok;
validate_header(?DISCONNECT, 0, 0, 0) -> ok; validate_header(?DISCONNECT, 0, 0, 0) -> ok;
validate_header(?AUTH, 0, 0, 0) -> ok; validate_header(?AUTH, 0, 0, 0) -> ok;
validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header). validate_header(_Type, _Dup, _QoS, _Rt) -> ?PARSE_ERR(bad_frame_header).
-compile({inline, [validate_packet_id/1]}). -compile({inline, [validate_packet_id/1]}).
validate_packet_id(0) -> error(bad_packet_id); validate_packet_id(0) -> ?PARSE_ERR(bad_packet_id);
validate_packet_id(_) -> ok. validate_packet_id(_) -> ok.
validate_subqos([3|_]) -> error(bad_subqos); validate_subqos([3|_]) -> ?PARSE_ERR(bad_subqos);
validate_subqos([_|T]) -> validate_subqos(T); validate_subqos([_|T]) -> validate_subqos(T);
validate_subqos([]) -> ok. validate_subqos([]) -> ok.

View File

@ -547,9 +547,19 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
NState = State#state{parse_state = NParseState}, NState = State#state{parse_state = NParseState},
parse_incoming(Rest, postpone({incoming, Packet}, NState)) parse_incoming(Rest, postpone({incoming, Packet}, NState))
catch catch
error:Reason:Stk -> throw : ?FRAME_PARSE_ERROR(Reason) ->
?LOG(error, "~nParse failed for ~0p~n~0p~nFrame data: ~0p", ?SLOG(info, #{ reason => Reason
[Reason, Stk, Data]), , at_state => emqx_frame:describe_state(ParseState)
, input_bytes => Data
}),
FrameError = {frame_error, Reason},
postpone({incoming, FrameError}, State);
error : Reason : Stacktrace ->
?SLOG(error, #{ at_state => emqx_frame:describe_state(ParseState)
, input_bytes => Data
, exception => Reason
, stacktrace => Stacktrace
}),
FrameError = {frame_error, Reason}, FrameError = {frame_error, Reason},
postpone({incoming, FrameError}, State) postpone({incoming, FrameError}, State)
end. end.
@ -617,7 +627,7 @@ handle_outgoing(Packets, State = #state{mqtt_piggyback = MQTTPiggyback,
serialize_and_inc_stats_fun(#state{serialize = Serialize}) -> serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
fun(Packet) -> fun(Packet) ->
case emqx_frame:serialize_pkt(Packet, Serialize) of try emqx_frame:serialize_pkt(Packet, Serialize) of
<<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.", <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large.",
[emqx_packet:format(Packet)]), [emqx_packet:format(Packet)]),
ok = emqx_metrics:inc('delivery.dropped.too_large'), ok = emqx_metrics:inc('delivery.dropped.too_large'),
@ -626,6 +636,17 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
ok = inc_outgoing_stats(Packet), ok = inc_outgoing_stats(Packet),
Data Data
catch
%% Maybe Never happen.
throw : ?FRAME_SERIALIZE_ERROR(Reason) ->
?SLOG(info, #{ reason => Reason
, input_packet => Packet}),
erlang:error(?FRAME_SERIALIZE_ERROR(Reason));
error : Reason : Stacktrace ->
?SLOG(error, #{ input_packet => Packet
, exception => Reason
, stacktrace => Stacktrace}),
erlang:error(frame_serialize_error)
end end
end. end.
@ -791,4 +812,4 @@ get_ws_opts(Type, Listener, Key) ->
emqx_config:get_listener_conf(Type, Listener, [websocket, Key]). emqx_config:get_listener_conf(Type, Listener, [websocket, Key]).
get_active_n(Type, Listener) -> get_active_n(Type, Listener) ->
emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]). emqx_config:get_listener_conf(Type, Listener, [tcp, active_n]).

View File

@ -22,7 +22,9 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
-define(ASSERT_FRAME_THROW(Reason, Expr),
?assertThrow(?FRAME_PARSE_ERROR(Reason), Expr)).
all() -> all() ->
[{group, parse}, [{group, parse},
@ -113,7 +115,7 @@ init_per_group(_Group, Config) ->
Config. Config.
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
t_parse_cont(_) -> t_parse_cont(_) ->
Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}),
@ -127,15 +129,15 @@ t_parse_cont(_) ->
t_parse_frame_too_large(_) -> t_parse_frame_too_large(_) ->
Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)), Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)),
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 256})), ?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 256})),
?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?ASSERT_FRAME_THROW(frame_too_large, parse_serialize(Packet, #{max_size => 512})),
?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})).
t_parse_frame_malformed_variable_byte_integer(_) -> t_parse_frame_malformed_variable_byte_integer(_) ->
MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 4) >>, MalformedPayload = << <<16#80>> || _ <- lists:seq(1, 6) >>,
ParseState = emqx_frame:initial_parse_state(#{}), ParseState = emqx_frame:initial_parse_state(#{}),
?catch_error(malformed_variable_byte_integer, ?ASSERT_FRAME_THROW(malformed_variable_byte_integer,
emqx_frame:parse(MalformedPayload, ParseState)). emqx_frame:parse(MalformedPayload, ParseState)).
t_serialize_parse_v3_connect(_) -> t_serialize_parse_v3_connect(_) ->
Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115,
@ -329,7 +331,7 @@ t_serialize_parse_qos1_publish(_) ->
?assertEqual(Bin, serialize_to_binary(Packet)), ?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))), ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))),
%% strict_mode = false %% strict_mode = false
_ = parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>), #{strict_mode => false}). _ = parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
@ -340,7 +342,7 @@ t_serialize_parse_qos2_publish(_) ->
?assertEqual(Bin, serialize_to_binary(Packet)), ?assertEqual(Bin, serialize_to_binary(Packet)),
?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))), ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))),
%% strict_mode = false %% strict_mode = false
_ = parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>), #{strict_mode => false}). _ = parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>), #{strict_mode => false}).
@ -360,7 +362,7 @@ t_serialize_parse_puback(_) ->
?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)), ?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)),
?assertEqual(Packet, parse_serialize(Packet)), ?assertEqual(Packet, parse_serialize(Packet)),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))), ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))),
%% strict_mode = false %% strict_mode = false
?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), #{strict_mode => false}). ?PUBACK_PACKET(0) = parse_serialize(?PUBACK_PACKET(0), #{strict_mode => false}).
@ -381,7 +383,7 @@ t_serialize_parse_pubrec(_) ->
?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)), ?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)),
?assertEqual(Packet, parse_serialize(Packet)), ?assertEqual(Packet, parse_serialize(Packet)),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))), ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))),
%% strict_mode = false %% strict_mode = false
?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), #{strict_mode => false}). ?PUBREC_PACKET(0) = parse_serialize(?PUBREC_PACKET(0), #{strict_mode => false}).
@ -397,11 +399,11 @@ t_serialize_parse_pubrel(_) ->
%% PUBREL with bad qos 0 %% PUBREL with bad qos 0
Bin0 = <<6:4,0:4,2,0,1>>, Bin0 = <<6:4,0:4,2,0,1>>,
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
%% strict_mode = false %% strict_mode = false
?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), #{strict_mode => false}), ?PUBREL_PACKET(0) = parse_serialize(?PUBREL_PACKET(0), #{strict_mode => false}),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))). ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))).
t_serialize_parse_pubrel_v5(_) -> t_serialize_parse_pubrel_v5(_) ->
Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
@ -415,7 +417,7 @@ t_serialize_parse_pubcomp(_) ->
%% strict_mode = false %% strict_mode = false
?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), #{strict_mode => false}), ?PUBCOMP_PACKET(0) = parse_serialize(?PUBCOMP_PACKET(0), #{strict_mode => false}),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))). ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))).
t_serialize_parse_pubcomp_v5(_) -> t_serialize_parse_pubcomp_v5(_) ->
Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}),
@ -434,12 +436,12 @@ t_serialize_parse_subscribe(_) ->
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
%% strict_mode = false %% strict_mode = false
_ = parse_to_packet(Bin0, #{strict_mode => false}), _ = parse_to_packet(Bin0, #{strict_mode => false}),
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
%% strict_mode = false %% strict_mode = false
_ = parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters), #{strict_mode => false}), _ = parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters), #{strict_mode => false}),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))), ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))),
?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))). ?ASSERT_FRAME_THROW(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))).
t_serialize_parse_subscribe_v5(_) -> t_serialize_parse_subscribe_v5(_) ->
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}}, TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0}},
@ -453,7 +455,7 @@ t_serialize_parse_suback(_) ->
%% strict_mode = false %% strict_mode = false
_ = parse_serialize(?SUBACK_PACKET(0, [?QOS_0]), #{strict_mode => false}), _ = parse_serialize(?SUBACK_PACKET(0, [?QOS_0]), #{strict_mode => false}),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))). ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))).
t_serialize_parse_suback_v5(_) -> t_serialize_parse_suback_v5(_) ->
Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>, Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>,
@ -471,11 +473,11 @@ t_serialize_parse_unsubscribe(_) ->
%% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>])
Bin0 = <<?UNSUBSCRIBE:4,0:4,10,0,2,0,6,84,111,112,105,99,65>>, Bin0 = <<?UNSUBSCRIBE:4,0:4,10,0,2,0,6,84,111,112,105,99,65>>,
?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})),
?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), ?ASSERT_FRAME_THROW(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})),
%% strict_mode = false %% strict_mode = false
_ = parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]), #{strict_mode => false}), _ = parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]), #{strict_mode => false}),
%% strict_mode = true %% strict_mode = true
?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))). ?ASSERT_FRAME_THROW(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))).
t_serialize_parse_unsubscribe_v5(_) -> t_serialize_parse_unsubscribe_v5(_) ->
Props = #{'User-Property' => [{<<"key">>, <<"val">>}]}, Props = #{'User-Property' => [{<<"key">>, <<"val">>}]},
@ -550,4 +552,3 @@ parse_to_packet(Bin, Opts) ->
Packet. Packet.
payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)).

View File

@ -20,10 +20,9 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
t_contain(_) -> t_contain(_) ->
Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()), Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()),
?assert(emqx_inflight:contain(k, Inflight)), ?assert(emqx_inflight:contain(k, Inflight)),
@ -41,12 +40,12 @@ t_insert(_) ->
?assertEqual(2, emqx_inflight:size(Inflight)), ?assertEqual(2, emqx_inflight:size(Inflight)),
?assertEqual({value, 1}, emqx_inflight:lookup(a, Inflight)), ?assertEqual({value, 1}, emqx_inflight:lookup(a, Inflight)),
?assertEqual({value, 2}, emqx_inflight:lookup(b, Inflight)), ?assertEqual({value, 2}, emqx_inflight:lookup(b, Inflight)),
?catch_error({key_exists, a}, emqx_inflight:insert(a, 1, Inflight)). ?assertError({key_exists, a}, emqx_inflight:insert(a, 1, Inflight)).
t_update(_) -> t_update(_) ->
Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()), Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()),
?assertEqual(Inflight, emqx_inflight:update(k, v, Inflight)), ?assertEqual(Inflight, emqx_inflight:update(k, v, Inflight)),
?catch_error(function_clause, emqx_inflight:update(badkey, v, Inflight)). ?assertError(function_clause, emqx_inflight:update(badkey, v, Inflight)).
t_resize(_) -> t_resize(_) ->
Inflight = emqx_inflight:insert(k, v, emqx_inflight:new(2)), Inflight = emqx_inflight:insert(k, v, emqx_inflight:new(2)),

View File

@ -21,7 +21,6 @@
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
@ -30,14 +29,14 @@ t_id(_) ->
fun({Id, Prop}) -> fun({Id, Prop}) ->
?assertEqual(Id, emqx_mqtt_props:id(element(1, Prop))) ?assertEqual(Id, emqx_mqtt_props:id(element(1, Prop)))
end), end),
?catch_error({bad_property, 'Bad-Property'}, emqx_mqtt_props:id('Bad-Property')). ?assertError({bad_property, 'Bad-Property'}, emqx_mqtt_props:id('Bad-Property')).
t_name(_) -> t_name(_) ->
foreach_prop( foreach_prop(
fun({Id, Prop}) -> fun({Id, Prop}) ->
?assertEqual(emqx_mqtt_props:name(Id), element(1, Prop)) ?assertEqual(emqx_mqtt_props:name(Id), element(1, Prop))
end), end),
?catch_error({unsupported_property, 16#FF}, emqx_mqtt_props:name(16#FF)). ?assertError({unsupported_property, 16#FF}, emqx_mqtt_props:name(16#FF)).
t_filter(_) -> t_filter(_) ->
ConnProps = #{'Session-Expiry-Interval' => 1, ConnProps = #{'Session-Expiry-Interval' => 1,
@ -60,7 +59,7 @@ t_validate(_) ->
}, },
ok = emqx_mqtt_props:validate(ConnProps), ok = emqx_mqtt_props:validate(ConnProps),
BadProps = #{'Unknown-Property' => 10}, BadProps = #{'Unknown-Property' => 10},
?catch_error({bad_property,'Unknown-Property'}, ?assertError({bad_property,'Unknown-Property'},
emqx_mqtt_props:validate(BadProps)). emqx_mqtt_props:validate(BadProps)).
t_validate_value(_) -> t_validate_value(_) ->
@ -68,11 +67,11 @@ t_validate_value(_) ->
ok = emqx_mqtt_props:validate(#{'Reason-String' => <<"Unknown Reason">>}), ok = emqx_mqtt_props:validate(#{'Reason-String' => <<"Unknown Reason">>}),
ok = emqx_mqtt_props:validate(#{'User-Property' => {<<"Prop">>, <<"Val">>}}), ok = emqx_mqtt_props:validate(#{'User-Property' => {<<"Prop">>, <<"Val">>}}),
ok = emqx_mqtt_props:validate(#{'User-Property' => [{<<"Prop">>, <<"Val">>}]}), ok = emqx_mqtt_props:validate(#{'User-Property' => [{<<"Prop">>, <<"Val">>}]}),
?catch_error({bad_property_value, {'Payload-Format-Indicator', 16#FFFF}}, ?assertError({bad_property_value, {'Payload-Format-Indicator', 16#FFFF}},
emqx_mqtt_props:validate(#{'Payload-Format-Indicator' => 16#FFFF})), emqx_mqtt_props:validate(#{'Payload-Format-Indicator' => 16#FFFF})),
?catch_error({bad_property_value, {'Server-Keep-Alive', 16#FFFFFF}}, ?assertError({bad_property_value, {'Server-Keep-Alive', 16#FFFFFF}},
emqx_mqtt_props:validate(#{'Server-Keep-Alive' => 16#FFFFFF})), emqx_mqtt_props:validate(#{'Server-Keep-Alive' => 16#FFFFFF})),
?catch_error({bad_property_value, {'Will-Delay-Interval', -16#FF}}, ?assertError({bad_property_value, {'Will-Delay-Interval', -16#FF}},
emqx_mqtt_props:validate(#{'Will-Delay-Interval' => -16#FF})). emqx_mqtt_props:validate(#{'Will-Delay-Interval' => -16#FF})).
foreach_prop(Fun) -> foreach_prop(Fun) ->
@ -86,4 +85,4 @@ foreach_prop(Fun) ->
% error('TODO'). % error('TODO').
% t_get(_) -> % t_get(_) ->
% error('TODO'). % error('TODO').

View File

@ -20,7 +20,6 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx_ct_helpers/include/emqx_ct.hrl").
-import(emqx_topic, -import(emqx_topic,
[ wildcard/1 [ wildcard/1
@ -126,21 +125,21 @@ t_validate(_) ->
true = validate({filter, <<"abc/#">>}), true = validate({filter, <<"abc/#">>}),
true = validate({filter, <<"x">>}), true = validate({filter, <<"x">>}),
true = validate({name, <<"x//y">>}), true = validate({name, <<"x//y">>}),
true = validate({filter, <<"sport/tennis/#">>}), true = validate({filter, <<"sport/tennis/#">>}),
ok = ?catch_error(empty_topic, validate({name, <<>>})), ?assertError(empty_topic, validate({name, <<>>})),
ok = ?catch_error(topic_name_error, validate({name, <<"abc/#">>})), ?assertError(topic_name_error, validate({name, <<"abc/#">>})),
ok = ?catch_error(topic_too_long, validate({name, long_topic()})), ?assertError(topic_too_long, validate({name, long_topic()})),
ok = ?catch_error('topic_invalid_#', validate({filter, <<"abc/#/1">>})), ?assertError('topic_invalid_#', validate({filter, <<"abc/#/1">>})),
ok = ?catch_error(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})), ?assertError(topic_invalid_char, validate({filter, <<"abc/#xzy/+">>})),
ok = ?catch_error(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})), ?assertError(topic_invalid_char, validate({filter, <<"abc/xzy/+9827">>})),
ok = ?catch_error(topic_invalid_char, validate({filter, <<"sport/tennis#">>})), ?assertError(topic_invalid_char, validate({filter, <<"sport/tennis#">>})),
ok = ?catch_error('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})). ?assertError('topic_invalid_#', validate({filter, <<"sport/tennis/#/ranking">>})).
t_sigle_level_validate(_) -> t_sigle_level_validate(_) ->
true = validate({filter, <<"+">>}), true = validate({filter, <<"+">>}),
true = validate({filter, <<"+/tennis/#">>}), true = validate({filter, <<"+/tennis/#">>}),
true = validate({filter, <<"sport/+/player1">>}), true = validate({filter, <<"sport/+/player1">>}),
ok = ?catch_error(topic_invalid_char, validate({filter, <<"sport+">>})). ?assertError(topic_invalid_char, validate({filter, <<"sport+">>})).
t_prepend(_) -> t_prepend(_) ->
?assertEqual(<<"ab">>, prepend(undefined, <<"ab">>)), ?assertEqual(<<"ab">>, prepend(undefined, <<"ab">>)),
@ -192,14 +191,14 @@ long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]). iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
t_parse(_) -> t_parse(_) ->
ok = ?catch_error({invalid_topic_filter, <<"$queue/t">>}, ?assertError({invalid_topic_filter, <<"$queue/t">>},
parse(<<"$queue/t">>, #{share => <<"g">>})), parse(<<"$queue/t">>, #{share => <<"g">>})),
ok = ?catch_error({invalid_topic_filter, <<"$share/g/t">>}, ?assertError({invalid_topic_filter, <<"$share/g/t">>},
parse(<<"$share/g/t">>, #{share => <<"g">>})), parse(<<"$share/g/t">>, #{share => <<"g">>})),
ok = ?catch_error({invalid_topic_filter, <<"$share/t">>}, ?assertError({invalid_topic_filter, <<"$share/t">>},
parse(<<"$share/t">>)), parse(<<"$share/t">>)),
ok = ?catch_error({invalid_topic_filter, <<"$share/+/t">>}, ?assertError({invalid_topic_filter, <<"$share/+/t">>},
parse(<<"$share/+/t">>)), parse(<<"$share/+/t">>)),
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)), ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),