From 880f5e8f89905bc679d6d6d654f471db6e9e8a3d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 29 Nov 2023 13:53:29 -0300 Subject: [PATCH] feat(ds): add session gc process Fixes https://emqx.atlassian.net/browse/EMQX-9744 --- .../emqx_persistent_session_ds_SUITE.erl | 188 +++++++++++++++++- apps/emqx/src/emqx_cm_sup.erl | 12 +- apps/emqx/src/emqx_persistent_session_ds.erl | 15 +- .../src/emqx_persistent_session_ds_gc_sup.erl | 78 ++++++++ .../emqx_persistent_session_ds_gc_worker.erl | 161 +++++++++++++++ apps/emqx/src/emqx_schema.erl | 16 ++ .../test/emqx_persistent_session_SUITE.erl | 2 - rel/i18n/emqx_schema.hocon | 6 + 8 files changed, 468 insertions(+), 10 deletions(-) create mode 100644 apps/emqx/src/emqx_persistent_session_ds_gc_sup.erl create mode 100644 apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 05c1eb8f2..165f53b6d 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -48,12 +48,36 @@ init_per_testcase(TestCase, Config) when {nodes, Nodes} | Config ]; +init_per_testcase(t_session_gc = TestCase, Config) -> + Opts = #{ + n => 3, + roles => [core, core, replicant], + extra_emqx_conf => + "\n session_persistence {" + "\n last_alive_update_interval = 500ms " + "\n session_gc_interval = 2s " + "\n session_gc_batch_size = 1 " + "\n }" + }, + Cluster = cluster(Opts), + ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}, + NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts), + Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts), + [ + {cluster, Cluster}, + {node_specs, NodeSpecs}, + {cluster_opts, ClusterOpts}, + {nodes, Nodes}, + {gc_interval, timer:seconds(2)} + | Config + ]; init_per_testcase(_TestCase, Config) -> Config. 
end_per_testcase(TestCase, Config) when TestCase =:= t_session_subscription_idempotency; - TestCase =:= t_session_unsubscription_idempotency + TestCase =:= t_session_unsubscription_idempotency; + TestCase =:= t_session_gc -> Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), @@ -67,20 +91,32 @@ end_per_testcase(_TestCase, _Config) -> %% Helper fns %%------------------------------------------------------------------------------ -cluster(#{n := N}) -> - Spec = #{role => core, apps => app_specs()}, +cluster(#{n := N} = Opts) -> + MkRole = fun(M) -> + case maps:get(roles, Opts, undefined) of + undefined -> + core; + Roles -> + lists:nth(M, Roles) + end + end, + MkSpec = fun(M) -> #{role => MkRole(M), apps => app_specs(Opts)} end, lists:map( fun(M) -> Name = list_to_atom("ds_SUITE" ++ integer_to_list(M)), - {Name, Spec} + {Name, MkSpec(M)} end, lists:seq(1, N) ). app_specs() -> + app_specs(_Opts = #{}). + +app_specs(Opts) -> + ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""), [ emqx_durable_storage, - {emqx, "session_persistence = {enable = true}"} + {emqx, "session_persistence = {enable = true}" ++ ExtraEMQXConf} ]. get_mqtt_port(Node, Type) -> @@ -143,6 +179,29 @@ restart_node(Node, NodeSpec) -> is_persistent_connect_opts(#{properties := #{'Session-Expiry-Interval' := EI}}) -> EI > 0. +list_all_sessions(Node) -> + erpc:call(Node, emqx_persistent_session_ds, list_all_sessions, []). + +list_all_subscriptions(Node) -> + erpc:call(Node, emqx_persistent_session_ds, list_all_subscriptions, []). + +list_all_pubranges(Node) -> + erpc:call(Node, emqx_persistent_session_ds, list_all_pubranges, []). + +prop_only_cores_run_gc(CoreNodes) -> + {"only core nodes run gc", fun(Trace) -> ?MODULE:prop_only_cores_run_gc(Trace, CoreNodes) end}. 
+prop_only_cores_run_gc(Trace, CoreNodes) -> + GCNodes = lists:usort([ + N + || #{ + ?snk_kind := K, + ?snk_meta := #{node := N} + } <- Trace, + lists:member(K, [ds_session_gc, ds_session_gc_lock_taken]), + N =/= node() + ]), + ?assertEqual(lists:usort(CoreNodes), GCNodes). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -469,3 +528,122 @@ do_t_session_expiration(_Config, Opts) -> [] ), ok. + +t_session_gc(Config) -> + GCInterval = ?config(gc_interval, Config), + [Node1, Node2, Node3] = Nodes = ?config(nodes, Config), + CoreNodes = [Node1, Node2], + [ + Port1, + Port2, + Port3 + ] = lists:map(fun(N) -> get_mqtt_port(N, tcp) end, Nodes), + CommonParams = #{ + clean_start => false, + proto_ver => v5 + }, + StartClient = fun(ClientId, Port, ExpiryInterval) -> + Params = maps:merge(CommonParams, #{ + clientid => ClientId, + port => Port, + properties => #{'Session-Expiry-Interval' => ExpiryInterval} + }), + Client = start_client(Params), + {ok, _} = emqtt:connect(Client), + Client + end, + + ?check_trace( + begin + ClientId0 = <<"session_gc0">>, + Client0 = StartClient(ClientId0, Port1, 30), + + ClientId1 = <<"session_gc1">>, + Client1 = StartClient(ClientId1, Port2, 1), + + ClientId2 = <<"session_gc2">>, + Client2 = StartClient(ClientId2, Port3, 1), + + lists:foreach( + fun(Client) -> + Topic = <<"some/topic">>, + Payload = <<"hi">>, + {ok, _, [?RC_GRANTED_QOS_1]} = emqtt:subscribe(Client, Topic, ?QOS_1), + {ok, _} = emqtt:publish(Client, Topic, Payload, ?QOS_1), + ok + end, + [Client0, Client1, Client2] + ), + + %% Clients are still alive; no session is garbage collected. 
+ Res0 = ?block_until( + #{ + ?snk_kind := ds_session_gc, + ?snk_span := {complete, _}, + ?snk_meta := #{node := N} + } when + N =/= node(), + 3 * GCInterval + 1_000 + ), + ?assertMatch({ok, _}, Res0), + {ok, #{?snk_meta := #{time := T0}}} = Res0, + Sessions0 = list_all_sessions(Node1), + Subs0 = list_all_subscriptions(Node1), + ?assertEqual(3, map_size(Sessions0), #{sessions => Sessions0}), + ?assertEqual(3, map_size(Subs0), #{subs => Subs0}), + + %% Now we disconnect 2 of them; only those should be GC'ed. + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqtt:stop(Client1), + #{?snk_kind := terminate}, + 1_000 + ) + ), + ct:pal("disconnected client1"), + ?assertMatch( + {ok, {ok, _}}, + ?wait_async_action( + emqtt:stop(Client2), + #{?snk_kind := terminate}, + 1_000 + ) + ), + ct:pal("disconnected client2"), + ?assertMatch( + {ok, _}, + ?block_until( + #{ + ?snk_kind := ds_session_gc_cleaned, + ?snk_meta := #{node := N, time := T}, + session_ids := [ClientId1] + } when + N =/= node() andalso T > T0, + 4 * GCInterval + 1_000 + ) + ), + ?assertMatch( + {ok, _}, + ?block_until( + #{ + ?snk_kind := ds_session_gc_cleaned, + ?snk_meta := #{node := N, time := T}, + session_ids := [ClientId2] + } when + N =/= node() andalso T > T0, + 4 * GCInterval + 1_000 + ) + ), + Sessions1 = list_all_sessions(Node1), + Subs1 = list_all_subscriptions(Node1), + ?assertEqual(1, map_size(Sessions1), #{sessions => Sessions1}), + ?assertEqual(1, map_size(Subs1), #{subs => Subs1}), + + ok + end, + [ + prop_only_cores_run_gc(CoreNodes) + ] + ), + ok. diff --git a/apps/emqx/src/emqx_cm_sup.erl b/apps/emqx/src/emqx_cm_sup.erl index 9db73e8e4..a7db9c8be 100644 --- a/apps/emqx/src/emqx_cm_sup.erl +++ b/apps/emqx/src/emqx_cm_sup.erl @@ -47,7 +47,17 @@ init([]) -> Locker = child_spec(emqx_cm_locker, 5000, worker), Registry = child_spec(emqx_cm_registry, 5000, worker), Manager = child_spec(emqx_cm, 5000, worker), - {ok, {SupFlags, [Banned, Flapping, Locker, Registry, Manager]}}. 
+ DSSessionGCSup = child_spec(emqx_persistent_session_ds_gc_sup, infinity, supervisor), + Children = + [ + Banned, + Flapping, + Locker, + Registry, + Manager, + DSSessionGCSup + ], + {ok, {SupFlags, Children}}. %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 03e0abfcd..9844e6d48 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -63,6 +63,9 @@ %% session table operations -export([create_tables/0]). +%% internal export used by session GC process +-export([destroy_session/1]). + %% Remove me later (satisfy checks for an unused BPAPI) -export([ do_open_iterator/3, @@ -986,8 +989,16 @@ expiry_interval(ConnInfo) -> list_all_sessions() -> DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), ConnInfo = #{}, - Sessions = lists:map( - fun(SessionID) -> {SessionID, session_open(SessionID, ConnInfo)} end, + Sessions = lists:filtermap( + fun(SessionID) -> + Sess = session_open(SessionID, ConnInfo), + case Sess of + false -> + false; + _ -> + {true, {SessionID, Sess}} + end + end, DSSessionIds ), maps:from_list(Sessions). diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_sup.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_sup.erl new file mode 100644 index 000000000..aff102e5d --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_sup.erl @@ -0,0 +1,78 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_persistent_session_ds_gc_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+%% Starts the supervisor, registered locally under the module name.
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%--------------------------------------------------------------------------------
+%% `supervisor' API
+%%--------------------------------------------------------------------------------
+
+%% Returns `ignore' when message persistence is disabled, so that no
+%% supervisor process is started at all; otherwise builds the real spec.
+init(Opts) ->
+    case emqx_persistent_message:is_persistence_enabled() of
+        false ->
+            ignore;
+        true ->
+            do_init(Opts)
+    end.
+
+%% Only nodes whose mria role is `core' get the GC worker child; on
+%% replicants the supervisor starts with no children.
+do_init(_Opts) ->
+    Children =
+        case mria_rlog:role() of
+            replicant -> [];
+            core -> [worker(gc_worker, emqx_persistent_session_ds_gc_worker, [])]
+        end,
+    {ok, {sup_flags(), Children}}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+%% Supervisor flags: `rest_for_one', at most 10 restarts within 2 seconds,
+%% automatic shutdown disabled.
+sup_flags() ->
+    #{
+        strategy => rest_for_one,
+        intensity => 10,
+        period => 2,
+        auto_shutdown => never
+    }.
+
+%% Child spec for a permanent, non-significant worker started through
+%% `Mod:start_link/N' and given 10 seconds to shut down.
+worker(Id, Mod, Args) ->
+    #{
+        id => Id,
+        start => {Mod, start_link, Args},
+        type => worker,
+        restart => permanent,
+        shutdown => 10_000,
+        significant => false
+    }.
diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl new file mode 100644 index 000000000..bf607804f --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -0,0 +1,161 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_persistent_session_ds_gc_worker). + +-behaviour(gen_server). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("stdlib/include/qlc.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-include("emqx_persistent_session_ds.hrl"). + +%% API +-export([ + start_link/0 +]). + +%% `gen_server' API +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +%% call/cast/info records +-record(gc, {}). + +%%-------------------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). 
+
+%%--------------------------------------------------------------------------------
+%% `gen_server' API
+%%--------------------------------------------------------------------------------
+
+%% Arms the first GC timer. The server carries no state of its own, hence the
+%% empty map.
+init(_Opts) ->
+    ok = ensure_gc_timer(),
+    {ok, #{}}.
+
+%% No synchronous API is offered; every call is answered with `error'.
+handle_call(_Call, _From, State) ->
+    {reply, error, State}.
+
+%% Casts are ignored.
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%% On each `#gc{}' tick: attempt a GC pass, then schedule the next tick.
+%% Any other message is dropped.
+handle_info(#gc{}, State) ->
+    ok = try_gc(),
+    ok = ensure_gc_timer(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+%% Schedules a `#gc{}' message to this process after the configured
+%% `session_gc_interval'.
+ensure_gc_timer() ->
+    Interval = emqx_config:get([session_persistence, session_gc_interval]),
+    _ = erlang:send_after(Interval, self(), #gc{}),
+    ok.
+
+%% Runs one GC pass while holding a `global' lock taken on the running core
+%% nodes, so that only cores take part and, normally, only one of them
+%% collects at a time.  Emits `ds_session_gc_lock_taken' when the lock could
+%% not be obtained.
+try_gc() ->
+    LockId = {?MODULE, self()},
+    RunGC = fun() -> ?tp_span(ds_session_gc, #{}, start_gc()) end,
+    %% Only cores should run GC.
+    Cores = mria_membership:running_core_nodelist(),
+    %% A single retry is allowed because, on rare occasions, GC may start on
+    %% several nodes at the same moment and each attempt aborts the other.
+    %% With one retry, at least one node should (hopefully) get the lock while
+    %% the other aborts.  If a pass is very quick the nodes may end up running
+    %% one after another, but a pass that quick is cheap enough for this not
+    %% to matter resource-wise.
+    case global:trans(LockId, RunGC, Cores, 1) of
+        ok ->
+            ok;
+        aborted ->
+            ?tp(ds_session_gc_lock_taken, #{}),
+            ok
+    end.
+
+%% Current system time in milliseconds.
+now_ms() ->
+    erlang:system_time(millisecond).
+
+%% Entry point of a GC pass: keep collecting batches until none are left.
+start_gc() ->
+    do_gc(more).
+
+%% Builds the match specification selecting the ids of "zombie" sessions:
+%% those for which `last_alive_at + expiry_interval + threshold' is not later
+%% than the current time.  The threshold is three times the larger of the GC
+%% interval and the last-alive update interval.
+%% NOTE(review): the arithmetic assumes `last_alive_at' and `expiry_interval'
+%% are in milliseconds like `now_ms()' -- confirm against the session record.
+zombie_session_ms() ->
+    NowMS = now_ms(),
+    GCInterval = emqx_config:get([session_persistence, session_gc_interval]),
+    BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
+    TimeThreshold = max(GCInterval, BumpInterval) * 3,
+    ets:fun2ms(
+        fun(
+            #session{
+                id = DSSessionId,
+                last_alive_at = LastAliveAt,
+                conninfo = #{expiry_interval := EI}
+            }
+        ) when
+            LastAliveAt + EI + TimeThreshold =< NowMS
+        ->
+            DSSessionId
+        end
+    ).
+
+%% Collects zombie sessions batch by batch, one transaction per batch.
+%% `more' means another batch may exist; `done' ends the pass.
+do_gc(more) ->
+    GCBatchSize = emqx_config:get([session_persistence, session_gc_batch_size]),
+    MS = zombie_session_ms(),
+    {atomic, Next} = mria:transaction(?DS_MRIA_SHARD, fun() ->
+        FirstChunk = mnesia:select(?SESSION_TAB, MS, GCBatchSize, write),
+        case next_nonempty_batch(FirstChunk) of
+            [] ->
+                done;
+            [_ | _] = DSSessionIds ->
+                do_gc_(DSSessionIds),
+                more
+        end
+    end),
+    do_gc(Next);
+do_gc(done) ->
+    ok.
+
+%% `GCBatchSize' is only a recommendation for `mnesia:select/4', which may
+%% return an empty chunk even though later chunks still hold matches.  Giving
+%% up after a single extra chunk could leave expired sessions uncollected in
+%% every pass, because each pass restarts the scan from the beginning.  The
+%% continuation always advances towards `$end_of_table', so following it until
+%% a non-empty chunk or the end of the table is reached cannot loop forever.
+%% Returns `[]' only when the table is exhausted.
+next_nonempty_batch('$end_of_table') ->
+    [];
+next_nonempty_batch({[], Cont}) ->
+    next_nonempty_batch(mnesia:select(Cont));
+next_nonempty_batch({[_ | _] = DSSessionIds, _Cont}) ->
+    DSSessionIds.
+
+%% Destroys every session of the batch and emits a trace point listing the
+%% ids that were cleaned.
+do_gc_(DSSessionIds) ->
+    lists:foreach(fun emqx_persistent_session_ds:destroy_session/1, DSSessionIds),
+    ?tp(ds_session_gc_cleaned, #{session_ids => DSSessionIds}),
+    ok.
diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 1f319f985..2638baf7e 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1789,6 +1789,22 @@ fields("session_persistence") -> desc => ?DESC(session_ds_last_alive_update_interval) } )}, + {"session_gc_interval", + sc( + timeout_duration(), + #{ + default => <<"10m">>, + desc => ?DESC(session_ds_session_gc_interval) + } + )}, + {"session_gc_batch_size", + sc( + pos_integer(), + #{ + default => 100, + desc => ?DESC(session_ds_session_gc_batch_size) + } + )}, {"force_persistence", sc( boolean(), diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 041e4076b..d835fb944 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -510,8 +510,6 @@ t_persist_on_disconnect(Config) -> ?assertEqual(0, client_info(session_present, Client2)), ok = emqtt:disconnect(Client2). -t_process_dies_session_expires(init, Config) -> skip_ds_tc(Config); -t_process_dies_session_expires('end', _Config) -> ok. t_process_dies_session_expires(Config) -> %% Emulate an error in the connect process, %% or that the node of the process goes down. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index d931c66b1..2a6fb03ba 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1571,4 +1571,10 @@ session_builtin_n_shards.desc: session_storage_backend_builtin.desc: """Builtin session storage backend utilizing embedded RocksDB key-value store.""" +session_ds_session_gc_interval.desc: +"""The interval at which session garbage collection is executed for persistent sessions.""" + +session_ds_session_gc_batch_size.desc: +"""The size of each batch of expired persistent sessions to be garbage collected per iteration.""" + }