Rewrite emqx_mqueue.erl
Fixed bugs: - Priority queue lack of a `len + 1` logic in `in/2` Changed behaviors: - Topics not found in priority table (from config) will be treated with default priority, instead of hasing topic name to a priority number. - Default priority is now configurable (it was always lower than all configured priorities) - The dropped message due to reaching `max_len` is now returned from `in/2`, so the queue owner (`in/2` caller) can perform autopsy on it
This commit is contained in:
parent
41b79e4f99
commit
ae743ad1f0
|
@ -506,17 +506,6 @@ mqtt.wildcard_subscription = true
|
||||||
## Value: boolean
|
## Value: boolean
|
||||||
mqtt.shared_subscription = true
|
mqtt.shared_subscription = true
|
||||||
|
|
||||||
## Message queue type.
|
|
||||||
##
|
|
||||||
## Value: simple | priority
|
|
||||||
mqtt.mqueue_type = simple
|
|
||||||
|
|
||||||
## Topic priorities. Default is 0.
|
|
||||||
##
|
|
||||||
## Priority: Number [0-255]
|
|
||||||
##
|
|
||||||
## mqtt.mqueue_priorities = topic/1=10,topic/2=8
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Zones
|
## Zones
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
@ -649,22 +638,29 @@ zone.external.await_rel_timeout = 300s
|
||||||
## Default: 2h, 2 hours
|
## Default: 2h, 2 hours
|
||||||
zone.external.session_expiry_interval = 2h
|
zone.external.session_expiry_interval = 2h
|
||||||
|
|
||||||
## Message queue type.
|
|
||||||
##
|
|
||||||
## Value: simple | priority
|
|
||||||
zone.external.mqueue_type = simple
|
|
||||||
|
|
||||||
## Maximum queue length. Enqueued messages when persistent client disconnected,
|
## Maximum queue length. Enqueued messages when persistent client disconnected,
|
||||||
## or inflight window is full. 0 means no limit.
|
## or inflight window is full. 0 means no limit.
|
||||||
##
|
##
|
||||||
## Value: Number >= 0
|
## Value: Number >= 0
|
||||||
zone.external.max_mqueue_len = 1000
|
zone.external.max_mqueue_len = 1000
|
||||||
|
|
||||||
## Topic priorities. Default is 0.
|
## Topic priorities.
|
||||||
|
## 'none' to indicate no priority table (by default), hence all messages
|
||||||
|
## are treated equal
|
||||||
##
|
##
|
||||||
## Priority: Number [0-255]
|
## Priority number [1-255]
|
||||||
|
## Example: topic/1=10,topic/2=8
|
||||||
|
## NOTE: comma and equal signs are not allowed for priority topic names
|
||||||
|
## NOTE: messages for topics not in the priority table are treated as
|
||||||
|
## either highest or lowest priority depending on the configured
|
||||||
|
## value for mqueue_default_priority
|
||||||
##
|
##
|
||||||
## zone.external.mqueue_priorities = topic/1=10,topic/2=8
|
zone.external.mqueue_priorities = none
|
||||||
|
|
||||||
|
## Default to highest priority for topics not matching priority table
|
||||||
|
##
|
||||||
|
## Value: highest | lowest
|
||||||
|
zone.external.mqueue_default_priority = highest
|
||||||
|
|
||||||
## Whether to enqueue Qos0 messages.
|
## Whether to enqueue Qos0 messages.
|
||||||
##
|
##
|
||||||
|
|
|
@ -27,6 +27,12 @@
|
||||||
|
|
||||||
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
-define(ERTS_MINIMUM_REQUIRED, "10.0").
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Configs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(NO_PRIORITY_TABLE, none).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Topics' prefix: $SYS | $queue | $share
|
%% Topics' prefix: $SYS | $queue | $share
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -651,18 +651,6 @@ end}.
|
||||||
{datatype, {enum, [true, false]}}
|
{datatype, {enum, [true, false]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Type: simple | priority
|
|
||||||
{mapping, "mqtt.mqueue_type", "emqx.mqueue_type", [
|
|
||||||
{default, simple},
|
|
||||||
{datatype, {enum, [simple, priority]}}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
%% @doc Topic Priorities: 0~255, Default is 0
|
|
||||||
{mapping, "mqtt.mqueue_priorities", "emqx.mqueue_priorities", [
|
|
||||||
{default, ""},
|
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Zones
|
%% Zones
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -804,12 +792,6 @@ end}.
|
||||||
{datatype, {duration, s}}
|
{datatype, {duration, s}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Type: simple | priority
|
|
||||||
{mapping, "zone.$name.mqueue_type", "emqx.zones", [
|
|
||||||
{default, simple},
|
|
||||||
{datatype, {enum, [simple, priority]}}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
%% @doc Max queue length. Enqueued messages when persistent client
|
%% @doc Max queue length. Enqueued messages when persistent client
|
||||||
%% disconnected, or inflight window is full. 0 means no limit.
|
%% disconnected, or inflight window is full. 0 means no limit.
|
||||||
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
|
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
|
||||||
|
@ -817,11 +799,23 @@ end}.
|
||||||
{datatype, integer}
|
{datatype, integer}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
%% @doc Topic Priorities: 0~255, Default is 0
|
%% @doc Topic Priorities, comma separated topic=priority pairs,
|
||||||
|
%% where priority should be integer in range 1-255 (inclusive)
|
||||||
|
%% 1 being the lowest and 255 being the highest.
|
||||||
|
%% default value `none` to indicate no priority table, hence all
|
||||||
|
%% messages are treated equal, which means either highest ('infinity'),
|
||||||
|
%% or lowest (0) depending on mqueue_default_priority config.
|
||||||
{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [
|
{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [
|
||||||
|
{default, "none"},
|
||||||
{datatype, string}
|
{datatype, string}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Default priority for topics not in priority table.
|
||||||
|
{mapping, "zone.$name.mqueue_default_priority", "emqx.zones", [
|
||||||
|
{default, lowest},
|
||||||
|
{datatype, {enum, [highest, lowest]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
%% @doc Queue Qos0 messages?
|
%% @doc Queue Qos0 messages?
|
||||||
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
|
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
|
||||||
{default, true},
|
{default, true},
|
||||||
|
@ -886,6 +880,18 @@ end}.
|
||||||
max_heap_size => Siz1}
|
max_heap_size => Siz1}
|
||||||
end,
|
end,
|
||||||
{force_shutdown_policy, ShutdownPolicy};
|
{force_shutdown_policy, ShutdownPolicy};
|
||||||
|
("mqueue_priorities", Val) ->
|
||||||
|
case Val of
|
||||||
|
"none" -> none; % NO_PRIORITY_TABLE
|
||||||
|
_ ->
|
||||||
|
lists:foldl(fun(T, Acc) ->
|
||||||
|
%% NOTE: space in "= " is intended
|
||||||
|
[{Topic, Prio}] = string:tokens(T, "= "),
|
||||||
|
P = list_to_integer(Prio),
|
||||||
|
(P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
|
||||||
|
maps:put(iolist_to_binary(Topic), P, Acc)
|
||||||
|
end, string:tokens(Val, ","))
|
||||||
|
end;
|
||||||
(Opt, Val) ->
|
(Opt, Val) ->
|
||||||
{list_to_atom(Opt), Val}
|
{list_to_atom(Opt), Val}
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -63,8 +63,7 @@ init([Pool, Id, Node, Topic, Options]) ->
|
||||||
Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
|
Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
|
||||||
emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
|
emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
|
||||||
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
|
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
|
||||||
MQueue = emqx_mqueue:init(#{type => simple,
|
MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len,
|
||||||
max_len => State#state.max_queue_len,
|
|
||||||
store_qos0 => true}),
|
store_qos0 => true}),
|
||||||
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
|
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
|
||||||
false ->
|
false ->
|
||||||
|
@ -96,7 +95,8 @@ handle_cast(Msg, State) ->
|
||||||
|
|
||||||
handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) ->
|
handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) ->
|
||||||
%% TODO: how to drop???
|
%% TODO: how to drop???
|
||||||
{noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}};
|
{_Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
||||||
|
{noreply, State#state{mqueue = NewQ}};
|
||||||
|
|
||||||
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
|
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
|
||||||
emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
|
emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
|
|
||||||
%% @doc A Simple in-memory message queue.
|
%% @doc A Simple in-memory message queue.
|
||||||
%%
|
%%
|
||||||
%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
|
%% Notice that MQTT is not a (on-disk) persistent messaging queue.
|
||||||
%% should be online in most of the time.
|
%% It assumes that clients should be online in most of the time.
|
||||||
%%
|
%%
|
||||||
%% This module implements a simple in-memory queue for MQTT persistent session.
|
%% This module implements a simple in-memory queue for MQTT persistent session.
|
||||||
%%
|
%%
|
||||||
|
@ -37,7 +37,8 @@
|
||||||
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
|
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
|
||||||
%% in init options
|
%% in init options
|
||||||
%%
|
%%
|
||||||
%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'.
|
%% 4. If the queue is full, drop the oldest one
|
||||||
|
%% unless `max_len' is set to `0' which implies (`infinity').
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
|
|
||||||
|
@ -46,132 +47,122 @@
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
-export([init/1, type/1]).
|
-export([init/1]).
|
||||||
-export([is_empty/1]).
|
-export([is_empty/1]).
|
||||||
-export([len/1, max_len/1]).
|
-export([len/1, max_len/1]).
|
||||||
-export([in/2, out/1]).
|
-export([in/2, out/1]).
|
||||||
-export([stats/1, dropped/1]).
|
-export([stats/1, dropped/1]).
|
||||||
|
|
||||||
-define(PQUEUE, emqx_pqueue).
|
-export_type([mqueue/0, options/0]).
|
||||||
|
|
||||||
-type(priority() :: {iolist(), pos_integer()}).
|
-type(topic() :: emqx_topic:topic()).
|
||||||
|
-type(priority() :: infinity | integer()).
|
||||||
-type(options() :: #{type := simple | priority,
|
-type(pq() :: emqx_pqueue:q()).
|
||||||
max_len := non_neg_integer(),
|
-type(count() :: non_neg_integer()).
|
||||||
priorities => list(priority()),
|
-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
|
||||||
store_qos0 => boolean()}).
|
-type(options() :: #{max_len := count(),
|
||||||
|
priorities => p_table(),
|
||||||
|
default_priority => highest | lowest,
|
||||||
|
store_qos0 => boolean()
|
||||||
|
}).
|
||||||
|
-type(message() :: pemqx_types:message()).
|
||||||
|
|
||||||
-type(stat() :: {len, non_neg_integer()}
|
-type(stat() :: {len, non_neg_integer()}
|
||||||
| {max_len, non_neg_integer()}
|
| {max_len, non_neg_integer()}
|
||||||
| {dropped, non_neg_integer()}).
|
| {dropped, non_neg_integer()}).
|
||||||
|
|
||||||
|
-define(PQUEUE, emqx_pqueue).
|
||||||
|
-define(LOWEST_PRIORITY, 0).
|
||||||
|
-define(HIGHEST_PRIORITY, infinity).
|
||||||
|
-define(MAX_LEN_INFINITY, 0).
|
||||||
|
|
||||||
-record(mqueue, {
|
-record(mqueue, {
|
||||||
type :: simple | priority,
|
store_qos0 = false :: boolean(),
|
||||||
q :: queue:queue() | ?PQUEUE:q(),
|
max_len = ?MAX_LEN_INFINITY :: count(),
|
||||||
%% priority table
|
len = 0 :: count(),
|
||||||
priorities = [],
|
dropped = 0 :: count(),
|
||||||
pseq = 0,
|
p_table = ?NO_PRIORITY_TABLE :: p_table(),
|
||||||
len = 0,
|
default_p = ?LOWEST_PRIORITY :: priority(),
|
||||||
max_len = 0,
|
q = ?PQUEUE:new() :: pq()
|
||||||
qos0 = false,
|
|
||||||
dropped = 0
|
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type(mqueue() :: #mqueue{}).
|
-opaque(mqueue() :: #mqueue{}).
|
||||||
|
|
||||||
-export_type([mqueue/0, priority/0, options/0]).
|
|
||||||
|
|
||||||
-spec(init(options()) -> mqueue()).
|
-spec(init(options()) -> mqueue()).
|
||||||
init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) ->
|
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS0}) ->
|
||||||
init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts).
|
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
|
||||||
|
true -> MaxLen0;
|
||||||
|
false -> ?MAX_LEN_INFINITY
|
||||||
|
end,
|
||||||
|
#mqueue{max_len = MaxLen,
|
||||||
|
store_qos0 = QoS0,
|
||||||
|
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
|
||||||
|
default_p = get_priority_opt(Opts)
|
||||||
|
}.
|
||||||
|
|
||||||
init_q(MQ = #mqueue{type = simple}, _Opts) ->
|
is_empty(#mqueue{len = Len}) -> Len =:= 0.
|
||||||
MQ#mqueue{q = queue:new()};
|
|
||||||
init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) ->
|
|
||||||
init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
|
|
||||||
|
|
||||||
init_pq([], MQ) ->
|
len(#mqueue{len = Len}) -> Len.
|
||||||
MQ;
|
|
||||||
init_pq([{Topic, P} | L], MQ) ->
|
|
||||||
{_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
|
|
||||||
init_pq(L, MQ1).
|
|
||||||
|
|
||||||
insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) ->
|
|
||||||
<<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
|
|
||||||
{PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}.
|
|
||||||
|
|
||||||
-spec(type(mqueue()) -> simple | priority).
|
|
||||||
type(#mqueue{type = Type}) -> Type.
|
|
||||||
|
|
||||||
is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
|
|
||||||
is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).
|
|
||||||
|
|
||||||
len(#mqueue{type = simple, len = Len}) -> Len;
|
|
||||||
len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q).
|
|
||||||
|
|
||||||
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
|
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
|
||||||
|
|
||||||
%% @doc Dropped of the mqueue
|
%% @doc Return number of dropped messages.
|
||||||
-spec(dropped(mqueue()) -> non_neg_integer()).
|
-spec(dropped(mqueue()) -> count()).
|
||||||
dropped(#mqueue{dropped = Dropped}) -> Dropped.
|
dropped(#mqueue{dropped = Dropped}) -> Dropped.
|
||||||
|
|
||||||
%% @doc Stats of the mqueue
|
%% @doc Stats of the mqueue
|
||||||
-spec(stats(mqueue()) -> [stat()]).
|
-spec(stats(mqueue()) -> [stat()]).
|
||||||
stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
|
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
|
||||||
[{len, case Type of
|
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
|
||||||
simple -> Len;
|
|
||||||
priority -> ?PQUEUE:len(Q)
|
|
||||||
end} | [{max_len, MaxLen}, {dropped, Dropped}]].
|
|
||||||
|
|
||||||
%% @doc Enqueue a message.
|
%% @doc Enqueue a message.
|
||||||
-spec(in(emqx_types:message(), mqueue()) -> mqueue()).
|
-spec(in(message(), mqueue()) -> {undefined | message(), mqueue()}).
|
||||||
in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
|
in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
|
||||||
MQ;
|
{_Dropped = undefined, MQ};
|
||||||
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
|
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
||||||
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
|
p_table = PTab,
|
||||||
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
|
q = Q,
|
||||||
when Len >= MaxLen ->
|
len = Len,
|
||||||
{{value, _Old}, Q2} = queue:out(Q),
|
max_len = MaxLen,
|
||||||
MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
|
dropped = Dropped
|
||||||
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
|
} = MQ) ->
|
||||||
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
|
Priority = get_priority(Topic, PTab, Dp),
|
||||||
|
PLen = ?PQUEUE:plen(Priority, Q),
|
||||||
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
|
case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
|
||||||
priorities = Priorities,
|
|
||||||
max_len = 0}) ->
|
|
||||||
case lists:keysearch(Topic, 1, Priorities) of
|
|
||||||
{value, {_, Pri}} ->
|
|
||||||
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)};
|
|
||||||
false ->
|
|
||||||
{Pri, MQ1} = insert_p(Topic, 0, MQ),
|
|
||||||
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
|
|
||||||
end;
|
|
||||||
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
|
|
||||||
priorities = Priorities,
|
|
||||||
max_len = MaxLen}) ->
|
|
||||||
case lists:keysearch(Topic, 1, Priorities) of
|
|
||||||
{value, {_, Pri}} ->
|
|
||||||
case ?PQUEUE:plen(Pri, Q) >= MaxLen of
|
|
||||||
true ->
|
true ->
|
||||||
{_, Q1} = ?PQUEUE:out(Pri, Q),
|
%% reached max length, drop the oldest message
|
||||||
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)};
|
{{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
|
||||||
|
Q2 = ?PQUEUE:in(Msg, Priority, Q1),
|
||||||
|
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
|
||||||
false ->
|
false ->
|
||||||
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
|
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
|
||||||
end;
|
|
||||||
false ->
|
|
||||||
{Pri, MQ1} = insert_p(Topic, 0, MQ),
|
|
||||||
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
out(MQ = #mqueue{type = simple, len = 0}) ->
|
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
|
||||||
|
out(MQ = #mqueue{len = 0, q = Q}) ->
|
||||||
|
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
|
||||||
{empty, MQ};
|
{empty, MQ};
|
||||||
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
|
out(MQ = #mqueue{q = Q, len = Len}) ->
|
||||||
{R, Q2} = queue:out(Q),
|
{R, Q1} = ?PQUEUE:out(Q),
|
||||||
{R, MQ#mqueue{q = Q2, len = Len - 1}};
|
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
|
||||||
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
|
|
||||||
{R, Q2} = queue:out(Q),
|
get_opt(Key, Opts, Default) ->
|
||||||
{R, MQ#mqueue{q = Q2, len = Len - 1}};
|
case maps:get(Key, Opts, Default) of
|
||||||
out(MQ = #mqueue{type = priority, q = Q}) ->
|
undefined -> Default;
|
||||||
{R, Q2} = ?PQUEUE:out(Q),
|
X -> X
|
||||||
{R, MQ#mqueue{q = Q2}}.
|
end.
|
||||||
|
|
||||||
|
get_priority_opt(Opts) ->
|
||||||
|
case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
|
||||||
|
lowest -> ?LOWEST_PRIORITY;
|
||||||
|
highest -> ?HIGHEST_PRIORITY;
|
||||||
|
N when is_integer(N) -> N
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
|
||||||
|
%% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
|
||||||
|
%% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
|
||||||
|
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
|
||||||
|
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
|
||||||
|
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
|
||||||
|
|
||||||
|
|
|
@ -377,10 +377,10 @@ init([Parent, #{zone := Zone,
|
||||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
||||||
|
|
||||||
init_mqueue(Zone) ->
|
init_mqueue(Zone) ->
|
||||||
emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple),
|
emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000),
|
||||||
max_len => get_env(Zone, max_mqueue_len, 1000),
|
store_qos0 => get_env(Zone, mqueue_store_qos0, true),
|
||||||
priorities => get_env(Zone, mqueue_priorities, ""),
|
priorities => get_env(Zone, mqueue_priorities),
|
||||||
store_qos0 => get_env(Zone, mqueue_store_qos0, true)
|
default_priority => get_env(Zone, mqueue_default_priority)
|
||||||
}).
|
}).
|
||||||
|
|
||||||
binding(ConnPid) ->
|
binding(ConnPid) ->
|
||||||
|
@ -817,7 +817,8 @@ dispatch(Msg = #message{qos = QoS} = Msg,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
||||||
inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
|
{_Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
||||||
|
inc_stats(enqueue, Msg, State#state{mqueue = NewQ}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Deliver
|
%% Deliver
|
||||||
|
|
|
@ -28,57 +28,58 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue,
|
||||||
t_priority_mqueue, t_infinity_priority_mqueue].
|
t_priority_mqueue, t_infinity_priority_mqueue].
|
||||||
|
|
||||||
t_in(_) ->
|
t_in(_) ->
|
||||||
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
Opts = #{max_len => 5, store_qos0 => true},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
Q1 = ?Q:in(#message{}, Q),
|
{_, Q1} = ?Q:in(#message{}, Q),
|
||||||
?assertEqual(1, ?Q:len(Q1)),
|
?assertEqual(1, ?Q:len(Q1)),
|
||||||
Q2 = ?Q:in(#message{qos = 1}, Q1),
|
{_, Q2} = ?Q:in(#message{qos = 1}, Q1),
|
||||||
?assertEqual(2, ?Q:len(Q2)),
|
?assertEqual(2, ?Q:len(Q2)),
|
||||||
Q3 = ?Q:in(#message{qos = 2}, Q2),
|
{_, Q3} = ?Q:in(#message{qos = 2}, Q2),
|
||||||
Q4 = ?Q:in(#message{}, Q3),
|
{_, Q4} = ?Q:in(#message{}, Q3),
|
||||||
Q5 = ?Q:in(#message{}, Q4),
|
{_, Q5} = ?Q:in(#message{}, Q4),
|
||||||
?assertEqual(5, ?Q:len(Q5)).
|
?assertEqual(5, ?Q:len(Q5)).
|
||||||
|
|
||||||
t_in_qos0(_) ->
|
t_in_qos0(_) ->
|
||||||
Opts = #{type => simple, max_len => 5, store_qos0 => false},
|
Opts = #{max_len => 5, store_qos0 => false},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
Q1 = ?Q:in(#message{qos = 0}, Q),
|
{_, Q1} = ?Q:in(#message{qos = 0}, Q),
|
||||||
?assert(?Q:is_empty(Q1)),
|
?assert(?Q:is_empty(Q1)),
|
||||||
Q2 = ?Q:in(#message{qos = 0}, Q1),
|
{_, Q2} = ?Q:in(#message{qos = 0}, Q1),
|
||||||
?assert(?Q:is_empty(Q2)).
|
?assert(?Q:is_empty(Q2)).
|
||||||
|
|
||||||
t_out(_) ->
|
t_out(_) ->
|
||||||
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
Opts = #{max_len => 5, store_qos0 => true},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
{empty, Q} = ?Q:out(Q),
|
{empty, Q} = ?Q:out(Q),
|
||||||
Q1 = ?Q:in(#message{}, Q),
|
{_, Q1} = ?Q:in(#message{}, Q),
|
||||||
{Value, Q2} = ?Q:out(Q1),
|
{Value, Q2} = ?Q:out(Q1),
|
||||||
?assertEqual(0, ?Q:len(Q2)),
|
?assertEqual(0, ?Q:len(Q2)),
|
||||||
?assertEqual({value, #message{}}, Value).
|
?assertEqual({value, #message{}}, Value).
|
||||||
|
|
||||||
t_simple_mqueue(_) ->
|
t_simple_mqueue(_) ->
|
||||||
Opts = #{type => simple, max_len => 3, store_qos0 => false},
|
Opts = #{max_len => 3, store_qos0 => false},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assertEqual(simple, ?Q:type(Q)),
|
|
||||||
?assertEqual(3, ?Q:max_len(Q)),
|
?assertEqual(3, ?Q:max_len(Q)),
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
|
{_, Q1} = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
|
||||||
Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
|
{_, Q2} = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
|
||||||
Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2),
|
{_, Q3} = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2),
|
||||||
Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3),
|
{_, Q4} = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3),
|
||||||
?assertEqual(3, ?Q:len(Q4)),
|
?assertEqual(3, ?Q:len(Q4)),
|
||||||
{{value, Msg}, Q5} = ?Q:out(Q4),
|
{{value, Msg}, Q5} = ?Q:out(Q4),
|
||||||
?assertEqual(<<"2">>, Msg#message.payload),
|
?assertEqual(<<"2">>, Msg#message.payload),
|
||||||
?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
|
?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
|
||||||
|
|
||||||
t_infinity_simple_mqueue(_) ->
|
t_infinity_simple_mqueue(_) ->
|
||||||
Opts = #{type => simple, max_len => 0, store_qos0 => false},
|
Opts = #{max_len => 0, store_qos0 => false},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
?assertEqual(0, ?Q:max_len(Q)),
|
?assertEqual(0, ?Q:max_len(Q)),
|
||||||
Qx = lists:foldl(fun(I, AccQ) ->
|
Qx = lists:foldl(
|
||||||
?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ)
|
fun(I, AccQ) ->
|
||||||
|
{_, NewQ} = ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ),
|
||||||
|
NewQ
|
||||||
end, Q, lists:seq(1, 255)),
|
end, Q, lists:seq(1, 255)),
|
||||||
?assertEqual(255, ?Q:len(Qx)),
|
?assertEqual(255, ?Q:len(Qx)),
|
||||||
?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)),
|
?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)),
|
||||||
|
@ -86,45 +87,55 @@ t_infinity_simple_mqueue(_) ->
|
||||||
?assertEqual(<<1>>, V#message.payload).
|
?assertEqual(<<1>>, V#message.payload).
|
||||||
|
|
||||||
t_priority_mqueue(_) ->
|
t_priority_mqueue(_) ->
|
||||||
Opts = #{type => priority, max_len => 3, priorities => [{<<"t1">>, 1}, {<<"t2">>, 2}, {<<"t3">>, 3}], store_qos0 => false},
|
Opts = #{max_len => 3,
|
||||||
|
priorities =>
|
||||||
|
#{<<"t1">> => 1,
|
||||||
|
<<"t2">> => 2,
|
||||||
|
<<"t3">> => 3
|
||||||
|
},
|
||||||
|
store_qos0 => false},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assertEqual(priority, ?Q:type(Q)),
|
|
||||||
?assertEqual(3, ?Q:max_len(Q)),
|
?assertEqual(3, ?Q:max_len(Q)),
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
|
{_, Q1} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
|
||||||
Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
|
{_, Q2} = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
|
||||||
Q3 = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2),
|
{_, Q3} = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2),
|
||||||
?assertEqual(3, ?Q:len(Q3)),
|
?assertEqual(3, ?Q:len(Q3)),
|
||||||
Q4 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3),
|
{_, Q4} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3),
|
||||||
?assertEqual(4, ?Q:len(Q4)),
|
?assertEqual(4, ?Q:len(Q4)),
|
||||||
Q5 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4),
|
{_, Q5} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4),
|
||||||
?assertEqual(5, ?Q:len(Q5)),
|
?assertEqual(5, ?Q:len(Q5)),
|
||||||
Q6 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
|
{_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
|
||||||
?assertEqual(5, ?Q:len(Q6)),
|
?assertEqual(5, ?Q:len(Q6)),
|
||||||
{{value, Msg}, Q7} = ?Q:out(Q6),
|
{{value, Msg}, Q7} = ?Q:out(Q6),
|
||||||
?assertEqual(4, ?Q:len(Q7)),
|
?assertEqual(4, ?Q:len(Q7)),
|
||||||
?assertEqual(<<"t3">>, Msg#message.topic).
|
?assertEqual(<<"t3">>, Msg#message.topic).
|
||||||
|
|
||||||
t_infinity_priority_mqueue(_) ->
|
t_infinity_priority_mqueue(_) ->
|
||||||
Opts = #{type => priority, max_len => 0, priorities => [{<<"t">>, 1}, {<<"t1">>, 2}], store_qos0 => false},
|
Opts = #{max_len => 0,
|
||||||
|
priorities =>
|
||||||
|
#{<<"t">> => 1,
|
||||||
|
<<"t1">> => 2
|
||||||
|
},
|
||||||
|
store_qos0 => false},
|
||||||
Q = ?Q:init(Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assertEqual(0, ?Q:max_len(Q)),
|
?assertEqual(0, ?Q:max_len(Q)),
|
||||||
Qx = lists:foldl(fun(I, AccQ) ->
|
Qx = lists:foldl(fun(I, AccQ) ->
|
||||||
AccQ1 =
|
{undefined, AccQ1} = ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
|
||||||
?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
|
{undefined, AccQ2} = ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1),
|
||||||
?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
|
AccQ2
|
||||||
end, Q, lists:seq(1, 255)),
|
end, Q, lists:seq(1, 255)),
|
||||||
?assertEqual(510, ?Q:len(Qx)),
|
?assertEqual(510, ?Q:len(Qx)),
|
||||||
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
|
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
|
||||||
|
|
||||||
t_priority_mqueue2(_) ->
|
t_priority_mqueue2(_) ->
|
||||||
Opts = #{type => priority, max_length => 2, store_qos0 => false},
|
Opts = #{max_length => 2, store_qos0 => false},
|
||||||
Q = ?Q:init("priority_queue2_test", Opts),
|
Q = ?Q:init("priority_queue2_test", Opts),
|
||||||
2 = ?Q:max_len(Q),
|
2 = ?Q:max_len(Q),
|
||||||
Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
|
{_, Q1} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
|
||||||
Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
|
{_, Q2} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
|
||||||
Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
|
{_, Q3} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
|
||||||
Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
|
{_, Q4} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
|
||||||
4 = ?Q:len(Q4),
|
4 = ?Q:len(Q4),
|
||||||
{{value, _Val}, Q5} = ?Q:out(Q4),
|
{{value, _Val}, Q5} = ?Q:out(Q4),
|
||||||
3 = ?Q:len(Q5).
|
3 = ?Q:len(Q5).
|
||||||
|
|
Loading…
Reference in New Issue