diff --git a/apps/emqx/include/emqx_channel.hrl b/apps/emqx/include/emqx_channel.hrl index d4362633a..be2448a20 100644 --- a/apps/emqx/include/emqx_channel.hrl +++ b/apps/emqx/include/emqx_channel.hrl @@ -40,3 +40,5 @@ session, will_msg ]). + +-define(EXPIRE_INTERVAL_INFINITE, 4294967295000). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 45a97711d..88e8669cd 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2079,7 +2079,7 @@ maybe_resume_session(#channel{ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) -> case maps:get(expiry_interval, ConnInfo) of - ?UINT_MAX -> + ?EXPIRE_INTERVAL_INFINITE -> {ok, Channel}; I when I > 0 -> {ok, ensure_timer(expire_timer, I, Channel)}; diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 9a3b4e39b..ebcf9c434 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -773,6 +773,7 @@ mark_channel_connected(ChanPid) -> mark_channel_disconnected(ChanPid) -> ?tp(emqx_cm_connected_client_count_dec, #{chan_pid => ChanPid}), ets:delete(?CHAN_LIVE_TAB, ChanPid), + ?tp(emqx_cm_connected_client_count_dec_done, #{chan_pid => ChanPid}), ok. get_connected_client_count() -> diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl index 68f783283..3e9e00c81 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -60,14 +60,12 @@ -export_type([sess_msg_key/0]). -include("emqx.hrl"). +-include("emqx_channel.hrl"). -include("emqx_persistent_session.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -compile({inline, [is_store_enabled/0]}). -%% 16#FFFFFFFF * 1000 --define(MAX_EXPIRY_INTERVAL, 4294967295000). - %% NOTE: Order is significant because of traversal order of the table. -define(MARKER, 3). -define(DELIVERED, 2). @@ -424,7 +422,7 @@ pending(SessionID, MarkerIds) -> %% @private [MQTT-3.1.2-23] persistent_session_status(#session_store{expiry_interval = 0}) -> not_persistent; -persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) -> +persistent_session_status(#session_store{expiry_interval = ?EXPIRE_INTERVAL_INFINITE}) -> persistent; persistent_session_status(#session_store{expiry_interval = E, ts = TS}) -> case E + TS > erlang:system_time(millisecond) of diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index 239d9052e..7e6cf5b95 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.0.0"}, + {vsn, "5.0.1"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl index a6097f03d..1369ee969 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent_channel.erl @@ -218,10 +218,10 @@ cancel_expiry_timer(_) -> set_expiry_timer(#{conninfo := ConnInfo} = Channel) -> case maps:get(expiry_interval, ConnInfo) of - ?UINT_MAX -> + ?EXPIRE_INTERVAL_INFINITE -> {ok, Channel}; I when I > 0 -> - Timer = erlang:send_after(timer:seconds(I), self(), expire_session), + Timer = erlang:send_after(I, self(), expire_session), {ok, Channel#{expiry_timer => Timer}}; _ -> {error, should_be_expired} diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl index 22b694d77..0f1b948c7 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_SUITE.erl @@ -177,7 +177,7 @@ t_explicit_session_takeover(Config) -> ?assert(false, "Connection not evicted") end end, - #{?snk_kind := emqx_cm_connected_client_count_dec, chan_pid := ChanPid}, + #{?snk_kind := emqx_cm_connected_client_count_dec_done, chan_pid := ChanPid}, 2000 ), @@ -383,7 +383,7 @@ t_ws_conn(_Config) -> ?assertWaitEvent( ok = emqx_eviction_agent:evict_connections(1), - #{?snk_kind := emqx_cm_connected_client_count_dec}, + #{?snk_kind := emqx_cm_connected_client_count_dec_done}, 1000 ), @@ -418,7 +418,7 @@ t_quic_conn(_Config) -> ?assertWaitEvent( ok = emqx_eviction_agent:evict_connections(1), - #{?snk_kind := emqx_cm_connected_client_count_dec}, + #{?snk_kind := emqx_cm_connected_client_count_dec_done}, 1000 ), diff --git a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl index 3b7ef6672..4ace80893 100644 --- a/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl +++ b/apps/emqx_eviction_agent/test/emqx_eviction_agent_channel_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_channel.hrl"). -define(CLIENT_ID, <<"client_with_session">>). @@ -101,7 +102,7 @@ t_start_infinite_expire(_Config) -> conninfo => #{ clientid => ?CLIENT_ID, receive_maximum => 32, - expiry_interval => ?UINT_MAX + expiry_interval => ?EXPIRE_INTERVAL_INFINITE } }, ?assertMatch(