diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index a32397bb1..c1acfea69 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -575,6 +575,11 @@ fields(client) -> desc => <<"Indicates whether the client is connected via bridge">> })}, + {is_expired, + hoconsc:mk(boolean(), #{ + desc => + <<"Indicates whether the client session is expired">> + })}, {keepalive, hoconsc:mk(integer(), #{ desc => @@ -985,7 +990,7 @@ do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0 #{limit := Limit} = Acc0, {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), NewCursor = next_ds_cursor(Iter), - Rows1 = drop_live_and_expired(Rows0), + Rows1 = check_for_live_and_expired(Rows0), Rows = maybe_run_fuzzy_filter(Rows1, QString0), Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0), Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1), @@ -1513,7 +1518,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> %% through all the nodes. #{limit := Limit} = QueryState, {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), - Rows = drop_live_and_expired(Rows0), + Rows = check_for_live_and_expired(Rows0), case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of {enough, NResultAcc} -> emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); @@ -1523,14 +1528,15 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) -> do_persistent_session_query1(NResultAcc, QueryState, Iter) end. -drop_live_and_expired(Rows) -> +check_for_live_and_expired(Rows) -> lists:filtermap( fun({ClientId, Session}) -> - case is_expired(Session) orelse is_live_session(ClientId) of + case is_live_session(ClientId) of true -> false; false -> - {true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}} + DSSession = emqx_persistent_session_ds_state:print_session(ClientId), + {true, {ClientId, DSSession#{is_expired => is_expired(Session)}}} end end, Rows @@ -1730,7 +1736,11 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) -> ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3), ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4), - ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5), + ClientInfoMap6 = maps:put(connected, Connected, ClientInfoMap5), + %% Since this is for the memory session format, and its lifetime is linked to the + %% channel process, we may say it's not expired. Durable sessions will override this + %% field if needed in their format function. + ClientInfoMap = maps:put(is_expired, false, ClientInfoMap6), #{fields := RequestedFields} = Opts, TimesKeys = [created_at, connected_at, disconnected_at], @@ -1755,6 +1765,7 @@ format_persistent_session_info( connected => false, durable => true, is_persistent => true, + is_expired => maps:get(is_expired, PSInfo, false), subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) }; format_persistent_session_info(ClientId, PSInfo0) -> @@ -1776,6 +1787,7 @@ format_persistent_session_info(ClientId, PSInfo0) -> connected_at => CreatedAt, durable => true, ip_address => IpAddress, + is_expired => maps:get(is_expired, PSInfo0, false), is_persistent => true, port => Port, heap_size => 0, diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 39d775a7a..c354552f9 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -49,6 +49,7 @@ persistent_session_testcases() -> t_persistent_sessions3, t_persistent_sessions4, t_persistent_sessions5, + t_persistent_sessions6, t_persistent_sessions_subscriptions1, t_list_clients_v2 ]. @@ -553,6 +554,51 @@ t_persistent_sessions5(Config) -> ), ok. +%% Checks that expired durable sessions are returned with `is_expired => true'. +t_persistent_sessions6(Config) -> + [N1, _N2] = ?config(nodes, Config), + APIPort = 18084, + Port1 = get_mqtt_port(N1, tcp), + + ?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)), + + ?check_trace( + begin + O = #{api_port => APIPort}, + ClientId = <<"c1">>, + C1 = connect_client(#{port => Port1, clientid => ClientId, expiry => 1}), + assert_single_client(O#{node => N1, clientid => ClientId, status => connected}), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}}, + list_request(APIPort) + ) + ), + + ok = emqtt:disconnect(C1), + %% Wait for session to be considered expired but not GC'ed + ct:sleep(2_000), + assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}), + ?retry( + 100, + 20, + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}}, + list_request(APIPort) + ) + ), + + C2 = connect_client(#{port => Port1, clientid => ClientId}), + disconnect_and_destroy_session(C2), + + ok + end, + [] + ), + ok. + %% Check that the output of `/clients/:clientid/subscriptions' has the expected keys. t_persistent_sessions_subscriptions1(Config) -> [N1, _N2] = ?config(nodes, Config),