Add dropped/1 function
This commit is contained in:
parent
a5ac32b49b
commit
67566ca372
|
@ -1,5 +1,5 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
|
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
|
||||||
%%
|
%%
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
%% you may not use this file except in compliance with the License.
|
%% you may not use this file except in compliance with the License.
|
||||||
|
@ -43,35 +43,33 @@
|
||||||
|
|
||||||
-module(emqttd_mqueue).
|
-module(emqttd_mqueue).
|
||||||
|
|
||||||
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
-include("emqttd.hrl").
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-import(proplists, [get_value/3]).
|
-import(proplists, [get_value/3]).
|
||||||
|
|
||||||
-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]).
|
-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1,
|
||||||
|
dropped/1, stats/1]).
|
||||||
|
|
||||||
-define(LOW_WM, 0.2).
|
-define(LOW_WM, 0.2).
|
||||||
|
|
||||||
-define(HIGH_WM, 0.6).
|
-define(HIGH_WM, 0.6).
|
||||||
|
|
||||||
-type priority() :: {iolist(), pos_integer()}.
|
-type(priority() :: {iolist(), pos_integer()}).
|
||||||
|
|
||||||
-type option() :: {type, simple | priority}
|
-type(option() :: {type, simple | priority}
|
||||||
| {max_length, pos_integer() | infinity}
|
| {max_length, pos_integer() | infinity}
|
||||||
| {priority, list(priority())}
|
| {priority, list(priority())}
|
||||||
| {low_watermark, float()} %% Low watermark
|
| {low_watermark, float()} %% Low watermark
|
||||||
| {high_watermark, float()} %% High watermark
|
| {high_watermark, float()} %% High watermark
|
||||||
| {queue_qos0, boolean()}. %% Queue Qos0?
|
| {queue_qos0, boolean()}). %% Queue Qos0?
|
||||||
|
|
||||||
-type mqueue_option() :: {max_length, pos_integer()} %% Max queue length
|
-type(stat() :: {max_len, infinity | pos_integer()}
|
||||||
| {low_watermark, float()} %% Low watermark
|
|
||||||
| {high_watermark, float()} %% High watermark
|
|
||||||
| {queue_qos0, boolean()}. %% Queue Qos0
|
|
||||||
|
|
||||||
-type stat() :: {max_len, infinity | pos_integer()}
|
|
||||||
| {len, non_neg_integer()}
|
| {len, non_neg_integer()}
|
||||||
| {dropped, non_neg_integer()}.
|
| {dropped, non_neg_integer()}).
|
||||||
|
|
||||||
-record(mqueue, {type :: simple | priority,
|
-record(mqueue, {type :: simple | priority,
|
||||||
name, q :: queue:queue() | priority_queue:q(),
|
name, q :: queue:queue() | priority_queue:q(),
|
||||||
|
@ -83,12 +81,12 @@
|
||||||
qos0 = false, dropped = 0,
|
qos0 = false, dropped = 0,
|
||||||
alarm_fun}).
|
alarm_fun}).
|
||||||
|
|
||||||
-type mqueue() :: #mqueue{}.
|
-type(mqueue() :: #mqueue{}).
|
||||||
|
|
||||||
-export_type([mqueue/0, priority/0, option/0]).
|
-export_type([mqueue/0, priority/0, option/0]).
|
||||||
|
|
||||||
%% @doc New Queue.
|
%% @doc New Queue.
|
||||||
-spec(new(iolist(), list(mqueue_option()), fun()) -> mqueue()).
|
-spec(new(iolist(), list(option()), fun()) -> mqueue()).
|
||||||
new(Name, Opts, AlarmFun) ->
|
new(Name, Opts, AlarmFun) ->
|
||||||
Type = get_value(type, Opts, simple),
|
Type = get_value(type, Opts, simple),
|
||||||
MaxLen = get_value(max_length, Opts, infinity),
|
MaxLen = get_value(max_length, Opts, infinity),
|
||||||
|
@ -141,6 +139,10 @@ len(#mqueue{type = priority, q = Q}) -> priority_queue:len(Q).
|
||||||
|
|
||||||
max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
|
max_len(#mqueue{max_len= MaxLen}) -> MaxLen.
|
||||||
|
|
||||||
|
%% @doc Dropped of the mqueue
|
||||||
|
-spec(dropped(mqueue()) -> non_neg_integer()).
|
||||||
|
dropped(#mqueue{dropped = Dropped}) -> Dropped.
|
||||||
|
|
||||||
%% @doc Stats of the mqueue
|
%% @doc Stats of the mqueue
|
||||||
-spec(stats(mqueue()) -> [stat()]).
|
-spec(stats(mqueue()) -> [stat()]).
|
||||||
stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
|
stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
|
||||||
|
@ -208,14 +210,12 @@ maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun
|
||||||
title = io_lib:format("Queue ~s high-water mark", [Name]),
|
title = io_lib:format("Queue ~s high-water mark", [Name]),
|
||||||
summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
|
summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
|
||||||
MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
|
MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
|
||||||
|
|
||||||
maybe_set_alarm(MQ) ->
|
maybe_set_alarm(MQ) ->
|
||||||
MQ.
|
MQ.
|
||||||
|
|
||||||
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
|
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_wm = LowWM, alarm_fun = AlarmFun})
|
||||||
when Len < LowWM ->
|
when Len < LowWM ->
|
||||||
MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
|
MQ#mqueue{alarm_fun = AlarmFun(clear, list_to_binary(["queue_high_watermark.", Name]))};
|
||||||
|
|
||||||
maybe_clear_alarm(MQ) ->
|
maybe_clear_alarm(MQ) ->
|
||||||
MQ.
|
MQ.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue