refactor: use mria:async_dirty to group dirty ops

This commit is contained in:
Zaiming (Stone) Shi 2024-02-01 20:54:03 +01:00
parent 38047108a4
commit f0569d8ae8
6 changed files with 43 additions and 27 deletions

View File

@ -23,7 +23,7 @@
-define(CHAN_INFO_TAB, emqx_channel_info). -define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_LIVE_TAB, emqx_channel_live). -define(CHAN_LIVE_TAB, emqx_channel_live).
%% Mria table for session registraition. %% Mria table for session registration.
-define(CHAN_REG_TAB, emqx_channel_registry). -define(CHAN_REG_TAB, emqx_channel_registry).
-define(T_KICK, 5_000). -define(T_KICK, 5_000).

View File

@ -25,7 +25,9 @@
-export([ -export([
register_channel/1, register_channel/1,
unregister_channel/1 register_channel2/1,
unregister_channel/1,
unregister_channel2/1
]). ]).
-export([lookup_channels/1]). -export([lookup_channels/1]).
@ -80,14 +82,21 @@ is_hist_enabled() ->
register_channel(ClientId) when is_binary(ClientId) -> register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()}); register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
IsHistEnabled = is_hist_enabled(),
case is_enabled() of case is_enabled() of
true when IsHistEnabled ->
mria:async_dirty(?CM_SHARD, fun ?MODULE:register_channel2/1, [record(ClientId, ChanPid)]);
true -> true ->
ok = when_hist_enabled(fun() -> delete_hist_d(ClientId) end),
mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid)); mria:dirty_write(?CHAN_REG_TAB, record(ClientId, ChanPid));
false -> false ->
ok ok
end. end.
%% @private
register_channel2(#channel{chid = ClientId} = Record) ->
_ = delete_hist_d(ClientId),
mria:dirty_write(?CHAN_REG_TAB, Record).
%% @doc Unregister a global channel. %% @doc Unregister a global channel.
-spec unregister_channel( -spec unregister_channel(
emqx_types:clientid() emqx_types:clientid()
@ -96,15 +105,23 @@ register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid)
unregister_channel(ClientId) when is_binary(ClientId) -> unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()}); unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) -> unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
IsHistEnabled = is_hist_enabled(),
case is_enabled() of case is_enabled() of
true when IsHistEnabled ->
mria:async_dirty(?CM_SHARD, fun ?MODULE:unregister_channel2/1, [
record(ClientId, ChanPid)
]);
true -> true ->
mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid)), mria:dirty_delete_object(?CHAN_REG_TAB, record(ClientId, ChanPid));
%% insert unregistration history after unregstration
ok = when_hist_enabled(fun() -> insert_hist_d(ClientId) end);
false -> false ->
ok ok
end. end.
%% @private
unregister_channel2(#channel{chid = ClientId} = Record) ->
mria:dirty_delete_object(?CHAN_REG_TAB, Record),
ok = insert_hist_d(ClientId).
%% @doc Lookup the global channels. %% @doc Lookup the global channels.
-spec lookup_channels(emqx_types:clientid()) -> list(pid()). -spec lookup_channels(emqx_types:clientid()) -> list(pid()).
lookup_channels(ClientId) -> lookup_channels(ClientId) ->
@ -205,24 +222,23 @@ do_cleanup_channels(Node) ->
_Return = ['$_'] _Return = ['$_']
} }
], ],
lists:foreach(fun delete_channel/1, mnesia:select(?CHAN_REG_TAB, Pat, write)). IsHistEnabled = is_hist_enabled(),
lists:foreach(
fun(Chan) -> delete_channel(IsHistEnabled, Chan) end,
mnesia:select(?CHAN_REG_TAB, Pat, write)
).
delete_channel(Chan) -> delete_channel(IsHistEnabled, Chan) ->
mnesia:delete_object(?CHAN_REG_TAB, Chan, write), mnesia:delete_object(?CHAN_REG_TAB, Chan, write),
ok = when_hist_enabled(fun() -> insert_hist_t(Chan#channel.chid) end). case IsHistEnabled of
true ->
insert_hist_t(Chan#channel.chid);
false ->
ok
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% History entry operations %% History entry operations
%%--------------------------------------------------------------------
when_hist_enabled(F) ->
case is_hist_enabled() of
true ->
_ = F();
false ->
ok
end,
ok.
%% Insert unregistration history in a transaction when unregistering the last channel for a clientid. %% Insert unregistration history in a transaction when unregistering the last channel for a clientid.
insert_hist_t(ClientId) -> insert_hist_t(ClientId) ->

View File

@ -113,7 +113,7 @@ handle_info(start, #{next_clientid := NextClientId} = State) ->
end, end,
{noreply, State#{next_clientid := NewNext}}; {noreply, State#{next_clientid := NewNext}};
false -> false ->
%% if not enabled, dealy and check again %% if not enabled, delay and check again
%% because it might be enabled from online config change while waiting %% because it might be enabled from online config change while waiting
ok = send_delay_start(), ok = send_delay_start(),
{noreply, State} {noreply, State}
@ -142,7 +142,7 @@ cleanup_loop('$end_of_table', _Count, _IsExpired) ->
cleanup_loop(undefined, Count, IsExpired) -> cleanup_loop(undefined, Count, IsExpired) ->
cleanup_loop(mnesia:dirty_first(?CHAN_REG_TAB), Count, IsExpired); cleanup_loop(mnesia:dirty_first(?CHAN_REG_TAB), Count, IsExpired);
cleanup_loop(ClientId, Count, IsExpired) -> cleanup_loop(ClientId, Count, IsExpired) ->
Recods = mnesia:dirty_read(?CHAN_REG_TAB, ClientId), Records = mnesia:dirty_read(?CHAN_REG_TAB, ClientId),
Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId), Next = mnesia:dirty_next(?CHAN_REG_TAB, ClientId),
lists:foreach( lists:foreach(
fun(R) -> fun(R) ->
@ -153,7 +153,7 @@ cleanup_loop(ClientId, Count, IsExpired) ->
ok ok
end end
end, end,
Recods Records
), ),
cleanup_loop(Next, Count - 1, IsExpired). cleanup_loop(Next, Count - 1, IsExpired).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2019-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.

View File

@ -401,7 +401,7 @@ schema("/sessions_count") ->
required => false, required => false,
default => 0, default => 0,
desc => desc =>
<<"Include sessions expired after this time (UNIX Epoch in seconds precesion)">>, <<"Include sessions expired after this time (UNIX Epoch in seconds precision)">>,
example => 1705391625 example => 1705391625
})} })}
], ],
@ -1087,6 +1087,6 @@ client_example() ->
}. }.
sessions_count(get, #{query_string := QString}) -> sessions_count(get, #{query_string := QString}) ->
Since = maps:get(<<"since">>, QString, undefined), Since = maps:get(<<"since">>, QString, 0),
Count = emqx_cm_registry_keeper:count(Since), Count = emqx_cm_registry_keeper:count(Since),
{200, integer_to_binary(Count)}. {200, integer_to_binary(Count)}.

View File

@ -1422,7 +1422,7 @@ force_shutdown_enable.label:
broker_enable_session_registry.desc: broker_enable_session_registry.desc:
"""The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster. """The Global Session Registry is a cluster-wide mechanism designed to maintain the uniqueness of client IDs within the cluster.
Recommendations for Use<br/> Recommendations for Use<br/>
- Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconneted to another node in the cluster, the new connection will need to find the old session and take it over. - Default Setting: It is generally advisable to enable. This feature is crucial for session takeover to work properly. For example if a client reconnected to another node in the cluster, the new connection will need to find the old session and take it over.
- Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases. - Disabling the Feature: Disabling is an option for scenarios when all sessions expire immediately after client is disconnected (i.e. session expiry interval is zero). This can be relevant in certain specialized use cases.
Advantages of Disabling<br/> Advantages of Disabling<br/>
@ -1433,7 +1433,7 @@ broker_session_history_retain.desc:
"""The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance. """The duration to retain the session registration history. Setting this to a value greater than `0s` will increase memory usage and impact peformance.
This retained history can be used to monitor how many sessions were registered in the past configured duration. This retained history can be used to monitor how many sessions were registered in the past configured duration.
Note: This config has no effect if `enable_session_registry` is set to `false`.<br/> Note: This config has no effect if `enable_session_registry` is set to `false`.<br/>
Note: If the clients are suing random client IDs, it's not recommended to enable this feature, at least not for a long retain duration.<br/> Note: If the clients are using random client IDs, it's not recommended to enable this feature, at least not for a long retention period.<br/>
Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect.""" Note: When clustered, the lowest (but greater than `0s`) value among the nodes in the cluster will take effect."""
overload_protection_backoff_delay.desc: overload_protection_backoff_delay.desc: