Merge pull request #12710 from zmstone/0314-fix-api-clients-drop-ds-sessions-if-expired
fix(api/clients): drop expired sessions from durable storage
This commit is contained in:
commit
bfca73c0e5
|
@ -1094,10 +1094,18 @@ do_persistent_session_count(Cursor, N) ->
|
|||
case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of
|
||||
{[], _} ->
|
||||
N;
|
||||
{_, NextCursor} ->
|
||||
do_persistent_session_count(NextCursor, N + 1)
|
||||
{[{_Id, Meta}], NextCursor} ->
|
||||
case is_expired(Meta) of
|
||||
true ->
|
||||
do_persistent_session_count(NextCursor, N);
|
||||
false ->
|
||||
do_persistent_session_count(NextCursor, N + 1)
|
||||
end
|
||||
end.
|
||||
|
||||
is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) ->
|
||||
LastAliveAt + ExpiryInterval < erlang:system_time(millisecond).
|
||||
|
||||
do_persistent_session_query(ResultAcc, QueryState) ->
|
||||
case emqx_persistent_message:is_persistence_enabled() of
|
||||
true ->
|
||||
|
@ -1115,7 +1123,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
|
|||
%% through all the nodes.
|
||||
#{limit := Limit} = QueryState,
|
||||
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
|
||||
Rows = remove_live_sessions(Rows0),
|
||||
Rows = drop_live_and_expired(Rows0),
|
||||
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
|
||||
{enough, NResultAcc} ->
|
||||
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
|
||||
|
@ -1125,19 +1133,27 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
|
|||
do_persistent_session_query1(NResultAcc, QueryState, Iter)
|
||||
end.
|
||||
|
||||
remove_live_sessions(Rows) ->
|
||||
drop_live_and_expired(Rows) ->
|
||||
lists:filtermap(
|
||||
fun({ClientId, _Session}) ->
|
||||
case emqx_mgmt:lookup_running_client(ClientId, _FormatFn = undefined) of
|
||||
[] ->
|
||||
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}};
|
||||
[_ | _] ->
|
||||
false
|
||||
fun({ClientId, Session}) ->
|
||||
case is_expired(Session) orelse is_live_session(ClientId) of
|
||||
true ->
|
||||
false;
|
||||
false ->
|
||||
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}}
|
||||
end
|
||||
end,
|
||||
Rows
|
||||
).
|
||||
|
||||
%% Return 'true' if there is a live channel found in the global channel registry.
|
||||
%% NOTE: We cannot afford to query all running nodes to find out if a session is live.
|
||||
%% i.e. assuming the global session registry is always enabled.
|
||||
%% Otherwise this function may return `false` for `true` causing the session to appear
|
||||
%% twice in the query result.
|
||||
is_live_session(ClientId) ->
|
||||
[] =/= emqx_cm_registry:lookup_channels(ClientId).
|
||||
|
||||
list_client_msgs(MsgType, ClientID, QString) ->
|
||||
case emqx_mgmt_api:parse_cont_pager_params(QString, cont_encoding(MsgType)) of
|
||||
false ->
|
||||
|
|
Loading…
Reference in New Issue