Merge pull request #12831 from thalesmg/ds-checkpoint-clean-m-20240404

feat(ds): clear all checkpoints when (re)starting storage layer
This commit is contained in:
Thales Macedo Garitezi 2024-04-04 15:41:50 -03:00 committed by GitHub
commit 8d58b40f33
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 24 additions and 2 deletions

View File

@ -500,6 +500,7 @@ init({ShardId, Options}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}), logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}),
erase_schema_runtime(ShardId), erase_schema_runtime(ShardId),
clear_all_checkpoints(ShardId),
{ok, DB, CFRefs0} = rocksdb_open(ShardId, Options), {ok, DB, CFRefs0} = rocksdb_open(ShardId, Options),
{Schema, CFRefs} = {Schema, CFRefs} =
case get_schema_persistent(DB) of case get_schema_persistent(DB) of
@ -567,6 +568,23 @@ terminate(_Reason, #s{db = DB, shard_id = ShardId}) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-spec clear_all_checkpoints(shard_id()) -> ok.
clear_all_checkpoints(ShardId) ->
CheckpointBaseDir = checkpoints_dir(ShardId),
ok = filelib:ensure_path(CheckpointBaseDir),
{ok, AllFiles} = file:list_dir(CheckpointBaseDir),
CheckpointDirs = [Dir || Dir <- AllFiles, filelib:is_dir(Dir)],
lists:foreach(
fun(Dir) ->
logger:debug(#{
msg => "ds_storage_deleting_previous_checkpoint",
dir => Dir
}),
ok = file:del_dir_r(Dir)
end,
CheckpointDirs
).
-spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) -> -spec open_shard(shard_id(), rocksdb:db_handle(), cf_refs(), shard_schema()) ->
shard(). shard().
open_shard(ShardId, DB, CFRefs, ShardSchema) -> open_shard(ShardId, DB, CFRefs, ShardSchema) ->
@ -777,9 +795,13 @@ rocksdb_open(Shard, Options) ->
db_dir({DB, ShardId}) -> db_dir({DB, ShardId}) ->
filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]). filename:join([emqx_ds:base_dir(), DB, binary_to_list(ShardId)]).
-spec checkpoints_dir(shard_id()) -> file:filename().
checkpoints_dir({DB, ShardId}) ->
filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId)]).
-spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename(). -spec checkpoint_dir(shard_id(), _Name :: file:name()) -> file:filename().
checkpoint_dir({DB, ShardId}, Name) -> checkpoint_dir(ShardId, Name) ->
filename:join([emqx_ds:base_dir(), DB, checkpoints, binary_to_list(ShardId), Name]). filename:join([checkpoints_dir(ShardId), Name]).
-spec update_last_until(Schema, emqx_ds:time()) -> -spec update_last_until(Schema, emqx_ds:time()) ->
Schema | {error, exists | overlaps_existing_generations} Schema | {error, exists | overlaps_existing_generations}