fix(mgmt clients api): hold channel info after client disconnects for display in api
Fixes https://emqx.atlassian.net/browse/EMQX-12266
This commit is contained in:
parent
004dc80fb2
commit
6d6eb42fa3
|
@ -658,16 +658,17 @@ replay_batch(Srs0, Session0, ClientInfo) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
|
-spec disconnect(session(), emqx_types:conninfo()) -> {shutdown, session()}.
|
||||||
disconnect(Session = #{s := S0}, ConnInfo) ->
|
disconnect(Session = #{id := Id, s := S0}, ConnInfo) ->
|
||||||
S1 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S0),
|
S1 = maybe_set_offline_info(S0, Id),
|
||||||
S2 =
|
S2 = emqx_persistent_session_ds_state:set_last_alive_at(now_ms(), S1),
|
||||||
|
S3 =
|
||||||
case ConnInfo of
|
case ConnInfo of
|
||||||
#{expiry_interval := EI} when is_number(EI) ->
|
#{expiry_interval := EI} when is_number(EI) ->
|
||||||
emqx_persistent_session_ds_state:set_expiry_interval(EI, S1);
|
emqx_persistent_session_ds_state:set_expiry_interval(EI, S2);
|
||||||
_ ->
|
_ ->
|
||||||
S1
|
S2
|
||||||
end,
|
end,
|
||||||
S = emqx_persistent_session_ds_state:commit(S2),
|
S = emqx_persistent_session_ds_state:commit(S3),
|
||||||
{shutdown, Session#{s => S}}.
|
{shutdown, Session#{s => S}}.
|
||||||
|
|
||||||
-spec terminate(Reason :: term(), session()) -> ok.
|
-spec terminate(Reason :: term(), session()) -> ok.
|
||||||
|
@ -1175,6 +1176,19 @@ try_get_live_session(ClientId) ->
|
||||||
not_found
|
not_found
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec maybe_set_offline_info(emqx_persistent_session_ds_state:t(), emqx_types:clientid()) ->
|
||||||
|
emqx_persistent_session_ds_state:t().
|
||||||
|
maybe_set_offline_info(S, Id) ->
|
||||||
|
case emqx_cm:lookup_client({clientid, Id}) of
|
||||||
|
[{_Key, ChannelInfo, Stats}] ->
|
||||||
|
emqx_persistent_session_ds_state:set_offline_info(
|
||||||
|
#{chan_info => ChannelInfo, stats => Stats},
|
||||||
|
S
|
||||||
|
);
|
||||||
|
_ ->
|
||||||
|
S
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% SeqNo tracking
|
%% SeqNo tracking
|
||||||
%% --------------------------------------------------------------------
|
%% --------------------------------------------------------------------
|
||||||
|
|
|
@ -81,5 +81,6 @@
|
||||||
-define(will_message, will_message).
|
-define(will_message, will_message).
|
||||||
-define(clientinfo, clientinfo).
|
-define(clientinfo, clientinfo).
|
||||||
-define(protocol, protocol).
|
-define(protocol, protocol).
|
||||||
|
-define(offline_info, offline_info).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -35,6 +35,7 @@
|
||||||
-export([get_expiry_interval/1, set_expiry_interval/2]).
|
-export([get_expiry_interval/1, set_expiry_interval/2]).
|
||||||
-export([get_clientinfo/1, set_clientinfo/2]).
|
-export([get_clientinfo/1, set_clientinfo/2]).
|
||||||
-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
|
-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
|
||||||
|
-export([set_offline_info/2]).
|
||||||
-export([get_peername/1, set_peername/2]).
|
-export([get_peername/1, set_peername/2]).
|
||||||
-export([get_protocol/1, set_protocol/2]).
|
-export([get_protocol/1, set_protocol/2]).
|
||||||
-export([new_id/1]).
|
-export([new_id/1]).
|
||||||
|
@ -372,6 +373,10 @@ clear_will_message_now(SessionId) when is_binary(SessionId) ->
|
||||||
clear_will_message(Rec) ->
|
clear_will_message(Rec) ->
|
||||||
set_will_message(undefined, Rec).
|
set_will_message(undefined, Rec).
|
||||||
|
|
||||||
|
-spec set_offline_info(_Info :: map(), t()) -> t().
|
||||||
|
set_offline_info(Info, Rec) ->
|
||||||
|
set_meta(?offline_info, Info, Rec).
|
||||||
|
|
||||||
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
|
-spec new_id(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}.
|
||||||
new_id(Rec) ->
|
new_id(Rec) ->
|
||||||
LastId =
|
LastId =
|
||||||
|
|
|
@ -1745,6 +1745,17 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}, Opts) ->
|
||||||
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
|
format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
|
||||||
format_persistent_session_info(ClientId, PSInfo0).
|
format_persistent_session_info(ClientId, PSInfo0).
|
||||||
|
|
||||||
|
format_persistent_session_info(
|
||||||
|
_ClientId, #{metadata := #{offline_info := #{chan_info := ChanInfo, stats := Stats}}} = PSInfo
|
||||||
|
) ->
|
||||||
|
Info0 = format_channel_info(_Node = undefined, {_Key = undefined, ChanInfo, Stats}, #{
|
||||||
|
fields => all
|
||||||
|
}),
|
||||||
|
Info0#{
|
||||||
|
connected => false,
|
||||||
|
is_persistent => true,
|
||||||
|
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo, #{}))
|
||||||
|
};
|
||||||
format_persistent_session_info(ClientId, PSInfo0) ->
|
format_persistent_session_info(ClientId, PSInfo0) ->
|
||||||
Metadata = maps:get(metadata, PSInfo0, #{}),
|
Metadata = maps:get(metadata, PSInfo0, #{}),
|
||||||
{ProtoName, ProtoVer} = maps:get(protocol, Metadata),
|
{ProtoName, ProtoVer} = maps:get(protocol, Metadata),
|
||||||
|
|
|
@ -1800,6 +1800,11 @@ maybe_json_decode(X) ->
|
||||||
{error, _} -> X
|
{error, _} -> X
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_client_request(Port, ClientId) ->
|
||||||
|
Host = "http://127.0.0.1:" ++ integer_to_list(Port),
|
||||||
|
Path = emqx_mgmt_api_test_util:api_path(Host, ["clients", ClientId]),
|
||||||
|
request(get, Path, []).
|
||||||
|
|
||||||
list_request(Port) ->
|
list_request(Port) ->
|
||||||
list_request(Port, _QueryParams = "").
|
list_request(Port, _QueryParams = "").
|
||||||
|
|
||||||
|
@ -1874,6 +1879,19 @@ assert_single_client(Opts) ->
|
||||||
{ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}},
|
{ok, {{_, 200, _}, _, #{<<"connected">> := IsConnected}}},
|
||||||
lookup_request(ClientId, APIPort)
|
lookup_request(ClientId, APIPort)
|
||||||
),
|
),
|
||||||
|
?assertMatch(
|
||||||
|
{ok,
|
||||||
|
{{_, 200, _}, _, #{
|
||||||
|
<<"connected">> := IsConnected,
|
||||||
|
<<"is_persistent">> := true,
|
||||||
|
%% contains statistics from disconnect time
|
||||||
|
<<"recv_pkt">> := _,
|
||||||
|
%% contains channel info from disconnect time
|
||||||
|
<<"listener">> := _,
|
||||||
|
<<"clean_start">> := _
|
||||||
|
}}},
|
||||||
|
get_client_request(APIPort, ClientId)
|
||||||
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
connect_client(Opts) ->
|
connect_client(Opts) ->
|
||||||
|
|
Loading…
Reference in New Issue