281 lines
9.6 KiB
Erlang
281 lines
9.6 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @doc A Simple in-memory message queue.
|
|
%%
|
|
%% Notice that MQTT is not a (on-disk) persistent messaging queue.
|
|
%% It assumes that clients are online most of the time.
|
|
%%
|
|
%% This module implements a simple in-memory queue for MQTT persistent session.
|
|
%%
|
|
%% If the broker restarts or crashes, all queued messages will be lost.
|
|
%%
|
|
%% Concept of Message Queue and Inflight Window:
|
|
%%
|
|
%% |<----------------- Max Len ----------------->|
|
|
%% -----------------------------------------------
|
|
%% IN -> | Messages Queue | Inflight Window | -> Out
|
|
%% -----------------------------------------------
|
|
%% |<--- Win Size --->|
|
|
%%
|
|
%%
|
|
%% 1. Inflight Window is to store the messages
|
|
%% that are delivered but still awaiting puback.
|
|
%%
|
|
%% 2. Messages are enqueued to tail when the inflight window is full.
|
|
%%
|
|
%% 3. QoS=0 messages are only enqueued when `store_qos0' is set to `true'
|
|
%% in init options
|
|
%%
|
|
%% 4. If the queue is full, drop the oldest one
|
|
%% unless `max_len' is set to `0', which means unlimited (`infinity').
|
|
%%
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_mqueue).
|
|
|
|
-include("emqx.hrl").
|
|
-include("types.hrl").
|
|
-include("emqx_mqtt.hrl").
|
|
|
|
-export([ init/1
|
|
, info/1
|
|
, info/2
|
|
]).
|
|
|
|
-export([ is_empty/1
|
|
, len/1
|
|
, max_len/1
|
|
, in/2
|
|
, out/1
|
|
, stats/1
|
|
, dropped/1
|
|
]).
|
|
|
|
-export([ live_upgrade/1
|
|
]).
|
|
|
|
-export_type([mqueue/0, options/0]).
|
|
|
|
-type(topic() :: emqx_topic:topic()).
|
|
-type(priority() :: infinity | integer()).
|
|
-type(pq() :: emqx_pqueue:q()).
|
|
-type(count() :: non_neg_integer()).
|
|
-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
|
|
-type(options() :: #{max_len := count(),
|
|
priorities => p_table(),
|
|
default_priority => highest | lowest,
|
|
store_qos0 => boolean()
|
|
}).
|
|
-type(message() :: emqx_types:message()).
|
|
|
|
-type(stat() :: {len, non_neg_integer()}
|
|
| {max_len, non_neg_integer()}
|
|
| {dropped, non_neg_integer()}).
|
|
|
|
-define(PQUEUE, emqx_pqueue).
|
|
-define(LOWEST_PRIORITY, 0).
|
|
-define(HIGHEST_PRIORITY, infinity).
|
|
-define(MAX_LEN_INFINITY, 0).
|
|
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
|
|
|
|
%% Parameters used by init_counter/2 to size the counter that out/1
%% decrements before it calls ?PQUEUE:shift/1.
-record(shift_opts, {
|
|
%% Factor the (shifted) priority is multiplied by in init_counter/2;
%% get_shift_opt/1 reads it from the `shift_multiplier' option (default 10).
multiplier :: non_neg_integer(),
|
|
%% Offset added to a priority before multiplying; get_shift_opt/1 sets it
%% to the negated minimum table priority when that minimum is negative,
%% otherwise to 0.
base :: integer()
|
|
}).
|
|
|
|
%% State of one message queue. Instances are created by init/1 or by
%% live_upgrade/1 (from the older 8-element tuple matched by ?OLD).
-record(mqueue, {
|
|
%% When false, in/2 hands QoS 0 messages straight back instead of queueing them.
store_qos0 = false :: boolean(),
|
|
%% Length limit checked by in/2; ?MAX_LEN_INFINITY (0) disables the limit.
max_len = ?MAX_LEN_INFINITY :: count(),
|
|
%% Number of queued messages: incremented by in/2, decremented by out/1.
len = 0 :: count(),
|
|
%% Incremented by in/2 each time it evicts the oldest message to make room.
dropped = 0 :: count(),
|
|
%% Topic-to-priority table consulted by get_priority/3.
p_table = ?NO_PRIORITY_TABLE :: p_table(),
|
|
%% Priority used by get_priority/3 for topics missing from p_table.
default_p = ?LOWEST_PRIORITY :: priority(),
|
|
%% Underlying priority queue (?PQUEUE = emqx_pqueue).
q = ?PQUEUE:new() :: pq(),
|
|
%% Multiplier/base pair passed to init_counter/2 by out/1.
shift_opts :: #shift_opts{},
|
|
%% Priority returned by the latest ?PQUEUE:out_p/1 call in out/1;
%% reset to undefined when the counter reaches 0.
last_p :: non_neg_integer() | undefined,
|
|
%% Set from init_counter/2 and decremented by out/1; when it reaches 0,
%% out/1 calls ?PQUEUE:shift/1 before dequeuing again.
counter :: non_neg_integer() | undefined
|
|
}).
|
|
|
|
-type(mqueue() :: #mqueue{}).
|
|
|
|
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
|
|
|
|
%% @doc Create a new, empty message queue from `Opts'.
%%
%% `max_len' is mandatory. Any value that is not a positive integer is
%% normalised to ?MAX_LEN_INFINITY (0), i.e. no length limit.
%%
%% `store_qos0' is optional, as declared by the options() type, and
%% defaults to `false' (the record default) when absent. Previously a
%% missing key crashed with function_clause even though the type allows it.
%%
%% `priorities', `default_priority' and `shift_multiplier' are optional and
%% are read through get_opt/3, get_priority_opt/1 and get_shift_opt/1.
-spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0}) ->
    MaxLen = case is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY of
                 true -> MaxLen0;
                 false -> ?MAX_LEN_INFINITY
             end,
    #mqueue{max_len = MaxLen,
            store_qos0 = maps:get(store_qos0, Opts, false),
            p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
            default_p = get_priority_opt(Opts),
            shift_opts = get_shift_opt(Opts)
           }.
|
|
|
|
%% @doc Return a map holding the value of every key listed in ?INFO_KEYS,
%% each obtained through info/2.
-spec(info(mqueue()) -> emqx_types:infos()).
info(MQ) ->
    Collect = fun(Key, Acc) -> Acc#{Key => info(Key, MQ)} end,
    lists:foldl(Collect, #{}, ?INFO_KEYS).
|
|
|
|
%% @doc Return a single attribute of the queue.
%% Supported keys: store_qos0 | max_len | len | dropped.
%% A queue in the old tuple format is upgraded first.
-spec(info(atom(), mqueue()) -> term()).
info(store_qos0, #mqueue{} = MQ) ->
    MQ#mqueue.store_qos0;
info(max_len, #mqueue{} = MQ) ->
    MQ#mqueue.max_len;
info(len, #mqueue{} = MQ) ->
    MQ#mqueue.len;
info(dropped, #mqueue{} = MQ) ->
    MQ#mqueue.dropped;
info(Key, ?OLD(Legacy)) ->
    info(Key, live_upgrade(Legacy)).
|
|
|
|
%% @doc True when the queue holds no messages (its `len' field is 0).
is_empty(#mqueue{len = 0}) ->
    true;
is_empty(#mqueue{}) ->
    false;
is_empty(?OLD(Legacy)) ->
    is_empty(live_upgrade(Legacy)).
|
|
|
|
%% @doc Number of messages currently queued.
len(#mqueue{} = MQ) ->
    MQ#mqueue.len;
len(?OLD(Legacy)) ->
    len(live_upgrade(Legacy)).
|
|
|
|
%% @doc Configured length limit; ?MAX_LEN_INFINITY (0) means no limit.
max_len(#mqueue{} = MQ) ->
    MQ#mqueue.max_len;
max_len(?OLD(Legacy)) ->
    max_len(live_upgrade(Legacy)).
|
|
|
|
%% @doc Return the value of the queue's `dropped' counter, i.e. how many
%% messages in/2 has evicted so far.
-spec(dropped(mqueue()) -> count()).
dropped(#mqueue{} = MQ) ->
    MQ#mqueue.dropped;
dropped(?OLD(Legacy)) ->
    dropped(live_upgrade(Legacy)).
|
|
|
|
%% @doc Queue statistics as a proplist: current length, length limit and
%% number of dropped messages, in that order.
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{len = Len, max_len = MaxLen, dropped = Dropped}) ->
    [{len, Len}, {max_len, MaxLen}, {dropped, Dropped}];
stats(?OLD(Legacy)) ->
    stats(live_upgrade(Legacy)).
|
|
|
|
%% @doc Enqueue a message.
%%
%% Returns `{Dropped, NewQueue}' where `Dropped' is:
%%   - the incoming message itself, when it is QoS 0 and `store_qos0' is false
%%     (the queue is returned unchanged and the `dropped' counter is untouched);
%%   - the oldest message of the same priority, when a length limit is set and
%%     the number of messages at that priority already equals `max_len';
%%   - `undefined' when nothing had to be dropped.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
in(#message{qos = ?QOS_0} = Msg, #mqueue{store_qos0 = false} = MQ) ->
    {Msg, MQ};
in(#message{topic = Topic} = Msg, #mqueue{} = MQ) ->
    #mqueue{default_p = DefaultP,
            p_table = PTab,
            q = Q0,
            len = Len,
            max_len = MaxLen,
            dropped = Dropped} = MQ,
    Priority = get_priority(Topic, PTab, DefaultP),
    %% The limit is compared against the length of this priority's sub-queue.
    PLen = ?PQUEUE:plen(Priority, Q0),
    IsFull = MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen,
    case IsFull of
        false ->
            Q1 = ?PQUEUE:in(Msg, Priority, Q0),
            {undefined, MQ#mqueue{len = Len + 1, q = Q1}};
        true ->
            %% Limit reached: evict the oldest message of this priority and
            %% take its place, so `len' stays the same.
            {{value, Evicted}, Q1} = ?PQUEUE:out(Priority, Q0),
            Q2 = ?PQUEUE:in(Msg, Priority, Q1),
            {Evicted, MQ#mqueue{q = Q2, dropped = Dropped + 1}}
    end;
in(Msg, ?OLD(Legacy)) ->
    in(Msg, live_upgrade(Legacy)).
|
|
|
|
%% @doc Dequeue the next message.
%%
%% Clause order matters:
%%   1. `len = 0': the queue is empty; asserts the underlying pqueue agrees.
%%   2. `last_p = undefined': take a message together with its priority via
%%      ?PQUEUE:out_p/1 and arm the counter from that priority.
%%   3. `counter = 0': the counter has run out; shift the pqueue, clear
%%      `last_p' and retry, which lands in clause 1 or 2.
%%   4. otherwise: plain dequeue, decrementing the counter.
%%   5. old tuple format: upgrade and retry.
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(#mqueue{len = 0, q = Q} = MQ) ->
    %% Sanity assertion; ?PQUEUE:len/1 is expected to be cheap on an empty queue.
    0 = ?PQUEUE:len(Q),
    {empty, MQ};
out(#mqueue{last_p = undefined, q = Q0, len = Len, shift_opts = ShiftOpts} = MQ) ->
    %% Cannot be empty here: the `len = 0' clause above was already tried.
    {{value, Msg, Priority}, Q1} = ?PQUEUE:out_p(Q0),
    Counter = init_counter(Priority, ShiftOpts),
    {{value, Msg}, MQ#mqueue{q = Q1,
                             len = Len - 1,
                             last_p = Priority,
                             counter = Counter}};
out(#mqueue{counter = 0, q = Q} = MQ) ->
    out(MQ#mqueue{q = ?PQUEUE:shift(Q), last_p = undefined});
out(#mqueue{q = Q0, len = Len, counter = Counter} = MQ) ->
    {Result, Q1} = ?PQUEUE:out(Q0),
    {Result, MQ#mqueue{q = Q1, len = Len - 1, counter = Counter - 1}};
out(?OLD(Legacy)) ->
    out(live_upgrade(Legacy)).
|
|
|
|
%% @private Look up `Key' in the options map. `Default' is returned both
%% when the key is absent and when it is explicitly set to `undefined'.
get_opt(Key, Opts, Default) ->
    case maps:find(Key, Opts) of
        {ok, Value} when Value =/= undefined -> Value;
        _ -> Default
    end.
|
|
|
|
%% @private Translate the `default_priority' option into a priority value:
%% `lowest' and `highest' map to the corresponding macros, an integer is
%% used as is, and a missing option falls back to ?LOWEST_PRIORITY.
get_priority_opt(Opts) ->
    Configured = get_opt(default_priority, Opts, ?LOWEST_PRIORITY),
    case Configured of
        highest -> ?HIGHEST_PRIORITY;
        lowest -> ?LOWEST_PRIORITY;
        _ when is_integer(Configured) -> Configured
    end.
|
|
|
|
%% @private Resolve the priority of a topic.
%%
%% MICRO-OPTIMIZATION: without a configured priority table the default
%% priority from config is ignored and ?LOWEST_PRIORITY (0) is always used,
%% because emqx_pqueue falls back to a plain queue:queue() for the lowest
%% priority, whereas the highest priority 'infinity' is kept as
%% [{infinity, queue:queue()}].
get_priority(_Topic, ?NO_PRIORITY_TABLE, _DefaultP) ->
    ?LOWEST_PRIORITY;
get_priority(Topic, PTab, DefaultP) ->
    case maps:find(Topic, PTab) of
        {ok, Priority} -> Priority;
        error -> DefaultP
    end.
|
|
|
|
%% @private Compute the initial value of the counter that out/1 decrements:
%% (Priority + Base) * Multiplier. The symbolic highest priority is
%% replaced by the large finite stand-in 1000000 first.
init_counter(?HIGHEST_PRIORITY, ShiftOpts) ->
    init_counter(1000000, ShiftOpts);
init_counter(Priority, #shift_opts{base = Base, multiplier = Multiplier}) ->
    Shifted = Priority + Base,
    Shifted * Multiplier.
|
|
|
|
%% @private Build the #shift_opts{} record used by init_counter/2.
%%
%% `multiplier' comes from the `shift_multiplier' option (default 10).
%% `base' is chosen so that (Priority + Base) is never negative for any
%% priority in the table: it is the negated minimum table priority when
%% that minimum is negative, and 0 otherwise.
%%
%% The priority table is looked up under `p_table' (the key live_upgrade/1
%% passes) and under `priorities' (the key init/1 receives in its options).
%% Previously only `p_table' was consulted, so tables supplied through
%% init/1 were ignored and negative priorities gave a negative counter.
get_shift_opt(Opts) ->
    Mult = maps:get(shift_multiplier, Opts, 10),
    Base = case lowest_table_priority(Opts) of
               Min when Min < 0 -> -Min;
               _ -> 0
           end,
    #shift_opts{
       multiplier = Mult,
       base = Base
      }.

%% @private Smallest priority in the configured table, or ?LOWEST_PRIORITY
%% when the options carry no table.
lowest_table_priority(#{p_table := PTab}) ->
    min_priority(PTab);
lowest_table_priority(#{priorities := PTab}) ->
    min_priority(PTab);
lowest_table_priority(_NoTable) ->
    ?LOWEST_PRIORITY.

%% @private Minimum value of a non-empty priority map. Anything else (an
%% empty map, ?NO_PRIORITY_TABLE, undefined) yields ?LOWEST_PRIORITY
%% instead of crashing in maps:size/1.
min_priority(PTab) when is_map(PTab), map_size(PTab) > 0 ->
    lists:min(maps:values(PTab));
min_priority(_EmptyOrNoTable) ->
    ?LOWEST_PRIORITY.
|
|
|
|
%% @doc Convert a queue in the old 8-element tuple layout into the current
%% #mqueue{} record. All existing fields are carried over unchanged;
%% `shift_opts' is derived from the priority table when it is a map (and
%% from empty options otherwise), and `last_p' / `counter' start undefined.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
    ShiftInput = if
                     is_map(PTable) -> #{p_table => PTable};
                     true -> #{}
                 end,
    #mqueue{store_qos0 = StoreQos0,
            max_len = MaxLen,
            len = Len,
            dropped = Dropped,
            p_table = PTable,
            default_p = DefaultP,
            q = Q,
            shift_opts = get_shift_opt(ShiftInput),
            last_p = undefined,
            counter = undefined}.
|