Merge pull request #12980 from thalesmg/fix-ds-get-offline-client-stuff-r57-20240506

fix(mgmt clients api): hold channel info after client disconnects for display in API
This commit is contained in:
Thales Macedo Garitezi 2024-05-07 11:18:00 -03:00 committed by GitHub
commit 2989793f4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 55 additions and 6 deletions

View File

@ -658,16 +658,17 @@ replay_batch(Srs0, Session0, ClientInfo) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}. -spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
disconnect(Session = #{s := S0}, ConnInfo) -> disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0), S1 = maybe_set_offline_info(S0, Id),
S2 = S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
S3 =
case ConnInfo of case ConnInfo of
#{expiry_interval := EI} when is_number(EI) -> #{expiry_interval := EI} when is_number(EI) ->
emqx_persistent_session_ds_state:set_expiry_interval(EI, S1); emqx_persistent_session_ds_state:set_expiry_interval(EI, S2);
_ -> _ ->
S1 S2
end, end,
S = emqx_persistent_session_ds_state:commit(S2), S = emqx_persistent_session_ds_state:commit(S3),
{shutdown, Session#{s => S}}. {shutdown, Session#{s => S}}.
-spec terminate(Reason :: term(), session()) -> ok. -spec terminate(Reason :: term(), session()) -> ok.
@ -1175,6 +1176,19 @@ try_get_live_session(ClientId) ->
not_found not_found
end. end.
-spec maybe_set_offline_info(emqx_persistent_session_ds_state:t(), emqx_types:clientid()) ->
emqx_persistent_session_ds_state:t().
maybe_set_offline_info(S, Id) ->
case emqx_cm:lookup_client({clientid, Id}) of
[{_Key, ChannelInfo, Stats}] ->
emqx_persistent_session_ds_state:set_offline_info(
#{chan_info => ChannelInfo, stats => Stats},
S
);
_ ->
S
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% SeqNo tracking %% SeqNo tracking
%% -------------------------------------------------------------------- %% --------------------------------------------------------------------

View File

@ -81,5 +81,6 @@
-define(will_message, will_message). -define(will_message, will_message).
-define(clientinfo, clientinfo). -define(clientinfo, clientinfo).
-define(protocol, protocol). -define(protocol, protocol).
-define(offline_info, offline_info).
-endif. -endif.

View File

@ -35,6 +35,7 @@
-export([get_expiry_interval/1, set_expiry_interval/2]). -export([get_expiry_interval/1, set_expiry_interval/2]).
-export([get_clientinfo/1, set_clientinfo/2]). -export([get_clientinfo/1, set_clientinfo/2]).
-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
-export([set_offline_info/2]).
-export([get_peername/1, set_peername/2]). -export([get_peername/1, set_peername/2]).
-export([get_protocol/1, set_protocol/2]). -export([get_protocol/1, set_protocol/2]).
-export([new_id/1]). -export([new_id/1]).
@ -372,6 +373,10 @@ clear_will_message_now(SessionId) when is_binary(SessionId) ->
clear_will_message(Rec) -> clear_will_message(Rec) ->
set_will_message(undefined, Rec). set_will_message(undefined, Rec).
-spec set_offline_info(_Info :: map(), t()) -> t().
set_offline_info(Info, Rec) ->
set_meta(?offline_info, Info, Rec).
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. -spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
new_id(Rec) -> new_id(Rec) ->
LastId = LastId =

View File

@ -1745,6 +1745,17 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) -> format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
format_persistent_session_info(ClientId, PSInfo0). format_persistent_session_info(ClientId, PSInfo0).
format_persistent_session_info(
_ClientId, #{metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats}}} = PSInfo
) ->
Info0 = format_channel_info(_Node = undefined, {_Key = undefined, ChanInfo, Stats}, #{
fields => all
}),
Info0#{
connected => false,
is_persistent => true,
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
};
format_persistent_session_info(ClientId, PSInfo0) -> format_persistent_session_info(ClientId, PSInfo0) ->
Metadata = maps:get(metadata, PSInfo0, #{}), Metadata = maps:get(metadata, PSInfo0, #{}),
{ProtoName, ProtoVer} = maps:get(protocol, Metadata), {ProtoName, ProtoVer} = maps:get(protocol, Metadata),

View File

@ -1800,6 +1800,11 @@ maybe_json_decode(X) ->
{error, _} -> X {error, _} -> X
end. end.
get_client_request(Port, ClientId) ->
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
request(get, Path, []).
list_request(Port) -> list_request(Port) ->
list_request(Port, _QueryParams = ""). list_request(Port, _QueryParams = "").
@ -1874,6 +1879,19 @@ assert_single_client(Opts) ->
{ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}}, {ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}},
lookup_request(ClientId, APIPort) lookup_request(ClientId, APIPort)
), ),
?assertMatch(
{ok,
{{_, 200, _}, _, #{
<<"connected">> := IsConnected,
<<"is_persistent">> := true,
%% contains statistics from disconnect time
<<"recv_pkt">> := _,
%% contains channel info from disconnect time
<<"listener">> := _,
<<"clean_start">> := _
}}},
get_client_request(APIPort, ClientId)
),
ok. ok.
connect_client(Opts) -> connect_client(Opts) ->