diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 53f37206f..9abcc6843 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -43,35 +43,33 @@ -module(emqttd_mqueue). +-author("Feng Lee "). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). -import(proplists, [get_value/3]). --export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]). +-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, + dropped/1, stats/1]). -define(LOW_WM, 0.2). -define(HIGH_WM, 0.6). --type priority() :: {iolist(), pos_integer()}. +-type(priority() :: {iolist(), pos_integer()}). --type option() :: {type, simple | priority} +-type(option() :: {type, simple | priority} | {max_length, pos_integer() | infinity} | {priority, list(priority())} - | {low_watermark, float()} %% Low watermark - | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}. %% Queue Qos0? + | {low_watermark, float()} %% Low watermark + | {high_watermark, float()} %% High watermark + | {queue_qos0, boolean()}). %% Queue Qos0? --type mqueue_option() :: {max_length, pos_integer()} %% Max queue length - | {low_watermark, float()} %% Low watermark - | {high_watermark, float()} %% High watermark - | {queue_qos0, boolean()}. %% Queue Qos0 - --type stat() :: {max_len, infinity | pos_integer()} +-type(stat() :: {max_len, infinity | pos_integer()} | {len, non_neg_integer()} - | {dropped, non_neg_integer()}. + | {dropped, non_neg_integer()}). -record(mqueue, {type :: simple | priority, name, q :: queue:queue() | priority_queue:q(), @@ -83,12 +81,12 @@ qos0 = false, dropped = 0, alarm_fun}). --type mqueue() :: #mqueue{}. +-type(mqueue() :: #mqueue{}). -export_type([mqueue/0, priority/0, option/0]). %% @doc New Queue. --spec(new(iolist(), list(mqueue_option()), fun()) -> mqueue()). +-spec(new(iolist(), list(option()), fun()) -> mqueue()). new(Name, Opts, AlarmFun) -> Type = get_value(type, Opts, simple), MaxLen = get_value(max_length, Opts, infinity), @@ -141,6 +139,10 @@ len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q). max_len(#mqueue{max_len= MaxLen}) -> MaxLen. +%% @doc Dropped of the mqueue +-spec(dropped(mqueue()) -> non_neg_integer()). +dropped(#mqueue{dropped = Dropped}) -> Dropped. + %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> @@ -208,14 +210,12 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun title = io_lib:format("Queue ~s high-water mark", [Name]), summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)}; - maybe_set_alarm(MQ) -> MQ. maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun}) when Len < LowWM -> MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))}; - maybe_clear_alarm(MQ) -> MQ.