diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index b706baf99..ea35f59ec 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1114,7 +1114,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(retry_interval, Session)); + emqx_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> timer:seconds(emqx_session:info(await_rel_timeout, Session)); interval(expire_timer, #channel{conninfo = ConnInfo}) -> diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 1f878718c..e2c8438a2 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -248,22 +248,22 @@ create_session(ClientInfo, ConnInfo) -> Session. get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> - #{max_subscriptions => get_conf(Zone, max_subscriptions), - upgrade_qos => get_conf(Zone, upgrade_qos), + #{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), + upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), max_inflight => MaxInflight, - retry_interval => get_conf(Zone, retry_interval), - await_rel_timeout => get_conf(Zone, await_rel_timeout), + retry_interval => get_mqtt_conf(Zone, retry_interval), + await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout), mqueue => mqueue_confs(Zone) }. mqueue_confs(Zone) -> - #{max_len => get_conf(Zone, max_mqueue_len), - store_qos0 => get_conf(Zone, mqueue_store_qos0), - priorities => get_conf(Zone, mqueue_priorities), - default_priority => get_conf(Zone, mqueue_default_priority) + #{max_len => get_mqtt_conf(Zone, max_mqueue_len), + store_qos0 => get_mqtt_conf(Zone, mqueue_store_qos0), + priorities => get_mqtt_conf(Zone, mqueue_priorities), + default_priority => get_mqtt_conf(Zone, mqueue_default_priority) }. -get_conf(Zone, Key) -> +get_mqtt_conf(Zone, Key) -> emqx_config:get_zone_conf(Zone, [mqtt, Key]). %% @doc Try to takeover a session. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 0dc48e42f..1656aa529 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -164,7 +164,7 @@ fields("node") -> , {"config_files", t(list(string()), "emqx.config_files", [ filename:join([os:getenv("RUNNER_ETC_DIR"), "emqx.conf"]) ])} - , {"global_gc_interval", t(duration_s(), undefined, "15m")} + , {"global_gc_interval", t(duration(), undefined, "15m")} , {"crash_dump_dir", t(file(), "vm_args.-env ERL_CRASH_DUMP", undefined)} , {"dist_net_ticktime", t(duration(), "vm_args.-kernel net_ticktime", "2m")} , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} @@ -288,7 +288,7 @@ fields("mqtt") -> , {"max_subscriptions", maybe_infinity(range(1, inf))} , {"upgrade_qos", t(boolean(), undefined, false)} , {"max_inflight", t(range(1, 65535), undefined, 32)} - , {"retry_interval", t(duration_s(), undefined, "30s")} + , {"retry_interval", t(duration(), undefined, "30s")} , {"max_awaiting_rel", maybe_infinity(integer(), 100)} , {"await_rel_timeout", t(duration_s(), undefined, "300s")} , {"session_expiry_interval", t(duration_s(), undefined, "2h")} diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index d2670e978..41dbc639f 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -178,7 +178,7 @@ init(Opts) -> inflight = emqx_inflight:new(MaxInflight), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - retry_interval = timer:seconds(maps:get(retry_interval, Opts, 30)), + retry_interval = maps:get(retry_interval, Opts, 30000), awaiting_rel = #{}, max_awaiting_rel = maps:get(max_awaiting_rel, Opts, 100), await_rel_timeout = timer:seconds(maps:get(await_rel_timeout, Opts, 300)), @@ -211,7 +211,7 @@ info(inflight_cnt, #session{inflight = Inflight}) -> info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); info(retry_interval, #session{retry_interval = Interval}) -> - Interval div 1000; + Interval; info(mqueue, #session{mqueue = MQueue}) -> MQueue; info(mqueue_len, #session{mqueue = MQueue}) -> diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index ec97daa10..910c80993 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -49,7 +49,7 @@ mqtt_conf() -> peer_cert_as_username => disabled, response_information => [], retain_available => true, - retry_interval => 30, + retry_interval => 30000, server_keepalive => disabled, session_expiry_interval => 7200, shared_subscription => true, diff --git a/apps/emqx/test/emqx_session_SUITE.erl b/apps/emqx/test/emqx_session_SUITE.erl index be0e81de8..dcedd6965 100644 --- a/apps/emqx/test/emqx_session_SUITE.erl +++ b/apps/emqx/test/emqx_session_SUITE.erl @@ -59,7 +59,7 @@ t_session_init(_) -> ?assertEqual(0, emqx_session:info(inflight_cnt, Session)), ?assertEqual(64, emqx_session:info(inflight_max, Session)), ?assertEqual(1, emqx_session:info(next_pkt_id, Session)), - ?assertEqual(30, emqx_session:info(retry_interval, Session)), + ?assertEqual(30000, emqx_session:info(retry_interval, Session)), ?assertEqual(0, emqx_mqueue:len(emqx_session:info(mqueue, Session))), ?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)), ?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)), @@ -73,7 +73,7 @@ t_session_init(_) -> t_session_info(_) -> ?assertMatch(#{subscriptions := #{}, upgrade_qos := false, - retry_interval := 30, + retry_interval := 30000, await_rel_timeout := 300 }, emqx_session:info(session())). @@ -309,7 +309,7 @@ t_enqueue(_) -> t_retry(_) -> Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)], - Session = session(#{retry_interval => 100}), + Session = session(#{retry_interval => 100000}), {ok, Pubs, Session1} = emqx_session:deliver(Delivers, Session), ok = timer:sleep(200), Msgs1 = [{I, emqx_message:set_flag(dup, Msg)} || {I, Msg} <- Pubs], diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 9d08fb8a1..68c481718 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1391,7 +1391,7 @@ clean_timer(Name, Channel = #channel{timers = Timers}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) -> emqx_keepalive:info(interval, KeepAlive); interval(retry_timer, #channel{session = Session}) -> - timer:seconds(emqx_session:info(retry_interval, Session)); + emqx_session:info(retry_interval, Session); interval(await_timer, #channel{session = Session}) -> timer:seconds(emqx_session:info(await_rel_timeout, Session)).