diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index d0c6365ff..c47cb1b00 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -91,6 +91,11 @@ -define(MAX_LEN_INFINITY, 0). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]). +-record(shift_opts, { + multiplier :: non_neg_integer(), + base :: integer() + }). + -record(mqueue, { store_qos0 = false :: boolean(), max_len = ?MAX_LEN_INFINITY :: count(), @@ -98,7 +103,10 @@ dropped = 0 :: count(), p_table = ?NO_PRIORITY_TABLE :: p_table(), default_p = ?LOWEST_PRIORITY :: priority(), - q = ?PQUEUE:new() :: pq() + q = ?PQUEUE:new() :: pq(), + shift_opts :: #shift_opts{}, + last_p :: non_neg_integer() | undefined, + counter :: non_neg_integer() | undefined }). -type(mqueue() :: #mqueue{}). @@ -112,7 +120,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> #mqueue{max_len = MaxLen, store_qos0 = QoS_0, p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), - default_p = get_priority_opt(Opts) + default_p = get_priority_opt(Opts), + shift_opts = get_shift_opt(Opts) }. -spec(info(mqueue()) -> emqx_types:infos()). @@ -171,9 +180,25 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, out(MQ = #mqueue{len = 0, q = Q}) -> 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap {empty, MQ}; -out(MQ = #mqueue{q = Q, len = Len}) -> +out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) -> + {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length + MQ1 = MQ#mqueue{ + q = Q1, + len = Len - 1, + last_p = Prio, + counter = init_counter(Prio, ShiftOpts) + }, + {{value, Val}, MQ1}; +out(MQ = #mqueue{q = Q, counter = 0}) -> + MQ1 = MQ#mqueue{ + q = ?PQUEUE:shift(Q), + last_p = undefined + }, + out(MQ1); +out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> + ct:pal("Cnt ~p", [Cnt]), {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1}}. + {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}. get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -194,3 +219,29 @@ get_priority_opt(Opts) -> %% while the highest 'infinity' is a [{infinity, queue:queue()}] get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). + +init_counter(?HIGHEST_PRIORITY, Opts) -> + Infinity = 1000000, + init_counter(Infinity, Opts); +init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) -> + (Prio + Base) * Mult. + +get_shift_opt(Opts) -> + Mult = maps:get(shift_multiplier, Opts, 10), + Min = case Opts of + #{p_table := PTab} -> + case maps:size(PTab) of + 0 -> 0; + _ -> lists:min(maps:values(PTab)) + end; + _ -> + ?LOWEST_PRIORITY + end, + Base = case Min < 0 of + true -> -Min; + false -> 0 + end, + #shift_opts{ + multiplier = Mult, + base = Base + }. diff --git a/src/emqx_pqueue.erl b/src/emqx_pqueue.erl index 85c89866d..5dd81af0b 100644 --- a/src/emqx_pqueue.erl +++ b/src/emqx_pqueue.erl @@ -55,6 +55,7 @@ , filter/2 , fold/3 , highest/1 + , shift/1 ]). -export_type([q/0]). @@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +-spec(shift(pqueue()) -> pqueue()). +shift(Q = {queue, _, _, _}) -> + Q; +shift({pqueue, []}) -> + {pqueue, []}; %% Shouldn't happen? +shift({pqueue, [Hd|Rest]}) -> + {pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities. + -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). @@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. - diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 34e509145..106b45887 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(Q, emqx_mqueue). @@ -121,8 +122,55 @@ t_priority_mqueue(_) -> {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, Q7} = ?Q:out(Q6), - ?assertEqual(4, ?Q:len(Q7)), - ?assertEqual(<<"t3">>, Msg#message.topic). + ?assertEqual(4, ?Q:len(Q7)). + +t_priority_mqueue_conservation(_) -> + true = proper:quickcheck(conservation_prop()). + +t_priority_order(_) -> + Opts = #{max_len => 5, + shift_multiplier => 1, + priorities => + #{<<"t1">> => 0, + <<"t2">> => 1, + <<"t3">> => 2 + }, + store_qos0 => false + }, + Messages = [{Topic, Message} || + Topic <- [<<"t1">>, <<"t2">>, <<"t3">>], + Message <- lists:seq(1, 10)], + Q = lists:foldl(fun({Topic, Message}, Q) -> + element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q)) + end, + ?Q:init(Opts), + Messages), + ?assertMatch([{<<"t3">>, 6}, + {<<"t3">>, 7}, + {<<"t3">>, 8}, + + {<<"t2">>, 6}, + {<<"t2">>, 7}, + + {<<"t1">>, 6}, + + {<<"t3">>, 9}, + {<<"t3">>, 10}, + + {<<"t2">>, 8}, + + %% Note: for performance reasons we don't reset the + %% counter when we run out of messages with the + %% current prio, so next is t1: + {<<"t1">>, 7}, + + {<<"t2">>, 9}, + {<<"t2">>, 10}, + + {<<"t1">>, 8}, + {<<"t1">>, 9}, + {<<"t1">>, 10} + ], drain(Q)). t_infinity_priority_mqueue(_) -> Opts = #{max_len => 0, @@ -163,3 +211,57 @@ t_dropped(_) -> {Msg, Q2} = ?Q:in(Msg, Q1), ?assertEqual(1, ?Q:dropped(Q2)). +conservation_prop() -> + ?FORALL({Priorities, Messages}, + ?LET(Priorities, topic_priorities(), + {Priorities, messages(Priorities)}), + try + Opts = #{max_len => 0, + priorities => maps:from_list(Priorities), + store_qos0 => false}, + %% Put messages in + Q1 = lists:foldl(fun({Topic, Message}, Q) -> + element(2, ?Q:in(#message{topic = Topic, qos = 1, payload = Message}, Q)) + end, + ?Q:init(Opts), + Messages), + %% Collect messages + Got = lists:sort(drain(Q1)), + Expected = lists:sort(Messages), + case Expected =:= Got of + true -> + true; + false -> + ct:pal("Mismatch: expected ~p~nGot ~p~n", [Expected, Got]), + false + end + catch + EC:Err:Stack -> + ct:pal("Error: ~p", [{EC, Err, Stack}]), + false + end). + +%% Proper generators: + +topic(Priorities) -> + {Topics, _} = lists:unzip(Priorities), + oneof(Topics). + +topic_priorities() -> + non_empty(list({binary(), priority()})). + +priority() -> + oneof([integer(), infinity]). + +messages(Topics) -> + list({topic(Topics), binary()}). + +%% Internal functions: + +drain(Q) -> + case ?Q:out(Q) of + {empty, _} -> + []; + {{value, #message{topic = T, payload = P}}, Q1} -> + [{T, P}|drain(Q1)] + end.