Support priority queue

This commit is contained in:
Feng Lee 2018-08-30 18:29:20 +08:00
parent a67958adb4
commit 1f2bbe3eb8
1 changed files with 34 additions and 38 deletions

View File

@ -11,7 +11,6 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%% TODO: should be a bound queue.
%% @doc A Simple in-memory message queue. %% @doc A Simple in-memory message queue.
%% %%
%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client %% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
@ -39,70 +38,67 @@
%% %%
%% @end %% @end
%% TODO: ...
-module(emqx_mqueue). -module(emqx_mqueue).
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-import(proplists, [get_value/3]). -export([init/1, type/1]).
-export([is_empty/1]).
-export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1]). -export([len/1, max_len/1]).
-export([dropped/1, stats/1]). -export([in/2, out/1]).
-export([stats/1, dropped/1]).
-define(PQUEUE, emqx_pqueue). -define(PQUEUE, emqx_pqueue).
-type(priority() :: {iolist(), pos_integer()}). -type(priority() :: {iolist(), pos_integer()}).
-type(options() :: #{type => simple | priority, -type(options() :: #{type := simple | priority,
max_len => non_neg_integer(), max_len := non_neg_integer(),
priority => list(priority()), priorities => list(priority()),
store_qos0 => boolean()}). store_qos0 => boolean()}).
-type(stat() :: {max_len, non_neg_integer()} -type(stat() :: {len, non_neg_integer()}
| {len, non_neg_integer()} | {max_len, non_neg_integer()}
| {dropped, non_neg_integer()}). | {dropped, non_neg_integer()}).
-record(mqueue, {type :: simple | priority, -record(mqueue, {
name, q :: queue:queue() | ?PQUEUE:q(), type :: simple | priority,
q :: queue:queue() | ?PQUEUE:q(),
%% priority table %% priority table
pseq = 0, priorities = [], priorities = [],
%% len of simple queue pseq = 0,
len = 0, max_len = 0, len = 0,
qos0 = false, dropped = 0}). max_len = 0,
qos0 = false,
dropped = 0
}).
-type(mqueue() :: #mqueue{}). -type(mqueue() :: #mqueue{}).
-export_type([mqueue/0, priority/0, options/0]). -export_type([mqueue/0, priority/0, options/0]).
-spec(new(iolist(), options()) -> mqueue()). -spec(init(options()) -> mqueue()).
new(Name, #{type := Type, max_len := MaxLen, store_qos0 := StoreQos0}) -> init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) ->
init_q(#mqueue{type = Type, name = iolist_to_binary(Name), init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts).
len = 0, max_len = MaxLen, qos0 = StoreQos0}).
init_q(MQ = #mqueue{type = simple}) -> init_q(MQ = #mqueue{type = simple}, _Opts) ->
MQ#mqueue{q = queue:new()}; MQ#mqueue{q = queue:new()};
init_q(MQ = #mqueue{type = priority}) -> init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) ->
%%Priorities = get_value(priority, Opts, []), init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
init_p([], MQ#mqueue{q = ?PQUEUE:new()}).
init_p([], MQ) -> init_pq([], MQ) ->
MQ; MQ;
init_p([{Topic, P} | L], MQ) -> init_pq([{Topic, P} | L], MQ) ->
{_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ), {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
init_p(L, MQ1). init_pq(L, MQ1).
insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) ->
<<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>, <<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
{PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. {PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}.
-spec(name(mqueue()) -> iolist()). -spec(type(mqueue()) -> simple | priority).
name(#mqueue{name = Name}) -> type(#mqueue{type = Type}) -> Type.
Name.
-spec(type(mqueue()) -> atom()).
type(#mqueue{type = Type}) ->
Type.
is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q). is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).