Improve the mqueue design

This commit is contained in:
Feng Lee 2017-03-23 15:07:06 +08:00
parent 0d617c17e0
commit 496d046d52
1 changed files with 33 additions and 27 deletions

View File

@ -58,25 +58,27 @@
-define(HIGH_WM, 0.6).
-define(PQUEUE, priority_queue).
-type(priority() :: {iolist(), pos_integer()}).
-type(option() :: {type, simple | priority}
| {max_length, pos_integer() | infinity}
| {max_length, non_neg_integer()} %% Max queue length
| {priority, list(priority())}
| {low_watermark, float()} %% Low watermark
| {high_watermark, float()} %% High watermark
| {queue_qos0, boolean()}). %% Queue Qos0?
| {store_qos0, boolean()}). %% Queue Qos0?
-type(stat() :: {max_len, infinity | pos_integer()}
-type(stat() :: {max_len, non_neg_integer()}
| {len, non_neg_integer()}
| {dropped, non_neg_integer()}).
-record(mqueue, {type :: simple | priority,
name, q :: queue:queue() | priority_queue:q(),
name, q :: queue:queue() | ?PQUEUE:q(),
%% priority table
pseq = 0, priorities = [],
%% len of simple queue
len = 0, max_len = infinity,
len = 0, max_len = 0,
low_wm = ?LOW_WM, high_wm = ?HIGH_WM,
qos0 = false, dropped = 0,
alarm_fun}).
@ -89,19 +91,19 @@
-spec(new(iolist(), list(option()), fun()) -> mqueue()).
new(Name, Opts, AlarmFun) ->
Type = get_value(type, Opts, simple),
MaxLen = get_value(max_length, Opts, infinity),
MaxLen = get_value(max_length, Opts, 0),
init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
len = 0, max_len = MaxLen,
low_wm = low_wm(MaxLen, Opts),
high_wm = high_wm(MaxLen, Opts),
qos0 = get_value(queue_qos0, Opts, false),
qos0 = get_value(store_qos0, Opts, false),
alarm_fun = AlarmFun}, Opts).
init_q(MQ = #mqueue{type = simple}, _Opts) ->
MQ#mqueue{q = queue:new()};
init_q(MQ = #mqueue{type = priority}, Opts) ->
Priorities = get_value(priority, Opts, []),
init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
init_p([], MQ) ->
MQ;
@ -113,13 +115,13 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
<<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
{PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
low_wm(infinity, _Opts) ->
infinity;
low_wm(0, _Opts) ->
undefined;
low_wm(MaxLen, Opts) ->
round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
high_wm(infinity, _Opts) ->
infinity;
high_wm(0, _Opts) ->
undefined;
high_wm(MaxLen, Opts) ->
round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
@ -132,12 +134,12 @@ type(#mqueue{type = Type}) ->
Type.
is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q).
is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).
len(#mqueue{type = simple, len = Len}) -> Len;
len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q).
len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q).
max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
%% @doc Dropped of the mqueue
-spec(dropped(mqueue()) -> non_neg_integer()).
@ -148,14 +150,14 @@ dropped(#mqueue{dropped = Dropped}) -> Dropped.
stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
[{len, case Type of
simple -> Len;
priority -> priority_queue:len(Q)
priority -> ?PQUEUE:len(Q)
end} | [{max_len, MaxLen}, {dropped, Dropped}]].
%% @doc Enqueue a message.
-spec(in(mqtt_message(), mqueue()) -> mqueue()).
in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
MQ;
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
when Len >= MaxLen ->
@ -166,43 +168,45 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
priorities = Priorities,
max_len = infinity}) ->
max_len = 0}) ->
case lists:keysearch(Topic, 1, Priorities) of
{value, {_, Pri}} ->
MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)};
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)};
false ->
{Pri, MQ1} = insert_p(Topic, 0, MQ),
MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
end;
in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
priorities = Priorities,
max_len = MaxLen}) ->
case lists:keysearch(Topic, 1, Priorities) of
{value, {_, Pri}} ->
case priority_queue:plen(Pri, Q) >= MaxLen of
case ?PQUEUE:plen(Pri, Q) >= MaxLen of
true ->
{_, Q1} = priority_queue:out(Pri, Q),
MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)};
{_, Q1} = ?PQUEUE:out(Pri, Q),
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)};
false ->
MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
end;
false ->
{Pri, MQ1} = insert_p(Topic, 0, MQ),
MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)}
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
end.
out(MQ = #mqueue{type = simple, len = 0}) ->
{empty, MQ};
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
{R, Q2} = queue:out(Q),
{R, MQ#mqueue{q = Q2, len = Len - 1}};
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
{R, Q2} = queue:out(Q),
{R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
out(MQ = #mqueue{type = priority, q = Q}) ->
{R, Q2} = priority_queue:out(Q),
{R, Q2} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q2}}.
maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) ->
MQ;
maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
when Len > HighWM ->
Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
@ -213,6 +217,8 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun
maybe_set_alarm(MQ) ->
MQ.
maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) ->
MQ;
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
when Len < LowWM ->
MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};