new queue

This commit is contained in:
Feng Lee 2015-06-11 00:05:20 +08:00
parent db2cc7ba0b
commit c4027dfc16
1 changed files with 93 additions and 49 deletions

View File

@ -20,39 +20,76 @@
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc %%% @doc
%%% emqttd simple queue. %%% simple message queue.
%%%
%%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
%%% should be online in most of the time.
%%%
%%% This module wraps an erlang queue to store offline messages temporarily for MQTT
%%% persistent session.
%%%
%%% If the broker restarted or crashed, all the messages stored will be gone.
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%% TODO: this module should be rewrited... -module(emqttd_mqueue).
-module(emqttd_queue).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl"). -include_lib("emqtt/include/emqtt.hrl").
-export([new/1, new/2, in/3, all/1, clear/1]). -export([new/2, name/1,
is_empty/1, len/1,
in/2, out/1,
peek/1,
to_list/1]).
-define(DEFAULT_MAX_LEN, 1000). %% in_r/2, out_r/1,
-record(mqtt_queue_wrapper, {queue = queue:new(), -define(MAX_LEN, 600).
max_len = ?DEFAULT_MAX_LEN,
store_qos0 = false}).
-type mqtt_queue() :: #mqtt_queue_wrapper{}. -define(HIGH_WM, 0.6).
-define(LOW_WM, 0.2).
-record(mqueue, {name,
len = 0,
max_len = ?MAX_LEN,
queue = queue:new(),
store_qos0 = false,
high_watermark = ?HIGH_WM,
low_watermark = ?LOW_WM,
alert = false}).
-type mqueue() :: #mqueue{}.
-type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued
| {high_queue_watermark, float()} %% High watermark
| {low_queue_watermark, float()} %% Low watermark
| {queue_qos0_messages, boolean()}. %% Queue Qos0 messages?
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc New Queue.
%% New Queue.
%%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec new(non_neg_integer()) -> mqtt_queue(). -spec new(binary() | string(), list(queue_option())) -> mqueue().
new(MaxLen) -> #mqtt_queue_wrapper{max_len = MaxLen}. new(Name, Opts) ->
MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN),
HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)),
LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)),
#mqueue{name = Name, max_len = MaxLen,
store_qos0 = emqttd_opts:g(queue_qos0_messages, Opts, false),
high_watermark = HighWM, low_watermark = LowWM}.
new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = StoreQos0}. name(#mqueue{name = Name}) ->
Name.
len(#mqueue{len = Len}) ->
Len.
is_empty(#mqueue{len = 0}) -> true;
is_empty(_Q) -> false.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
@ -60,39 +97,46 @@ new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = Sto
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue(). -spec in(mqtt_message(), mqueue()) -> mqueue().
in(ClientId, Message = #mqtt_message{qos = Qos}, in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
Wrapper = #mqtt_queue_wrapper{queue = Queue, max_len = MaxLen}) -> MQ;
case queue:len(Queue) < MaxLen of %% queue is full, drop the oldest
true -> in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen ->
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)}; Q2 = case queue:out(Q) of
false -> % full {{value, OldMsg}, Q1} ->
if %%TODO: publish the dropped message to $SYS?
Qos =:= ?QOS_0 -> lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]),
lager:error("Queue ~s drop qos0 message: ~p", [ClientId, Message]), Q1;
Wrapper; {empty, Q1} -> %% maybe max_len is 1
true -> Q1
{{value, Msg}, Queue1} = queue:drop(Queue), end,
lager:error("Queue ~s drop message: ~p", [ClientId, Msg]), MQ#mqueue{queue = queue:in(Msg, Q2)};
Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue1)} in(Msg, MQ = #mqueue{len = Len, queue = Q}) ->
end maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}).
end.
%%------------------------------------------------------------------------------ out(MQ = #mqueue{len = 0, queue = _Q}) ->
%% @doc {empty, MQ};
%% Get all messages in queue. out(MQ = #mqueue{len = Len, queue = Q}) ->
%% {Result, Q1} = queue:out(Q),
%% @end {Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}.
%%------------------------------------------------------------------------------
-spec all(mqtt_queue()) -> list().
all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue).
%%------------------------------------------------------------------------------ peek(#mqueue{queue = Q}) ->
%% @doc queue:peek(Q).
%% Clear queue.
%% to_list(#mqueue{queue = Q}) ->
%% @end queue:to_list(Q).
%%------------------------------------------------------------------------------
-spec clear(mqtt_queue()) -> mqtt_queue(). maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false})
clear(Queue) -> Queue#mqtt_queue_wrapper{queue = queue:new()}. when Len >= HighWM ->
AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]),
emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}),
MQ#mqueue{alert = true};
maybe_set_alarm(MQ) ->
MQ.
maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true})
when Len =< LowWM ->
emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false};
maybe_clear_alarm(MQ) ->
MQ.