From 98d1094d7372fd8a19a46c0b2ca0a686d60709d8 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 1 Feb 2024 01:43:06 +0100 Subject: [PATCH] feat(sessds): Expose subscriptions in the REST API --- apps/emqx/src/emqx_channel.erl | 4 +- apps/emqx/src/emqx_persistent_session_ds.erl | 91 +++++++++++++++++-- .../src/emqx_persistent_session_ds_state.erl | 4 +- apps/emqx/test/emqx_cth_suite.erl | 4 +- .../test/emqx_persistent_session_SUITE.erl | 8 -- apps/emqx_management/src/emqx_mgmt.erl | 11 ++- apps/emqx_management/test/emqx_mgmt_SUITE.erl | 85 ++++++++++++++++- 7 files changed, 180 insertions(+), 27 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index c2e0f3396..4d6ed37e4 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -193,7 +193,9 @@ info(alias_maximum, #channel{alias_maximum = Limits}) -> info(timers, #channel{timers = Timers}) -> Timers; info(session_state, #channel{session = Session}) -> - Session. + Session; +info(impl, #channel{session = Session}) -> + emqx_session:info(impl, Session). set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 1c1e78058..cf027bd47 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -66,6 +66,11 @@ terminate/2 ]). +%% Managment APIs: +-export([ + list_client_subscriptions/1 +]). + %% session table operations -export([create_tables/0, sync/1]). @@ -243,18 +248,25 @@ info(await_rel_timeout, #{props := Conf}) -> stats(Session) -> info(?STATS_KEYS, Session). -%% Debug/troubleshooting +%% Used by management API -spec print_session(emqx_types:clientid()) -> map() | undefined. print_session(ClientId) -> - case emqx_cm:lookup_channels(ClientId) of - [Pid] -> - #{channel := ChanState} = emqx_connection:get_state(Pid), - SessionState = emqx_channel:info(session_state, ChanState), - maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, SessionState#{ - '_alive' => {true, Pid} - }); - [] -> - emqx_persistent_session_ds_state:print_session(ClientId) + case try_get_live_session(ClientId) of + {Pid, SessionState} -> + maps:update_with( + s, fun emqx_persistent_session_ds_state:format/1, SessionState#{ + '_alive' => {true, Pid} + } + ); + not_found -> + case emqx_persistent_session_ds_state:print_session(ClientId) of + undefined -> + undefined; + S -> + #{s => S, '_alive' => false} + end; + not_persistent -> + undefined end. %%-------------------------------------------------------------------- @@ -529,6 +541,44 @@ terminate(_Reason, _Session = #{id := Id, s := S}) -> ?tp(debug, persistent_session_ds_terminate, #{id => Id}), ok. +%%-------------------------------------------------------------------- +%% Management APIs (dashboard) +%%-------------------------------------------------------------------- + +-spec list_client_subscriptions(emqx_types:clientid()) -> + {node() | undefined, [{emqx_types:topic() | emqx_types:share(), emqx_types:subopts()}]} + | {error, not_found}. +list_client_subscriptions(ClientId) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + %% TODO: this is not the most optimal implementation, since it + %% should be possible to avoid reading extra data (streams, etc.) + case print_session(ClientId) of + Sess = #{s := #{subscriptions := Subs}} -> + Node = + case Sess of + #{'_alive' := {true, Pid}} -> + node(Pid); + _ -> + undefined + end, + SubList = + maps:fold( + fun(Topic, #{props := SubProps}, Acc) -> + Elem = {Topic, SubProps}, + [Elem | Acc] + end, + [], + Subs + ), + {Node, SubList}; + undefined -> + {error, not_found} + end; + false -> + {error, not_found} + end. + %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- @@ -899,6 +949,27 @@ expiry_interval(ConnInfo) -> bump_interval() -> emqx_config:get([session_persistence, last_alive_update_interval]). +-spec try_get_live_session(emqx_types:clientid()) -> + {pid(), session()} | not_found | not_persistent. +try_get_live_session(ClientId) -> + case emqx_cm:lookup_channels(local, ClientId) of + [Pid] -> + try + #{channel := ChanState} = emqx_connection:get_state(Pid), + case emqx_channel:info(impl, ChanState) of + ?MODULE -> + {Pid, emqx_channel:info(session_state, ChanState)}; + _ -> + not_persistent + end + catch + _:_ -> + not_found + end; + _ -> + not_found + end. + %%-------------------------------------------------------------------- %% SeqNo tracking %% -------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 0f617153b..4912ebe95 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -181,7 +181,9 @@ format(#{ ranks := Ranks }) -> Subs = emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end, + fun(Key, Sub, Acc) -> + maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc) + end, #{}, SubsGBT ), diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index fbb9da595..373da9858 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ %% %% Most of the time, you just need to: %% 1. Describe the appspecs for the applications you want to test. -%% 2. Call `emqx_cth_sutie:start/2` to start the applications before the testrun +%% 2. Call `emqx_cth_suite:start/2` to start the applications before the testrun %% (e.g. in `init_per_suite/1` / `init_per_group/2`), providing the appspecs %% and unique work dir for the testrun (e.g. `work_dir/1`). Save the result %% in a context. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 72c04ff74..bdd3e367f 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -1092,14 +1092,6 @@ get_msgs_essentials(Msgs) -> pick_respective_msgs(MsgRefs, Msgs) -> [M || M <- Msgs, Ref <- MsgRefs, maps:get(packet_id, M) =:= maps:get(packet_id, Ref)]. -skip_ds_tc(Config) -> - case ?config(persistence, Config) of - ds -> - {skip, "Testcase not yet supported under 'emqx_persistent_session_ds' implementation"}; - _ -> - Config - end. - debug_info(ClientId) -> Info = emqx_persistent_session_ds:print_session(ClientId), ct:pal("*** State:~n~p", [Info]). diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9d4ad8521..1995ec9da 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -380,6 +380,15 @@ list_authz_cache(ClientId) -> call_client(ClientId, list_authz_cache). list_client_subscriptions(ClientId) -> + case emqx_persistent_session_ds:list_client_subscriptions(ClientId) of + {error, not_found} -> + list_client_subscriptions_mem(ClientId); + Result -> + Result + end. + +%% List subscriptions of an in-memory session: +list_client_subscriptions_mem(ClientId) -> case lookup_client({clientid, ClientId}, undefined) of [] -> {error, not_found}; diff --git a/apps/emqx_management/test/emqx_mgmt_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_SUITE.erl index 3eb37060e..9ce737353 100644 --- a/apps/emqx_management/test/emqx_mgmt_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -26,14 +26,56 @@ -define(FORMATFUN, {?MODULE, ident}). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, persistence_disabled}, + {group, persistence_enabled} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {persistence_disabled, [], TCs}, + {persistence_enabled, [], [t_persist_list_subs]} + ]. + +init_per_group(persistence_disabled, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, "session_persistence { enable = false }"}, + emqx_management + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [ + {apps, Apps} + | Config + ]; +init_per_group(persistence_enabled, Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, + "session_persistence {\n" + " enable = true\n" + " last_alive_update_interval = 100ms\n" + " renew_streams_interval = 100ms\n" + "}"}, + emqx_management + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [ + {apps, Apps} + | Config + ]. + +end_per_group(_Grp, Config) -> + emqx_cth_suite:stop(?config(apps, Config)). init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_management]), Config. end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite([emqx_management, emqx_conf]). + ok. init_per_testcase(TestCase, Config) -> meck:expect(emqx, running_nodes, 0, [node()]), @@ -370,6 +412,41 @@ t_banned(_) -> emqx_mgmt:delete_banned({clientid, <<"TestClient">>}) ). +%% This testcase verifies the behavior of various read-only functions +%% used by REST API via `emqx_mgmt' module: +t_persist_list_subs(_) -> + ClientId = <<"persistent_client">>, + Topics = lists:sort([<<"foo/bar">>, <<"/a/+//+/#">>, <<"foo">>]), + VerifySubs = + fun() -> + {Node, Ret} = emqx_mgmt:list_client_subscriptions(ClientId), + ?assert(Node =:= node() orelse Node =:= undefined, Node), + {TopicsL, SubProps} = lists:unzip(Ret), + ?assertEqual(Topics, lists:sort(TopicsL)), + [?assertMatch(#{rh := _, rap := _, nl := _, qos := _}, I) || I <- SubProps] + end, + %% 0. Verify that management functions work for missing clients: + ?assertMatch( + {error, not_found}, + emqx_mgmt:list_client_subscriptions(ClientId) + ), + %% 1. Connect the client and subscribe to topics: + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + ]), + {ok, _} = emqtt:connect(Client), + [{ok, _, _} = emqtt:subscribe(Client, I, qos2) || I <- Topics], + %% 2. Verify that management functions work for the connected + %% clients: + VerifySubs(), + %% 3. Disconnect the client: + emqtt:disconnect(Client), + %% 4. Verify that management functions work for the offline + %% clients: + VerifySubs(). + %%% helpers ident(Arg) -> Arg.