From 8b4a1c3d75617c110ca4697fb93ed11e9556ac7c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 20 May 2024 10:39:34 -0300 Subject: [PATCH 1/2] fix(client mgmt api): cache disconnected durable session count for `/clients` api Fixes https://emqx.atlassian.net/browse/EMQX-12396 --- .../emqx_persistent_session_bookkeeper.erl | 62 ++++++++++++++++++- apps/emqx/src/emqx_schema.erl | 8 +++ apps/emqx/test/emqx_config_SUITE.erl | 3 +- .../src/emqx_mgmt_api_clients.erl | 27 +------- .../test/emqx_mgmt_api_clients_SUITE.erl | 22 +++---- 5 files changed, 84 insertions(+), 38 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_bookkeeper.erl b/apps/emqx/src/emqx_persistent_session_bookkeeper.erl index bed63224d..7aa2d7a72 100644 --- a/apps/emqx/src/emqx_persistent_session_bookkeeper.erl +++ b/apps/emqx/src/emqx_persistent_session_bookkeeper.erl @@ -21,7 +21,8 @@ %% API -export([ start_link/0, - get_subscription_count/0 + get_subscription_count/0, + get_disconnected_session_count/0 ]). %% `gen_server' API @@ -39,7 +40,9 @@ %% call/cast/info events -record(tally_subs, {}). +-record(tally_disconnected_sessions, {}). -record(get_subscription_count, {}). +-record(get_disconnected_session_count, {}). %%------------------------------------------------------------------------------ %% API @@ -59,6 +62,16 @@ get_subscription_count() -> 0 end. +%% @doc Gets a cached view of the cluster-global count of disconnected persistent sessions. +-spec get_disconnected_session_count() -> non_neg_integer(). +get_disconnected_session_count() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity); + false -> + 0 + end. + %%------------------------------------------------------------------------------ %% `gen_server' API %%------------------------------------------------------------------------------ @@ -66,7 +79,10 @@ get_subscription_count() -> init(_Opts) -> case emqx_persistent_message:is_persistence_enabled() of true -> - State = #{subs_count => 0}, + State = #{ + subs_count => 0, + disconnected_session_count => 0 + }, {ok, State, {continue, #tally_subs{}}}; false -> ignore @@ -75,11 +91,18 @@ init(_Opts) -> handle_continue(#tally_subs{}, State0) -> State = tally_persistent_subscriptions(State0), ensure_subs_tally_timer(), + {noreply, State, {continue, #tally_disconnected_sessions{}}}; +handle_continue(#tally_disconnected_sessions{}, State0) -> + State = tally_disconnected_persistent_sessions(State0), + ensure_disconnected_sessions_tally_timer(), {noreply, State}. handle_call(#get_subscription_count{}, _From, State) -> #{subs_count := N} = State, {reply, N, State}; +handle_call(#get_disconnected_session_count{}, _From, State) -> + #{disconnected_session_count := N} = State, + {reply, N, State}; handle_call(_Call, _From, State) -> {reply, {error, bad_call}, State}. @@ -90,6 +113,10 @@ handle_info(#tally_subs{}, State0) -> State = tally_persistent_subscriptions(State0), ensure_subs_tally_timer(), {noreply, State}; +handle_info(#tally_disconnected_sessions{}, State0) -> + State = tally_disconnected_persistent_sessions(State0), + ensure_disconnected_sessions_tally_timer(), + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -101,7 +128,38 @@ tally_persistent_subscriptions(State0) -> N = emqx_persistent_session_ds_state:total_subscription_count(), State0#{subs_count := N}. +tally_disconnected_persistent_sessions(State0) -> + N = do_tally_disconnected_persistent_sessions(), + State0#{disconnected_session_count := N}. + ensure_subs_tally_timer() -> Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]), _ = erlang:send_after(Timeout, self(), #tally_subs{}), ok. + +ensure_disconnected_sessions_tally_timer() -> + Timeout = emqx_config:get([durable_sessions, disconnected_session_count_refresh_interval]), + _ = erlang:send_after(Timeout, self(), #tally_disconnected_sessions{}), + ok. + +do_tally_disconnected_persistent_sessions() -> + Iter = emqx_persistent_session_ds_state:make_session_iterator(), + do_tally_disconnected_persistent_sessions(Iter, 0). + +do_tally_disconnected_persistent_sessions('$end_of_table', N) -> + N; +do_tally_disconnected_persistent_sessions(Iter0, N) -> + case emqx_persistent_session_ds_state:session_iterator_next(Iter0, 1) of + {[], _} -> + N; + {[{Id, _Meta}], Iter} -> + case is_live_session(Id) of + true -> + do_tally_disconnected_persistent_sessions(Iter, N); + false -> + do_tally_disconnected_persistent_sessions(Iter, N + 1) + end + end. + +is_live_session(Id) -> + [] =/= emqx_cm_registry:lookup_channels(Id). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d5e10efc0..c3584d7dd 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1719,6 +1719,14 @@ fields("durable_sessions") -> importance => ?IMPORTANCE_HIDDEN } )}, + {"disconnected_session_count_refresh_interval", + sc( + timeout_duration(), + #{ + default => <<"5s">>, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"message_retention_period", sc( timeout_duration(), diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index e7d8629b2..d32b88b07 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -476,6 +476,7 @@ zone_global_defaults() -> renew_streams_interval => 5000, session_gc_batch_size => 100, session_gc_interval => 600000, - subscription_count_refresh_interval => 5000 + subscription_count_refresh_interval => 5000, + disconnected_session_count_refresh_interval => 5000 } }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index d29deedce..a9ec7c62b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1383,7 +1383,6 @@ do_list_clients_cluster_query( {Rows, QueryState1 = #{complete := Complete0}} -> case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of {enough, NResultAcc} -> - %% TODO: add persistent session count? %% TODO: this may return `{error, _, _}'... QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes( Tail, QueryState1 @@ -1427,8 +1426,9 @@ add_persistent_session_count(QueryState0 = #{total := Totals0}) -> %% to traverse the whole table), but also hard to deduplicate live connections %% from it... So this count will possibly overshoot the true count of %% sessions. - SessionCount = persistent_session_count(), - Totals = Totals0#{undefined => SessionCount}, + DisconnectedSessionCount = + emqx_persistent_session_bookkeeper:get_disconnected_session_count(), + Totals = Totals0#{undefined => DisconnectedSessionCount}, QueryState0#{total := Totals}; false -> QueryState0 @@ -1476,27 +1476,6 @@ no_persistent_sessions() -> true end. --spec persistent_session_count() -> non_neg_integer(). -persistent_session_count() -> - %% N.B.: this is potentially costly. Should not be called in hot paths. - %% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse... - do_persistent_session_count(init_persistent_session_iterator(), 0). - -do_persistent_session_count('$end_of_table', N) -> - N; -do_persistent_session_count(Cursor, N) -> - case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of - {[], _} -> - N; - {[{_Id, Meta}], NextCursor} -> - case is_expired(Meta) of - true -> - do_persistent_session_count(NextCursor, N); - false -> - do_persistent_session_count(NextCursor, N + 1) - end - end. - is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) -> LastAliveAt + ExpiryInterval < erlang:system_time(millisecond). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 23ab101c9..d51085eea 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -79,7 +79,9 @@ end_per_suite(Config) -> init_per_group(persistent_sessions, Config) -> AppSpecs = [ - {emqx, "durable_sessions.enable = true"}, + {emqx, + "durable_sessions.enable = true\n" + "durable_sessions.disconnected_session_count_refresh_interval = 100ms"}, emqx_management ], Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( @@ -457,9 +459,7 @@ t_persistent_sessions5(Config) -> {{_, 200, _}, _, #{ <<"data">> := [_, _, _], <<"meta">> := #{ - %% TODO: if/when we fix the persistent session count, this - %% should be 4. - <<"count">> := 6, + <<"count">> := 4, <<"hasnext">> := true } }}}, @@ -470,9 +470,7 @@ t_persistent_sessions5(Config) -> {{_, 200, _}, _, #{ <<"data">> := [_], <<"meta">> := #{ - %% TODO: if/when we fix the persistent session count, this - %% should be 4. - <<"count">> := 6, + <<"count">> := 4, <<"hasnext">> := false } }}}, @@ -489,9 +487,7 @@ t_persistent_sessions5(Config) -> {{_, 200, _}, _, #{ <<"data">> := [_, _], <<"meta">> := #{ - %% TODO: if/when we fix the persistent session count, this - %% should be 4. - <<"count">> := 6, + <<"count">> := 4, <<"hasnext">> := true } }}}, @@ -1996,7 +1992,11 @@ assert_single_client(Opts) -> 100, 20, ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}}, + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [#{<<"connected">> := IsConnected}], + <<"meta">> := #{<<"count">> := 1} + }}}, list_request(APIPort) ) ), From 7c5cb1acc5cf0e5f21be86278c8a19e850d41855 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 May 2024 10:03:09 -0300 Subject: [PATCH 2/2] fix(monitor current api): expose `disconnected_durable_sessions` count Fixes https://emqx.atlassian.net/browse/EMQX-12423 --- .../emqx_dashboard/include/emqx_dashboard.hrl | 1 + .../src/emqx_dashboard_monitor.erl | 52 +++++++++++++------ .../src/emqx_dashboard_monitor_api.erl | 2 + .../test/emqx_dashboard_monitor_SUITE.erl | 25 +++++++++ 4 files changed, 64 insertions(+), 16 deletions(-) diff --git a/apps/emqx_dashboard/include/emqx_dashboard.hrl b/apps/emqx_dashboard/include/emqx_dashboard.hrl index 61e513a5f..c2d3479cf 100644 --- a/apps/emqx_dashboard/include/emqx_dashboard.hrl +++ b/apps/emqx_dashboard/include/emqx_dashboard.hrl @@ -72,6 +72,7 @@ ]). -define(GAUGE_SAMPLER_LIST, [ + disconnected_durable_sessions, durable_subscriptions, subscriptions, topics, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index f2ebe3831..d3b533ebf 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -262,6 +262,8 @@ merge_cluster_rate(Node, Cluster) -> Fun = fun %% cluster-synced values + (disconnected_durable_sessions, V, NCluster) -> + NCluster#{disconnected_durable_sessions => V}; (durable_subscriptions, V, NCluster) -> NCluster#{durable_subscriptions => V}; (topics, V, NCluster) -> @@ -417,22 +419,40 @@ getstats(Key) -> _:_ -> 0 end. -stats(connections) -> emqx_stats:getstat('connections.count'); -stats(durable_subscriptions) -> emqx_stats:getstat('durable_subscriptions.count'); -stats(live_connections) -> emqx_stats:getstat('live_connections.count'); -stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count'); -stats(topics) -> emqx_stats:getstat('topics.count'); -stats(subscriptions) -> emqx_stats:getstat('subscriptions.count'); -stats(shared_subscriptions) -> emqx_stats:getstat('subscriptions.shared.count'); -stats(retained_msg_count) -> emqx_stats:getstat('retained.count'); -stats(received) -> emqx_metrics:val('messages.received'); -stats(received_bytes) -> emqx_metrics:val('bytes.received'); -stats(sent) -> emqx_metrics:val('messages.sent'); -stats(sent_bytes) -> emqx_metrics:val('bytes.sent'); -stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded'); -stats(validation_failed) -> emqx_metrics:val('messages.validation_failed'); -stats(dropped) -> emqx_metrics:val('messages.dropped'); -stats(persisted) -> emqx_metrics:val('messages.persisted'). +stats(connections) -> + emqx_stats:getstat('connections.count'); +stats(disconnected_durable_sessions) -> + emqx_persistent_session_bookkeeper:get_disconnected_session_count(); +stats(durable_subscriptions) -> + emqx_stats:getstat('durable_subscriptions.count'); +stats(live_connections) -> + emqx_stats:getstat('live_connections.count'); +stats(cluster_sessions) -> + emqx_stats:getstat('cluster_sessions.count'); +stats(topics) -> + emqx_stats:getstat('topics.count'); +stats(subscriptions) -> + emqx_stats:getstat('subscriptions.count'); +stats(shared_subscriptions) -> + emqx_stats:getstat('subscriptions.shared.count'); +stats(retained_msg_count) -> + emqx_stats:getstat('retained.count'); +stats(received) -> + emqx_metrics:val('messages.received'); +stats(received_bytes) -> + emqx_metrics:val('bytes.received'); +stats(sent) -> + emqx_metrics:val('messages.sent'); +stats(sent_bytes) -> + emqx_metrics:val('bytes.sent'); +stats(validation_succeeded) -> + emqx_metrics:val('messages.validation_succeeded'); +stats(validation_failed) -> + emqx_metrics:val('messages.validation_failed'); +stats(dropped) -> + emqx_metrics:val('messages.dropped'); +stats(persisted) -> + emqx_metrics:val('messages.persisted'). %% ------------------------------------------------------------------------------------------------- %% Retained && License Quota diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index 1dca9d341..c8bb9c8be 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -194,6 +194,8 @@ swagger_desc(validation_failed) -> swagger_desc_format("Schema validations failed "); swagger_desc(persisted) -> swagger_desc_format("Messages saved to the durable storage "); +swagger_desc(disconnected_durable_sessions) -> + <<"Disconnected durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(durable_subscriptions) -> <<"Subscriptions from durable sessions at the time of sampling.", ?APPROXIMATE_DESC>>; swagger_desc(subscriptions) -> diff --git a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl index f174e03be..59951faa9 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_monitor_SUITE.erl @@ -341,6 +341,8 @@ t_persistent_session_stats(_Config) -> ?retry(1_000, 10, begin ?assertMatch( {ok, #{ + <<"connections">> := 2, + <<"disconnected_durable_sessions">> := 0, %% N.B.: we currently don't perform any deduplication between persistent %% and non-persistent routes, so we count `commont/topic' twice and get 8 %% instead of 6 here. @@ -356,6 +358,29 @@ t_persistent_session_stats(_Config) -> ?assert(PSRouteCount > 0, #{ps_route_count => PSRouteCount}), PSSubCount = emqx_persistent_session_bookkeeper:get_subscription_count(), ?assert(PSSubCount > 0, #{ps_sub_count => PSSubCount}), + + %% Now with disconnected but alive persistent sessions + {ok, {ok, _}} = + ?wait_async_action( + emqtt:disconnect(PSClient), + #{?snk_kind := dashboard_monitor_flushed} + ), + ?retry(1_000, 10, begin + ?assertMatch( + {ok, #{ + <<"connections">> := 1, + <<"disconnected_durable_sessions">> := 1, + %% N.B.: we currently don't perform any deduplication between persistent + %% and non-persistent routes, so we count `commont/topic' twice and get 8 + %% instead of 6 here. + <<"topics">> := 8, + <<"durable_subscriptions">> := 4, + <<"subscriptions">> := 4 + }}, + request(["monitor_current"]) + ) + end), + ok. request(Path) ->