emqx/src/emqx_mqueue.erl

281 lines
9.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @doc A Simple in-memory message queue.
%%
%% Notice that MQTT is not a (on-disk) persistent messaging queue.
%% It assumes that clients should be online in most of the time.
%%
%% This module implements a simple in-memory queue for MQTT persistent session.
%%
%% If the broker restarts or crashes, all queued messages will be lost.
%%
%% Concept of Message Queue and Inflight Window:
%%
%% |<----------------- Max Len ----------------->|
%% -----------------------------------------------
%% IN -> | Messages Queue | Inflight Window | -> Out
%% -----------------------------------------------
%% |<--- Win Size --->|
%%
%%
%% 1. Inflight Window is to store the messages
%% that are delivered but still awaiting for puback.
%%
%% 2. Messages are enqueued to tail when the inflight window is full.
%%
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
%% in init options
%%
%% 4. If the queue is full, drop the oldest one
%% unless `max_len' is set to `0' which implies (`infinity').
%%
%% @end
%%--------------------------------------------------------------------
-module(emqx_mqueue).
-include("emqx.hrl").
-include("types.hrl").
-include("emqx_mqtt.hrl").
-export([ init/1
, info/1
, info/2
]).
-export([ is_empty/1
, len/1
, max_len/1
, in/2
, out/1
, stats/1
, dropped/1
]).
-export([ live_upgrade/1
]).
-export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()).
-type(priority() :: infinity | integer()).
-type(pq() :: emqx_pqueue:q()).
-type(count() :: non_neg_integer()).
-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
-type(options() :: #{max_len := count(),
priorities => p_table(),
default_priority => highest | lowest,
store_qos0 => boolean()
}).
-type(message() :: emqx_types:message()).
-type(stat() :: {len, non_neg_integer()}
| {max_len, non_neg_integer()}
| {dropped, non_neg_integer()}).
-define(PQUEUE, emqx_pqueue).
-define(LOWEST_PRIORITY, 0).
-define(HIGHEST_PRIORITY, infinity).
-define(MAX_LEN_INFINITY, 0).
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
-record(shift_opts, {
multiplier :: non_neg_integer(),
base :: integer()
}).
-record(mqueue, {
store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(),
len = 0 :: count(),
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq(),
shift_opts :: #shift_opts{},
last_p :: non_neg_integer() | undefined,
counter :: non_neg_integer() | undefined
}).
-type(mqueue() :: #mqueue{}).
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
-spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
true -> MaxLen0;
false -> ?MAX_LEN_INFINITY
end,
#mqueue{max_len = MaxLen,
store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts),
shift_opts = get_shift_opt(Opts)
}.
-spec(info(mqueue()) -> emqx_types:infos()).
info(MQ) ->
maps:from_list([{Key, info(Key, MQ)} || Key <- ?INFO_KEYS]).
-spec(info(atom(), mqueue()) -> term()).
info(store_qos0, #mqueue{store_qos0 = True}) ->
True;
info(max_len, #mqueue{max_len = MaxLen}) ->
MaxLen;
info(len, #mqueue{len = Len}) ->
Len;
info(dropped, #mqueue{dropped = Dropped}) ->
Dropped;
info(Info, ?OLD(MQ)) ->
info(Info, live_upgrade(MQ)).
is_empty(#mqueue{len = Len}) -> Len =:= 0;
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
len(#mqueue{len = Len}) -> Len;
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
%% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped;
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
%% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
stats(?OLD(MQ)) ->
stats(live_upgrade(MQ)).
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
{_Dropped = Msg, MQ};
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
p_table = PTab,
q = Q,
len = Len,
max_len = MaxLen,
dropped = Dropped
} = MQ) ->
Priority = get_priority(Topic, PTab, Dp),
PLen = ?PQUEUE:plen(Priority, Q),
case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
true ->
%% reached max length, drop the oldest message
{{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
Q2 = ?PQUEUE:in(Msg, Priority, Q1),
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end;
in(Msg, ?OLD(MQ)) ->
in(Msg, live_upgrade(MQ)).
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ};
out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
MQ1 = MQ#mqueue{
q = Q1,
len = Len - 1,
last_p = Prio,
counter = init_counter(Prio, ShiftOpts)
},
{{value, Val}, MQ1};
out(MQ = #mqueue{q = Q, counter = 0}) ->
MQ1 = MQ#mqueue{
q = ?PQUEUE:shift(Q),
last_p = undefined
},
out(MQ1);
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
out(?OLD(MQ)) ->
out(live_upgrade(MQ)).
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
undefined -> Default;
X -> X
end.
get_priority_opt(Opts) ->
case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
lowest -> ?LOWEST_PRIORITY;
highest -> ?HIGHEST_PRIORITY;
N when is_integer(N) -> N
end.
%% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
%% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
%% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
init_counter(?HIGHEST_PRIORITY, Opts) ->
Infinity = 1000000,
init_counter(Infinity, Opts);
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
(Prio + Base) * Mult.
get_shift_opt(Opts) ->
Mult = maps:get(shift_multiplier, Opts, 10),
Min = case Opts of
#{p_table := PTab} ->
case maps:size(PTab) of
0 -> 0;
_ -> lists:min(maps:values(PTab))
end;
_ ->
?LOWEST_PRIORITY
end,
Base = case Min < 0 of
true -> -Min;
false -> 0
end,
#shift_opts{
multiplier = Mult,
base = Base
}.
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
ShiftOpts = case is_map(PTable) of
true -> get_shift_opt(#{p_table => PTable});
false -> get_shift_opt(#{})
end,
#mqueue{ store_qos0 = StoreQos0
, max_len = MaxLen
, dropped = Dropped
, p_table = PTable
, default_p = DefaultP
, len = Len
, q = Q
, shift_opts = ShiftOpts
, last_p = undefined
, counter = undefined
}.