diff --git a/src/emqx.appup.src b/src/emqx.appup.src index a10c1d209..442ffe200 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -19,7 +19,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {add_module, emqx_congestion}, @@ -37,7 +39,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {add_module, emqx_congestion}, @@ -52,7 +56,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -65,7 +71,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -77,7 +85,13 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + ]}, + {<<"4.2.[6-7]">>, [ + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<".*">>, []} ], @@ -99,7 +113,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -117,7 +133,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -132,7 +150,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -145,7 +165,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -157,7 +179,13 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + ]}, + {<<"4.2.[6-7]">>, [ + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<".*">>, []} ] diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index f77a1d98f..f888f69a3 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -67,6 +67,9 @@ , dropped/1 ]). +-export([ live_upgrade/1 + ]). + -export_type([mqueue/0, options/0]). -type(topic() :: emqx_topic:topic()). @@ -91,6 +94,11 @@ -define(MAX_LEN_INFINITY, 0). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]). +-record(shift_opts, { + multiplier :: non_neg_integer(), + base :: integer() + }). + -record(mqueue, { store_qos0 = false :: boolean(), max_len = ?MAX_LEN_INFINITY :: count(), @@ -98,11 +106,16 @@ dropped = 0 :: count(), p_table = ?NO_PRIORITY_TABLE :: p_table(), default_p = ?LOWEST_PRIORITY :: priority(), - q = ?PQUEUE:new() :: pq() + q = ?PQUEUE:new() :: pq(), + shift_opts :: #shift_opts{}, + last_p :: non_neg_integer() | undefined, + counter :: non_neg_integer() | undefined }). -type(mqueue() :: #mqueue{}). +-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}). + -spec(init(options()) -> mqueue()). init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of @@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> #mqueue{max_len = MaxLen, store_qos0 = QoS_0, p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), - default_p = get_priority_opt(Opts) + default_p = get_priority_opt(Opts), + shift_opts = get_shift_opt(Opts) }. -spec(info(mqueue()) -> emqx_types:infos()). @@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) -> info(len, #mqueue{len = Len}) -> Len; info(dropped, #mqueue{dropped = Dropped}) -> - Dropped. + Dropped; +info(Info, ?OLD(MQ)) -> + info(Info, live_upgrade(MQ)). -is_empty(#mqueue{len = Len}) -> Len =:= 0. +is_empty(#mqueue{len = Len}) -> Len =:= 0; +is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)). -len(#mqueue{len = Len}) -> Len. +len(#mqueue{len = Len}) -> Len; +len(?OLD(MQ)) -> len(live_upgrade(MQ)). -max_len(#mqueue{max_len = MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen; +max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)). %% @doc Return number of dropped messages. -spec(dropped(mqueue()) -> count()). -dropped(#mqueue{dropped = Dropped}) -> Dropped. +dropped(#mqueue{dropped = Dropped}) -> Dropped; +dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)). %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> - [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. + [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]; +stats(?OLD(MQ)) -> + stats(live_upgrade(MQ)). %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). @@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} - end. + end; +in(Msg, ?OLD(MQ)) -> + in(Msg, live_upgrade(MQ)). -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). out(MQ = #mqueue{len = 0, q = Q}) -> 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap {empty, MQ}; -out(MQ = #mqueue{q = Q, len = Len}) -> +out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) -> + {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length + MQ1 = MQ#mqueue{ + q = Q1, + len = Len - 1, + last_p = Prio, + counter = init_counter(Prio, ShiftOpts) + }, + {{value, Val}, MQ1}; +out(MQ = #mqueue{q = Q, counter = 0}) -> + MQ1 = MQ#mqueue{ + q = ?PQUEUE:shift(Q), + last_p = undefined + }, + out(MQ1); +out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1}}. + {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}; +out(?OLD(MQ)) -> + out(live_upgrade(MQ)). get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -194,3 +235,46 @@ get_priority_opt(Opts) -> %% while the highest 'infinity' is a [{infinity, queue:queue()}] get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). + +init_counter(?HIGHEST_PRIORITY, Opts) -> + Infinity = 1000000, + init_counter(Infinity, Opts); +init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) -> + (Prio + Base) * Mult. + +get_shift_opt(Opts) -> + Mult = maps:get(shift_multiplier, Opts, 10), + Min = case Opts of + #{p_table := PTab} -> + case maps:size(PTab) of + 0 -> 0; + _ -> lists:min(maps:values(PTab)) + end; + _ -> + ?LOWEST_PRIORITY + end, + Base = case Min < 0 of + true -> -Min; + false -> 0 + end, + #shift_opts{ + multiplier = Mult, + base = Base + }. + +live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) -> + ShiftOpts = case is_map(PTable) of + true -> get_shift_opt(#{p_table => PTable}); + false -> get_shift_opt(#{}) + end, + #mqueue{ store_qos0 = StoreQos0 + , max_len = MaxLen + , dropped = Dropped + , p_table = PTable + , default_p = DefaultP + , len = Len + , q = Q + , shift_opts = ShiftOpts + , last_p = undefined + , counter = undefined + }. diff --git a/src/emqx_pqueue.erl b/src/emqx_pqueue.erl index 85c89866d..5dd81af0b 100644 --- a/src/emqx_pqueue.erl +++ b/src/emqx_pqueue.erl @@ -55,6 +55,7 @@ , filter/2 , fold/3 , highest/1 + , shift/1 ]). -export_type([q/0]). @@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +-spec(shift(pqueue()) -> pqueue()). +shift(Q = {queue, _, _, _}) -> + Q; +shift({pqueue, []}) -> + {pqueue, []}; %% Shouldn't happen? +shift({pqueue, [Hd|Rest]}) -> + {pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities. + -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). @@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. -