feat(mqueue): Interleave messages with different priorities
This commit is contained in:
parent
4fa816fa97
commit
20d4652454
|
@ -93,6 +93,11 @@
|
|||
-define(MAX_LEN_INFINITY, 0).
|
||||
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
|
||||
|
||||
-record(shift_opts, {
|
||||
multiplier :: non_neg_integer(),
|
||||
base :: integer()
|
||||
}).
|
||||
|
||||
-record(mqueue, {
|
||||
store_qos0 = false :: boolean(),
|
||||
max_len = ?MAX_LEN_INFINITY :: count(),
|
||||
|
@ -100,7 +105,10 @@
|
|||
dropped = 0 :: count(),
|
||||
p_table = ?NO_PRIORITY_TABLE :: p_table(),
|
||||
default_p = ?LOWEST_PRIORITY :: priority(),
|
||||
q = ?PQUEUE:new() :: pq()
|
||||
q = ?PQUEUE:new() :: pq(),
|
||||
shift_opts :: #shift_opts{},
|
||||
last_prio :: non_neg_integer() | undefined,
|
||||
p_credit :: non_neg_integer() | undefined
|
||||
}).
|
||||
|
||||
-type(mqueue() :: #mqueue{}).
|
||||
|
@ -114,7 +122,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
|||
#mqueue{max_len = MaxLen,
|
||||
store_qos0 = QoS_0,
|
||||
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
|
||||
default_p = get_priority_opt(Opts)
|
||||
default_p = get_priority_opt(Opts),
|
||||
shift_opts = get_shift_opt(Opts)
|
||||
}.
|
||||
|
||||
-spec(info(mqueue()) -> emqx_types:infos()).
|
||||
|
@ -173,9 +182,24 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
|||
out(MQ = #mqueue{len = 0, q = Q}) ->
|
||||
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
|
||||
{empty, MQ};
|
||||
out(MQ = #mqueue{q = Q, len = Len}) ->
|
||||
out(MQ = #mqueue{q = Q, len = Len, last_prio = undefined, shift_opts = ShiftOpts}) ->
|
||||
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
|
||||
MQ1 = MQ#mqueue{
|
||||
q = Q1,
|
||||
len = Len - 1,
|
||||
last_prio = Prio,
|
||||
p_credit = get_credits(Prio, ShiftOpts)
|
||||
},
|
||||
{{value, Val}, MQ1};
|
||||
out(MQ = #mqueue{q = Q, p_credit = 0}) ->
|
||||
MQ1 = MQ#mqueue{
|
||||
q = ?PQUEUE:shift(Q),
|
||||
last_prio = undefined
|
||||
},
|
||||
out(MQ1);
|
||||
out(MQ = #mqueue{q = Q, len = Len, p_credit = Cnt}) ->
|
||||
{R, Q1} = ?PQUEUE:out(Q),
|
||||
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
|
||||
{R, MQ#mqueue{q = Q1, len = Len - 1, p_credit = Cnt - 1}}.
|
||||
|
||||
get_opt(Key, Opts, Default) ->
|
||||
case maps:get(Key, Opts, Default) of
|
||||
|
@ -196,3 +220,35 @@ get_priority_opt(Opts) ->
|
|||
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
|
||||
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
|
||||
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
|
||||
|
||||
get_credits(?HIGHEST_PRIORITY, Opts) ->
|
||||
Infinity = 1000000,
|
||||
get_credits(Infinity, Opts);
|
||||
get_credits(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
|
||||
(Prio + Base + 1) * Mult - 1.
|
||||
|
||||
get_shift_opt(Opts) ->
|
||||
%% Using 10 as a multiplier by default. This is needed to minimize
|
||||
%% overhead of ?PQUEUE:rotate
|
||||
Mult = maps:get(shift_multiplier, Opts, 10),
|
||||
true = is_integer(Mult) andalso Mult > 0,
|
||||
Min = case Opts of
|
||||
#{p_table := PTab} ->
|
||||
case maps:size(PTab) of
|
||||
0 -> 0;
|
||||
_ -> lists:min(maps:values(PTab))
|
||||
end;
|
||||
_ ->
|
||||
?LOWEST_PRIORITY
|
||||
end,
|
||||
%% `mqueue' module supports negative priorities, but we don't want
|
||||
%% the counter to be negative, so all priorities should be shifted
|
||||
%% by a constant, if negative priorities are used:
|
||||
Base = case Min < 0 of
|
||||
true -> -Min;
|
||||
false -> 0
|
||||
end,
|
||||
#shift_opts{
|
||||
multiplier = Mult,
|
||||
base = Base
|
||||
}.
|
||||
|
|
|
@ -55,6 +55,7 @@
|
|||
, filter/2
|
||||
, fold/3
|
||||
, highest/1
|
||||
, shift/1
|
||||
]).
|
||||
|
||||
-export_type([q/0]).
|
||||
|
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
|
|||
end,
|
||||
{R, NewQ}.
|
||||
|
||||
-spec(shift(pqueue()) -> pqueue()).
|
||||
shift(Q = {queue, _, _, _}) ->
|
||||
Q;
|
||||
shift({pqueue, []}) ->
|
||||
{pqueue, []}; %% Shouldn't happen?
|
||||
shift({pqueue, [Hd|Rest]}) ->
|
||||
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
|
||||
|
||||
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
|
||||
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
|
||||
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
|
||||
|
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
|
|||
|
||||
maybe_negate_priority(infinity) -> infinity;
|
||||
maybe_negate_priority(P) -> -P.
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
|
||||
-include_lib("proper/include/proper.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(Q, emqx_mqueue).
|
||||
|
@ -120,9 +121,88 @@ t_priority_mqueue(_) ->
|
|||
?assertEqual(5, ?Q:len(Q5)),
|
||||
{_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
|
||||
?assertEqual(5, ?Q:len(Q6)),
|
||||
{{value, Msg}, Q7} = ?Q:out(Q6),
|
||||
?assertEqual(4, ?Q:len(Q7)),
|
||||
?assertEqual(<<"t3">>, Msg#message.topic).
|
||||
{{value, _Msg}, Q7} = ?Q:out(Q6),
|
||||
?assertEqual(4, ?Q:len(Q7)).
|
||||
|
||||
t_priority_mqueue_conservation(_) ->
|
||||
true = proper:quickcheck(conservation_prop()).
|
||||
|
||||
t_priority_order(_) ->
|
||||
Opts = #{max_len => 5,
|
||||
shift_multiplier => 1,
|
||||
priorities =>
|
||||
#{<<"t1">> => 0,
|
||||
<<"t2">> => 1,
|
||||
<<"t3">> => 2
|
||||
},
|
||||
store_qos0 => false
|
||||
},
|
||||
Messages = [{Topic, Message} ||
|
||||
Topic <- [<<"t1">>, <<"t2">>, <<"t3">>],
|
||||
Message <- lists:seq(1, 10)],
|
||||
Q = lists:foldl(fun({Topic, Message}, Q) ->
|
||||
element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
|
||||
end,
|
||||
?Q:init(Opts),
|
||||
Messages),
|
||||
?assertMatch([{<<"t3">>, 6},
|
||||
{<<"t3">>, 7},
|
||||
{<<"t3">>, 8},
|
||||
|
||||
{<<"t2">>, 6},
|
||||
{<<"t2">>, 7},
|
||||
|
||||
{<<"t1">>, 6},
|
||||
|
||||
{<<"t3">>, 9},
|
||||
{<<"t3">>, 10},
|
||||
|
||||
{<<"t2">>, 8},
|
||||
|
||||
%% Note: for performance reasons we don't reset the
|
||||
%% counter when we run out of messages with the
|
||||
%% current prio, so next is t1:
|
||||
{<<"t1">>, 7},
|
||||
|
||||
{<<"t2">>, 9},
|
||||
{<<"t2">>, 10},
|
||||
|
||||
{<<"t1">>, 8},
|
||||
{<<"t1">>, 9},
|
||||
{<<"t1">>, 10}
|
||||
], drain(Q)).
|
||||
|
||||
t_priority_order2(_) ->
|
||||
Opts = #{max_len => 5,
|
||||
shift_multiplier => 2,
|
||||
priorities =>
|
||||
#{<<"t1">> => 0,
|
||||
<<"t2">> => 1
|
||||
},
|
||||
store_qos0 => false
|
||||
},
|
||||
Messages = [{Topic, Message} ||
|
||||
Topic <- [<<"t1">>, <<"t2">>],
|
||||
Message <- lists:seq(1, 10)],
|
||||
Q = lists:foldl(fun({Topic, Message}, Q) ->
|
||||
element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
|
||||
end,
|
||||
?Q:init(Opts),
|
||||
Messages),
|
||||
?assertMatch([{<<"t2">>, 6},
|
||||
{<<"t2">>, 7},
|
||||
{<<"t2">>, 8},
|
||||
{<<"t2">>, 9},
|
||||
|
||||
{<<"t1">>, 6},
|
||||
{<<"t1">>, 7},
|
||||
|
||||
{<<"t2">>, 10},
|
||||
|
||||
{<<"t1">>, 8},
|
||||
{<<"t1">>, 9},
|
||||
{<<"t1">>, 10}
|
||||
], drain(Q)).
|
||||
|
||||
t_infinity_priority_mqueue(_) ->
|
||||
Opts = #{max_len => 0,
|
||||
|
@ -163,3 +243,57 @@ t_dropped(_) ->
|
|||
{Msg, Q2} = ?Q:in(Msg, Q1),
|
||||
?assertEqual(1, ?Q:dropped(Q2)).
|
||||
|
||||
conservation_prop() ->
|
||||
?FORALL({Priorities, Messages},
|
||||
?LET(Priorities, topic_priorities(),
|
||||
{Priorities, messages(Priorities)}),
|
||||
try
|
||||
Opts = #{max_len => 0,
|
||||
priorities => maps:from_list(Priorities),
|
||||
store_qos0 => false},
|
||||
%% Put messages in
|
||||
Q1 = lists:foldl(fun({Topic, Message}, Q) ->
|
||||
element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q))
|
||||
end,
|
||||
?Q:init(Opts),
|
||||
Messages),
|
||||
%% Collect messages
|
||||
Got = lists:sort(drain(Q1)),
|
||||
Expected = lists:sort(Messages),
|
||||
case Expected =:= Got of
|
||||
true ->
|
||||
true;
|
||||
false ->
|
||||
ct:pal("Mismatch: expected ~p~nGot ~p~n", [Expected, Got]),
|
||||
false
|
||||
end
|
||||
catch
|
||||
EC:Err:Stack ->
|
||||
ct:pal("Error: ~p", [{EC, Err, Stack}]),
|
||||
false
|
||||
end).
|
||||
|
||||
%% Proper generators:
|
||||
|
||||
topic(Priorities) ->
|
||||
{Topics, _} = lists:unzip(Priorities),
|
||||
oneof(Topics).
|
||||
|
||||
topic_priorities() ->
|
||||
non_empty(list({binary(), priority()})).
|
||||
|
||||
priority() ->
|
||||
oneof([integer(), infinity]).
|
||||
|
||||
messages(Topics) ->
|
||||
list({topic(Topics), binary()}).
|
||||
|
||||
%% Internal functions:
|
||||
|
||||
drain(Q) ->
|
||||
case ?Q:out(Q) of
|
||||
{empty, _} ->
|
||||
[];
|
||||
{{value, #message{topic = T, payload = P}}, Q1} ->
|
||||
[{T, P}|drain(Q1)]
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue