From 5fc1036cf775d04230412edc0672ab01dfb0687a Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 6 Sep 2021 23:02:51 +0200 Subject: [PATCH] chore(mqueue): Implement live upgrade --- src/emqx_mqueue.erl | 50 ++++++++++++++++++++++++++++++++------ test/emqx_mqueue_SUITE.erl | 27 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index c47cb1b00..2f92fc140 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -67,6 +67,9 @@ , dropped/1 ]). +-export([ live_upgrade/1 + ]). + -export_type([mqueue/0, options/0]). -type(topic() :: emqx_topic:topic()). @@ -111,6 +114,8 @@ -type(mqueue() :: #mqueue{}). +-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}). + -spec(init(options()) -> mqueue()). init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of @@ -136,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) -> info(len, #mqueue{len = Len}) -> Len; info(dropped, #mqueue{dropped = Dropped}) -> - Dropped. + Dropped; +info(Info, ?OLD(MQ)) -> + info(Info, live_upgrade(MQ)). -is_empty(#mqueue{len = Len}) -> Len =:= 0. +is_empty(#mqueue{len = Len}) -> Len =:= 0; +is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)). -len(#mqueue{len = Len}) -> Len. +len(#mqueue{len = Len}) -> Len; +len(?OLD(MQ)) -> len(live_upgrade(MQ)). -max_len(#mqueue{max_len = MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen; +max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)). %% @doc Return number of dropped messages. -spec(dropped(mqueue()) -> count()). -dropped(#mqueue{dropped = Dropped}) -> Dropped. +dropped(#mqueue{dropped = Dropped}) -> Dropped; +dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)). %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> - [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. + [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]; +stats(?OLD(MQ)) -> + stats(live_upgrade(MQ)). %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). @@ -174,7 +187,9 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} - end. + end; +in(Msg, ?OLD(MQ)) -> + in(Msg, live_upgrade(MQ)). -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). out(MQ = #mqueue{len = 0, q = Q}) -> @@ -198,7 +213,9 @@ out(MQ = #mqueue{q = Q, counter = 0}) -> out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> ct:pal("Cnt ~p", [Cnt]), {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}. + {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}; +out(?OLD(MQ)) -> + out(live_upgrade(MQ)). get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -245,3 +262,20 @@ get_shift_opt(Opts) -> multiplier = Mult, base = Base }. + +live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) -> + ShiftOpts = case is_map(PTable) of + true -> get_shift_opt(#{p_table => PTable}); + false -> get_shift_opt(#{}) + end, + #mqueue{ store_qos0 = StoreQos0 + , max_len = MaxLen + , dropped = Dropped + , p_table = PTable + , default_p = DefaultP + , len = Len + , q = Q + , shift_opts = ShiftOpts + , last_p = undefined + , counter = undefined + }. diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 106b45887..599e68da1 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -211,6 +211,33 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). +t_live_upgrade(_) -> + Q = {mqueue,true,1,0,0,none,0, + {queue,[],[],0}}, + ?assertMatch(#{}, ?Q:info(Q)), + ?assertMatch(true, ?Q:is_empty(Q)), + ?assertMatch(0, ?Q:len(Q)), + ?assertMatch(1, ?Q:max_len(Q)), + ?assertMatch({undefined, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)), + ?assertMatch({empty, _}, ?Q:out(Q)), + ?assertMatch([_|_], ?Q:stats(Q)), + ?assertMatch(0, ?Q:dropped(Q)). + + +t_live_upgrade2(_) -> + Q = {mqueue,false,10,0,0, + #{<<"t">> => 1}, + 0, + {queue,[],[],0}}, + ?assertMatch(#{}, ?Q:info(Q)), + ?assertMatch(true, ?Q:is_empty(Q)), + ?assertMatch(0, ?Q:len(Q)), + ?assertMatch(10, ?Q:max_len(Q)), + ?assertMatch({_, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)), + ?assertMatch({empty, _}, ?Q:out(Q)), + ?assertMatch([_|_], ?Q:stats(Q)), + ?assertMatch(0, ?Q:dropped(Q)). + conservation_prop() -> ?FORALL({Priorities, Messages}, ?LET(Priorities, topic_priorities(),