Merge pull request #12443 from ieQu1/dev/sessds-list-subs

feat(sessds): Expose subscriptions in the REST API
This commit is contained in:
ieQu1 2024-02-01 18:09:49 +01:00 committed by GitHub
commit 494aa71156
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 180 additions and 27 deletions

View File

@ -193,7 +193,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) ->
info(timers, #channel{timers = Timers}) ->
Timers;
info(session_state, #channel{session = Session}) ->
Session.
Session;
info(impl, #channel{session = Session}) ->
emqx_session:info(impl, Session).
set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}.

View File

@ -66,6 +66,11 @@
terminate/2
]).
%% Managment APIs:
-export([
list_client_subscriptions/1
]).
%% session table operations
-export([create_tables/0, sync/1]).
@ -243,18 +248,25 @@ info(await_rel_timeout, #{props := Conf}) ->
stats(Session) ->
info(?STATS_KEYS, Session).
%% Debug/troubleshooting
%% Used by management API
-spec print_session(emqx_types:clientid()) -> map() | undefined.
print_session(ClientId) ->
case emqx_cm:lookup_channels(ClientId) of
[Pid] ->
#{channel := ChanState} = emqx_connection:get_state(Pid),
SessionState = emqx_channel:info(session_state, ChanState),
maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, SessionState#{
'_alive' => {true, Pid}
});
[] ->
emqx_persistent_session_ds_state:print_session(ClientId)
case try_get_live_session(ClientId) of
{Pid, SessionState} ->
maps:update_with(
s, fun emqx_persistent_session_ds_state:format/1, SessionState#{
'_alive' => {true, Pid}
}
);
not_found ->
case emqx_persistent_session_ds_state:print_session(ClientId) of
undefined ->
undefined;
S ->
#{s => S, '_alive' => false}
end;
not_persistent ->
undefined
end.
%%--------------------------------------------------------------------
@ -529,6 +541,44 @@ terminate(_Reason, _Session = #{id := Id, s := S}) ->
?tp(debug, persistent_session_ds_terminate, #{id => Id}),
ok.
%%--------------------------------------------------------------------
%% Management APIs (dashboard)
%%--------------------------------------------------------------------
-spec list_client_subscriptions(emqx_types:clientid()) ->
{node() | undefined, [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]}
| {error, not_found}.
list_client_subscriptions(ClientId) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
%% TODO: this is not the most optimal implementation, since it
%% should be possible to avoid reading extra data (streams, etc.)
case print_session(ClientId) of
Sess = #{s := #{subscriptions := Subs}} ->
Node =
case Sess of
#{'_alive' := {true, Pid}} ->
node(Pid);
_ ->
undefined
end,
SubList =
maps:fold(
fun(Topic, #{props := SubProps}, Acc) ->
Elem = {Topic, SubProps},
[Elem | Acc]
end,
[],
Subs
),
{Node, SubList};
undefined ->
{error, not_found}
end;
false ->
{error, not_found}
end.
%%--------------------------------------------------------------------
%% Session tables operations
%%--------------------------------------------------------------------
@ -899,6 +949,27 @@ expiry_interval(ConnInfo) ->
bump_interval() ->
emqx_config:get([session_persistence, last_alive_update_interval]).
-spec try_get_live_session(emqx_types:clientid()) ->
{pid(), session()} | not_found | not_persistent.
try_get_live_session(ClientId) ->
case emqx_cm:lookup_channels(local, ClientId) of
[Pid] ->
try
#{channel := ChanState} = emqx_connection:get_state(Pid),
case emqx_channel:info(impl, ChanState) of
?MODULE ->
{Pid, emqx_channel:info(session_state, ChanState)};
_ ->
not_persistent
end
catch
_:_ ->
not_found
end;
_ ->
not_found
end.
%%--------------------------------------------------------------------
%% SeqNo tracking
%% --------------------------------------------------------------------

View File

@ -181,7 +181,9 @@ format(#{
ranks := Ranks
}) ->
Subs = emqx_topic_gbt:fold(
fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end,
fun(Key, Sub, Acc) ->
maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc)
end,
#{},
SubsGBT
),

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -48,7 +48,7 @@
%%
%% Most of the time, you just need to:
%% 1. Describe the appspecs for the applications you want to test.
%% 2. Call `emqx_cth_sutie:start/2` to start the applications before the testrun
%% 2. Call `emqx_cth_suite:start/2` to start the applications before the testrun
%% (e.g. in `init_per_suite/1` / `init_per_group/2`), providing the appspecs
%% and unique work dir for the testrun (e.g. `work_dir/1`). Save the result
%% in a context.

View File

@ -1092,14 +1092,6 @@ get_msgs_essentials(Msgs) ->
pick_respective_msgs(MsgRefs, Msgs) ->
[M || M <- Msgs, Ref <- MsgRefs, maps:get(packet_id, M) =:= maps:get(packet_id, Ref)].
skip_ds_tc(Config) ->
case ?config(persistence, Config) of
ds ->
{skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"};
_ ->
Config
end.
debug_info(ClientId) ->
Info = emqx_persistent_session_ds:print_session(ClientId),
ct:pal("*** State:~n~p", [Info]).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -380,6 +380,15 @@ list_authz_cache(ClientId) ->
call_client(ClientId, list_authz_cache).
list_client_subscriptions(ClientId) ->
case emqx_persistent_session_ds:list_client_subscriptions(ClientId) of
{error, not_found} ->
list_client_subscriptions_mem(ClientId);
Result ->
Result
end.
%% List subscriptions of an in-memory session:
list_client_subscriptions_mem(ClientId) ->
case lookup_client({clientid, ClientId}, undefined) of
[] ->
{error, not_found};

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -26,14 +26,56 @@
-define(FORMATFUN, {?MODULE, ident}).
all() ->
emqx_common_test_helpers:all(?MODULE).
[
{group, persistence_disabled},
{group, persistence_enabled}
].
groups() ->
TCs = emqx_common_test_helpers:all(?MODULE),
[
{persistence_disabled, [], TCs},
{persistence_enabled, [], [t_persist_list_subs]}
].
init_per_group(persistence_disabled, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx, "session_persistence { enable = false }"},
emqx_management
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[
{apps, Apps}
| Config
];
init_per_group(persistence_enabled, Config) ->
Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence {\n"
" enable = true\n"
" last_alive_update_interval = 100ms\n"
" renew_streams_interval = 100ms\n"
"}"},
emqx_management
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[
{apps, Apps}
| Config
].
end_per_group(_Grp, Config) ->
emqx_cth_suite:stop(?config(apps, Config)).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]),
Config.
end_per_suite(_) ->
emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]).
ok.
init_per_testcase(TestCase, Config) ->
meck:expect(emqx, running_nodes, 0, [node()]),
@ -370,6 +412,41 @@ t_banned(_) ->
emqx_mgmt:delete_banned({clientid, <<"TestClient">>})
).
%% This testcase verifies the behavior of various read-only functions
%% used by REST API via `emqx_mgmt' module:
t_persist_list_subs(_) ->
ClientId = <<"persistent_client">>,
Topics = lists:sort([<<"foo/bar">>, <<"/a/+//+/#">>, <<"foo">>]),
VerifySubs =
fun() ->
{Node, Ret} = emqx_mgmt:list_client_subscriptions(ClientId),
?assert(Node =:= node() orelse Node =:= undefined, Node),
{TopicsL, SubProps} = lists:unzip(Ret),
?assertEqual(Topics, lists:sort(TopicsL)),
[?assertMatch(#{rh := _, rap := _, nl := _, qos := _}, I) || I <- SubProps]
end,
%% 0. Verify that management functions work for missing clients:
?assertMatch(
{error, not_found},
emqx_mgmt:list_client_subscriptions(ClientId)
),
%% 1. Connect the client and subscribe to topics:
{ok, Client} = emqtt:start_link([
{clientid, ClientId},
{proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 30}}
]),
{ok, _} = emqtt:connect(Client),
[{ok, _, _} = emqtt:subscribe(Client, I, qos2) || I <- Topics],
%% 2. Verify that management functions work for the connected
%% clients:
VerifySubs(),
%% 3. Disconnect the client:
emqtt:disconnect(Client),
%% 4. Verify that management functions work for the offline
%% clients:
VerifySubs().
%%% helpers
ident(Arg) ->
Arg.