diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 4f825329a..08e620a37 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -58,25 +58,27 @@ -define(HIGH_WM, 0.6). +-define(PQUEUE, priority_queue). + -type(priority() :: {iolist(), pos_integer()}). -type(option() :: {type, simple | priority} - | {max_length, pos_integer() | infinity} + | {max_length, non_neg_integer()} %% Max queue length | {priority, list(priority())} | {low_watermark, float()} %% Low watermark | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}). %% Queue Qos0? + | {store_qos0, boolean()}). %% Queue Qos0? --type(stat() :: {max_len, infinity | pos_integer()} +-type(stat() :: {max_len, non_neg_integer()} | {len, non_neg_integer()} | {dropped, non_neg_integer()}). -record(mqueue, {type :: simple | priority, - name, q :: queue:queue() | priority_queue:q(), + name, q :: queue:queue() | ?PQUEUE:q(), %% priority table pseq = 0, priorities = [], %% len of simple queue - len = 0, max_len = infinity, + len = 0, max_len = 0, low_wm = ?LOW_WM, high_wm = ?HIGH_WM, qos0 = false, dropped = 0, alarm_fun}). @@ -89,19 +91,19 @@ -spec(new(iolist(), list(option()), fun()) -> mqueue()). new(Name, Opts, AlarmFun) -> Type = get_value(type, Opts, simple), - MaxLen = get_value(max_length, Opts, infinity), + MaxLen = get_value(max_length, Opts, 0), init_q(#mqueue{type = Type, name = iolist_to_binary(Name), len = 0, max_len = MaxLen, low_wm = low_wm(MaxLen, Opts), high_wm = high_wm(MaxLen, Opts), - qos0 = get_value(queue_qos0, Opts, false), + qos0 = get_value(store_qos0, Opts, false), alarm_fun = AlarmFun}, Opts). init_q(MQ = #mqueue{type = simple}, _Opts) -> MQ#mqueue{q = queue:new()}; init_q(MQ = #mqueue{type = priority}, Opts) -> Priorities = get_value(priority, Opts, []), - init_p(Priorities, MQ#mqueue{q = priority_queue:new()}). + init_p(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). init_p([], MQ) -> MQ; @@ -113,13 +115,13 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> <> = <>, {PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}. -low_wm(infinity, _Opts) -> - infinity; +low_wm(0, _Opts) -> + undefined; low_wm(MaxLen, Opts) -> round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)). -high_wm(infinity, _Opts) -> - infinity; +high_wm(0, _Opts) -> + undefined; high_wm(MaxLen, Opts) -> round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)). @@ -132,12 +134,12 @@ type(#mqueue{type = Type}) -> Type. is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; -is_empty(#mqueue{type = priority, q = Q}) -> priority_queue:is_empty(Q). +is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q). len(#mqueue{type = simple, len = Len}) -> Len; -len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q). +len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q). -max_len(#mqueue{max_len= MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen. %% @doc Dropped of the mqueue -spec(dropped(mqueue()) -> non_neg_integer()). @@ -148,14 +150,14 @@ dropped(#mqueue{dropped = Dropped}) -> Dropped. stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> [{len, case Type of simple -> Len; - priority -> priority_queue:len(Q) + priority -> ?PQUEUE:len(Q) end} | [{max_len, MaxLen}, {dropped, Dropped}]]. %% @doc Enqueue a message. -spec(in(mqtt_message(), mqueue()) -> mqueue()). in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> +in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) when Len >= MaxLen -> @@ -166,43 +168,45 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, - max_len = infinity}) -> + max_len = 0}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)}; + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}; false -> {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, max_len = MaxLen}) -> case lists:keysearch(Topic, 1, Priorities) of {value, {_, Pri}} -> - case priority_queue:plen(Pri, Q) >= MaxLen of + case ?PQUEUE:plen(Pri, Q) >= MaxLen of true -> - {_, Q1} = priority_queue:out(Pri, Q), - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q1)}; + {_, Q1} = ?PQUEUE:out(Pri, Q), + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)}; false -> - MQ#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; false -> {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = priority_queue:in(Msg, Pri, Q)} + MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end. out(MQ = #mqueue{type = simple, len = 0}) -> {empty, MQ}; -out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) -> +out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> {R, Q2} = queue:out(Q), {R, MQ#mqueue{q = Q2, len = Len - 1}}; out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> {R, Q2} = queue:out(Q), {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})}; out(MQ = #mqueue{type = priority, q = Q}) -> - {R, Q2} = priority_queue:out(Q), + {R, Q2} = ?PQUEUE:out(Q), {R, MQ#mqueue{q = Q2}}. +maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) -> + MQ; maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) when Len > HighWM -> Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), @@ -213,6 +217,8 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun maybe_set_alarm(MQ) -> MQ. +maybe_clear_alarm(MQ = #mqueue{low_wm = undefined}) -> + MQ; maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun}) when Len < LowWM -> MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};