diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 10cd3d6cc..c33aacf30 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -124,7 +124,8 @@ {?CHAN_TAB, 'channels.count', 'channels.max'}, {?CHAN_TAB, 'sessions.count', 'sessions.max'}, {?CHAN_CONN_TAB, 'connections.count', 'connections.max'}, - {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'} + {?CHAN_LIVE_TAB, 'live_connections.count', 'live_connections.max'}, + {?CHAN_REG_TAB, 'cluster_sessions.count', 'cluster_sessions.max'} ]). %% Batch drain diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 1fd140388..4556bce0e 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -99,7 +99,7 @@ unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid case is_enabled() of true -> mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)), - %% insert unregistration history after unrestration + %% insert unregistration history after unregstration ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end); false -> ok diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl index 8d697732a..f661e203f 100644 --- a/apps/emqx/src/emqx_cm_registry_keeper.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -172,7 +172,7 @@ cleanup_delay() -> %% prepare for online config change Default; RetainSeconds -> - Min = max(1, timer:seconds(RetainSeconds div 4)), + Min = max(timer:seconds(1), timer:seconds(RetainSeconds) div 4), min(Min, Default) end. @@ -188,5 +188,7 @@ now_ts() -> erlang:system_time(seconds). do_count(Since) -> - Ms = ets:fun2ms(fun(#channel{pid = V}) -> is_pid(V) orelse (is_integer(V) andalso (V >= Since)) end), + Ms = ets:fun2ms(fun(#channel{pid = V}) -> + is_pid(V) orelse (is_integer(V) andalso (V >= Since)) + end), ets:select_count(?CHAN_REG_TAB, Ms). diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 9685823ff..9b5e2a826 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -99,7 +99,11 @@ [ 'sessions.count', %% Maximum Number of Concurrent Sessions - 'sessions.max' + 'sessions.max', + %% Count of Sessions in the cluster + 'cluster_sessions.count', + %% Maximum Number of Sessions in the cluster + 'cluster_sessions.max' ] ). @@ -164,6 +168,8 @@ names() -> emqx_connections_max, emqx_live_connections_count, emqx_live_connections_max, + emqx_cluster_sessions_count, + emqx_cluster_sessions_max, emqx_sessions_count, emqx_sessions_max, emqx_channels_count, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl index 4891b5293..c8f92de0d 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor.erl @@ -415,6 +415,7 @@ getstats(Key) -> stats(connections) -> emqx_stats:getstat('connections.count'); stats(live_connections) -> emqx_stats:getstat('live_connections.count'); +stats(cluster_sessions) -> emqx_stats:getstat('cluster_sessions.count'); stats(topics) -> emqx_stats:getstat('topics.count'); stats(subscriptions) -> emqx_stats:getstat('subscriptions.count'); stats(received) -> emqx_metrics:val('messages.received'); diff --git a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl index d7e3c094c..c36c6d0f3 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl @@ -194,6 +194,12 @@ swagger_desc(live_connections) -> "Connections at the time of sampling." " Can only represent the approximate state" >>; +swagger_desc(cluster_sessions) -> + << + "Total number of sessions in the cluster at the time of sampling. " + "It includes expired sessions when `broker.session_history_retain` is set to a duration greater than `0s`. " + "Can only represent the approximate state" + >>; swagger_desc(received_msg_rate) -> swagger_desc_format("Dropped messages ", per); %swagger_desc(received_bytes_rate) -> swagger_desc_format("Received bytes ", per); diff --git a/apps/emqx_management/src/emqx_mgmt.erl b/apps/emqx_management/src/emqx_mgmt.erl index 9d4ad8521..67405af05 100644 --- a/apps/emqx_management/src/emqx_mgmt.erl +++ b/apps/emqx_management/src/emqx_mgmt.erl @@ -145,6 +145,7 @@ node_info() -> ), connections => ets:info(?CHAN_TAB, size), live_connections => ets:info(?CHAN_LIVE_TAB, size), + cluster_sessions => ets:info(?CHAN_REG_TAB, size), node_status => 'running', uptime => proplists:get_value(uptime, BrokerInfo), version => iolist_to_binary(proplists:get_value(version, BrokerInfo)), diff --git a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl index 9afb74f38..07d775f6e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_nodes.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_nodes.erl @@ -160,6 +160,19 @@ fields(node_info) -> non_neg_integer(), #{desc => <<"Number of clients currently connected to this node">>, example => 0} )}, + {cluster_sessions, + mk( + non_neg_integer(), + #{ + desc => + << + "By default, it includes only those sessions that have not expired. " + "If the `broker.session_history_retain` config is set to a duration greater than `0s`, " + "this count will also include sessions that expired within the specified retain time" + >>, + example => 0 + } + )}, {load1, mk( float(), diff --git a/apps/emqx_management/src/emqx_mgmt_api_stats.erl b/apps/emqx_management/src/emqx_mgmt_api_stats.erl index b57565671..cddc2a7c3 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_stats.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_stats.erl @@ -89,6 +89,10 @@ fields(node_stats_data) -> stats_schema('delayed.max', <<"Historical maximum number of delayed messages">>), stats_schema('live_connections.count', <<"Number of current live connections">>), stats_schema('live_connections.max', <<"Historical maximum number of live connections">>), + stats_schema('cluster_sessions.count', <<"Number of sessions in the cluster">>), + stats_schema( + 'cluster_sessions.max', <<"Historical maximum number of sessions in the cluster">> + ), stats_schema('retained.count', <<"Number of currently retained messages">>), stats_schema('retained.max', <<"Historical maximum number of retained messages">>), stats_schema('sessions.count', <<"Number of current sessions">>), diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 59241bd02..2942ac485 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -251,7 +251,7 @@ add_collect_family(Name, Data, Callback, Type) -> %% behaviour fetch_from_local_node(Mode) -> - {node(self()), #{ + {node(), #{ stats_data => stats_data(Mode), vm_data => vm_data(Mode), cluster_data => cluster_data(Mode), @@ -308,6 +308,8 @@ emqx_collect(K = emqx_sessions_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_sessions_max, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_channels_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_channels_max, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_cluster_sessions_count, D) -> gauge_metrics(?MG(K, D)); +emqx_collect(K = emqx_cluster_sessions_max, D) -> gauge_metrics(?MG(K, D)); %% pub/sub stats emqx_collect(K = emqx_topics_count, D) -> gauge_metrics(?MG(K, D)); emqx_collect(K = emqx_topics_max, D) -> gauge_metrics(?MG(K, D)); @@ -500,6 +502,8 @@ stats_metric_meta() -> {emqx_sessions_max, gauge, 'sessions.max'}, {emqx_channels_count, gauge, 'channels.count'}, {emqx_channels_max, gauge, 'channels.max'}, + {emqx_cluster_sessions_count, gauge, 'cluster_sessions.count'}, + {emqx_cluster_sessions_max, gauge, 'cluster_sessions.max'}, %% pub/sub stats {emqx_suboptions_count, gauge, 'suboptions.count'}, {emqx_suboptions_max, gauge, 'suboptions.max'}, diff --git a/apps/emqx_telemetry/src/emqx_telemetry.app.src b/apps/emqx_telemetry/src/emqx_telemetry.app.src index 32c2baa91..b4b0ebcfe 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.app.src +++ b/apps/emqx_telemetry/src/emqx_telemetry.app.src @@ -1,6 +1,6 @@ {application, emqx_telemetry, [ {description, "Report telemetry data for EMQX Opensource edition"}, - {vsn, "0.1.3"}, + {vsn, "0.2.0"}, {registered, [emqx_telemetry_sup, emqx_telemetry]}, {mod, {emqx_telemetry_app, []}}, {applications, [ diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/apps/emqx_telemetry/src/emqx_telemetry.erl index 8842d7a86..2e54549b8 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry.erl @@ -303,6 +303,9 @@ active_plugins() -> num_clients() -> emqx_stats:getstat('live_connections.count'). +num_cluster_sessions() -> + emqx_stats:getstat('cluster_sessions.count'). + messages_sent() -> emqx_metrics:val('messages.sent'). @@ -348,6 +351,7 @@ get_telemetry(State0 = #state{node_uuid = NodeUUID, cluster_uuid = ClusterUUID}) {nodes_uuid, nodes_uuid()}, {active_plugins, active_plugins()}, {num_clients, num_clients()}, + {num_cluster_sessions, num_cluster_sessions()}, {messages_received, messages_received()}, {messages_sent, messages_sent()}, {build_info, build_info()}, diff --git a/rel/i18n/emqx_mgmt_api_clients.hocon b/rel/i18n/emqx_mgmt_api_clients.hocon index 1e9193df6..2431c09ec 100644 --- a/rel/i18n/emqx_mgmt_api_clients.hocon +++ b/rel/i18n/emqx_mgmt_api_clients.hocon @@ -61,9 +61,12 @@ set_keepalive_seconds.label: """Set the online client keepalive by seconds""" get_sessions_count.desc: -"""Get the number of sessions. By default it returns the number of non-expired sessions. -if `broker.session_history_retain` is set to a duration greater than `0s`, -this API can also count expired sessions by providing the `since` parameter.""" +"""Get the total number of sessions in the cluster. +By default, it includes only those sessions that have not expired. +If the `broker.session_history_retain` config is set to a duration greater than 0s, +this count will also include sessions that expired within the specified retain time. +By specifying the `since` parameter, it can return the number of sessions that have expired within the specified time.""" + get_sessions_count.label: """Count number of sessions"""