fix(ds clients mgmt api): add more fields to disconnected durable sessions
Fixes https://emqx.atlassian.net/browse/EMQX-12369
This commit is contained in:
parent
1c34a84dd2
commit
e7c47f4ec0
|
@ -1182,7 +1182,12 @@ maybe_set_offline_info(S, Id) ->
|
||||||
case emqx_cm:lookup_client({clientid, Id}) of
|
case emqx_cm:lookup_client({clientid, Id}) of
|
||||||
[{_Key, ChannelInfo, Stats}] ->
|
[{_Key, ChannelInfo, Stats}] ->
|
||||||
emqx_persistent_session_ds_state:set_offline_info(
|
emqx_persistent_session_ds_state:set_offline_info(
|
||||||
#{chan_info => ChannelInfo, stats => Stats},
|
#{
|
||||||
|
chan_info => ChannelInfo,
|
||||||
|
stats => Stats,
|
||||||
|
disconnected_at => erlang:system_time(millisecond),
|
||||||
|
last_connected_to => node()
|
||||||
|
},
|
||||||
S
|
S
|
||||||
);
|
);
|
||||||
_ ->
|
_ ->
|
||||||
|
|
|
@ -1529,13 +1529,13 @@ do_persistent_session_query1(ResultAcc, QueryState, Iter0) ->
|
||||||
|
|
||||||
check_for_live_and_expired(Rows) ->
|
check_for_live_and_expired(Rows) ->
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun({ClientId, Session}) ->
|
fun({ClientId, _Session}) ->
|
||||||
case is_live_session(ClientId) of
|
case is_live_session(ClientId) of
|
||||||
true ->
|
true ->
|
||||||
false;
|
false;
|
||||||
false ->
|
false ->
|
||||||
DSSession = emqx_persistent_session_ds_state:print_session(ClientId),
|
DSSession = emqx_persistent_session_ds_state:print_session(ClientId),
|
||||||
{true, {ClientId, DSSession#{is_expired => is_expired(Session)}}}
|
{true, {ClientId, DSSession}}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
Rows
|
Rows
|
||||||
|
@ -1755,18 +1755,32 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
|
||||||
format_persistent_session_info(ClientId, PSInfo0).
|
format_persistent_session_info(ClientId, PSInfo0).
|
||||||
|
|
||||||
format_persistent_session_info(
|
format_persistent_session_info(
|
||||||
_ClientId, #{metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats}}} = PSInfo
|
_ClientId,
|
||||||
|
#{
|
||||||
|
metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats} = OfflineInfo} =
|
||||||
|
Metadata
|
||||||
|
} =
|
||||||
|
PSInfo
|
||||||
) ->
|
) ->
|
||||||
Info0 = format_channel_info(_Node = undefined, {_Key = undefined, ChanInfo, Stats}, #{
|
Info0 = format_channel_info(_Node = undefined, {_Key = undefined, ChanInfo, Stats}, #{
|
||||||
fields => all
|
fields => all
|
||||||
}),
|
}),
|
||||||
|
LastConnectedToNode = maps:get(last_connected_to, OfflineInfo, undefined),
|
||||||
|
DisconnectedAt = maps:get(disconnected_at, OfflineInfo, undefined),
|
||||||
|
%% `created_at' and `connected_at' have already been formatted by this point.
|
||||||
|
Info = result_format_time_fun(
|
||||||
|
disconnected_at,
|
||||||
Info0#{
|
Info0#{
|
||||||
connected => false,
|
connected => false,
|
||||||
|
disconnected_at => DisconnectedAt,
|
||||||
durable => true,
|
durable => true,
|
||||||
is_persistent => true,
|
is_persistent => true,
|
||||||
is_expired => maps:get(is_expired, PSInfo, false),
|
is_expired => is_expired(Metadata),
|
||||||
|
node => LastConnectedToNode,
|
||||||
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
|
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
|
||||||
};
|
}
|
||||||
|
),
|
||||||
|
result_format_undefined_to_null(Info);
|
||||||
format_persistent_session_info(ClientId, PSInfo0) ->
|
format_persistent_session_info(ClientId, PSInfo0) ->
|
||||||
Metadata = maps:get(metadata, PSInfo0, #{}),
|
Metadata = maps:get(metadata, PSInfo0, #{}),
|
||||||
{ProtoName, ProtoVer} = maps:get(protocol, Metadata),
|
{ProtoName, ProtoVer} = maps:get(protocol, Metadata),
|
||||||
|
@ -1786,7 +1800,7 @@ format_persistent_session_info(ClientId, PSInfo0) ->
|
||||||
connected_at => CreatedAt,
|
connected_at => CreatedAt,
|
||||||
durable => true,
|
durable => true,
|
||||||
ip_address => IpAddress,
|
ip_address => IpAddress,
|
||||||
is_expired => maps:get(is_expired, PSInfo0, false),
|
is_expired => is_expired(Metadata),
|
||||||
is_persistent => true,
|
is_persistent => true,
|
||||||
port => Port,
|
port => Port,
|
||||||
heap_size => 0,
|
heap_size => 0,
|
||||||
|
|
|
@ -581,14 +581,33 @@ t_persistent_sessions6(Config) ->
|
||||||
%% Wait for session to be considered expired but not GC'ed
|
%% Wait for session to be considered expired but not GC'ed
|
||||||
ct:sleep(2_000),
|
ct:sleep(2_000),
|
||||||
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
|
assert_single_client(O#{node => N1, clientid => ClientId, status => disconnected}),
|
||||||
|
N1Bin = atom_to_binary(N1),
|
||||||
?retry(
|
?retry(
|
||||||
100,
|
100,
|
||||||
20,
|
20,
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"is_expired">> := true}]}}},
|
{ok,
|
||||||
|
{{_, 200, _}, _, #{
|
||||||
|
<<"data">> := [
|
||||||
|
#{
|
||||||
|
<<"is_expired">> := true,
|
||||||
|
<<"node">> := N1Bin,
|
||||||
|
<<"disconnected_at">> := <<_/binary>>
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}}},
|
||||||
list_request(APIPort)
|
list_request(APIPort)
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_, 200, _}, _, #{
|
||||||
|
<<"is_expired">> := true,
|
||||||
|
<<"node">> := N1Bin,
|
||||||
|
<<"disconnected_at">> := <<_/binary>>
|
||||||
|
}}},
|
||||||
|
get_client_request(APIPort, ClientId)
|
||||||
|
),
|
||||||
|
|
||||||
C2 = connect_client(#{port => Port1, clientid => ClientId}),
|
C2 = connect_client(#{port => Port1, clientid => ClientId}),
|
||||||
disconnect_and_destroy_session(C2),
|
disconnect_and_destroy_session(C2),
|
||||||
|
|
Loading…
Reference in New Issue