Merge pull request #5666 from k32/pqueue

feat(mqueue): Interleave messages with different priorities
This commit is contained in:
k32 2021-09-07 16:14:24 +02:00 committed by GitHub
commit 1a291d5d97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 265 additions and 24 deletions

View File

@ -1,7 +1,7 @@
{application, emqx, {application, emqx,
[{id, "emqx"}, [{id, "emqx"},
{description, "EMQ X"}, {description, "EMQ X"},
{vsn, "4.3.8"}, % strict semver, bump manually! {vsn, "4.3.9"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]},

View File

@ -1,22 +1,32 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
Instructions = Instructions =
{"4.3.8", {"4.3.9",
[ [
{"4.3.8", [
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]},
{"4.3.7", [ {"4.3.7", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]} {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.6", [ {"4.3.6", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.5", [ {"4.3.5", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.4", [ {"4.3.4", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -24,7 +34,9 @@ Instructions =
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.3", [ {"4.3.3", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -34,7 +46,9 @@ Instructions =
{load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.2", [ {"4.3.2", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -47,7 +61,9 @@ Instructions =
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.1", [ {"4.3.1", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -65,7 +81,9 @@ Instructions =
{load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{"4.3.0", [ {"4.3.0", [
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
@ -175,7 +193,9 @@ Instructions =
{load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]} {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
]}, ]},
{<<".*">>,[]}]}, {<<".*">>,[]}]},

View File

@ -67,6 +67,9 @@
, dropped/1 , dropped/1
]). ]).
-export([ live_upgrade/1
]).
-export_type([mqueue/0, options/0]). -export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()). -type(topic() :: emqx_topic:topic()).
@ -91,6 +94,11 @@
-define(MAX_LEN_INFINITY, 0). -define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(shift_opts, {
multiplier :: non_neg_integer(),
base :: integer()
}).
-record(mqueue, { -record(mqueue, {
store_qos0 = false :: boolean(), store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(), max_len = ?MAX_LEN_INFINITY :: count(),
@ -98,11 +106,16 @@
dropped = 0 :: count(), dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(), p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(), default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq() q = ?PQUEUE:new() :: pq(),
shift_opts :: #shift_opts{},
last_p :: non_neg_integer() | undefined,
counter :: non_neg_integer() | undefined
}). }).
-type(mqueue() :: #mqueue{}). -type(mqueue() :: #mqueue{}).
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
-spec(init(options()) -> mqueue()). -spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
#mqueue{max_len = MaxLen, #mqueue{max_len = MaxLen,
store_qos0 = QoS_0, store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts) default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
}. }.
-spec(info(mqueue()) -> emqx_types:infos()). -spec(info(mqueue()) -> emqx_types:infos()).
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
info(len, #mqueue{len = Len}) -> info(len, #mqueue{len = Len}) ->
Len; Len;
info(dropped, #mqueue{dropped = Dropped}) -> info(dropped, #mqueue{dropped = Dropped}) ->
Dropped. Dropped;
info(Info, ?OLD(MQ)) ->
info(Info, live_upgrade(MQ)).
is_empty(#mqueue{len = Len}) -> Len =:= 0. is_empty(#mqueue{len = Len}) -> Len =:= 0;
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
len(#mqueue{len = Len}) -> Len. len(#mqueue{len = Len}) -> Len;
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
max_len(#mqueue{max_len = MaxLen}) -> MaxLen. max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
%% @doc Return number of dropped messages. %% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()). -spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped. dropped(#mqueue{dropped = Dropped}) -> Dropped;
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
%% @doc Stats of the mqueue %% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]). -spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
stats(?OLD(MQ)) ->
stats(live_upgrade(MQ)).
%% @doc Enqueue a message. %% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false -> false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end. end;
in(Msg, ?OLD(MQ)) ->
in(Msg, live_upgrade(MQ)).
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) -> out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ}; {empty, MQ};
out(MQ = #mqueue{q = Q, len = Len}) -> out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
MQ1 = MQ#mqueue{
q = Q1,
len = Len - 1,
last_p = Prio,
counter = init_counter(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
out(MQ = #mqueue{q = Q, counter = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
last_p = undefined
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q), {R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1}}. {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
out(?OLD(MQ)) ->
out(live_upgrade(MQ)).
get_opt(Key, Opts, Default) -> get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of case maps:get(Key, Opts, Default) of
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
%% while the highest 'infinity' is a [{infinity, queue:queue()}] %% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
init_counter(?HIGHEST_PRIORITY, Opts) ->
Infinity = 1000000,
init_counter(Infinity, Opts);
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
(Prio + Base) * Mult.
get_shift_opt(Opts) ->
Mult = maps:get(shift_multiplier, Opts, 10),
Min = case Opts of
#{p_table := PTab} ->
case maps:size(PTab) of
0 -> 0;
_ -> lists:min(maps:values(PTab))
end;
_ ->
?LOWEST_PRIORITY
end,
Base = case Min < 0 of
true -> -Min;
false -> 0
end,
#shift_opts{
multiplier = Mult,
base = Base
}.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
ShiftOpts = case is_map(PTable) of
true -> get_shift_opt(#{p_table => PTable});
false -> get_shift_opt(#{})
end,
#mqueue{ store_qos0 = StoreQos0
, max_len = MaxLen
, dropped = Dropped
, p_table = PTable
, default_p = DefaultP
, len = Len
, q = Q
, shift_opts = ShiftOpts
, last_p = undefined
, counter = undefined
}.

View File

@ -55,6 +55,7 @@
, filter/2 , filter/2
, fold/3 , fold/3
, highest/1 , highest/1
, shift/1
]). ]).
-export_type([q/0]). -export_type([q/0]).
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
end, end,
{R, NewQ}. {R, NewQ}.
-spec(shift(pqueue()) -> pqueue()).
shift(Q = {queue, _, _, _}) ->
Q;
shift({pqueue, []}) ->
{pqueue, []}; %% Shouldn't happen?
shift({pqueue, [Hd|Rest]}) ->
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(infinity) -> infinity;
maybe_negate_priority(P) -> -P. maybe_negate_priority(P) -> -P.

View File

@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(Q, emqx_mqueue). -define(Q, emqx_mqueue).
@ -121,8 +122,55 @@ t_priority_mqueue(_) ->
{_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
?assertEqual(5, ?Q:len(Q6)), ?assertEqual(5, ?Q:len(Q6)),
{{value, Msg}, Q7} = ?Q:out(Q6), {{value, Msg}, Q7} = ?Q:out(Q6),
?assertEqual(4, ?Q:len(Q7)), ?assertEqual(4, ?Q:len(Q7)).
?assertEqual(<<"t3">>, Msg#message.topic).
t_priority_mqueue_conservation(_) ->
true = proper:quickcheck(conservation_prop()).
t_priority_order(_) ->
Opts = #{max_len => 5,
shift_multiplier => 1,
priorities =>
#{<<"t1">> => 0,
<<"t2">> => 1,
<<"t3">> => 2
},
store_qos0 => false
},
Messages = [{Topic, Message} ||
Topic <- [<<"t1">>, <<"t2">>, <<"t3">>],
Message <- lists:seq(1, 10)],
Q = lists:foldl(fun({Topic, Message}, Q) ->
element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
end,
?Q:init(Opts),
Messages),
?assertMatch([{<<"t3">>, 6},
{<<"t3">>, 7},
{<<"t3">>, 8},
{<<"t2">>, 6},
{<<"t2">>, 7},
{<<"t1">>, 6},
{<<"t3">>, 9},
{<<"t3">>, 10},
{<<"t2">>, 8},
%% Note: for performance reasons we don't reset the
%% counter when we run out of messages with the
%% current prio, so next is t1:
{<<"t1">>, 7},
{<<"t2">>, 9},
{<<"t2">>, 10},
{<<"t1">>, 8},
{<<"t1">>, 9},
{<<"t1">>, 10}
], drain(Q)).
t_infinity_priority_mqueue(_) -> t_infinity_priority_mqueue(_) ->
Opts = #{max_len => 0, Opts = #{max_len => 0,
@ -163,3 +211,84 @@ t_dropped(_) ->
{Msg, Q2} = ?Q:in(Msg, Q1), {Msg, Q2} = ?Q:in(Msg, Q1),
?assertEqual(1, ?Q:dropped(Q2)). ?assertEqual(1, ?Q:dropped(Q2)).
t_live_upgrade(_) ->
Q = {mqueue,true,1,0,0,none,0,
{queue,[],[],0}},
?assertMatch(#{}, ?Q:info(Q)),
?assertMatch(true, ?Q:is_empty(Q)),
?assertMatch(0, ?Q:len(Q)),
?assertMatch(1, ?Q:max_len(Q)),
?assertMatch({undefined, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)),
?assertMatch({empty, _}, ?Q:out(Q)),
?assertMatch([_|_], ?Q:stats(Q)),
?assertMatch(0, ?Q:dropped(Q)).
t_live_upgrade2(_) ->
Q = {mqueue,false,10,0,0,
#{<<"t">> => 1},
0,
{queue,[],[],0}},
?assertMatch(#{}, ?Q:info(Q)),
?assertMatch(true, ?Q:is_empty(Q)),
?assertMatch(0, ?Q:len(Q)),
?assertMatch(10, ?Q:max_len(Q)),
?assertMatch({_, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)),
?assertMatch({empty, _}, ?Q:out(Q)),
?assertMatch([_|_], ?Q:stats(Q)),
?assertMatch(0, ?Q:dropped(Q)).
conservation_prop() ->
?FORALL({Priorities, Messages},
?LET(Priorities, topic_priorities(),
{Priorities, messages(Priorities)}),
try
Opts = #{max_len => 0,
priorities => maps:from_list(Priorities),
store_qos0 => false},
%% Put messages in
Q1 = lists:foldl(fun({Topic, Message}, Q) ->
element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
end,
?Q:init(Opts),
Messages),
%% Collect messages
Got = lists:sort(drain(Q1)),
Expected = lists:sort(Messages),
case Expected =:= Got of
true ->
true;
false ->
ct:pal("Mismatch: expected ~p~nGot ~p~n", [Expected, Got]),
false
end
catch
EC:Err:Stack ->
ct:pal("Error: ~p", [{EC, Err, Stack}]),
false
end).
%% Proper generators:
topic(Priorities) ->
{Topics, _} = lists:unzip(Priorities),
oneof(Topics).
topic_priorities() ->
non_empty(list({binary(), priority()})).
priority() ->
oneof([integer(), infinity]).
messages(Topics) ->
list({topic(Topics), binary()}).
%% Internal functions:
drain(Q) ->
case ?Q:out(Q) of
{empty, _} ->
[];
{{value, #message{topic = T, payload = P}}, Q1} ->
[{T, P}|drain(Q1)]
end.