feat(ps): add message gc

This commit is contained in:
Thales Macedo Garitezi 2024-01-18 14:28:37 -03:00 committed by zhongwencool
parent 7c0d37fdb9
commit d323fc7c27
6 changed files with 257 additions and 4 deletions

View File

@ -0,0 +1,157 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_persistent_message_ds_gc_worker).
-behaviour(gen_server).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include("emqx_persistent_session_ds.hrl").
%% API
-export([
start_link/0,
gc/0
]).
%% `gen_server' API
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).
%% call/cast/info records
-record(gc, {}).
%%--------------------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------------------
%% Starts the message GC worker linked to the calling process and registers it
%% locally under this module's name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% For testing or manual ops.
%% Synchronously asks the locally registered worker to run a GC pass and waits
%% (without a timeout) until it has replied.
gc() ->
    Request = #gc{},
    gen_server:call(?MODULE, Request, infinity).
%%--------------------------------------------------------------------------------
%% `gen_server' API
%%--------------------------------------------------------------------------------
%% Arms the first GC timer.  The server carries no state of its own, so an
%% empty map is used as the state.
init(_Opts) ->
    ensure_gc_timer(),
    {ok, _EmptyState = #{}}.
%% Synchronous requests.
%% A `#gc{}' request runs a GC pass inline (directly via `maybe_gc/0') and
%% replies `ok'; any other request is answered with `error'.
handle_call(Call, _From, State) ->
    Reply =
        case Call of
            #gc{} ->
                maybe_gc(),
                ok;
            _ ->
                error
        end,
    {reply, Reply, State}.
%% No casts are part of this server's protocol; anything received is dropped.
handle_cast(_Ignored, St) ->
    {noreply, St}.
%% Raw messages.
%% The `#gc{}' timer message triggers a lock-protected GC attempt and then
%% re-arms the timer; every other message is ignored.
handle_info(Info, State) ->
    case Info of
        #gc{} ->
            try_gc(),
            ensure_gc_timer();
        _ ->
            ok
    end,
    {noreply, State}.
%%--------------------------------------------------------------------------------
%% Internal fns
%%--------------------------------------------------------------------------------
%% Schedules a `#gc{}' message to this process.  The delay is the configured
%% `session_persistence.message_retention_period'.  Any timer armed earlier is
%% left untouched (nothing is cancelled here).
ensure_gc_timer() ->
    Interval = emqx_config:get([session_persistence, message_retention_period]),
    _TRef = erlang:send_after(Interval, self(), #gc{}),
    ok.
%% Runs `maybe_gc/0' under a `global' lock taken on the running core nodes.
%% Returns `ok' whether GC ran here or the lock could not be acquired.
try_gc() ->
    %% Only cores should run GC.
    Cores = mria_membership:running_core_nodelist(),
    LockId = {?MODULE, self()},
    %% Note: we set retries to 1 here because, in rare occasions, GC might start at the
    %% same time in more than one node, and each one will abort the other.  By allowing
    %% one retry, at least one node will (hopefully) get to enter the transaction and
    %% the other will abort.  If GC runs too fast, both nodes might run in sequence.
    %% But, in that case, GC is clearly not too costly, and that shouldn't be a problem,
    %% resource-wise.
    MaxRetries = 1,
    case global:trans(LockId, fun maybe_gc/0, Cores, MaxRetries) of
        ok ->
            ok;
        aborted ->
            %% Lock could not be acquired within the allowed retries.
            ?tp(ds_message_gc_lock_taken, #{}),
            ok
    end.
%% Current Erlang system time, expressed in milliseconds.
now_ms() ->
    Unit = millisecond,
    erlang:system_time(Unit).
%% One GC pass over the persistent message DB:
%%   1. list the generations together with their lifetimes;
%%   2. compute the cutoff (now minus the configured retention period);
%%   3. possibly add a new generation (see `maybe_create_new_generation/2');
%%   4. drop every generation whose `until' is a number at or before the cutoff.
%% The generation list is fetched once, before step 3, so a generation added in
%% this pass is not part of the set inspected in step 4.
maybe_gc() ->
    Generations = emqx_ds:list_generations_with_lifetimes(?PERSISTENT_MESSAGE_DB),
    Now = now_ms(),
    Retention = emqx_config:get([session_persistence, message_retention_period]),
    Cutoff = Now - Retention,
    maybe_create_new_generation(Generations, Cutoff),
    %% A non-numeric `until' never qualifies as expired.
    IsExpired = fun(_GenId, #{until := Until}) ->
        is_number(Until) andalso Until =< Cutoff
    end,
    DropGeneration = fun(GenId) ->
        ok = emqx_ds:drop_generation(?PERSISTENT_MESSAGE_DB, GenId),
        ?tp(message_gc_generation_dropped, #{gen_id => GenId})
    end,
    ?tp_span(
        ps_message_gc,
        #{},
        lists:foreach(DropGeneration, maps:keys(maps:filter(IsExpired, Generations)))
    ).
%% Adds a new generation to the persistent message DB unless some existing
%% generation was created after `TimeThreshold'.  With no generations at all,
%% a new one is added.
%%
%% AllGens       - map of generation id => info map containing `created_at'.
%% TimeThreshold - cutoff that `created_at' values are compared against.
maybe_create_new_generation(AllGens, TimeThreshold) ->
    IsFresh = fun({_GenId, #{created_at := CreatedAt}}) ->
        CreatedAt > TimeThreshold
    end,
    case lists:any(IsFresh, maps:to_list(AllGens)) of
        true ->
            %% At least one generation is still younger than the threshold.
            ?tp(ps_message_gc_too_early, #{}),
            ok;
        false ->
            ok = emqx_ds:add_generation(?PERSISTENT_MESSAGE_DB),
            ?tp(ps_message_gc_added_gen, #{})
    end.

View File

@ -48,13 +48,14 @@ init(Opts) ->
do_init(_Opts) -> do_init(_Opts) ->
SupFlags = #{ SupFlags = #{
strategy => rest_for_one, strategy => one_for_one,
intensity => 10, intensity => 10,
period => 2, period => 2,
auto_shutdown => never auto_shutdown => never
}, },
CoreChildren = [ CoreChildren = [
worker(gc_worker, emqx_persistent_session_ds_gc_worker, []) worker(session_gc_worker, emqx_persistent_session_ds_gc_worker, []),
worker(message_gc_worker, emqx_persistent_message_ds_gc_worker, [])
], ],
Children = Children =
case mria_rlog:role() of case mria_rlog:role() of

View File

@ -1855,6 +1855,14 @@ fields("session_persistence") ->
desc => ?DESC(session_ds_session_gc_batch_size) desc => ?DESC(session_ds_session_gc_batch_size)
} }
)}, )},
{"message_retention_period",
sc(
timeout_duration(),
#{
default => <<"1d">>,
desc => ?DESC(session_ds_message_retention_period)
}
)},
{"force_persistence", {"force_persistence",
sc( sc(
boolean(), boolean(),

View File

@ -19,6 +19,7 @@
-include_lib("stdlib/include/assert.hrl"). -include_lib("stdlib/include/assert.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-compile(export_all). -compile(export_all).
@ -45,10 +46,20 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) ->
Cluster = cluster(), Cluster = cluster(),
Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}),
[{nodes, Nodes} | Config]; [{nodes, Nodes} | Config];
init_per_testcase(t_message_gc = TestCase, Config) ->
Opts = #{
extra_emqx_conf =>
"\n session_persistence.message_retention_period = 1s"
"\n session_persistence.storage.builtin.n_shards = 3"
},
common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
common_init_per_testcase(TestCase, Config, _Opts = #{}).
common_init_per_testcase(TestCase, Config, Opts) ->
ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB),
Apps = emqx_cth_suite:start( Apps = emqx_cth_suite:start(
app_specs(), app_specs(Opts),
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
), ),
[{apps, Apps} | Config]. [{apps, Apps} | Config].
@ -379,6 +390,66 @@ t_publish_empty_topic_levels(_Config) ->
emqtt:stop(Pub) emqtt:stop(Pub)
end. end.
t_message_gc_too_young(_Config) ->
%% Check that GC doesn't attempt to create a new generation if there are fresh enough
%% generations around. The stability of this test relies on the default value for
%% message retention being long enough. Currently, the default is 1 day.
?check_trace(
%% Trigger a GC pass synchronously through the worker's manual API.
ok = emqx_persistent_message_ds_gc_worker:gc(),
fun(Trace) ->
%% Exactly one "too early" event is expected in the trace.
?assertMatch([_], ?of_kind(ps_message_gc_too_early, Trace)),
ok
end
),
ok.
t_message_gc(Config) ->
%% Check that, after GC runs, a new generation is created, retaining messages, and
%% older messages no longer are accessible.
%% This case is set up by its `init_per_testcase' clause with a 1s message retention
%% period and 3 shards; `n_shards' is read back from the config here.
NShards = ?config(n_shards, Config),
?check_trace(
#{timetrap => 10_000},
begin
%% ensure some messages are in the first generation
?force_ordering(
#{?snk_kind := inserted_batch},
#{?snk_kind := ps_message_gc_added_gen}
),
Msgs0 = [
message(<<"foo/bar">>, <<"1">>, 0),
message(<<"foo/baz">>, <<"2">>, 1)
],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs0),
?tp(inserted_batch, #{}),
%% Wait until GC has added a new generation before storing the second batch.
{ok, _} = ?block_until(#{?snk_kind := ps_message_gc_added_gen}),
Now = emqx_message:timestamp_now(),
%% Second batch, stamped slightly after the current time.
Msgs1 = [
message(<<"foo/bar">>, <<"3">>, Now + 100),
message(<<"foo/baz">>, <<"4">>, Now + 101)
],
ok = emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, Msgs1),
%% Wait for `NShards' generation-dropped events (presumably one per shard --
%% TODO confirm against emqx_ds:drop_generation/2).
{ok, _} = snabbkaffe:block_until(
?match_n_events(NShards, #{?snk_kind := message_gc_generation_dropped}),
infinity
),
TopicFilter = emqx_topic:words(<<"#">>),
StartTime = 0,
Msgs = consume(TopicFilter, StartTime),
%% only "1" and "2" should have been GC'ed
?assertEqual(
sets:from_list([<<"3">>, <<"4">>], [{version, 2}]),
sets:from_list([emqx_message:payload(Msg) || Msg <- Msgs], [{version, 2}])
),
ok
end,
[]
),
ok.
%% %%
connect(ClientId, CleanStart, EI) -> connect(ClientId, CleanStart, EI) ->
@ -438,9 +509,13 @@ publish(Node, Message) ->
erpc:call(Node, emqx, publish, [Message]). erpc:call(Node, emqx, publish, [Message]).
app_specs() -> app_specs() ->
app_specs(_Opts = #{}).
app_specs(Opts) ->
ExtraEMQXConf = maps:get(extra_emqx_conf, Opts, ""),
[ [
emqx_durable_storage, emqx_durable_storage,
{emqx, "session_persistence {enable = true}"} {emqx, "session_persistence {enable = true}" ++ ExtraEMQXConf}
]. ].
cluster() -> cluster() ->
@ -459,3 +534,11 @@ clear_db() ->
mria:stop(), mria:stop(),
ok = mnesia:delete_schema([node()]), ok = mnesia:delete_schema([node()]),
ok. ok.
%% Test helper: builds a `#message{}' record with a freshly generated id and the
%% given topic, payload and timestamp.
message(Topic, Payload, PublishedAt) ->
    MsgId = emqx_guid:gen(),
    #message{
        id = MsgId,
        topic = Topic,
        payload = Payload,
        timestamp = PublishedAt
    }.

View File

@ -0,0 +1 @@
Added time-based message garbage collection to the RocksDB-based persistent session backend.

View File

@ -1608,5 +1608,8 @@ The session will query the DB for the new messages when the value of `FreeSpace`
`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages.""" `FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
session_ds_message_retention_period.desc:
"""The minimum amount of time that messages should be retained for. After messages have been in storage for at least this period of time, they'll be dropped."""
} }