From 6d6eb42fa3ce8738aa81174f56a1452ec9618538 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 6 May 2024 14:49:39 -0300 Subject: [PATCH] fix(mgmt clients api): hold channel info after client disconnects for display in api Fixes https://emqx.atlassian.net/browse/EMQX-12266 --- apps/emqx/src/emqx_persistent_session_ds.erl | 26 ++++++++++++++----- apps/emqx/src/emqx_persistent_session_ds.hrl | 1 + .../src/emqx_persistent_session_ds_state.erl | 5 ++++ .../src/emqx_mgmt_api_clients.erl | 11 ++++++++ .../test/emqx_mgmt_api_clients_SUITE.erl | 18 +++++++++++++ 5 files changed, 55 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 4bfefe5b6..0cdb700af 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -658,16 +658,17 @@ replay_batch(Srs0, Session0, ClientInfo) -> %%-------------------------------------------------------------------- -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -disconnect(Session = #{s := S0}, ConnInfo) -> - S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), - S2 = +disconnect(Session = #{id := Id, s := S0}, ConnInfo) -> + S1 = maybe_set_offline_info(S0, Id), + S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1), + S3 = case ConnInfo of #{expiry_interval := EI} when is_number(EI) -> - emqx_persistent_session_ds_state:set_expiry_interval(EI, S1); + emqx_persistent_session_ds_state:set_expiry_interval(EI, S2); _ -> - S1 + S2 end, - S = emqx_persistent_session_ds_state:commit(S2), + S = emqx_persistent_session_ds_state:commit(S3), {shutdown, Session#{s => S}}. -spec terminate(Reason :: term(), session()) -> ok. @@ -1175,6 +1176,19 @@ try_get_live_session(ClientId) -> not_found end. +-spec maybe_set_offline_info(emqx_persistent_session_ds_state:t(), emqx_types:clientid()) -> + emqx_persistent_session_ds_state:t(). +maybe_set_offline_info(S, Id) -> + case emqx_cm:lookup_client({clientid, Id}) of + [{_Key, ChannelInfo, Stats}] -> + emqx_persistent_session_ds_state:set_offline_info( + #{chan_info => ChannelInfo, stats => Stats}, + S + ); + _ -> + S + end. + %%-------------------------------------------------------------------- %% SeqNo tracking %% -------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 79920629a..12372e5be 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -81,5 +81,6 @@ -define(will_message, will_message). -define(clientinfo, clientinfo). -define(protocol, protocol). +-define(offline_info, offline_info). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 9efffc7ff..d7161c10e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -35,6 +35,7 @@ -export([get_expiry_interval/1, set_expiry_interval/2]). -export([get_clientinfo/1, set_clientinfo/2]). -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). +-export([set_offline_info/2]). -export([get_peername/1, set_peername/2]). -export([get_protocol/1, set_protocol/2]). -export([new_id/1]). @@ -372,6 +373,10 @@ clear_will_message_now(SessionId) when is_binary(SessionId) -> clear_will_message(Rec) -> set_will_message(undefined, Rec). +-spec set_offline_info(_Info :: map(), t()) -> t(). +set_offline_info(Info, Rec) -> + set_meta(?offline_info, Info, Rec). + -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. new_id(Rec) -> LastId = diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 38320780d..8ef4ab98b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1745,6 +1745,17 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) -> format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) -> format_persistent_session_info(ClientId, PSInfo0). +format_persistent_session_info( + _ClientId, #{metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats}}} = PSInfo +) -> + Info0 = format_channel_info(_Node = undefined, {_Key = undefined, ChanInfo, Stats}, #{ + fields => all + }), + Info0#{ + connected => false, + is_persistent => true, + subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{})) + }; format_persistent_session_info(ClientId, PSInfo0) -> Metadata = maps:get(metadata, PSInfo0, #{}), {ProtoName, ProtoVer} = maps:get(protocol, Metadata), diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 2623e6d4d..481129389 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -1800,6 +1800,11 @@ maybe_json_decode(X) -> {error, _} -> X end. +get_client_request(Port, ClientId) -> + Host = "http://127.0.0.1:" ++ integer_to_list(Port), + Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]), + request(get, Path, []). + list_request(Port) -> list_request(Port, _QueryParams = ""). @@ -1874,6 +1879,19 @@ assert_single_client(Opts) -> {ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}}, lookup_request(ClientId, APIPort) ), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"connected">> := IsConnected, + <<"is_persistent">> := true, + %% contains statistics from disconnect time + <<"recv_pkt">> := _, + %% contains channel info from disconnect time + <<"listener">> := _, + <<"clean_start">> := _ + }}}, + get_client_request(APIPort, ClientId) + ), ok. connect_client(Opts) ->