Merge pull request #12979 from thalesmg/fix-ps-monitor-topic-count-r57-20240506

fix(monitor api): count persistent routes and subscriptions
This commit is contained in:
Thales Macedo Garitezi 2024-05-08 15:09:16 -03:00 committed by GitHub
commit 8d0574abf0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 257 additions and 8 deletions

View File

@ -110,7 +110,7 @@ reclaim_seq(Topic) ->
stats_fun() -> stats_fun() ->
safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'), safe_update_stats(subscriber_val(), 'subscribers.count', 'subscribers.max'),
safe_update_stats(table_size(?SUBSCRIPTION), 'subscriptions.count', 'subscriptions.max'), safe_update_stats(subscription_count(), 'subscriptions.count', 'subscriptions.max'),
safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max'). safe_update_stats(table_size(?SUBOPTION), 'suboptions.count', 'suboptions.max').
safe_update_stats(undefined, _Stat, _MaxStat) -> safe_update_stats(undefined, _Stat, _MaxStat) ->
@ -118,6 +118,16 @@ safe_update_stats(undefined, _Stat, _MaxStat) ->
safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) -> safe_update_stats(Val, Stat, MaxStat) when is_integer(Val) ->
emqx_stats:setstat(Stat, MaxStat, Val). emqx_stats:setstat(Stat, MaxStat, Val).
%% Number of subscriptions to publish as 'subscriptions.count': the size of the
%% `?SUBSCRIPTION' table plus the cached count of persistent-session
%% subscriptions reported by `emqx_persistent_session_bookkeeper'.
subscription_count() ->
    LocalCount = table_size(?SUBSCRIPTION),
    PersistentCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
    if
        is_integer(LocalCount) ->
            LocalCount + PersistentCount;
        true ->
            %% `table_size/1' did not yield an integer (presumably `undefined'
            %% when the table is not available -- TODO confirm), so only the
            %% persistent count can be reported.
            PersistentCount
    end.
subscriber_val() -> subscriber_val() ->
sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)). sum_subscriber(table_size(?SUBSCRIBER), table_size(?SHARED_SUBSCRIBER)).

View File

@ -53,6 +53,7 @@ init([]) ->
RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker), RegistryKeeper = child_spec(emqx_cm_registry_keeper, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker),
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor), DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
DSSessionBookkeeper = child_spec(emqx_persistent_session_bookkeeper, 5_000, worker),
Children = Children =
[ [
Banned, Banned,
@ -62,7 +63,8 @@ init([]) ->
Registry, Registry,
RegistryKeeper, RegistryKeeper,
Manager, Manager,
DSSessionGCSup DSSessionGCSup,
DSSessionBookkeeper
], ],
{ok, {SupFlags, Children}}. {ok, {SupFlags, Children}}.

View File

@ -0,0 +1,107 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_bookkeeper).
-behaviour(gen_server).
%% API
-export([
start_link/0,
get_subscription_count/0
]).
%% `gen_server' API
-export([
init/1,
handle_continue/2,
handle_call/3,
handle_cast/2,
handle_info/2
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%% call/cast/info events
-record(tally_subs, {}).
-record(get_subscription_count, {}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%% Starts the bookkeeper as a linked `gen_server' registered locally under the
%% module name, with an empty init-options map and no start options.
-spec start_link() -> gen_server:start_ret().
start_link() ->
    ServerName = {local, ?MODULE},
    InitOpts = #{},
    StartOpts = [],
    gen_server:start_link(ServerName, ?MODULE, InitOpts, StartOpts).
