From c4027dfc16b3940ab3f78e619ec0daa5338888a6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 11 Jun 2015 00:05:20 +0800 Subject: [PATCH] new queue --- apps/emqttd/src/emqttd_mqueue.erl | 142 +++++++++++++++++++----------- 1 file changed, 93 insertions(+), 49 deletions(-) diff --git a/apps/emqttd/src/emqttd_mqueue.erl b/apps/emqttd/src/emqttd_mqueue.erl index d9897a3f8..a76c5ef97 100644 --- a/apps/emqttd/src/emqttd_mqueue.erl +++ b/apps/emqttd/src/emqttd_mqueue.erl @@ -20,39 +20,76 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd simple queue. +%%% simple message queue. +%%% +%%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client +%%% should be online in most of the time. +%%% +%%% This module wraps an erlang queue to store offline messages temporarily for MQTT +%%% persistent session. +%%% +%%% If the broker restarted or crashed, all the messages stored will be gone. %%% %%% @end %%%----------------------------------------------------------------------------- -%% TODO: this module should be rewrited... - --module(emqttd_queue). +-module(emqttd_mqueue). -author("Feng Lee "). -include_lib("emqtt/include/emqtt.hrl"). --export([new/1, new/2, in/3, all/1, clear/1]). +-export([new/2, name/1, + is_empty/1, len/1, + in/2, out/1, + peek/1, + to_list/1]). --define(DEFAULT_MAX_LEN, 1000). +%% in_r/2, out_r/1, --record(mqtt_queue_wrapper, {queue = queue:new(), - max_len = ?DEFAULT_MAX_LEN, - store_qos0 = false}). +-define(MAX_LEN, 600). --type mqtt_queue() :: #mqtt_queue_wrapper{}. +-define(HIGH_WM, 0.6). + +-define(LOW_WM, 0.2). + +-record(mqueue, {name, + len = 0, + max_len = ?MAX_LEN, + queue = queue:new(), + store_qos0 = false, + high_watermark = ?HIGH_WM, + low_watermark = ?LOW_WM, + alert = false}). + +-type mqueue() :: #mqueue{}. + +-type queue_option() :: {max_queued_messages, pos_integer()} %% Max messages queued + | {high_queue_watermark, float()} %% High watermark + | {low_queue_watermark, float()} %% Low watermark + | {queue_qos0_messages, boolean()}. %% Queue Qos0 messages? %%------------------------------------------------------------------------------ -%% @doc -%% New Queue. -%% +%% @doc New Queue. %% @end %%------------------------------------------------------------------------------ --spec new(non_neg_integer()) -> mqtt_queue(). -new(MaxLen) -> #mqtt_queue_wrapper{max_len = MaxLen}. +-spec new(binary() | string(), list(queue_option())) -> mqueue(). +new(Name, Opts) -> + MaxLen = emqttd_opts:g(max_queued_messages, Opts, ?MAX_LEN), + HighWM = round(MaxLen * emqttd_opts:g(high_queue_watermark, Opts, ?HIGH_WM)), + LowWM = round(MaxLen * emqttd_opts:g(low_queue_watermark, Opts, ?LOW_WM)), + #mqueue{name = Name, max_len = MaxLen, + store_qos0 = emqttd_opts:g(queue_qos0_messages, Opts, false), + high_watermark = HighWM, low_watermark = LowWM}. -new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = StoreQos0}. +name(#mqueue{name = Name}) -> + Name. + +len(#mqueue{len = Len}) -> + Len. + +is_empty(#mqueue{len = 0}) -> true; +is_empty(_Q) -> false. %%------------------------------------------------------------------------------ %% @doc @@ -60,39 +97,46 @@ new(MaxLen, StoreQos0) -> #mqtt_queue_wrapper{max_len = MaxLen, store_qos0 = Sto %% %% @end %%------------------------------------------------------------------------------ --spec in(binary(), mqtt_message(), mqtt_queue()) -> mqtt_queue(). -in(ClientId, Message = #mqtt_message{qos = Qos}, - Wrapper = #mqtt_queue_wrapper{queue = Queue, max_len = MaxLen}) -> - case queue:len(Queue) < MaxLen of - true -> - Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue)}; - false -> % full - if - Qos =:= ?QOS_0 -> - lager:error("Queue ~s drop qos0 message: ~p", [ClientId, Message]), - Wrapper; - true -> - {{value, Msg}, Queue1} = queue:drop(Queue), - lager:error("Queue ~s drop message: ~p", [ClientId, Msg]), - Wrapper#mqtt_queue_wrapper{queue = queue:in(Message, Queue1)} - end - end. +-spec in(mqtt_message(), mqueue()) -> mqueue(). +in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + MQ; +%% queue is full, drop the oldest +in(Msg, MQ = #mqueue{name = Name, len = Len, max_len = MaxLen, queue = Q}) when Len =:= MaxLen -> + Q2 = case queue:out(Q) of + {{value, OldMsg}, Q1} -> + %%TODO: publish the dropped message to $SYS? + lager:error("Queue(~s) drop message: ~p", [Name, OldMsg]), + Q1; + {empty, Q1} -> %% maybe max_len is 1 + Q1 + end, + MQ#mqueue{queue = queue:in(Msg, Q2)}; +in(Msg, MQ = #mqueue{len = Len, queue = Q}) -> + maybe_set_alarm(MQ#mqueue{len = Len+1, queue = queue:in(Msg, Q)}). -%%------------------------------------------------------------------------------ -%% @doc -%% Get all messages in queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec all(mqtt_queue()) -> list(). -all(#mqtt_queue_wrapper { queue = Queue }) -> queue:to_list(Queue). +out(MQ = #mqueue{len = 0, queue = _Q}) -> + {empty, MQ}; +out(MQ = #mqueue{len = Len, queue = Q}) -> + {Result, Q1} = queue:out(Q), + {Result, maybe_clear_alarm(MQ#mqueue{len = Len - 1, queue = Q1})}. -%%------------------------------------------------------------------------------ -%% @doc -%% Clear queue. -%% -%% @end -%%------------------------------------------------------------------------------ --spec clear(mqtt_queue()) -> mqtt_queue(). -clear(Queue) -> Queue#mqtt_queue_wrapper{queue = queue:new()}. +peek(#mqueue{queue = Q}) -> + queue:peek(Q). + +to_list(#mqueue{queue = Q}) -> + queue:to_list(Q). + +maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_watermark = HighWM, alert = false}) + when Len >= HighWM -> + AlarmDescr = io_lib:format("len ~p > high_watermark ~p", [Len, HighWM]), + emqttd_alarm:set_alarm({{queue_high_watermark, Name}, AlarmDescr}), + MQ#mqueue{alert = true}; +maybe_set_alarm(MQ) -> + MQ. + +maybe_clear_alarm(MQ = #mqueue{name = Name, len = Len, low_watermark = LowWM, alert = true}) + when Len =< LowWM -> + emqttd_alarm:clear_alarm({queue_high_watermark, Name}), MQ#mqueue{alert = false}; +maybe_clear_alarm(MQ) -> + MQ.