feat: support purging durable sessions during cluster purge

Fixes https://emqx.atlassian.net/browse/EMQX-12405
This commit is contained in:
Thales Macedo Garitezi 2024-06-12 11:19:53 -03:00
parent fe303231cf
commit d0e6f22a79
5 changed files with 144 additions and 19 deletions

View File

@ -1,6 +1,6 @@
{application, emqx_eviction_agent, [ {application, emqx_eviction_agent, [
{description, "EMQX Eviction Agent"}, {description, "EMQX Eviction Agent"},
{vsn, "5.1.6"}, {vsn, "5.1.7"},
{registered, [ {registered, [
emqx_eviction_agent_sup, emqx_eviction_agent_sup,
emqx_eviction_agent, emqx_eviction_agent,

View File

@ -24,10 +24,12 @@
all_channels_count/0, all_channels_count/0,
session_count/0, session_count/0,
session_count/1, session_count/1,
durable_session_count/0,
evict_connections/1, evict_connections/1,
evict_sessions/2, evict_sessions/2,
evict_sessions/3, evict_sessions/3,
purge_sessions/1 purge_sessions/1,
purge_durable_sessions/1
]). ]).
%% RPC targets %% RPC targets
@ -153,6 +155,18 @@ purge_sessions(N) ->
{error, disabled} {error, disabled}
end. end.
%% Purge a batch of durable sessions, sized by `N'.
%%
%% Returns:
%%   * `{error, disabled}' when the eviction agent is not enabled;
%%   * `done' when the agent is enabled but message persistence is not;
%%   * otherwise, whatever `do_purge_durable_sessions/1' returns.
-spec purge_durable_sessions(non_neg_integer()) -> ok | done | {error, disabled}.
purge_durable_sessions(N) ->
    %% Queried before the agent status, regardless of the outcome below.
    IsDurable = emqx_persistent_message:is_persistence_enabled(),
    case enable_status() of
        disabled ->
            {error, disabled};
        {enabled, _Kind, _ServerReference, _Options} ->
            case IsDurable of
                true ->
                    do_purge_durable_sessions(N);
                _ ->
                    %% Nothing durable to purge.
                    done
            end
    end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -301,7 +315,10 @@ all_local_channels_count() ->
table_count(channel_table(any)). table_count(channel_table(any)).
session_count() -> session_count() ->
session_count(any). session_count(any) + durable_session_count().
%% Durable session count, as reported by
%% `emqx_persistent_session_bookkeeper:get_disconnected_session_count/0'.
%% NOTE(review): judging by the callee's name this covers disconnected
%% sessions only -- confirm against the bookkeeper module.
durable_session_count() ->
emqx_persistent_session_bookkeeper:get_disconnected_session_count().
session_count(ConnState) -> session_count(ConnState) ->
table_count(channel_table(ConnState)). table_count(channel_table(ConnState)).
@ -455,5 +472,21 @@ do_purge_sessions(N) when N > 0 ->
Channels Channels
). ).
%% Fetch a batch from a fresh durable-session iterator (requesting `N'
%% entries) and destroy every session in it.
%%
%% Returns `done' when the iterator yields no sessions, `ok' otherwise.
%% The advanced iterator is discarded: each call starts from a new iterator.
do_purge_durable_sessions(N) when N > 0 ->
    Cursor = emqx_persistent_session_ds_state:make_session_iterator(),
    {Batch, _Rest} = emqx_persistent_session_ds_state:session_iterator_next(Cursor, N),
    case Batch of
        [] ->
            done;
        _ ->
            Destroy =
                fun({ClientId, _Meta}) ->
                    emqx_persistent_session_ds:destroy_session(ClientId)
                end,
            lists:foreach(Destroy, Batch),
            ok
    end.
select_random(List) when length(List) > 0 -> select_random(List) when length(List) > 0 ->
lists:nth(rand:uniform(length(List)), List). lists:nth(rand:uniform(length(List)), List).

View File

@ -1,6 +1,6 @@
{application, emqx_node_rebalance, [ {application, emqx_node_rebalance, [
{description, "EMQX Node Rebalance"}, {description, "EMQX Node Rebalance"},
{vsn, "5.0.8"}, {vsn, "5.0.9"},
{registered, [ {registered, [
emqx_node_rebalance_sup, emqx_node_rebalance_sup,
emqx_node_rebalance, emqx_node_rebalance,

View File

@ -131,7 +131,9 @@ handle_event(
} = Data } = Data
) -> ) ->
case emqx_eviction_agent:all_channels_count() of case emqx_eviction_agent:all_channels_count() of
Sessions when Sessions > 0 -> InMemSessions when InMemSessions > 0 ->
DSCount = emqx_eviction_agent:durable_session_count(),
Sessions = InMemSessions + DSCount,
ok = purge_sessions(PurgeRate), ok = purge_sessions(PurgeRate),
?tp( ?tp(
warning, warning,
@ -145,11 +147,41 @@ handle_event(
{keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]}; {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]};
_Sessions = 0 -> _Sessions = 0 ->
NewData = Data#{current_conns => 0}, NewData = Data#{current_conns => 0},
{keep_state, NewData, [
{state_timeout, 0, purge_ds}
]}
end;
%% durable session purge
handle_event(
state_timeout,
purge_ds,
?purging,
#{
purge_rate := PurgeRate
} = Data
) ->
case purge_durable_sessions(PurgeRate) of
ok ->
%% Count is updated asynchronously; better to rely on deletion results to know
%% when to stop.
Sessions = emqx_eviction_agent:durable_session_count(),
?tp(
warning,
"cluster_purge_evict_sessions",
#{
count => Sessions,
purge_rate => PurgeRate
}
),
NewData = Data#{current_sessions => Sessions},
{keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge_ds}]};
done ->
?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}), ?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}),
{next_state, ?cleaning_data, NewData, [ {next_state, ?cleaning_data, Data, [
{state_timeout, 0, clean_retained_messages} {state_timeout, 0, clean_retained_messages}
]} ]}
end; end;
%% retained message purge
handle_event( handle_event(
state_timeout, state_timeout,
clean_retained_messages, clean_retained_messages,
@ -195,7 +227,11 @@ init_data(Data0, Opts) ->
deinit(Data) -> deinit(Data) ->
Keys = Keys =
[initial_sessions, current_sessions | maps:keys(default_opts())], [
initial_sessions,
current_sessions
| maps:keys(default_opts())
],
maps:without(Keys, Data). maps:without(Keys, Data).
multicall(Nodes, F, A) -> multicall(Nodes, F, A) ->
@ -231,3 +267,6 @@ purge_sessions(PurgeRate) ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
_ = multicall(Nodes, purge_sessions, [PurgeRate]), _ = multicall(Nodes, purge_sessions, [PurgeRate]),
ok. ok.
%% Delegates to `emqx_eviction_agent:purge_durable_sessions/1' and returns its
%% result unchanged. Unlike `purge_sessions/1', this is a plain local call, not
%% a multicall over the running nodes.
%% NOTE(review): presumably durable session state is shared cluster-wide, so a
%% single node can purge it -- confirm.
purge_durable_sessions(PurgeRate) ->
emqx_eviction_agent:purge_durable_sessions(PurgeRate).

View File

@ -24,13 +24,23 @@
] ]
). ).
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
all() -> all() ->
[{group, one_node}, {group, two_nodes}]. [
{group, durability_enabled},
{group, one_node},
{group, two_nodes}
].
groups() -> groups() ->
[ Groups = [
{one_node, [], one_node_cases()}, {one_node, [], one_node_cases()},
{two_nodes, [], two_nodes_cases()} {two_nodes, [], two_nodes_cases()}
],
[
{durability_enabled, [], Groups}
| Groups
]. ].
two_nodes_cases() -> two_nodes_cases() ->
@ -43,19 +53,17 @@ one_node_cases() ->
emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases(). emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
init_per_suite(Config) -> init_per_suite(Config) ->
Apps = emqx_cth_suite:start([emqx], #{ Config.
work_dir => ?config(priv_dir, Config)
}),
[{apps, Apps} | Config].
end_per_suite(Config) -> end_per_suite(_Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok. ok.
init_per_group(one_node, Config) -> init_per_group(one_node, Config) ->
[{cluster_type, one_node} | Config]; [{cluster_type, one_node} | Config];
init_per_group(two_nodes, Config) -> init_per_group(two_nodes, Config) ->
[{cluster_type, two_nodes} | Config]. [{cluster_type, two_nodes} | Config];
init_per_group(durability_enabled, Config) ->
[{durability, enabled} | Config].
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
@ -77,7 +85,7 @@ init_per_testcase(TestCase, Config) ->
role => core, role => core,
join_to => emqx_cth_cluster:node_name(Node1), join_to => emqx_cth_cluster:node_name(Node1),
listeners => true, listeners => true,
apps => app_specs() apps => app_specs(Config)
}, },
Cluster = [{Node, Spec} || Node <- Nodes], Cluster = [{Node, Spec} || Node <- Nodes],
ClusterNodes = emqx_cth_cluster:start( ClusterNodes = emqx_cth_cluster:start(
@ -98,12 +106,25 @@ end_per_testcase(_TestCase, Config) ->
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
app_specs() -> app_specs(CTConfig) ->
DSConfig =
case proplists:get_value(durability, CTConfig, disabled) of
enabled ->
#{
durable_sessions =>
#{
enable => true
}
};
disabled ->
#{}
end,
[ [
{emqx, #{ {emqx, #{
before_start => fun() -> before_start => fun() ->
emqx_app:set_config_loader(?MODULE) emqx_app:set_config_loader(?MODULE)
end, end,
config => DSConfig,
override_env => [{boot_modules, [broker, listeners]}] override_env => [{boot_modules, [broker, listeners]}]
}}, }},
{emqx_retainer, #{ {emqx_retainer, #{
@ -320,6 +341,12 @@ t_session_purged(Config) ->
NumClientsNode2 = 35, NumClientsNode2 = 35,
Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1), Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1),
Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21), Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21),
AllClientPids = Node1Clients ++ Node2Clients,
AllClientIds =
lists:map(
fun(ClientPid) -> proplists:get_value(clientid, emqtt:info(ClientPid)) end,
AllClientPids
),
lists:foreach( lists:foreach(
fun(C) -> fun(C) ->
ClientId = proplists:get_value(clientid, emqtt:info(C)), ClientId = proplists:get_value(clientid, emqtt:info(C)),
@ -354,6 +381,32 @@ t_session_purged(Config) ->
?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])), ?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])),
?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])), ?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])),
ok = drain_exits(Node1Clients ++ Node2Clients), ok = drain_exits(AllClientPids),
FormatFun = undefined,
?assertEqual(
[],
?ON(
Node1,
lists:flatmap(
fun(ClientId) ->
emqx_mgmt:lookup_client({clientid, ClientId}, FormatFun)
end,
AllClientIds
)
)
),
?assertEqual(
[],
?ON(
Node2,
lists:flatmap(
fun(ClientId) ->
emqx_mgmt:lookup_client({clientid, ClientId}, FormatFun)
end,
AllClientIds
)
)
),
ok. ok.