diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 85b027781..41ee456f4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -77,6 +77,8 @@ -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3, handle_pre_hibernate/1]). +-define(MQueue, emqttd_mqueue). + -record(state, { %% Clean Session Flag @@ -124,7 +126,7 @@ %% QoS 1 and QoS 2 messages pending transmission to the Client. %% %% Optionally, QoS 0 messages pending transmission to the Client. - mqueue :: emqttd_mqueue:mqueue(), + mqueue :: ?MQueue:mqueue(), %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. awaiting_rel :: map(), @@ -257,12 +259,9 @@ stats(#state{max_subscriptions = MaxSubscriptions, {subscriptions, maps:size(Subscriptions)}, {max_inflight, MaxInflight}, {inflight_len, Inflight:size()}, - {max_mqueue, case emqttd_mqueue:max_len(MQueue) of - infinity -> 0; - Len -> Len - end}, - {mqueue_len, emqttd_mqueue:len(MQueue)}, - {mqueue_dropped, emqttd_mqueue:dropped(MQueue)}, + {max_mqueue, ?MQueue:max_len(MQueue)}, + {mqueue_len, ?MQueue:len(MQueue)}, + {mqueue_dropped, ?MQueue:dropped(MQueue)}, {max_awaiting_rel, MaxAwaitingRel}, {awaiting_rel_len, maps:size(AwaitingRel)}, {deliver_msg, get(deliver_msg)}, @@ -286,7 +285,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) -> MaxInflight = get_value(max_inflight, Env, 0), EnableStats = get_value(enable_stats, Env, false), ForceGcCount = emqttd_gc:conn_max_gc_count(), - MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), + MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), State = #state{clean_sess = CleanSess, binding = binding(ClientPid), client_id = ClientId, @@ -698,7 +697,7 @@ dispatch(Msg = #mqtt_message{qos = QoS}, enqueue_msg(Msg, State = #state{mqueue = Q}) -> inc_stats(enqueue_msg), - State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. + State#state{mqueue = ?MQueue:in(Msg, Q)}. %%-------------------------------------------------------------------- %% Deliver @@ -755,7 +754,7 @@ dequeue(State = #state{inflight = Inflight}) -> end. dequeue2(State = #state{mqueue = Q}) -> - case emqttd_mqueue:out(Q) of + case ?MQueue:out(Q) of {empty, _Q} -> State; {{value, Msg}, Q1} ->