fix(client mgmt api): return expired durable sessions with `is_expired: true`

Fixes https://emqx.atlassian.net/browse/EMQX-12274
This commit is contained in:
Thales Macedo Garitezi 2024-05-13 10:51:24 -03:00
parent 6e03479fd7
commit 6b032faebc
2 changed files with 64 additions and 6 deletions

View File

@ -575,6 +575,11 @@ fields(client) ->
desc => desc =>
<<"Indicates whether the client is connected via bridge">> <<"Indicates whether the client is connected via bridge">>
})}, })},
{is_expired,
hoconsc:mk(boolean(), #{
desc =>
<<"Indicates whether the client session is expired">>
})},
{keepalive, {keepalive,
hoconsc:mk(integer(), #{ hoconsc:mk(integer(), #{
desc => desc =>
@ -985,7 +990,7 @@ do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0
#{limit := Limit} = Acc0, #{limit := Limit} = Acc0,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
NewCursor = next_ds_cursor(Iter), NewCursor = next_ds_cursor(Iter),
Rows1 = drop_live_and_expired(Rows0), Rows1 = check_for_live_and_expired(Rows0),
Rows = maybe_run_fuzzy_filter(Rows1, QString0), Rows = maybe_run_fuzzy_filter(Rows1, QString0),
Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0), Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0),
Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1), Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1),
@ -1513,7 +1518,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
%% through all the nodes. %% through all the nodes.
#{limit := Limit} = QueryState, #{limit := Limit} = QueryState,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit), {Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
Rows = drop_live_and_expired(Rows0), Rows = check_for_live_and_expired(Rows0),
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
{enough, NResultAcc} -> {enough, NResultAcc} ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true)); emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
@ -1523,14 +1528,15 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
do_persistent_session_query1(NResultAcc, QueryState, Iter) do_persistent_session_query1(NResultAcc, QueryState, Iter)
end. end.
drop_live_and_expired(Rows) -> check_for_live_and_expired(Rows) ->
lists:filtermap( lists:filtermap(
fun({ClientId, Session}) -> fun({ClientId, Session}) ->
case is_expired(Session) orelse is_live_session(ClientId) of case is_live_session(ClientId) of
true -> true ->
false; false;
false -> false ->
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}} DSSession = emqx_persistent_session_ds_state:print_session(ClientId),
{true, {ClientId, DSSession#{is_expired => is_expired(Session)}}}
end end
end, end,
Rows Rows
@ -1730,7 +1736,11 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3), ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3),
ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4), ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5), ClientInfoMap6 = maps:put(connected, Connected, ClientInfoMap5),
%% Since this is for the memory session format, and its lifetime is linked to the
%% channel process, we may say it's not expired. Durable sessions will override this
%% field if needed in their format function.
ClientInfoMap = maps:put(is_expired, false, ClientInfoMap6),
#{fields := RequestedFields} = Opts, #{fields := RequestedFields} = Opts,
TimesKeys = [created_at, connected_at, disconnected_at], TimesKeys = [created_at, connected_at, disconnected_at],
@ -1755,6 +1765,7 @@ format_persistent_session_info(
connected => false, connected => false,
durable => true, durable => true,
is_persistent => true, is_persistent => true,
is_expired => maps:get(is_expired, PSInfo, false),
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
}; };
format_persistent_session_info(ClientId, PSInfo0) -> format_persistent_session_info(ClientId, PSInfo0) ->
@ -1776,6 +1787,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
connected_at => CreatedAt, connected_at => CreatedAt,
durable => true, durable => true,
ip_address => IpAddress, ip_address => IpAddress,
is_expired => maps:get(is_expired, PSInfo0, false),
is_persistent => true, is_persistent => true,
port => Port, port => Port,
heap_size => 0, heap_size => 0,

View File

@ -49,6 +49,7 @@ persistent_session_testcases() ->
t_persistent_sessions3, t_persistent_sessions3,
t_persistent_sessions4, t_persistent_sessions4,
t_persistent_sessions5, t_persistent_sessions5,
t_persistent_sessions6,
t_persistent_sessions_subscriptions1, t_persistent_sessions_subscriptions1,
t_list_clients_v2 t_list_clients_v2
]. ].
@ -553,6 +554,51 @@ t_persistent_sessions5(Config) ->
), ),
ok. ok.
%% Checks that expired durable sessions are returned with `is_expired => true'.
t_persistent_sessions6(Config) ->
[N1, _N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
O = #{api_port => APIPort},
ClientId = <<"c1">>,
C1 = connect_client(#{port => Port1, clientid => ClientId, expiry => 1}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}},
list_request(APIPort)
)
),
ok = emqtt:disconnect(C1),
%% Wait for session to be considered expired but not GC'ed
ct:sleep(2_000),
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}},
list_request(APIPort)
)
),
C2 = connect_client(#{port => Port1, clientid => ClientId}),
disconnect_and_destroy_session(C2),
ok
end,
[]
),
ok.
%% Check that the output of `/clients/:clientid/subscriptions' has the expected keys. %% Check that the output of `/clients/:clientid/subscriptions' has the expected keys.
t_persistent_sessions_subscriptions1(Config) -> t_persistent_sessions_subscriptions1(Config) ->
[N1, _N2] = ?config(nodes, Config), [N1, _N2] = ?config(nodes, Config),