Merge pull request #12057 from thalesmg/ds-session-gc-m-20231128

feat(ds): add session gc process
This commit is contained in:
Thales Macedo Garitezi 2023-11-30 17:51:52 -03:00 committed by GitHub
commit 113a4ad4b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 469 additions and 10 deletions

View File

@ -48,12 +48,36 @@ init_per_testcase(TestCase, Config) when
{nodes, Nodes}
| Config
];
init_per_testcase(t_session_gc = TestCase, Config) ->
Opts = #{
n => 3,
roles => [core, core, replicant],
extra_emqx_conf =>
"\n session_persistence {"
"\n last_alive_update_interval = 500ms "
"\n session_gc_interval = 2s "
"\n session_gc_batch_size = 1 "
"\n }"
},
Cluster = cluster(Opts),
ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)},
NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
[
{cluster, Cluster},
{node_specs, NodeSpecs},
{cluster_opts, ClusterOpts},
{nodes, Nodes},
{gc_interval, timer:seconds(2)}
| Config
];
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(TestCase, Config) when
TestCase =:= t_session_subscription_idempotency;
TestCase =:= t_session_unsubscription_idempotency
TestCase =:= t_session_unsubscription_idempotency;
TestCase =:= t_session_gc
->
Nodes = ?config(nodes, Config),
emqx_common_test_helpers:call_janitor(60_000),
@ -67,20 +91,32 @@ end_per_testcase(_TestCase, _Config) ->
%% Helper fns
%%------------------------------------------------------------------------------
cluster(#{n := N}) ->
Spec = #{role => core, apps => app_specs()},
cluster(#{n := N} = Opts) ->
MkRole = fun(M) ->
case maps:get(roles, Opts, undefined) of
undefined ->
core;
Roles ->
lists:nth(M, Roles)
end
end,
MkSpec = fun(M) -> #{role => MkRole(M), apps => app_specs(Opts)} end,
lists:map(
fun(M) ->
Name = list_to_atom("ds_SUITE" ++ integer_to_list(M)),
{Name, Spec}
{Name, MkSpec(M)}
end,
lists:seq(1, N)
).
app_specs() ->
app_specs(_Opts = #{}).
app_specs(Opts) ->
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[
emqx_durable_storage,
{emqx, "session_persistence = {enable = true}"}
{emqx, "session_persistence = {enable = true}" ++ ExtraEMQXConf}
].
get_mqtt_port(Node, Type) ->
@ -124,6 +160,29 @@ restart_node(Node, NodeSpec) ->
is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) ->
EI > 0.
list_all_sessions(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_sessions, []).
list_all_subscriptions(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_subscriptions, []).
list_all_pubranges(Node) ->
erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []).
prop_only_cores_run_gc(CoreNodes) ->
{"only core nodes run gc", fun(Trace) -> ?MODULE:prop_only_cores_run_gc(Trace, CoreNodes) end}.
prop_only_cores_run_gc(Trace, CoreNodes) ->
GCNodes = lists:usort([
N
|| #{
?snk_kind := K,
?snk_meta := #{node := N}
} <- Trace,
lists:member(K, [ds_session_gc, ds_session_gc_lock_taken]),
N =/= node()
]),
?assertEqual(lists:usort(CoreNodes), GCNodes).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -450,3 +509,122 @@ do_t_session_expiration(_Config, Opts) ->
[]
),
ok.
t_session_gc(Config) ->
GCInterval = ?config(gc_interval, Config),
[Node1, Node2, Node3] = Nodes = ?config(nodes, Config),
CoreNodes = [Node1, Node2],
[
Port1,
Port2,
Port3
] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes),
CommonParams = #{
clean_start => false,
proto_ver => v5
},
StartClient = fun(ClientId, Port, ExpiryInterval) ->
Params = maps:merge(CommonParams, #{
clientid => ClientId,
port => Port,
properties => #{'Session-Expiry-Interval' => ExpiryInterval}
}),
Client = start_client(Params),
{ok, _} = emqtt:connect(Client),
Client
end,
?check_trace(
begin
ClientId0 = <<"session_gc0">>,
Client0 = StartClient(ClientId0, Port1, 30),
ClientId1 = <<"session_gc1">>,
Client1 = StartClient(ClientId1, Port2, 1),
ClientId2 = <<"session_gc2">>,
Client2 = StartClient(ClientId2, Port3, 1),
lists:foreach(
fun(Client) ->
Topic = <<"some/topic">>,
Payload = <<"hi">>,
{ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client, Topic, ?QOS_1),
{ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1),
ok
end,
[Client0, Client1, Client2]
),
%% Clients are still alive; no session is garbage collected.
Res0 = ?block_until(
#{
?snk_kind := ds_session_gc,
?snk_span := {complete, _},
?snk_meta := #{node := N}
} when
N =/= node(),
3 * GCInterval + 1_000
),
?assertMatch({ok, _}, Res0),
{ok, #{?snk_meta := #{time := T0}}} = Res0,
Sessions0 = list_all_sessions(Node1),
Subs0 = list_all_subscriptions(Node1),
?assertEqual(3, map_size(Sessions0), #{sessions => Sessions0}),
?assertEqual(3, map_size(Subs0), #{subs => Subs0}),
%% Now we disconnect 2 of them; only those should be GC'ed.
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client1),
#{?snk_kind := terminate},
1_000
)
),
ct:pal("disconnected client1"),
?assertMatch(
{ok, {ok, _}},
?wait_async_action(
emqtt:stop(Client2),
#{?snk_kind := terminate},
1_000
)
),
ct:pal("disconnected client2"),
?assertMatch(
{ok, _},
?block_until(
#{
?snk_kind := ds_session_gc_cleaned,
?snk_meta := #{node := N, time := T},
session_ids := [ClientId1]
} when
N =/= node() andalso T > T0,
4 * GCInterval + 1_000
)
),
?assertMatch(
{ok, _},
?block_until(
#{
?snk_kind := ds_session_gc_cleaned,
?snk_meta := #{node := N, time := T},
session_ids := [ClientId2]
} when
N =/= node() andalso T > T0,
4 * GCInterval + 1_000
)
),
Sessions1 = list_all_sessions(Node1),
Subs1 = list_all_subscriptions(Node1),
?assertEqual(1, map_size(Sessions1), #{sessions => Sessions1}),
?assertEqual(1, map_size(Subs1), #{subs => Subs1}),
ok
end,
[
prop_only_cores_run_gc(CoreNodes)
]
),
ok.

View File

@ -47,7 +47,17 @@ init([]) ->
Locker = child_spec(emqx_cm_locker, 5000, worker),
Registry = child_spec(emqx_cm_registry, 5000, worker),
Manager = child_spec(emqx_cm, 5000, worker),
{ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}.
DSSessionGCSup = child_spec(emqx_persistent_session_ds_sup, infinity, supervisor),
Children =
[
Banned,
Flapping,
Locker,
Registry,
Manager,
DSSessionGCSup
],
{ok, {SupFlags, Children}}.
%%--------------------------------------------------------------------
%% Internal functions

View File

@ -63,6 +63,9 @@
%% session table operations
-export([create_tables/0]).
%% internal export used by session GC process
-export([destroy_session/1]).
%% Remove me later (satisfy checks for an unused BPAPI)
-export([
do_open_iterator/3,
@ -986,8 +989,16 @@ expiry_interval(ConnInfo) ->
list_all_sessions() ->
DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),
ConnInfo = #{},
Sessions = lists:map(
fun(SessionID) -> {SessionID, session_open(SessionID, ConnInfo)} end,
Sessions = lists:filtermap(
fun(SessionID) ->
Sess = session_open(SessionID, ConnInfo),
case Sess of
false ->
false;
_ ->
{true, {SessionID, Sess}}
end
end,
DSSessionIds
),
maps:from_list(Sessions).

View File

@ -0,0 +1,161 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_ds_gc_worker).
-behaviour(gen_server).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include("emqx_persistent_session_ds.hrl").
%% API
-export([
start_link/0
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).
%% call/cast/info records
-record(gc, {}).
%%--------------------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------------------
%% `gen_server' API
%%--------------------------------------------------------------------------------
init(_Opts) ->
ensure_gc_timer(),
State = #{},
{ok, State}.
handle_call(_Call, _From, State) ->
{reply, error, State}.
handle_cast(_Cast, State) ->
{noreply, State}.
handle_info(#gc{}, State) ->
try_gc(),
ensure_gc_timer(),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------------------
ensure_gc_timer() ->
Timeout = emqx_config:get([session_persistence, session_gc_interval]),
_ = erlang:send_after(Timeout, self(), #gc{}),
ok.
try_gc() ->
%% Only cores should run GC.
CoreNodes = mria_membership:running_core_nodelist(),
Res = global:trans(
{?MODULE, self()},
fun() -> ?tp_span(ds_session_gc, #{}, start_gc()) end,
CoreNodes,
%% Note: we set retries to 1 here because, in rare occasions, GC might start at the
%% same time in more than one node, and each one will abort the other. By allowing
%% one retry, at least one node will (hopefully) get to enter the transaction and
%% the other will abort. If GC runs too fast, both nodes might run in sequence.
%% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
%% resource-wise.
_Retries = 1
),
case Res of
aborted ->
?tp(ds_session_gc_lock_taken, #{}),
ok;
ok ->
ok
end.
now_ms() ->
erlang:system_time(millisecond).
start_gc() ->
do_gc(more).
zombie_session_ms() ->
NowMS = now_ms(),
GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
TimeThreshold = max(GCInterval, BumpInterval) * 3,
ets:fun2ms(
fun(
#session{
id = DSSessionId,
last_alive_at = LastAliveAt,
conninfo = #{expiry_interval := EI}
}
) when
LastAliveAt + EI + TimeThreshold =< NowMS
->
DSSessionId
end
).
do_gc(more) ->
GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
MS = zombie_session_ms(),
{atomic, Next} = mria:transaction(?DS_MRIA_SHARD, fun() ->
Res = mnesia:select(?SESSION_TAB, MS, GCBatchSize, write),
case Res of
'$end_of_table' ->
done;
{[], Cont} ->
%% since `GCBatchsize' is just a "recommendation" for `select', we try only
%% _once_ the continuation and then stop if it yields nothing, to avoid a
%% dead loop.
case mnesia:select(Cont) of
'$end_of_table' ->
done;
{[], _Cont} ->
done;
{DSSessionIds0, _Cont} ->
do_gc_(DSSessionIds0),
more
end;
{DSSessionIds0, _Cont} ->
do_gc_(DSSessionIds0),
more
end
end),
do_gc(Next);
do_gc(done) ->
ok.
do_gc_(DSSessionIds) ->
lists:foreach(fun emqx_persistent_session_ds:destroy_session/1, DSSessionIds),
?tp(ds_session_gc_cleaned, #{session_ids => DSSessionIds}),
ok.

View File

@ -0,0 +1,78 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_session_ds_sup).
-behaviour(supervisor).
%% API
-export([
start_link/0
]).
%% `supervisor' API
-export([
init/1
]).
%%--------------------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------------------
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%--------------------------------------------------------------------------------
%% `supervisor' API
%%--------------------------------------------------------------------------------
init(Opts) ->
case emqx_persistent_message:is_persistence_enabled() of
true ->
do_init(Opts);
false ->
ignore
end.
do_init(_Opts) ->
SupFlags = #{
strategy => rest_for_one,
intensity => 10,
period => 2,
auto_shutdown => never
},
CoreChildren = [
worker(gc_worker, emqx_persistent_session_ds_gc_worker, [])
],
Children =
case mria_rlog:role() of
core -> CoreChildren;
replicant -> []
end,
{ok, {SupFlags, Children}}.
%%--------------------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------------------
worker(Id, Mod, Args) ->
#{
id => Id,
start => {Mod, start_link, Args},
type => worker,
restart => permanent,
shutdown => 10_000,
significant => false
}.

