diff --git a/apps/emqx/include/emqx_cm.hrl b/apps/emqx/include/emqx_cm.hrl index d1d195921..a84a06688 100644 --- a/apps/emqx/include/emqx_cm.hrl +++ b/apps/emqx/include/emqx_cm.hrl @@ -23,7 +23,7 @@ -define(CHAN_INFO_TAB, emqx_channel_info). -define(CHAN_LIVE_TAB, emqx_channel_live). -%% Mria table for session registraition. +%% Mria table for session registration. -define(CHAN_REG_TAB, emqx_channel_registry). -define(T_KICK, 5_000). diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 4556bce0e..683afcb86 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -25,7 +25,9 @@ -export([ register_channel/1, - unregister_channel/1 + register_channel2/1, + unregister_channel/1, + unregister_channel2/1 ]). -export([lookup_channels/1]). @@ -80,14 +82,21 @@ is_hist_enabled() -> register_channel(ClientId) when is_binary(ClientId) -> register_channel({ClientId, self()}); register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> + IsHistEnabled = is_hist_enabled(), case is_enabled() of + true when IsHistEnabled -> + mria:async_dirty(?CM_SHARD, fun ?MODULE:register_channel2/1, [record(ClientId, ChanPid)]); true -> - ok = when_hist_enabled(fun() -> delete_hist_d(ClientId) end), mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. +%% @private +register_channel2(#channel{chid = ClientId} = Record) -> + _ = delete_hist_d(ClientId), + mria:dirty_write(?CHAN_REG_TAB, Record). + %% @doc Unregister a global channel. -spec unregister_channel( emqx_types:clientid() @@ -96,15 +105,23 @@ register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel({ClientId, self()}); unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> + IsHistEnabled = is_hist_enabled(), case is_enabled() of + true when IsHistEnabled -> + mria:async_dirty(?CM_SHARD, fun ?MODULE:unregister_channel2/1, [ + record(ClientId, ChanPid) + ]); true -> - mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)), - %% insert unregistration history after unregstration - ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end); + mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)); false -> ok end. +%% @private +unregister_channel2(#channel{chid = ClientId} = Record) -> + mria:dirty_delete_object(?CHAN_REG_TAB, Record), + ok = insert_hist_d(ClientId). + %% @doc Lookup the global channels. -spec lookup_channels(emqx_types:clientid()) -> list(pid()). lookup_channels(ClientId) -> @@ -205,24 +222,23 @@ do_cleanup_channels(Node) -> _Return = ['$_'] } ], - lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). + IsHistEnabled = is_hist_enabled(), + lists:foreach( + fun(Chan) -> delete_channel(IsHistEnabled, Chan) end, + mnesia:select(?CHAN_REG_TAB, Pat, write) + ). -delete_channel(Chan) -> +delete_channel(IsHistEnabled, Chan) -> mnesia:delete_object(?CHAN_REG_TAB, Chan, write), - ok = when_hist_enabled(fun() -> insert_hist_t(Chan#channel.chid) end). + case IsHistEnabled of + true -> + insert_hist_t(Chan#channel.chid); + false -> + ok + end. %%-------------------------------------------------------------------- %% History entry operations -%%-------------------------------------------------------------------- - -when_hist_enabled(F) -> - case is_hist_enabled() of - true -> - _ = F(); - false -> - ok - end, - ok. %% Insert unregistration history in a transaction when unregistering the last channel for a clientid. insert_hist_t(ClientId) -> diff --git a/apps/emqx/src/emqx_cm_registry_keeper.erl b/apps/emqx/src/emqx_cm_registry_keeper.erl index 1087932df..e96fcdd7d 100644 --- a/apps/emqx/src/emqx_cm_registry_keeper.erl +++ b/apps/emqx/src/emqx_cm_registry_keeper.erl @@ -113,7 +113,7 @@ handle_info(start, #{next_clientid := NextClientId} = State) -> end, {noreply, State#{next_clientid := NewNext}}; false -> - %% if not enabled, dealy and check again + %% if not enabled, delay and check again %% because it might be enabled from online config change while waiting ok = send_delay_start(), {noreply, State} @@ -142,7 +142,7 @@ cleanup_loop('$end_of_table', _Count, _IsExpired) -> cleanup_loop(undefined, Count, IsExpired) -> cleanup_loop(mnesia:dirty_first(?CHAN_REG_TAB), Count, IsExpired); cleanup_loop(ClientId, Count, IsExpired) -> - Recods = mnesia:dirty_read(?CHAN_REG_TAB, ClientId), + Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId), Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId), lists:foreach( fun(R) -> @@ -153,7 +153,7 @@ cleanup_loop(ClientId, Count, IsExpired) -> ok end end, - Recods + Records ), cleanup_loop(Next, Count - 1, IsExpired). diff --git a/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl index 3dcded1c3..f3899fb3a 100644 --- a/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl +++ b/apps/emqx/test/emqx_cm_registry_keeper_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 935c690fe..8965f4633 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -401,7 +401,7 @@ schema("/sessions_count") -> required => false, default => 0, desc => - <<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>, + <<"Include sessions expired after this time (UNIX Epoch in seconds precision)">>, example => 1705391625 })} ], @@ -1087,6 +1087,6 @@ client_example() -> }. sessions_count(get, #{query_string := QString}) -> - Since = maps:get(<<"since">>, QString, undefined), + Since = maps:get(<<"since">>, QString, 0), Count = emqx_cm_registry_keeper:count(Since), {200, integer_to_binary(Count)}. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index 4c9f1b83e..a53104c0c 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1422,7 +1422,7 @@ force_shutdown_enable.label: broker_enable_session_registry.desc: """The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster. Recommendations for Use
-- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconneted to another node in the cluster, the new connection will need to find the old session and take it over. +- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconnected to another node in the cluster, the new connection will need to find the old session and take it over. - Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases. Advantages of Disabling
@@ -1433,7 +1433,7 @@ broker_session_history_retain.desc: """The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. This retained history can be used to monitor how many sessions were registered in the past configured duration. Note: This config has no effect if `enable_session_registry` is set to `false`.
-Note: If the clients are suing random client IDs, it's not recommended to enable this feature, at least not for a long retain duration.
+Note: If the clients are using random client IDs, it's not recommended to enable this feature, at least not for a long retention period.
Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect.""" overload_protection_backoff_delay.desc: