feat(mqueue): Interleave messages with different priorities
This commit is contained in:
parent
ca9b90be9a
commit
903a9e57a8
|
@ -19,7 +19,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
|
@ -37,7 +39,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[23]">>, [
|
{<<"4.2.[23]">>, [
|
||||||
{add_module, emqx_congestion},
|
{add_module, emqx_congestion},
|
||||||
|
@ -52,7 +56,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.4">>, [
|
{<<"4.2.4">>, [
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
@ -65,7 +71,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.5">>, [
|
{<<"4.2.5">>, [
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
@ -77,7 +85,13 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{<<"4.2.[6-7]">>, [
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
|
@ -99,7 +113,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{"4.2.1", [
|
{"4.2.1", [
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
@ -117,7 +133,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[23]">>, [
|
{<<"4.2.[23]">>, [
|
||||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||||
|
@ -132,7 +150,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.4">>, [
|
{<<"4.2.4">>, [
|
||||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
@ -145,7 +165,9 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.5">>, [
|
{<<"4.2.5">>, [
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
@ -157,7 +179,13 @@
|
||||||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
|
]},
|
||||||
|
{<<"4.2.[6-7]">>, [
|
||||||
|
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -67,6 +67,9 @@
|
||||||
, dropped/1
|
, dropped/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ live_upgrade/1
|
||||||
|
]).
|
||||||
|
|
||||||
-export_type([mqueue/0, options/0]).
|
-export_type([mqueue/0, options/0]).
|
||||||
|
|
||||||
-type(topic() :: emqx_topic:topic()).
|
-type(topic() :: emqx_topic:topic()).
|
||||||
|
@ -91,6 +94,11 @@
|
||||||
-define(MAX_LEN_INFINITY, 0).
|
-define(MAX_LEN_INFINITY, 0).
|
||||||
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
|
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
|
||||||
|
|
||||||
|
-record(shift_opts, {
|
||||||
|
multiplier :: non_neg_integer(),
|
||||||
|
base :: integer()
|
||||||
|
}).
|
||||||
|
|
||||||
-record(mqueue, {
|
-record(mqueue, {
|
||||||
store_qos0 = false :: boolean(),
|
store_qos0 = false :: boolean(),
|
||||||
max_len = ?MAX_LEN_INFINITY :: count(),
|
max_len = ?MAX_LEN_INFINITY :: count(),
|
||||||
|
@ -98,11 +106,16 @@
|
||||||
dropped = 0 :: count(),
|
dropped = 0 :: count(),
|
||||||
p_table = ?NO_PRIORITY_TABLE :: p_table(),
|
p_table = ?NO_PRIORITY_TABLE :: p_table(),
|
||||||
default_p = ?LOWEST_PRIORITY :: priority(),
|
default_p = ?LOWEST_PRIORITY :: priority(),
|
||||||
q = ?PQUEUE:new() :: pq()
|
q = ?PQUEUE:new() :: pq(),
|
||||||
|
shift_opts :: #shift_opts{},
|
||||||
|
last_p :: non_neg_integer() | undefined,
|
||||||
|
counter :: non_neg_integer() | undefined
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(mqueue() :: #mqueue{}).
|
-type(mqueue() :: #mqueue{}).
|
||||||
|
|
||||||
|
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
|
||||||
|
|
||||||
-spec(init(options()) -> mqueue()).
|
-spec(init(options()) -> mqueue()).
|
||||||
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
||||||
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
|
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
|
||||||
|
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
||||||
#mqueue{max_len = MaxLen,
|
#mqueue{max_len = MaxLen,
|
||||||
store_qos0 = QoS_0,
|
store_qos0 = QoS_0,
|
||||||
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
|
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
|
||||||
default_p = get_priority_opt(Opts)
|
default_p = get_priority_opt(Opts),
|
||||||
|
shift_opts = get_shift_opt(Opts)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
-spec(info(mqueue()) -> emqx_types:infos()).
|
-spec(info(mqueue()) -> emqx_types:infos()).
|
||||||
|
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
|
||||||
info(len, #mqueue{len = Len}) ->
|
info(len, #mqueue{len = Len}) ->
|
||||||
Len;
|
Len;
|
||||||
info(dropped, #mqueue{dropped = Dropped}) ->
|
info(dropped, #mqueue{dropped = Dropped}) ->
|
||||||
Dropped.
|
Dropped;
|
||||||
|
info(Info, ?OLD(MQ)) ->
|
||||||
|
info(Info, live_upgrade(MQ)).
|
||||||
|
|
||||||
is_empty(#mqueue{len = Len}) -> Len =:= 0.
|
is_empty(#mqueue{len = Len}) -> Len =:= 0;
|
||||||
|
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
|
||||||
|
|
||||||
len(#mqueue{len = Len}) -> Len.
|
len(#mqueue{len = Len}) -> Len;
|
||||||
|
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
|
||||||
|
|
||||||
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
|
max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
|
||||||
|
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
|
||||||
|
|
||||||
%% @doc Return number of dropped messages.
|
%% @doc Return number of dropped messages.
|
||||||
-spec(dropped(mqueue()) -> count()).
|
-spec(dropped(mqueue()) -> count()).
|
||||||
dropped(#mqueue{dropped = Dropped}) -> Dropped.
|
dropped(#mqueue{dropped = Dropped}) -> Dropped;
|
||||||
|
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
|
||||||
|
|
||||||
%% @doc Stats of the mqueue
|
%% @doc Stats of the mqueue
|
||||||
-spec(stats(mqueue()) -> [stat()]).
|
-spec(stats(mqueue()) -> [stat()]).
|
||||||
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
|
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
|
||||||
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
|
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
|
||||||
|
stats(?OLD(MQ)) ->
|
||||||
|
stats(live_upgrade(MQ)).
|
||||||
|
|
||||||
%% @doc Enqueue a message.
|
%% @doc Enqueue a message.
|
||||||
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
||||||
|
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
||||||
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
|
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
|
||||||
false ->
|
false ->
|
||||||
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
|
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
|
||||||
end.
|
end;
|
||||||
|
in(Msg, ?OLD(MQ)) ->
|
||||||
|
in(Msg, live_upgrade(MQ)).
|
||||||
|
|
||||||
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
|
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
|
||||||
out(MQ = #mqueue{len = 0, q = Q}) ->
|
out(MQ = #mqueue{len = 0, q = Q}) ->
|
||||||
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
|
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
|
||||||
{empty, MQ};
|
{empty, MQ};
|
||||||
out(MQ = #mqueue{q = Q, len = Len}) ->
|
out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
|
||||||
|
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
|
||||||
|
MQ1 = MQ#mqueue{
|
||||||
|
q = Q1,
|
||||||
|
len = Len - 1,
|
||||||
|
last_p = Prio,
|
||||||
|
counter = init_counter(Prio, ShiftOpts)
|
||||||
|
},
|
||||||
|
{{value, Val}, MQ1};
|
||||||
|
out(MQ = #mqueue{q = Q, counter = 0}) ->
|
||||||
|
MQ1 = MQ#mqueue{
|
||||||
|
q = ?PQUEUE:shift(Q),
|
||||||
|
last_p = undefined
|
||||||
|
},
|
||||||
|
out(MQ1);
|
||||||
|
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
|
||||||
{R, Q1} = ?PQUEUE:out(Q),
|
{R, Q1} = ?PQUEUE:out(Q),
|
||||||
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
|
{R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
|
||||||
|
out(?OLD(MQ)) ->
|
||||||
|
out(live_upgrade(MQ)).
|
||||||
|
|
||||||
get_opt(Key, Opts, Default) ->
|
get_opt(Key, Opts, Default) ->
|
||||||
case maps:get(Key, Opts, Default) of
|
case maps:get(Key, Opts, Default) of
|
||||||
|
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
|
||||||
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
|
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
|
||||||
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
|
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
|
||||||
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
|
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
|
||||||
|
|
||||||
|
init_counter(?HIGHEST_PRIORITY, Opts) ->
|
||||||
|
Infinity = 1000000,
|
||||||
|
init_counter(Infinity, Opts);
|
||||||
|
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
|
||||||
|
(Prio + Base) * Mult.
|
||||||
|
|
||||||
|
get_shift_opt(Opts) ->
|
||||||
|
Mult = maps:get(shift_multiplier, Opts, 10),
|
||||||
|
Min = case Opts of
|
||||||
|
#{p_table := PTab} ->
|
||||||
|
case maps:size(PTab) of
|
||||||
|
0 -> 0;
|
||||||
|
_ -> lists:min(maps:values(PTab))
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
?LOWEST_PRIORITY
|
||||||
|
end,
|
||||||
|
Base = case Min < 0 of
|
||||||
|
true -> -Min;
|
||||||
|
false -> 0
|
||||||
|
end,
|
||||||
|
#shift_opts{
|
||||||
|
multiplier = Mult,
|
||||||
|
base = Base
|
||||||
|
}.
|
||||||
|
|
||||||
|
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
|
||||||
|
ShiftOpts = case is_map(PTable) of
|
||||||
|
true -> get_shift_opt(#{p_table => PTable});
|
||||||
|
false -> get_shift_opt(#{})
|
||||||
|
end,
|
||||||
|
#mqueue{ store_qos0 = StoreQos0
|
||||||
|
, max_len = MaxLen
|
||||||
|
, dropped = Dropped
|
||||||
|
, p_table = PTable
|
||||||
|
, default_p = DefaultP
|
||||||
|
, len = Len
|
||||||
|
, q = Q
|
||||||
|
, shift_opts = ShiftOpts
|
||||||
|
, last_p = undefined
|
||||||
|
, counter = undefined
|
||||||
|
}.
|
||||||
|
|
|
@ -55,6 +55,7 @@
|
||||||
, filter/2
|
, filter/2
|
||||||
, fold/3
|
, fold/3
|
||||||
, highest/1
|
, highest/1
|
||||||
|
, shift/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export_type([q/0]).
|
-export_type([q/0]).
|
||||||
|
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
|
||||||
end,
|
end,
|
||||||
{R, NewQ}.
|
{R, NewQ}.
|
||||||
|
|
||||||
|
-spec(shift(pqueue()) -> pqueue()).
|
||||||
|
shift(Q = {queue, _, _, _}) ->
|
||||||
|
Q;
|
||||||
|
shift({pqueue, []}) ->
|
||||||
|
{pqueue, []}; %% Shouldn't happen?
|
||||||
|
shift({pqueue, [Hd|Rest]}) ->
|
||||||
|
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
|
||||||
|
|
||||||
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
|
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
|
||||||
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
|
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
|
||||||
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
|
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
|
||||||
|
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
|
||||||
|
|
||||||
maybe_negate_priority(infinity) -> infinity;
|
maybe_negate_priority(infinity) -> infinity;
|
||||||
maybe_negate_priority(P) -> -P.
|
maybe_negate_priority(P) -> -P.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue