emqx/src/emqx_mqueue.erl

175 lines
6.2 KiB
Erlang

%% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% @doc A Simple in-memory message queue.
%%
%% Notice that MQTT is not a (on-disk) persistent messaging queue.
%% It assumes that clients should be online in most of the time.
%%
%% This module implements a simple in-memory queue for MQTT persistent session.
%%
%% If the broker restarts or crashes, all queued messages will be lost.
%%
%% Concept of Message Queue and Inflight Window:
%%
%% |<----------------- Max Len ----------------->|
%% -----------------------------------------------
%% IN -> | Messages Queue | Inflight Window | -> Out
%% -----------------------------------------------
%% |<--- Win Size --->|
%%
%%
%% 1. Inflight Window is to store the messages
%% that are delivered but still awaiting for puback.
%%
%% 2. Messages are enqueued to tail when the inflight window is full.
%%
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
%% in init options
%%
%% 4. If the queue is full, drop the oldest one
%% unless `max_len' is set to `0' which implies (`infinity').
%%
%% @end
-module(emqx_mqueue).
-include("emqx.hrl").
-include("types.hrl").
-include("emqx_mqtt.hrl").
-export([init/1]).
-export([ is_empty/1
, len/1
, max_len/1
, in/2
, out/1
, stats/1
, dropped/1
]).
-export_type([mqueue/0, options/0]).
-type(topic() :: emqx_topic:topic()).
-type(priority() :: infinity | integer()).
-type(pq() :: emqx_pqueue:q()).
-type(count() :: non_neg_integer()).
-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
-type(options() :: #{max_len := count(),
priorities => p_table(),
default_priority => highest | lowest,
store_qos0 => boolean()
}).
-type(message() :: emqx_types:message()).
-type(stat() :: {len, non_neg_integer()}
| {max_len, non_neg_integer()}
| {dropped, non_neg_integer()}).
-define(PQUEUE, emqx_pqueue).
-define(LOWEST_PRIORITY, 0).
-define(HIGHEST_PRIORITY, infinity).
-define(MAX_LEN_INFINITY, 0).
-record(mqueue, {
store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(),
len = 0 :: count(),
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq()
}).
-opaque(mqueue() :: #mqueue{}).
-spec(init(options()) -> mqueue()).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
true -> MaxLen0;
false -> ?MAX_LEN_INFINITY
end,
#mqueue{max_len = MaxLen,
store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts)
}.
is_empty(#mqueue{len = Len}) -> Len =:= 0.
len(#mqueue{len = Len}) -> Len.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
%% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped.
%% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
%% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
in(Msg = #message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
{_Dropped = Msg, MQ};
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
p_table = PTab,
q = Q,
len = Len,
max_len = MaxLen,
dropped = Dropped
} = MQ) ->
Priority = get_priority(Topic, PTab, Dp),
PLen = ?PQUEUE:plen(Priority, Q),
case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
true ->
%% reached max length, drop the oldest message
{{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
Q2 = ?PQUEUE:in(Msg, Priority, Q1),
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end.
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ};
out(MQ = #mqueue{q = Q, len = Len}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
undefined -> Default;
X -> X
end.
get_priority_opt(Opts) ->
case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
lowest -> ?LOWEST_PRIORITY;
highest -> ?HIGHEST_PRIORITY;
N when is_integer(N) -> N
end.
%% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
%% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
%% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).