fix(api/clients): drop expired sessions from durable storage

This commit is contained in:
zmstone 2024-03-14 15:55:11 +01:00
parent e340ab7f1b
commit d8bdb3c9aa
1 changed files with 26 additions and 10 deletions

View File

@ -1094,10 +1094,18 @@ do_persistent_session_count(Cursor, N) ->
case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
{[], _} -> {[], _} ->
N; N;
{_, NextCursor} -> {[{_Id, Meta}], NextCursor} ->
do_persistent_session_count(NextCursor, N + 1) case is_expired(Meta) of
true ->
do_persistent_session_count(NextCursor, N);
false ->
do_persistent_session_count(NextCursor, N + 1)
end
end. end.
is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) ->
LastAliveAt + ExpiryInterval < erlang:system_time(millisecond).
do_persistent_session_query(ResultAcc, QueryState) -> do_persistent_session_query(ResultAcc, QueryState) ->
case emqx_persistent_message:is_persistence_enabled() of case emqx_persistent_message:is_persistence_enabled() of
true -> true ->
@ -1115,7 +1123,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
%% through all the nodes. %% through all the nodes.
#{limit := Limit} = QueryState, #{limit := Limit} = QueryState,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
Rows = remove_live_sessions(Rows0), Rows = drop_live_and_expired(Rows0),
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
{enough, NResultAcc} -> {enough, NResultAcc} ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
@ -1125,19 +1133,27 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
do_persistent_session_query1(NResultAcc, QueryState, Iter) do_persistent_session_query1(NResultAcc, QueryState, Iter)
end. end.
remove_live_sessions(Rows) -> drop_live_and_expired(Rows) ->
lists:filtermap( lists:filtermap(
fun({ClientId, _Session}) -> fun({ClientId, Session}) ->
case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of case is_expired(Session) orelse is_live_session(ClientId) of
[] -> true ->
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}}; false;
[_ | _] -> false ->
false {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}}
end end
end, end,
Rows Rows
). ).
%% Return 'true' if there is a live channel found in the global channel registry.
%% NOTE: We cannot afford to query all running nodes to find out if a session is live.
%% i.e. assuming the global session registry is always enabled.
%% Otherwise this function may return `false` for `true` causing the session to appear
%% twice in the query result.
is_live_session(ClientId) ->
[] =/= emqx_cm_registry:lookup_channels(ClientId).
list_client_msgs(MsgType, ClientID, QString) -> list_client_msgs(MsgType, ClientID, QString) ->
case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
false -> false ->