From d8bdb3c9aadf1fcad6bf37fea8b461306667d488 Mon Sep 17 00:00:00 2001 From: zmstone Date: Thu, 14 Mar 2024 15:55:11 +0100 Subject: [PATCH] fix(api/clients): drop expired sessions from durable storage --- .../src/emqx_mgmt_api_clients.erl | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index b852f4a06..64e5f1ac8 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1094,10 +1094,18 @@ do_persistent_session_count(Cursor, N) -> case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of {[], _} -> N; - {_, NextCursor} -> - do_persistent_session_count(NextCursor, N + 1) + {[{_Id, Meta}], NextCursor} -> + case is_expired(Meta) of + true -> + do_persistent_session_count(NextCursor, N); + false -> + do_persistent_session_count(NextCursor, N + 1) + end end. +is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) -> + LastAliveAt + ExpiryInterval < erlang:system_time(millisecond). + do_persistent_session_query(ResultAcc, QueryState) -> case emqx_persistent_message:is_persistence_enabled() of true -> @@ -1115,7 +1123,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> %% through all the nodes. #{limit := Limit} = QueryState, {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), - Rows = remove_live_sessions(Rows0), + Rows = drop_live_and_expired(Rows0), case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of {enough, NResultAcc} -> emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); @@ -1125,19 +1133,27 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> do_persistent_session_query1(NResultAcc, QueryState, Iter) end. -remove_live_sessions(Rows) -> +drop_live_and_expired(Rows) -> lists:filtermap( - fun({ClientId, _Session}) -> - case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of - [] -> - {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}}; - [_ | _] -> - false + fun({ClientId, Session}) -> + case is_expired(Session) orelse is_live_session(ClientId) of + true -> + false; + false -> + {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}} end end, Rows ). +%% Return 'true' if there is a live channel found in the global channel registry. +%% NOTE: We cannot afford to query all running nodes to find out if a session is live. +%% i.e. assuming the global session registry is always enabled. +%% Otherwise this function may return `false` for `true` causing the session to appear +%% twice in the query result. +is_live_session(ClientId) -> + [] =/= emqx_cm_registry:lookup_channels(ClientId). + list_client_msgs(MsgType, ClientID, QString) -> case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of false ->