diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 368398b92..854e56fc5 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -110,7 +110,7 @@ reclaim_seq(Topic) -> stats_fun() -> safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'), - safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'), + safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'), safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max'). safe_update_stats(undefined, _Stat, _MaxStat) -> @@ -118,6 +118,16 @@ safe_update_stats(undefined, _Stat, _MaxStat) -> safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) -> emqx_stats:setstat(Stat, MaxStat, Val). +subscription_count() -> + NonPSCount = table_size(?SUBSCRIPTION), + PSCount = emqx_persistent_session_bookkeeper:get_subscription_count(), + case is_integer(NonPSCount) of + true -> + NonPSCount + PSCount; + false -> + PSCount + end. + subscriber_val() -> sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)). diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index 3b8e53961..d8b9aeb57 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -53,6 +53,7 @@ init([]) -> RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), + DSSessionBookkeeper = child_spec(emqx_persistent_session_bookkeeper, 5_000, worker), Children = [ Banned, @@ -62,7 +63,8 @@ init([]) -> Registry, RegistryKeeper, Manager, - DSSessionGCSup + DSSessionGCSup, + DSSessionBookkeeper ], {ok, {SupFlags, Children}}. diff --git a/apps/emqx/src/emqx_persistent_session_bookkeeper.erl b/apps/emqx/src/emqx_persistent_session_bookkeeper.erl new file mode 100644 index 000000000..42751161f --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_bookkeeper.erl @@ -0,0 +1,107 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_bookkeeper). + +-behaviour(gen_server). + +%% API +-export([ + start_link/0, + get_subscription_count/0 +]). + +%% `gen_server' API +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +%% call/cast/info events +-record(tally_subs, {}). +-record(get_subscription_count, {}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link() -> gen_server:start_ret(). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []). + +%% @doc Gets a cached view of the cluster-global count of persistent subscriptions. +-spec get_subscription_count() -> non_neg_integer(). +get_subscription_count() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + gen_server:call(?MODULE, #get_subscription_count{}, infinity); + false -> + 0 + end. + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(_Opts) -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + State = #{subs_count => 0}, + {ok, State, {continue, #tally_subs{}}}; + false -> + ignore + end. + +handle_continue(#tally_subs{}, State0) -> + State = tally_persistent_subscriptions(State0), + ensure_subs_tally_timer(), + {noreply, State}. + +handle_call(#get_subscription_count{}, _From, State) -> + #{subs_count := N} = State, + {reply, N, State}; +handle_call(_Call, _From, State) -> + {reply, {error, bad_call}, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_info(#tally_subs{}, State0) -> + State = tally_persistent_subscriptions(State0), + ensure_subs_tally_timer(), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +tally_persistent_subscriptions(State0) -> + N = emqx_persistent_session_ds_state:total_subscription_count(), + State0#{subs_count := N}. + +ensure_subs_tally_timer() -> + Timeout = emqx_config:get([session_persistence, subscription_count_refresh_interval]), + _ = erlang:send_after(Timeout, self(), #tally_subs{}), + ok. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index d7161c10e..8f5f01e5c 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -54,6 +54,7 @@ cold_get_subscription/2, fold_subscriptions/3, n_subscriptions/1, + total_subscription_count/0, put_subscription/3, del_subscription/2 ]). @@ -406,6 +407,12 @@ fold_subscriptions(Fun, Acc, Rec) -> n_subscriptions(Rec) -> gen_size(?subscriptions, Rec). +-spec total_subscription_count() -> non_neg_integer(). +total_subscription_count() -> + mria:async_dirty(?DS_MRIA_SHARD, fun() -> + mnesia:foldl(fun(#kv{}, Acc) -> Acc + 1 end, 0, ?subscription_tab) + end). + -spec put_subscription( emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription(), diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 17222446e..710665ba4 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -189,7 +189,17 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- stats_fun() -> - emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)). + PSRouteCount = persistent_route_count(), + NonPSRouteCount = emqx_router:stats(n_routes), + emqx_stats:setstat('topics.count', 'topics.max', PSRouteCount + NonPSRouteCount). + +persistent_route_count() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + emqx_persistent_session_ds_router:stats(n_routes); + false -> + 0 + end. cleanup_routes(Node) -> emqx_router:cleanup_routes(Node). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index b4ba08bbc..7f2839f35 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1713,6 +1713,14 @@ fields("session_persistence") -> desc => ?DESC(session_ds_session_gc_batch_size) } )}, + {"subscription_count_refresh_interval", + sc( + timeout_duration(), + #{ + default => <<"5s">>, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"message_retention_period", sc( timeout_duration(), diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index a9b4a8328..54cc2ee51 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -475,6 +475,7 @@ zone_global_defaults() -> message_retention_period => 86400000, renew_streams_interval => 5000, session_gc_batch_size => 100, - session_gc_interval => 600000 + session_gc_interval => 600000, + subscription_count_refresh_interval => 5000 } }. diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index 95fe2e809..14c4f5fde 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -20,11 +20,13 @@ -compile(export_all). -import(emqx_dashboard_SUITE, [auth_header_/0]). +-import(emqx_common_test_helpers, [on_exit/1]). -include("emqx_dashboard.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -define(SERVER, "http://127.0.0.1:18083"). -define(BASE_PATH, "/api/v5"). @@ -52,10 +54,47 @@ %%-------------------------------------------------------------------- all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, common}, + {group, persistent_sessions} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + PSTCs = persistent_session_testcases(), + [ + {common, [], AllTCs -- PSTCs}, + {persistent_sessions, [], PSTCs} + ]. + +persistent_session_testcases() -> + [ + t_persistent_session_stats + ]. init_per_suite(Config) -> - emqx_common_test_helpers:clear_screen(), + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(persistent_sessions = Group, Config) -> + Apps = emqx_cth_suite:start( + [ + emqx_conf, + {emqx, "session_persistence {enable = true}"}, + {emqx_retainer, ?BASE_RETAINER_CONF}, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 18083 }\n" + "dashboard.sample_interval = 1s" + ) + ], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + [{apps, Apps} | Config]; +init_per_group(common = Group, Config) -> Apps = emqx_cth_suite:start( [ emqx, @@ -67,12 +106,12 @@ init_per_suite(Config) -> "dashboard.sample_interval = 1s" ) ], - #{work_dir => emqx_cth_suite:work_dir(Config)} + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} ), {ok, _} = emqx_common_test_http:create_default_app(), [{apps, Apps} | Config]. -end_per_suite(Config) -> +end_per_group(_Group, Config) -> Apps = ?config(apps, Config), emqx_cth_suite:stop(Apps), ok. @@ -84,6 +123,7 @@ init_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, _Config) -> ok = snabbkaffe:stop(), + emqx_common_test_helpers:call_janitor(), ok. %%-------------------------------------------------------------------- @@ -272,6 +312,51 @@ t_monitor_api_error(_) -> request(["monitor"], "latest=-1"), ok. +%% Verifies that subscriptions from persistent sessions are correctly accounted for. +t_persistent_session_stats(_Config) -> + %% pre-condition + true = emqx_persistent_message:is_persistence_enabled(), + + NonPSClient = start_and_connect(#{ + clientid => <<"non-ps">>, + expiry_interval => 0 + }), + PSClient = start_and_connect(#{ + clientid => <<"ps">>, + expiry_interval => 30 + }), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2), + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}), + infinity + ), + ?retry(1_000, 10, begin + ?assertMatch( + {ok, #{ + %% N.B.: we currently don't perform any deduplication between persistent + %% and non-persistent routes, so we count `commont/topic' twice and get 8 + %% instead of 6 here. + <<"topics">> := 8, + <<"subscriptions">> := 8 + }}, + request(["monitor_current"]) + ) + end), + %% Sanity checks + PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes), + ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}), + PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(), + ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}), + ok. + request(Path) -> request(Path, ""). @@ -340,3 +425,22 @@ waiting_emqx_stats_and_monitor_update(WaitKey) -> %% manually call monitor update _ = emqx_dashboard_monitor:current_rate_cluster(), ok. + +start_and_connect(Opts) -> + Defaults = #{clean_start => false, expiry_interval => 30}, + #{ + clientid := ClientId, + clean_start := CleanStart, + expiry_interval := EI + } = maps:merge(Defaults, Opts), + {ok, Client} = emqtt:start_link([ + {clientid, ClientId}, + {clean_start, CleanStart}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => EI}} + ]), + on_exit(fun() -> + catch emqtt:disconnect(Client, ?RC_NORMAL_DISCONNECTION, #{'Session-Expiry-Interval' => 0}) + end), + {ok, _} = emqtt:connect(Client), + Client.