Merge pull request #13030 from thalesmg/fix-ds-return-expired-sessions-api-r57-20240513

fix(client mgmt api): return expired durable sessions with `is_expired: true`
This commit is contained in:
Thales Macedo Garitezi 2024-05-13 12:11:58 -03:00 committed by GitHub
commit a31d05d1ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 64 additions and 6 deletions

View File

@ -575,6 +575,11 @@ fields(client) ->
desc =>
<<"Indicates whether the client is connected via bridge">>
})},
{is_expired,
hoconsc:mk(boolean(), #{
desc =>
<<"Indicates whether the client session is expired">>
})},
{keepalive,
hoconsc:mk(integer(), #{
desc =>
@ -985,7 +990,7 @@ do_list_clients_v2(Nodes, _Cursor = #{type := ?CURSOR_TYPE_DS, iterator := Iter0
#{limit := Limit} = Acc0,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
NewCursor = next_ds_cursor(Iter),
Rows1 = drop_live_and_expired(Rows0),
Rows1 = check_for_live_and_expired(Rows0),
Rows = maybe_run_fuzzy_filter(Rows1, QString0),
Acc1 = maps:update_with(rows, fun(Rs) -> [{undefined, Rows} | Rs] end, Acc0),
Acc = #{n := N} = maps:update_with(n, fun(N) -> N + length(Rows) end, Acc1),
@ -1513,7 +1518,7 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
%% through all the nodes.
#{limit := Limit} = QueryState,
{Rows0, Iter} = emqx_persistent_session_ds_state:session_iterator_next(Iter0, Limit),
Rows = drop_live_and_expired(Rows0),
Rows = check_for_live_and_expired(Rows0),
case emqx_mgmt_api:accumulate_query_rows(undefined, Rows, QueryState, ResultAcc) of
{enough, NResultAcc} ->
emqx_mgmt_api:finalize_query(NResultAcc, emqx_mgmt_api:mark_complete(QueryState, true));
@ -1523,14 +1528,15 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
do_persistent_session_query1(NResultAcc, QueryState, Iter)
end.
drop_live_and_expired(Rows) ->
check_for_live_and_expired(Rows) ->
lists:filtermap(
fun({ClientId, Session}) ->
case is_expired(Session) orelse is_live_session(ClientId) of
case is_live_session(ClientId) of
true ->
false;
false ->
{true, {ClientId, emqx_persistent_session_ds_state:print_session(ClientId)}}
DSSession = emqx_persistent_session_ds_state:print_session(ClientId),
{true, {ClientId, DSSession#{is_expired => is_expired(Session)}}}
end
end,
Rows
@ -1730,7 +1736,11 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2),
ClientInfoMap4 = maps:put(port, Port, ClientInfoMap3),
ClientInfoMap5 = convert_expiry_interval_unit(ClientInfoMap4),
ClientInfoMap = maps:put(connected, Connected, ClientInfoMap5),
ClientInfoMap6 = maps:put(connected, Connected, ClientInfoMap5),
%% Since this is for the memory session format, and its lifetime is linked to the
%% channel process, we may say it's not expired. Durable sessions will override this
%% field if needed in their format function.
ClientInfoMap = maps:put(is_expired, false, ClientInfoMap6),
#{fields := RequestedFields} = Opts,
TimesKeys = [created_at, connected_at, disconnected_at],
@ -1755,6 +1765,7 @@ format_persistent_session_info(
connected => false,
durable => true,
is_persistent => true,
is_expired => maps:get(is_expired, PSInfo, false),
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
};
format_persistent_session_info(ClientId, PSInfo0) ->
@ -1776,6 +1787,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
connected_at => CreatedAt,
durable => true,
ip_address => IpAddress,
is_expired => maps:get(is_expired, PSInfo0, false),
is_persistent => true,
port => Port,
heap_size => 0,

View File

@ -49,6 +49,7 @@ persistent_session_testcases() ->
t_persistent_sessions3,
t_persistent_sessions4,
t_persistent_sessions5,
t_persistent_sessions6,
t_persistent_sessions_subscriptions1,
t_list_clients_v2
].
@ -553,6 +554,51 @@ t_persistent_sessions5(Config) ->
),
ok.
%% Checks that expired durable sessions are returned with `is_expired => true'.
t_persistent_sessions6(Config) ->
[N1, _N2] = ?config(nodes, Config),
APIPort = 18084,
Port1 = get_mqtt_port(N1, tcp),
?assertMatch({ok, {{_, 200, _}, _, #{<<"data">> := []}}}, list_request(APIPort)),
?check_trace(
begin
O = #{api_port => APIPort},
ClientId = <<"c1">>,
C1 = connect_client(#{port => Port1, clientid => ClientId, expiry => 1}),
assert_single_client(O#{node => N1, clientid => ClientId, status => connected}),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := false}]}}},
list_request(APIPort)
)
),
ok = emqtt:disconnect(C1),
%% Wait for session to be considered expired but not GC'ed
ct:sleep(2_000),
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
?retry(
100,
20,
?assertMatch(
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}},
list_request(APIPort)
)
),
C2 = connect_client(#{port => Port1, clientid => ClientId}),
disconnect_and_destroy_session(C2),
ok
end,
[]
),
ok.
%% Check that the output of `/clients/:clientid/subscriptions' has the expected keys.
t_persistent_sessions_subscriptions1(Config) ->
[N1, _N2] = ?config(nodes, Config),