Merge pull request #13240 from thalesmg/ds-cluster-purge-m-20240612
feat: support purging durable sessions during cluster purge
This commit is contained in:
commit
3adf64e637
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_eviction_agent, [
|
{application, emqx_eviction_agent, [
|
||||||
{description, "EMQX Eviction Agent"},
|
{description, "EMQX Eviction Agent"},
|
||||||
{vsn, "5.1.6"},
|
{vsn, "5.1.7"},
|
||||||
{registered, [
|
{registered, [
|
||||||
emqx_eviction_agent_sup,
|
emqx_eviction_agent_sup,
|
||||||
emqx_eviction_agent,
|
emqx_eviction_agent,
|
||||||
|
|
|
@ -24,10 +24,12 @@
|
||||||
all_channels_count/0,
|
all_channels_count/0,
|
||||||
session_count/0,
|
session_count/0,
|
||||||
session_count/1,
|
session_count/1,
|
||||||
|
durable_session_count/0,
|
||||||
evict_connections/1,
|
evict_connections/1,
|
||||||
evict_sessions/2,
|
evict_sessions/2,
|
||||||
evict_sessions/3,
|
evict_sessions/3,
|
||||||
purge_sessions/1
|
purge_sessions/1,
|
||||||
|
purge_durable_sessions/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% RPC targets
|
%% RPC targets
|
||||||
|
@ -153,6 +155,18 @@ purge_sessions(N) ->
|
||||||
{error, disabled}
|
{error, disabled}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec purge_durable_sessions(non_neg_integer()) -> ok | done | {error, disabled}.
|
||||||
|
purge_durable_sessions(N) ->
|
||||||
|
PersistenceEnabled = emqx_persistent_message:is_persistence_enabled(),
|
||||||
|
case enable_status() of
|
||||||
|
{enabled, _Kind, _ServerReference, _Options} when PersistenceEnabled ->
|
||||||
|
do_purge_durable_sessions(N);
|
||||||
|
{enabled, _Kind, _ServerReference, _Options} ->
|
||||||
|
done;
|
||||||
|
disabled ->
|
||||||
|
{error, disabled}
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -301,7 +315,10 @@ all_local_channels_count() ->
|
||||||
table_count(channel_table(any)).
|
table_count(channel_table(any)).
|
||||||
|
|
||||||
session_count() ->
|
session_count() ->
|
||||||
session_count(any).
|
session_count(any) + durable_session_count().
|
||||||
|
|
||||||
|
durable_session_count() ->
|
||||||
|
emqx_persistent_session_bookkeeper:get_disconnected_session_count().
|
||||||
|
|
||||||
session_count(ConnState) ->
|
session_count(ConnState) ->
|
||||||
table_count(channel_table(ConnState)).
|
table_count(channel_table(ConnState)).
|
||||||
|
@ -455,5 +472,21 @@ do_purge_sessions(N) when N > 0 ->
|
||||||
Channels
|
Channels
|
||||||
).
|
).
|
||||||
|
|
||||||
|
do_purge_durable_sessions(N) when N > 0 ->
|
||||||
|
Iterator = emqx_persistent_session_ds_state:make_session_iterator(),
|
||||||
|
{Sessions, _NewIterator} = emqx_persistent_session_ds_state:session_iterator_next(Iterator, N),
|
||||||
|
lists:foreach(
|
||||||
|
fun({ClientId, _Metadata}) ->
|
||||||
|
emqx_persistent_session_ds:destroy_session(ClientId)
|
||||||
|
end,
|
||||||
|
Sessions
|
||||||
|
),
|
||||||
|
case Sessions of
|
||||||
|
[] ->
|
||||||
|
done;
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
select_random(List) when length(List) > 0 ->
|
select_random(List) when length(List) > 0 ->
|
||||||
lists:nth(rand:uniform(length(List)), List).
|
lists:nth(rand:uniform(length(List)), List).
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_node_rebalance, [
|
{application, emqx_node_rebalance, [
|
||||||
{description, "EMQX Node Rebalance"},
|
{description, "EMQX Node Rebalance"},
|
||||||
{vsn, "5.0.8"},
|
{vsn, "5.0.9"},
|
||||||
{registered, [
|
{registered, [
|
||||||
emqx_node_rebalance_sup,
|
emqx_node_rebalance_sup,
|
||||||
emqx_node_rebalance,
|
emqx_node_rebalance,
|
||||||
|
|
|
@ -131,7 +131,9 @@ handle_event(
|
||||||
} = Data
|
} = Data
|
||||||
) ->
|
) ->
|
||||||
case emqx_eviction_agent:all_channels_count() of
|
case emqx_eviction_agent:all_channels_count() of
|
||||||
Sessions when Sessions > 0 ->
|
InMemSessions when InMemSessions > 0 ->
|
||||||
|
DSCount = emqx_eviction_agent:durable_session_count(),
|
||||||
|
Sessions = InMemSessions + DSCount,
|
||||||
ok = purge_sessions(PurgeRate),
|
ok = purge_sessions(PurgeRate),
|
||||||
?tp(
|
?tp(
|
||||||
warning,
|
warning,
|
||||||
|
@ -145,11 +147,41 @@ handle_event(
|
||||||
{keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]};
|
{keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]};
|
||||||
_Sessions = 0 ->
|
_Sessions = 0 ->
|
||||||
NewData = Data#{current_conns => 0},
|
NewData = Data#{current_conns => 0},
|
||||||
|
{keep_state, NewData, [
|
||||||
|
{state_timeout, 0, purge_ds}
|
||||||
|
]}
|
||||||
|
end;
|
||||||
|
%% durable session purge
|
||||||
|
handle_event(
|
||||||
|
state_timeout,
|
||||||
|
purge_ds,
|
||||||
|
?purging,
|
||||||
|
#{
|
||||||
|
purge_rate := PurgeRate
|
||||||
|
} = Data
|
||||||
|
) ->
|
||||||
|
case purge_durable_sessions(PurgeRate) of
|
||||||
|
ok ->
|
||||||
|
%% Count is updated asynchronously; better rely on deletion results to know
|
||||||
|
%% when to stop.
|
||||||
|
Sessions = emqx_eviction_agent:durable_session_count(),
|
||||||
|
?tp(
|
||||||
|
warning,
|
||||||
|
"cluster_purge_evict_sessions",
|
||||||
|
#{
|
||||||
|
count => Sessions,
|
||||||
|
purge_rate => PurgeRate
|
||||||
|
}
|
||||||
|
),
|
||||||
|
NewData = Data#{current_sessions => Sessions},
|
||||||
|
{keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge_ds}]};
|
||||||
|
done ->
|
||||||
?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}),
|
?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}),
|
||||||
{next_state, ?cleaning_data, NewData, [
|
{next_state, ?cleaning_data, Data, [
|
||||||
{state_timeout, 0, clean_retained_messages}
|
{state_timeout, 0, clean_retained_messages}
|
||||||
]}
|
]}
|
||||||
end;
|
end;
|
||||||
|
%% retained message purge
|
||||||
handle_event(
|
handle_event(
|
||||||
state_timeout,
|
state_timeout,
|
||||||
clean_retained_messages,
|
clean_retained_messages,
|
||||||
|
@ -195,7 +227,11 @@ init_data(Data0, Opts) ->
|
||||||
|
|
||||||
deinit(Data) ->
|
deinit(Data) ->
|
||||||
Keys =
|
Keys =
|
||||||
[initial_sessions, current_sessions | maps:keys(default_opts())],
|
[
|
||||||
|
initial_sessions,
|
||||||
|
current_sessions
|
||||||
|
| maps:keys(default_opts())
|
||||||
|
],
|
||||||
maps:without(Keys, Data).
|
maps:without(Keys, Data).
|
||||||
|
|
||||||
multicall(Nodes, F, A) ->
|
multicall(Nodes, F, A) ->
|
||||||
|
@ -231,3 +267,6 @@ purge_sessions(PurgeRate) ->
|
||||||
Nodes = emqx:running_nodes(),
|
Nodes = emqx:running_nodes(),
|
||||||
_ = multicall(Nodes, purge_sessions, [PurgeRate]),
|
_ = multicall(Nodes, purge_sessions, [PurgeRate]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
purge_durable_sessions(PurgeRate) ->
|
||||||
|
emqx_eviction_agent:purge_durable_sessions(PurgeRate).
|
||||||
|
|
|
@ -24,13 +24,23 @@
|
||||||
]
|
]
|
||||||
).
|
).
|
||||||
|
|
||||||
|
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
[{group, one_node}, {group, two_nodes}].
|
[
|
||||||
|
{group, durability_enabled},
|
||||||
|
{group, one_node},
|
||||||
|
{group, two_nodes}
|
||||||
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
[
|
Groups = [
|
||||||
{one_node, [], one_node_cases()},
|
{one_node, [], one_node_cases()},
|
||||||
{two_nodes, [], two_nodes_cases()}
|
{two_nodes, [], two_nodes_cases()}
|
||||||
|
],
|
||||||
|
[
|
||||||
|
{durability_enabled, [], Groups}
|
||||||
|
| Groups
|
||||||
].
|
].
|
||||||
|
|
||||||
two_nodes_cases() ->
|
two_nodes_cases() ->
|
||||||
|
@ -43,19 +53,17 @@ one_node_cases() ->
|
||||||
emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
|
emqx_common_test_helpers:all(?MODULE) -- two_nodes_cases().
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
Apps = emqx_cth_suite:start([emqx], #{
|
Config.
|
||||||
work_dir => ?config(priv_dir, Config)
|
|
||||||
}),
|
|
||||||
[{apps, Apps} | Config].
|
|
||||||
|
|
||||||
end_per_suite(Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_cth_suite:stop(?config(apps, Config)),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_per_group(one_node, Config) ->
|
init_per_group(one_node, Config) ->
|
||||||
[{cluster_type, one_node} | Config];
|
[{cluster_type, one_node} | Config];
|
||||||
init_per_group(two_nodes, Config) ->
|
init_per_group(two_nodes, Config) ->
|
||||||
[{cluster_type, two_nodes} | Config].
|
[{cluster_type, two_nodes} | Config];
|
||||||
|
init_per_group(durability_enabled, Config) ->
|
||||||
|
[{durability, enabled} | Config].
|
||||||
|
|
||||||
end_per_group(_Group, _Config) ->
|
end_per_group(_Group, _Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
@ -77,7 +85,7 @@ init_per_testcase(TestCase, Config) ->
|
||||||
role => core,
|
role => core,
|
||||||
join_to => emqx_cth_cluster:node_name(Node1),
|
join_to => emqx_cth_cluster:node_name(Node1),
|
||||||
listeners => true,
|
listeners => true,
|
||||||
apps => app_specs()
|
apps => app_specs(Config)
|
||||||
},
|
},
|
||||||
Cluster = [{Node, Spec} || Node <- Nodes],
|
Cluster = [{Node, Spec} || Node <- Nodes],
|
||||||
ClusterNodes = emqx_cth_cluster:start(
|
ClusterNodes = emqx_cth_cluster:start(
|
||||||
|
@ -98,12 +106,25 @@ end_per_testcase(_TestCase, Config) ->
|
||||||
%% Helpers
|
%% Helpers
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
app_specs() ->
|
app_specs(CTConfig) ->
|
||||||
|
DSConfig =
|
||||||
|
case proplists:get_value(durability, CTConfig, disabled) of
|
||||||
|
enabled ->
|
||||||
|
#{
|
||||||
|
durable_sessions =>
|
||||||
|
#{
|
||||||
|
enable => true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
disabled ->
|
||||||
|
#{}
|
||||||
|
end,
|
||||||
[
|
[
|
||||||
{emqx, #{
|
{emqx, #{
|
||||||
before_start => fun() ->
|
before_start => fun() ->
|
||||||
emqx_app:set_config_loader(?MODULE)
|
emqx_app:set_config_loader(?MODULE)
|
||||||
end,
|
end,
|
||||||
|
config => DSConfig,
|
||||||
override_env => [{boot_modules, [broker, listeners]}]
|
override_env => [{boot_modules, [broker, listeners]}]
|
||||||
}},
|
}},
|
||||||
{emqx_retainer, #{
|
{emqx_retainer, #{
|
||||||
|
@ -320,6 +341,12 @@ t_session_purged(Config) ->
|
||||||
NumClientsNode2 = 35,
|
NumClientsNode2 = 35,
|
||||||
Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1),
|
Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1),
|
||||||
Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21),
|
Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21),
|
||||||
|
AllClientPids = Node1Clients ++ Node2Clients,
|
||||||
|
AllClientIds =
|
||||||
|
lists:map(
|
||||||
|
fun(ClientPid) -> proplists:get_value(clientid, emqtt:info(ClientPid)) end,
|
||||||
|
AllClientPids
|
||||||
|
),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(C) ->
|
fun(C) ->
|
||||||
ClientId = proplists:get_value(clientid, emqtt:info(C)),
|
ClientId = proplists:get_value(clientid, emqtt:info(C)),
|
||||||
|
@ -354,6 +381,32 @@ t_session_purged(Config) ->
|
||||||
?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])),
|
?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])),
|
||||||
?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])),
|
?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])),
|
||||||
|
|
||||||
ok = drain_exits(Node1Clients ++ Node2Clients),
|
ok = drain_exits(AllClientPids),
|
||||||
|
|
||||||
|
FormatFun = undefined,
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
?ON(
|
||||||
|
Node1,
|
||||||
|
lists:flatmap(
|
||||||
|
fun(ClientId) ->
|
||||||
|
emqx_mgmt:lookup_client({clientid, ClientId}, FormatFun)
|
||||||
|
end,
|
||||||
|
AllClientIds
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertEqual(
|
||||||
|
[],
|
||||||
|
?ON(
|
||||||
|
Node2,
|
||||||
|
lists:flatmap(
|
||||||
|
fun(ClientId) ->
|
||||||
|
emqx_mgmt:lookup_client({clientid, ClientId}, FormatFun)
|
||||||
|
end,
|
||||||
|
AllClientIds
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue