From 0704cbc986c37ff507d39c9e9ff953ef76a17767 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 23 Jul 2021 14:11:32 +0800 Subject: [PATCH] refactor(config): change mqtt.session_expiry_interval to ms --- apps/emqx/src/emqx_channel.erl | 6 +++--- apps/emqx/src/emqx_schema.erl | 2 +- apps/emqx/test/emqx_channel_SUITE.erl | 2 +- apps/emqx_management/src/emqx_mgmt_cli.erl | 21 +++++++++++-------- apps/emqx_modules/src/emqx_mod_presence.erl | 2 +- .../emqx_rule_engine/src/emqx_rule_events.erl | 2 +- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d4c020185..a031b13c5 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1118,7 +1118,7 @@ interval(retry_timer, #channel{session = Session}) -> interval(await_timer, #channel{session = Session}) -> emqx_session:info(await_rel_timeout, Session); interval(expire_timer, #channel{conninfo = ConnInfo}) -> - timer:seconds(maps:get(expiry_interval, ConnInfo)); + maps:get(expiry_interval, ConnInfo); interval(will_timer, #channel{will_msg = WillMsg}) -> timer:seconds(will_delay_interval(WillMsg)). @@ -1176,7 +1176,7 @@ enrich_conninfo(ConnPkt = #mqtt_packet_connect{ %% If the Session Expiry Interval is absent the value 0 is used. expiry_interval(_, #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, properties = ConnProps}) -> - emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0); + timer:seconds(emqx_mqtt_props:get('Session-Expiry-Interval', ConnProps, 0)); expiry_interval(Zone, #mqtt_packet_connect{clean_start = false}) -> get_mqtt_conf(Zone, session_expiry_interval); expiry_interval(_, #mqtt_packet_connect{clean_start = true}) -> @@ -1615,7 +1615,7 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> case maps:get(expiry_interval, ConnInfo) of ?UINT_MAX -> {ok, Channel}; I when I > 0 -> - {ok, ensure_timer(expire_timer, timer:seconds(I), Channel)}; + {ok, ensure_timer(expire_timer, I, Channel)}; _ -> shutdown(Reason, Channel) end. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 12f3f0b06..818f659e8 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -291,7 +291,7 @@ fields("mqtt") -> , {"retry_interval", t(duration(), undefined, "30s")} , {"max_awaiting_rel", maybe_infinity(integer(), 100)} , {"await_rel_timeout", t(duration(), undefined, "300s")} - , {"session_expiry_interval", t(duration_s(), undefined, "2h")} + , {"session_expiry_interval", t(duration(), undefined, "2h")} , {"max_mqueue_len", maybe_infinity(range(0, inf), 1000)} , {"mqueue_priorities", maybe_disabled(map())} , {"mqueue_default_priority", t(union(highest, lowest), undefined, lowest)} diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 1c4bf8f76..7b2161882 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -51,7 +51,7 @@ mqtt_conf() -> retain_available => true, retry_interval => 30000, server_keepalive => disabled, - session_expiry_interval => 7200, + session_expiry_interval => 7200000, shared_subscription => true, strict_mode => false, upgrade_qos => false, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d759eb18..9e07dbe2f 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -590,18 +590,21 @@ print({client, {ClientId, ChanPid}}) -> InfoKeys = [clientid, username, peername, clean_start, keepalive, expiry_interval, subscriptions_cnt, inflight_cnt, awaiting_rel_cnt, send_msg, mqueue_len, mqueue_dropped, - connected, created_at, connected_at] ++ case maps:is_key(disconnected_at, Info) of - true -> [disconnected_at]; - false -> [] - end, + connected, created_at, connected_at] ++ + case maps:is_key(disconnected_at, Info) of + true -> [disconnected_at]; + false -> [] + end, + Info1 = Info#{expiry_interval => maps:get(expiry_interval, Info) div 1000}, emqx_ctl:print("Client(~s, username=~s, peername=~s, " "clean_start=~s, keepalive=~w, session_expiry_interval=~w, " "subscriptions=~w, inflight=~w, awaiting_rel=~w, delivered_msgs=~w, enqueued_msgs=~w, dropped_msgs=~w, " - "connected=~s, created_at=~w, connected_at=~w" ++ case maps:is_key(disconnected_at, Info) of - true -> ", disconnected_at=~w)~n"; - false -> ")~n" - end, - [format(K, maps:get(K, Info)) || K <- InfoKeys]); + "connected=~s, created_at=~w, connected_at=~w" ++ + case maps:is_key(disconnected_at, Info1) of + true -> ", disconnected_at=~w)~n"; + false -> ")~n" + end, + [format(K, maps:get(K, Info1)) || K <- InfoKeys]); print({emqx_route, #route{topic = Topic, dest = {_, Node}}}) -> emqx_ctl:print("~s -> ~s~n", [Topic, Node]); diff --git a/apps/emqx_modules/src/emqx_mod_presence.erl b/apps/emqx_modules/src/emqx_mod_presence.erl index 738d1c892..729316663 100644 --- a/apps/emqx_modules/src/emqx_mod_presence.erl +++ b/apps/emqx_modules/src/emqx_mod_presence.erl @@ -102,7 +102,7 @@ connected_presence(#{peerhost := PeerHost, keepalive => Keepalive, connack => 0, %% Deprecated? clean_start => CleanStart, - expiry_interval => ExpiryInterval, + expiry_interval => ExpiryInterval div 1000, connected_at => ConnectedAt, ts => erlang:system_time(millisecond) }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index 824cfdcb1..5865f224c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -182,7 +182,7 @@ eventmsg_connected(_ClientInfo = #{ keepalive => Keepalive, clean_start => CleanStart, receive_maximum => RcvMax, - expiry_interval => ExpiryInterval, + expiry_interval => ExpiryInterval div 1000, is_bridge => IsBridge, conn_props => printable_maps(ConnProps), connected_at => ConnectedAt