%% @doc Returns the cached, cluster-global number of persistent subscriptions.
%%
%% When persistence is disabled the answer is always `0' and the server is not
%% contacted.  Otherwise the value last stored in the bookkeeper's state is
%% fetched with a synchronous call that has no timeout.
-spec get_subscription_count() -> non_neg_integer().
get_subscription_count() ->
    PersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
    case PersistenceEnabled of
        false ->
            0;
        true ->
            Request = #get_subscription_count{},
            gen_server:call(?MODULE, Request, infinity)
    end.
%%------------------------------------------------------------------------------
%% `gen_server' API
%%------------------------------------------------------------------------------
%% Initialises the server only when persistence is enabled; otherwise returns
%% `ignore' so that no process is started.  The first tally is deferred to
%% `handle_continue/2' and the count starts at zero until it completes.
init(_Opts) ->
    case emqx_persistent_message:is_persistence_enabled() of
        false ->
            ignore;
        true ->
            InitialState = #{subs_count => 0},
            {ok, InitialState, {continue, #tally_subs{}}}
    end.
%% Performs the initial tally right after `init/1' and arms the timer that
%% triggers the next refresh.
handle_continue(#tally_subs{}, PrevState) ->
    NextState = tally_persistent_subscriptions(PrevState),
    %% The timer is armed only after the tally has finished.
    ok = ensure_subs_tally_timer(),
    {noreply, NextState}.
%% Replies with the cached subscription count; any other request is answered
%% with `{error, bad_call}'.  The state is never modified here.
handle_call(#get_subscription_count{}, _Caller, State) ->
    #{subs_count := Count} = State,
    {reply, Count, State};
handle_call(_Unknown, _Caller, State) ->
    Reply = {error, bad_call},
    {reply, Reply, State}.
%% No casts are part of this server's protocol; every cast is dropped and the
%% state is left as it was.
handle_cast(_Msg, Unchanged) ->
    {noreply, Unchanged}.
%% Timer-driven refresh: re-tallies the persistent subscriptions and schedules
%% the next refresh.  Any other message is discarded.
handle_info(#tally_subs{}, PrevState) ->
    NextState = tally_persistent_subscriptions(PrevState),
    %% Re-arm only after the tally has finished.
    ok = ensure_subs_tally_timer(),
    {noreply, NextState};
handle_info(_Unexpected, Unchanged) ->
    {noreply, Unchanged}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
%% Recounts the persistent subscriptions and stores the result under the
%% `subs_count' key, which must already be present in the state.
tally_persistent_subscriptions(PrevState) ->
    Total = emqx_persistent_session_ds_state:total_subscription_count(),
    maps:update(subs_count, Total, PrevState).
%% Schedules a `#tally_subs{}' message to this process after the configured
%% `session_persistence.subscription_count_refresh_interval'.  Always returns
%% `ok'; the timer reference is not kept.
ensure_subs_tally_timer() ->
    ConfigPath = [session_persistence, subscription_count_refresh_interval],
    Interval = emqx_config:get(ConfigPath),
    _TimerRef = erlang:send_after(Interval, self(), #tally_subs{}),
    ok.

View File

@ -54,6 +54,7 @@
cold_get_subscription/2, cold_get_subscription/2,
fold_subscriptions/3, fold_subscriptions/3,
n_subscriptions/1, n_subscriptions/1,
total_subscription_count/0,
put_subscription/3, put_subscription/3,
del_subscription/2 del_subscription/2
]). ]).
@ -406,6 +407,12 @@ fold_subscriptions(Fun, Acc, Rec) ->
n_subscriptions(Rec) -> n_subscriptions(Rec) ->
gen_size(?subscriptions, Rec). gen_size(?subscriptions, Rec).
%% Counts every record in `?subscription_tab' by folding over the whole table
%% inside an `mria:async_dirty/2' activity on `?DS_MRIA_SHARD'.  The fold only
%% accepts `#kv{}' records.
-spec total_subscription_count() -> non_neg_integer().
total_subscription_count() ->
    CountRecord = fun(#kv{}, Count) -> Count + 1 end,
    CountAll = fun() -> mnesia:foldl(CountRecord, 0, ?subscription_tab) end,
    mria:async_dirty(?DS_MRIA_SHARD, CountAll).
-spec put_subscription( -spec put_subscription(
emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds_subs:subscription(), emqx_persistent_session_ds_subs:subscription(),

View File

@ -189,7 +189,17 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
stats_fun() -> stats_fun() ->
emqx_stats:setstat('topics.count', 'topics.max', emqx_router:stats(n_routes)). PSRouteCount = persistent_route_count(),
NonPSRouteCount = emqx_router:stats(n_routes),
emqx_stats:setstat('topics.count', 'topics.max', PSRouteCount + NonPSRouteCount).
%% Number of routes held by the persistent-session router, or `0' when
%% persistence is disabled (in which case the router is not queried).
persistent_route_count() ->
    case emqx_persistent_message:is_persistence_enabled() of
        false -> 0;
        true -> emqx_persistent_session_ds_router:stats(n_routes)
    end.
cleanup_routes(Node) -> cleanup_routes(Node) ->
emqx_router:cleanup_routes(Node). emqx_router:cleanup_routes(Node).

View File

@ -1713,6 +1713,14 @@ fields("session_persistence") ->
desc => ?DESC(session_ds_session_gc_batch_size) desc => ?DESC(session_ds_session_gc_batch_size)
} }
)}, )},
{"subscription_count_refresh_interval",
sc(
timeout_duration(),
#{
default => <<"5s">>,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"message_retention_period", {"message_retention_period",
sc( sc(
timeout_duration(), timeout_duration(),

View File

@ -475,6 +475,7 @@ zone_global_defaults() ->
message_retention_period => 86400000, message_retention_period => 86400000,
renew_streams_interval => 5000, renew_streams_interval => 5000,
session_gc_batch_size => 100, session_gc_batch_size => 100,
session_gc_interval => 600000 session_gc_interval => 600000,
subscription_count_refresh_interval => 5000
} }
}. }.

View File

@ -20,11 +20,13 @@
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_SUITE, [auth_header_/0]). -import(emqx_dashboard_SUITE, [auth_header_/0]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include("emqx_dashboard.hrl"). -include("emqx_dashboard.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(SERVER, "http://127.0.0.1:18083"). -define(SERVER, "http://127.0.0.1:18083").
-define(BASE_PATH, "/api/v5"). -define(BASE_PATH, "/api/v5").
@ -52,10 +54,47 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). [
{group, common},
{group, persistent_sessions}
].
%% Splits the suite's test cases into two groups: the cases that need
%% persistent sessions enabled, and everything else.
groups() ->
    PersistentCases = persistent_session_testcases(),
    EveryCase = emqx_common_test_helpers:all(?MODULE),
    CommonCases = lists:subtract(EveryCase, PersistentCases),
    [
        {common, [], CommonCases},
        {persistent_sessions, [], PersistentCases}
    ].
%% Test cases that belong to the `persistent_sessions' group.
persistent_session_testcases() ->
    [t_persistent_session_stats].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:clear_screen(), Config.
%% Nothing to tear down at suite level.
end_per_suite(_SuiteConfig) ->
    ok.
init_per_group(persistent_sessions = Group, Config) ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
{emqx, "session_persistence {enable = true}"},
{emqx_retainer, ?BASE_RETAINER_CONF},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard(
"dashboard.listeners.http { enable = true, bind = 18083 }\n"
"dashboard.sample_interval = 1s"
)
],
#{work_dir => emqx_cth_suite:work_dir(Group, Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config];
init_per_group(common = Group, Config) ->
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
[ [
emqx, emqx,
@ -67,12 +106,12 @@ init_per_suite(Config) ->
"dashboard.sample_interval = 1s" "dashboard.sample_interval = 1s"
) )
], ],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
), ),
{ok, _} = emqx_common_test_http:create_default_app(), {ok, _} = emqx_common_test_http:create_default_app(),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
end_per_suite(Config) -> end_per_group(_Group, Config) ->
Apps = ?config(apps, Config), Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps), emqx_cth_suite:stop(Apps),
ok. ok.
@ -84,6 +123,7 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
emqx_common_test_helpers:call_janitor(),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -272,6 +312,51 @@ t_monitor_api_error(_) ->
request(["monitor"], "latest=-1"), request(["monitor"], "latest=-1"),
ok. ok.
%% Verifies that subscriptions from persistent sessions are correctly accounted for.
%%
%% Connects one client with session expiry interval 0 and one with expiry
%% interval 30, subscribes each of them to four topic filters, and then checks
%% the `monitor_current' API output for the topic and subscription counts.
t_persistent_session_stats(_Config) ->
%% pre-condition
true = emqx_persistent_message:is_persistence_enabled(),
NonPSClient = start_and_connect(#{
clientid => <<"non-ps">>,
expiry_interval => 0
}),
PSClient = start_and_connect(#{
clientid => <<"ps">>,
expiry_interval => 30
}),
%% Two filters unique to each client plus two filters shared by both.
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"non/ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(NonPSClient, <<"common/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"ps/topic">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic/+">>, 2),
{ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(PSClient, <<"common/topic">>, 2),
%% Wait (without a timeout) for two `dashboard_monitor_flushed' events before
%% querying the API.
{ok, _} =
snabbkaffe:block_until(
?match_n_events(2, #{?snk_kind := dashboard_monitor_flushed}),
infinity
),
?retry(1_000, 10, begin
?assertMatch(
{ok, #{
%% N.B.: we currently don't perform any deduplication between persistent
%% and non-persistent routes, so we count the `common/topic' filters twice
%% and get 8 instead of 6 here.
<<"topics">> := 8,
<<"subscriptions">> := 8
}},
request(["monitor_current"])
)
end),
%% Sanity checks
PSRouteCount = emqx_persistent_session_ds_router:stats(n_routes),
?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}),
PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(),
?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}),
ok.
request(Path) -> request(Path) ->
request(Path, ""). request(Path, "").
@ -340,3 +425,22 @@ waiting_emqx_stats_and_monitor_update(WaitKey) ->
%% manually call monitor update %% manually call monitor update
_ = emqx_dashboard_monitor:current_rate_cluster(), _ = emqx_dashboard_monitor:current_rate_cluster(),
ok. ok.
%% Starts an MQTT v5 client, registers a cleanup that disconnects it with a
%% session expiry interval of 0, connects it and returns the client pid.
%%
%% `Opts' must contain `clientid'; `clean_start' defaults to `false' and
%% `expiry_interval' to 30.
start_and_connect(Opts) ->
    Merged = maps:merge(#{clean_start => false, expiry_interval => 30}, Opts),
    #{
        clientid := ClientId,
        clean_start := CleanStart,
        expiry_interval := ExpiryInterval
    } = Merged,
    ConnOpts = [
        {clientid, ClientId},
        {clean_start, CleanStart},
        {proto_ver, v5},
        {properties, #{'Session-Expiry-Interval' => ExpiryInterval}}
    ],
    {ok, Pid} = emqtt:start_link(ConnOpts),
    %% The cleanup is registered before connecting, so it also runs when the
    %% connect below fails; `catch' keeps a failing disconnect from escaping.
    Cleanup = fun() ->
        catch emqtt:disconnect(Pid, ?RC_NORMAL_DISCONNECTION, #{'Session-Expiry-Interval' => 0})
    end,
    on_exit(Cleanup),
    {ok, _} = emqtt:connect(Pid),
    Pid.