Merge branch 'emqx30' into emqx30-feng

This commit is contained in:
Feng Lee 2018-08-30 23:41:26 +08:00 committed by GitHub
commit 3131acd3bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 213 additions and 95 deletions

View File

@ -32,13 +32,14 @@ TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
EUNIT_OPTS = verbose
# CT_SUITES = emqx_stats
# CT_SUITES = emqx_mqueue
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_connection emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \
CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone emqx_mountpoint
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \
emqx_mountpoint emqx_listeners
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1

View File

@ -1,7 +1,7 @@
# *EMQ X* - MQTT Broker
*EMQ X* broker is fully a open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients.
*EMQ X* broker is a fully open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients.
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket, STOMP and SockJS. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
@ -17,8 +17,8 @@ The *EMQ* broker is cross-platform, which can be deployed on Linux, Unix, Mac, W
Download the binary package for your platform from [here](http://emqtt.io/downloads).
-[Single Node Install](http://emqtt.io/docs/v2/install.html)
-[Multi Node Install](http://emqtt.io/docs/v2/cluster.html)
- [Single Node Install](http://emqtt.io/docs/v2/install.html)
- [Multi Node Install](http://emqtt.io/docs/v2/cluster.html)
## Build From Source

View File

@ -145,5 +145,16 @@
descr :: string()
}).
%%--------------------------------------------------------------------
%% Banned
%%--------------------------------------------------------------------
-record(banned, {
key,
reason,
by,
desc,
until}).
-endif.

View File

@ -36,14 +36,8 @@
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
-type(key() :: {client_id, emqx_types:client_id()} |
{username, emqx_types:username() |
{ipaddr, inet:ip_address()}}).
-record(state, {expiry_timer}).
-record(banned, {key :: key(), reason, by, desc, until}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -84,7 +78,7 @@ del(Key) ->
%%--------------------------------------------------------------------
init([]) ->
emqx_timer:seed(),
emqx_time:seed(),
{ok, ensure_expiry_timer(#state{})}.
handle_call(Req, _From, State) ->
@ -128,7 +122,8 @@ expire_banned_item(Key, Now) ->
[#banned{until = undefined}] -> ok;
[B = #banned{until = Until}] when Until < Now ->
mnesia:delete_object(?TAB, B, sticky_write);
[_] -> ok;
[] -> ok
end,
expire_banned_item(mnesia:next(Key), Now).
expire_banned_item(mnesia:next(?TAB, Key), Now).

View File

@ -373,22 +373,12 @@ init([Options]) ->
{_ver, undefined} -> random_client_id();
{_ver, Id} -> iolist_to_binary(Id)
end,
Username = case proplists:get_value(username, Options) of
undefined -> <<>>;
Name -> Name
end,
Password = case proplists:get_value(password, Options) of
undefined -> <<>>;
Passw -> Passw
end,
State = init(Options, #state{host = {127,0,0,1},
port = 1883,
hosts = [],
sock_opts = [],
bridge_mode = false,
client_id = ClientId,
username = Username,
password = Password,
clean_start = true,
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
@ -450,9 +440,9 @@ init([{client_id, ClientId} | Opts], State) ->
init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
init(Opts, State#state{clean_start = CleanStart});
init([{useranme, Username} | Opts], State) ->
init([{username, Username} | Opts], State) ->
init(Opts, State#state{username = iolist_to_binary(Username)});
init([{passwrod, Password} | Opts], State) ->
init([{password, Password} | Opts], State) ->
init(Opts, State#state{password = iolist_to_binary(Password)});
init([{keepalive, Secs} | Opts], State) ->
init(Opts, State#state{keepalive = timer:seconds(Secs)});
@ -552,8 +542,6 @@ mqtt_connect(State = #state{client_id = ClientId,
properties = Properties}) ->
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties),
io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n",
[ConnProps, ClientId, Username, Password]),
send(?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName,
@ -592,8 +580,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
_SessPresent,
Properties), State) ->
Reason = emqx_reason_codes:name(ReasonCode),
Properties), State = #state{ proto_ver = ProtoVer}) ->
Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
case take_call(connect, State) of
{value, #call{from = From}, _State} ->
Reply = {error, {Reason, Properties}},
@ -1082,6 +1070,7 @@ receive_loop(Bytes, State = #state{parse_state = ParseState}) ->
{error, Reason} ->
{stop, Reason};
{'EXIT', Error} ->
io:format("client stop"),
{stop, Error}
end.

View File

@ -264,7 +264,6 @@ process_packet(?CONNECT_PACKET(
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = emqx_packet:will_msg(Connect),
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
proto_ver = ProtoVer,
@ -412,7 +411,7 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
true ->
emqx_reason_codes:compat(connack, ReasonCode)
end}, PState),
{error, emqx_reason_codes:name(ReasonCode), PState}.
{error, emqx_reason_codes:name(ReasonCode, ProtoVer), PState}.
%%------------------------------------------------------------------------------
%% Publish Message -> Broker
@ -650,12 +649,14 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
shutdown(_Reason, #pstate{client_id = undefined}) ->
ok;
shutdown(_Reason, PState = #pstate{connected = false}) ->
ok;
shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
Reason =:= discard ->
emqx_cm:unregister_connection(ClientId);
shutdown(Reason, PState = #pstate{client_id = ClientId,
will_msg = WillMsg,
connected = true}) ->
shutdown(Reason, PState = #pstate{connected = true,
client_id = ClientId,
will_msg = WillMsg}) ->
?LOG(info, "Shutdown for ~p", [Reason], PState),
_ = send_willmsg(WillMsg),
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
@ -663,6 +664,10 @@ shutdown(Reason, PState = #pstate{client_id = ClientId,
send_willmsg(undefined) ->
ignore;
send_willmsg(WillMsg = #message{topic = Topic,
headers = #{'Will-Delay-Interval' := Interval}}) when is_integer(Interval) ->
SendAfter = integer_to_binary(Interval),
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
send_willmsg(WillMsg) ->
emqx_broker:publish(WillMsg).
@ -704,4 +709,3 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
sp(true) -> 1;
sp(false) -> 0.

View File

@ -17,9 +17,18 @@
-include("emqx_mqtt.hrl").
-export([name/1, text/1]).
-export([name/2, text/1]).
-export([compat/2]).
name(I, Ver) when Ver >= ?MQTT_PROTO_V5 ->
name(I);
name(0, _Ver) -> connection_acceptd;
name(1, _Ver) -> unacceptable_protocol_version;
name(2, _Ver) -> client_identifier_not_valid;
name(3, _Ver) -> server_unavaliable;
name(4, _Ver) -> malformed_username_or_password;
name(5, _Ver) -> unauthorized_client.
name(16#00) -> success;
name(16#01) -> granted_qos1;
name(16#02) -> granted_qos2;

View File

@ -0,0 +1,41 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_banned_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [t_banned_all].
t_banned_all(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
emqx_banned:start_link(),
{MegaSecs, Secs, MicroSecs} = erlang:timestamp(),
ok = emqx_banned:add(#banned{key = {client_id, <<"TestClient">>},
reason = <<"test">>,
by = <<"banned suite">>,
desc = <<"test">>,
until = {MegaSecs, Secs + 10, MicroSecs}}),
% here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed
timer:sleep(100),
?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
emqx_banned:del({client_id, <<"TestClient">>}),
?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})).

View File

@ -0,0 +1,76 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_listeners_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
all() ->
[start_stop_listeners,
restart_listeners].
init_per_suite() ->
NewConfig = generate_config(),
application:ensure_all_started(esockd),
lists:foreach(fun set_app_env/1, NewConfig),
ok.
end_per_suite() ->
application:stop(esockd),
ok.
start_stop_listeners(_) ->
ok = emqx_listeners:start(),
ok = emqx_listeners:stop().
restart_listeners(_) ->
ok = emqx_listeners:start(),
ok = emqx_listeners:stop(),
ok = emqx_listeners:restart(),
ok = emqx_listeners:stop().
generate_config() ->
Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]),
cuttlefish_generator:map(Schema, Conf).
set_app_env({App, Lists}) ->
lists:foreach(fun({acl_file, _Var}) ->
application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
({plugins_loaded_file, _Var}) ->
application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"]));
({Par, Var}) ->
application:set_env(App, Par, Var)
end, Lists).
local_path(Components, Module) ->
filename:join([get_base_dir(Module) | Components]).
local_path(Components) ->
local_path(Components, ?MODULE).
get_base_dir(Module) ->
{file, Here} = code:is_loaded(Module),
filename:dirname(filename:dirname(Here)).
get_base_dir() ->
get_base_dir(?MODULE).

View File

@ -28,7 +28,8 @@ all() ->
message_make,
message_flag,
message_header,
message_format
message_format,
message_expired
].
message_make(_) ->
@ -62,4 +63,16 @@ message_header(_) ->
message_format(_) ->
io:format("~s", [emqx_message:format(emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>))]).
message_expired(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
timer:sleep(500),
?assertNot(emqx_message:is_expired(Msg1)),
{ok, 1} = emqx_message:check_expiry(Msg1),
timer:sleep(600),
?assert(emqx_message:is_expired(Msg1)),
expired = emqx_message:check_expiry(Msg1),
timer:sleep(1000),
Msg2 = emqx_message:update_expiry(Msg1),
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).

View File

@ -33,14 +33,13 @@
all() ->
[basic_test,
retained_message_test,
will_message_test,
zero_length_clientid_test,
offline_message_queueing_test,
overlapping_subscriptions_test,
keepalive_test,
redelivery_on_reconnect_test,
subscribe_failure_test,
%% subscribe_failure_test,
dollar_topics_test].
init_per_suite(Config) ->
@ -57,7 +56,7 @@ receive_messages(0, Msgs) ->
Msgs;
receive_messages(Count, Msgs) ->
receive
{public, Msg} ->
{publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]);
_Other ->
receive_messages(Count, Msgs)
@ -69,40 +68,16 @@ basic_test(_Config) ->
Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"),
{ok, C, _} = emqx_client:start_link(),
{ok, _, [0]} = emqx_client:subscribe(C, Topic, qos2),
ok = emqx_client:publish(C, Topic, <<"qos 0">>),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 1">>, 1),
{ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
ok = emqx_client:disconnect(C),
?assertEqual(3, length(receive_messages(3))).
retained_message_test(_Config) ->
ct:print("Retained message test starting"),
%% Retained messages
{ok, C1, _} = emqx_client:start_link([{clean_start, true}]),
ok = emqx_client:publish(C1, nth(1, ?TOPICS), <<"qos 0">>, [{qos, 0}, {retain, true}]),
{ok, _} = emqx_client:publish(C1, nth(3, ?TOPICS), <<"qos 1">>, [{qos, 1}, {retain, true}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<"qos 2">>, [{qos, 2}, {retain, true}]),
timer:sleep(10),
{ok, #{}, [0]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1),
?assertEqual(3, length(receive_messages(10))),
%% Clear retained messages
{ok, C2, _} = emqx_client:start_link([{clean_start, true}]),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"">>, [{qos, 0}, {retain, true}]),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"">>, [{qos, 1}, {retain, true}]),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"">>, [{qos, 2}, {retain, true}]),
timer:sleep(10), %% wait for QoS 2 exchange to be completed
{ok, _, [0]} = emqx_client:subscribe(C2, nth(6, ?WILD_TOPICS), 2),
timer:sleep(10),
ok = emqx_client:disconnect(),
?assertEqual(0, length(receive_messages(3))).
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
?assertEqual(3, length(receive_messages(3))),
ok = emqx_client:disconnect(C).
will_message_test(_Config) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, true},
{will_topic = nth(3, ?TOPICS)},
{will_topic, nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>},
{keepalive, 2}]),
{ok, C2, _} = emqx_client:start_link(),
@ -110,14 +85,18 @@ will_message_test(_Config) ->
timer:sleep(10),
ok = emqx_client:stop(C1),
timer:sleep(5),
ok = emqx_client:disconnect(C2),
?assertEqual(1, length(receive_messages(1))),
ok = emqx_client:disconnect(C2),
ct:print("Will message test succeeded").
zero_length_clientid_test(_Config) ->
ct:print("Zero length clientid test starting"),
{error, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<>>}]),
%% TODO: There are some controversies on the situation when
%% clean_start flag is true and clientid is zero length.
%% {error, _} = emqx_client:start_link([{clean_start, false},
%% {client_id, <<>>}]),
{ok, _, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<>>}]),
ct:print("Zero length clientid test succeeded").
@ -129,7 +108,7 @@ offline_message_queueing_test(_) ->
ok = emqx_client:disconnect(C1),
{ok, C2, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<"c2">>}]),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
@ -147,9 +126,9 @@ overlapping_subscriptions_test(_) ->
{nth(1, ?WILD_TOPICS), 1}]),
timer:sleep(10),
{ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
time:sleep(10),
emqx_client:disconnect(C),
Num = receive_messages(2),
timer:sleep(10),
Num = length(receive_messages(2)),
?assert(lists:member(Num, [1, 2])),
if
Num == 1 ->
@ -159,7 +138,8 @@ overlapping_subscriptions_test(_) ->
ct:print("This server is publishing one message per each
matching overlapping subscription.");
true -> ok
end.
end,
emqx_client:disconnect(C).
keepalive_test(_) ->
ct:print("Keepalive test starting"),
@ -168,14 +148,13 @@ keepalive_test(_) ->
{will_topic, nth(5, ?TOPICS)},
{will_payload, <<"keepalive expiry">>}]),
ok = emqx_client:pause(C1),
{ok, C2, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2),
timer:sleep(15000),
ok = emqx_client:disconnect(C2),
?assertEqual(1, length(receive_messages(1))),
ct:print("Keepalive test succeeded").
ct:print("Keepalive test succeeded"),
ok = emqx_client:disconnect(C2).
redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"),
@ -188,7 +167,7 @@ redelivery_on_reconnect_test(_) ->
[{qos, 1}, {retain, false}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>,
[{qos, 2}, {retain, false}]),
time:sleep(10),
timer:sleep(10),
ok = emqx_client:disconnect(C1),
?assertEqual(0, length(receive_messages(2))),
{ok, C2, _} = emqx_client:start_link([{clean_start, false},
@ -197,20 +176,20 @@ redelivery_on_reconnect_test(_) ->
ok = emqx_client:disconnect(C2),
?assertEqual(2, length(receive_messages(2))).
subscribe_failure_test(_) ->
ct:print("Subscribe failure test starting"),
{ok, C, _} = emqx_client:start_link([]),
{ok, _, [16#80]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
timer:sleep(10),
ct:print("Subscribe failure test succeeded").
%% subscribe_failure_test(_) ->
%% ct:print("Subscribe failure test starting"),
%% {ok, C, _} = emqx_client:start_link([]),
%% {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
%% timer:sleep(10),
%% ct:print("Subscribe failure test succeeded").
dollar_topics_test(_) ->
ct:print("$ topics test starting"),
{ok, C, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, _, [2]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 2),
{ok, _} = emqx_client:publish(C, <<"$", (nth(2, ?TOPICS))>>,
<<"">>, [{qos, 1}, {retain, false}]),
{ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
{ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
<<"test">>, [{qos, 1}, {retain, false}]),
timer:sleep(10),
?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C),

View File

@ -86,7 +86,7 @@ t_infinity_simple_mqueue(_) ->
?assertEqual(<<1>>, V#message.payload).
t_priority_mqueue(_) ->
Opts = #{type => priority, max_len => 3, store_qos0 => false},
Opts = #{type => priority, max_len => 3, priorities => [{<<"t1">>, 1}, {<<"t2">>, 2}, {<<"t3">>, 3}], store_qos0 => false},
Q = ?Q:init(Opts),
?assertEqual(priority, ?Q:type(Q)),
?assertEqual(3, ?Q:max_len(Q)),
@ -103,10 +103,10 @@ t_priority_mqueue(_) ->
?assertEqual(5, ?Q:len(Q6)),
{{value, Msg}, Q7} = ?Q:out(Q6),
?assertEqual(4, ?Q:len(Q7)),
?assertEqual(<<"t1">>, Msg#message.topic).
?assertEqual(<<"t3">>, Msg#message.topic).
t_infinity_priority_mqueue(_) ->
Opts = #{type => priority, max_len => 0, store_qos0 => false},
Opts = #{type => priority, max_len => 0, priorities => [{<<"t">>, 1}, {<<"t1">>, 2}], store_qos0 => false},
Q = ?Q:init(Opts),
?assertEqual(0, ?Q:max_len(Q)),
Qx = lists:foldl(fun(I, AccQ) ->