View File

@ -1789,6 +1789,23 @@ fields("session_persistence") ->
desc => ?DESC(session_ds_last_alive_update_interval)
}
)},
{"session_gc_interval",
sc(
timeout_duration(),
#{
default => <<"10m">>,
desc => ?DESC(session_ds_session_gc_interval)
}
)},
{"session_gc_batch_size",
sc(
pos_integer(),
#{
default => 100,
importance => ?IMPORTANCE_LOW,
desc => ?DESC(session_ds_session_gc_batch_size)
}
)},
{"force_persistence",
sc(
boolean(),

View File

@ -510,8 +510,6 @@ t_persist_on_disconnect(Config) ->
?assertEqual(0, client_info(session_present, Client2)),
ok = emqtt:disconnect(Client2).
t_process_dies_session_expires(init, Config) -> skip_ds_tc(Config);
t_process_dies_session_expires('end', _Config) -> ok.
t_process_dies_session_expires(Config) ->
%% Emulate an error in the connect process,
%% or that the node of the process goes down.

View File

@ -1571,4 +1571,10 @@ session_builtin_n_shards.desc:
session_storage_backend_builtin.desc:
"""Builtin session storage backend utilizing embedded RocksDB key-value store."""
session_ds_session_gc_interval.desc:
"""The interval at which session garbage collection is executed for persistent sessions."""
session_ds_session_gc_batch_size.desc:
"""The size of each batch of expired persistent sessions to be garbage collected per iteration."""
}