diff --git a/apps/emqx/src/emqx_persistent_session_bookkeeper.erl b/apps/emqx/src/emqx_persistent_session_bookkeeper.erl index bed63224d..7aa2d7a72 100644 --- a/apps/emqx/src/emqx_persistent_session_bookkeeper.erl +++ b/apps/emqx/src/emqx_persistent_session_bookkeeper.erl @@ -21,7 +21,8 @@ %% API -export([ start_link/0, - get_subscription_count/0 + get_subscription_count/0, + get_disconnected_session_count/0 ]). %% `gen_server' API @@ -39,7 +40,9 @@ %% call/cast/info events -record(tally_subs, {}). +-record(tally_disconnected_sessions, {}). -record(get_subscription_count, {}). +-record(get_disconnected_session_count, {}). %%------------------------------------------------------------------------------ %% API @@ -59,6 +62,16 @@ get_subscription_count() -> 0 end. +%% @doc Gets a cached view of the cluster-global count of disconnected persistent sessions. +-spec get_disconnected_session_count() -> non_neg_integer(). +get_disconnected_session_count() -> + case emqx_persistent_message:is_persistence_enabled() of + true -> + gen_server:call(?MODULE, #get_disconnected_session_count{}, infinity); + false -> + 0 + end. + %%------------------------------------------------------------------------------ %% `gen_server' API %%------------------------------------------------------------------------------ @@ -66,7 +79,10 @@ get_subscription_count() -> init(_Opts) -> case emqx_persistent_message:is_persistence_enabled() of true -> - State = #{subs_count => 0}, + State = #{ + subs_count => 0, + disconnected_session_count => 0 + }, {ok, State, {continue, #tally_subs{}}}; false -> ignore @@ -75,11 +91,18 @@ init(_Opts) -> handle_continue(#tally_subs{}, State0) -> State = tally_persistent_subscriptions(State0), ensure_subs_tally_timer(), + {noreply, State, {continue, #tally_disconnected_sessions{}}}; +handle_continue(#tally_disconnected_sessions{}, State0) -> + State = tally_disconnected_persistent_sessions(State0), + ensure_disconnected_sessions_tally_timer(), {noreply, State}. handle_call(#get_subscription_count{}, _From, State) -> #{subs_count := N} = State, {reply, N, State}; +handle_call(#get_disconnected_session_count{}, _From, State) -> + #{disconnected_session_count := N} = State, + {reply, N, State}; handle_call(_Call, _From, State) -> {reply, {error, bad_call}, State}. @@ -90,6 +113,10 @@ handle_info(#tally_subs{}, State0) -> State = tally_persistent_subscriptions(State0), ensure_subs_tally_timer(), {noreply, State}; +handle_info(#tally_disconnected_sessions{}, State0) -> + State = tally_disconnected_persistent_sessions(State0), + ensure_disconnected_sessions_tally_timer(), + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -101,7 +128,38 @@ tally_persistent_subscriptions(State0) -> N = emqx_persistent_session_ds_state:total_subscription_count(), State0#{subs_count := N}. +tally_disconnected_persistent_sessions(State0) -> + N = do_tally_disconnected_persistent_sessions(), + State0#{disconnected_session_count := N}. + ensure_subs_tally_timer() -> Timeout = emqx_config:get([durable_sessions, subscription_count_refresh_interval]), _ = erlang:send_after(Timeout, self(), #tally_subs{}), ok. + +ensure_disconnected_sessions_tally_timer() -> + Timeout = emqx_config:get([durable_sessions, disconnected_session_count_refresh_interval]), + _ = erlang:send_after(Timeout, self(), #tally_disconnected_sessions{}), + ok. + +do_tally_disconnected_persistent_sessions() -> + Iter = emqx_persistent_session_ds_state:make_session_iterator(), + do_tally_disconnected_persistent_sessions(Iter, 0). + +do_tally_disconnected_persistent_sessions('$end_of_table', N) -> + N; +do_tally_disconnected_persistent_sessions(Iter0, N) -> + case emqx_persistent_session_ds_state:session_iterator_next(Iter0, 1) of + {[], _} -> + N; + {[{Id, _Meta}], Iter} -> + case is_live_session(Id) of + true -> + do_tally_disconnected_persistent_sessions(Iter, N); + false -> + do_tally_disconnected_persistent_sessions(Iter, N + 1) + end + end. + +is_live_session(Id) -> + [] =/= emqx_cm_registry:lookup_channels(Id). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index d5e10efc0..c3584d7dd 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1719,6 +1719,14 @@ fields("durable_sessions") -> importance => ?IMPORTANCE_HIDDEN } )}, + {"disconnected_session_count_refresh_interval", + sc( + timeout_duration(), + #{ + default => <<"5s">>, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"message_retention_period", sc( timeout_duration(), diff --git a/apps/emqx/test/emqx_config_SUITE.erl b/apps/emqx/test/emqx_config_SUITE.erl index e7d8629b2..d32b88b07 100644 --- a/apps/emqx/test/emqx_config_SUITE.erl +++ b/apps/emqx/test/emqx_config_SUITE.erl @@ -476,6 +476,7 @@ zone_global_defaults() -> renew_streams_interval => 5000, session_gc_batch_size => 100, session_gc_interval => 600000, - subscription_count_refresh_interval => 5000 + subscription_count_refresh_interval => 5000, + disconnected_session_count_refresh_interval => 5000 } }. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index d29deedce..a9ec7c62b 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1383,7 +1383,6 @@ do_list_clients_cluster_query( {Rows, QueryState1 = #{complete := Complete0}} -> case emqx_mgmt_api:accumulate_query_rows(Node, Rows, QueryState1, ResultAcc) of {enough, NResultAcc} -> - %% TODO: add persistent session count? %% TODO: this may return `{error, _, _}'... QueryState2 = emqx_mgmt_api:maybe_collect_total_from_tail_nodes( Tail, QueryState1 @@ -1427,8 +1426,9 @@ add_persistent_session_count(QueryState0 = #{total := Totals0}) -> %% to traverse the whole table), but also hard to deduplicate live connections %% from it... So this count will possibly overshoot the true count of %% sessions. - SessionCount = persistent_session_count(), - Totals = Totals0#{undefined => SessionCount}, + DisconnectedSessionCount = + emqx_persistent_session_bookkeeper:get_disconnected_session_count(), + Totals = Totals0#{undefined => DisconnectedSessionCount}, QueryState0#{total := Totals}; false -> QueryState0 @@ -1476,27 +1476,6 @@ no_persistent_sessions() -> true end. --spec persistent_session_count() -> non_neg_integer(). -persistent_session_count() -> - %% N.B.: this is potentially costly. Should not be called in hot paths. - %% `mnesia:table_info(_, size)' is always zero for rocksdb, so we need to traverse... - do_persistent_session_count(init_persistent_session_iterator(), 0). - -do_persistent_session_count('$end_of_table', N) -> - N; -do_persistent_session_count(Cursor, N) -> - case emqx_persistent_session_ds_state:session_iterator_next(Cursor, 1) of - {[], _} -> - N; - {[{_Id, Meta}], NextCursor} -> - case is_expired(Meta) of - true -> - do_persistent_session_count(NextCursor, N); - false -> - do_persistent_session_count(NextCursor, N + 1) - end - end. - is_expired(#{last_alive_at := LastAliveAt, expiry_interval := ExpiryInterval}) -> LastAliveAt + ExpiryInterval < erlang:system_time(millisecond). diff --git a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl index 23ab101c9..d51085eea 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl @@ -79,7 +79,9 @@ end_per_suite(Config) -> init_per_group(persistent_sessions, Config) -> AppSpecs = [ - {emqx, "durable_sessions.enable = true"}, + {emqx, + "durable_sessions.enable = true\n" + "durable_sessions.disconnected_session_count_refresh_interval = 100ms"}, emqx_management ], Dashboard = emqx_mgmt_api_test_util:emqx_dashboard( @@ -457,9 +459,7 @@ t_persistent_sessions5(Config) -> {{_, 200, _}, _, #{ <<"data">> := [_, _, _], <<"meta">> := #{ - %% TODO: if/when we fix the persistent session count, this - %% should be 4. - <<"count">> := 6, + <<"count">> := 4, <<"hasnext">> := true } }}}, @@ -470,9 +470,7 @@ t_persistent_sessions5(Config) -> {{_, 200, _}, _, #{ <<"data">> := [_], <<"meta">> := #{ - %% TODO: if/when we fix the persistent session count, this - %% should be 4. - <<"count">> := 6, + <<"count">> := 4, <<"hasnext">> := false } }}}, @@ -489,9 +487,7 @@ t_persistent_sessions5(Config) -> {{_, 200, _}, _, #{ <<"data">> := [_, _], <<"meta">> := #{ - %% TODO: if/when we fix the persistent session count, this - %% should be 4. - <<"count">> := 6, + <<"count">> := 4, <<"hasnext">> := true } }}}, @@ -1996,7 +1992,11 @@ assert_single_client(Opts) -> 100, 20, ?assertMatch( - {ok, {{_, 200, _}, _, #{<<"data">> := [#{<<"connected">> := IsConnected}]}}}, + {ok, + {{_, 200, _}, _, #{ + <<"data">> := [#{<<"connected">> := IsConnected}], + <<"meta">> := #{<<"count">> := 1} + }}}, list_request(APIPort) ) ),