diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 803104c5e..398db256a 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -59,7 +59,8 @@ -export([new/3, name/1, is_empty/1, is_full/1, len/1, max_len/1, - in/2, out/1]). + in/2, out/1, + stats/1]). -define(LOW_WM, 0.2). @@ -72,6 +73,7 @@ high_wm = ?HIGH_WM, max_len = ?MAX_LEN, qos0 = false, + dropped = 0, alarm_fun}). -type mqueue() :: #mqueue{}. @@ -111,6 +113,9 @@ len(#mqueue{len = Len}) -> Len. max_len(#mqueue{max_len= MaxLen}) -> MaxLen. +stats(#mqueue{max_len = MaxLen, len = Len, dropped = Dropped}) -> + [{max_len, MaxLen}, {len, Len}, {dropped, Dropped}]. + %%------------------------------------------------------------------------------ %% @doc Queue one message. %% @end @@ -122,11 +127,11 @@ in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; %% simply drop the oldest one if queue is full, improve later -in(Msg, MQ = #mqueue{name = Name, q = Q, len = Len, max_len = MaxLen}) +in(Msg, MQ = #mqueue{q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) when Len =:= MaxLen -> - {{value, OldMsg}, Q2} = queue:out(Q), - lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]), - MQ#mqueue{q = queue:in(Msg, Q2)}; + {{value, _OldMsg}, Q2} = queue:out(Q), + %lager:error("MQueue(~s) drop ~s", [Name, emqttd_message:format(OldMsg)]), + MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1}; in(Msg, MQ = #mqueue{q = Q, len = Len}) -> maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}). diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 38ab89a27..a9b4b10ea 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -647,21 +647,23 @@ start_collector(Session = #session{collect_interval = Interval}) -> TRef = erlang:send_after(Interval * 1000, self(), collect_info), Session#session{collect_timer = TRef}. -info(#session{clean_sess = CleanSess, - subscriptions = Subscriptions, - inflight_queue = InflightQueue, - max_inflight = MaxInflight, - message_queue = MessageQueue, - awaiting_rel = AwaitingRel, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - timestamp = CreatedAt}) -> - [{pid, self()}, +info(#session{clean_sess = CleanSess, + subscriptions = Subscriptions, + inflight_queue = InflightQueue, + max_inflight = MaxInflight, + message_queue = MessageQueue, + awaiting_rel = AwaitingRel, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + timestamp = CreatedAt}) -> + Stats = emqttd_mqueue:stats(MessageQueue), + [{pid, self()}, {clean_sess, CleanSess}, {subscriptions, Subscriptions}, {max_inflight, MaxInflight}, {inflight_queue, length(InflightQueue)}, - {message_queue, emqttd_mqueue:len(MessageQueue)}, + {message_queue, proplists:get_value(len, Stats)}, + {message_dropped, proplists:get_value(dropped, Stats)}, {awaiting_rel, maps:size(AwaitingRel)}, {awaiting_ack, maps:size(AwaitingAck)}, {awaiting_comp, maps:size(AwaitingComp)},