refactor(config): change mqtt.retry_interval to ms
This commit is contained in:
parent
048ba1e067
commit
93e1257045
|
@ -1114,7 +1114,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
|||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||
emqx_keepalive:info(interval, KeepAlive);
|
||||
interval(retry_timer, #channel{session = Session}) ->
|
||||
timer:seconds(emqx_session:info(retry_interval, Session));
|
||||
emqx_session:info(retry_interval, Session);
|
||||
interval(await_timer, #channel{session = Session}) ->
|
||||
timer:seconds(emqx_session:info(await_rel_timeout, Session));
|
||||
interval(expire_timer, #channel{conninfo = ConnInfo}) ->
|
||||
|
|
|
@ -248,22 +248,22 @@ create_session(ClientInfo, ConnInfo) ->
|
|||
Session.
|
||||
|
||||
get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
||||
#{max_subscriptions => get_conf(Zone, max_subscriptions),
|
||||
upgrade_qos => get_conf(Zone, upgrade_qos),
|
||||
#{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
|
||||
upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
|
||||
max_inflight => MaxInflight,
|
||||
retry_interval => get_conf(Zone, retry_interval),
|
||||
await_rel_timeout => get_conf(Zone, await_rel_timeout),
|
||||
retry_interval => get_mqtt_conf(Zone, retry_interval),
|
||||
await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout),
|
||||
mqueue => mqueue_confs(Zone)
|
||||
}.
|
||||
|
||||
mqueue_confs(Zone) ->
|
||||
#{max_len => get_conf(Zone, max_mqueue_len),
|
||||
store_qos0 => get_conf(Zone, mqueue_store_qos0),
|
||||
priorities => get_conf(Zone, mqueue_priorities),
|
||||
default_priority => get_conf(Zone, mqueue_default_priority)
|
||||
#{max_len => get_mqtt_conf(Zone, max_mqueue_len),
|
||||
store_qos0 => get_mqtt_conf(Zone, mqueue_store_qos0),
|
||||
priorities => get_mqtt_conf(Zone, mqueue_priorities),
|
||||
default_priority => get_mqtt_conf(Zone, mqueue_default_priority)
|
||||
}.
|
||||
|
||||
get_conf(Zone, Key) ->
|
||||
get_mqtt_conf(Zone, Key) ->
|
||||
emqx_config:get_zone_conf(Zone, [mqtt, Key]).
|
||||
|
||||
%% @doc Try to takeover a session.
|
||||
|
|
|
@ -164,7 +164,7 @@ fields("node") ->
|
|||
, {"config_files", t(list(string()), "emqx.config_files",
|
||||
[ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"])
|
||||
])}
|
||||
, {"global_gc_interval", t(duration_s(), undefined, "15m")}
|
||||
, {"global_gc_interval", t(duration(), undefined, "15m")}
|
||||
, {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)}
|
||||
, {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")}
|
||||
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
|
||||
|
@ -288,7 +288,7 @@ fields("mqtt") ->
|
|||
, {"max_subscriptions", maybe_infinity(range(1, inf))}
|
||||
, {"upgrade_qos", t(boolean(), undefined, false)}
|
||||
, {"max_inflight", t(range(1, 65535), undefined, 32)}
|
||||
, {"retry_interval", t(duration_s(), undefined, "30s")}
|
||||
, {"retry_interval", t(duration(), undefined, "30s")}
|
||||
, {"max_awaiting_rel", maybe_infinity(integer(), 100)}
|
||||
, {"await_rel_timeout", t(duration_s(), undefined, "300s")}
|
||||
, {"session_expiry_interval", t(duration_s(), undefined, "2h")}
|
||||
|
|
|
@ -178,7 +178,7 @@ init(Opts) ->
|
|||
inflight = emqx_inflight:new(MaxInflight),
|
||||
mqueue = emqx_mqueue:init(QueueOpts),
|
||||
next_pkt_id = 1,
|
||||
retry_interval = timer:seconds(maps:get(retry_interval, Opts, 30)),
|
||||
retry_interval = maps:get(retry_interval, Opts, 30000),
|
||||
awaiting_rel = #{},
|
||||
max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100),
|
||||
await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)),
|
||||
|
@ -211,7 +211,7 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
|
|||
info(inflight_max, #session{inflight = Inflight}) ->
|
||||
emqx_inflight:max_size(Inflight);
|
||||
info(retry_interval, #session{retry_interval = Interval}) ->
|
||||
Interval div 1000;
|
||||
Interval;
|
||||
info(mqueue, #session{mqueue = MQueue}) ->
|
||||
MQueue;
|
||||
info(mqueue_len, #session{mqueue = MQueue}) ->
|
||||
|
|
|
@ -49,7 +49,7 @@ mqtt_conf() ->
|
|||
peer_cert_as_username => disabled,
|
||||
response_information => [],
|
||||
retain_available => true,
|
||||
retry_interval => 30,
|
||||
retry_interval => 30000,
|
||||
server_keepalive => disabled,
|
||||
session_expiry_interval => 7200,
|
||||
shared_subscription => true,
|
||||
|
|
|
@ -59,7 +59,7 @@ t_session_init(_) ->
|
|||
?assertEqual(0, emqx_session:info(inflight_cnt, Session)),
|
||||
?assertEqual(64, emqx_session:info(inflight_max, Session)),
|
||||
?assertEqual(1, emqx_session:info(next_pkt_id, Session)),
|
||||
?assertEqual(30, emqx_session:info(retry_interval, Session)),
|
||||
?assertEqual(30000, emqx_session:info(retry_interval, Session)),
|
||||
?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))),
|
||||
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
|
||||
?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
|
||||
|
@ -73,7 +73,7 @@ t_session_init(_) ->
|
|||
t_session_info(_) ->
|
||||
?assertMatch(#{subscriptions := #{},
|
||||
upgrade_qos := false,
|
||||
retry_interval := 30,
|
||||
retry_interval := 30000,
|
||||
await_rel_timeout := 300
|
||||
}, emqx_session:info(session())).
|
||||
|
||||
|
@ -309,7 +309,7 @@ t_enqueue(_) ->
|
|||
|
||||
t_retry(_) ->
|
||||
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
|
||||
Session = session(#{retry_interval => 100}),
|
||||
Session = session(#{retry_interval => 100000}),
|
||||
{ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session),
|
||||
ok = timer:sleep(200),
|
||||
Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs],
|
||||
|
|
|
@ -1391,7 +1391,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
|
|||
interval(alive_timer, #channel{keepalive = KeepAlive}) ->
|
||||
emqx_keepalive:info(interval, KeepAlive);
|
||||
interval(retry_timer, #channel{session = Session}) ->
|
||||
timer:seconds(emqx_session:info(retry_interval, Session));
|
||||
emqx_session:info(retry_interval, Session);
|
||||
interval(await_timer, #channel{session = Session}) ->
|
||||
timer:seconds(emqx_session:info(await_rel_timeout, Session)).
|
||||
|
||||
|
|
Loading…
Reference in New Issue