replace emqttd_opts:g/3 with proplists:get_value/3
This commit is contained in:
parent
1b61ef0856
commit
49a979a25e
|
@ -47,6 +47,8 @@
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
|
-import(proplists, [get_value/3]).
|
||||||
|
|
||||||
-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]).
|
-export([new/3, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1, stats/1]).
|
||||||
|
|
||||||
-define(LOW_WM, 0.2).
|
-define(LOW_WM, 0.2).
|
||||||
|
@ -88,19 +90,19 @@
|
||||||
%% @doc New Queue.
|
%% @doc New Queue.
|
||||||
-spec new(iolist(), list(mqueue_option()), fun()) -> mqueue().
|
-spec new(iolist(), list(mqueue_option()), fun()) -> mqueue().
|
||||||
new(Name, Opts, AlarmFun) ->
|
new(Name, Opts, AlarmFun) ->
|
||||||
Type = emqttd_opts:g(type, Opts, simple),
|
Type = get_value(type, Opts, simple),
|
||||||
MaxLen = emqttd_opts:g(max_length, Opts, infinity),
|
MaxLen = get_value(max_length, Opts, infinity),
|
||||||
init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
|
init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
|
||||||
len = 0, max_len = MaxLen,
|
len = 0, max_len = MaxLen,
|
||||||
low_wm = low_wm(MaxLen, Opts),
|
low_wm = low_wm(MaxLen, Opts),
|
||||||
high_wm = high_wm(MaxLen, Opts),
|
high_wm = high_wm(MaxLen, Opts),
|
||||||
qos0 = emqttd_opts:g(queue_qos0, Opts, false),
|
qos0 = get_value(queue_qos0, Opts, false),
|
||||||
alarm_fun = AlarmFun}, Opts).
|
alarm_fun = AlarmFun}, Opts).
|
||||||
|
|
||||||
init_q(MQ = #mqueue{type = simple}, _Opts) ->
|
init_q(MQ = #mqueue{type = simple}, _Opts) ->
|
||||||
MQ#mqueue{q = queue:new()};
|
MQ#mqueue{q = queue:new()};
|
||||||
init_q(MQ = #mqueue{type = priority}, Opts) ->
|
init_q(MQ = #mqueue{type = priority}, Opts) ->
|
||||||
Priorities = emqttd_opts:g(priority, Opts, []),
|
Priorities = get_value(priority, Opts, []),
|
||||||
init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
|
init_p(Priorities, MQ#mqueue{q = priority_queue:new()}).
|
||||||
|
|
||||||
init_p([], MQ) ->
|
init_p([], MQ) ->
|
||||||
|
@ -116,12 +118,12 @@ insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
|
||||||
low_wm(infinity, _Opts) ->
|
low_wm(infinity, _Opts) ->
|
||||||
infinity;
|
infinity;
|
||||||
low_wm(MaxLen, Opts) ->
|
low_wm(MaxLen, Opts) ->
|
||||||
round(MaxLen * emqttd_opts:g(low_watermark, Opts, ?LOW_WM)).
|
round(MaxLen * get_value(low_watermark, Opts, ?LOW_WM)).
|
||||||
|
|
||||||
high_wm(infinity, _Opts) ->
|
high_wm(infinity, _Opts) ->
|
||||||
infinity;
|
infinity;
|
||||||
high_wm(MaxLen, Opts) ->
|
high_wm(MaxLen, Opts) ->
|
||||||
round(MaxLen * emqttd_opts:g(high_watermark, Opts, ?HIGH_WM)).
|
round(MaxLen * get_value(high_watermark, Opts, ?HIGH_WM)).
|
||||||
|
|
||||||
-spec name(mqueue()) -> iolist().
|
-spec name(mqueue()) -> iolist().
|
||||||
name(#mqueue{name = Name}) ->
|
name(#mqueue{name = Name}) ->
|
||||||
|
|
|
@ -47,6 +47,8 @@
|
||||||
|
|
||||||
-behaviour(gen_server2).
|
-behaviour(gen_server2).
|
||||||
|
|
||||||
|
-import(proplists, [get_value/2, get_value/3]).
|
||||||
|
|
||||||
%% Session API
|
%% Session API
|
||||||
-export([start_link/3, resume/3, info/1, destroy/2]).
|
-export([start_link/3, resume/3, info/1, destroy/2]).
|
||||||
|
|
||||||
|
@ -216,16 +218,16 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid,
|
||||||
subscriptions = dict:new(),
|
subscriptions = dict:new(),
|
||||||
inflight_queue = [],
|
inflight_queue = [],
|
||||||
max_inflight = emqttd_opts:g(max_inflight, SessEnv, 0),
|
max_inflight = get_value(max_inflight, SessEnv, 0),
|
||||||
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
|
message_queue = emqttd_mqueue:new(ClientId, QEnv, emqttd_alarm:alarm_fun()),
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
awaiting_ack = #{},
|
awaiting_ack = #{},
|
||||||
awaiting_comp = #{},
|
awaiting_comp = #{},
|
||||||
retry_interval = emqttd_opts:g(unack_retry_interval, SessEnv),
|
retry_interval = get_value(unack_retry_interval, SessEnv),
|
||||||
await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
|
await_rel_timeout = get_value(await_rel_timeout, SessEnv),
|
||||||
max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv),
|
max_awaiting_rel = get_value(max_awaiting_rel, SessEnv),
|
||||||
expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600,
|
expired_after = get_value(expired_after, SessEnv) * 3600,
|
||||||
collect_interval = emqttd_opts:g(collect_interval, SessEnv, 0),
|
collect_interval = get_value(collect_interval, SessEnv, 0),
|
||||||
timestamp = os:timestamp()},
|
timestamp = os:timestamp()},
|
||||||
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
|
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
|
||||||
%% start statistics
|
%% start statistics
|
||||||
|
|
Loading…
Reference in New Issue