From 1f2bbe3eb8a9af8c055108febe1906fd05a44772 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 30 Aug 2018 18:29:20 +0800 Subject: [PATCH] Support priority queue --- src/emqx_mqueue.erl | 72 +++++++++++++++++++++------------------------ 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index bf31fa663..a93fd8838 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -11,7 +11,6 @@ %% See the License for the specific language governing permissions and %% limitations under the License. -%% TODO: should be a bound queue. %% @doc A Simple in-memory message queue. %% %% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client @@ -39,70 +38,67 @@ %% %% @end -%% TODO: ... -module(emqx_mqueue). -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --import(proplists, [get_value/3]). - --export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1]). --export([dropped/1, stats/1]). +-export([init/1, type/1]). +-export([is_empty/1]). +-export([len/1, max_len/1]). +-export([in/2, out/1]). +-export([stats/1, dropped/1]). -define(PQUEUE, emqx_pqueue). -type(priority() :: {iolist(), pos_integer()}). --type(options() :: #{type => simple | priority, - max_len => non_neg_integer(), - priority => list(priority()), +-type(options() :: #{type := simple | priority, + max_len := non_neg_integer(), + priorities => list(priority()), store_qos0 => boolean()}). --type(stat() :: {max_len, non_neg_integer()} - | {len, non_neg_integer()} +-type(stat() :: {len, non_neg_integer()} + | {max_len, non_neg_integer()} | {dropped, non_neg_integer()}). --record(mqueue, {type :: simple | priority, - name, q :: queue:queue() | ?PQUEUE:q(), - %% priority table - pseq = 0, priorities = [], - %% len of simple queue - len = 0, max_len = 0, - qos0 = false, dropped = 0}). +-record(mqueue, { + type :: simple | priority, + q :: queue:queue() | ?PQUEUE:q(), + %% priority table + priorities = [], + pseq = 0, + len = 0, + max_len = 0, + qos0 = false, + dropped = 0 + }). -type(mqueue() :: #mqueue{}). -export_type([mqueue/0, priority/0, options/0]). --spec(new(iolist(), options()) -> mqueue()). -new(Name, #{type := Type, max_len := MaxLen, store_qos0 := StoreQos0}) -> - init_q(#mqueue{type = Type, name = iolist_to_binary(Name), - len = 0, max_len = MaxLen, qos0 = StoreQos0}). +-spec(init(options()) -> mqueue()). +init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) -> + init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts). -init_q(MQ = #mqueue{type = simple}) -> +init_q(MQ = #mqueue{type = simple}, _Opts) -> MQ#mqueue{q = queue:new()}; -init_q(MQ = #mqueue{type = priority}) -> - %%Priorities = get_value(priority, Opts, []), - init_p([], MQ#mqueue{q = ?PQUEUE:new()}). +init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) -> + init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). -init_p([], MQ) -> +init_pq([], MQ) -> MQ; -init_p([{Topic, P} | L], MQ) -> +init_pq([{Topic, P} | L], MQ) -> {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ), - init_p(L, MQ1). + init_pq(L, MQ1). -insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> +insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) -> <> = <>, - {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. + {PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}. --spec(name(mqueue()) -> iolist()). -name(#mqueue{name = Name}) -> - Name. - --spec(type(mqueue()) -> atom()). -type(#mqueue{type = Type}) -> - Type. +-spec(type(mqueue()) -> simple | priority). +type(#mqueue{type = Type}) -> Type. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).