diff --git a/apps/emqx/src/emqx_persistent_message_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_message_ds_gc_worker.erl
new file mode 100644
index 000000000..b960eae9e
--- /dev/null
+++ b/apps/emqx/src/emqx_persistent_message_ds_gc_worker.erl
@@ -0,0 +1,174 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% Time-based garbage collection of persistent messages.
+%%
+%% A locally registered `gen_server' keeps a timer armed, using
+%% `session_persistence.message_retention_period' as the delay.  On each tick
+%% it tries to take a `global' lock on the running core nodes and, when it
+%% gets the lock, runs one GC pass: a new generation is added if every known
+%% generation has a `created_at' at least one retention period in the past, and
+%% every generation whose `until' is at least that far in the past is dropped.
+-module(emqx_persistent_message_ds_gc_worker).
+
+-behaviour(gen_server).
+
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-include("emqx_persistent_session_ds.hrl").
+
+%% API
+-export([
+    start_link/0,
+    gc/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2
+]).
+
+%% call/cast/info records
+-record(gc, {}).
+
+%%--------------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------------
+
+%% Starts the worker linked to the caller and registers it locally as `?MODULE'.
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+%% For testing or manual ops: runs one GC pass inside the worker process and
+%% waits for it without a call timeout.  Unlike the timer-driven pass, this
+%% one does not take the `global' lock.
+gc() ->
+    gen_server:call(?MODULE, #gc{}, infinity).
+
+%%--------------------------------------------------------------------------------
+%% `gen_server' API
+%%--------------------------------------------------------------------------------
+
+%% Arms the first GC timer.  The server state is an empty map.
+init(_Opts) ->
+    ensure_gc_timer(),
+    {ok, #{}}.
+
+%% `#gc{}' is the manual trigger sent by `gc/0'; every other call is answered
+%% with `error'.
+handle_call(#gc{}, _From, State) ->
+    maybe_gc(),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, error, State}.
+
+%% Casts are ignored.
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%% `#gc{}' is the timer tick: attempt a GC pass under the lock, then re-arm the
+%% timer.  Every other message is ignored.
+handle_info(#gc{}, State) ->
+    try_gc(),
+    ensure_gc_timer(),
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------------------
+%% Internal fns
+%%--------------------------------------------------------------------------------
+
+%% Schedules the next `#gc{}' tick for this process, with the configured
+%% retention period as the delay.
+ensure_gc_timer() ->
+    Interval = emqx_config:get([session_persistence, message_retention_period]),
+    _TRef = erlang:send_after(Interval, self(), #gc{}),
+    ok.
+
+%% Runs `maybe_gc/0' inside a `global' transaction over the running core nodes.
+try_gc() ->
+    LockId = {?MODULE, self()},
+    %% Only cores should run GC.
+    CoreNodes = mria_membership:running_core_nodelist(),
+    %% Note: we set retries to 1 here because, in rare occasions, GC might start at the
+    %% same time in more than one node, and each one will abort the other.  By allowing
+    %% one retry, at least one node will (hopefully) get to enter the transaction and
+    %% the other will abort.  If GC runs too fast, both nodes might run in sequence.
+    %% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
+    %% resource-wise.
+    Retries = 1,
+    case global:trans(LockId, fun maybe_gc/0, CoreNodes, Retries) of
+        ok ->
+            ok;
+        aborted ->
+            ?tp(ds_message_gc_lock_taken, #{}),
+            ok
+    end.
+
+%% Current system time in milliseconds.
+now_ms() ->
+    erlang:system_time(millisecond).
+
+%% One GC pass.  `Cutoff' is the current time minus the retention period.
+maybe_gc() ->
+    Generations = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
+    Now = now_ms(),
+    Retention = emqx_config:get([session_persistence, message_retention_period]),
+    Cutoff = Now - Retention,
+    maybe_create_new_generation(Generations, Cutoff),
+    ?tp_span(
+        ps_message_gc,
+        #{},
+        begin
+            %% Uses the listing taken above, i.e. before a generation was possibly added.
+            ExpiredGenIds = expired_generation_ids(Generations, Cutoff),
+            lists:foreach(
+                fun(GenId) ->
+                    ok = emqx_ds:drop_generation(?PERSISTENT_MESSAGE_DB, GenId),
+                    ?tp(message_gc_generation_dropped, #{gen_id => GenId})
+                end,
+                ExpiredGenIds
+            )
+        end
+    ).
+
+%% Keys of the generations whose `until' is a number not greater than `Cutoff'.
+expired_generation_ids(Generations, Cutoff) ->
+    IsExpired = fun(_GenId, #{until := Until}) ->
+        is_number(Until) andalso Until =< Cutoff
+    end,
+    maps:keys(maps:filter(IsExpired, Generations)).
+
+%% Adds a generation only when every listed generation has `created_at' not
+%% greater than `Cutoff' (vacuously true when there are no generations).
+maybe_create_new_generation(Generations, Cutoff) ->
+    IsOldEnough = fun({_GenId, #{created_at := CreatedAt}}) ->
+        CreatedAt =< Cutoff
+    end,
+    case lists:all(IsOldEnough, maps:to_list(Generations)) of
+        true ->
+            ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
+            ?tp(ps_message_gc_added_gen, #{});
+        false ->
+            ?tp(ps_message_gc_too_early, #{}),
+            ok
+    end.
diff --git a/apps/emqx/src/emqx_persistent_session_ds_sup.erl b/apps/emqx/src/emqx_persistent_session_ds_sup.erl index 5bd620e8b..11e05be82 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_sup.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_sup.erl @@ -48,13 +48,14 @@ init(Opts) -> do_init(_Opts) -> SupFlags = #{ - strategy => rest_for_one, + strategy => one_for_one, intensity => 10, period => 2, auto_shutdown => never }, CoreChildren = [ - worker(gc_worker, emqx_persistent_session_ds_gc_worker, []) + worker(session_gc_worker, emqx_persistent_session_ds_gc_worker, []), + worker(message_gc_worker, emqx_persistent_message_ds_gc_worker, []) ], Children = case mria_rlog:role() of diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 7cd67089d..56d575bd9 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -1855,6 +1855,14 @@ fields("session_persistence") -> desc => ?DESC(session_ds_session_gc_batch_size) } )}, + {"message_retention_period", + sc( + timeout_duration(), + #{ + default => <<"1d">>, + desc => ?DESC(session_ds_message_retention_period) + } + )}, {"force_persistence", sc( boolean(), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index f25f38098..c46d726f4 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -19,6 +19,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -compile(export_all). 
@@ -45,10 +46,20 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
     Cluster = cluster(),
     Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
     [{nodes, Nodes} | Config];
+init_per_testcase(t_message_gc = TestCase, Config) ->
+    Opts = #{
+        extra_emqx_conf =>
+            "\n session_persistence.message_retention_period = 1s"
+            "\n session_persistence.storage.builtin.n_shards = 3"
+    },
+    common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
 init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config, _Opts = #{}).
+
+common_init_per_testcase(TestCase, Config, Opts) ->
     ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
     Apps = emqx_cth_suite:start(
-        app_specs(),
+        app_specs(Opts),
         #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
     ),
     [{apps, Apps} | Config].
@@ -379,6 +390,66 @@ t_publish_empty_topic_levels(_Config) ->
         emqtt:stop(Pub)
     end.
 
+t_message_gc_too_young(_Config) ->
+    %% Check that GC doesn't attempt to create a new generation if there are fresh enough
+    %% generations around.  The stability of this test relies on the default value for
+    %% message retention being long enough.  Currently, the default is 1 day (`1d').
+    ?check_trace(
+        ok = emqx_persistent_message_ds_gc_worker:gc(),
+        fun(Trace) ->
+            ?assertMatch([_], ?of_kind(ps_message_gc_too_early, Trace)),
+            ok
+        end
+    ),
+    ok.
+
+t_message_gc(Config) ->
+    %% Check that, after GC runs, a new generation is created, retaining newer messages,
+    %% and that older messages are no longer accessible.
+    NShards = ?config(n_shards, Config),
+    ?check_trace(
+        #{timetrap => 10_000},
+        begin
+            %% ensure some messages are in the first generation
+            ?force_ordering(
+                #{?snk_kind := inserted_batch},
+                #{?snk_kind := ps_message_gc_added_gen}
+            ),
+            Msgs0 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/baz">>, <<"2">>, 1)
+            ],
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
+            ?tp(inserted_batch, #{}),
+            {ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
+
+            Now = emqx_message:timestamp_now(),
+            Msgs1 = [
+                message(<<"foo/bar">>, <<"3">>, Now + 100),
+                message(<<"foo/baz">>, <<"4">>, Now + 101)
+            ],
+            ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
+
+            {ok, _} = snabbkaffe:block_until(
+                ?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
+                infinity
+            ),
+
+            TopicFilter = emqx_topic:words(<<"#">>),
+            StartTime = 0,
+            Msgs = consume(TopicFilter, StartTime),
+            %% only "1" and "2" should have been GC'ed
+            ?assertEqual(
+                sets:from_list([<<"3">>, <<"4">>], [{version, 2}]),
+                sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}])
+            ),
+
+            ok
+        end,
+        []
+    ),
+    ok.
+
 %%
 
 connect(ClientId, CleanStart, EI) ->
@@ -438,9 +509,13 @@ publish(Node, Message) ->
     erpc:call(Node, emqx, publish, [Message]).
 
 app_specs() ->
+    app_specs(_Opts = #{}).
+
+app_specs(Opts) ->
+    ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
     [
         emqx_durable_storage,
-        {emqx, "session_persistence {enable = true}"}
+        {emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
     ].
 
 cluster() ->
@@ -459,3 +534,11 @@ clear_db() ->
     mria:stop(),
     ok = mnesia:delete_schema([node()]),
     ok.
+
+message(Topic, Payload, PublishedAt) ->
+    #message{
+        topic = Topic,
+        payload = Payload,
+        timestamp = PublishedAt,
+        id = emqx_guid:gen()
+    }.
diff --git a/changes/ce/feat-12338.en.md b/changes/ce/feat-12338.en.md new file mode 100644 index 000000000..8b8edcb76 --- /dev/null +++ b/changes/ce/feat-12338.en.md @@ -0,0 +1 @@ +Added time-based message garbage collection to the RocksDB-based persistent session backend. diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index af4251328..be16c765e 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1608,5 +1608,8 @@ The session will query the DB for the new messages when the value of `FreeSpace` `FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" +session_ds_message_retention_period.desc: +"""The minimum amount of time that messages should be retained for. After messages have been in storage for at least this period of time, they'll be dropped.""" + }