From ae743ad1f04f4506969a0e74138bfed518a2e38d Mon Sep 17 00:00:00 2001 From: spring2maz Date: Sun, 21 Oct 2018 17:20:09 +0200 Subject: [PATCH] Rewrite emqx_mqueue.erl Fixed bugs: - Priority queue lack of a `len + 1` logic in `in/2` Changed behaviors: - Topics not found in priority table (from config) will be treated with default priority, instead of hasing topic name to a priority number. - Default priority is now configurable (it was always lower than all configured priorities) - The dropped message due to reaching `max_len` is now returned from `in/2`, so the queue owner (`in/2` caller) can perform autopsy on it --- etc/emqx.conf | 34 +++---- include/emqx.hrl | 6 ++ priv/emqx.schema | 44 +++++---- src/emqx_local_bridge.erl | 6 +- src/emqx_mqueue.erl | 191 ++++++++++++++++++------------------- src/emqx_session.erl | 11 ++- test/emqx_mqueue_SUITE.erl | 87 +++++++++-------- 7 files changed, 195 insertions(+), 184 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 56fcf5ffc..0b3604102 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -506,17 +506,6 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true -## Message queue type. -## -## Value: simple | priority -mqtt.mqueue_type = simple - -## Topic priorities. Default is 0. -## -## Priority: Number [0-255] -## -## mqtt.mqueue_priorities = topic/1=10,topic/2=8 - ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- @@ -649,22 +638,29 @@ zone.external.await_rel_timeout = 300s ## Default: 2h, 2 hours zone.external.session_expiry_interval = 2h -## Message queue type. -## -## Value: simple | priority -zone.external.mqueue_type = simple - ## Maximum queue length. Enqueued messages when persistent client disconnected, ## or inflight window is full. 0 means no limit. ## ## Value: Number >= 0 zone.external.max_mqueue_len = 1000 -## Topic priorities. Default is 0. +## Topic priorities. +## 'none' to indicate no priority table (by default), hence all messages +## are treated equal ## -## Priority: Number [0-255] +## Priority number [1-255] +## Example: topic/1=10,topic/2=8 +## NOTE: comma and equal signs are not allowed for priority topic names +## NOTE: messages for topics not in the priority table are treated as +## either highest or lowest priority depending on the configured +## value for mqueue_default_priority ## -## zone.external.mqueue_priorities = topic/1=10,topic/2=8 +zone.external.mqueue_priorities = none + +## Default to highest priority for topics not matching priority table +## +## Value: highest | lowest +zone.external.mqueue_default_priority = highest ## Whether to enqueue Qos0 messages. ## diff --git a/include/emqx.hrl b/include/emqx.hrl index 984f4c9e2..84feb16b7 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -27,6 +27,12 @@ -define(ERTS_MINIMUM_REQUIRED, "10.0"). +%%-------------------------------------------------------------------- +%% Configs +%%-------------------------------------------------------------------- + +-define(NO_PRIORITY_TABLE, none). + %%-------------------------------------------------------------------- %% Topics' prefix: $SYS | $queue | $share %%-------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 1424ab240..d09937bb2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -651,18 +651,6 @@ end}. {datatype, {enum, [true, false]}} ]}. -%% @doc Type: simple | priority -{mapping, "mqtt.mqueue_type", "emqx.mqueue_type", [ - {default, simple}, - {datatype, {enum, [simple, priority]}} -]}. - -%% @doc Topic Priorities: 0~255, Default is 0 -{mapping, "mqtt.mqueue_priorities", "emqx.mqueue_priorities", [ - {default, ""}, - {datatype, string} -]}. - %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -804,12 +792,6 @@ end}. {datatype, {duration, s}} ]}. -%% @doc Type: simple | priority -{mapping, "zone.$name.mqueue_type", "emqx.zones", [ - {default, simple}, - {datatype, {enum, [simple, priority]}} -]}. - %% @doc Max queue length. Enqueued messages when persistent client %% disconnected, or inflight window is full. 0 means no limit. {mapping, "zone.$name.max_mqueue_len", "emqx.zones", [ @@ -817,11 +799,23 @@ end}. {datatype, integer} ]}. -%% @doc Topic Priorities: 0~255, Default is 0 +%% @doc Topic Priorities, comma separated topic=priority pairs, +%% where priority should be integer in range 1-255 (inclusive) +%% 1 being the lowest and 255 being the highest. +%% default value `none` to indicate no priority table, hence all +%% messages are treated equal, which means either highest ('infinity'), +%% or lowest (0) depending on mqueue_default_priority config. {mapping, "zone.$name.mqueue_priorities", "emqx.zones", [ + {default, "none"}, {datatype, string} ]}. +%% @doc Default priority for topics not in priority table. +{mapping, "zone.$name.mqueue_default_priority", "emqx.zones", [ + {default, lowest}, + {datatype, {enum, [highest, lowest]}} +]}. + %% @doc Queue Qos0 messages? {mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [ {default, true}, @@ -886,6 +880,18 @@ end}. max_heap_size => Siz1} end, {force_shutdown_policy, ShutdownPolicy}; + ("mqueue_priorities", Val) -> + case Val of + "none" -> none; % NO_PRIORITY_TABLE + _ -> + lists:foldl(fun(T, Acc) -> + %% NOTE: space in "= " is intended + [{Topic, Prio}] = string:tokens(T, "= "), + P = list_to_integer(Prio), + (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}), + maps:put(iolist_to_binary(Topic), P, Acc) + end, string:tokens(Val, ",")) + end; (Opt, Val) -> {list_to_atom(Opt), Val} end, diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl index 228a64cff..7c4e7cea1 100644 --- a/src/emqx_local_bridge.erl +++ b/src/emqx_local_bridge.erl @@ -63,8 +63,7 @@ init([Pool, Id, Node, Topic, Options]) -> Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), - MQueue = emqx_mqueue:init(#{type => simple, - max_len => State#state.max_queue_len, + MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len, store_qos0 => true}), {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; false -> @@ -96,7 +95,8 @@ handle_cast(Msg, State) -> handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) -> %% TODO: how to drop??? - {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; + {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q), + {noreply, State#state{mqueue = NewQ}}; handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index d9270dd5f..90fe59ba8 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -13,8 +13,8 @@ %% @doc A Simple in-memory message queue. %% -%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client -%% should be online in most of the time. +%% Notice that MQTT is not a (on-disk) persistent messaging queue. +%% It assumes that clients should be online in most of the time. %% %% This module implements a simple in-memory queue for MQTT persistent session. %% @@ -37,7 +37,8 @@ %% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true` %% in init options %% -%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'. +%% 4. If the queue is full, drop the oldest one +%% unless `max_len' is set to `0' which implies (`infinity'). %% %% @end @@ -46,132 +47,122 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([init/1, type/1]). +-export([init/1]). -export([is_empty/1]). -export([len/1, max_len/1]). -export([in/2, out/1]). -export([stats/1, dropped/1]). --define(PQUEUE, emqx_pqueue). +-export_type([mqueue/0, options/0]). --type(priority() :: {iolist(), pos_integer()}). - --type(options() :: #{type := simple | priority, - max_len := non_neg_integer(), - priorities => list(priority()), - store_qos0 => boolean()}). +-type(topic() :: emqx_topic:topic()). +-type(priority() :: infinity | integer()). +-type(pq() :: emqx_pqueue:q()). +-type(count() :: non_neg_integer()). +-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}). +-type(options() :: #{max_len := count(), + priorities => p_table(), + default_priority => highest | lowest, + store_qos0 => boolean() + }). +-type(message() :: pemqx_types:message()). -type(stat() :: {len, non_neg_integer()} | {max_len, non_neg_integer()} | {dropped, non_neg_integer()}). +-define(PQUEUE, emqx_pqueue). +-define(LOWEST_PRIORITY, 0). +-define(HIGHEST_PRIORITY, infinity). +-define(MAX_LEN_INFINITY, 0). + -record(mqueue, { - type :: simple | priority, - q :: queue:queue() | ?PQUEUE:q(), - %% priority table - priorities = [], - pseq = 0, - len = 0, - max_len = 0, - qos0 = false, - dropped = 0 + store_qos0 = false :: boolean(), + max_len = ?MAX_LEN_INFINITY :: count(), + len = 0 :: count(), + dropped = 0 :: count(), + p_table = ?NO_PRIORITY_TABLE :: p_table(), + default_p = ?LOWEST_PRIORITY :: priority(), + q = ?PQUEUE:new() :: pq() }). --type(mqueue() :: #mqueue{}). - --export_type([mqueue/0, priority/0, options/0]). +-opaque(mqueue() :: #mqueue{}). -spec(init(options()) -> mqueue()). -init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) -> - init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts). +init(Opts = #{max_len := MaxLen0, store_qos0 := QoS0}) -> + MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of + true -> MaxLen0; + false -> ?MAX_LEN_INFINITY + end, + #mqueue{max_len = MaxLen, + store_qos0 = QoS0, + p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), + default_p = get_priority_opt(Opts) + }. -init_q(MQ = #mqueue{type = simple}, _Opts) -> - MQ#mqueue{q = queue:new()}; -init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) -> - init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). +is_empty(#mqueue{len = Len}) -> Len =:= 0. -init_pq([], MQ) -> - MQ; -init_pq([{Topic, P} | L], MQ) -> - {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ), - init_pq(L, MQ1). - -insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) -> - <> = <>, - {PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}. - --spec(type(mqueue()) -> simple | priority). -type(#mqueue{type = Type}) -> Type. - -is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; -is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q). - -len(#mqueue{type = simple, len = Len}) -> Len; -len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q). +len(#mqueue{len = Len}) -> Len. max_len(#mqueue{max_len = MaxLen}) -> MaxLen. -%% @doc Dropped of the mqueue --spec(dropped(mqueue()) -> non_neg_integer()). +%% @doc Return number of dropped messages. +-spec(dropped(mqueue()) -> count()). dropped(#mqueue{dropped = Dropped}) -> Dropped. %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). -stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> - [{len, case Type of - simple -> Len; - priority -> ?PQUEUE:len(Q) - end} | [{max_len, MaxLen}, {dropped, Dropped}]]. +stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> + [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. %% @doc Enqueue a message. --spec(in(emqx_types:message(), mqueue()) -> mqueue()). -in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> - MQ; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> - MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) - when Len >= MaxLen -> - {{value, _Old}, Q2} = queue:out(Q), - MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1}; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> - MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; - -in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, - priorities = Priorities, - max_len = 0}) -> - case lists:keysearch(Topic, 1, Priorities) of - {value, {_, Pri}} -> - MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}; +-spec(in(message(), mqueue()) -> {undefined | message(), mqueue()}). +in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + {_Dropped = undefined, MQ}; +in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, + p_table = PTab, + q = Q, + len = Len, + max_len = MaxLen, + dropped = Dropped + } = MQ) -> + Priority = get_priority(Topic, PTab, Dp), + PLen = ?PQUEUE:plen(Priority, Q), + case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of + true -> + %% reached max length, drop the oldest message + {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q), + Q2 = ?PQUEUE:in(Msg, Priority, Q1), + {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> - {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} - end; -in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, - priorities = Priorities, - max_len = MaxLen}) -> - case lists:keysearch(Topic, 1, Priorities) of - {value, {_, Pri}} -> - case ?PQUEUE:plen(Pri, Q) >= MaxLen of - true -> - {_, Q1} = ?PQUEUE:out(Pri, Q), - MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)}; - false -> - MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} - end; - false -> - {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} + {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} end. -out(MQ = #mqueue{type = simple, len = 0}) -> +-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). +out(MQ = #mqueue{len = 0, q = Q}) -> + 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap {empty, MQ}; -out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> - {R, Q2} = queue:out(Q), - {R, MQ#mqueue{q = Q2, len = Len - 1}}; -out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> - {R, Q2} = queue:out(Q), - {R, MQ#mqueue{q = Q2, len = Len - 1}}; -out(MQ = #mqueue{type = priority, q = Q}) -> - {R, Q2} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q2}}. +out(MQ = #mqueue{q = Q, len = Len}) -> + {R, Q1} = ?PQUEUE:out(Q), + {R, MQ#mqueue{q = Q1, len = Len - 1}}. + +get_opt(Key, Opts, Default) -> + case maps:get(Key, Opts, Default) of + undefined -> Default; + X -> X + end. + +get_priority_opt(Opts) -> + case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of + lowest -> ?LOWEST_PRIORITY; + highest -> ?HIGHEST_PRIORITY; + N when is_integer(N) -> N + end. + +%% MICRO-OPTIMIZATION: When there is no priority table defined (from config), +%% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0) +%% because the lowest priority in emqx_pqueue is a fallback to queue:queue() +%% while the highest 'infinity' is a [{infinity, queue:queue()}] +get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; +get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 286066a59..85b753f3d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -377,10 +377,10 @@ init([Parent, #{zone := Zone, gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). init_mqueue(Zone) -> - emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple), - max_len => get_env(Zone, max_mqueue_len, 1000), - priorities => get_env(Zone, mqueue_priorities, ""), - store_qos0 => get_env(Zone, mqueue_store_qos0, true) + emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), + store_qos0 => get_env(Zone, mqueue_store_qos0, true), + priorities => get_env(Zone, mqueue_priorities), + default_priority => get_env(Zone, mqueue_default_priority) }). binding(ConnPid) -> @@ -817,7 +817,8 @@ dispatch(Msg = #message{qos = QoS} = Msg, end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> - inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). + {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q), + inc_stats(enqueue, Msg, State#state{mqueue = NewQ}). %%------------------------------------------------------------------------------ %% Deliver diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 8a1ca5201..9bb424fa1 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -28,57 +28,58 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue, t_priority_mqueue, t_infinity_priority_mqueue]. t_in(_) -> - Opts = #{type => simple, max_len => 5, store_qos0 => true}, + Opts = #{max_len => 5, store_qos0 => true}, Q = ?Q:init(Opts), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{}, Q), + {_, Q1} = ?Q:in(#message{}, Q), ?assertEqual(1, ?Q:len(Q1)), - Q2 = ?Q:in(#message{qos = 1}, Q1), + {_, Q2} = ?Q:in(#message{qos = 1}, Q1), ?assertEqual(2, ?Q:len(Q2)), - Q3 = ?Q:in(#message{qos = 2}, Q2), - Q4 = ?Q:in(#message{}, Q3), - Q5 = ?Q:in(#message{}, Q4), + {_, Q3} = ?Q:in(#message{qos = 2}, Q2), + {_, Q4} = ?Q:in(#message{}, Q3), + {_, Q5} = ?Q:in(#message{}, Q4), ?assertEqual(5, ?Q:len(Q5)). t_in_qos0(_) -> - Opts = #{type => simple, max_len => 5, store_qos0 => false}, + Opts = #{max_len => 5, store_qos0 => false}, Q = ?Q:init(Opts), - Q1 = ?Q:in(#message{qos = 0}, Q), + {_, Q1} = ?Q:in(#message{qos = 0}, Q), ?assert(?Q:is_empty(Q1)), - Q2 = ?Q:in(#message{qos = 0}, Q1), + {_, Q2} = ?Q:in(#message{qos = 0}, Q1), ?assert(?Q:is_empty(Q2)). t_out(_) -> - Opts = #{type => simple, max_len => 5, store_qos0 => true}, + Opts = #{max_len => 5, store_qos0 => true}, Q = ?Q:init(Opts), {empty, Q} = ?Q:out(Q), - Q1 = ?Q:in(#message{}, Q), + {_, Q1} = ?Q:in(#message{}, Q), {Value, Q2} = ?Q:out(Q1), ?assertEqual(0, ?Q:len(Q2)), ?assertEqual({value, #message{}}, Value). t_simple_mqueue(_) -> - Opts = #{type => simple, max_len => 3, store_qos0 => false}, + Opts = #{max_len => 3, store_qos0 => false}, Q = ?Q:init(Opts), - ?assertEqual(simple, ?Q:type(Q)), ?assertEqual(3, ?Q:max_len(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q), - Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1), - Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2), - Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3), + {_, Q1} = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q), + {_, Q2} = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1), + {_, Q3} = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2), + {_, Q4} = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3), ?assertEqual(3, ?Q:len(Q4)), {{value, Msg}, Q5} = ?Q:out(Q4), ?assertEqual(<<"2">>, Msg#message.payload), ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). t_infinity_simple_mqueue(_) -> - Opts = #{type => simple, max_len => 0, store_qos0 => false}, + Opts = #{max_len => 0, store_qos0 => false}, Q = ?Q:init(Opts), ?assert(?Q:is_empty(Q)), ?assertEqual(0, ?Q:max_len(Q)), - Qx = lists:foldl(fun(I, AccQ) -> - ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ) + Qx = lists:foldl( + fun(I, AccQ) -> + {_, NewQ} = ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ), + NewQ end, Q, lists:seq(1, 255)), ?assertEqual(255, ?Q:len(Qx)), ?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)), @@ -86,45 +87,55 @@ t_infinity_simple_mqueue(_) -> ?assertEqual(<<1>>, V#message.payload). t_priority_mqueue(_) -> - Opts = #{type => priority, max_len => 3, priorities => [{<<"t1">>, 1}, {<<"t2">>, 2}, {<<"t3">>, 3}], store_qos0 => false}, + Opts = #{max_len => 3, + priorities => + #{<<"t1">> => 1, + <<"t2">> => 2, + <<"t3">> => 3 + }, + store_qos0 => false}, Q = ?Q:init(Opts), - ?assertEqual(priority, ?Q:type(Q)), ?assertEqual(3, ?Q:max_len(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q), - Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1), - Q3 = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2), + {_, Q1} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q), + {_, Q2} = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1), + {_, Q3} = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2), ?assertEqual(3, ?Q:len(Q3)), - Q4 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3), + {_, Q4} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3), ?assertEqual(4, ?Q:len(Q4)), - Q5 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4), + {_, Q5} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4), ?assertEqual(5, ?Q:len(Q5)), - Q6 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), + {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, Q7} = ?Q:out(Q6), ?assertEqual(4, ?Q:len(Q7)), ?assertEqual(<<"t3">>, Msg#message.topic). t_infinity_priority_mqueue(_) -> - Opts = #{type => priority, max_len => 0, priorities => [{<<"t">>, 1}, {<<"t1">>, 2}], store_qos0 => false}, + Opts = #{max_len => 0, + priorities => + #{<<"t">> => 1, + <<"t1">> => 2 + }, + store_qos0 => false}, Q = ?Q:init(Opts), ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> - AccQ1 = - ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), - ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) - end, Q, lists:seq(1, 255)), + {undefined, AccQ1} = ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), + {undefined, AccQ2} = ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1), + AccQ2 + end, Q, lists:seq(1, 255)), ?assertEqual(510, ?Q:len(Qx)), ?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)). t_priority_mqueue2(_) -> - Opts = #{type => priority, max_length => 2, store_qos0 => false}, + Opts = #{max_length => 2, store_qos0 => false}, Q = ?Q:init("priority_queue2_test", Opts), 2 = ?Q:max_len(Q), - Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), - Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), - Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), - Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), + {_, Q1} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), + {_, Q2} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), + {_, Q3} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), + {_, Q4} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), 4 = ?Q:len(Q4), {{value, _Val}, Q5} = ?Q:out(Q4), 3 = ?Q:len(Q5).