From 4eacaa29bd7e610039b831e4651fc9e0f0ca54c8 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 6 Sep 2021 21:03:34 +0200 Subject: [PATCH 1/4] feat(mqueue): Interleave messages with different priorities --- src/emqx_mqueue.erl | 59 +++++++++++++++++++-- src/emqx_pqueue.erl | 10 +++- test/emqx_mqueue_SUITE.erl | 106 ++++++++++++++++++++++++++++++++++++- 3 files changed, 168 insertions(+), 7 deletions(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index d0c6365ff..c47cb1b00 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -91,6 +91,11 @@ -define(MAX_LEN_INFINITY, 0). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]). +-record(shift_opts, { + multiplier :: non_neg_integer(), + base :: integer() + }). + -record(mqueue, { store_qos0 = false :: boolean(), max_len = ?MAX_LEN_INFINITY :: count(), @@ -98,7 +103,10 @@ dropped = 0 :: count(), p_table = ?NO_PRIORITY_TABLE :: p_table(), default_p = ?LOWEST_PRIORITY :: priority(), - q = ?PQUEUE:new() :: pq() + q = ?PQUEUE:new() :: pq(), + shift_opts :: #shift_opts{}, + last_p :: non_neg_integer() | undefined, + counter :: non_neg_integer() | undefined }). -type(mqueue() :: #mqueue{}). @@ -112,7 +120,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> #mqueue{max_len = MaxLen, store_qos0 = QoS_0, p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), - default_p = get_priority_opt(Opts) + default_p = get_priority_opt(Opts), + shift_opts = get_shift_opt(Opts) }. -spec(info(mqueue()) -> emqx_types:infos()). @@ -171,9 +180,25 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, out(MQ = #mqueue{len = 0, q = Q}) -> 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap {empty, MQ}; -out(MQ = #mqueue{q = Q, len = Len}) -> +out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) -> + {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length + MQ1 = MQ#mqueue{ + q = Q1, + len = Len - 1, + last_p = Prio, + counter = init_counter(Prio, ShiftOpts) + }, + {{value, Val}, MQ1}; +out(MQ = #mqueue{q = Q, counter = 0}) -> + MQ1 = MQ#mqueue{ + q = ?PQUEUE:shift(Q), + last_p = undefined + }, + out(MQ1); +out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> + ct:pal("Cnt ~p", [Cnt]), {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1}}. + {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}. get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -194,3 +219,29 @@ get_priority_opt(Opts) -> %% while the highest 'infinity' is a [{infinity, queue:queue()}] get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). + +init_counter(?HIGHEST_PRIORITY, Opts) -> + Infinity = 1000000, + init_counter(Infinity, Opts); +init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) -> + (Prio + Base) * Mult. + +get_shift_opt(Opts) -> + Mult = maps:get(shift_multiplier, Opts, 10), + Min = case Opts of + #{p_table := PTab} -> + case maps:size(PTab) of + 0 -> 0; + _ -> lists:min(maps:values(PTab)) + end; + _ -> + ?LOWEST_PRIORITY + end, + Base = case Min < 0 of + true -> -Min; + false -> 0 + end, + #shift_opts{ + multiplier = Mult, + base = Base + }. diff --git a/src/emqx_pqueue.erl b/src/emqx_pqueue.erl index 85c89866d..5dd81af0b 100644 --- a/src/emqx_pqueue.erl +++ b/src/emqx_pqueue.erl @@ -55,6 +55,7 @@ , filter/2 , fold/3 , highest/1 + , shift/1 ]). -export_type([q/0]). @@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +-spec(shift(pqueue()) -> pqueue()). +shift(Q = {queue, _, _, _}) -> + Q; +shift({pqueue, []}) -> + {pqueue, []}; %% Shouldn't happen? +shift({pqueue, [Hd|Rest]}) -> + {pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities. + -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). @@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. - diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 34e509145..106b45887 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(Q, emqx_mqueue). @@ -121,8 +122,55 @@ t_priority_mqueue(_) -> {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, Q7} = ?Q:out(Q6), - ?assertEqual(4, ?Q:len(Q7)), - ?assertEqual(<<"t3">>, Msg#message.topic). + ?assertEqual(4, ?Q:len(Q7)). + +t_priority_mqueue_conservation(_) -> + true = proper:quickcheck(conservation_prop()). + +t_priority_order(_) -> + Opts = #{max_len => 5, + shift_multiplier => 1, + priorities => + #{<<"t1">> => 0, + <<"t2">> => 1, + <<"t3">> => 2 + }, + store_qos0 => false + }, + Messages = [{Topic, Message} || + Topic <- [<<"t1">>, <<"t2">>, <<"t3">>], + Message <- lists:seq(1, 10)], + Q = lists:foldl(fun({Topic, Message}, Q) -> + element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q)) + end, + ?Q:init(Opts), + Messages), + ?assertMatch([{<<"t3">>, 6}, + {<<"t3">>, 7}, + {<<"t3">>, 8}, + + {<<"t2">>, 6}, + {<<"t2">>, 7}, + + {<<"t1">>, 6}, + + {<<"t3">>, 9}, + {<<"t3">>, 10}, + + {<<"t2">>, 8}, + + %% Note: for performance reasons we don't reset the + %% counter when we run out of messages with the + %% current prio, so next is t1: + {<<"t1">>, 7}, + + {<<"t2">>, 9}, + {<<"t2">>, 10}, + + {<<"t1">>, 8}, + {<<"t1">>, 9}, + {<<"t1">>, 10} + ], drain(Q)). t_infinity_priority_mqueue(_) -> Opts = #{max_len => 0, @@ -163,3 +211,57 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). +conservation_prop() -> + ?FORALL({Priorities, Messages}, + ?LET(Priorities, topic_priorities(), + {Priorities, messages(Priorities)}), + try + Opts = #{max_len => 0, + priorities => maps:from_list(Priorities), + store_qos0 => false}, + %% Put messages in + Q1 = lists:foldl(fun({Topic, Message}, Q) -> + element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q)) + end, + ?Q:init(Opts), + Messages), + %% Collect messages + Got = lists:sort(drain(Q1)), + Expected = lists:sort(Messages), + case Expected =:= Got of + true -> + true; + false -> + ct:pal("Mismatch: expected ~p~nGot ~p~n", [Expected, Got]), + false + end + catch + EC:Err:Stack -> + ct:pal("Error: ~p", [{EC, Err, Stack}]), + false + end). + +%% Proper generators: + +topic(Priorities) -> + {Topics, _} = lists:unzip(Priorities), + oneof(Topics). + +topic_priorities() -> + non_empty(list({binary(), priority()})). + +priority() -> + oneof([integer(), infinity]). + +messages(Topics) -> + list({topic(Topics), binary()}). + +%% Internal functions: + +drain(Q) -> + case ?Q:out(Q) of + {empty, _} -> + []; + {{value, #message{topic = T, payload = P}}, Q1} -> + [{T, P}|drain(Q1)] + end. From ed61999fdf00e75bb867511b3694f3766ea3b48d Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 6 Sep 2021 22:13:20 +0200 Subject: [PATCH 2/4] chore(emqx): Bump version --- src/emqx.app.src | 2 +- src/emqx.appup.src | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/emqx.app.src b/src/emqx.app.src index 613e3a0af..a46dcc9a8 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,7 +1,7 @@ {application, emqx, [{id, "emqx"}, {description, "EMQ X"}, - {vsn, "4.3.8"}, % strict semver, bump manually! + {vsn, "4.3.9"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon]}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 923bcd6b1..1cfd7c8ce 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,22 +1,32 @@ %% -*- mode: erlang -*- Instructions = -{"4.3.8", +{"4.3.9", [ + {"4.3.8", [ + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + ]}, {"4.3.7", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, - {load_module,emqx_misc,brutal_purge,soft_purge,[]} + {load_module,emqx_misc,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.6", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.5", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.4", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -24,7 +34,9 @@ Instructions = {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.3", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -34,7 +46,9 @@ Instructions = {load_module,emqx_ws_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.2", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -47,7 +61,9 @@ Instructions = {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.1", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -65,7 +81,9 @@ Instructions = {load_module,emqx_logger_textfmt,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.3.0", [ {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -175,7 +193,9 @@ Instructions = {load_module,emqx_metrics,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_access_rule,brutal_purge,soft_purge,[]}, - {load_module,emqx_ctl,brutal_purge,soft_purge,[]} + {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<".*">>,[]}]}, From 5fc1036cf775d04230412edc0672ab01dfb0687a Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Mon, 6 Sep 2021 23:02:51 +0200 Subject: [PATCH 3/4] chore(mqueue): Implement live upgrade --- src/emqx_mqueue.erl | 50 ++++++++++++++++++++++++++++++++------ test/emqx_mqueue_SUITE.erl | 27 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index c47cb1b00..2f92fc140 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -67,6 +67,9 @@ , dropped/1 ]). +-export([ live_upgrade/1 + ]). + -export_type([mqueue/0, options/0]). -type(topic() :: emqx_topic:topic()). @@ -111,6 +114,8 @@ -type(mqueue() :: #mqueue{}). +-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}). + -spec(init(options()) -> mqueue()). init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of @@ -136,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) -> info(len, #mqueue{len = Len}) -> Len; info(dropped, #mqueue{dropped = Dropped}) -> - Dropped. + Dropped; +info(Info, ?OLD(MQ)) -> + info(Info, live_upgrade(MQ)). -is_empty(#mqueue{len = Len}) -> Len =:= 0. +is_empty(#mqueue{len = Len}) -> Len =:= 0; +is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)). -len(#mqueue{len = Len}) -> Len. +len(#mqueue{len = Len}) -> Len; +len(?OLD(MQ)) -> len(live_upgrade(MQ)). -max_len(#mqueue{max_len = MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen; +max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)). %% @doc Return number of dropped messages. -spec(dropped(mqueue()) -> count()). -dropped(#mqueue{dropped = Dropped}) -> Dropped. +dropped(#mqueue{dropped = Dropped}) -> Dropped; +dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)). %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> - [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. + [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]; +stats(?OLD(MQ)) -> + stats(live_upgrade(MQ)). %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). @@ -174,7 +187,9 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} - end. + end; +in(Msg, ?OLD(MQ)) -> + in(Msg, live_upgrade(MQ)). -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). out(MQ = #mqueue{len = 0, q = Q}) -> @@ -198,7 +213,9 @@ out(MQ = #mqueue{q = Q, counter = 0}) -> out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> ct:pal("Cnt ~p", [Cnt]), {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}. + {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}; +out(?OLD(MQ)) -> + out(live_upgrade(MQ)). get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -245,3 +262,20 @@ get_shift_opt(Opts) -> multiplier = Mult, base = Base }. + +live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) -> + ShiftOpts = case is_map(PTable) of + true -> get_shift_opt(#{p_table => PTable}); + false -> get_shift_opt(#{}) + end, + #mqueue{ store_qos0 = StoreQos0 + , max_len = MaxLen + , dropped = Dropped + , p_table = PTable + , default_p = DefaultP + , len = Len + , q = Q + , shift_opts = ShiftOpts + , last_p = undefined + , counter = undefined + }. diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 106b45887..599e68da1 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -211,6 +211,33 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). +t_live_upgrade(_) -> + Q = {mqueue,true,1,0,0,none,0, + {queue,[],[],0}}, + ?assertMatch(#{}, ?Q:info(Q)), + ?assertMatch(true, ?Q:is_empty(Q)), + ?assertMatch(0, ?Q:len(Q)), + ?assertMatch(1, ?Q:max_len(Q)), + ?assertMatch({undefined, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)), + ?assertMatch({empty, _}, ?Q:out(Q)), + ?assertMatch([_|_], ?Q:stats(Q)), + ?assertMatch(0, ?Q:dropped(Q)). + + +t_live_upgrade2(_) -> + Q = {mqueue,false,10,0,0, + #{<<"t">> => 1}, + 0, + {queue,[],[],0}}, + ?assertMatch(#{}, ?Q:info(Q)), + ?assertMatch(true, ?Q:is_empty(Q)), + ?assertMatch(0, ?Q:len(Q)), + ?assertMatch(10, ?Q:max_len(Q)), + ?assertMatch({_, _}, ?Q:in(#message{qos = 0, topic = <<>>}, Q)), + ?assertMatch({empty, _}, ?Q:out(Q)), + ?assertMatch([_|_], ?Q:stats(Q)), + ?assertMatch(0, ?Q:dropped(Q)). + conservation_prop() -> ?FORALL({Priorities, Messages}, ?LET(Priorities, topic_priorities(), From 9b097ac73fad70f562592633095ec13bcff603e2 Mon Sep 17 00:00:00 2001 From: k32 <10274441+k32@users.noreply.github.com> Date: Tue, 7 Sep 2021 11:33:16 +0200 Subject: [PATCH 4/4] chore(mqueue): Remove forgotten debug message --- src/emqx_mqueue.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 2f92fc140..f5fc90f47 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -211,7 +211,6 @@ out(MQ = #mqueue{q = Q, counter = 0}) -> }, out(MQ1); out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> - ct:pal("Cnt ~p", [Cnt]), {R, Q1} = ?PQUEUE:out(Q), {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}; out(?OLD(MQ)) ->