Define 'MQueue' macro

This commit is contained in:
Feng Lee 2017-03-23 15:15:45 +08:00
parent 690f27a8b4
commit 06100ae6d5
1 changed files with 9 additions and 10 deletions

View File

@ -77,6 +77,8 @@
-export([prioritise_call/4, prioritise_cast/3, prioritise_info/3, -export([prioritise_call/4, prioritise_cast/3, prioritise_info/3,
handle_pre_hibernate/1]). handle_pre_hibernate/1]).
-define(MQueue, emqttd_mqueue).
-record(state, -record(state,
{ {
%% Clean Session Flag %% Clean Session Flag
@ -124,7 +126,7 @@
%% QoS 1 and QoS 2 messages pending transmission to the Client. %% QoS 1 and QoS 2 messages pending transmission to the Client.
%% %%
%% Optionally, QoS 0 messages pending transmission to the Client. %% Optionally, QoS 0 messages pending transmission to the Client.
mqueue :: emqttd_mqueue:mqueue(), mqueue :: ?MQueue:mqueue(),
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
awaiting_rel :: map(), awaiting_rel :: map(),
@ -257,12 +259,9 @@ stats(#state{max_subscriptions = MaxSubscriptions,
{subscriptions, maps:size(Subscriptions)}, {subscriptions, maps:size(Subscriptions)},
{max_inflight, MaxInflight}, {max_inflight, MaxInflight},
{inflight_len, Inflight:size()}, {inflight_len, Inflight:size()},
{max_mqueue, case emqttd_mqueue:max_len(MQueue) of {max_mqueue, ?MQueue:max_len(MQueue)},
infinity -> 0; {mqueue_len, ?MQueue:len(MQueue)},
Len -> Len {mqueue_dropped, ?MQueue:dropped(MQueue)},
end},
{mqueue_len, emqttd_mqueue:len(MQueue)},
{mqueue_dropped, emqttd_mqueue:dropped(MQueue)},
{max_awaiting_rel, MaxAwaitingRel}, {max_awaiting_rel, MaxAwaitingRel},
{awaiting_rel_len, maps:size(AwaitingRel)}, {awaiting_rel_len, maps:size(AwaitingRel)},
{deliver_msg, get(deliver_msg)}, {deliver_msg, get(deliver_msg)},
@ -286,7 +285,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
MaxInflight = get_value(max_inflight, Env, 0), MaxInflight = get_value(max_inflight, Env, 0),
EnableStats = get_value(enable_stats, Env, false), EnableStats = get_value(enable_stats, Env, false),
ForceGcCount = emqttd_gc:conn_max_gc_count(), ForceGcCount = emqttd_gc:conn_max_gc_count(),
MQueue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), MQueue = ?MQueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
State = #state{clean_sess = CleanSess, State = #state{clean_sess = CleanSess,
binding = binding(ClientPid), binding = binding(ClientPid),
client_id = ClientId, client_id = ClientId,
@ -698,7 +697,7 @@ dispatch(Msg = #mqtt_message{qos = QoS},
enqueue_msg(Msg, State = #state{mqueue = Q}) -> enqueue_msg(Msg, State = #state{mqueue = Q}) ->
inc_stats(enqueue_msg), inc_stats(enqueue_msg),
State#state{mqueue = emqttd_mqueue:in(Msg, Q)}. State#state{mqueue = ?MQueue:in(Msg, Q)}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Deliver %% Deliver
@ -755,7 +754,7 @@ dequeue(State = #state{inflight = Inflight}) ->
end. end.
dequeue2(State = #state{mqueue = Q}) -> dequeue2(State = #state{mqueue = Q}) ->
case emqttd_mqueue:out(Q) of case ?MQueue:out(Q) of
{empty, _Q} -> {empty, _Q} ->
State; State;
{{value, Msg}, Q1} -> {{value, Msg}, Q1} ->