From d0e6f22a79803c28e3e316356a00e36ad252064b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 12 Jun 2024 11:19:53 -0300 Subject: [PATCH] feat: support purging durable sessions during cluster purge Fixes https://emqx.atlassian.net/browse/EMQX-12405 --- .../src/emqx_eviction_agent.app.src | 2 +- .../src/emqx_eviction_agent.erl | 37 ++++++++- .../src/emqx_node_rebalance.app.src | 2 +- .../src/emqx_node_rebalance_purge.erl | 45 ++++++++++- .../test/emqx_node_rebalance_purge_SUITE.erl | 77 ++++++++++++++++--- 5 files changed, 144 insertions(+), 19 deletions(-) diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src index 7e692bf9c..b9aa14c61 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.app.src @@ -1,6 +1,6 @@ {application, emqx_eviction_agent, [ {description, "EMQX Eviction Agent"}, - {vsn, "5.1.6"}, + {vsn, "5.1.7"}, {registered, [ emqx_eviction_agent_sup, emqx_eviction_agent, diff --git a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl index 5c02da6fc..56126394b 100644 --- a/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl +++ b/apps/emqx_eviction_agent/src/emqx_eviction_agent.erl @@ -24,10 +24,12 @@ all_channels_count/0, session_count/0, session_count/1, + durable_session_count/0, evict_connections/1, evict_sessions/2, evict_sessions/3, - purge_sessions/1 + purge_sessions/1, + purge_durable_sessions/1 ]). %% RPC targets @@ -153,6 +155,18 @@ purge_sessions(N) -> {error, disabled} end. +-spec purge_durable_sessions(non_neg_integer()) -> ok | done | {error, disabled}. 
+purge_durable_sessions(N) -> + PersistenceEnabled = emqx_persistent_message:is_persistence_enabled(), + case enable_status() of + {enabled, _Kind, _ServerReference, _Options} when PersistenceEnabled -> + do_purge_durable_sessions(N); + {enabled, _Kind, _ServerReference, _Options} -> + done; + disabled -> + {error, disabled} + end. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -301,7 +315,10 @@ all_local_channels_count() -> table_count(channel_table(any)). session_count() -> - session_count(any). + session_count(any) + durable_session_count(). + +durable_session_count() -> + emqx_persistent_session_bookkeeper:get_disconnected_session_count(). session_count(ConnState) -> table_count(channel_table(ConnState)). @@ -455,5 +472,21 @@ do_purge_sessions(N) when N > 0 -> Channels ). +do_purge_durable_sessions(N) when N > 0 -> + Iterator = emqx_persistent_session_ds_state:make_session_iterator(), + {Sessions, _NewIterator} = emqx_persistent_session_ds_state:session_iterator_next(Iterator, N), + lists:foreach( + fun({ClientId, _Metadata}) -> + emqx_persistent_session_ds:destroy_session(ClientId) + end, + Sessions + ), + case Sessions of + [] -> + done; + _ -> + ok + end. + select_random(List) when length(List) > 0 -> lists:nth(rand:uniform(length(List)), List). 
diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src index e8967c556..c6cfae12b 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance.app.src @@ -1,6 +1,6 @@ {application, emqx_node_rebalance, [ {description, "EMQX Node Rebalance"}, - {vsn, "5.0.8"}, + {vsn, "5.0.9"}, {registered, [ emqx_node_rebalance_sup, emqx_node_rebalance, diff --git a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl index a0952cdfe..0557d788e 100644 --- a/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl +++ b/apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl @@ -131,7 +131,9 @@ handle_event( } = Data ) -> case emqx_eviction_agent:all_channels_count() of - Sessions when Sessions > 0 -> + InMemSessions when InMemSessions > 0 -> + DSCount = emqx_eviction_agent:durable_session_count(), + Sessions = InMemSessions + DSCount, ok = purge_sessions(PurgeRate), ?tp( warning, @@ -145,11 +147,41 @@ handle_event( {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]}; _Sessions = 0 -> NewData = Data#{current_conns => 0}, + {keep_state, NewData, [ + {state_timeout, 0, purge_ds} + ]} + end; +%% durable session purge +handle_event( + state_timeout, + purge_ds, + ?purging, + #{ + purge_rate := PurgeRate + } = Data +) -> + case purge_durable_sessions(PurgeRate) of + ok -> + %% Count is updated asynchronously; better rely on deletion results to know + %% when to stop. 
+ Sessions = emqx_eviction_agent:durable_session_count(), + ?tp( + warning, + "cluster_purge_evict_sessions", + #{ + count => Sessions, + purge_rate => PurgeRate + } + ), + NewData = Data#{current_sessions => Sessions}, + {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge_ds}]}; + done -> ?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}), - {next_state, ?cleaning_data, NewData, [ + {next_state, ?cleaning_data, Data, [ {state_timeout, 0, clean_retained_messages} ]} end; +%% retained message purge handle_event( state_timeout, clean_retained_messages, @@ -195,7 +227,11 @@ init_data(Data0, Opts) -> deinit(Data) -> Keys = - [initial_sessions, current_sessions | maps:keys(default_opts())], + [ + initial_sessions, + current_sessions + | maps:keys(default_opts()) + ], maps:without(Keys, Data). multicall(Nodes, F, A) -> @@ -231,3 +267,6 @@ purge_sessions(PurgeRate) -> Nodes = emqx:running_nodes(), _ = multicall(Nodes, purge_sessions, [PurgeRate]), ok. + +purge_durable_sessions(PurgeRate) -> + emqx_eviction_agent:purge_durable_sessions(PurgeRate). diff --git a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl index 0daeac106..b98e579d1 100644 --- a/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl +++ b/apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl @@ -24,13 +24,23 @@ ] ). +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + all() -> - [{group, one_node}, {group, two_nodes}]. + [ + {group, durability_enabled}, + {group, one_node}, + {group, two_nodes} + ]. groups() -> - [ + Groups = [ {one_node, [], one_node_cases()}, {two_nodes, [], two_nodes_cases()} + ], + [ + {durability_enabled, [], Groups} + | Groups ]. two_nodes_cases() -> @@ -43,19 +53,17 @@ one_node_cases() -> emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases(). 
init_per_suite(Config) -> - Apps = emqx_cth_suite:start([emqx], #{ - work_dir => ?config(priv_dir, Config) - }), - [{apps, Apps} | Config]. + Config. -end_per_suite(Config) -> - ok = emqx_cth_suite:stop(?config(apps, Config)), +end_per_suite(_Config) -> ok. init_per_group(one_node, Config) -> [{cluster_type, one_node} | Config]; init_per_group(two_nodes, Config) -> - [{cluster_type, two_nodes} | Config]. + [{cluster_type, two_nodes} | Config]; +init_per_group(durability_enabled, Config) -> + [{durability, enabled} | Config]. end_per_group(_Group, _Config) -> ok. @@ -77,7 +85,7 @@ init_per_testcase(TestCase, Config) -> role => core, join_to => emqx_cth_cluster:node_name(Node1), listeners => true, - apps => app_specs() + apps => app_specs(Config) }, Cluster = [{Node, Spec} || Node <- Nodes], ClusterNodes = emqx_cth_cluster:start( @@ -98,12 +106,25 @@ end_per_testcase(_TestCase, Config) -> %% Helpers %%-------------------------------------------------------------------- -app_specs() -> +app_specs(CTConfig) -> + DSConfig = + case proplists:get_value(durability, CTConfig, disabled) of + enabled -> + #{ + durable_sessions => + #{ + enable => true + } + }; + disabled -> + #{} + end, [ {emqx, #{ before_start => fun() -> emqx_app:set_config_loader(?MODULE) end, + config => DSConfig, override_env => [{boot_modules, [broker, listeners]}] }}, {emqx_retainer, #{ @@ -320,6 +341,12 @@ t_session_purged(Config) -> NumClientsNode2 = 35, Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1), Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21), + AllClientPids = Node1Clients ++ Node2Clients, + AllClientIds = + lists:map( + fun(ClientPid) -> proplists:get_value(clientid, emqtt:info(ClientPid)) end, + AllClientPids + ), lists:foreach( fun(C) -> ClientId = proplists:get_value(clientid, emqtt:info(C)), @@ -354,6 +381,32 @@ t_session_purged(Config) -> ?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])), ?assertEqual(0, erpc:call(Node2, 
emqx_delayed, delayed_count, [])), - ok = drain_exits(Node1Clients ++ Node2Clients), + ok = drain_exits(AllClientPids), + + FormatFun = undefined, + ?assertEqual( + [], + ?ON( + Node1, + lists:flatmap( + fun(ClientId) -> + emqx_mgmt:lookup_client({clientid, ClientId}, FormatFun) + end, + AllClientIds + ) + ) + ), + ?assertEqual( + [], + ?ON( + Node2, + lists:flatmap( + fun(ClientId) -> + emqx_mgmt:lookup_client({clientid, ClientId}, FormatFun) + end, + AllClientIds + ) + ) + ), ok.