diff --git a/src/emqttd_mqueue.erl b/src/emqttd_mqueue.erl index 2c6a48ed7..e17768efa 100644 --- a/src/emqttd_mqueue.erl +++ b/src/emqttd_mqueue.erl @@ -47,6 +47,8 @@ -include("emqttd_protocol.hrl"). +-import(proplists, [get_value/3]). + -export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]). -define(LOW_WM, 0.2). @@ -88,19 +90,19 @@ %% @doc New Queue. -spec new(iolist(), list(mqueue_option()), fun()) -> mqueue(). new(Name, Opts, AlarmFun) -> - Type = emqttd_opts:g(type, Opts, simple), - MaxLen = emqttd_opts:g(max_length, Opts, infinity), + Type = get_value(type, Opts, simple), + MaxLen = get_value(max_length, Opts, infinity), init_q(#mqueue{type = Type, name = iolist_to_binary(Name), len = 0, max_len = MaxLen, low_wm = low_wm(MaxLen, Opts), high_wm = high_wm(MaxLen, Opts), - qos0 = emqttd_opts:g(queue_qos0, Opts, false), + qos0 = get_value(queue_qos0, Opts, false), alarm_fun = AlarmFun}, Opts). init_q(MQ = #mqueue{type = simple}, _Opts) -> MQ#mqueue{q = queue:new()}; init_q(MQ = #mqueue{type = priority}, Opts) -> - Priorities = emqttd_opts:g(priority, Opts, []), + Priorities = get_value(priority, Opts, []), init_p(Priorities, MQ#mqueue{q = priority_queue:new()}). init_p([], MQ) -> @@ -116,12 +118,12 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) -> low_wm(infinity, _Opts) -> infinity; low_wm(MaxLen, Opts) -> - round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)). + round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)). high_wm(infinity, _Opts) -> infinity; high_wm(MaxLen, Opts) -> - round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)). + round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)). -spec name(mqueue()) -> iolist(). name(#mqueue{name = Name}) -> diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f8c8852a3..f9779509f 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -47,6 +47,8 @@ -behaviour(gen_server2). +-import(proplists, [get_value/2, get_value/3]). + %% Session API -export([start_link/3, resume/3, info/1, destroy/2]). @@ -216,16 +218,16 @@ init([CleanSess, ClientId, ClientPid]) -> client_pid = ClientPid, subscriptions = dict:new(), inflight_queue = [], - max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0), + max_inflight = get_value(max_inflight, SessEnv, 0), message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()), awaiting_rel = #{}, awaiting_ack = #{}, awaiting_comp = #{}, - retry_interval = emqttd_opts:g(unack_retry_interval, SessEnv), - await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv), - max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv), - expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600, - collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0), + retry_interval = get_value(unack_retry_interval, SessEnv), + await_rel_timeout = get_value(await_rel_timeout, SessEnv), + max_awaiting_rel = get_value(max_awaiting_rel, SessEnv), + expired_after = get_value(expired_after, SessEnv) * 3600, + collect_interval = get_value(collect_interval, SessEnv, 0), timestamp = os:timestamp()}, emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)), %% start statistics